diff --git a/aprsd/client/aprsis.py b/aprsd/client/aprsis.py index d6ad9ec..9773f7f 100644 --- a/aprsd/client/aprsis.py +++ b/aprsd/client/aprsis.py @@ -3,7 +3,9 @@ import logging import time from aprslib.exceptions import LoginError +from loguru import logger from oslo_config import cfg +import timeago from aprsd import client, exception from aprsd.client import base @@ -13,11 +15,13 @@ from aprsd.packets import core CONF = cfg.CONF LOG = logging.getLogger("APRSD") +LOGU = logger class APRSISClient(base.APRSClient): _client = None + _checks = False def __init__(self): max_timeout = {"hours": 0.0, "minutes": 2, "seconds": 0} @@ -45,6 +49,20 @@ class APRSISClient(base.APRSClient): return stats + def keepalive_check(self): + # Don't check the first time through. + if not self.is_alive() and self._checks: + LOG.warning("Resetting client. It's not alive.") + self.reset() + self._checks = True + + def keepalive_log(self): + if ka := self._client.aprsd_keepalive: + keepalive = timeago.format(ka) + else: + keepalive = "N/A" + LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") + @staticmethod def is_enabled(): # Defaults to True if the enabled flag is non existent @@ -81,14 +99,13 @@ class APRSISClient(base.APRSClient): if delta > self.max_delta: LOG.error(f"Connection is stale, last heard {delta} ago.") return True + return False def is_alive(self): - if self._client: - return self._client.is_alive() and not self._is_stale_connection() - else: + if not self._client: LOG.warning(f"APRS_CLIENT {self._client} alive? NO!!!") return False - + return self._client.is_alive() and not self._is_stale_connection() def close(self): if self._client: self._client.stop() diff --git a/aprsd/client/base.py b/aprsd/client/base.py index 8470546..c0bfbe8 100644 --- a/aprsd/client/base.py +++ b/aprsd/client/base.py @@ -6,6 +6,7 @@ from oslo_config import cfg import wrapt from aprsd.packets import core +from aprsd.threads import keepalive_collector CONF = cfg.CONF @@ -30,6 +31,7 @@ class APRSClient: """This magic turns this into a singleton.""" if cls._instance is None: cls._instance = super().__new__(cls) + keepalive_collector.KeepAliveCollector().register(cls) # Put any initialization here. cls._instance._create_client() return cls._instance @@ -42,6 +44,16 @@ class APRSClient: dict: Statistics about the connection and packet handling """ + @abc.abstractmethod + def keepalive_check(self) -> None: + """Called during keepalive run to check status.""" + ... + + @abc.abstractmethod + def keepalive_log(self) -> None: + """Log any keepalive information.""" + ... + @property def is_connected(self): return self.connected diff --git a/aprsd/client/kiss.py b/aprsd/client/kiss.py index d413aa1..eaa546e 100644 --- a/aprsd/client/kiss.py +++ b/aprsd/client/kiss.py @@ -2,7 +2,9 @@ import datetime import logging import aprslib +from loguru import logger from oslo_config import cfg +import timeago from aprsd import client, exception from aprsd.client import base @@ -12,6 +14,7 @@ from aprsd.packets import core CONF = cfg.CONF LOG = logging.getLogger("APRSD") +LOGU = logger class KISSClient(base.APRSClient): @@ -79,6 +82,20 @@ class KISSClient(base.APRSClient): if self._client: self._client.stop() + def keepalive_check(self): + # Don't check the first time through. + if not self.is_alive() and self._checks: + LOG.warning("Resetting client. It's not alive.") + self.reset() + self._checks = True + + def keepalive_log(self): + if ka := self._client.aprsd_keepalive: + keepalive = timeago.format(ka) + else: + keepalive = "N/A" + LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") + @staticmethod def transport(): if CONF.kiss_serial.enabled: diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index fc18e64..17f49e4 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -23,7 +23,7 @@ from aprsd.packets import collector as packet_collector from aprsd.packets import log as packet_log from aprsd.packets import seen_list from aprsd.stats import collector -from aprsd.threads import keep_alive, rx +from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread @@ -126,7 +126,7 @@ class ListenStatsThread(APRSDThread): thread_hex = f"fg {utils.hex_from_name(k)}" LOGU.opt(colors=True).info( f"<{thread_hex}>{k:<15} " - f"RX: {v['rx']} TX: {v['tx']}", + f"RX: {v["rx"]} TX: {v["tx"]}", ) time.sleep(1) @@ -265,7 +265,7 @@ def listen( LOG.debug(f"Filter by '{filter}'") aprs_client.set_filter(filter) - keepalive = keep_alive.KeepAliveThread() + keepalive = keepalive.KeepAliveThread() if not CONF.enable_seen_list: # just deregister the class from the packet collector diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py index 0732a8d..fc0bce9 100644 --- a/aprsd/cmds/server.py +++ b/aprsd/cmds/server.py @@ -14,7 +14,7 @@ from aprsd.main import cli from aprsd.packets import collector as packet_collector from aprsd.packets import seen_list from aprsd.threads import aprsd as aprsd_threads -from aprsd.threads import keep_alive, registry, rx +from aprsd.threads import keepalive, registry, rx from aprsd.threads import stats as stats_thread from aprsd.threads import tx from aprsd.utils import singleton @@ -146,7 +146,7 @@ def server(ctx, flush): # Now start all the main processing threads. - server_threads.register(keep_alive.KeepAliveThread()) + server_threads.register(keepalive.KeepAliveThread()) server_threads.register(stats_thread.APRSDStatsStoreThread()) server_threads.register( rx.APRSDPluginRXThread( diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keepalive.py similarity index 61% rename from aprsd/threads/keep_alive.py rename to aprsd/threads/keepalive.py index a47e4a5..d4f53ca 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keepalive.py @@ -5,13 +5,11 @@ import tracemalloc from loguru import logger from oslo_config import cfg -import timeago from aprsd import packets, utils -from aprsd.client import client_factory from aprsd.log import log as aprsd_log from aprsd.stats import collector -from aprsd.threads import APRSDThread, APRSDThreadList +from aprsd.threads import APRSDThread, APRSDThreadList, keepalive_collector CONF = cfg.CONF @@ -36,15 +34,6 @@ class KeepAliveThread(APRSDThread): thread_list = APRSDThreadList() now = datetime.datetime.now() - if "EmailStats" in stats_json: - email_stats = stats_json["EmailStats"] - if email_stats.get("last_check_time"): - email_thread_time = utils.strfdelta(now - email_stats["last_check_time"]) - else: - email_thread_time = "N/A" - else: - email_thread_time = "N/A" - if "APRSClientStats" in stats_json and stats_json["APRSClientStats"].get("transport") == "aprsis": if stats_json["APRSClientStats"].get("server_keepalive"): last_msg_time = utils.strfdelta(now - stats_json["APRSClientStats"]["server_keepalive"]) @@ -64,7 +53,7 @@ class KeepAliveThread(APRSDThread): keepalive = ( "{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} " - "Last:{} Email: {} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}" + "Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}" ).format( stats_json["APRSDStats"]["callsign"], stats_json["APRSDStats"]["uptime"], @@ -74,7 +63,6 @@ class KeepAliveThread(APRSDThread): tx_msg, rx_msg, last_msg_time, - email_thread_time, stats_json["APRSDStats"]["memory_current_str"], stats_json["APRSDStats"]["memory_peak_str"], len(thread_list), @@ -97,35 +85,11 @@ class KeepAliveThread(APRSDThread): LOGU.opt(colors=True).info(thread_msg) # LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}") - # check the APRS connection - cl = client_factory.create() - cl_stats = cl.stats() - ka = cl_stats.get("connection_keepalive", None) - if ka: - keepalive = timeago.format(ka) - else: - keepalive = "N/A" - LOGU.opt(colors=True).info(f"Client keepalive {keepalive}") - # Reset the connection if it's dead and this isn't our - # First time through the loop. - # The first time through the loop can happen at startup where - # The keepalive thread starts before the client has a chance - # to make it's connection the first time. - if not cl.is_alive() and self.cntr > 0: - LOG.error(f"{cl.__class__.__name__} is not alive!!! Resetting") - client_factory.create().reset() - # else: - # # See if we should reset the aprs-is client - # # Due to losing a keepalive from them - # delta_dict = utils.parse_delta_str(last_msg_time) - # delta = datetime.timedelta(**delta_dict) - # - # if delta > self.max_delta: - # # We haven't gotten a keepalive from aprs-is in a while - # # reset the connection.a - # if not client.KISSClient.is_enabled(): - # LOG.warning(f"Resetting connection to APRS-IS {delta}") - # client.factory.create().reset() + # Go through the registered keepalive collectors + # and check them as well as call log. + collect = keepalive_collector.KeepAliveCollector() + collect.check() + collect.log() # Check version every day delta = now - self.checker_time diff --git a/aprsd/threads/keepalive_collector.py b/aprsd/threads/keepalive_collector.py new file mode 100644 index 0000000..52dd0b0 --- /dev/null +++ b/aprsd/threads/keepalive_collector.py @@ -0,0 +1,54 @@ +import logging +from typing import Callable, Protocol, runtime_checkable + +from aprsd.utils import singleton + + +LOG = logging.getLogger("APRSD") + + +@runtime_checkable +class KeepAliveProducer(Protocol): + """The KeepAliveProducer protocol is used to define the interface for running Keepalive checks.""" + def keepalive_check(self) -> dict: + """Check for keepalive.""" + ... + + def keepalive_log(self): + """Log any keepalive information.""" + ... + + +@singleton +class KeepAliveCollector: + """The Collector class is used to collect stats from multiple StatsProducer instances.""" + def __init__(self): + self.producers: list[Callable] = [] + + def check(self) -> None: + """Do any keepalive checks.""" + for name in self.producers: + cls = name() + try: + cls.keepalive_check() + except Exception as e: + LOG.error(f"Error in producer {name} (check): {e}") + + def log(self) -> None: + """Log any relevant information during a KeepAlive check""" + for name in self.producers: + cls = name() + try: + cls.keepalive_log() + except Exception as e: + LOG.error(f"Error in producer {name} (check): {e}") + + def register(self, producer_name: Callable): + if not isinstance(producer_name, KeepAliveProducer): + raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer") + self.producers.append(producer_name) + + def unregister(self, producer_name: Callable): + if not isinstance(producer_name, KeepAliveProducer): + raise TypeError(f"Producer {producer_name} is not a KeepAliveProducer") + self.producers.remove(producer_name)