diff --git a/aprsd/client.py b/aprsd/client.py index 87faf5b..40c9d40 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -84,7 +84,7 @@ class Aprsdis(aprslib.IS): thread_stop = False # timeout in seconds - select_timeout = 10 + select_timeout = 1 def stop(self): self.thread_stop = True diff --git a/aprsd/dev.py b/aprsd/dev.py index a8052dd..f5d9528 100644 --- a/aprsd/dev.py +++ b/aprsd/dev.py @@ -14,7 +14,7 @@ import click_completion # local imports here import aprsd -from aprsd import client, email, plugin, utils +from aprsd import client, plugin, utils # setup the global logger @@ -179,7 +179,6 @@ def test_plugin( """APRSD Plugin test app.""" config = utils.parse_config(config_file) - email.CONFIG = config setup_logging(config, loglevel, False) LOG.info(f"Test APRSD PLugin version: {aprsd.__version__}") @@ -189,11 +188,13 @@ def test_plugin( client.Client(config) pm = plugin.PluginManager(config) - obj = pm._create_class(plugin_path, plugin.APRSDMessagePluginBase, config=config) + obj = pm._create_class(plugin_path, plugin.APRSDPluginBase, config=config) packet = {"from": fromcall, "message_text": message, "msgNo": 1} - reply = obj.run(packet) + reply = obj.filter(packet) + # Plugin might have threads, so lets stop them so we can exit. + obj.stop_threads() LOG.info(f"Result = '{reply}'") diff --git a/aprsd/email.py b/aprsd/email.py deleted file mode 100644 index 039e204..0000000 --- a/aprsd/email.py +++ /dev/null @@ -1,548 +0,0 @@ -import datetime -import email -from email.mime.text import MIMEText -import imaplib -import logging -import re -import smtplib -import time - -import imapclient -from validate_email import validate_email - -from aprsd import messaging, stats, threads, trace - - -LOG = logging.getLogger("APRSD") - -# This gets forced set from main.py prior to being used internally -CONFIG = None - - -@trace.trace -def _imap_connect(): - imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143) - use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False) - host = CONFIG["aprsd"]["email"]["imap"]["host"] - msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) - # LOG.debug("Connect to IMAP host {} with user '{}'". - # format(msg, CONFIG['imap']['login'])) - - try: - server = imapclient.IMAPClient( - CONFIG["aprsd"]["email"]["imap"]["host"], - port=imap_port, - use_uid=True, - ssl=use_ssl, - timeout=30, - ) - except Exception as e: - LOG.error("Failed to connect IMAP server", e) - return - - try: - server.login( - CONFIG["aprsd"]["email"]["imap"]["login"], - CONFIG["aprsd"]["email"]["imap"]["password"], - ) - except (imaplib.IMAP4.error, Exception) as e: - msg = getattr(e, "message", repr(e)) - LOG.error(f"Failed to login {msg}") - return - - server.select_folder("INBOX") - - server.fetch = trace.trace(server.fetch) - server.search = trace.trace(server.search) - server.remove_flags = trace.trace(server.remove_flags) - server.add_flags = trace.trace(server.add_flags) - return server - - -@trace.trace -def _smtp_connect(): - host = CONFIG["aprsd"]["email"]["smtp"]["host"] - smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"] - use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False) - msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port) - LOG.debug( - "Connect to SMTP host {} with user '{}'".format( - msg, - CONFIG["aprsd"]["email"]["imap"]["login"], - ), - ) - - try: - if use_ssl: - server = smtplib.SMTP_SSL( - host=host, - port=smtp_port, - timeout=30, - ) - else: - server = smtplib.SMTP( - host=host, - port=smtp_port, - timeout=30, - ) - except Exception: - LOG.error("Couldn't connect to SMTP Server") - return - - LOG.debug(f"Connected to smtp host {msg}") - - debug = CONFIG["aprsd"]["email"]["smtp"].get("debug", False) - if debug: - server.set_debuglevel(5) - server.sendmail = trace.trace(server.sendmail) - - try: - server.login( - CONFIG["aprsd"]["email"]["smtp"]["login"], - CONFIG["aprsd"]["email"]["smtp"]["password"], - ) - except Exception: - LOG.error("Couldn't connect to SMTP Server") - return - - LOG.debug(f"Logged into SMTP server {msg}") - return server - - -def validate_shortcuts(config): - shortcuts = config["aprsd"]["email"].get("shortcuts", None) - if not shortcuts: - return - - LOG.info( - "Validating {} Email shortcuts. This can take up to 10 seconds" - " per shortcut".format(len(shortcuts)), - ) - delete_keys = [] - for key in shortcuts: - LOG.info(f"Validating {key}:{shortcuts[key]}") - is_valid = validate_email( - email_address=shortcuts[key], - check_regex=True, - check_mx=False, - from_address=config["aprsd"]["email"]["smtp"]["login"], - helo_host=config["aprsd"]["email"]["smtp"]["host"], - smtp_timeout=10, - dns_timeout=10, - use_blacklist=True, - debug=False, - ) - if not is_valid: - LOG.error( - "'{}' is an invalid email address. Removing shortcut".format( - shortcuts[key], - ), - ) - delete_keys.append(key) - - for key in delete_keys: - del config["aprsd"]["email"]["shortcuts"][key] - - LOG.info("Available shortcuts: {}".format(config["aprsd"]["email"]["shortcuts"])) - - -def get_email_from_shortcut(addr): - if CONFIG["aprsd"]["email"].get("shortcuts", False): - return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr) - else: - return addr - - -def validate_email_config(config, disable_validation=False): - """function to simply ensure we can connect to email services. - - This helps with failing early during startup. - """ - LOG.info("Checking IMAP configuration") - imap_server = _imap_connect() - LOG.info("Checking SMTP configuration") - smtp_server = _smtp_connect() - - # Now validate and flag any shortcuts as invalid - if not disable_validation: - validate_shortcuts(config) - else: - LOG.info("Shortcuts email validation is Disabled!!, you were warned.") - - if imap_server and smtp_server: - return True - else: - return False - - -@trace.trace -def parse_email(msgid, data, server): - envelope = data[b"ENVELOPE"] - # email address match - # use raw string to avoid invalid escape secquence errors r"string here" - f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0])) - if f is not None: - from_addr = f.group(1) - else: - from_addr = "noaddr" - LOG.debug(f"Got a message from '{from_addr}'") - try: - m = server.fetch([msgid], ["RFC822"]) - except Exception as e: - LOG.exception("Couldn't fetch email from server in parse_email", e) - return - - msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore")) - if msg.is_multipart(): - text = "" - html = None - # default in case body somehow isn't set below - happened once - body = b"* unreadable msg received" - # this uses the last text or html part in the email, phone companies often put content in an attachment - for part in msg.get_payload(): - if part.get_content_charset() is None: - # or BREAK when we hit a text or html? - # We cannot know the character set, - # so return decoded "something" - LOG.debug("Email got unknown content type") - text = part.get_payload(decode=True) - continue - - charset = part.get_content_charset() - - if part.get_content_type() == "text/plain": - LOG.debug("Email got text/plain") - text = str( - part.get_payload(decode=True), - str(charset), - "ignore", - ).encode("utf8", "replace") - - if part.get_content_type() == "text/html": - LOG.debug("Email got text/html") - html = str( - part.get_payload(decode=True), - str(charset), - "ignore", - ).encode("utf8", "replace") - - if text is not None: - # strip removes white space fore and aft of string - body = text.strip() - else: - body = html.strip() - else: # message is not multipart - # email.uscc.net sends no charset, blows up unicode function below - LOG.debug("Email is not multipart") - if msg.get_content_charset() is None: - text = str(msg.get_payload(decode=True), "US-ASCII", "ignore").encode( - "utf8", - "replace", - ) - else: - text = str( - msg.get_payload(decode=True), - msg.get_content_charset(), - "ignore", - ).encode("utf8", "replace") - body = text.strip() - - # FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 6: ordinal not in range(128) - # decode with errors='ignore'. be sure to encode it before we return it below, also with errors='ignore' - try: - body = body.decode(errors="ignore") - except Exception as e: - LOG.error("Unicode decode failure: " + str(e)) - LOG.error("Unidoce decode failed: " + str(body)) - body = "Unreadable unicode msg" - # strip all html tags - body = re.sub("<[^<]+?>", "", body) - # strip CR/LF, make it one line, .rstrip fails at this - body = body.replace("\n", " ").replace("\r", " ") - # ascii might be out of range, so encode it, removing any error characters - body = body.encode(errors="ignore") - return (body, from_addr) - - -# end parse_email - - -@trace.trace -def send_email(to_addr, content): - global check_email_delay - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - email_address = get_email_from_shortcut(to_addr) - LOG.info("Sending Email_________________") - - if to_addr in shortcuts: - LOG.info("To : " + to_addr) - to_addr = email_address - LOG.info(" (" + to_addr + ")") - subject = CONFIG["ham"]["callsign"] - # content = content + "\n\n(NOTE: reply with one line)" - LOG.info("Subject : " + subject) - LOG.info("Body : " + content) - - # check email more often since there's activity right now - check_email_delay = 60 - - msg = MIMEText(content) - msg["Subject"] = subject - msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"] - msg["To"] = to_addr - server = _smtp_connect() - if server: - try: - server.sendmail( - CONFIG["aprsd"]["email"]["smtp"]["login"], - [to_addr], - msg.as_string(), - ) - stats.APRSDStats().email_tx_inc() - except Exception as e: - msg = getattr(e, "message", repr(e)) - LOG.error("Sendmail Error!!!! '{}'", msg) - server.quit() - return -1 - server.quit() - return 0 - - -# end send_email - - -@trace.trace -def resend_email(count, fromcall): - global check_email_delay - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = f"{day}-{month}-{year}" - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - # swap key/value - shortcuts_inverted = {v: k for k, v in shortcuts.items()} - - try: - server = _imap_connect() - except Exception as e: - LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e) - return - - try: - messages = server.search(["SINCE", today]) - except Exception as e: - LOG.exception("Couldn't search for emails in resend_email ", e) - return - - # LOG.debug("%d messages received today" % len(messages)) - - msgexists = False - - messages.sort(reverse=True) - del messages[int(count) :] # only the latest "count" messages - for message in messages: - try: - parts = server.fetch(message, ["ENVELOPE"]).items() - except Exception as e: - LOG.exception("Couldn't fetch email parts in resend_email", e) - continue - - for msgid, data in list(parts): - # one at a time, otherwise order is random - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Failed to remove SEEN flag in resend_email", e) - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - # asterisk indicates a resend - reply = "-" + from_addr + " * " + body.decode(errors="ignore") - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage( - CONFIG["aprs"]["login"], - fromcall, - reply, - ) - msg.send() - msgexists = True - - if msgexists is not True: - stm = time.localtime() - h = stm.tm_hour - m = stm.tm_min - s = stm.tm_sec - # append time as a kind of serial number to prevent FT1XDR from - # thinking this is a duplicate message. - # The FT1XDR pretty much ignores the aprs message number in this - # regard. The FTM400 gets it right. - reply = "No new msg {}:{}:{}".format( - str(h).zfill(2), - str(m).zfill(2), - str(s).zfill(2), - ) - # messaging.send_message(fromcall, reply) - msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) - msg.send() - - # check email more often since we're resending one now - check_email_delay = 60 - - server.logout() - # end resend_email() - - -class APRSDEmailThread(threads.APRSDThread): - def __init__(self, msg_queues, config): - super().__init__("EmailThread") - self.msg_queues = msg_queues - self.config = config - - @trace.trace - def run(self): - global check_email_delay - - LOG.debug("Starting") - - check_email_delay = 60 - past = datetime.datetime.now() - while not self.thread_stop: - time.sleep(5) - stats.APRSDStats().email_thread_update() - # always sleep for 5 seconds and see if we need to check email - # This allows CTRL-C to stop the execution of this loop sooner - # than check_email_delay time - now = datetime.datetime.now() - if now - past > datetime.timedelta(seconds=check_email_delay): - # It's time to check email - - # slowly increase delay every iteration, max out at 300 seconds - # any send/receive/resend activity will reset this to 60 seconds - if check_email_delay < 300: - check_email_delay += 1 - LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") - - shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] - # swap key/value - shortcuts_inverted = {v: k for k, v in shortcuts.items()} - - date = datetime.datetime.now() - month = date.strftime("%B")[:3] # Nov, Mar, Apr - day = date.day - year = date.year - today = f"{day}-{month}-{year}" - - server = None - try: - server = _imap_connect() - except Exception as e: - LOG.exception("IMAP failed to connect.", e) - - if not server: - continue - - try: - messages = server.search(["SINCE", today]) - except Exception as e: - LOG.exception("IMAP failed to search for messages since today.", e) - continue - LOG.debug(f"{len(messages)} messages received today") - - try: - _msgs = server.fetch(messages, ["ENVELOPE"]) - except Exception as e: - LOG.exception("IMAP failed to fetch/flag messages: ", e) - continue - - for msgid, data in _msgs.items(): - envelope = data[b"ENVELOPE"] - LOG.debug( - 'ID:%d "%s" (%s)' - % (msgid, envelope.subject.decode(), envelope.date), - ) - f = re.search( - r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", - str(envelope.from_[0]), - ) - if f is not None: - from_addr = f.group(1) - else: - from_addr = "noaddr" - - # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) - # if "APRS" not in server.get_flags(msgid)[msgid]: - # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both - try: - taglist = [ - x.decode(errors="ignore") - for x in server.get_flags(msgid)[msgid] - ] - except Exception as e: - LOG.exception("Failed to get flags.", e) - break - - if "APRS" not in taglist: - # if msg not flagged as sent via aprs - try: - server.fetch([msgid], ["RFC822"]) - except Exception as e: - LOG.exception("Failed single server fetch for RFC822", e) - break - - (body, from_addr) = parse_email(msgid, data, server) - # unset seen flag, will stay bold in email client - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Failed to remove flags SEEN", e) - # Not much we can do here, so lets try and - # send the aprs message anyway - - if from_addr in shortcuts_inverted: - # reverse lookup of a shortcut - from_addr = shortcuts_inverted[from_addr] - - reply = "-" + from_addr + " " + body.decode(errors="ignore") - msg = messaging.TextMessage( - self.config["aprs"]["login"], - self.config["ham"]["callsign"], - reply, - ) - self.msg_queues["tx"].put(msg) - # flag message as sent via aprs - try: - server.add_flags(msgid, ["APRS"]) - # unset seen flag, will stay bold in email client - except Exception as e: - LOG.exception("Couldn't add APRS flag to email", e) - - try: - server.remove_flags(msgid, [imapclient.SEEN]) - except Exception as e: - LOG.exception("Couldn't remove seen flag from email", e) - - # check email more often since we just received an email - check_email_delay = 60 - - # reset clock - LOG.debug("Done looping over Server.fetch, logging out.") - past = datetime.datetime.now() - try: - server.logout() - except Exception as e: - LOG.exception("IMAP failed to logout: ", e) - continue - else: - # We haven't hit the email delay yet. - # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) - pass - - # Remove ourselves from the global threads list - threads.APRSDThreadList().remove(self) - LOG.info("Exiting") diff --git a/aprsd/flask.py b/aprsd/flask.py index 7577425..63014c9 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -61,6 +61,10 @@ class APRSDFlask(flask_classful.FlaskView): watch_count = 0 watch_age = 0 + pm = plugin.PluginManager() + plugins = pm.get_plugins() + plugin_count = len(plugins) + return flask.render_template( "index.html", initial_stats=stats, @@ -69,6 +73,7 @@ class APRSDFlask(flask_classful.FlaskView): config_json=json.dumps(self.config), watch_count=watch_count, watch_age=watch_age, + plugin_count=plugin_count, ) @auth.login_required diff --git a/aprsd/listen.py b/aprsd/listen.py index 8d8a9c0..4342447 100644 --- a/aprsd/listen.py +++ b/aprsd/listen.py @@ -294,7 +294,7 @@ def listen( # TODO(walt) - manually edit this list # prior to running aprsd-listen listen - watch_list = [] + watch_list = ["k*"] # build last seen list last_seen = {} diff --git a/aprsd/main.py b/aprsd/main.py index 3017297..d7ee309 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -25,7 +25,6 @@ import logging from logging import NullHandler from logging.handlers import RotatingFileHandler import os -import queue import signal import sys import time @@ -38,8 +37,7 @@ import click_completion # local imports here import aprsd from aprsd import ( - client, email, flask, messaging, packets, plugin, stats, threads, trace, - utils, + client, flask, messaging, packets, plugin, stats, threads, trace, utils, ) @@ -159,7 +157,7 @@ def signal_handler(sig, frame): datetime.datetime.now(), ), ) - time.sleep(5) + time.sleep(1.5) tracker = messaging.MsgTrack() tracker.save() LOG.info(stats.APRSDStats()) @@ -227,12 +225,6 @@ def setup_logging(config, loglevel, quiet): def check_version(loglevel, config_file): config = utils.parse_config(config_file) - # Force setting the config to the modules that need it - # TODO(Walt): convert these modules to classes that can - # Accept the config as a constructor param, instead of this - # hacky global setting - email.CONFIG = config - setup_logging(config, loglevel, False) level, msg = utils._check_version() if level: @@ -414,12 +406,6 @@ def send_message( help="The log level to use for aprsd.log", ) @click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout") -@click.option( - "--disable-validation", - is_flag=True, - default=False, - help="Disable email shortcut validation. Bad email addresses can result in broken email responses!!", -) @click.option( "-c", "--config", @@ -440,7 +426,6 @@ def send_message( def server( loglevel, quiet, - disable_validation, config_file, flush, ): @@ -453,12 +438,6 @@ def server( config = utils.parse_config(config_file) - # Force setting the config to the modules that need it - # TODO(Walt): convert these modules to classes that can - # Accept the config as a constructor param, instead of this - # hacky global setting - email.CONFIG = config - setup_logging(config, loglevel, quiet) level, msg = utils._check_version() if level: @@ -479,27 +458,16 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - email_enabled = config["aprsd"]["email"].get("enabled", False) - - if email_enabled: - # TODO(walt): Make email processing/checking optional? - # Maybe someone only wants this to process messages with plugins only. - valid = email.validate_email_config(config, disable_validation) - if not valid: - LOG.error("Failed to validate email config options") - sys.exit(-1) - else: - LOG.info("Email services not enabled.") - - # Create the initial PM singleton and Register plugins - plugin_manager = plugin.PluginManager(config) - plugin_manager.setup_plugins() try: cl = client.Client(config) cl.client except LoginError: sys.exit(-1) + # Create the initial PM singleton and Register plugins + plugin_manager = plugin.PluginManager(config) + plugin_manager.setup_plugins() + # Now load the msgTrack from disk if any if flush: LOG.debug("Deleting saved MsgTrack.") @@ -509,38 +477,24 @@ def server( LOG.debug("Loading saved MsgTrack object.") messaging.MsgTrack().load() - rx_notify_queue = queue.Queue(maxsize=20) - rx_msg_queue = queue.Queue(maxsize=20) - tx_msg_queue = queue.Queue(maxsize=20) - msg_queues = { - "rx": rx_msg_queue, - "tx": tx_msg_queue, - "notify": rx_notify_queue, - } + packets.PacketList(config=config) + + rx_thread = threads.APRSDRXThread( + msg_queues=threads.msg_queues, + config=config, + ) + + rx_thread.start() - rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config) - tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config) if "watch_list" in config["aprsd"] and config["aprsd"]["watch_list"].get( "enabled", True, ): - packets.PacketList(config) - notify_thread = threads.APRSDNotifyThread( - msg_queues=msg_queues, - config=config, - ) - notify_thread.start() - - if email_enabled: - email_thread = email.APRSDEmailThread(msg_queues=msg_queues, config=config) - email_thread.start() - - rx_thread.start() - tx_thread.start() + packets.WatchList(config=config) messaging.MsgTrack().restart() - keepalive = threads.KeepAliveThread() + keepalive = threads.KeepAliveThread(config=config) keepalive.start() try: diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 09750e5..1d2c8b3 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -9,7 +9,7 @@ import re import threading import time -from aprsd import client, stats, threads, trace, utils +from aprsd import client, packets, stats, threads, trace, utils LOG = logging.getLogger("APRSD") @@ -424,6 +424,7 @@ class SendMessageThread(threads.APRSDThread): ) cl.sendall(str(msg)) stats.APRSDStats().msgs_tx_inc() + packets.PacketList().add(msg.dict()) msg.last_send_time = datetime.datetime.now() msg.last_send_attempt += 1 @@ -467,8 +468,6 @@ class AckMessage(Message): thread = SendAckThread(self) thread.start() - # end send_ack() - def send_direct(self): """Send an ack message without a separate thread.""" cl = client.get_client() @@ -527,6 +526,7 @@ class SendAckThread(threads.APRSDThread): ) cl.sendall(str(self.ack)) stats.APRSDStats().ack_tx_inc() + packets.PacketList().add(self.ack.dict()) self.ack.last_send_attempt += 1 self.ack.last_send_time = datetime.datetime.now() time.sleep(5) @@ -543,15 +543,8 @@ def log_packet(packet): ack = packet.get("ack", None) log_message( - "Packet", - packet["raw"], - msg, - fromcall=fromcall, - tocall=tocall, - ack=ack, - packet_type=response_type, - msg_num=msg_num, - ) + "Packet", packet["raw"], msg, fromcall=fromcall, tocall=tocall, + ack=ack, packet_type=response_type, msg_num=msg_num, ) def log_message( diff --git a/aprsd/packets.py b/aprsd/packets.py index ae6e7d0..ef5e3f5 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -27,7 +27,7 @@ class PacketList: def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) - cls._instance.packet_list = utils.RingBuffer(100) + cls._instance.packet_list = utils.RingBuffer(1000) cls._instance.lock = threading.Lock() return cls._instance @@ -42,7 +42,10 @@ class PacketList: def add(self, packet): with self.lock: packet["ts"] = time.time() - if "from" in packet and packet["from"] == self.config["aprs"]["login"]: + if ( + "fromcall" in packet + and packet["fromcall"] == self.config["aprs"]["login"] + ): self.total_tx += 1 else: self.total_recv += 1 @@ -53,10 +56,12 @@ class PacketList: return self.packet_list.get() def total_received(self): - return self.total_recv + with self.lock: + return self.total_recv def total_sent(self): - return self.total_tx + with self.lock: + return self.total_tx class WatchList: @@ -156,3 +161,15 @@ def get_packet_type(packet): elif msg_format == "mic-e": packet_type = PACKET_TYPE_MICE return packet_type + + +def is_message_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MESSAGE + + +def is_ack_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_ACK + + +def is_mice_packet(packet): + return get_packet_type(packet) == PACKET_TYPE_MICE diff --git a/aprsd/plugin.py b/aprsd/plugin.py index f361827..ebf160c 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -11,6 +11,8 @@ import threading import pluggy from thesmuggler import smuggle +from aprsd import client, messaging, packets, threads + # setup the global logger LOG = logging.getLogger("APRSD") @@ -39,12 +41,93 @@ class APRSDCommandSpec: """A hook specification namespace.""" @hookspec - def run(self, packet): + def filter(self, packet): """My special little hook that you can customize.""" -class APRSDNotificationPluginBase(metaclass=abc.ABCMeta): - """Base plugin class for all notification ased plugins. +class APRSDPluginBase(metaclass=abc.ABCMeta): + """The base class for all APRSD Plugins.""" + + config = None + rx_count = 0 + tx_count = 0 + version = "1.0" + + # Holds the list of APRSDThreads that the plugin creates + threads = [] + + def __init__(self, config): + self.config = config + self.message_counter = 0 + self.setup() + self.threads = self.create_threads() + if self.threads: + self.start_threads() + + def start_threads(self): + if self.threads: + if not isinstance(self.threads, list): + self.threads = [self.threads] + + try: + for thread in self.threads: + if isinstance(thread, threads.APRSDThread): + thread.start() + else: + LOG.error( + "Can't start thread {}:{}, Must be a child " + "of aprsd.threads.APRSDThread".format( + self, + thread, + ), + ) + except Exception: + LOG.error( + "Failed to start threads for plugin {}".format( + self, + ), + ) + + @property + def message_count(self): + return self.message_counter + + @property + def version(self): + """Version""" + raise NotImplementedError + + def setup(self): + """Do any plugin setup here.""" + + def create_threads(self): + """Gives the plugin writer the ability start a background thread.""" + return [] + + def rx_inc(self): + self.rx_count += 1 + + def tx_inc(self): + self.tx_count += 1 + + def stop_threads(self): + """Stop any threads this plugin might have created.""" + for thread in self.threads: + if isinstance(thread, threads.APRSDThread): + thread.stop() + + @hookimpl + @abc.abstractmethod + def filter(self, packet): + pass + + @abc.abstractmethod + def process(self, packet): + """This is called when the filter passes.""" + + +class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): + """Base plugin class for all notification APRSD plugins. All these plugins will get every packet seen by APRSD's registered list of HAM callsigns in the config file's @@ -54,25 +137,43 @@ class APRSDNotificationPluginBase(metaclass=abc.ABCMeta): by a particular HAM callsign, write a plugin based off of this class. """ + enabled = False - def __init__(self, config): - """The aprsd config object is stored.""" - self.config = config - self.message_counter = 0 + def setup(self): + # if we have a watch list enabled, we need to add filtering + # to enable seeing packets from the watch list. + if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ + "watch_list" + ].get("enabled", False): + # watch list is enabled + self.enabled = True + watch_list = self.config["aprsd"]["watch_list"].get( + "callsigns", + [], + ) + # make sure the timeout is set or this doesn't work + if watch_list: + aprs_client = client.get_client() + filter_str = "b/{}".format("/".join(watch_list)) + aprs_client.set_filter(filter_str) + else: + LOG.warning("Watch list enabled, but no callsigns set.") - @hookimpl - def run(self, packet): - return self.notify(packet) + def filter(self, packet): + wl = packets.WatchList() + result = messaging.NULL_MESSAGE + if wl.callsign_in_watchlist(packet["from"]): + # packet is from a callsign in the watch list + self.rx_inc() + result = self.process() + if result: + self.tx_inc() + wl.update_seen(packet) - @abc.abstractmethod - def notify(self, packet): - """This is the main method called when a packet is rx. - - This will get called when a packet is seen by a callsign - registered in the watch list in the config file.""" + return result -class APRSDMessagePluginBase(metaclass=abc.ABCMeta): +class APRSDRegexCommandPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): """Base Message plugin class. When you want to search for a particular command in an @@ -80,11 +181,6 @@ class APRSDMessagePluginBase(metaclass=abc.ABCMeta): based off of this class. """ - def __init__(self, config): - """The aprsd config object is stored.""" - self.config = config - self.message_counter = 0 - @property def command_name(self): """The usage string help.""" @@ -95,29 +191,28 @@ class APRSDMessagePluginBase(metaclass=abc.ABCMeta): """The regex to match from the caller""" raise NotImplementedError - @property - def version(self): - """Version""" - raise NotImplementedError - - @property - def message_count(self): - return self.message_counter - @hookimpl - def run(self, packet): + def filter(self, packet): + result = None + message = packet.get("message_text", None) - if re.search(self.command_regex, message): - self.message_counter += 1 - return self.command(packet) + msg_format = packet.get("format", None) + tocall = packet.get("addresse", None) - @abc.abstractmethod - def command(self, packet): - """This is the command that runs when the regex matches. + # Only process messages destined for us + # and is an APRS message format and has a message. + if ( + tocall == self.config["aprs"]["login"] + and msg_format == "message" + and message + ): + if re.search(self.command_regex, message): + self.rx_inc() + result = self.process(packet) + if result: + self.tx_inc() - To reply with a message over the air, return a string - to send. - """ + return result class PluginManager: @@ -125,10 +220,7 @@ class PluginManager: _instance = None # the pluggy PluginManager for all Message plugins - _pluggy_msg_pm = None - - # the pluggy PluginManager for all Notification plugins - _pluggy_notify_pm = None + _pluggy_pm = None # aprsd config dict config = None @@ -173,18 +265,20 @@ class PluginManager: def is_plugin(self, obj): for c in inspect.getmro(obj): - if issubclass(c, APRSDMessagePluginBase) or issubclass( - c, - APRSDNotificationPluginBase, - ): + if issubclass(c, APRSDPluginBase): return True return False - def _create_class(self, module_class_string, super_cls: type = None, **kwargs): + def _create_class( + self, + module_class_string, + super_cls: type = None, + **kwargs, + ): """ Method to create a class from a fqn python string. - :param module_class_string: full name of the class to create an object of + :param module_class_string: full name of the class to create an object :param super_cls: expected super class for validity, None if bypass :param kwargs: parameters to pass :return: @@ -213,7 +307,7 @@ class PluginManager: obj = cls(**kwargs) return obj - def _load_msg_plugin(self, plugin_name): + def _load_plugin(self, plugin_name): """ Given a python fully qualified class path.name, Try importing the path, then creating the object, @@ -223,61 +317,35 @@ class PluginManager: try: plugin_obj = self._create_class( plugin_name, - APRSDMessagePluginBase, + APRSDPluginBase, config=self.config, ) if plugin_obj: LOG.info( - "Registering Message plugin '{}'({}) '{}'".format( - plugin_name, - plugin_obj.version, - plugin_obj.command_regex, - ), - ) - self._pluggy_msg_pm.register(plugin_obj) - except Exception as ex: - LOG.exception(f"Couldn't load plugin '{plugin_name}'", ex) - - def _load_notify_plugin(self, plugin_name): - """ - Given a python fully qualified class path.name, - Try importing the path, then creating the object, - then registering it as a aprsd Command Plugin - """ - plugin_obj = None - try: - plugin_obj = self._create_class( - plugin_name, - APRSDNotificationPluginBase, - config=self.config, - ) - if plugin_obj: - LOG.info( - "Registering Notification plugin '{}'({})".format( + "Registering plugin '{}'({})".format( plugin_name, plugin_obj.version, ), ) - self._pluggy_notify_pm.register(plugin_obj) + self._pluggy_pm.register(plugin_obj) except Exception as ex: LOG.exception(f"Couldn't load plugin '{plugin_name}'", ex) def reload_plugins(self): with self.lock: - del self._pluggy_msg_pm - del self._pluggy_notify_pm + del self._pluggy_pm self.setup_plugins() def setup_plugins(self): """Create the plugin manager and register plugins.""" - LOG.info("Loading APRSD Message Plugins") - enabled_msg_plugins = self.config["aprsd"].get("enabled_plugins", None) - self._pluggy_msg_pm = pluggy.PluginManager("aprsd") - self._pluggy_msg_pm.add_hookspecs(APRSDCommandSpec) - if enabled_msg_plugins: - for p_name in enabled_msg_plugins: - self._load_msg_plugin(p_name) + LOG.info("Loading APRSD Plugins") + enabled_plugins = self.config["aprsd"].get("enabled_plugins", None) + self._pluggy_pm = pluggy.PluginManager("aprsd") + self._pluggy_pm.add_hookspecs(APRSDCommandSpec) + if enabled_plugins: + for p_name in enabled_plugins: + self._load_plugin(p_name) else: # Enabled plugins isn't set, so we default to loading all of # the core plugins. @@ -285,34 +353,24 @@ class PluginManager: self._load_plugin(p_name) if self.config["aprsd"]["watch_list"].get("enabled", False): - LOG.info("Loading APRSD Notification Plugins") + LOG.info("Loading APRSD WatchList Plugins") enabled_notify_plugins = self.config["aprsd"]["watch_list"].get( "enabled_plugins", None, ) - self._pluggy_notify_pm = pluggy.PluginManager("aprsd") - self._pluggy_notify_pm.add_hookspecs(APRSDCommandSpec) if enabled_notify_plugins: for p_name in enabled_notify_plugins: - self._load_notify_plugin(p_name) - - else: - LOG.info("Skipping Custom Plugins directory.") + self._load_plugin(p_name) LOG.info("Completed Plugin Loading.") def run(self, packet): """Execute all the pluguns run method.""" with self.lock: - return self._pluggy_msg_pm.hook.run(packet=packet) - - def notify(self, packet): - """Execute all the notify pluguns run method.""" - with self.lock: - return self._pluggy_notify_pm.hook.run(packet=packet) + return self._pluggy_pm.hook.filter(packet=packet) def register_msg(self, obj): """Register the plugin.""" - self._pluggy_msg_pm.register(obj) + self._pluggy_pm.register(obj) - def get_msg_plugins(self): - return self._pluggy_msg_pm.get_plugins() + def get_plugins(self): + return self._pluggy_pm.get_plugins() diff --git a/aprsd/plugins/email.py b/aprsd/plugins/email.py index 0d0d9c9..c5bf6c4 100644 --- a/aprsd/plugins/email.py +++ b/aprsd/plugins/email.py @@ -1,14 +1,26 @@ +import datetime +import email +from email.mime.text import MIMEText +import imaplib import logging import re +import smtplib import time -from aprsd import email, messaging, plugin, trace +import imapclient +from validate_email import validate_email + +from aprsd import messaging, plugin, stats, threads, trace LOG = logging.getLogger("APRSD") +# This gets forced set from main.py prior to being used internally +CONFIG = {} +check_email_delay = 60 -class EmailPlugin(plugin.APRSDMessagePluginBase): + +class EmailPlugin(plugin.APRSDRegexCommandPluginBase): """Email Plugin.""" version = "1.0" @@ -18,10 +30,40 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): # message_number:time combos so we don't resend the same email in # five mins {int:int} email_sent_dict = {} + enabled = False + + def setup(self): + """Ensure that email is enabled and start the thread.""" + global CONFIG + CONFIG = self.config + + email_enabled = self.config["aprsd"]["email"].get("enabled", False) + validation = self.config["aprsd"]["email"].get("validate", False) + + if email_enabled: + valid = validate_email_config(self.config, validation) + if not valid: + LOG.error("Failed to validate email config options.") + LOG.error("EmailPlugin DISABLED!!!!") + else: + self.enabled = True + else: + LOG.info("Email services not enabled.") + + def create_threads(self): + if self.enabled: + return APRSDEmailThread( + msg_queues=threads.msg_queues, + config=self.config, + ) @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Email COMMAND") + if not self.enabled: + # Email has not been enabled + # so the plugin will just NOOP + return messaging.NULL_MESSAGE fromcall = packet.get("from") message = packet.get("message_text", None) @@ -39,7 +81,7 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): r = re.search("^-([0-9])[0-9]*$", message) if r is not None: LOG.debug("RESEND EMAIL") - email.resend_email(r.group(1), fromcall) + resend_email(r.group(1), fromcall) reply = messaging.NULL_MESSAGE # -user@address.com body of email elif re.search(r"^-([A-Za-z0-9_\-\.@]+) (.*)", message): @@ -49,14 +91,16 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): to_addr = a.group(1) content = a.group(2) - email_address = email.get_email_from_shortcut(to_addr) + email_address = get_email_from_shortcut(to_addr) if not email_address: reply = "Bad email address" return reply # send recipient link to aprs.fi map if content == "mapme": - content = "Click for my location: http://aprs.fi/{}".format( + content = ( + "Click for my location: http://aprs.fi/{}" "" + ).format( self.config["ham"]["callsign"], ) too_soon = 0 @@ -74,9 +118,9 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): reply = messaging.NULL_MESSAGE if send_result != 0: reply = f"-{to_addr} failed" - # messaging.send_message(fromcall, "-" + to_addr + " failed") else: - # clear email sent dictionary if somehow goes over 100 + # clear email sent dictionary if somehow goes + # over 100 if len(self.email_sent_dict) > 98: LOG.debug( "DEBUG: email_sent_dict is big (" @@ -97,3 +141,540 @@ class EmailPlugin(plugin.APRSDMessagePluginBase): # messaging.send_message(fromcall, "Bad email address") return reply + + +@trace.trace +def _imap_connect(): + global CONFIG + imap_port = CONFIG["aprsd"]["email"]["imap"].get("port", 143) + use_ssl = CONFIG["aprsd"]["email"]["imap"].get("use_ssl", False) + # host = CONFIG["aprsd"]["email"]["imap"]["host"] + # msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port) + # LOG.debug("Connect to IMAP host {} with user '{}'". + # format(msg, CONFIG['imap']['login'])) + + try: + server = imapclient.IMAPClient( + CONFIG["aprsd"]["email"]["imap"]["host"], + port=imap_port, + use_uid=True, + ssl=use_ssl, + timeout=30, + ) + except Exception as e: + LOG.error("Failed to connect IMAP server", e) + return + + try: + server.login( + CONFIG["aprsd"]["email"]["imap"]["login"], + CONFIG["aprsd"]["email"]["imap"]["password"], + ) + except (imaplib.IMAP4.error, Exception) as e: + msg = getattr(e, "message", repr(e)) + LOG.error(f"Failed to login {msg}") + return + + server.select_folder("INBOX") + + server.fetch = trace.trace(server.fetch) + server.search = trace.trace(server.search) + server.remove_flags = trace.trace(server.remove_flags) + server.add_flags = trace.trace(server.add_flags) + return server + + +@trace.trace +def _smtp_connect(): + host = CONFIG["aprsd"]["email"]["smtp"]["host"] + smtp_port = CONFIG["aprsd"]["email"]["smtp"]["port"] + use_ssl = CONFIG["aprsd"]["email"]["smtp"].get("use_ssl", False) + msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port) + LOG.debug( + "Connect to SMTP host {} with user '{}'".format( + msg, + CONFIG["aprsd"]["email"]["imap"]["login"], + ), + ) + + try: + if use_ssl: + server = smtplib.SMTP_SSL( + host=host, + port=smtp_port, + timeout=30, + ) + else: + server = smtplib.SMTP( + host=host, + port=smtp_port, + timeout=30, + ) + except Exception: + LOG.error("Couldn't connect to SMTP Server") + return + + LOG.debug(f"Connected to smtp host {msg}") + + debug = CONFIG["aprsd"]["email"]["smtp"].get("debug", False) + if debug: + server.set_debuglevel(5) + server.sendmail = trace.trace(server.sendmail) + + try: + server.login( + CONFIG["aprsd"]["email"]["smtp"]["login"], + CONFIG["aprsd"]["email"]["smtp"]["password"], + ) + except Exception: + LOG.error("Couldn't connect to SMTP Server") + return + + LOG.debug(f"Logged into SMTP server {msg}") + return server + + +def validate_shortcuts(config): + shortcuts = config["aprsd"]["email"].get("shortcuts", None) + if not shortcuts: + return + + LOG.info( + "Validating {} Email shortcuts. This can take up to 10 seconds" + " per shortcut".format(len(shortcuts)), + ) + delete_keys = [] + for key in shortcuts: + LOG.info(f"Validating {key}:{shortcuts[key]}") + is_valid = validate_email( + email_address=shortcuts[key], + check_regex=True, + check_mx=False, + from_address=config["aprsd"]["email"]["smtp"]["login"], + helo_host=config["aprsd"]["email"]["smtp"]["host"], + smtp_timeout=10, + dns_timeout=10, + use_blacklist=True, + debug=False, + ) + if not is_valid: + LOG.error( + "'{}' is an invalid email address. Removing shortcut".format( + shortcuts[key], + ), + ) + delete_keys.append(key) + + for key in delete_keys: + del config["aprsd"]["email"]["shortcuts"][key] + + LOG.info( + "Available shortcuts: {}".format( + config["aprsd"]["email"]["shortcuts"], + ), + ) + + +def get_email_from_shortcut(addr): + if CONFIG["aprsd"]["email"].get("shortcuts", False): + return CONFIG["aprsd"]["email"]["shortcuts"].get(addr, addr) + else: + return addr + + +def validate_email_config(config, disable_validation=False): + """function to simply ensure we can connect to email services. + + This helps with failing early during startup. + """ + LOG.info("Checking IMAP configuration") + imap_server = _imap_connect() + LOG.info("Checking SMTP configuration") + smtp_server = _smtp_connect() + + # Now validate and flag any shortcuts as invalid + if not disable_validation: + validate_shortcuts(config) + else: + LOG.info("Shortcuts email validation is Disabled!!, you were warned.") + + if imap_server and smtp_server: + return True + else: + return False + + +@trace.trace +def parse_email(msgid, data, server): + envelope = data[b"ENVELOPE"] + # email address match + # use raw string to avoid invalid escape secquence errors r"string here" + f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0])) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + LOG.debug(f"Got a message from '{from_addr}'") + try: + m = server.fetch([msgid], ["RFC822"]) + except Exception as e: + LOG.exception("Couldn't fetch email from server in parse_email", e) + return + + msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore")) + if msg.is_multipart(): + text = "" + html = None + # default in case body somehow isn't set below - happened once + body = b"* unreadable msg received" + # this uses the last text or html part in the email, + # phone companies often put content in an attachment + for part in msg.get_payload(): + if part.get_content_charset() is None: + # or BREAK when we hit a text or html? + # We cannot know the character set, + # so return decoded "something" + LOG.debug("Email got unknown content type") + text = part.get_payload(decode=True) + continue + + charset = part.get_content_charset() + + if part.get_content_type() == "text/plain": + LOG.debug("Email got text/plain") + text = str( + part.get_payload(decode=True), + str(charset), + "ignore", + ).encode("utf8", "replace") + + if part.get_content_type() == "text/html": + LOG.debug("Email got text/html") + html = str( + part.get_payload(decode=True), + str(charset), + "ignore", + ).encode("utf8", "replace") + + if text is not None: + # strip removes white space fore and aft of string + body = text.strip() + else: + body = html.strip() + else: # message is not multipart + # email.uscc.net sends no charset, blows up unicode function below + LOG.debug("Email is not multipart") + if msg.get_content_charset() is None: + text = str(msg.get_payload(decode=True), "US-ASCII", "ignore").encode( + "utf8", + "replace", + ) + else: + text = str( + msg.get_payload(decode=True), + msg.get_content_charset(), + "ignore", + ).encode("utf8", "replace") + body = text.strip() + + # FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 + # in position 6: ordinal not in range(128) + # decode with errors='ignore'. be sure to encode it before we return + # it below, also with errors='ignore' + try: + body = body.decode(errors="ignore") + except Exception as e: + LOG.error("Unicode decode failure: " + str(e)) + LOG.error("Unidoce decode failed: " + str(body)) + body = "Unreadable unicode msg" + # strip all html tags + body = re.sub("<[^<]+?>", "", body) + # strip CR/LF, make it one line, .rstrip fails at this + body = body.replace("\n", " ").replace("\r", " ") + # ascii might be out of range, so encode it, removing any error characters + body = body.encode(errors="ignore") + return body, from_addr + + +# end parse_email + + +@trace.trace +def send_email(to_addr, content): + global check_email_delay + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + email_address = get_email_from_shortcut(to_addr) + LOG.info("Sending Email_________________") + + if to_addr in shortcuts: + LOG.info("To : " + to_addr) + to_addr = email_address + LOG.info(" (" + to_addr + ")") + subject = CONFIG["ham"]["callsign"] + # content = content + "\n\n(NOTE: reply with one line)" + LOG.info("Subject : " + subject) + LOG.info("Body : " + content) + + # check email more often since there's activity right now + check_email_delay = 60 + + msg = MIMEText(content) + msg["Subject"] = subject + msg["From"] = CONFIG["aprsd"]["email"]["smtp"]["login"] + msg["To"] = to_addr + server = _smtp_connect() + if server: + try: + server.sendmail( + CONFIG["aprsd"]["email"]["smtp"]["login"], + [to_addr], + msg.as_string(), + ) + stats.APRSDStats().email_tx_inc() + except Exception as e: + msg = getattr(e, "message", repr(e)) + LOG.error("Sendmail Error!!!! '{}'", msg) + server.quit() + return -1 + server.quit() + return 0 + + +@trace.trace +def resend_email(count, fromcall): + global check_email_delay + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = f"{day}-{month}-{year}" + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + # swap key/value + shortcuts_inverted = {v: k for k, v in shortcuts.items()} + + try: + server = _imap_connect() + except Exception as e: + LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e) + return + + try: + messages = server.search(["SINCE", today]) + except Exception as e: + LOG.exception("Couldn't search for emails in resend_email ", e) + return + + # LOG.debug("%d messages received today" % len(messages)) + + msgexists = False + + messages.sort(reverse=True) + del messages[int(count) :] # only the latest "count" messages + for message in messages: + try: + parts = server.fetch(message, ["ENVELOPE"]).items() + except Exception as e: + LOG.exception("Couldn't fetch email parts in resend_email", e) + continue + + for msgid, data in list(parts): + # one at a time, otherwise order is random + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Failed to remove SEEN flag in resend_email", e) + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + # asterisk indicates a resend + reply = "-" + from_addr + " * " + body.decode(errors="ignore") + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage( + CONFIG["aprs"]["login"], + fromcall, + reply, + ) + msg.send() + msgexists = True + + if msgexists is not True: + stm = time.localtime() + h = stm.tm_hour + m = stm.tm_min + s = stm.tm_sec + # append time as a kind of serial number to prevent FT1XDR from + # thinking this is a duplicate message. + # The FT1XDR pretty much ignores the aprs message number in this + # regard. The FTM400 gets it right. + reply = "No new msg {}:{}:{}".format( + str(h).zfill(2), + str(m).zfill(2), + str(s).zfill(2), + ) + # messaging.send_message(fromcall, reply) + msg = messaging.TextMessage(CONFIG["aprs"]["login"], fromcall, reply) + msg.send() + + # check email more often since we're resending one now + check_email_delay = 60 + + server.logout() + # end resend_email() + + +class APRSDEmailThread(threads.APRSDThread): + def __init__(self, msg_queues, config): + super().__init__("EmailThread") + self.msg_queues = msg_queues + self.config = config + self.past = datetime.datetime.now() + + def loop(self): + global check_email_delay + + LOG.debug("Starting Loop") + + check_email_delay = 60 + time.sleep(5) + stats.APRSDStats().email_thread_update() + # always sleep for 5 seconds and see if we need to check email + # This allows CTRL-C to stop the execution of this loop sooner + # than check_email_delay time + now = datetime.datetime.now() + if now - self.past > datetime.timedelta(seconds=check_email_delay): + # It's time to check email + + # slowly increase delay every iteration, max out at 300 seconds + # any send/receive/resend activity will reset this to 60 seconds + if check_email_delay < 300: + check_email_delay += 1 + LOG.debug( + "check_email_delay is " + str(check_email_delay) + " seconds", + ) + + shortcuts = CONFIG["aprsd"]["email"]["shortcuts"] + # swap key/value + shortcuts_inverted = {v: k for k, v in shortcuts.items()} + + date = datetime.datetime.now() + month = date.strftime("%B")[:3] # Nov, Mar, Apr + day = date.day + year = date.year + today = f"{day}-{month}-{year}" + + try: + server = _imap_connect() + except Exception as e: + LOG.exception("IMAP failed to connect.", e) + return True + + try: + messages = server.search(["SINCE", today]) + except Exception as e: + LOG.exception( + "IMAP failed to search for messages since today.", + e, + ) + return True + LOG.debug(f"{len(messages)} messages received today") + + try: + _msgs = server.fetch(messages, ["ENVELOPE"]) + except Exception as e: + LOG.exception("IMAP failed to fetch/flag messages: ", e) + return True + + for msgid, data in _msgs.items(): + envelope = data[b"ENVELOPE"] + LOG.debug( + 'ID:%d "%s" (%s)' + % (msgid, envelope.subject.decode(), envelope.date), + ) + f = re.search( + r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", + str(envelope.from_[0]), + ) + if f is not None: + from_addr = f.group(1) + else: + from_addr = "noaddr" + + # LOG.debug("Message flags/tags: " + + # str(server.get_flags(msgid)[msgid])) + # if "APRS" not in server.get_flags(msgid)[msgid]: + # in python3, imap tags are unicode. in py2 they're strings. + # so .decode them to handle both + try: + taglist = [ + x.decode(errors="ignore") + for x in server.get_flags(msgid)[msgid] + ] + except Exception as e: + LOG.exception("Failed to get flags.", e) + break + + if "APRS" not in taglist: + # if msg not flagged as sent via aprs + try: + server.fetch([msgid], ["RFC822"]) + except Exception as e: + LOG.exception( + "Failed single server fetch for RFC822", + e, + ) + break + + (body, from_addr) = parse_email(msgid, data, server) + # unset seen flag, will stay bold in email client + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Failed to remove flags SEEN", e) + # Not much we can do here, so lets try and + # send the aprs message anyway + + if from_addr in shortcuts_inverted: + # reverse lookup of a shortcut + from_addr = shortcuts_inverted[from_addr] + + reply = "-" + from_addr + " " + body.decode(errors="ignore") + msg = messaging.TextMessage( + self.config["aprs"]["login"], + self.config["ham"]["callsign"], + reply, + ) + msg.send() + # flag message as sent via aprs + try: + server.add_flags(msgid, ["APRS"]) + # unset seen flag, will stay bold in email client + except Exception as e: + LOG.exception("Couldn't add APRS flag to email", e) + + try: + server.remove_flags(msgid, [imapclient.SEEN]) + except Exception as e: + LOG.exception("Couldn't remove seen flag from email", e) + + # check email more often since we just received an email + check_email_delay = 60 + + # reset clock + LOG.debug("Done looping over Server.fetch, logging out.") + self.past = datetime.datetime.now() + try: + server.logout() + except Exception as e: + LOG.exception("IMAP failed to logout: ", e) + return True + else: + # We haven't hit the email delay yet. + # LOG.debug("Delta({}) < {}".format(now - past, check_email_delay)) + return True + + return True diff --git a/aprsd/plugins/fortune.py b/aprsd/plugins/fortune.py index 27fe632..e6ac01a 100644 --- a/aprsd/plugins/fortune.py +++ b/aprsd/plugins/fortune.py @@ -8,7 +8,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class FortunePlugin(plugin.APRSDMessagePluginBase): +class FortunePlugin(plugin.APRSDRegexCommandPluginBase): """Fortune.""" version = "1.0" @@ -16,7 +16,7 @@ class FortunePlugin(plugin.APRSDMessagePluginBase): command_name = "fortune" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("FortunePlugin") # fromcall = packet.get("from") diff --git a/aprsd/plugins/location.py b/aprsd/plugins/location.py index a201a17..59b3847 100644 --- a/aprsd/plugins/location.py +++ b/aprsd/plugins/location.py @@ -8,7 +8,7 @@ from aprsd import plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class LocationPlugin(plugin.APRSDMessagePluginBase): +class LocationPlugin(plugin.APRSDRegexCommandPluginBase): """Location!""" version = "1.0" @@ -16,7 +16,7 @@ class LocationPlugin(plugin.APRSDMessagePluginBase): command_name = "location" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Location Plugin") fromcall = packet.get("from") message = packet.get("message_text", None) diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index 0687020..c3806fa 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -6,7 +6,7 @@ from aprsd import messaging, packets, plugin LOG = logging.getLogger("APRSD") -class NotifySeenPlugin(plugin.APRSDNotificationPluginBase): +class NotifySeenPlugin(plugin.APRSDWatchListPluginBase): """Notification plugin to send seen message for callsign. @@ -21,7 +21,7 @@ class NotifySeenPlugin(plugin.APRSDNotificationPluginBase): """The aprsd config object is stored.""" super().__init__(config) - def notify(self, packet): + def process(self, packet): LOG.info("BaseNotifyPlugin") notify_callsign = self.config["aprsd"]["watch_list"]["alert_callsign"] diff --git a/aprsd/plugins/ping.py b/aprsd/plugins/ping.py index 471bc07..77d78ec 100644 --- a/aprsd/plugins/ping.py +++ b/aprsd/plugins/ping.py @@ -7,7 +7,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class PingPlugin(plugin.APRSDMessagePluginBase): +class PingPlugin(plugin.APRSDRegexCommandPluginBase): """Ping.""" version = "1.0" @@ -15,8 +15,8 @@ class PingPlugin(plugin.APRSDMessagePluginBase): command_name = "ping" @trace.trace - def command(self, packet): - LOG.info("PINGPlugin") + def process(self, packet): + LOG.info("PingPlugin") # fromcall = packet.get("from") # message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/plugins/query.py b/aprsd/plugins/query.py index b6e3eb0..a658e74 100644 --- a/aprsd/plugins/query.py +++ b/aprsd/plugins/query.py @@ -8,7 +8,7 @@ from aprsd import messaging, plugin, trace LOG = logging.getLogger("APRSD") -class QueryPlugin(plugin.APRSDMessagePluginBase): +class QueryPlugin(plugin.APRSDRegexCommandPluginBase): """Query command.""" version = "1.0" @@ -16,7 +16,7 @@ class QueryPlugin(plugin.APRSDMessagePluginBase): command_name = "query" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Query COMMAND") fromcall = packet.get("from") diff --git a/aprsd/plugins/stock.py b/aprsd/plugins/stock.py index 83d487e..debf3d5 100644 --- a/aprsd/plugins/stock.py +++ b/aprsd/plugins/stock.py @@ -9,7 +9,7 @@ from aprsd import plugin, trace LOG = logging.getLogger("APRSD") -class StockPlugin(plugin.APRSDMessagePluginBase): +class StockPlugin(plugin.APRSDRegexCommandPluginBase): """Stock market plugin for fetching stock quotes""" version = "1.0" @@ -17,7 +17,7 @@ class StockPlugin(plugin.APRSDMessagePluginBase): command_name = "stock" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("StockPlugin") # fromcall = packet.get("from") diff --git a/aprsd/plugins/time.py b/aprsd/plugins/time.py index 819e657..81df83a 100644 --- a/aprsd/plugins/time.py +++ b/aprsd/plugins/time.py @@ -11,7 +11,7 @@ from aprsd import fuzzyclock, plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class TimePlugin(plugin.APRSDMessagePluginBase): +class TimePlugin(plugin.APRSDRegexCommandPluginBase): """Time command.""" version = "1.0" @@ -42,7 +42,7 @@ class TimePlugin(plugin.APRSDMessagePluginBase): return reply @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("TIME COMMAND") # So we can mock this in unit tests localzone = self._get_local_tz() @@ -57,7 +57,7 @@ class TimeOpenCageDataPlugin(TimePlugin): command_name = "Time" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -123,7 +123,7 @@ class TimeOWMPlugin(TimePlugin): command_name = "Time" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/plugins/version.py b/aprsd/plugins/version.py index cad661d..2a5e41a 100644 --- a/aprsd/plugins/version.py +++ b/aprsd/plugins/version.py @@ -7,7 +7,7 @@ from aprsd import plugin, stats, trace LOG = logging.getLogger("APRSD") -class VersionPlugin(plugin.APRSDMessagePluginBase): +class VersionPlugin(plugin.APRSDRegexCommandPluginBase): """Version of APRSD Plugin.""" version = "1.0" @@ -19,7 +19,7 @@ class VersionPlugin(plugin.APRSDMessagePluginBase): email_sent_dict = {} @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Version COMMAND") # fromcall = packet.get("from") # message = packet.get("message_text", None) diff --git a/aprsd/plugins/weather.py b/aprsd/plugins/weather.py index 6f94c45..77e68c6 100644 --- a/aprsd/plugins/weather.py +++ b/aprsd/plugins/weather.py @@ -10,7 +10,7 @@ from aprsd import plugin, plugin_utils, trace, utils LOG = logging.getLogger("APRSD") -class USWeatherPlugin(plugin.APRSDMessagePluginBase): +class USWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """USWeather Command Returns a weather report for the calling weather station @@ -28,7 +28,7 @@ class USWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "weather" @trace.trace - def command(self, packet): + def process(self, packet): LOG.info("Weather Plugin") fromcall = packet.get("from") # message = packet.get("message_text", None) @@ -71,7 +71,7 @@ class USWeatherPlugin(plugin.APRSDMessagePluginBase): return reply -class USMetarPlugin(plugin.APRSDMessagePluginBase): +class USMetarPlugin(plugin.APRSDRegexCommandPluginBase): """METAR Command This provides a METAR weather report from a station near the caller @@ -91,7 +91,7 @@ class USMetarPlugin(plugin.APRSDMessagePluginBase): command_name = "Metar" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -162,7 +162,7 @@ class USMetarPlugin(plugin.APRSDMessagePluginBase): return reply -class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): +class OWMWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """OpenWeatherMap Weather Command This provides weather near the caller or callsign. @@ -186,7 +186,7 @@ class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "Weather" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") @@ -282,7 +282,7 @@ class OWMWeatherPlugin(plugin.APRSDMessagePluginBase): return reply -class AVWXWeatherPlugin(plugin.APRSDMessagePluginBase): +class AVWXWeatherPlugin(plugin.APRSDRegexCommandPluginBase): """AVWXWeatherMap Weather Command Fetches a METAR weather report for the nearest @@ -310,7 +310,7 @@ class AVWXWeatherPlugin(plugin.APRSDMessagePluginBase): command_name = "Weather" @trace.trace - def command(self, packet): + def process(self, packet): fromcall = packet.get("from") message = packet.get("message_text", None) # ack = packet.get("msgNo", "0") diff --git a/aprsd/stats.py b/aprsd/stats.py index d70ab8d..144c6bf 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -183,7 +183,7 @@ class APRSDStats: last_aprsis_keepalive = "never" pm = plugin.PluginManager() - plugins = pm.get_msg_plugins() + plugins = pm.get_plugins() plugin_stats = {} def full_name_with_qualname(obj): @@ -193,7 +193,10 @@ class APRSDStats: ) for p in plugins: - plugin_stats[full_name_with_qualname(p)] = p.message_count + plugin_stats[full_name_with_qualname(p)] = { + "rx": p.rx_count, + "tx": p.tx_count, + } wl = packets.WatchList() diff --git a/aprsd/threads.py b/aprsd/threads.py index 60fec15..fa75f69 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -14,9 +14,13 @@ from aprsd import client, messaging, packets, plugin, stats, utils LOG = logging.getLogger("APRSD") RX_THREAD = "RX" -TX_THREAD = "TX" EMAIL_THREAD = "Email" +rx_msg_queue = queue.Queue(maxsize=20) +msg_queues = { + "rx": rx_msg_queue, +} + class APRSDThreadList: """Singleton class that keeps track of application wide threads.""" @@ -45,8 +49,13 @@ class APRSDThreadList: """Iterate over all threads and call stop on them.""" with self.lock: for th in self.threads_list: + LOG.debug(f"Stopping Thread {th.name}") th.stop() + def __len__(self): + with self.lock: + return len(self.threads_list) + class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def __init__(self, name): @@ -57,6 +66,10 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta): def stop(self): self.thread_stop = True + @abc.abstractmethod + def loop(self): + pass + def run(self): LOG.debug("Starting") while not self.thread_stop: @@ -71,15 +84,17 @@ class KeepAliveThread(APRSDThread): cntr = 0 checker_time = datetime.datetime.now() - def __init__(self): + def __init__(self, config): tracemalloc.start() super().__init__("KeepAlive") + self.config = config def loop(self): if self.cntr % 6 == 0: tracker = messaging.MsgTrack() stats_obj = stats.APRSDStats() - packets_list = packets.PacketList().packet_list + pl = packets.PacketList() + thread_list = APRSDThreadList() now = datetime.datetime.now() last_email = stats_obj.email_thread_time if last_email: @@ -92,18 +107,24 @@ class KeepAliveThread(APRSDThread): current, peak = tracemalloc.get_traced_memory() stats_obj.set_memory(current) stats_obj.set_memory_peak(peak) - keepalive = "Uptime {} Tracker {} " "Msgs TX:{} RX:{} Last:{} Email:{} Packets:{} RAM Current:{} Peak:{}".format( + keepalive = ( + "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " + "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{}" + ).format( + self.config["aprs"]["login"], utils.strfdelta(stats_obj.uptime), + pl.total_recv, + pl.total_tx, len(tracker), stats_obj.msgs_tx, stats_obj.msgs_rx, last_msg_time, email_thread_time, - len(packets_list), utils.human_size(current), utils.human_size(peak), + len(thread_list), ) - LOG.debug(keepalive) + LOG.info(keepalive) # Check version every hour delta = now - self.checker_time if delta > datetime.timedelta(hours=1): @@ -112,52 +133,7 @@ class KeepAliveThread(APRSDThread): if level: LOG.warning(msg) self.cntr += 1 - time.sleep(10) - return True - - -class APRSDNotifyThread(APRSDThread): - last_seen = {} - - def __init__(self, msg_queues, config): - super().__init__("NOTIFY_MSG") - self.msg_queues = msg_queues - self.config = config - packets.WatchList(config=config) - - def loop(self): - try: - packet = self.msg_queues["notify"].get(timeout=5) - wl = packets.WatchList() - if wl.callsign_in_watchlist(packet["from"]): - # NOW WE RUN through the notify plugins. - # If they return a msg, then we queue it for sending. - pm = plugin.PluginManager() - results = pm.notify(packet) - for reply in results: - if reply is not messaging.NULL_MESSAGE: - watch_list_conf = self.config["aprsd"]["watch_list"] - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - watch_list_conf["alert_callsign"], - reply, - ) - self.msg_queues["tx"].put(msg) - - wl.update_seen(packet) - else: - LOG.debug( - "Ignoring packet from '{}'. Not in watch list.".format( - packet["from"], - ), - ) - - # Allows stats object to have latest info from the last_seen dict - LOG.debug("Packet processing complete") - except queue.Empty: - pass - # Continue to loop + time.sleep(1) return True @@ -174,23 +150,6 @@ class APRSDRXThread(APRSDThread): def loop(self): aprs_client = client.get_client() - # if we have a watch list enabled, we need to add filtering - # to enable seeing packets from the watch list. - if "watch_list" in self.config["aprsd"] and self.config["aprsd"][ - "watch_list" - ].get("enabled", False): - # watch list is enabled - watch_list = self.config["aprsd"]["watch_list"].get( - "callsigns", - [], - ) - # make sure the timeout is set or this doesn't work - if watch_list: - filter_str = "p/{}".format("/".join(watch_list)) - aprs_client.set_filter(filter_str) - else: - LOG.warning("Watch list enabled, but no callsigns set.") - # setup the consumer of messages and block until a messages try: # This will register a packet consumer with aprslib @@ -214,6 +173,19 @@ class APRSDRXThread(APRSDThread): # Continue to loop return True + def process_packet(self, packet): + thread = APRSDProcessPacketThread(packet=packet, config=self.config) + thread.start() + + +class APRSDProcessPacketThread(APRSDThread): + + def __init__(self, packet, config): + self.packet = packet + self.config = config + name = self.packet["raw"][:10] + super().__init__(f"RX_PACKET-{name}") + def process_ack_packet(self, packet): ack_num = packet.get("msgNo") LOG.info(f"Got ack for message {ack_num}") @@ -229,133 +201,102 @@ class APRSDRXThread(APRSDThread): stats.APRSDStats().ack_rx_inc() return - def process_mic_e_packet(self, packet): - LOG.info("Mic-E Packet detected. Currenlty unsupported.") - messaging.log_packet(packet) - stats.APRSDStats().msgs_mice_inc() - return - - def process_message_packet(self, packet): - fromcall = packet["from"] - message = packet.get("message_text", None) - - msg_id = packet.get("msgNo", "0") - - messaging.log_message( - "Received Message", - packet["raw"], - message, - fromcall=fromcall, - msg_num=msg_id, - ) - - found_command = False - # Get singleton of the PM - pm = plugin.PluginManager() - try: - results = pm.run(packet) - for reply in results: - if isinstance(reply, list): - # one of the plugins wants to send multiple messages - found_command = True - for subreply in reply: - LOG.debug(f"Sending '{subreply}'") - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - subreply, - ) - self.msg_queues["tx"].put(msg) - - else: - found_command = True - # A plugin can return a null message flag which signals - # us that they processed the message correctly, but have - # nothing to reply with, so we avoid replying with a usage string - if reply is not messaging.NULL_MESSAGE: - LOG.debug(f"Sending '{reply}'") - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - reply, - ) - self.msg_queues["tx"].put(msg) - else: - LOG.debug("Got NULL MESSAGE from plugin") - - if not found_command: - plugins = pm.get_msg_plugins() - names = [x.command_name for x in plugins] - names.sort() - - # reply = "Usage: {}".format(", ".join(names)) - reply = "Usage: weather, locate [call], time, fortune, ping" - - msg = messaging.TextMessage( - self.config["aprs"]["login"], - fromcall, - reply, - ) - self.msg_queues["tx"].put(msg) - except Exception as ex: - LOG.exception("Plugin failed!!!", ex) - reply = "A Plugin failed! try again?" - msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply) - self.msg_queues["tx"].put(msg) - - # let any threads do their thing, then ack - # send an ack last - ack = messaging.AckMessage( - self.config["aprs"]["login"], - fromcall, - msg_id=msg_id, - ) - self.msg_queues["tx"].put(ack) - - def process_packet(self, packet): + def loop(self): """Process a packet recieved from aprs-is server.""" + packet = self.packet + packets.PacketList().add(packet) - try: - LOG.debug("Adding packet to notify queue {}".format(packet["raw"])) - self.msg_queues["notify"].put(packet) - packets.PacketList().add(packet) + fromcall = packet["from"] + tocall = packet.get("addresse", None) + msg = packet.get("message_text", None) + msg_id = packet.get("msgNo", "0") + msg_response = packet.get("response", None) + # LOG.debug(f"Got packet from '{fromcall}' - {packet}") - # since we can see packets from anyone now with the - # watch list, we need to filter messages directly only to us. - tocall = packet.get("addresse", None) + # We don't put ack packets destined for us through the + # plugins. + if tocall == self.config["aprs"]["login"] and msg_response == "ack": + self.process_ack_packet(packet) + else: + # It's not an ACK for us, so lets run it through + # the plugins. + messaging.log_message( + "Received Message", + packet["raw"], + msg, + fromcall=fromcall, + msg_num=msg_id, + ) + + # Only ack messages that were sent directly to us if tocall == self.config["aprs"]["login"]: stats.APRSDStats().msgs_rx_inc() - packets.PacketList().add(packet) - - msg = packet.get("message_text", None) - msg_format = packet.get("format", None) - msg_response = packet.get("response", None) - if msg_format == "message" and msg: - # we want to send the message through the - # plugins - self.process_message_packet(packet) - return - elif msg_response == "ack": - self.process_ack_packet(packet) - return - - if msg_format == "mic-e": - # process a mic-e packet - self.process_mic_e_packet(packet) - return - else: - LOG.debug( - "Ignoring '{}' packet from '{}' to '{}'".format( - packets.get_packet_type(packet), - packet["from"], - tocall, - ), + # let any threads do their thing, then ack + # send an ack last + ack = messaging.AckMessage( + self.config["aprs"]["login"], + fromcall, + msg_id=msg_id, ) + ack.send() + + pm = plugin.PluginManager() + try: + results = pm.run(packet) + replied = False + for reply in results: + if isinstance(reply, list): + # one of the plugins wants to send multiple messages + replied = True + for subreply in reply: + LOG.debug(f"Sending '{subreply}'") + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + subreply, + ) + msg.send() + + else: + replied = True + # A plugin can return a null message flag which signals + # us that they processed the message correctly, but have + # nothing to reply with, so we avoid replying with a + # usage string + if reply is not messaging.NULL_MESSAGE: + LOG.debug(f"Sending '{reply}'") + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + msg.send() + + # If the message was for us and we didn't have a + # response, then we send a usage statement. + if tocall == self.config["aprs"]["login"] and not replied: + reply = "Usage: weather, locate [call], time, fortune, ping" + + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + msg.send() + except Exception as ex: + LOG.exception("Plugin failed!!!", ex) + # Do we need to send a reply? + if tocall == self.config["aprs"]["login"]: + reply = "A Plugin failed! try again?" + msg = messaging.TextMessage( + self.config["aprs"]["login"], + fromcall, + reply, + ) + msg.send() - except (aprslib.ParseError, aprslib.UnknownFormat) as exp: - LOG.exception("Failed to parse packet from aprs-is", exp) LOG.debug("Packet processing complete") @@ -367,8 +308,7 @@ class APRSDTXThread(APRSDThread): def loop(self): try: - msg = self.msg_queues["tx"].get(timeout=5) - packets.PacketList().add(msg.dict()) + msg = self.msg_queues["tx"].get(timeout=1) msg.send() except queue.Empty: pass diff --git a/aprsd/utils.py b/aprsd/utils.py index 3793b90..14b47ff 100644 --- a/aprsd/utils.py +++ b/aprsd/utils.py @@ -394,8 +394,14 @@ def human_size(bytes, units=None): return str(bytes) + units[0] if bytes < 1024 else human_size(bytes >> 10, units[1:]) -def strfdelta(tdelta, fmt="{hours}:{minutes}:{seconds}"): - d = {"days": tdelta.days} +def strfdelta(tdelta, fmt="{hours:{width}}:{minutes:{width}}:{seconds:{width}}"): + d = { + "days": tdelta.days, + "width": "02", + } + if tdelta.days > 0: + fmt = "{days} days " + fmt + d["hours"], rem = divmod(tdelta.seconds, 3600) d["minutes"], d["seconds"] = divmod(rem, 60) return fmt.format(**d) @@ -460,6 +466,9 @@ class RingBuffer: """return list of elements in correct order""" return self.data[self.cur :] + self.data[: self.cur] + def __len__(self): + return len(self.data) + def append(self, x): """append an element at the end of the buffer""" diff --git a/aprsd/web/static/js/main.js b/aprsd/web/static/js/main.js index 9d3f930..f3849b3 100644 --- a/aprsd/web/static/js/main.js +++ b/aprsd/web/static/js/main.js @@ -58,6 +58,23 @@ function update_watchlist_from_packet(callsign, val) { //console.log(watchlist) } +function update_plugins( data ) { + var plugindiv = $("#pluginDiv"); + var html_str = '
Plugin Name | Processed Packets | Sent Packets | ' + key + ' | ' + val["rx"] + ' | ' + val["tx"] + ' | '; + } + html_str += "
---|
{{ config_json|safe }}diff --git a/tests/fake.py b/tests/fake.py new file mode 100644 index 0000000..a45ead3 --- /dev/null +++ b/tests/fake.py @@ -0,0 +1,67 @@ +from aprsd import packets, plugin, threads + + +FAKE_MESSAGE_TEXT = "fake MeSSage" +FAKE_FROM_CALLSIGN = "KFART" +FAKE_TO_CALLSIGN = "KMINE" + + +def fake_packet( + fromcall=FAKE_FROM_CALLSIGN, + tocall=FAKE_TO_CALLSIGN, + message=None, + msg_number=None, + message_format=packets.PACKET_TYPE_MESSAGE, +): + packet = { + "from": fromcall, + "addresse": tocall, + "format": message_format, + } + if message: + packet["message_text"] = message + + if msg_number: + packet["msgNo"] = msg_number + + return packet + + +class FakeBaseNoThreadsPlugin(plugin.APRSDPluginBase): + version = "1.0" + + def filter(self, packet): + return None + + def process(self, packet): + return "process" + + +class FakeThread(threads.APRSDThread): + def __init__(self): + super().__init__("FakeThread") + + def loop(self): + return True + + +class FakeBaseThreadsPlugin(plugin.APRSDPluginBase): + version = "1.0" + + def filter(self, packet): + return None + + def process(self, packet): + return "process" + + def create_threads(self): + return FakeThread() + + +class FakeRegexCommandPlugin(plugin.APRSDRegexCommandPluginBase): + version = "1.0" + command_regex = "^[fF]" + command_name = "fake" + + def process(self, packet): + return FAKE_MESSAGE_TEXT diff --git a/tests/test_email.py b/tests/test_email.py index 08a35e2..6a4532f 100644 --- a/tests/test_email.py +++ b/tests/test_email.py @@ -1,6 +1,6 @@ import unittest -from aprsd import email +from aprsd.plugins import email class TestEmail(unittest.TestCase): diff --git a/tests/test_main.py b/tests/test_main.py index a140131..bd62b55 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,7 +1,7 @@ import sys import unittest -from aprsd import email +from aprsd.plugins import email if sys.version_info >= (3, 2): @@ -11,8 +11,8 @@ else: class TestMain(unittest.TestCase): - @mock.patch("aprsd.email._imap_connect") - @mock.patch("aprsd.email._smtp_connect") + @mock.patch("aprsd.plugins.email._imap_connect") + @mock.patch("aprsd.plugins.email._smtp_connect") def test_validate_email(self, imap_mock, smtp_mock): """Test to make sure we fail.""" imap_mock.return_value = None diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 8ee2cd7..7a5d575 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4,7 +4,7 @@ from unittest import mock import pytz import aprsd -from aprsd import messaging, stats, utils +from aprsd import messaging, packets, stats, utils from aprsd.fuzzyclock import fuzzy from aprsd.plugins import fortune as fortune_plugin from aprsd.plugins import ping as ping_plugin @@ -12,33 +12,123 @@ from aprsd.plugins import query as query_plugin from aprsd.plugins import time as time_plugin from aprsd.plugins import version as version_plugin +from . import fake + class TestPlugin(unittest.TestCase): def setUp(self): - self.fromcall = "KFART" + self.fromcall = fake.FAKE_FROM_CALLSIGN self.ack = 1 self.config = utils.DEFAULT_CONFIG_DICT self.config["ham"]["callsign"] = self.fromcall + self.config["aprs"]["login"] = fake.FAKE_TO_CALLSIGN # Inintialize the stats object with the config stats.APRSDStats(self.config) - def fake_packet(self, fromcall="KFART", message=None, msg_number=None): - packet = {"from": fromcall} - if message: - packet["message_text"] = message + @mock.patch.object(fake.FakeBaseNoThreadsPlugin, "process") + def test_base_plugin_no_threads(self, mock_process): + p = fake.FakeBaseNoThreadsPlugin(self.config) - if msg_number: - packet["msgNo"] = msg_number + expected = [] + actual = p.create_threads() + self.assertEqual(expected, actual) - return packet + expected = "1.0" + actual = p.version + self.assertEqual(expected, actual) + expected = 0 + actual = p.message_counter + self.assertEqual(expected, actual) + + expected = None + actual = p.filter(fake.fake_packet()) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + @mock.patch.object(fake.FakeBaseThreadsPlugin, "create_threads") + def test_base_plugin_threads_created(self, mock_create): + fake.FakeBaseThreadsPlugin(self.config) + mock_create.assert_called_once() + + def test_base_plugin_threads(self): + p = fake.FakeBaseThreadsPlugin(self.config) + actual = p.create_threads() + self.assertTrue(isinstance(actual, fake.FakeThread)) + p.stop_threads() + + @mock.patch.object(fake.FakeRegexCommandPlugin, "process") + def test_regex_base_not_called(self, mock_process): + p = fake.FakeRegexCommandPlugin(self.config) + packet = fake.fake_packet(message="a") + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet(tocall="notMe", message="f") + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet( + message="F", + message_format=packets.PACKET_TYPE_MICE, + ) + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + packet = fake.fake_packet( + message="f", + message_format=packets.PACKET_TYPE_ACK, + ) + expected = None + actual = p.filter(packet) + self.assertEqual(expected, actual) + mock_process.assert_not_called() + + @mock.patch.object(fake.FakeRegexCommandPlugin, "process") + def test_regex_base_assert_called(self, mock_process): + p = fake.FakeRegexCommandPlugin(self.config) + packet = fake.fake_packet(message="f") + p.filter(packet) + mock_process.assert_called_once() + + def test_regex_base_process_called(self): + p = fake.FakeRegexCommandPlugin(self.config) + + packet = fake.fake_packet(message="f") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="F") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="fake") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + packet = fake.fake_packet(message="FAKE") + expected = fake.FAKE_MESSAGE_TEXT + actual = p.filter(packet) + self.assertEqual(expected, actual) + + +class TestFortunePlugin(TestPlugin): @mock.patch("shutil.which") def test_fortune_fail(self, mock_which): fortune = fortune_plugin.FortunePlugin(self.config) mock_which.return_value = None expected = "Fortune command not installed" - packet = self.fake_packet(message="fortune") - actual = fortune.run(packet) + packet = fake.fake_packet(message="fortune") + actual = fortune.filter(packet) self.assertEqual(expected, actual) @mock.patch("subprocess.check_output") @@ -50,17 +140,19 @@ class TestPlugin(unittest.TestCase): mock_output.return_value = "Funny fortune" expected = "Funny fortune" - packet = self.fake_packet(message="fortune") - actual = fortune.run(packet) + packet = fake.fake_packet(message="fortune") + actual = fortune.filter(packet) self.assertEqual(expected, actual) + +class TestQueryPlugin(TestPlugin): @mock.patch("aprsd.messaging.MsgTrack.flush") def test_query_flush(self, mock_flush): - packet = self.fake_packet(message="!delete") + packet = fake.fake_packet(message="!delete") query = query_plugin.QueryPlugin(self.config) expected = "Deleted ALL pending msgs." - actual = query.run(packet) + actual = query.filter(packet) mock_flush.assert_called_once() self.assertEqual(expected, actual) @@ -68,11 +160,11 @@ class TestPlugin(unittest.TestCase): def test_query_restart_delayed(self, mock_restart): track = messaging.MsgTrack() track.track = {} - packet = self.fake_packet(message="!4") + packet = fake.fake_packet(message="!4") query = query_plugin.QueryPlugin(self.config) expected = "No pending msgs to resend" - actual = query.run(packet) + actual = query.filter(packet) mock_restart.assert_not_called() self.assertEqual(expected, actual) mock_restart.reset_mock() @@ -80,9 +172,11 @@ class TestPlugin(unittest.TestCase): # add a message msg = messaging.TextMessage(self.fromcall, "testing", self.ack) track.add(msg) - actual = query.run(packet) + actual = query.filter(packet) mock_restart.assert_called_once() + +class TestTimePlugins(TestPlugin): @mock.patch("aprsd.plugins.time.TimePlugin._get_local_tz") @mock.patch("aprsd.plugins.time.TimePlugin._get_utcnow") def test_time(self, mock_utcnow, mock_localtz): @@ -100,19 +194,17 @@ class TestPlugin(unittest.TestCase): fake_time.tm_sec = 13 time = time_plugin.TimePlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="location", msg_number=1, ) - actual = time.run(packet) + actual = time.filter(packet) self.assertEqual(None, actual) cur_time = fuzzy(h, m, 1) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="time", msg_number=1, ) @@ -121,9 +213,11 @@ class TestPlugin(unittest.TestCase): cur_time, local_short_str, ) - actual = time.run(packet) + actual = time.filter(packet) self.assertEqual(expected, actual) + +class TestPingPlugin(TestPlugin): @mock.patch("time.localtime") def test_ping(self, mock_time): fake_time = mock.MagicMock() @@ -134,13 +228,12 @@ class TestPlugin(unittest.TestCase): ping = ping_plugin.PingPlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="location", msg_number=1, ) - result = ping.run(packet) + result = ping.filter(packet) self.assertEqual(None, result) def ping_str(h, m, s): @@ -153,49 +246,46 @@ class TestPlugin(unittest.TestCase): + str(s).zfill(2) ) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="Ping", msg_number=1, ) - actual = ping.run(packet) + actual = ping.filter(packet) expected = ping_str(h, m, s) self.assertEqual(expected, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="ping", msg_number=1, ) - actual = ping.run(packet) + actual = ping.filter(packet) self.assertEqual(expected, actual) - @mock.patch("aprsd.plugin.PluginManager.get_msg_plugins") + +class TestVersionPlugin(TestPlugin): + @mock.patch("aprsd.plugin.PluginManager.get_plugins") def test_version(self, mock_get_plugins): - expected = f"APRSD ver:{aprsd.__version__} uptime:0:0:0" + expected = f"APRSD ver:{aprsd.__version__} uptime:00:00:00" version = version_plugin.VersionPlugin(self.config) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="No", msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(None, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="version", msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(expected, actual) - packet = self.fake_packet( - fromcall="KFART", + packet = fake.fake_packet( message="Version", msg_number=1, ) - actual = version.run(packet) + actual = version.filter(packet) self.assertEqual(expected, actual) diff --git a/tox.ini b/tox.ini index 0a415d1..95e1349 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ minversion = 2.9.0 skipdist = True skip_missing_interpreters = true -envlist = pre-commit,pep8,fmt-check,py{36,37,38} +envlist = pre-commit,pep8,py{36,37,38} # Activate isolated build environment. tox will use a virtual environment # to build a source distribution from the source tree. For build tools and