From 4f87d5da12a5588d943b0e3644d57a93e26ccafe Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 28 Sep 2023 12:19:18 -0400 Subject: [PATCH] rewrote packet_list and drop dupe packets This patch rewrites the packet_list internally to be a dictionary instead of a list for very fast lookups. This was needed to test for duplicate packets already in the list. This patch drops packets that have the same data and are < 60 seconds in age from the last time we got the packet. On RF based clients we can get dupes!! --- aprsd/client.py | 36 ++++++++++++++++++++++- aprsd/clients/fake.py | 49 +++++++++++++++++++++++++++++++ aprsd/conf/client.py | 17 +++++++++++ aprsd/packets/core.py | 29 ++++++++++-------- aprsd/packets/packet_list.py | 57 ++++++++++++++++++++++++++---------- aprsd/threads/rx.py | 18 +++++++++--- 6 files changed, 173 insertions(+), 33 deletions(-) create mode 100644 aprsd/clients/fake.py diff --git a/aprsd/client.py b/aprsd/client.py index 7bc4f02..bca92c5 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError from oslo_config import cfg from aprsd import exception -from aprsd.clients import aprsis, kiss +from aprsd.clients import aprsis, fake, kiss from aprsd.packets import core, packet_list from aprsd.utils import trace @@ -17,6 +17,7 @@ LOG = logging.getLogger("APRSD") TRANSPORT_APRSIS = "aprsis" TRANSPORT_TCPKISS = "tcpkiss" TRANSPORT_SERIALKISS = "serialkiss" +TRANSPORT_FAKE = "fake" # Main must create this from the ClientFactory # object such that it's populated with the @@ -248,6 +249,35 @@ class KISSClient(Client, metaclass=trace.TraceWrapperMetaclass): return self._client +class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): + + @staticmethod + def is_enabled(): + if CONF.fake_client.enabled: + return True + return False + + @staticmethod + def is_configured(): + return APRSDFakeClient.is_enabled() + + def is_alive(self): + return True + + def setup_connection(self): + return fake.APRSDFakeClient() + + @staticmethod + def transport(): + return TRANSPORT_FAKE + + def decode_packet(self, *args, **kwargs): + LOG.debug(f"kwargs {kwargs}") + pkt = kwargs["packet"] + LOG.debug(f"Got an APRS Fake Packet '{pkt}'") + return pkt + + class ClientFactory: _instance = None @@ -270,8 +300,11 @@ class ClientFactory: key = TRANSPORT_APRSIS elif KISSClient.is_enabled(): key = KISSClient.transport() + elif APRSDFakeClient.is_enabled(): + key = TRANSPORT_FAKE builder = self._builders.get(key) + LOG.debug(f"Creating client {key}") if not builder: raise ValueError(key) return builder() @@ -312,3 +345,4 @@ class ClientFactory: factory.register(TRANSPORT_APRSIS, APRSISClient) factory.register(TRANSPORT_TCPKISS, KISSClient) factory.register(TRANSPORT_SERIALKISS, KISSClient) + factory.register(TRANSPORT_FAKE, APRSDFakeClient) diff --git a/aprsd/clients/fake.py b/aprsd/clients/fake.py new file mode 100644 index 0000000..87e8d0b --- /dev/null +++ b/aprsd/clients/fake.py @@ -0,0 +1,49 @@ +import logging +import threading +import time + +from oslo_config import cfg +import wrapt + +from aprsd import conf # noqa +from aprsd.packets import core +from aprsd.utils import trace + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass): + '''Fake client for testing.''' + + # flag to tell us to stop + thread_stop = False + + lock = threading.Lock() + + def stop(self): + self.thread_stop = True + LOG.info("Shutdown APRSDFakeClient client.") + + def is_alive(self): + """If the connection is alive or not.""" + return not self.thread_stop + + @wrapt.synchronized(lock) + def send(self, packet: core.Packet): + """Send an APRS Message object.""" + LOG.info(f"Sending packet: {packet}") + + def consumer(self, callback, blocking=False, immortal=False, raw=False): + LOG.debug("Start non blocking FAKE consumer") + # Generate packets here? + pkt = core.MessagePacket( + from_call="N0CALL", + to_call=CONF.callsign, + message_text="Hello World", + msgNo=13, + ) + callback(packet=pkt) + LOG.debug(f"END blocking FAKE consumer {self}") + time.sleep(8) diff --git a/aprsd/conf/client.py b/aprsd/conf/client.py index c752f16..cb77e09 100644 --- a/aprsd/conf/client.py +++ b/aprsd/conf/client.py @@ -19,6 +19,11 @@ kiss_tcp_group = cfg.OptGroup( name="kiss_tcp", title="KISS TCP/IP Device connection", ) + +fake_client_group = cfg.OptGroup( + name="fake_client", + title="Fake Client settings", +) aprs_opts = [ cfg.BoolOpt( "enabled", @@ -84,6 +89,14 @@ kiss_tcp_opts = [ ), ] +fake_client_opts = [ + cfg.BoolOpt( + "enabled", + default=False, + help="Enable fake client connection.", + ), +] + def register_opts(config): config.register_group(aprs_group) @@ -93,10 +106,14 @@ def register_opts(config): config.register_opts(kiss_serial_opts, group=kiss_serial_group) config.register_opts(kiss_tcp_opts, group=kiss_tcp_group) + config.register_group(fake_client_group) + config.register_opts(fake_client_opts, group=fake_client_group) + def list_opts(): return { aprs_group.name: aprs_opts, kiss_serial_group.name: kiss_serial_opts, kiss_tcp_group.name: kiss_tcp_opts, + fake_client_group.name: fake_client_opts, } diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index beda33d..a95f658 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -29,11 +29,10 @@ PACKET_TYPE_THIRDPARTY = "thirdparty" PACKET_TYPE_UNCOMPRESSED = "uncompressed" -def _int_timestamp(): +def _init_timestamp(): """Build a unix style timestamp integer""" return int(round(time.time())) - def _init_msgNo(): # noqa: N802 """For some reason __post__init doesn't get called. @@ -45,7 +44,7 @@ def _init_msgNo(): # noqa: N802 return c.value -@dataclass +@dataclass(unsafe_hash=True) class Packet(metaclass=abc.ABCMeta): from_call: str to_call: str @@ -53,11 +52,11 @@ class Packet(metaclass=abc.ABCMeta): format: str = None msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 packet_type: str = None - timestamp: float = field(default_factory=_int_timestamp) + timestamp: float = field(default_factory=_init_timestamp) # Holds the raw text string to be sent over the wire # or holds the raw string from input packet raw: str = None - raw_dict: dict = field(repr=False, default_factory=lambda: {}) + raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False) # Built by calling prepare(). raw needs this built first. payload: str = None @@ -89,8 +88,12 @@ class Packet(metaclass=abc.ABCMeta): else: return default + def key(self): + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:{self.addresse}:{self.msgNo}" + def update_timestamp(self): - self.timestamp = _int_timestamp() + self.timestamp = _init_timestamp() def prepare(self): """Do stuff here that is needed prior to sending over the air.""" @@ -258,16 +261,16 @@ class Packet(metaclass=abc.ABCMeta): return repr -@dataclass +@dataclass(unsafe_hash=True) class PathPacket(Packet): - path: List[str] = field(default_factory=list) + path: List[str] = field(default_factory=list, compare=False) via: str = None def _build_payload(self): raise NotImplementedError -@dataclass +@dataclass(unsafe_hash=True) class AckPacket(PathPacket): response: str = None @@ -279,7 +282,7 @@ class AckPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" -@dataclass +@dataclass(unsafe_hash=True) class RejectPacket(PathPacket): response: str = None @@ -291,7 +294,7 @@ class RejectPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" -@dataclass +@dataclass(unsafe_hash=True) class MessagePacket(PathPacket): message_text: str = None @@ -313,7 +316,7 @@ class MessagePacket(PathPacket): ) -@dataclass +@dataclass(unsafe_hash=True) class StatusPacket(PathPacket): status: str = None messagecapable: bool = False @@ -323,7 +326,7 @@ class StatusPacket(PathPacket): raise NotImplementedError -@dataclass +@dataclass(unsafe_hash=True) class GPSPacket(PathPacket): latitude: float = 0.00 longitude: float = 0.00 diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index d42e9de..e44e02e 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,10 +1,11 @@ +from collections import MutableMapping, OrderedDict import logging import threading from oslo_config import cfg import wrapt -from aprsd import stats, utils +from aprsd import stats from aprsd.packets import seen_list @@ -12,31 +13,24 @@ CONF = cfg.CONF LOG = logging.getLogger("APRSD") -class PacketList: - """Class to track all of the packets rx'd and tx'd by aprsd.""" - +class PacketList(MutableMapping): _instance = None lock = threading.Lock() - - packet_list: utils.RingBuffer = utils.RingBuffer(1000) - _total_rx: int = 0 _total_tx: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._maxlen = 1000 + cls.d = OrderedDict() return cls._instance - @wrapt.synchronized(lock) - def __iter__(self): - return iter(self.packet_list) - @wrapt.synchronized(lock) def rx(self, packet): """Add a packet that was received.""" self._total_rx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().rx(packet) @@ -44,13 +38,46 @@ class PacketList: def tx(self, packet): """Add a packet that was received.""" self._total_tx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) - def get(self): - return self.packet_list.get() + def add(self, packet): + self._add(packet) + + def _add(self, packet): + key = packet.key() + self[key] = packet + + @property + def maxlen(self): + return self._maxlen + + @wrapt.synchronized(lock) + def find(self, packet): + key = packet.key() + return self.get(key) + + def __getitem__(self, key): + #self.d.move_to_end(key) + return self.d[key] + + def __setitem__(self, key, value): + if key in self.d: + self.d.move_to_end(key) + elif len(self.d) == self.maxlen: + self.d.popitem(last=False) + self.d[key] = value + + def __delitem__(self, key): + del self.d[key] + + def __iter__(self): + return self.d.__iter__() + + def __len__(self): + return len(self.d) @wrapt.synchronized(lock) def total_rx(self): diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 750d944..983bb07 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -70,16 +70,26 @@ class APRSDPluginRXThread(APRSDRXThread): packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") - tracked = packets.PacketTrack().get(packet.msgNo) - if not tracked: + found = False + pkt_list = packets.PacketList() + try: + found = pkt_list.find(packet) + except KeyError: + found = False + + if not found: # If we are in the process of already ack'ing # a packet, we should drop the packet # because it's a dupe within the time that # we send the 3 acks for the packet. - packets.PacketList().rx(packet) + pkt_list.rx(packet) self.packet_queue.put(packet) + elif packet.timestamp - found.timestamp < 60: + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") else: - LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked but older than 60 seconds. processing.") + pkt_list.rx(packet) + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread):