diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index f22ae2b..173eee7 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -2,13 +2,11 @@ import datetime import json import logging from logging.handlers import RotatingFileHandler -import queue import signal import sys import threading import time -import aprslib from aprslib import util as aprslib_util import click from device_detector import DeviceDetector @@ -27,7 +25,6 @@ from aprsd import config as aprsd_config from aprsd import messaging, packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.logging import rich as aprsd_logging -from aprsd.threads import aprsd as aprsd_thread from aprsd.threads import rx from aprsd.utils import objectstore, trace @@ -35,14 +32,6 @@ from aprsd.utils import objectstore, trace LOG = logging.getLogger("APRSD") auth = HTTPBasicAuth() users = None -rx_msg_queue = queue.Queue(maxsize=20) -tx_msg_queue = queue.Queue(maxsize=20) -control_queue = queue.Queue(maxsize=20) -msg_queues = { - "rx": rx_msg_queue, - "control": control_queue, - "tx": tx_msg_queue, -} def signal_handler(sig, frame): @@ -143,62 +132,19 @@ def verify_password(username, password): class WebChatRXThread(rx.APRSDRXThread): - """Class that connects to aprsis/kiss and waits for messages.""" + """Class that connects to APRISIS/kiss and waits for messages. + + After the packet is received from APRSIS/KISS, the packet is + sent to processing in the WebChatProcessPacketThread. + """ + def __init__(self, config, socketio): + super().__init__(None, config) + self.socketio = socketio + self.connected = False def connected(self, connected=True): self.connected = connected - def stop(self): - self.thread_stop = True - client.factory.create().client.stop() - - def loop(self): - # setup the consumer of messages and block until a messages - msg = None - try: - msg = self.msg_queues["tx"].get_nowait() - except queue.Empty: - pass - - try: - if msg: - LOG.debug("GOT msg from TX queue!!") - msg.send() - except ( - aprslib.exceptions.ConnectionDrop, - aprslib.exceptions.ConnectionError, - ): - LOG.error("Connection dropped, reconnecting") - # Put it back on the queue to send. - self.msg_queues["tx"].put(msg) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - self._client.reset() - time.sleep(2) - - try: - # When new packets come in the consumer will process - # the packet - - # This call blocks until thread stop() is called. - self._client.client.consumer( - self.process_packet, raw=False, blocking=False, - ) - - except ( - aprslib.exceptions.ConnectionDrop, - aprslib.exceptions.ConnectionError, - ): - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - self._client.reset() - return True - return True - def process_packet(self, *args, **kwargs): # packet = self._client.decode_packet(*args, **kwargs) if "packet" in kwargs: @@ -207,101 +153,55 @@ class WebChatRXThread(rx.APRSDRXThread): packet = self._client.decode_packet(*args, **kwargs) LOG.debug(f"GOT Packet {packet}") - self.msg_queues["rx"].put(packet) + thread = WebChatProcessPacketThread( + config=self.config, + packet=packet, + socketio=self.socketio, + ) + thread.start() -class WebChatTXThread(aprsd_thread.APRSDThread): - """Class that """ - def __init__(self, msg_queues, config, socketio): - super().__init__("_TXThread_") - self.msg_queues = msg_queues - self.config = config +class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): + """Class that handles packets being sent to us.""" + def __init__(self, config, packet, socketio): self.socketio = socketio self.connected = False - - def loop(self): - try: - msg = self.msg_queues["control"].get_nowait() - self.connected = msg["connected"] - except queue.Empty: - pass - try: - packet = self.msg_queues["rx"].get_nowait() - if packet: - # we got a packet and we need to send it to the - # web socket - self.process_packet(packet) - except queue.Empty: - pass - except Exception as ex: - LOG.exception(ex) - time.sleep(1) - - return True + super().__init__(config, packet) def process_ack_packet(self, packet): + super().process_ack_packet(packet) ack_num = packet.get("msgNo") - LOG.info(f"We got ack for our sent message {ack_num}") - messaging.log_packet(packet) SentMessages().ack(int(ack_num)) - tracker = messaging.MsgTrack() - tracker.remove(ack_num) self.socketio.emit( "ack", SentMessages().get(int(ack_num)), namespace="/sendmsg", ) - stats.APRSDStats().ack_rx_inc() self.got_ack = True - def process_packet(self, packet): - LOG.info(f"process PACKET {packet}") - tocall = packet.get("addresse", None) + def process_non_ack_packet(self, packet): + LOG.info(f"process non ack PACKET {packet}") + packet.get("addresse", None) fromcall = packet["from"] - msg = packet.get("message_text", None) - msg_id = packet.get("msgNo", "0") - msg_response = packet.get("response", None) - if ( - tocall.lower() == self.config["aprsd"]["callsign"].lower() - and msg_response == "ack" - ): - self.process_ack_packet(packet) - elif tocall.lower() == self.config["aprsd"]["callsign"].lower(): - messaging.log_message( - "Received Message", - packet["raw"], - msg, - fromcall=fromcall, - msg_num=msg_id, - ) - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage( - self.config["aprsd"]["callsign"], - fromcall, - msg_id=msg_id, - ) - ack.send() - - packets.PacketList().add(packet) - stats.APRSDStats().msgs_rx_inc() - message = packet.get("message_text", None) - msg = { - "id": 0, - "ts": time.time(), - "ack": False, - "from": fromcall, - "to": packet["to"], - "raw": packet["raw"], - "message": message, - "status": None, - "last_update": None, - "reply": None, - } - self.socketio.emit( - "new", msg, - namespace="/sendmsg", - ) + packets.PacketList().add(packet) + stats.APRSDStats().msgs_rx_inc() + message = packet.get("message_text", None) + msg = { + "id": 0, + "ts": time.time(), + "ack": False, + "from": fromcall, + "to": packet["to"], + "raw": packet["raw"], + "message": message, + "status": None, + "last_update": None, + "reply": None, + } + self.socketio.emit( + "new", msg, + namespace="/sendmsg", + ) class WebChatFlask(flask_classful.FlaskView): @@ -418,9 +318,8 @@ class SendMessageNamespace(Namespace): msg = None request = None - def __init__(self, namespace=None, config=None, msg_queues=None): + def __init__(self, namespace=None, config=None): self._config = config - self._msg_queues = msg_queues super().__init__(namespace) def on_connect(self): @@ -430,13 +329,9 @@ class SendMessageNamespace(Namespace): "connected", {"data": "/sendmsg Connected"}, namespace="/sendmsg", ) - msg = {"connected": True} - self._msg_queues["control"].put(msg) def on_disconnect(self): LOG.debug("WS Disconnected") - msg = {"connected": False} - self._msg_queues["control"].put(msg) def on_send(self, data): global socketio @@ -458,7 +353,6 @@ class SendMessageNamespace(Namespace): namespace="/sendmsg", ) msg.send() - # self._msg_queues["tx"].put(msg) def on_gps(self, data): LOG.debug(f"WS on_GPS: {data}") @@ -567,7 +461,6 @@ def init_flask(config, loglevel, quiet): socketio.on_namespace( SendMessageNamespace( "/sendmsg", config=config, - msg_queues=msg_queues, ), ) return socketio, flask_app @@ -644,18 +537,11 @@ def webchat(ctx, flush, port): (socketio, app) = init_flask(config, loglevel, quiet) rx_thread = WebChatRXThread( - msg_queues=msg_queues, - config=config, - ) - LOG.info("Start RX Thread") - rx_thread.start() - tx_thread = WebChatTXThread( - msg_queues=msg_queues, config=config, socketio=socketio, ) - LOG.info("Start TX Thread") - tx_thread.start() + LOG.info("Start RX Thread") + rx_thread.start() keepalive = threads.KeepAliveThread(config=config) LOG.info("Start KeepAliveThread") diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index d2103b1..5529bfa 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -58,17 +58,32 @@ class APRSDRXThread(APRSDThread): class APRSDPluginRXThread(APRSDRXThread): + """Process received packets. + + This is the main APRSD Server command thread that + receives packets from APRIS and then sends them for + processing in the PluginProcessPacketThread. + """ def process_packet(self, *args, **kwargs): packet = self._client.decode_packet(*args, **kwargs) - thread = APRSDProcessPacketThread(packet=packet, config=self.config) + thread = APRSDPluginProcessPacketThread( + config=self.config, + packet=packet, + ) thread.start() class APRSDProcessPacketThread(APRSDThread): + """Base class for processing received packets. - def __init__(self, packet, config): - self.packet = packet + This is the base class for processing packets coming from + the consumer. This base class handles sending ack packets and + will ack a message before sending the packet to the subclass + for processing.""" + + def __init__(self, config, packet): self.config = config + self.packet = packet name = self.packet["raw"][:10] super().__init__(f"RXPKT-{name}") @@ -119,7 +134,10 @@ class APRSDProcessPacketThread(APRSDThread): ) # Only ack messages that were sent directly to us - if (tocall.lower() == self.config["aprsd"]["callsign"].lower()): + if ( + tocall + and tocall.lower() == self.config["aprsd"]["callsign"].lower() + ): stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last @@ -130,69 +148,89 @@ class APRSDProcessPacketThread(APRSDThread): ) ack.send() - pm = plugin.PluginManager() - try: - results = pm.run(packet) - wl = packets.WatchList() - wl.update_seen(packet) - replied = False - for reply in results: - if isinstance(reply, list): - # one of the plugins wants to send multiple messages - replied = True - for subreply in reply: - LOG.debug(f"Sending '{subreply}'") - if isinstance(subreply, messaging.Message): - subreply.send() - else: - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - fromcall, - subreply, - ) - msg.send() - elif isinstance(reply, messaging.Message): - # We have a message based object. - LOG.debug(f"Sending '{reply}'") - reply.send() - replied = True - else: - replied = True - # A plugin can return a null message flag which signals - # us that they processed the message correctly, but have - # nothing to reply with, so we avoid replying with a - # usage string - if reply is not messaging.NULL_MESSAGE: - LOG.debug(f"Sending '{reply}'") + self.process_non_ack_packet(packet) + else: + LOG.info("Packet was not for us.") + LOG.debug("Packet processing complete") + @abc.abstractmethod + def process_non_ack_packet(self, *args, **kwargs): + """Ack packets already dealt with here.""" + + +class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): + """Process the packet through the plugin manager. + + This is the main aprsd server plugin processing thread.""" + + def process_non_ack_packet(self, packet): + """Send the packet through the plugins.""" + fromcall = packet["from"] + tocall = packet.get("addresse", None) + msg = packet.get("message_text", None) + packet.get("msgNo", "0") + packet.get("response", None) + pm = plugin.PluginManager() + try: + results = pm.run(packet) + wl = packets.WatchList() + wl.update_seen(packet) + replied = False + for reply in results: + if isinstance(reply, list): + # one of the plugins wants to send multiple messages + replied = True + for subreply in reply: + LOG.debug(f"Sending '{subreply}'") + if isinstance(subreply, messaging.Message): + subreply.send() + else: msg = messaging.TextMessage( self.config["aprsd"]["callsign"], fromcall, - reply, + subreply, ) msg.send() + elif isinstance(reply, messaging.Message): + # We have a message based object. + LOG.debug(f"Sending '{reply}'") + reply.send() + replied = True + else: + replied = True + # A plugin can return a null message flag which signals + # us that they processed the message correctly, but have + # nothing to reply with, so we avoid replying with a + # usage string + if reply is not messaging.NULL_MESSAGE: + LOG.debug(f"Sending '{reply}'") - # If the message was for us and we didn't have a - # response, then we send a usage statement. - if tocall == self.config["aprsd"]["callsign"] and not replied: - LOG.warning("Sending help!") - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - fromcall, - "Unknown command! Send 'help' message for help", - ) - msg.send() - except Exception as ex: - LOG.error("Plugin failed!!!") - LOG.exception(ex) - # Do we need to send a reply? - if tocall == self.config["aprsd"]["callsign"]: - reply = "A Plugin failed! try again?" - msg = messaging.TextMessage( - self.config["aprsd"]["callsign"], - fromcall, - reply, - ) - msg.send() + msg = messaging.TextMessage( + self.config["aprsd"]["callsign"], + fromcall, + reply, + ) + msg.send() - LOG.debug("Packet processing complete") + # If the message was for us and we didn't have a + # response, then we send a usage statement. + if tocall == self.config["aprsd"]["callsign"] and not replied: + LOG.warning("Sending help!") + msg = messaging.TextMessage( + self.config["aprsd"]["callsign"], + fromcall, + "Unknown command! Send 'help' message for help", + ) + msg.send() + except Exception as ex: + LOG.error("Plugin failed!!!") + LOG.exception(ex) + # Do we need to send a reply? + if tocall == self.config["aprsd"]["callsign"]: + reply = "A Plugin failed! try again?" + msg = messaging.TextMessage( + self.config["aprsd"]["callsign"], + fromcall, + reply, + ) + msg.send()