From 4a65f52939b38eff8c664742c58b62e69ca24a51 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 21 Dec 2022 16:26:36 -0500 Subject: [PATCH] Removed Packet.send() This patch decouples sending a message from the internals of the Packet classes. This allows the rest of the code to use Packet objects as type hints in methods to enforce Packets in the plugins. The send method was moved to a single place in the threads.tx.send() --- aprsd/client.py | 6 ++- aprsd/clients/aprsis.py | 6 +-- aprsd/cmds/send_message.py | 31 +++++++++------ aprsd/cmds/webchat.py | 22 ++++++----- aprsd/flask.py | 20 +++++++--- aprsd/packets/core.py | 34 +++------------- aprsd/packets/tracker.py | 5 ++- aprsd/plugin.py | 16 ++++---- aprsd/plugins/email.py | 34 +++++++++------- aprsd/threads/rx.py | 80 +++++++++++++++++++++----------------- aprsd/threads/tx.py | 36 ++++++++++++----- 11 files changed, 159 insertions(+), 131 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 4285990..3d06e08 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -8,7 +8,7 @@ from aprslib.exceptions import LoginError from aprsd import config as aprsd_config from aprsd import exception from aprsd.clients import aprsis, kiss -from aprsd.packets import core +from aprsd.packets import core, packet_list from aprsd.utils import trace @@ -59,6 +59,10 @@ class Client: self._client.set_filter(self.filter) return self._client + def send(self, packet: core.Packet): + packet_list.PacketList().tx(packet) + self.client.send(packet) + def reset(self): """Call this to force a rebuild/reconnect.""" if self._client: diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py index fc22008..191f43d 100644 --- a/aprsd/clients/aprsis.py +++ b/aprsd/clients/aprsis.py @@ -12,6 +12,7 @@ import wrapt import aprsd from aprsd import stats +from aprsd.packets import core LOG = logging.getLogger("APRSD") @@ -32,10 +33,9 @@ class Aprsdis(aprslib.IS): LOG.info("Shutdown Aprsdis client.") @wrapt.synchronized(lock) - def send(self, msg): + def send(self, packet: core.Packet): """Send an APRS Message object.""" - line = str(msg) - self.sendall(line) + self.sendall(packet.raw) def _socket_readlines(self, blocking=False): """ diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index 469901d..9f69fc5 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -9,6 +9,7 @@ import click import aprsd from aprsd import cli_helper, client, packets from aprsd.aprsd import cli +from aprsd.threads import tx LOG = logging.getLogger("APRSD") @@ -109,12 +110,14 @@ def send_message( got_response = True from_call = packet.from_call our_call = config["aprsd"]["callsign"].lower() - ack_pkt = packets.AckPacket( - from_call=our_call, - to_call=from_call, - msgNo=packet.msgNo, + tx.send( + packets.AckPacket( + from_call=our_call, + to_call=from_call, + msgNo=packet.msgNo, + ), + direct=True, ) - ack_pkt.send_direct() if got_ack: if wait_response: @@ -135,16 +138,20 @@ def send_message( # we should bail after we get the ack and send an ack back for the # message if raw: - pkt = packets.Packet(from_call="", to_call="", raw=raw) - pkt.send_direct() + tx.send( + packets.Packet(from_call="", to_call="", raw=raw), + direct=True, + ) sys.exit(0) else: - pkt = packets.MessagePacket( - from_call=aprs_login, - to_call=tocallsign, - message_text=command, + tx.send( + packets.MessagePacket( + from_call=aprs_login, + to_call=tocallsign, + message_text=command, + ), + direct=True, ) - pkt.send_direct() if no_ack: sys.exit(0) diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index c53b44f..261ba19 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -25,7 +25,7 @@ from aprsd import config as aprsd_config from aprsd import packets, stats, threads, utils from aprsd.aprsd import cli from aprsd.logging import rich as aprsd_logging -from aprsd.threads import rx +from aprsd.threads import rx, tx from aprsd.utils import objectstore, trace @@ -150,7 +150,7 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): self.got_ack = True def process_our_message_packet(self, packet: packets.MessagePacket): - LOG.info(f"process non ack PACKET {packet}") + LOG.info(f"process MessagePacket {repr(packet)}") packet.get("addresse", None) fromcall = packet.from_call @@ -321,7 +321,7 @@ class SendMessageNamespace(Namespace): self.msg = pkt msgs = SentMessages() msgs.add(pkt) - pkt.send() + tx.send(pkt) msgs.set_status(pkt.msgNo, "Sending") obj = msgs.get(pkt.msgNo) socketio.emit( @@ -336,14 +336,16 @@ class SendMessageNamespace(Namespace): LOG.debug(f"Lat DDM {lat}") LOG.debug(f"Long DDM {long}") - beacon = packets.GPSPacket( - from_call=self._config["aprs"]["login"], - to_call="APDW16", - latitude=lat, - longitude=long, - comment="APRSD WebChat Beacon", + tx.send( + packets.GPSPacket( + from_call=self._config["aprs"]["login"], + to_call="APDW16", + latitude=lat, + longitude=long, + comment="APRSD WebChat Beacon", + ), + direct=True, ) - beacon.send_direct() def handle_message(self, data): LOG.debug(f"WS Data {data}") diff --git a/aprsd/flask.py b/aprsd/flask.py index 8f4ddf6..3f374a3 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -23,6 +23,7 @@ from aprsd import packets, plugin, stats, threads, utils from aprsd.clients import aprsis from aprsd.logging import log from aprsd.logging import rich as aprsd_logging +from aprsd.threads import tx LOG = logging.getLogger("APRSD") @@ -181,7 +182,11 @@ class SendMessageThread(threads.APRSDRXThread): except LoginError as e: f"Failed to setup Connection {e}" - self.packet.send_direct(aprsis_client=self.aprs_client) + tx.send( + self.packet, + direct=True, + aprs_client=self.aprs_client, + ) SentMessages().set_status(self.packet.msgNo, "Sent") while not self.thread_stop: @@ -218,12 +223,15 @@ class SendMessageThread(threads.APRSDRXThread): "reply", SentMessages().get(self.packet.msgNo), namespace="/sendmsg", ) - ack_pkt = packets.AckPacket( - from_call=self.request["from"], - to_call=packet.from_call, - msgNo=msg_number, + tx.send( + packets.AckPacket( + from_call=self.request["from"], + to_call=packet.from_call, + msgNo=msg_number, + ), + direct=True, + aprs_client=self.aprsis_client, ) - ack_pkt.send_direct(aprsis_client=self.aprsis_client) SentMessages().set_status(self.packet.msgNo, "Ack Sent") # Now we can exit, since we are done. diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 8638edf..782db3f 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -10,9 +10,6 @@ from typing import List import dacite -from aprsd import client -from aprsd.packets.packet_list import PacketList # noqa: F401 -from aprsd.threads import tx from aprsd.utils import counter from aprsd.utils import json as aprsd_json @@ -87,7 +84,7 @@ class Packet(metaclass=abc.ABCMeta): else: return default - def _init_for_send(self): + def prepare(self): """Do stuff here that is needed prior to sending over the air.""" # now build the raw message for sending self._build_raw() @@ -164,7 +161,7 @@ class Packet(metaclass=abc.ABCMeta): log_list.append(f"{header}________({name})") LOG.info("\n".join(log_list)) - LOG.debug(self) + LOG.debug(repr(self)) def _filter_for_send(self) -> str: """Filter and format message string for FCC.""" @@ -176,22 +173,10 @@ class Packet(metaclass=abc.ABCMeta): # We all miss George Carlin return re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message) - def send(self): - """Method to send a packet.""" - self._init_for_send() - thread = tx.SendPacketThread(packet=self) - thread.start() - - def send_direct(self, aprsis_client=None): - """Send the message in the same thread as caller.""" - self._init_for_send() - if aprsis_client: - cl = aprsis_client - else: - cl = client.factory.create().client - self.log(header="TX Message Direct") - cl.send(self.raw) - PacketList().tx(self) + def __str__(self): + """Show the raw version of the packet""" + self.prepare() + return self.raw @dataclass @@ -219,13 +204,6 @@ class AckPacket(PathPacket): self.msgNo, ) - def send(self): - """Method to send a packet.""" - self._init_for_send() - thread = tx.SendAckThread(packet=self) - LOG.warning(f"Starting thread to TXACK {self}") - thread.start() - @dataclass class MessagePacket(PathPacket): diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 8d3d931..e58e323 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -3,6 +3,7 @@ import threading import wrapt +from aprsd.threads import tx from aprsd.utils import objectstore @@ -93,11 +94,11 @@ class PacketTrack(objectstore.ObjectStoreMixin): for key in self.data.keys(): pkt = self.data[key] if pkt.last_send_attempt < pkt.retry_count: - pkt.send() + tx.send(pkt) def _resend(self, packet): packet._last_send_attempt = 0 - packet.send() + tx.send(packet) def restart_delayed(self, count=None, most_recent=True): """Walk the list of delayed messages and restart them if any.""" diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 75a8e4c..d25adad 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -40,7 +40,7 @@ class APRSDPluginSpec: """A hook specification namespace.""" @hookspec - def filter(self, packet): + def filter(self, packet: packets.core.Packet): """My special little hook that you can customize.""" @@ -117,11 +117,11 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): thread.stop() @abc.abstractmethod - def filter(self, packet): + def filter(self, packet: packets.core.Packet): pass @abc.abstractmethod - def process(self, packet): + def process(self, packet: packets.core.Packet): """This is called when the filter passes.""" @@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): LOG.warning("Watch list enabled, but no callsigns set.") @hookimpl - def filter(self, packet): + def filter(self, packet: packets.core.Packet): result = packets.NULL_MESSAGE if self.enabled: wl = watch_list.WatchList() @@ -210,7 +210,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): self.enabled = True @hookimpl - def filter(self, packet): + def filter(self, packet: packets.core.MessagePacket): result = None message = packet.get("message_text", None) @@ -270,7 +270,7 @@ class HelpPlugin(APRSDRegexCommandPluginBase): def help(self): return "Help: send APRS help or help " - def process(self, packet): + def process(self, packet: packets.core.MessagePacket): LOG.info("HelpPlugin") # fromcall = packet.get("from") message = packet.message_text @@ -455,12 +455,12 @@ class PluginManager: LOG.info("Completed Plugin Loading.") - def run(self, packet): + def run(self, packet: packets.core.MessagePacket): """Execute all the pluguns run method.""" with self.lock: return self._pluggy_pm.hook.filter(packet=packet) - def run_watchlist(self, packet): + def run_watchlist(self, packet: packets.core.Packet): with self.lock: return self._watchlist_pm.hook.filter(packet=packet) diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 1e49f4a..e5ea994 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -11,6 +11,7 @@ import time import imapclient from aprsd import packets, plugin, stats, threads +from aprsd.threads import tx from aprsd.utils import trace @@ -464,12 +465,13 @@ def resend_email(config, count, fromcall): from_addr = shortcuts_inverted[from_addr] # asterisk indicates a resend reply = "-" + from_addr + " * " + body.decode(errors="ignore") - pkt = packets.MessagePacket( - from_call=config["aprsd"]["callsign"], - to_call=fromcall, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=config["aprsd"]["callsign"], + to_call=fromcall, + message_text=reply, + ), ) - pkt.send() msgexists = True if msgexists is not True: @@ -486,12 +488,13 @@ def resend_email(config, count, fromcall): str(m).zfill(2), str(s).zfill(2), ) - pkt = packets.MessagePacket( - from_call=config["aprsd"]["callsign"], - to_call=fromcall, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=config["aprsd"]["callsign"], + to_call=fromcall, + message_text=reply, + ), ) - pkt.send() # check email more often since we're resending one now EmailInfo().delay = 60 @@ -606,12 +609,13 @@ class APRSDEmailThread(threads.APRSDThread): reply = "-" + from_addr + " " + body.decode(errors="ignore") # Send the message to the registered user in the # config ham.callsign - pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=self.config["ham"]["callsign"], - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=self.config["ham"]["callsign"], + message_text=reply, + ), ) - pkt.send() # flag message as sent via aprs try: server.add_flags(msgid, ["APRS"]) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index b860c46..2880d70 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -6,7 +6,7 @@ import time import aprslib from aprsd import client, packets, plugin -from aprsd.threads import APRSDThread +from aprsd.threads import APRSDThread, tx LOG = logging.getLogger("APRSD") @@ -131,12 +131,13 @@ class APRSDProcessPacketThread(APRSDThread): # It's a MessagePacket and it's for us! # let any threads do their thing, then ack # send an ack last - ack_pkt = packets.AckPacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - msgNo=msg_id, + tx.send( + packets.AckPacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + msgNo=msg_id, + ), ) - ack_pkt.send() self.process_our_message_packet(packet) else: @@ -175,18 +176,20 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): for subreply in reply: LOG.debug(f"Sending '{subreply}'") if isinstance(subreply, packets.Packet): - subreply.send() + tx.send(subreply) else: - to_call = self.config["aprsd"]["watch_list"]["alert_callsign"] - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=to_call, - message_text=subreply, + wl = self.config["aprsd"]["watch_list"] + to_call = wl["alert_callsign"] + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=to_call, + message_text=subreply, + ), ) - msg_pkt.send() elif isinstance(reply, packets.Packet): # We have a message based object. - reply.send() + tx.send(reply) except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) @@ -212,17 +215,18 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): for subreply in reply: LOG.debug(f"Sending '{subreply}'") if isinstance(subreply, packets.Packet): - subreply.send() + tx.send(subreply) else: - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=subreply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=subreply, + ), ) - msg_pkt.send() elif isinstance(reply, packets.Packet): # We have a message based object. - reply.send() + tx.send(reply) replied = True else: replied = True @@ -232,34 +236,38 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): # usage string if reply is not packets.NULL_MESSAGE: LOG.debug(f"Sending '{reply}'") - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, + ), ) - msg_pkt.send() # If the message was for us and we didn't have a # response, then we send a usage statement. if to_call == self.config["aprsd"]["callsign"] and not replied: LOG.warning("Sending help!") - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text="Unknown command! Send 'help' message for help", + message_text = "Unknown command! Send 'help' message for help" + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=message_text, + ), ) - msg_pkt.send() except Exception as ex: LOG.error("Plugin failed!!!") LOG.exception(ex) # Do we need to send a reply? if to_call == self.config["aprsd"]["callsign"]: reply = "A Plugin failed! try again?" - msg_pkt = packets.MessagePacket( - from_call=self.config["aprsd"]["callsign"], - to_call=from_call, - message_text=reply, + tx.send( + packets.MessagePacket( + from_call=self.config["aprsd"]["callsign"], + to_call=from_call, + message_text=reply, + ), ) - msg_pkt.send() LOG.debug("Completed process_our_message_packet") diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 4511e95..1902ad0 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -4,12 +4,34 @@ import time from aprsd import client from aprsd import threads as aprsd_threads -from aprsd.packets import packet_list, tracker +from aprsd.packets import core, packet_list, tracker LOG = logging.getLogger("APRSD") +def send(packet: core.Packet, direct=False, aprs_client=None): + """Send a packet either in a thread or directly to the client.""" + # prepare the packet for sending. + # This constructs the packet.raw + packet.prepare() + if not direct: + if isinstance(packet, core.AckPacket): + thread = SendAckThread(packet=packet) + else: + thread = SendPacketThread(packet=packet) + thread.start() + else: + if aprs_client: + cl = aprs_client + else: + cl = client.factory.create() + + packet.log(header="TX") + cl.send(packet) + packet_list.PacketList().tx(packet) + + class SendPacketThread(aprsd_threads.APRSDThread): loop_count: int = 1 @@ -37,7 +59,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): # The message has been removed from the tracking queue # So it got acked and we are done. LOG.info( - f"{packet.__class__.__name__}" + f"{self.packet.__class__.__name__}" f"({self.packet.msgNo}) " "Message Send Complete via Ack.", ) @@ -72,10 +94,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): if send_now: # no attempt time, so lets send it, and start # tracking the time. - packet.log("TX") - cl = client.factory.create().client - cl.send(packet.raw) - packet_list.PacketList().tx(packet) + send(packet, direct=True) packet.last_send_time = datetime.datetime.now() packet.send_count += 1 @@ -123,10 +142,7 @@ class SendAckThread(aprsd_threads.APRSDThread): send_now = True if send_now: - cl = client.factory.create().client - self.packet.log("TX") - cl.send(self.packet.raw) - packet_list.PacketList().tx(self.packet) + send(self.packet, direct=True) self.packet.send_count += 1 self.packet.last_send_time = datetime.datetime.now()