From 5f4cf89733d1de5e3506993fc899363b446c72bb Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 19 Aug 2021 11:39:29 -0400 Subject: [PATCH 1/7] Refactor Message processing and MORE This patch refactors how the recieved message processing happens. We now handle all incoming packets the same. Removed the notification thread to handle the watchlist packets. This is now done with a unified plugins architecture that allows different capabilities via the new plugin structure. All packets sent to us will be sent through all of the plugins. It's the plugins job to decide what to do with that packet or ignore it. Email is no longer a special case for the most part. All email functions have been migrated to the EmailPlugin, including starting the EmailThread, which works in the background to check for new emails and send those to the registered callsign. The EmailPlugin now starts the EmailThread itself. All plugins are now build on the new APRSDPluginBase which has a common set of features. The APRSDPluginBase calls self.setup() upon creation, which allows all plugins to do whatever they want for initiali startup. The EmailPlugin uses setup() to start the EmailThread if email is enabled. --- aprsd/main.py | 69 +---- aprsd/messaging.py | 4 +- aprsd/packets.py | 19 +- aprsd/plugin.py | 189 ++++++------ aprsd/plugins/email.py | 594 +++++++++++++++++++++++++++++++++++++- aprsd/plugins/fortune.py | 4 +- aprsd/plugins/location.py | 4 +- aprsd/plugins/notify.py | 4 +- aprsd/plugins/ping.py | 6 +- aprsd/plugins/query.py | 4 +- aprsd/plugins/stock.py | 4 +- aprsd/plugins/time.py | 8 +- aprsd/plugins/version.py | 4 +- aprsd/plugins/weather.py | 16 +- aprsd/stats.py | 2 +- aprsd/threads.py | 272 +++++++---------- tests/test_email.py | 2 +- tests/test_main.py | 6 +- tests/test_plugin.py | 34 ++- 19 files changed, 873 insertions(+), 372 deletions(-) diff --git a/aprsd/main.py b/aprsd/main.py index 3017297..8403e69 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -25,7 +25,6 @@ import logging from logging import NullHandler from logging.handlers import RotatingFileHandler import os -import queue import signal import sys import time @@ -227,12 +226,6 @@ def setup_logging(config, loglevel, quiet): def check_version(loglevel, config_file): config = utils.parse_config(config_file) - # Force setting the config to the modules that need it - # TODO(Walt): convert these modules to classes that can - # Accept the config as a constructor param, instead of this - # hacky global setting - email.CONFIG = config - setup_logging(config, loglevel, False) level, msg = utils._check_version() if level: @@ -414,12 +407,6 @@ def send_message( help="The log level to use for aprsd.log", ) @click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout") -@click.option( - "--disable-validation", - is_flag=True, - default=False, - help="Disable email shortcut validation. Bad email addresses can result in broken email responses!!", -) @click.option( "-c", "--config", @@ -440,7 +427,6 @@ def send_message( def server( loglevel, quiet, - disable_validation, config_file, flush, ): @@ -453,12 +439,6 @@ def server( config = utils.parse_config(config_file) - # Force setting the config to the modules that need it - # TODO(Walt): convert these modules to classes that can - # Accept the config as a constructor param, instead of this - # hacky global setting - email.CONFIG = config - setup_logging(config, loglevel, quiet) level, msg = utils._check_version() if level: @@ -479,18 +459,6 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - email_enabled = config["aprsd"]["email"].get("enabled", False) - - if email_enabled: - # TODO(walt): Make email processing/checking optional? - # Maybe someone only wants this to process messages with plugins only. - valid = email.validate_email_config(config, disable_validation) - if not valid: - LOG.error("Failed to validate email config options") - sys.exit(-1) - else: - LOG.info("Email services not enabled.") - # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) plugin_manager.setup_plugins() @@ -509,34 +477,25 @@ def server( LOG.debug("Loading saved MsgTrack object.") messaging.MsgTrack().load() - rx_notify_queue = queue.Queue(maxsize=20) - rx_msg_queue = queue.Queue(maxsize=20) - tx_msg_queue = queue.Queue(maxsize=20) - msg_queues = { - "rx": rx_msg_queue, - "tx": tx_msg_queue, - "notify": rx_notify_queue, - } + packets.PacketList(config=config) + + rx_thread = threads.APRSDRXThread( + msg_queues=threads.msg_queues, + config=config, + ) + tx_thread = threads.APRSDTXThread( + msg_queues=threads.msg_queues, + config=config, + ) + + rx_thread.start() + tx_thread.start() - rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) - tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) if "watch_list" in config["aprsd"] and config["aprsd"]["watch_list"].get( "enabled", True, ): - packets.PacketList(config) - notify_thread = threads.APRSDNotifyThread( - msg_queues=msg_queues, - config=config, - ) - notify_thread.start() - - if email_enabled: - email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config) - email_thread.start() - - rx_thread.start() - tx_thread.start() + packets.WatchList(config=config) messaging.MsgTrack().restart() diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 09750e5..00ed7f5 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -9,7 +9,7 @@ import re import threading import time -from aprsd import client, stats, threads, trace, utils +from aprsd import client, packets, stats, threads, trace, utils LOG = logging.getLogger("APRSD") @@ -424,6 +424,7 @@ class SendMessageThread(threads.APRSDThread): ) cl.sendall(str(msg)) stats.APRSDStats().msgs_tx_inc() + packets.PacketList().add(msg.dict()) msg.last_send_time = datetime.datetime.now() msg.last_send_attempt += 1 @@ -527,6 +528,7 @@ class SendAckThread(threads.APRSDThread): ) cl.sendall(str(self.ack)) stats.APRSDStats().ack_tx_inc() + packets.PacketList().add(self.ack.dict()) self.ack.last_send_attempt += 1 self.ack.last_send_time = datetime.datetime.now() time.sleep(5) diff --git a/aprsd/packets.py b/aprsd/packets.py index ae6e7d0..ac08535 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -27,7 +27,7 @@ class PacketList: def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance.packet_list = utils.RingBuffer(100) + cls._instance.packet_list = utils.RingBuffer(1000) cls._instance.lock = threading.Lock() return cls._instance @@ -42,7 +42,10 @@ class PacketList: def add(self, packet): with self.lock: packet["ts"] = time.time() - if "from" in packet and packet["from"] == self.config["aprs"]["login"]: + if ( + "fromcall" in packet + and packet["fromcall"] == self.config["aprs"]["login"] + ): self.total_tx += 1 else: self.total_recv += 1 @@ -156,3 +159,15 @@ def get_packet_type(packet): elif msg_format == "mic-e": packet_type = PACKET_TYPE_MICE return packet_type + + +def is_message_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MESSAGE + + +def is_ack_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_ACK + + +def is_mice_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/plugin.py b/aprsd/plugin.py index f361827..5bedee7 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -8,6 +8,7 @@ import os import re import threading +from aprsd import messaging, packets import pluggy from thesmuggler import smuggle @@ -39,12 +40,48 @@ class APRSDCommandSpec: """A hook specification namespace.""" @hookspec - def run(self, packet): + def filter(self, packet): """My special little hook that you can customize.""" -class APRSDNotificationPluginBase(metaclass=abc.ABCMeta): - """Base plugin class for all notification ased plugins. +class APRSDPluginBase(metaclass=abc.ABCMeta): + """The base class for all APRSD Plugins.""" + + config = None + message_counter = 0 + version = "1.0" + + def __init__(self, config): + self.config = config + self.message_counter = 0 + self.setup() + + @property + def message_count(self): + return self.message_counter + + @property + def version(self): + """Version""" + raise NotImplementedError + + def setup(self): + """Do any plugin setup here.""" + pass + + @hookimpl + @abc.abstractmethod + def filter(self, packet): + pass + + @abc.abstractmethod + def process(self, packet): + """This is called when the filter passes.""" + pass + + +class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): + """Base plugin class for all notification APRSD plugins. All these plugins will get every packet seen by APRSD's registered list of HAM callsigns in the config file's @@ -55,24 +92,26 @@ class APRSDNotificationPluginBase(metaclass=abc.ABCMeta): this class. """ - def __init__(self, config): - """The aprsd config object is stored.""" - self.config = config - self.message_counter = 0 + def filter(self, packet): + wl = packets.WatchList() + result = messaging.NULL_MESSAGE + if wl.callsign_in_watchlist(packet["from"]): + # packet is from a callsign in the watch list + result = self.process() + wl.update_seen(packet) - @hookimpl - def run(self, packet): - return self.notify(packet) + return result - @abc.abstractmethod - def notify(self, packet): - """This is the main method called when a packet is rx. +<<<<<<< HEAD This will get called when a packet is seen by a callsign registered in the watch list in the config file.""" class APRSDMessagePluginBase(metaclass=abc.ABCMeta): +======= +class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): +>>>>>>> 2e7c884 (Refactor Message processing and MORE) """Base Message plugin class. When you want to search for a particular command in an @@ -80,11 +119,6 @@ class APRSDMessagePluginBase(metaclass=abc.ABCMeta): based off of this class. """ - def __init__(self, config): - """The aprsd config object is stored.""" - self.config = config - self.message_counter = 0 - @property def command_name(self): """The usage string help.""" @@ -100,24 +134,32 @@ class APRSDMessagePluginBase(metaclass=abc.ABCMeta): """Version""" raise NotImplementedError - @property - def message_count(self): - return self.message_counter - @hookimpl - def run(self, packet): + def filter(self, packet): + result = None + message = packet.get("message_text", None) - if re.search(self.command_regex, message): - self.message_counter += 1 - return self.command(packet) + msg_format = packet.get("format", None) + tocall = packet.get("addresse", None) - @abc.abstractmethod - def command(self, packet): - """This is the command that runs when the regex matches. + # Only process messages destined for us + # and is an APRS message format and has a message. + if ( + tocall == self.config["aprs"]["login"] + and msg_format == "message" + and message + ): + if re.search(self.command_regex, message): + self.message_counter += 1 + result = self.process(packet) +<<<<<<< HEAD To reply with a message over the air, return a string to send. """ +======= + return result +>>>>>>> 2e7c884 (Refactor Message processing and MORE) class PluginManager: @@ -125,10 +167,7 @@ class PluginManager: _instance = None # the pluggy PluginManager for all Message plugins - _pluggy_msg_pm = None - - # the pluggy PluginManager for all Notification plugins - _pluggy_notify_pm = None + _pluggy_pm = None # aprsd config dict config = None @@ -173,15 +212,17 @@ class PluginManager: def is_plugin(self, obj): for c in inspect.getmro(obj): - if issubclass(c, APRSDMessagePluginBase) or issubclass( - c, - APRSDNotificationPluginBase, - ): + if issubclass(c, APRSDPluginBase): return True return False - def _create_class(self, module_class_string, super_cls: type = None, **kwargs): + def _create_class( + self, + module_class_string, + super_cls: type = None, + **kwargs, + ): """ Method to create a class from a fqn python string. :param module_class_string: full name of the class to create an object of @@ -213,7 +254,7 @@ class PluginManager: obj = cls(**kwargs) return obj - def _load_msg_plugin(self, plugin_name): + def _load_plugin(self, plugin_name): """ Given a python fully qualified class path.name, Try importing the path, then creating the object, @@ -223,61 +264,35 @@ class PluginManager: try: plugin_obj = self._create_class( plugin_name, - APRSDMessagePluginBase, + APRSDPluginBase, config=self.config, ) if plugin_obj: LOG.info( - "Registering Message plugin '{}'({}) '{}'".format( - plugin_name, - plugin_obj.version, - plugin_obj.command_regex, - ), - ) - self._pluggy_msg_pm.register(plugin_obj) - except Exception as ex: - LOG.exception(f"Couldn't load plugin '{plugin_name}'", ex) - - def _load_notify_plugin(self, plugin_name): - """ - Given a python fully qualified class path.name, - Try importing the path, then creating the object, - then registering it as a aprsd Command Plugin - """ - plugin_obj = None - try: - plugin_obj = self._create_class( - plugin_name, - APRSDNotificationPluginBase, - config=self.config, - ) - if plugin_obj: - LOG.info( - "Registering Notification plugin '{}'({})".format( + "Registering plugin '{}'({})".format( plugin_name, plugin_obj.version, ), ) - self._pluggy_notify_pm.register(plugin_obj) + self._pluggy_pm.register(plugin_obj) except Exception as ex: LOG.exception(f"Couldn't load plugin '{plugin_name}'", ex) def reload_plugins(self): with self.lock: - del self._pluggy_msg_pm - del self._pluggy_notify_pm + del self._pluggy_pm self.setup_plugins() def setup_plugins(self): """Create the plugin manager and register plugins.""" - LOG.info("Loading APRSD Message Plugins") - enabled_msg_plugins = self.config["aprsd"].get("enabled_plugins", None) - self._pluggy_msg_pm = pluggy.PluginManager("aprsd") - self._pluggy_msg_pm.add_hookspecs(APRSDCommandSpec) - if enabled_msg_plugins: - for p_name in enabled_msg_plugins: - self._load_msg_plugin(p_name) + LOG.info("Loading APRSD Plugins") + enabled_plugins = self.config["aprsd"].get("enabled_plugins", None) + self._pluggy_pm = pluggy.PluginManager("aprsd") + self._pluggy_pm.add_hookspecs(APRSDCommandSpec) + if enabled_plugins: + for p_name in enabled_plugins: + self._load_plugin(p_name) else: # Enabled plugins isn't set, so we default to loading all of # the core plugins. @@ -285,34 +300,24 @@ class PluginManager: self._load_plugin(p_name) if self.config["aprsd"]["watch_list"].get("enabled", False): - LOG.info("Loading APRSD Notification Plugins") + LOG.info("Loading APRSD WatchList Plugins") enabled_notify_plugins = self.config["aprsd"]["watch_list"].get( "enabled_plugins", None, ) - self._pluggy_notify_pm = pluggy.PluginManager("aprsd") - self._pluggy_notify_pm.add_hookspecs(APRSDCommandSpec) if enabled_notify_plugins: for p_name in enabled_notify_plugins: - self._load_notify_plugin(p_name) - - else: - LOG.info("Skipping Custom Plugins directory.") + self._load_plugin(p_name) LOG.info("Completed Plugin Loading.") def run(self, packet): """Execute all the pluguns run method.""" with self.lock: - return self._pluggy_msg_pm.hook.run(packet=packet) - - def notify(self, packet): - """Execute all the notify pluguns run method.""" - with self.lock: - return self._pluggy_notify_pm.hook.run(packet=packet) + return self._pluggy_pm.hook.filter(packet=packet) def register_msg(self, obj): """Register the plugin.""" - self._pluggy_msg_pm.register(obj) + self._pluggy_pm.register(obj) - def get_msg_plugins(self): - return self._pluggy_msg_pm.get_plugins() + def get_plugins(self): + return self._pluggy_pm.get_plugins() diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 0d0d9c9..f5d6449 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -1,14 +1,25 @@ +import datetime +import email +from email.mime.text import MIMEText +import imaplib import logging import re +import smtplib import time -from aprsd import email, messaging, plugin, trace +from aprsd import messaging, plugin, stats, threads, trace +import imapclient +from validate_email import validate_email LOG = logging.getLogger("APRSD") +# This gets forced set from main.py prior to being used internally +CONFIG = {} +check_email_delay = 60 -class EmailPlugin(plugin.APRSDMessagePluginBase): + +class EmailPlugin(plugin.APRSDRegexCommandPluginBase): """Email Plugin.""" version = "1.0" @@ -18,10 +29,38 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): # message_number:time combos so we don't resend the same email in # five mins {int:int} email_sent_dict = {} + enabled = False + + def setup(self): + """Ensure that email is enabled and start the thread.""" + global CONFIG + CONFIG = self.config + + email_enabled = self.config["aprsd"]["email"].get("enabled", False) + validation = self.config["aprsd"]["email"].get("validate", False) + + if email_enabled: + valid = validate_email_config(self.config, validation) + if not valid: + LOG.error("Failed to validate email config options.") + LOG.error("EmailPlugin DISABLED!!!!") + else: + self.enabled = True + email_thread = APRSDEmailThread( + msg_queues=threads.msg_queues, + config=self.config, + ) + email_thread.start() + else: + LOG.info("Email services not enabled.") @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Email COMMAND") + if not self.enabled: + # Email has not been enabled + # so the plugin will just NOOP + return messaging.NULL_MESSAGE fromcall = packet.get("from") message = packet.get("message_text", None) @@ -39,7 +78,7 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): r = re.search("^-([0-9])[0-9]*$", message) if r is not None: LOG.debug("RESEND EMAIL") - email.resend_email(r.group(1), fromcall) + resend_email(r.group(1), fromcall) reply = messaging.NULL_MESSAGE # -user@address.com body of email elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message): @@ -49,14 +88,16 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): to_addr = a.group(1) content = a.group(2) - email_address = email.get_email_from_shortcut(to_addr) + email_address = get_email_from_shortcut(to_addr) if not email_address: reply = "Bad email address" return reply # send recipient link to aprs.fi map if content == "mapme": - content = "Click for my location: http://aprs.fi/{}".format( + content = ( + "Click for my location: http://aprs.fi/{}" "" + ).format( self.config["ham"]["callsign"], ) too_soon = 0 @@ -74,9 +115,9 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): reply = messaging.NULL_MESSAGE if send_result != 0: reply = f"-{to_addr} failed" - # messaging.send_message(fromcall, "-" + to_addr + " failed") else: - # clear email sent dictionary if somehow goes over 100 + # clear email sent dictionary if somehow goes + # over 100 if len(self.email_sent_dict) > 98: LOG.debug( "DEBUG: email_sent_dict is big (" @@ -97,3 +138,540 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): # messaging.send_message(fromcall, "Bad email address") return reply + + +@trace.trace +def _imap_connect(): + global CONFIG + imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143) + use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False) + # host = CONFIG["aprsd"]["email"]["imap"]["host"] + # msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) + # LOG.debug("Connect to IMAP host {} with user '{}'". + # format(msg, CONFIG['imap']['login'])) + + try: + server = imapclient.IMAPClient( + CONFIG["aprsd"]["email"]["imap"]["host"], + port=imap_port, + use_uid=True, + ssl=use_ssl, + timeout=30, + ) + except Exception as e: + LOG.error("Failed to connect IMAP server", e) + return + + try: + server.login( + CONFIG["aprsd"]["email"]["imap"]["login"], + CONFIG["aprsd"]["email"]["imap"]["password"], + ) + except (imaplib.IMAP4.error, Exception) as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Failed to login {}".format(msg)) + return + + server.select_folder("INBOX") + + server.fetch = trace.trace(server.fetch) + server.search = trace.trace(server.search) + server.remove_flags = trace.trace(server.remove_flags) + server.add_flags = trace.trace(server.add_flags) + return server + + +@trace.trace +def _smtp_connect(): + host = CONFIG["aprsd"]["email"]["smtp"]["host"] + smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"] + use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False) + msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port) + LOG.debug( + "Connect to SMTP host {} with user '{}'".format( + msg, + CONFIG["aprsd"]["email"]["imap"]["login"], + ), + ) + + try: + if use_ssl: + server = smtplib.SMTP_SSL( + host=host, + port=smtp_port, + timeout=30, + ) + else: + server = smtplib.SMTP( + host=host, + port=smtp_port, + timeout=30, + ) + except Exception: + LOG.error("Couldn't connect to SMTP Server") + return + + LOG.debug("Connected to smtp host {}".format(msg)) + + debug = CONFIG["aprsd"]["email"]["smtp"].get("debug", False) + if debug: + server.set_debuglevel(5) + server.sendmail = trace.trace(server.sendmail) + + try: + server.login( + CONFIG["aprsd"]["email"]["smtp"]["login"], + CONFIG["aprsd"]["email"]["smtp"]["password"], + ) + except Exception: + LOG.error("Couldn't connect to SMTP Server") + return + + LOG.debug("Logged into SMTP server {}".format(msg)) + return server + + +def validate_shortcuts(config): + shortcuts = config["aprsd"]["email"].get("shortcuts", None) + if not shortcuts: + return + + LOG.info( + "Validating {} Email shortcuts. This can take up to 10 seconds" + " per shortcut".format(len(shortcuts)), + ) + delete_keys = [] + for key in shortcuts: + LOG.info("Validating {}:{}".format(key, shortcuts[key])) + is_valid = validate_email( + email_address=shortcuts[key], + check_regex=True, + check_mx=False, + from_address=config["aprsd"]["email"]["smtp"]["login"], + helo_host=config["aprsd"]["email"]["smtp"]["host"], + smtp_timeout=10, + dns_timeout=10, + use_blacklist=True, + debug=False, + ) + if not is_valid: + LOG.error( + "'{}' is an invalid email address. Removing shortcut".format( + shortcuts[key], + ), + ) + delete_keys.append(key) + + for key in delete_keys: + del config["aprsd"]["email"]["shortcuts"][key] + + LOG.info( + "Available shortcuts: {}".format( + config["aprsd"]["email"]["shortcuts"], + ), + ) + + +def get_email_from_shortcut(addr): + if CONFIG["aprsd"]["email"].get("shortcuts", False): + return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr) + else: + return addr + + +def validate_email_config(config, disable_validation=False): + """function to simply ensure we can connect to email services. + + This helps with failing early during startup. + """ + LOG.info("Checking IMAP configuration") + imap_server = _imap_connect() + LOG.info("Checking SMTP configuration") + smtp_server = _smtp_connect() + + # Now validate and flag any shortcuts as invalid + if not disable_validation: + validate_shortcuts(config) + else: + LOG.info("Shortcuts email validation is Disabled!!, you were warned.") + + if imap_server and smtp_server: + return True + else: + return False + + +@trace.trace +def parse_email(msgid, data, server): + envelope = data[b"ENVELOPE"] + # email address match + # use raw string to avoid invalid escape secquence errors r"string here" + f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0])) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + LOG.debug("Got a message from '{}'".format(from_addr)) + try: + m = server.fetch([msgid], ["RFC822"]) + except Exception as e: + LOG.exception("Couldn't fetch email from server in parse_email", e) + return + + msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore")) + if msg.is_multipart(): + text = "" + html = None + # default in case body somehow isn't set below - happened once + body = b"* unreadable msg received" + # this uses the last text or html part in the email, + # phone companies often put content in an attachment + for part in msg.get_payload(): + if part.get_content_charset() is None: + # or BREAK when we hit a text or html? + # We cannot know the character set, + # so return decoded "something" + LOG.debug("Email got unknown content type") + text = part.get_payload(decode=True) + continue + + charset = part.get_content_charset() + + if part.get_content_type() == "text/plain": + LOG.debug("Email got text/plain") + text = str( + part.get_payload(decode=True), + str(charset), + "ignore", + ).encode("utf8", "replace") + + if part.get_content_type() == "text/html": + LOG.debug("Email got text/html") + html = str( + part.get_payload(decode=True), + str(charset), + "ignore", + ).encode("utf8", "replace") + + if text is not None: + # strip removes white space fore and aft of string + body = text.strip() + else: + body = html.strip() + else: # message is not multipart + # email.uscc.net sends no charset, blows up unicode function below + LOG.debug("Email is not multipart") + if msg.get_content_charset() is None: + text = str(msg.get_payload(decode=True), "US-ASCII", "ignore").encode( + "utf8", + "replace", + ) + else: + text = str( + msg.get_payload(decode=True), + msg.get_content_charset(), + "ignore", + ).encode("utf8", "replace") + body = text.strip() + + # FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 + # in position 6: ordinal not in range(128) + # decode with errors='ignore'. be sure to encode it before we return + # it below, also with errors='ignore' + try: + body = body.decode(errors="ignore") + except Exception as e: + LOG.error("Unicode decode failure: " + str(e)) + LOG.error("Unidoce decode failed: " + str(body)) + body = "Unreadable unicode msg" + # strip all html tags + body = re.sub("<[^<]+?>", "", body) + # strip CR/LF, make it one line, .rstrip fails at this + body = body.replace("\n", " ").replace("\r", " ") + # ascii might be out of range, so encode it, removing any error characters + body = body.encode(errors="ignore") + return body, from_addr + + +# end parse_email + + +@trace.trace +def send_email(to_addr, content): + global check_email_delay + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + email_address = get_email_from_shortcut(to_addr) + LOG.info("Sending Email_________________") + + if to_addr in shortcuts: + LOG.info("To : " + to_addr) + to_addr = email_address + LOG.info(" (" + to_addr + ")") + subject = CONFIG["ham"]["callsign"] + # content = content + "\n\n(NOTE: reply with one line)" + LOG.info("Subject : " + subject) + LOG.info("Body : " + content) + + # check email more often since there's activity right now + check_email_delay = 60 + + msg = MIMEText(content) + msg["Subject"] = subject + msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"] + msg["To"] = to_addr + server = _smtp_connect() + if server: + try: + server.sendmail( + CONFIG["aprsd"]["email"]["smtp"]["login"], + [to_addr], + msg.as_string(), + ) + stats.APRSDStats().email_tx_inc() + except Exception as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Sendmail Error!!!! '{}'", msg) + server.quit() + return -1 + server.quit() + return 0 + + +@trace.trace +def resend_email(count, fromcall): + global check_email_delay + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = "{}-{}-{}".format(day, month, year) + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + # swap key/value + shortcuts_inverted = {v: k for k, v in shortcuts.items()} + + try: + server = _imap_connect() + except Exception as e: + LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e) + return + + try: + messages = server.search(["SINCE", today]) + except Exception as e: + LOG.exception("Couldn't search for emails in resend_email ", e) + return + + # LOG.debug("%d messages received today" % len(messages)) + + msgexists = False + + messages.sort(reverse=True) + del messages[int(count) :] # only the latest "count" messages + for message in messages: + try: + parts = server.fetch(message, ["ENVELOPE"]).items() + except Exception as e: + LOG.exception("Couldn't fetch email parts in resend_email", e) + continue + + for msgid, data in list(parts): + # one at a time, otherwise order is random + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Failed to remove SEEN flag in resend_email", e) + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + # asterisk indicates a resend + reply = "-" + from_addr + " * " + body.decode(errors="ignore") + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage( + CONFIG["aprs"]["login"], + fromcall, + reply, + ) + msg.send() + msgexists = True + + if msgexists is not True: + stm = time.localtime() + h = stm.tm_hour + m = stm.tm_min + s = stm.tm_sec + # append time as a kind of serial number to prevent FT1XDR from + # thinking this is a duplicate message. + # The FT1XDR pretty much ignores the aprs message number in this + # regard. The FTM400 gets it right. + reply = "No new msg {}:{}:{}".format( + str(h).zfill(2), + str(m).zfill(2), + str(s).zfill(2), + ) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() + + # check email more often since we're resending one now + check_email_delay = 60 + + server.logout() + # end resend_email() + + +class APRSDEmailThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super().__init__("EmailThread") + self.msg_queues = msg_queues + self.config = config + self.past = datetime.datetime.now() + + def loop(self): + global check_email_delay + + LOG.debug("Starting Loop") + + check_email_delay = 60 + time.sleep(5) + stats.APRSDStats().email_thread_update() + # always sleep for 5 seconds and see if we need to check email + # This allows CTRL-C to stop the execution of this loop sooner + # than check_email_delay time + now = datetime.datetime.now() + if now - self.past > datetime.timedelta(seconds=check_email_delay): + # It's time to check email + + # slowly increase delay every iteration, max out at 300 seconds + # any send/receive/resend activity will reset this to 60 seconds + if check_email_delay < 300: + check_email_delay += 1 + LOG.debug( + "check_email_delay is " + str(check_email_delay) + " seconds", + ) + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + # swap key/value + shortcuts_inverted = {v: k for k, v in shortcuts.items()} + + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = "{}-{}-{}".format(day, month, year) + + try: + server = _imap_connect() + except Exception as e: + LOG.exception("IMAP failed to connect.", e) + return True + + try: + messages = server.search(["SINCE", today]) + except Exception as e: + LOG.exception( + "IMAP failed to search for messages since today.", + e, + ) + return True + LOG.debug("{} messages received today".format(len(messages))) + + try: + _msgs = server.fetch(messages, ["ENVELOPE"]) + except Exception as e: + LOG.exception("IMAP failed to fetch/flag messages: ", e) + return True + + for msgid, data in _msgs.items(): + envelope = data[b"ENVELOPE"] + LOG.debug( + 'ID:%d "%s" (%s)' + % (msgid, envelope.subject.decode(), envelope.date), + ) + f = re.search( + r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", + str(envelope.from_[0]), + ) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + + # LOG.debug("Message flags/tags: " + + # str(server.get_flags(msgid)[msgid])) + # if "APRS" not in server.get_flags(msgid)[msgid]: + # in python3, imap tags are unicode. in py2 they're strings. + # so .decode them to handle both + try: + taglist = [ + x.decode(errors="ignore") + for x in server.get_flags(msgid)[msgid] + ] + except Exception as e: + LOG.exception("Failed to get flags.", e) + break + + if "APRS" not in taglist: + # if msg not flagged as sent via aprs + try: + server.fetch([msgid], ["RFC822"]) + except Exception as e: + LOG.exception( + "Failed single server fetch for RFC822", + e, + ) + break + + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Failed to remove flags SEEN", e) + # Not much we can do here, so lets try and + # send the aprs message anyway + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + + reply = "-" + from_addr + " " + body.decode(errors="ignore") + msg = messaging.TextMessage( + self.config["aprs"]["login"], + self.config["ham"]["callsign"], + reply, + ) + self.msg_queues["tx"].put(msg) + # flag message as sent via aprs + try: + server.add_flags(msgid, ["APRS"]) + # unset seen flag, will stay bold in email client + except Exception as e: + LOG.exception("Couldn't add APRS flag to email", e) + + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Couldn't remove seen flag from email", e) + + # check email more often since we just received an email + check_email_delay = 60 + + # reset clock + LOG.debug("Done looping over Server.fetch, logging out.") + self.past = datetime.datetime.now() + try: + server.logout() + except Exception as e: + LOG.exception("IMAP failed to logout: ", e) + return True + else: + # We haven't hit the email delay yet. + # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) + return True + + return True diff --git a/aprsd/plugins/fortune.py b/aprsd/plugins/fortune.py index 27fe632..e6ac01a 100644 --- a/aprsd/plugins/fortune.py +++ b/aprsd/plugins/fortune.py @@ -8,7 +8,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class FortunePlugin(plugin.APRSDMessagePluginBase): +class FortunePlugin(plugin.APRSDRegexCommandPluginBase): """Fortune.""" version = "1.0" @@ -16,7 +16,7 @@ class FortunePlugin(plugin.APRSDMessagePluginBase): command_name = "fortune" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("FortunePlugin") # fromcall = packet.get("from") diff --git a/aprsd/plugins/location.py b/aprsd/plugins/location.py index a201a17..59b3847 100644 --- a/aprsd/plugins/location.py +++ b/aprsd/plugins/location.py @@ -8,7 +8,7 @@ from aprsd import plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class LocationPlugin(plugin.APRSDMessagePluginBase): +class LocationPlugin(plugin.APRSDRegexCommandPluginBase): """Location!""" version = "1.0" @@ -16,7 +16,7 @@ class LocationPlugin(plugin.APRSDMessagePluginBase): command_name = "location" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Location Plugin") fromcall = packet.get("from") message = packet.get("message_text", None) diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index 0687020..c3806fa 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -6,7 +6,7 @@ from aprsd import messaging, packets, plugin LOG = logging.getLogger("APRSD") -class NotifySeenPlugin(plugin.APRSDNotificationPluginBase): +class NotifySeenPlugin(plugin.APRSDWatchListPluginBase): """Notification plugin to send seen message for callsign. @@ -21,7 +21,7 @@ class NotifySeenPlugin(plugin.APRSDNotificationPluginBase): """The aprsd config object is stored.""" super().__init__(config) - def notify(self, packet): + def process(self, packet): LOG.info("BaseNotifyPlugin") notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"] diff --git a/aprsd/plugins/ping.py b/aprsd/plugins/ping.py index 471bc07..77d78ec 100644 --- a/aprsd/plugins/ping.py +++ b/aprsd/plugins/ping.py @@ -7,7 +7,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class PingPlugin(plugin.APRSDMessagePluginBase): +class PingPlugin(plugin.APRSDRegexCommandPluginBase): """Ping.""" version = "1.0" @@ -15,8 +15,8 @@ class PingPlugin(plugin.APRSDMessagePluginBase): command_name = "ping" @trace.trace - def command(self, packet): - LOG.info("PINGPlugin") + def process(self, packet): + LOG.info("PingPlugin") # fromcall = packet.get("from") # message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/plugins/query.py b/aprsd/plugins/query.py index b6e3eb0..a658e74 100644 --- a/aprsd/plugins/query.py +++ b/aprsd/plugins/query.py @@ -8,7 +8,7 @@ from aprsd import messaging, plugin, trace LOG = logging.getLogger("APRSD") -class QueryPlugin(plugin.APRSDMessagePluginBase): +class QueryPlugin(plugin.APRSDRegexCommandPluginBase): """Query command.""" version = "1.0" @@ -16,7 +16,7 @@ class QueryPlugin(plugin.APRSDMessagePluginBase): command_name = "query" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Query COMMAND") fromcall = packet.get("from") diff --git a/aprsd/plugins/stock.py b/aprsd/plugins/stock.py index 83d487e..debf3d5 100644 --- a/aprsd/plugins/stock.py +++ b/aprsd/plugins/stock.py @@ -9,7 +9,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class StockPlugin(plugin.APRSDMessagePluginBase): +class StockPlugin(plugin.APRSDRegexCommandPluginBase): """Stock market plugin for fetching stock quotes""" version = "1.0" @@ -17,7 +17,7 @@ class StockPlugin(plugin.APRSDMessagePluginBase): command_name = "stock" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("StockPlugin") # fromcall = packet.get("from") diff --git a/aprsd/plugins/time.py b/aprsd/plugins/time.py index 819e657..81df83a 100644 --- a/aprsd/plugins/time.py +++ b/aprsd/plugins/time.py @@ -11,7 +11,7 @@ from aprsd import fuzzyclock, plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class TimePlugin(plugin.APRSDMessagePluginBase): +class TimePlugin(plugin.APRSDRegexCommandPluginBase): """Time command.""" version = "1.0" @@ -42,7 +42,7 @@ class TimePlugin(plugin.APRSDMessagePluginBase): return reply @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("TIME COMMAND") # So we can mock this in unit tests localzone = self._get_local_tz() @@ -57,7 +57,7 @@ class TimeOpenCageDataPlugin(TimePlugin): command_name = "Time" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -123,7 +123,7 @@ class TimeOWMPlugin(TimePlugin): command_name = "Time" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/plugins/version.py b/aprsd/plugins/version.py index cad661d..2a5e41a 100644 --- a/aprsd/plugins/version.py +++ b/aprsd/plugins/version.py @@ -7,7 +7,7 @@ from aprsd import plugin, stats, trace LOG = logging.getLogger("APRSD") -class VersionPlugin(plugin.APRSDMessagePluginBase): +class VersionPlugin(plugin.APRSDRegexCommandPluginBase): """Version of APRSD Plugin.""" version = "1.0" @@ -19,7 +19,7 @@ class VersionPlugin(plugin.APRSDMessagePluginBase): email_sent_dict = {} @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Version COMMAND") # fromcall = packet.get("from") # message = packet.get("message_text", None) diff --git a/aprsd/plugins/weather.py b/aprsd/plugins/weather.py index 6f94c45..77e68c6 100644 --- a/aprsd/plugins/weather.py +++ b/aprsd/plugins/weather.py @@ -10,7 +10,7 @@ from aprsd import plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class USWeatherPlugin(plugin.APRSDMessagePluginBase): +class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """USWeather Command Returns a weather report for the calling weather station @@ -28,7 +28,7 @@ class USWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "weather" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Weather Plugin") fromcall = packet.get("from") # message = packet.get("message_text", None) @@ -71,7 +71,7 @@ class USWeatherPlugin(plugin.APRSDMessagePluginBase): return reply -class USMetarPlugin(plugin.APRSDMessagePluginBase): +class USMetarPlugin(plugin.APRSDRegexCommandPluginBase): """METAR Command This provides a METAR weather report from a station near the caller @@ -91,7 +91,7 @@ class USMetarPlugin(plugin.APRSDMessagePluginBase): command_name = "Metar" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -162,7 +162,7 @@ class USMetarPlugin(plugin.APRSDMessagePluginBase): return reply -class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): +class OWMWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """OpenWeatherMap Weather Command This provides weather near the caller or callsign. @@ -186,7 +186,7 @@ class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "Weather" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -282,7 +282,7 @@ class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): return reply -class AVWXWeatherPlugin(plugin.APRSDMessagePluginBase): +class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """AVWXWeatherMap Weather Command Fetches a METAR weather report for the nearest @@ -310,7 +310,7 @@ class AVWXWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "Weather" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/stats.py b/aprsd/stats.py index d70ab8d..4153ea9 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -183,7 +183,7 @@ class APRSDStats: last_aprsis_keepalive = "never" pm = plugin.PluginManager() - plugins = pm.get_msg_plugins() + plugins = pm.get_plugins() plugin_stats = {} def full_name_with_qualname(obj): diff --git a/aprsd/threads.py b/aprsd/threads.py index 60fec15..0ae0ed3 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -17,6 +17,13 @@ RX_THREAD = "RX" TX_THREAD = "TX" EMAIL_THREAD = "Email" +rx_msg_queue = queue.Queue(maxsize=20) +tx_msg_queue = queue.Queue(maxsize=20) +msg_queues = { + "rx": rx_msg_queue, + "tx": tx_msg_queue, +} + class APRSDThreadList: """Singleton class that keeps track of application wide threads.""" @@ -57,6 +64,10 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def stop(self): self.thread_stop = True + @abc.abstractmethod + def loop(self): + pass + def run(self): LOG.debug("Starting") while not self.thread_stop: @@ -92,7 +103,11 @@ class KeepAliveThread(APRSDThread): current, peak = tracemalloc.get_traced_memory() stats_obj.set_memory(current) stats_obj.set_memory_peak(peak) - keepalive = "Uptime {} Tracker {} " "Msgs TX:{} RX:{} Last:{} Email:{} Packets:{} RAM Current:{} Peak:{}".format( + keepalive = ( + "Uptime {} Tracker {} Msgs TX:{} RX:{} " + "Last:{} Email:{} Packets:{} RAM Current:{} " + "Peak:{}" + ).format( utils.strfdelta(stats_obj.uptime), len(tracker), stats_obj.msgs_tx, @@ -116,51 +131,6 @@ class KeepAliveThread(APRSDThread): return True -class APRSDNotifyThread(APRSDThread): - last_seen = {} - - def __init__(self, msg_queues, config): - super().__init__("NOTIFY_MSG") - self.msg_queues = msg_queues - self.config = config - packets.WatchList(config=config) - - def loop(self): - try: - packet = self.msg_queues["notify"].get(timeout=5) - wl = packets.WatchList() - if wl.callsign_in_watchlist(packet["from"]): - # NOW WE RUN through the notify plugins. - # If they return a msg, then we queue it for sending. - pm = plugin.PluginManager() - results = pm.notify(packet) - for reply in results: - if reply is not messaging.NULL_MESSAGE: - watch_list_conf = self.config["aprsd"]["watch_list"] - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - watch_list_conf["alert_callsign"], - reply, - ) - self.msg_queues["tx"].put(msg) - - wl.update_seen(packet) - else: - LOG.debug( - "Ignoring packet from '{}'. Not in watch list.".format( - packet["from"], - ), - ) - - # Allows stats object to have latest info from the last_seen dict - LOG.debug("Packet processing complete") - except queue.Empty: - pass - # Continue to loop - return True - - class APRSDRXThread(APRSDThread): def __init__(self, msg_queues, config): super().__init__("RX_MSG") @@ -229,133 +199,102 @@ class APRSDRXThread(APRSDThread): stats.APRSDStats().ack_rx_inc() return - def process_mic_e_packet(self, packet): - LOG.info("Mic-E Packet detected. Currenlty unsupported.") - messaging.log_packet(packet) - stats.APRSDStats().msgs_mice_inc() - return - - def process_message_packet(self, packet): - fromcall = packet["from"] - message = packet.get("message_text", None) - - msg_id = packet.get("msgNo", "0") - - messaging.log_message( - "Received Message", - packet["raw"], - message, - fromcall=fromcall, - msg_num=msg_id, - ) - - found_command = False - # Get singleton of the PM - pm = plugin.PluginManager() - try: - results = pm.run(packet) - for reply in results: - if isinstance(reply, list): - # one of the plugins wants to send multiple messages - found_command = True - for subreply in reply: - LOG.debug(f"Sending '{subreply}'") - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - subreply, - ) - self.msg_queues["tx"].put(msg) - - else: - found_command = True - # A plugin can return a null message flag which signals - # us that they processed the message correctly, but have - # nothing to reply with, so we avoid replying with a usage string - if reply is not messaging.NULL_MESSAGE: - LOG.debug(f"Sending '{reply}'") - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - reply, - ) - self.msg_queues["tx"].put(msg) - else: - LOG.debug("Got NULL MESSAGE from plugin") - - if not found_command: - plugins = pm.get_msg_plugins() - names = [x.command_name for x in plugins] - names.sort() - - # reply = "Usage: {}".format(", ".join(names)) - reply = "Usage: weather, locate [call], time, fortune, ping" - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - reply, - ) - self.msg_queues["tx"].put(msg) - except Exception as ex: - LOG.exception("Plugin failed!!!", ex) - reply = "A Plugin failed! try again?" - msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) - self.msg_queues["tx"].put(msg) - - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage( - self.config["aprs"]["login"], - fromcall, - msg_id=msg_id, - ) - self.msg_queues["tx"].put(ack) - def process_packet(self, packet): """Process a packet recieved from aprs-is server.""" + packets.PacketList().add(packet) + stats.APRSDStats().msgs_rx_inc() - try: - LOG.debug("Adding packet to notify queue {}".format(packet["raw"])) - self.msg_queues["notify"].put(packet) - packets.PacketList().add(packet) + fromcall = packet["from"] + tocall = packet.get("addresse", None) + msg = packet.get("message_text", None) + msg_id = packet.get("msgNo", "0") + msg_response = packet.get("response", None) + LOG.debug("Got packet from '{}' - {}".format(fromcall, packet)) - # since we can see packets from anyone now with the - # watch list, we need to filter messages directly only to us. - tocall = packet.get("addresse", None) + # We don't put ack packets destined for us through the + # plugins. + if tocall == self.config["aprs"]["login"] and msg_response == "ack": + self.process_ack_packet(packet) + else: + # It's not an ACK for us, so lets run it through + # the plugins. + messaging.log_message( + "Received Message", + packet["raw"], + msg, + fromcall=fromcall, + msg_num=msg_id, + ) + + # Only ack messages that were sent directly to us if tocall == self.config["aprs"]["login"]: - stats.APRSDStats().msgs_rx_inc() - packets.PacketList().add(packet) - - msg = packet.get("message_text", None) - msg_format = packet.get("format", None) - msg_response = packet.get("response", None) - if msg_format == "message" and msg: - # we want to send the message through the - # plugins - self.process_message_packet(packet) - return - elif msg_response == "ack": - self.process_ack_packet(packet) - return - - if msg_format == "mic-e": - # process a mic-e packet - self.process_mic_e_packet(packet) - return - else: - LOG.debug( - "Ignoring '{}' packet from '{}' to '{}'".format( - packets.get_packet_type(packet), - packet["from"], - tocall, - ), + # let any threads do their thing, then ack + # send an ack last + ack = messaging.AckMessage( + self.config["aprs"]["login"], + fromcall, + msg_id=msg_id, ) + self.msg_queues["tx"].put(ack) + + pm = plugin.PluginManager() + try: + results = pm.run(packet) + LOG.debug("RESULTS {}".format(results)) + replied = False + for reply in results: + if isinstance(reply, list): + # one of the plugins wants to send multiple messages + replied = True + for subreply in reply: + LOG.debug("Sending '{}'".format(subreply)) + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + subreply, + ) + self.msg_queues["tx"].put(msg) + + else: + replied = True + # A plugin can return a null message flag which signals + # us that they processed the message correctly, but have + # nothing to reply with, so we avoid replying with a + # usage string + if reply is not messaging.NULL_MESSAGE: + LOG.debug("Sending '{}'".format(reply)) + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + self.msg_queues["tx"].put(msg) + + # If the message was for us and we didn't have a + # response, then we send a usage statement. + if tocall == self.config["aprs"]["login"] and not replied: + reply = "Usage: weather, locate [call], time, fortune, ping" + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + self.msg_queues["tx"].put(msg) + except Exception as ex: + LOG.exception("Plugin failed!!!", ex) + # Do we need to send a reply? + if tocall == self.config["aprs"]["login"]: + reply = "A Plugin failed! try again?" + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + self.msg_queues["tx"].put(msg) - except (aprslib.ParseError, aprslib.UnknownFormat) as exp: - LOG.exception("Failed to parse packet from aprs-is", exp) LOG.debug("Packet processing complete") @@ -368,7 +307,6 @@ class APRSDTXThread(APRSDThread): def loop(self): try: msg = self.msg_queues["tx"].get(timeout=5) - packets.PacketList().add(msg.dict()) msg.send() except queue.Empty: pass diff --git a/tests/test_email.py b/tests/test_email.py index 08a35e2..6a4532f 100644 --- a/tests/test_email.py +++ b/tests/test_email.py @@ -1,6 +1,6 @@ import unittest -from aprsd import email +from aprsd.plugins import email class TestEmail(unittest.TestCase): diff --git a/tests/test_main.py b/tests/test_main.py index a140131..bd62b55 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,7 +1,7 @@ import sys import unittest -from aprsd import email +from aprsd.plugins import email if sys.version_info >= (3, 2): @@ -11,8 +11,8 @@ else: class TestMain(unittest.TestCase): - @mock.patch("aprsd.email._imap_connect") - @mock.patch("aprsd.email._smtp_connect") + @mock.patch("aprsd.plugins.email._imap_connect") + @mock.patch("aprsd.plugins.email._smtp_connect") def test_validate_email(self, imap_mock, smtp_mock): """Test to make sure we fail.""" imap_mock.return_value = None diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 8ee2cd7..fd1d406 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -23,7 +23,11 @@ class TestPlugin(unittest.TestCase): stats.APRSDStats(self.config) def fake_packet(self, fromcall="KFART", message=None, msg_number=None): - packet = {"from": fromcall} + packet = { + "from": fromcall, + "addresse": self.config["aprs"]["login"], + "format": "message", + } if message: packet["message_text"] = message @@ -38,7 +42,7 @@ class TestPlugin(unittest.TestCase): mock_which.return_value = None expected = "Fortune command not installed" packet = self.fake_packet(message="fortune") - actual = fortune.run(packet) + actual = fortune.filter(packet) self.assertEqual(expected, actual) @mock.patch("subprocess.check_output") @@ -51,7 +55,7 @@ class TestPlugin(unittest.TestCase): expected = "Funny fortune" packet = self.fake_packet(message="fortune") - actual = fortune.run(packet) + actual = fortune.filter(packet) self.assertEqual(expected, actual) @mock.patch("aprsd.messaging.MsgTrack.flush") @@ -60,7 +64,7 @@ class TestPlugin(unittest.TestCase): query = query_plugin.QueryPlugin(self.config) expected = "Deleted ALL pending msgs." - actual = query.run(packet) + actual = query.filter(packet) mock_flush.assert_called_once() self.assertEqual(expected, actual) @@ -72,7 +76,7 @@ class TestPlugin(unittest.TestCase): query = query_plugin.QueryPlugin(self.config) expected = "No pending msgs to resend" - actual = query.run(packet) + actual = query.filter(packet) mock_restart.assert_not_called() self.assertEqual(expected, actual) mock_restart.reset_mock() @@ -80,7 +84,7 @@ class TestPlugin(unittest.TestCase): # add a message msg = messaging.TextMessage(self.fromcall, "testing", self.ack) track.add(msg) - actual = query.run(packet) + actual = query.filter(packet) mock_restart.assert_called_once() @mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz") @@ -106,7 +110,7 @@ class TestPlugin(unittest.TestCase): msg_number=1, ) - actual = time.run(packet) + actual = time.filter(packet) self.assertEqual(None, actual) cur_time = fuzzy(h, m, 1) @@ -121,7 +125,7 @@ class TestPlugin(unittest.TestCase): cur_time, local_short_str, ) - actual = time.run(packet) + actual = time.filter(packet) self.assertEqual(expected, actual) @mock.patch("time.localtime") @@ -140,7 +144,7 @@ class TestPlugin(unittest.TestCase): msg_number=1, ) - result = ping.run(packet) + result = ping.filter(packet) self.assertEqual(None, result) def ping_str(h, m, s): @@ -158,7 +162,7 @@ class TestPlugin(unittest.TestCase): message="Ping", msg_number=1, ) - actual = ping.run(packet) + actual = ping.filter(packet) expected = ping_str(h, m, s) self.assertEqual(expected, actual) @@ -167,10 +171,10 @@ class TestPlugin(unittest.TestCase): message="ping", msg_number=1, ) - actual = ping.run(packet) + actual = ping.filter(packet) self.assertEqual(expected, actual) - @mock.patch("aprsd.plugin.PluginManager.get_msg_plugins") + @mock.patch("aprsd.plugin.PluginManager.get_plugins") def test_version(self, mock_get_plugins): expected = f"APRSD ver:{aprsd.__version__} uptime:0:0:0" version = version_plugin.VersionPlugin(self.config) @@ -181,7 +185,7 @@ class TestPlugin(unittest.TestCase): msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(None, actual) packet = self.fake_packet( @@ -189,7 +193,7 @@ class TestPlugin(unittest.TestCase): message="version", msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(expected, actual) packet = self.fake_packet( @@ -197,5 +201,5 @@ class TestPlugin(unittest.TestCase): message="Version", msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(expected, actual) From 86777d838cdb69b5fdedadf785d81479be94ba4d Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 20 Aug 2021 15:21:47 -0400 Subject: [PATCH 2/7] Added threads functions to APRSDPluginBase This patch updates the APRSDPluginBase class to include standard methods for allowing plugins to create, start, stop threads that the plugin might need/use. Also update the aprsd-dev to correctly start the threads and stop them for testing plugin functionality. Also added more unit tests and fake objects for unit tests. --- aprsd/dev.py | 10 ++- aprsd/plugin.py | 49 +++++++++++-- aprsd/plugins/email.py | 12 ++-- tests/fake.py | 66 ++++++++++++++++++ tests/test_plugin.py | 152 ++++++++++++++++++++++++++++++++--------- 5 files changed, 242 insertions(+), 47 deletions(-) create mode 100644 tests/fake.py diff --git a/aprsd/dev.py b/aprsd/dev.py index a8052dd..5c70835 100644 --- a/aprsd/dev.py +++ b/aprsd/dev.py @@ -179,7 +179,6 @@ def test_plugin( """APRSD Plugin test app.""" config = utils.parse_config(config_file) - email.CONFIG = config setup_logging(config, loglevel, False) LOG.info(f"Test APRSD PLugin version: {aprsd.__version__}") @@ -189,12 +188,19 @@ def test_plugin( client.Client(config) pm = plugin.PluginManager(config) - obj = pm._create_class(plugin_path, plugin.APRSDMessagePluginBase, config=config) + obj = pm._create_class(plugin_path, plugin.APRSDPluginBase, config=config) packet = {"from": fromcall, "message_text": message, "msgNo": 1} +<<<<<<< HEAD reply = obj.run(packet) LOG.info(f"Result = '{reply}'") +======= + reply = obj.filter(packet) + # Plugin might have threads, so lets stop them so we can exit. + obj.stop_threads() + LOG.info("Result = '{}'".format(reply)) +>>>>>>> f8f84c4 (Added threads functions to APRSDPluginBase) if __name__ == "__main__": diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 5bedee7..56b8fd7 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -8,7 +8,7 @@ import os import re import threading -from aprsd import messaging, packets +from aprsd import messaging, packets, threads import pluggy from thesmuggler import smuggle @@ -51,10 +51,40 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): message_counter = 0 version = "1.0" + # Holds the list of APRSDThreads that the plugin creates + threads = [] + def __init__(self, config): self.config = config self.message_counter = 0 self.setup() + self.threads = self.create_threads() + if self.threads: + self.start_threads() + + def start_threads(self): + if self.threads: + if not isinstance(self.threads, list): + self.threads = [self.threads] + + try: + for thread in self.threads: + if isinstance(thread, threads.APRSDThread): + thread.start() + else: + LOG.error( + "Can't start thread {}:{}, Must be a child " + "of aprsd.threads.APRSDThread".format( + self, + thread, + ), + ) + except Exception: + LOG.error( + "Failed to start threads for plugin {}".format( + self, + ), + ) @property def message_count(self): @@ -69,6 +99,16 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): """Do any plugin setup here.""" pass + def create_threads(self): + """Gives the plugin writer the ability start a background thread.""" + return [] + + def stop_threads(self): + """Stop any threads this plugin might have created.""" + for thread in self.threads: + if isinstance(thread, threads.APRSDThread): + thread.stop() + @hookimpl @abc.abstractmethod def filter(self, packet): @@ -129,11 +169,6 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): """The regex to match from the caller""" raise NotImplementedError - @property - def version(self): - """Version""" - raise NotImplementedError - @hookimpl def filter(self, packet): result = None @@ -225,7 +260,7 @@ class PluginManager: ): """ Method to create a class from a fqn python string. - :param module_class_string: full name of the class to create an object of + :param module_class_string: full name of the class to create an object :param super_cls: expected super class for validity, None if bypass :param kwargs: parameters to pass :return: diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index f5d6449..317eb37 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -46,14 +46,16 @@ class EmailPlugin(plugin.APRSDRegexCommandPluginBase): LOG.error("EmailPlugin DISABLED!!!!") else: self.enabled = True - email_thread = APRSDEmailThread( - msg_queues=threads.msg_queues, - config=self.config, - ) - email_thread.start() else: LOG.info("Email services not enabled.") + def create_threads(self): + if self.enabled: + return APRSDEmailThread( + msg_queues=threads.msg_queues, + config=self.config, + ) + @trace.trace def process(self, packet): LOG.info("Email COMMAND") diff --git a/tests/fake.py b/tests/fake.py new file mode 100644 index 0000000..24c3ee1 --- /dev/null +++ b/tests/fake.py @@ -0,0 +1,66 @@ +from aprsd import packets, plugin, threads + +FAKE_MESSAGE_TEXT = "fake MeSSage" +FAKE_FROM_CALLSIGN = "KFART" +FAKE_TO_CALLSIGN = "KMINE" + + +def fake_packet( + fromcall=FAKE_FROM_CALLSIGN, + tocall=FAKE_TO_CALLSIGN, + message=None, + msg_number=None, + message_format=packets.PACKET_TYPE_MESSAGE, +): + packet = { + "from": fromcall, + "addresse": tocall, + "format": message_format, + } + if message: + packet["message_text"] = message + + if msg_number: + packet["msgNo"] = msg_number + + return packet + + +class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase): + version = "1.0" + + def filter(self, packet): + return None + + def process(self, packet): + return "process" + + +class FakeThread(threads.APRSDThread): + def __init__(self): + super().__init__("FakeThread") + + def loop(self): + return True + + +class FakeBaseThreadsPlugin(plugin.APRSDPluginBase): + version = "1.0" + + def filter(self, packet): + return None + + def process(self, packet): + return "process" + + def create_threads(self): + return FakeThread() + + +class FakeRegexCommandPlugin(plugin.APRSDRegexCommandPluginBase): + version = "1.0" + command_regex = "^[fF]" + command_name = "fake" + + def process(self, packet): + return FAKE_MESSAGE_TEXT diff --git a/tests/test_plugin.py b/tests/test_plugin.py index fd1d406..23380e0 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4,7 +4,7 @@ from unittest import mock import pytz import aprsd -from aprsd import messaging, stats, utils +from aprsd import messaging, packets, stats, utils from aprsd.fuzzyclock import fuzzy from aprsd.plugins import fortune as fortune_plugin from aprsd.plugins import ping as ping_plugin @@ -12,36 +12,122 @@ from aprsd.plugins import query as query_plugin from aprsd.plugins import time as time_plugin from aprsd.plugins import version as version_plugin +from . import fake + class TestPlugin(unittest.TestCase): def setUp(self): - self.fromcall = "KFART" + self.fromcall = fake.FAKE_FROM_CALLSIGN self.ack = 1 self.config = utils.DEFAULT_CONFIG_DICT self.config["ham"]["callsign"] = self.fromcall + self.config["aprs"]["login"] = fake.FAKE_TO_CALLSIGN # Inintialize the stats object with the config stats.APRSDStats(self.config) - def fake_packet(self, fromcall="KFART", message=None, msg_number=None): - packet = { - "from": fromcall, - "addresse": self.config["aprs"]["login"], - "format": "message", - } - if message: - packet["message_text"] = message + @mock.patch.object(fake.FakeBaseNoThreadsPlugin, "process") + def test_base_plugin_no_threads(self, mock_process): + p = fake.FakeBaseNoThreadsPlugin(self.config) - if msg_number: - packet["msgNo"] = msg_number + expected = [] + actual = p.create_threads() + self.assertEqual(expected, actual) - return packet + expected = "1.0" + actual = p.version + self.assertEqual(expected, actual) + expected = 0 + actual = p.message_counter + self.assertEqual(expected, actual) + + expected = None + actual = p.filter(fake.fake_packet()) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + @mock.patch.object(fake.FakeBaseThreadsPlugin, "create_threads") + def test_base_plugin_threads_created(self, mock_create): + fake.FakeBaseThreadsPlugin(self.config) + mock_create.assert_called_once() + + def test_base_plugin_threads(self): + p = fake.FakeBaseThreadsPlugin(self.config) + actual = p.create_threads() + self.assertTrue(isinstance(actual, fake.FakeThread)) + p.stop_threads() + + @mock.patch.object(fake.FakeRegexCommandPlugin, "process") + def test_regex_base_not_called(self, mock_process): + p = fake.FakeRegexCommandPlugin(self.config) + packet = fake.fake_packet(message="a") + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet(tocall="notMe", message="f") + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet( + message="F", + message_format=packets.PACKET_TYPE_MICE, + ) + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet( + message="f", + message_format=packets.PACKET_TYPE_ACK, + ) + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + @mock.patch.object(fake.FakeRegexCommandPlugin, "process") + def test_regex_base_assert_called(self, mock_process): + p = fake.FakeRegexCommandPlugin(self.config) + packet = fake.fake_packet(message="f") + p.filter(packet) + mock_process.assert_called_once() + + def test_regex_base_process_called(self): + p = fake.FakeRegexCommandPlugin(self.config) + + packet = fake.fake_packet(message="f") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="F") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="fake") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="FAKE") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + +class TestFortunePlugin(TestPlugin): @mock.patch("shutil.which") def test_fortune_fail(self, mock_which): fortune = fortune_plugin.FortunePlugin(self.config) mock_which.return_value = None expected = "Fortune command not installed" - packet = self.fake_packet(message="fortune") + packet = fake.fake_packet(message="fortune") actual = fortune.filter(packet) self.assertEqual(expected, actual) @@ -54,13 +140,15 @@ class TestPlugin(unittest.TestCase): mock_output.return_value = "Funny fortune" expected = "Funny fortune" - packet = self.fake_packet(message="fortune") + packet = fake.fake_packet(message="fortune") actual = fortune.filter(packet) self.assertEqual(expected, actual) + +class TestQueryPlugin(TestPlugin): @mock.patch("aprsd.messaging.MsgTrack.flush") def test_query_flush(self, mock_flush): - packet = self.fake_packet(message="!delete") + packet = fake.fake_packet(message="!delete") query = query_plugin.QueryPlugin(self.config) expected = "Deleted ALL pending msgs." @@ -72,7 +160,7 @@ class TestPlugin(unittest.TestCase): def test_query_restart_delayed(self, mock_restart): track = messaging.MsgTrack() track.track = {} - packet = self.fake_packet(message="!4") + packet = fake.fake_packet(message="!4") query = query_plugin.QueryPlugin(self.config) expected = "No pending msgs to resend" @@ -87,6 +175,8 @@ class TestPlugin(unittest.TestCase): actual = query.filter(packet) mock_restart.assert_called_once() + +class TestTimePlugins(TestPlugin): @mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz") @mock.patch("aprsd.plugins.time.TimePlugin._get_utcnow") def test_time(self, mock_utcnow, mock_localtz): @@ -104,8 +194,7 @@ class TestPlugin(unittest.TestCase): fake_time.tm_sec = 13 time = time_plugin.TimePlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="location", msg_number=1, ) @@ -115,8 +204,7 @@ class TestPlugin(unittest.TestCase): cur_time = fuzzy(h, m, 1) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="time", msg_number=1, ) @@ -128,6 +216,8 @@ class TestPlugin(unittest.TestCase): actual = time.filter(packet) self.assertEqual(expected, actual) + +class TestPingPlugin(TestPlugin): @mock.patch("time.localtime") def test_ping(self, mock_time): fake_time = mock.MagicMock() @@ -138,8 +228,7 @@ class TestPlugin(unittest.TestCase): ping = ping_plugin.PingPlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="location", msg_number=1, ) @@ -157,8 +246,7 @@ class TestPlugin(unittest.TestCase): + str(s).zfill(2) ) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="Ping", msg_number=1, ) @@ -166,21 +254,21 @@ class TestPlugin(unittest.TestCase): expected = ping_str(h, m, s) self.assertEqual(expected, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="ping", msg_number=1, ) actual = ping.filter(packet) self.assertEqual(expected, actual) + +class TestVersionPlugin(TestPlugin): @mock.patch("aprsd.plugin.PluginManager.get_plugins") def test_version(self, mock_get_plugins): expected = f"APRSD ver:{aprsd.__version__} uptime:0:0:0" version = version_plugin.VersionPlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="No", msg_number=1, ) @@ -188,16 +276,14 @@ class TestPlugin(unittest.TestCase): actual = version.filter(packet) self.assertEqual(None, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="version", msg_number=1, ) actual = version.filter(packet) self.assertEqual(expected, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="Version", msg_number=1, ) From 8e627c98b3cbd058417273864bdae8bd2148346d Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 20 Aug 2021 16:25:54 -0400 Subject: [PATCH 3/7] Added tracking plugin processing This patch adds plugin rx/tx processing of packets. This tracks how many messages a plugin processes (recieves) and how many packets result in a plugin sending a message out. This patch also adds a new plugins tab on the admin page. --- aprsd/flask.py | 5 +++++ aprsd/plugin.py | 16 ++++++++++++++-- aprsd/stats.py | 5 ++++- aprsd/web/static/js/main.js | 18 ++++++++++++++++++ aprsd/web/templates/index.html | 8 ++++++++ 5 files changed, 49 insertions(+), 3 deletions(-) diff --git a/aprsd/flask.py b/aprsd/flask.py index 7577425..63014c9 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -61,6 +61,10 @@ class APRSDFlask(flask_classful.FlaskView): watch_count = 0 watch_age = 0 + pm = plugin.PluginManager() + plugins = pm.get_plugins() + plugin_count = len(plugins) + return flask.render_template( "index.html", initial_stats=stats, @@ -69,6 +73,7 @@ class APRSDFlask(flask_classful.FlaskView): config_json=json.dumps(self.config), watch_count=watch_count, watch_age=watch_age, + plugin_count=plugin_count, ) @auth.login_required diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 56b8fd7..14601c2 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -48,7 +48,8 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): """The base class for all APRSD Plugins.""" config = None - message_counter = 0 + rx_count = 0 + tx_count = 0 version = "1.0" # Holds the list of APRSDThreads that the plugin creates @@ -103,6 +104,12 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): """Gives the plugin writer the ability start a background thread.""" return [] + def rx_inc(self): + self.rx_count += 1 + + def tx_inc(self): + self.tx_count += 1 + def stop_threads(self): """Stop any threads this plugin might have created.""" for thread in self.threads: @@ -137,7 +144,10 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): result = messaging.NULL_MESSAGE if wl.callsign_in_watchlist(packet["from"]): # packet is from a callsign in the watch list + self.rx_inc() result = self.process() + if result: + self.tx_inc() wl.update_seen(packet) return result @@ -185,8 +195,10 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): and message ): if re.search(self.command_regex, message): - self.message_counter += 1 + self.rx_inc() result = self.process(packet) + if result: + self.tx_inc() <<<<<<< HEAD To reply with a message over the air, return a string diff --git a/aprsd/stats.py b/aprsd/stats.py index 4153ea9..144c6bf 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -193,7 +193,10 @@ class APRSDStats: ) for p in plugins: - plugin_stats[full_name_with_qualname(p)] = p.message_count + plugin_stats[full_name_with_qualname(p)] = { + "rx": p.rx_count, + "tx": p.tx_count, + } wl = packets.WatchList() diff --git a/aprsd/web/static/js/main.js b/aprsd/web/static/js/main.js index 9d3f930..f3849b3 100644 --- a/aprsd/web/static/js/main.js +++ b/aprsd/web/static/js/main.js @@ -58,6 +58,23 @@ function update_watchlist_from_packet(callsign, val) { //console.log(watchlist) } +function update_plugins( data ) { + var plugindiv = $("#pluginDiv"); + var html_str = '' + plugindiv.html('') + + var plugins = data["stats"]["plugins"]; + var keys = Object.keys(plugins); + keys.sort(); + for (var i=0; i'; + } + html_str += "
Plugin NameProcessed PacketsSent Packets
' + val["rx"] + '' + val["tx"] + '
"; + plugindiv.append(html_str); +} + function update_packets( data ) { var packetsdiv = $("#packetsDiv"); //nuke the contents first, then add to it. @@ -120,6 +137,7 @@ function start_update() { success: function(data) { update_stats(data); update_watchlist(data); + update_plugins(data); }, complete: function() { setTimeout(statsworker, 10000); diff --git a/aprsd/web/templates/index.html b/aprsd/web/templates/index.html index 082f5a6..5314a1c 100644 --- a/aprsd/web/templates/index.html +++ b/aprsd/web/templates/index.html @@ -71,6 +71,7 @@
Charts
Messages
Watch List
+
Plugins
Config
Raw JSON
@@ -129,6 +130,13 @@
Loading
+
+

+ Plugins Loaded ({{ plugin_count }}) +

+
Loading
+
+

Config

{{ config_json|safe }}
From 8b5f21eece6618385fbed6c130e17099414d8b73 Mon Sep 17 00:00:00 2001 From: Hemna Date: Mon, 23 Aug 2021 14:08:14 -0400 Subject: [PATCH 4/7] Rebase from master and run gray This patch is a rebase of master after the introduction of switching from black to gray code formatting. --- aprsd/dev.py | 9 ++------- aprsd/main.py | 3 +-- aprsd/plugin.py | 19 ++----------------- aprsd/plugins/email.py | 19 ++++++++++--------- aprsd/threads.py | 8 ++++---- tests/fake.py | 1 + tox.ini | 2 +- 7 files changed, 21 insertions(+), 40 deletions(-) diff --git a/aprsd/dev.py b/aprsd/dev.py index 5c70835..f5d9528 100644 --- a/aprsd/dev.py +++ b/aprsd/dev.py @@ -14,7 +14,7 @@ import click_completion # local imports here import aprsd -from aprsd import client, email, plugin, utils +from aprsd import client, plugin, utils # setup the global logger @@ -192,15 +192,10 @@ def test_plugin( packet = {"from": fromcall, "message_text": message, "msgNo": 1} -<<<<<<< HEAD - reply = obj.run(packet) - LOG.info(f"Result = '{reply}'") -======= reply = obj.filter(packet) # Plugin might have threads, so lets stop them so we can exit. obj.stop_threads() - LOG.info("Result = '{}'".format(reply)) ->>>>>>> f8f84c4 (Added threads functions to APRSDPluginBase) + LOG.info(f"Result = '{reply}'") if __name__ == "__main__": diff --git a/aprsd/main.py b/aprsd/main.py index 8403e69..1e1c3c6 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -37,8 +37,7 @@ import click_completion # local imports here import aprsd from aprsd import ( - client, email, flask, messaging, packets, plugin, stats, threads, trace, - utils, + client, flask, messaging, packets, plugin, stats, threads, trace, utils, ) diff --git a/aprsd/plugin.py b/aprsd/plugin.py index 14601c2..b67898d 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -8,10 +8,11 @@ import os import re import threading -from aprsd import messaging, packets, threads import pluggy from thesmuggler import smuggle +from aprsd import messaging, packets, threads + # setup the global logger LOG = logging.getLogger("APRSD") @@ -98,7 +99,6 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): def setup(self): """Do any plugin setup here.""" - pass def create_threads(self): """Gives the plugin writer the ability start a background thread.""" @@ -124,7 +124,6 @@ class APRSDPluginBase(metaclass=abc.ABCMeta): @abc.abstractmethod def process(self, packet): """This is called when the filter passes.""" - pass class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): @@ -153,15 +152,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): return result -<<<<<<< HEAD - This will get called when a packet is seen by a callsign - registered in the watch list in the config file.""" - - -class APRSDMessagePluginBase(metaclass=abc.ABCMeta): -======= class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): ->>>>>>> 2e7c884 (Refactor Message processing and MORE) """Base Message plugin class. When you want to search for a particular command in an @@ -200,13 +191,7 @@ class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): if result: self.tx_inc() -<<<<<<< HEAD - To reply with a message over the air, return a string - to send. - """ -======= return result ->>>>>>> 2e7c884 (Refactor Message processing and MORE) class PluginManager: diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 317eb37..2af05c6 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -7,10 +7,11 @@ import re import smtplib import time -from aprsd import messaging, plugin, stats, threads, trace import imapclient from validate_email import validate_email +from aprsd import messaging, plugin, stats, threads, trace + LOG = logging.getLogger("APRSD") @@ -171,7 +172,7 @@ def _imap_connect(): ) except (imaplib.IMAP4.error, Exception) as e: msg = getattr(e, "message", repr(e)) - LOG.error("Failed to login {}".format(msg)) + LOG.error(f"Failed to login {msg}") return server.select_folder("INBOX") @@ -213,7 +214,7 @@ def _smtp_connect(): LOG.error("Couldn't connect to SMTP Server") return - LOG.debug("Connected to smtp host {}".format(msg)) + LOG.debug(f"Connected to smtp host {msg}") debug = CONFIG["aprsd"]["email"]["smtp"].get("debug", False) if debug: @@ -229,7 +230,7 @@ def _smtp_connect(): LOG.error("Couldn't connect to SMTP Server") return - LOG.debug("Logged into SMTP server {}".format(msg)) + LOG.debug(f"Logged into SMTP server {msg}") return server @@ -244,7 +245,7 @@ def validate_shortcuts(config): ) delete_keys = [] for key in shortcuts: - LOG.info("Validating {}:{}".format(key, shortcuts[key])) + LOG.info(f"Validating {key}:{shortcuts[key]}") is_valid = validate_email( email_address=shortcuts[key], check_regex=True, @@ -313,7 +314,7 @@ def parse_email(msgid, data, server): from_addr = f.group(1) else: from_addr = "noaddr" - LOG.debug("Got a message from '{}'".format(from_addr)) + LOG.debug(f"Got a message from '{from_addr}'") try: m = server.fetch([msgid], ["RFC822"]) except Exception as e: @@ -447,7 +448,7 @@ def resend_email(count, fromcall): month = date.strftime("%B")[:3] # Nov, Mar, Apr day = date.day year = date.year - today = "{}-{}-{}".format(day, month, year) + today = f"{day}-{month}-{year}" shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] # swap key/value @@ -564,7 +565,7 @@ class APRSDEmailThread(threads.APRSDThread): month = date.strftime("%B")[:3] # Nov, Mar, Apr day = date.day year = date.year - today = "{}-{}-{}".format(day, month, year) + today = f"{day}-{month}-{year}" try: server = _imap_connect() @@ -580,7 +581,7 @@ class APRSDEmailThread(threads.APRSDThread): e, ) return True - LOG.debug("{} messages received today".format(len(messages))) + LOG.debug(f"{len(messages)} messages received today") try: _msgs = server.fetch(messages, ["ENVELOPE"]) diff --git a/aprsd/threads.py b/aprsd/threads.py index 0ae0ed3..4ad6be4 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -209,7 +209,7 @@ class APRSDRXThread(APRSDThread): msg = packet.get("message_text", None) msg_id = packet.get("msgNo", "0") msg_response = packet.get("response", None) - LOG.debug("Got packet from '{}' - {}".format(fromcall, packet)) + LOG.debug(f"Got packet from '{fromcall}' - {packet}") # We don't put ack packets destined for us through the # plugins. @@ -240,14 +240,14 @@ class APRSDRXThread(APRSDThread): pm = plugin.PluginManager() try: results = pm.run(packet) - LOG.debug("RESULTS {}".format(results)) + LOG.debug(f"RESULTS {results}") replied = False for reply in results: if isinstance(reply, list): # one of the plugins wants to send multiple messages replied = True for subreply in reply: - LOG.debug("Sending '{}'".format(subreply)) + LOG.debug(f"Sending '{subreply}'") msg = messaging.TextMessage( self.config["aprs"]["login"], @@ -263,7 +263,7 @@ class APRSDRXThread(APRSDThread): # nothing to reply with, so we avoid replying with a # usage string if reply is not messaging.NULL_MESSAGE: - LOG.debug("Sending '{}'".format(reply)) + LOG.debug(f"Sending '{reply}'") msg = messaging.TextMessage( self.config["aprs"]["login"], diff --git a/tests/fake.py b/tests/fake.py index 24c3ee1..a45ead3 100644 --- a/tests/fake.py +++ b/tests/fake.py @@ -1,5 +1,6 @@ from aprsd import packets, plugin, threads + FAKE_MESSAGE_TEXT = "fake MeSSage" FAKE_FROM_CALLSIGN = "KFART" FAKE_TO_CALLSIGN = "KMINE" diff --git a/tox.ini b/tox.ini index 0a415d1..95e1349 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ minversion = 2.9.0 skipdist = True skip_missing_interpreters = true -envlist = pre-commit,pep8,fmt-check,py{36,37,38} +envlist = pre-commit,pep8,py{36,37,38} # Activate isolated build environment. tox will use a virtual environment # to build a source distribution from the source tree. For build tools and From 0f384b0e856403611dfb00dd6bce63bbcfb665a9 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 24 Aug 2021 13:31:33 -0400 Subject: [PATCH 5/7] Updated select timeouts This patch updates the select timeouts for threads. This allows threads to exit quicker when user hits CTRL-C. Updates the KeepAlive Thread to include total packets. --- aprsd/client.py | 2 +- aprsd/listen.py | 2 +- aprsd/main.py | 11 ++++++----- aprsd/messaging.py | 11 ++--------- aprsd/plugin.py | 23 ++++++++++++++++++++++- aprsd/threads.py | 43 ++++++++++++++----------------------------- aprsd/utils.py | 10 ++++++++-- 7 files changed, 54 insertions(+), 48 deletions(-) diff --git a/aprsd/client.py b/aprsd/client.py index 87faf5b..40c9d40 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -84,7 +84,7 @@ class Aprsdis(aprslib.IS): thread_stop = False # timeout in seconds - select_timeout = 10 + select_timeout = 1 def stop(self): self.thread_stop = True diff --git a/aprsd/listen.py b/aprsd/listen.py index 8d8a9c0..4342447 100644 --- a/aprsd/listen.py +++ b/aprsd/listen.py @@ -294,7 +294,7 @@ def listen( # TODO(walt) - manually edit this list # prior to running aprsd-listen listen - watch_list = [] + watch_list = ["k*"] # build last seen list last_seen = {} diff --git a/aprsd/main.py b/aprsd/main.py index 1e1c3c6..a3cbd9e 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -157,7 +157,7 @@ def signal_handler(sig, frame): datetime.datetime.now(), ), ) - time.sleep(5) + time.sleep(1.5) tracker = messaging.MsgTrack() tracker.save() LOG.info(stats.APRSDStats()) @@ -458,15 +458,16 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - # Create the initial PM singleton and Register plugins - plugin_manager = plugin.PluginManager(config) - plugin_manager.setup_plugins() try: cl = client.Client(config) cl.client except LoginError: sys.exit(-1) + # Create the initial PM singleton and Register plugins + plugin_manager = plugin.PluginManager(config) + plugin_manager.setup_plugins() + # Now load the msgTrack from disk if any if flush: LOG.debug("Deleting saved MsgTrack.") @@ -498,7 +499,7 @@ def server( messaging.MsgTrack().restart() - keepalive = threads.KeepAliveThread() + keepalive = threads.KeepAliveThread(config=config) keepalive.start() try: diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 00ed7f5..53809e0 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -545,15 +545,8 @@ def log_packet(packet): ack = packet.get("ack", None) log_message( - "Packet", - packet["raw"], - msg, - fromcall=fromcall, - tocall=tocall, - ack=ack, - packet_type=response_type, - msg_num=msg_num, - ) + "Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall, + ack=ack, packet_type=response_type, msg_num=msg_num, ) def log_message( diff --git a/aprsd/plugin.py b/aprsd/plugin.py index b67898d..ebf160c 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -11,7 +11,7 @@ import threading import pluggy from thesmuggler import smuggle -from aprsd import messaging, packets, threads +from aprsd import client, messaging, packets, threads # setup the global logger @@ -137,6 +137,27 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): by a particular HAM callsign, write a plugin based off of this class. """ + enabled = False + + def setup(self): + # if we have a watch list enabled, we need to add filtering + # to enable seeing packets from the watch list. + if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ + "watch_list" + ].get("enabled", False): + # watch list is enabled + self.enabled = True + watch_list = self.config["aprsd"]["watch_list"].get( + "callsigns", + [], + ) + # make sure the timeout is set or this doesn't work + if watch_list: + aprs_client = client.get_client() + filter_str = "b/{}".format("/".join(watch_list)) + aprs_client.set_filter(filter_str) + else: + LOG.warning("Watch list enabled, but no callsigns set.") def filter(self, packet): wl = packets.WatchList() diff --git a/aprsd/threads.py b/aprsd/threads.py index 4ad6be4..aa9f1e5 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -52,6 +52,7 @@ class APRSDThreadList: """Iterate over all threads and call stop on them.""" with self.lock: for th in self.threads_list: + LOG.debug(f"Stopping Thread {th.name}") th.stop() @@ -82,15 +83,16 @@ class KeepAliveThread(APRSDThread): cntr = 0 checker_time = datetime.datetime.now() - def __init__(self): + def __init__(self, config): tracemalloc.start() super().__init__("KeepAlive") + self.config = config def loop(self): if self.cntr % 6 == 0: tracker = messaging.MsgTrack() stats_obj = stats.APRSDStats() - packets_list = packets.PacketList().packet_list + pl = packets.PacketList() now = datetime.datetime.now() last_email = stats_obj.email_thread_time if last_email: @@ -104,21 +106,22 @@ class KeepAliveThread(APRSDThread): stats_obj.set_memory(current) stats_obj.set_memory_peak(peak) keepalive = ( - "Uptime {} Tracker {} Msgs TX:{} RX:{} " - "Last:{} Email:{} Packets:{} RAM Current:{} " - "Peak:{}" + "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " + "Last:{} Email: {} - RAM Current:{} Peak:{}" ).format( + self.config["aprs"]["login"], utils.strfdelta(stats_obj.uptime), + pl.total_recv, + pl.total_tx, len(tracker), stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, email_thread_time, - len(packets_list), utils.human_size(current), utils.human_size(peak), ) - LOG.debug(keepalive) + LOG.info(keepalive) # Check version every hour delta = now - self.checker_time if delta > datetime.timedelta(hours=1): @@ -127,7 +130,7 @@ class KeepAliveThread(APRSDThread): if level: LOG.warning(msg) self.cntr += 1 - time.sleep(10) + time.sleep(1) return True @@ -144,23 +147,6 @@ class APRSDRXThread(APRSDThread): def loop(self): aprs_client = client.get_client() - # if we have a watch list enabled, we need to add filtering - # to enable seeing packets from the watch list. - if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ - "watch_list" - ].get("enabled", False): - # watch list is enabled - watch_list = self.config["aprsd"]["watch_list"].get( - "callsigns", - [], - ) - # make sure the timeout is set or this doesn't work - if watch_list: - filter_str = "p/{}".format("/".join(watch_list)) - aprs_client.set_filter(filter_str) - else: - LOG.warning("Watch list enabled, but no callsigns set.") - # setup the consumer of messages and block until a messages try: # This will register a packet consumer with aprslib @@ -202,14 +188,13 @@ class APRSDRXThread(APRSDThread): def process_packet(self, packet): """Process a packet recieved from aprs-is server.""" packets.PacketList().add(packet) - stats.APRSDStats().msgs_rx_inc() fromcall = packet["from"] tocall = packet.get("addresse", None) msg = packet.get("message_text", None) msg_id = packet.get("msgNo", "0") msg_response = packet.get("response", None) - LOG.debug(f"Got packet from '{fromcall}' - {packet}") + # LOG.debug(f"Got packet from '{fromcall}' - {packet}") # We don't put ack packets destined for us through the # plugins. @@ -228,6 +213,7 @@ class APRSDRXThread(APRSDThread): # Only ack messages that were sent directly to us if tocall == self.config["aprs"]["login"]: + stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last ack = messaging.AckMessage( @@ -240,7 +226,6 @@ class APRSDRXThread(APRSDThread): pm = plugin.PluginManager() try: results = pm.run(packet) - LOG.debug(f"RESULTS {results}") replied = False for reply in results: if isinstance(reply, list): @@ -306,7 +291,7 @@ class APRSDTXThread(APRSDThread): def loop(self): try: - msg = self.msg_queues["tx"].get(timeout=5) + msg = self.msg_queues["tx"].get(timeout=1) msg.send() except queue.Empty: pass diff --git a/aprsd/utils.py b/aprsd/utils.py index 3793b90..295de97 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -394,8 +394,11 @@ def human_size(bytes, units=None): return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:]) -def strfdelta(tdelta, fmt="{hours}:{minutes}:{seconds}"): - d = {"days": tdelta.days} +def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}"): + d = { + "days": tdelta.days, + "width": "02", + } d["hours"], rem = divmod(tdelta.seconds, 3600) d["minutes"], d["seconds"] = divmod(rem, 60) return fmt.format(**d) @@ -460,6 +463,9 @@ class RingBuffer: """return list of elements in correct order""" return self.data[self.cur :] + self.data[: self.cur] + def __len__(self): + return len(self.data) + def append(self, x): """append an element at the end of the buffer""" From 2e9b42d7af4a96c3abdcab5853ba380ef0adc474 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 24 Aug 2021 14:08:24 -0400 Subject: [PATCH 6/7] Added days to uptime string formatting The uptime string formatter was missing days. --- aprsd/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aprsd/utils.py b/aprsd/utils.py index 295de97..14b47ff 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -399,6 +399,9 @@ def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}") "days": tdelta.days, "width": "02", } + if tdelta.days > 0: + fmt = "{days} days " + fmt + d["hours"], rem = divmod(tdelta.seconds, 3600) d["minutes"], d["seconds"] = divmod(rem, 60) return fmt.format(**d) From 61967b5fe8a088106124c5b0ce35a471be14c795 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 24 Aug 2021 15:22:50 -0400 Subject: [PATCH 7/7] Removed TXThread Since all outbound messages have a send() method that starts a separate there, there really is no reason for the transmit queue thread at all. All it did was get a message from the queue and then call send on it, which would start another thread. This removes that intermediate TXThread. When you want to send a message just call send() on the message object. --- aprsd/email.py | 548 ----------------------------------------- aprsd/main.py | 5 - aprsd/messaging.py | 2 - aprsd/packets.py | 6 +- aprsd/plugins/email.py | 2 +- aprsd/threads.py | 37 ++- tests/test_plugin.py | 2 +- 7 files changed, 33 insertions(+), 569 deletions(-) delete mode 100644 aprsd/email.py diff --git a/aprsd/email.py b/aprsd/email.py deleted file mode 100644 index 039e204..0000000 --- a/aprsd/email.py +++ /dev/null @@ -1,548 +0,0 @@ -import datetime -import email -from email.mime.text import MIMEText -import imaplib -import logging -import re -import smtplib -import time - -import imapclient -from validate_email import validate_email - -from aprsd import messaging, stats, threads, trace - - -LOG = logging.getLogger("APRSD") - -# This gets forced set from main.py prior to being used internally -CONFIG = None - - -@trace.trace -def _imap_connect(): - imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143) - use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False) - host = CONFIG["aprsd"]["email"]["imap"]["host"] - msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) - # LOG.debug("Connect to IMAP host {} with user '{}'". - # format(msg, CONFIG['imap']['login'])) - - try: - server = imapclient.IMAPClient( - CONFIG["aprsd"]["email"]["imap"]["host"], - port=imap_port, - use_uid=True, - ssl=use_ssl, - timeout=30, - ) - except Exception as e: - LOG.error("Failed to connect IMAP server", e) - return - - try: - server.login( - CONFIG["aprsd"]["email"]["imap"]["login"], - CONFIG["aprsd"]["email"]["imap"]["password"], - ) - except (imaplib.IMAP4.error, Exception) as e: - msg = getattr(e, "message", repr(e)) - LOG.error(f"Failed to login {msg}") - return - - server.select_folder("INBOX") - - server.fetch = trace.trace(server.fetch) - server.search = trace.trace(server.search) - server.remove_flags = trace.trace(server.remove_flags) - server.add_flags = trace.trace(server.add_flags) - return server - - -@trace.trace -def _smtp_connect(): - host = CONFIG["aprsd"]["email"]["smtp"]["host"] - smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"] - use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False) - msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port) - LOG.debug( - "Connect to SMTP host {} with user '{}'".format( - msg, - CONFIG["aprsd"]["email"]["imap"]["login"], - ), - ) - - try: - if use_ssl: - server = smtplib.SMTP_SSL( - host=host, - port=smtp_port, - timeout=30, - ) - else: - server = smtplib.SMTP( - host=host, - port=smtp_port, - timeout=30, - ) - except Exception: - LOG.error("Couldn't connect to SMTP Server") - return - - LOG.debug(f"Connected to smtp host {msg}") - - debug = CONFIG["aprsd"]["email"]["smtp"].get("debug", False) - if debug: - server.set_debuglevel(5) - server.sendmail = trace.trace(server.sendmail) - - try: - server.login( - CONFIG["aprsd"]["email"]["smtp"]["login"], - CONFIG["aprsd"]["email"]["smtp"]["password"], - ) - except Exception: - LOG.error("Couldn't connect to SMTP Server") - return - - LOG.debug(f"Logged into SMTP server {msg}") - return server - - -def validate_shortcuts(config): - shortcuts = config["aprsd"]["email"].get("shortcuts", None) - if not shortcuts: - return - - LOG.info( - "Validating {} Email shortcuts. This can take up to 10 seconds" - " per shortcut".format(len(shortcuts)), - ) - delete_keys = [] - for key in shortcuts: - LOG.info(f"Validating {key}:{shortcuts[key]}") - is_valid = validate_email( - email_address=shortcuts[key], - check_regex=True, - check_mx=False, - from_address=config["aprsd"]["email"]["smtp"]["login"], - helo_host=config["aprsd"]["email"]["smtp"]["host"], - smtp_timeout=10, - dns_timeout=10, - use_blacklist=True, - debug=False, - ) - if not is_valid: - LOG.error( - "'{}' is an invalid email address. Removing shortcut".format( - shortcuts[key], - ), - ) - delete_keys.append(key) - - for key in delete_keys: - del config["aprsd"]["email"]["shortcuts"][key] - - LOG.info("Available shortcuts: {}".format(config["aprsd"]["email"]["shortcuts"])) - - -def get_email_from_shortcut(addr): - if CONFIG["aprsd"]["email"].get("shortcuts", False): - return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr) - else: - return addr - - -def validate_email_config(config, disable_validation=False): - """function to simply ensure we can connect to email services. - - This helps with failing early during startup. - """ - LOG.info("Checking IMAP configuration") - imap_server = _imap_connect() - LOG.info("Checking SMTP configuration") - smtp_server = _smtp_connect() - - # Now validate and flag any shortcuts as invalid - if not disable_validation: - validate_shortcuts(config) - else: - LOG.info("Shortcuts email validation is Disabled!!, you were warned.") - - if imap_server and smtp_server: - return True - else: - return False - - -@trace.trace -def parse_email(msgid, data, server): - envelope = data[b"ENVELOPE"] - # email address match - # use raw string to avoid invalid escape secquence errors r"string here" - f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0])) - if f is not None: - from_addr = f.group(1) - else: - from_addr = "noaddr" - LOG.debug(f"Got a message from '{from_addr}'") - try: - m = server.fetch([msgid], ["RFC822"]) - except Exception as e: - LOG.exception("Couldn't fetch email from server in parse_email", e) - return - - msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore")) - if msg.is_multipart(): - text = "" - html = None - # default in case body somehow isn't set below - happened once - body = b"* unreadable msg received" - # this uses the last text or html part in the email, phone companies often put content in an attachment - for part in msg.get_payload(): - if part.get_content_charset() is None: - # or BREAK when we hit a text or html? - # We cannot know the character set, - # so return decoded "something" - LOG.debug("Email got unknown content type") - text = part.get_payload(decode=True) - continue - - charset = part.get_content_charset() - - if part.get_content_type() == "text/plain": - LOG.debug("Email got text/plain") - text = str( - part.get_payload(decode=True), - str(charset), - "ignore", - ).encode("utf8", "replace") - - if part.get_content_type() == "text/html": - LOG.debug("Email got text/html") - html = str( - part.get_payload(decode=True), - str(charset), - "ignore", - ).encode("utf8", "replace") - - if text is not None: - # strip removes white space fore and aft of string - body = text.strip() - else: - body = html.strip() - else: # message is not multipart - # email.uscc.net sends no charset, blows up unicode function below - LOG.debug("Email is not multipart") - if msg.get_content_charset() is None: - text = str(msg.get_payload(decode=True), "US-ASCII", "ignore").encode( - "utf8", - "replace", - ) - else: - text = str( - msg.get_payload(decode=True), - msg.get_content_charset(), - "ignore", - ).encode("utf8", "replace") - body = text.strip() - - # FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 6: ordinal not in range(128) - # decode with errors='ignore'. be sure to encode it before we return it below, also with errors='ignore' - try: - body = body.decode(errors="ignore") - except Exception as e: - LOG.error("Unicode decode failure: " + str(e)) - LOG.error("Unidoce decode failed: " + str(body)) - body = "Unreadable unicode msg" - # strip all html tags - body = re.sub("<[^<]+?>", "", body) - # strip CR/LF, make it one line, .rstrip fails at this - body = body.replace("\n", " ").replace("\r", " ") - # ascii might be out of range, so encode it, removing any error characters - body = body.encode(errors="ignore") - return (body, from_addr) - - -# end parse_email - - -@trace.trace -def send_email(to_addr, content): - global check_email_delay - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - email_address = get_email_from_shortcut(to_addr) - LOG.info("Sending Email_________________") - - if to_addr in shortcuts: - LOG.info("To : " + to_addr) - to_addr = email_address - LOG.info(" (" + to_addr + ")") - subject = CONFIG["ham"]["callsign"] - # content = content + "\n\n(NOTE: reply with one line)" - LOG.info("Subject : " + subject) - LOG.info("Body : " + content) - - # check email more often since there's activity right now - check_email_delay = 60 - - msg = MIMEText(content) - msg["Subject"] = subject - msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"] - msg["To"] = to_addr - server = _smtp_connect() - if server: - try: - server.sendmail( - CONFIG["aprsd"]["email"]["smtp"]["login"], - [to_addr], - msg.as_string(), - ) - stats.APRSDStats().email_tx_inc() - except Exception as e: - msg = getattr(e, "message", repr(e)) - LOG.error("Sendmail Error!!!! '{}'", msg) - server.quit() - return -1 - server.quit() - return 0 - - -# end send_email - - -@trace.trace -def resend_email(count, fromcall): - global check_email_delay - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = f"{day}-{month}-{year}" - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - # swap key/value - shortcuts_inverted = {v: k for k, v in shortcuts.items()} - - try: - server = _imap_connect() - except Exception as e: - LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e) - return - - try: - messages = server.search(["SINCE", today]) - except Exception as e: - LOG.exception("Couldn't search for emails in resend_email ", e) - return - - # LOG.debug("%d messages received today" % len(messages)) - - msgexists = False - - messages.sort(reverse=True) - del messages[int(count) :] # only the latest "count" messages - for message in messages: - try: - parts = server.fetch(message, ["ENVELOPE"]).items() - except Exception as e: - LOG.exception("Couldn't fetch email parts in resend_email", e) - continue - - for msgid, data in list(parts): - # one at a time, otherwise order is random - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Failed to remove SEEN flag in resend_email", e) - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - # asterisk indicates a resend - reply = "-" + from_addr + " * " + body.decode(errors="ignore") - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage( - CONFIG["aprs"]["login"], - fromcall, - reply, - ) - msg.send() - msgexists = True - - if msgexists is not True: - stm = time.localtime() - h = stm.tm_hour - m = stm.tm_min - s = stm.tm_sec - # append time as a kind of serial number to prevent FT1XDR from - # thinking this is a duplicate message. - # The FT1XDR pretty much ignores the aprs message number in this - # regard. The FTM400 gets it right. - reply = "No new msg {}:{}:{}".format( - str(h).zfill(2), - str(m).zfill(2), - str(s).zfill(2), - ) - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) - msg.send() - - # check email more often since we're resending one now - check_email_delay = 60 - - server.logout() - # end resend_email() - - -class APRSDEmailThread(threads.APRSDThread): - def __init__(self, msg_queues, config): - super().__init__("EmailThread") - self.msg_queues = msg_queues - self.config = config - - @trace.trace - def run(self): - global check_email_delay - - LOG.debug("Starting") - - check_email_delay = 60 - past = datetime.datetime.now() - while not self.thread_stop: - time.sleep(5) - stats.APRSDStats().email_thread_update() - # always sleep for 5 seconds and see if we need to check email - # This allows CTRL-C to stop the execution of this loop sooner - # than check_email_delay time - now = datetime.datetime.now() - if now - past > datetime.timedelta(seconds=check_email_delay): - # It's time to check email - - # slowly increase delay every iteration, max out at 300 seconds - # any send/receive/resend activity will reset this to 60 seconds - if check_email_delay < 300: - check_email_delay += 1 - LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - # swap key/value - shortcuts_inverted = {v: k for k, v in shortcuts.items()} - - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = f"{day}-{month}-{year}" - - server = None - try: - server = _imap_connect() - except Exception as e: - LOG.exception("IMAP failed to connect.", e) - - if not server: - continue - - try: - messages = server.search(["SINCE", today]) - except Exception as e: - LOG.exception("IMAP failed to search for messages since today.", e) - continue - LOG.debug(f"{len(messages)} messages received today") - - try: - _msgs = server.fetch(messages, ["ENVELOPE"]) - except Exception as e: - LOG.exception("IMAP failed to fetch/flag messages: ", e) - continue - - for msgid, data in _msgs.items(): - envelope = data[b"ENVELOPE"] - LOG.debug( - 'ID:%d "%s" (%s)' - % (msgid, envelope.subject.decode(), envelope.date), - ) - f = re.search( - r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", - str(envelope.from_[0]), - ) - if f is not None: - from_addr = f.group(1) - else: - from_addr = "noaddr" - - # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) - # if "APRS" not in server.get_flags(msgid)[msgid]: - # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both - try: - taglist = [ - x.decode(errors="ignore") - for x in server.get_flags(msgid)[msgid] - ] - except Exception as e: - LOG.exception("Failed to get flags.", e) - break - - if "APRS" not in taglist: - # if msg not flagged as sent via aprs - try: - server.fetch([msgid], ["RFC822"]) - except Exception as e: - LOG.exception("Failed single server fetch for RFC822", e) - break - - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Failed to remove flags SEEN", e) - # Not much we can do here, so lets try and - # send the aprs message anyway - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - - reply = "-" + from_addr + " " + body.decode(errors="ignore") - msg = messaging.TextMessage( - self.config["aprs"]["login"], - self.config["ham"]["callsign"], - reply, - ) - self.msg_queues["tx"].put(msg) - # flag message as sent via aprs - try: - server.add_flags(msgid, ["APRS"]) - # unset seen flag, will stay bold in email client - except Exception as e: - LOG.exception("Couldn't add APRS flag to email", e) - - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Couldn't remove seen flag from email", e) - - # check email more often since we just received an email - check_email_delay = 60 - - # reset clock - LOG.debug("Done looping over Server.fetch, logging out.") - past = datetime.datetime.now() - try: - server.logout() - except Exception as e: - LOG.exception("IMAP failed to logout: ", e) - continue - else: - # We haven't hit the email delay yet. - # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) - pass - - # Remove ourselves from the global threads list - threads.APRSDThreadList().remove(self) - LOG.info("Exiting") diff --git a/aprsd/main.py b/aprsd/main.py index a3cbd9e..d7ee309 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -483,13 +483,8 @@ def server( msg_queues=threads.msg_queues, config=config, ) - tx_thread = threads.APRSDTXThread( - msg_queues=threads.msg_queues, - config=config, - ) rx_thread.start() - tx_thread.start() if "watch_list" in config["aprsd"] and config["aprsd"]["watch_list"].get( "enabled", diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 53809e0..1d2c8b3 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -468,8 +468,6 @@ class AckMessage(Message): thread = SendAckThread(self) thread.start() - # end send_ack() - def send_direct(self): """Send an ack message without a separate thread.""" cl = client.get_client() diff --git a/aprsd/packets.py b/aprsd/packets.py index ac08535..ef5e3f5 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -56,10 +56,12 @@ class PacketList: return self.packet_list.get() def total_received(self): - return self.total_recv + with self.lock: + return self.total_recv def total_sent(self): - return self.total_tx + with self.lock: + return self.total_tx class WatchList: diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 2af05c6..c5bf6c4 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -648,7 +648,7 @@ class APRSDEmailThread(threads.APRSDThread): self.config["ham"]["callsign"], reply, ) - self.msg_queues["tx"].put(msg) + msg.send() # flag message as sent via aprs try: server.add_flags(msgid, ["APRS"]) diff --git a/aprsd/threads.py b/aprsd/threads.py index aa9f1e5..fa75f69 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -14,14 +14,11 @@ from aprsd import client, messaging, packets, plugin, stats, utils LOG = logging.getLogger("APRSD") RX_THREAD = "RX" -TX_THREAD = "TX" EMAIL_THREAD = "Email" rx_msg_queue = queue.Queue(maxsize=20) -tx_msg_queue = queue.Queue(maxsize=20) msg_queues = { "rx": rx_msg_queue, - "tx": tx_msg_queue, } @@ -55,6 +52,10 @@ class APRSDThreadList: LOG.debug(f"Stopping Thread {th.name}") th.stop() + def __len__(self): + with self.lock: + return len(self.threads_list) + class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def __init__(self, name): @@ -93,6 +94,7 @@ class KeepAliveThread(APRSDThread): tracker = messaging.MsgTrack() stats_obj = stats.APRSDStats() pl = packets.PacketList() + thread_list = APRSDThreadList() now = datetime.datetime.now() last_email = stats_obj.email_thread_time if last_email: @@ -107,7 +109,7 @@ class KeepAliveThread(APRSDThread): stats_obj.set_memory_peak(peak) keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " - "Last:{} Email: {} - RAM Current:{} Peak:{}" + "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" ).format( self.config["aprs"]["login"], utils.strfdelta(stats_obj.uptime), @@ -120,6 +122,7 @@ class KeepAliveThread(APRSDThread): email_thread_time, utils.human_size(current), utils.human_size(peak), + len(thread_list), ) LOG.info(keepalive) # Check version every hour @@ -170,6 +173,19 @@ class APRSDRXThread(APRSDThread): # Continue to loop return True + def process_packet(self, packet): + thread = APRSDProcessPacketThread(packet=packet, config=self.config) + thread.start() + + +class APRSDProcessPacketThread(APRSDThread): + + def __init__(self, packet, config): + self.packet = packet + self.config = config + name = self.packet["raw"][:10] + super().__init__(f"RX_PACKET-{name}") + def process_ack_packet(self, packet): ack_num = packet.get("msgNo") LOG.info(f"Got ack for message {ack_num}") @@ -185,8 +201,9 @@ class APRSDRXThread(APRSDThread): stats.APRSDStats().ack_rx_inc() return - def process_packet(self, packet): + def loop(self): """Process a packet recieved from aprs-is server.""" + packet = self.packet packets.PacketList().add(packet) fromcall = packet["from"] @@ -221,7 +238,7 @@ class APRSDRXThread(APRSDThread): fromcall, msg_id=msg_id, ) - self.msg_queues["tx"].put(ack) + ack.send() pm = plugin.PluginManager() try: @@ -239,7 +256,7 @@ class APRSDRXThread(APRSDThread): fromcall, subreply, ) - self.msg_queues["tx"].put(msg) + msg.send() else: replied = True @@ -255,7 +272,7 @@ class APRSDRXThread(APRSDThread): fromcall, reply, ) - self.msg_queues["tx"].put(msg) + msg.send() # If the message was for us and we didn't have a # response, then we send a usage statement. @@ -267,7 +284,7 @@ class APRSDRXThread(APRSDThread): fromcall, reply, ) - self.msg_queues["tx"].put(msg) + msg.send() except Exception as ex: LOG.exception("Plugin failed!!!", ex) # Do we need to send a reply? @@ -278,7 +295,7 @@ class APRSDRXThread(APRSDThread): fromcall, reply, ) - self.msg_queues["tx"].put(msg) + msg.send() LOG.debug("Packet processing complete") diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 23380e0..7a5d575 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -265,7 +265,7 @@ class TestPingPlugin(TestPlugin): class TestVersionPlugin(TestPlugin): @mock.patch("aprsd.plugin.PluginManager.get_plugins") def test_version(self, mock_get_plugins): - expected = f"APRSD ver:{aprsd.__version__} uptime:0:0:0" + expected = f"APRSD ver:{aprsd.__version__} uptime:00:00:00" version = version_plugin.VersionPlugin(self.config) packet = fake.fake_packet(