From 1f6c55d2bf125b08c1a56106241c24aa9116cea3 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 27 Sep 2023 14:55:47 -0400 Subject: [PATCH 1/9] Fix for dupe packets. Sometimes over KISS clients (RF), we can get duplicate packets due to having many digipeters in range of the TNC that aprsd is connected to. We will now filter out any dupe packets that aprsd is still in the process of doing it's 3 acks. --- ChangeLog | 1 + aprsd/packets/tracker.py | 11 +++++------ aprsd/threads/rx.py | 10 ++++++++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8ad4952..827ad9c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -4,6 +4,7 @@ CHANGES v3.2.0 ------ +* Update Changelog for 3.2.0 * minor cleanup prior to release * Webchat: fix input maxlength * WebChat: cleanup some console.logs diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index e62706a..ac5d4b4 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -72,18 +72,17 @@ class PacketTrack(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def add(self, packet): - key = int(packet.msgNo) + key = packet.msgNo self.data[key] = packet self.total_tracked += 1 @wrapt.synchronized(lock) - def get(self, id): - if id in self.data: - return self.data[id] + def get(self, key): + if key in self.data: + return self.data[key] @wrapt.synchronized(lock) - def remove(self, id): - key = int(id) + def remove(self, key): if key in self.data.keys(): del self.data[key] diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 8387d16..2ee125f 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -70,8 +70,14 @@ class APRSDPluginRXThread(APRSDRXThread): packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") - packets.PacketList().rx(packet) - self.packet_queue.put(packet) + tracked = packets.PacketTrack().get(packet.msgNo) + if not tracked: + # If we are in the process of already ack'ing + # a packet, we should drop the packet + # because it's a dupe within the time that + # we send the 3 acks for the packet. + packets.PacketList().rx(packet) + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread): From 0d7e50d2ba0ba55ae82a0c8df98bb0e7c283cdd8 Mon Sep 17 00:00:00 2001 From: Hemna Date: Wed, 27 Sep 2023 15:45:39 -0400 Subject: [PATCH 2/9] Log a warning on dupe This patch logs a warning if we detect a dupe packet inbound. --- aprsd/threads/rx.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 2ee125f..750d944 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -78,6 +78,8 @@ class APRSDPluginRXThread(APRSDRXThread): # we send the 3 acks for the packet. packets.PacketList().rx(packet) self.packet_queue.put(packet) + else: + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") class APRSDProcessPacketThread(APRSDThread): From 4f87d5da12a5588d943b0e3644d57a93e26ccafe Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 28 Sep 2023 12:19:18 -0400 Subject: [PATCH 3/9] rewrote packet_list and drop dupe packets This patch rewrites the packet_list internally to be a dictionary instead of a list for very fast lookups. This was needed to test for duplicate packets already in the list. This patch drops packets that have the same data and are < 60 seconds in age from the last time we got the packet. On RF based clients we can get dupes!! --- aprsd/client.py | 36 ++++++++++++++++++++++- aprsd/clients/fake.py | 49 +++++++++++++++++++++++++++++++ aprsd/conf/client.py | 17 +++++++++++ aprsd/packets/core.py | 29 ++++++++++-------- aprsd/packets/packet_list.py | 57 ++++++++++++++++++++++++++---------- aprsd/threads/rx.py | 18 +++++++++--- 6 files changed, 173 insertions(+), 33 deletions(-) create mode 100644 aprsd/clients/fake.py diff --git a/aprsd/client.py b/aprsd/client.py index 7bc4f02..bca92c5 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -7,7 +7,7 @@ from aprslib.exceptions import LoginError from oslo_config import cfg from aprsd import exception -from aprsd.clients import aprsis, kiss +from aprsd.clients import aprsis, fake, kiss from aprsd.packets import core, packet_list from aprsd.utils import trace @@ -17,6 +17,7 @@ LOG = logging.getLogger("APRSD") TRANSPORT_APRSIS = "aprsis" TRANSPORT_TCPKISS = "tcpkiss" TRANSPORT_SERIALKISS = "serialkiss" +TRANSPORT_FAKE = "fake" # Main must create this from the ClientFactory # object such that it's populated with the @@ -248,6 +249,35 @@ class KISSClient(Client, metaclass=trace.TraceWrapperMetaclass): return self._client +class APRSDFakeClient(Client, metaclass=trace.TraceWrapperMetaclass): + + @staticmethod + def is_enabled(): + if CONF.fake_client.enabled: + return True + return False + + @staticmethod + def is_configured(): + return APRSDFakeClient.is_enabled() + + def is_alive(self): + return True + + def setup_connection(self): + return fake.APRSDFakeClient() + + @staticmethod + def transport(): + return TRANSPORT_FAKE + + def decode_packet(self, *args, **kwargs): + LOG.debug(f"kwargs {kwargs}") + pkt = kwargs["packet"] + LOG.debug(f"Got an APRS Fake Packet '{pkt}'") + return pkt + + class ClientFactory: _instance = None @@ -270,8 +300,11 @@ class ClientFactory: key = TRANSPORT_APRSIS elif KISSClient.is_enabled(): key = KISSClient.transport() + elif APRSDFakeClient.is_enabled(): + key = TRANSPORT_FAKE builder = self._builders.get(key) + LOG.debug(f"Creating client {key}") if not builder: raise ValueError(key) return builder() @@ -312,3 +345,4 @@ class ClientFactory: factory.register(TRANSPORT_APRSIS, APRSISClient) factory.register(TRANSPORT_TCPKISS, KISSClient) factory.register(TRANSPORT_SERIALKISS, KISSClient) + factory.register(TRANSPORT_FAKE, APRSDFakeClient) diff --git a/aprsd/clients/fake.py b/aprsd/clients/fake.py new file mode 100644 index 0000000..87e8d0b --- /dev/null +++ b/aprsd/clients/fake.py @@ -0,0 +1,49 @@ +import logging +import threading +import time + +from oslo_config import cfg +import wrapt + +from aprsd import conf # noqa +from aprsd.packets import core +from aprsd.utils import trace + + +CONF = cfg.CONF +LOG = logging.getLogger("APRSD") + + +class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass): + '''Fake client for testing.''' + + # flag to tell us to stop + thread_stop = False + + lock = threading.Lock() + + def stop(self): + self.thread_stop = True + LOG.info("Shutdown APRSDFakeClient client.") + + def is_alive(self): + """If the connection is alive or not.""" + return not self.thread_stop + + @wrapt.synchronized(lock) + def send(self, packet: core.Packet): + """Send an APRS Message object.""" + LOG.info(f"Sending packet: {packet}") + + def consumer(self, callback, blocking=False, immortal=False, raw=False): + LOG.debug("Start non blocking FAKE consumer") + # Generate packets here? + pkt = core.MessagePacket( + from_call="N0CALL", + to_call=CONF.callsign, + message_text="Hello World", + msgNo=13, + ) + callback(packet=pkt) + LOG.debug(f"END blocking FAKE consumer {self}") + time.sleep(8) diff --git a/aprsd/conf/client.py b/aprsd/conf/client.py index c752f16..cb77e09 100644 --- a/aprsd/conf/client.py +++ b/aprsd/conf/client.py @@ -19,6 +19,11 @@ kiss_tcp_group = cfg.OptGroup( name="kiss_tcp", title="KISS TCP/IP Device connection", ) + +fake_client_group = cfg.OptGroup( + name="fake_client", + title="Fake Client settings", +) aprs_opts = [ cfg.BoolOpt( "enabled", @@ -84,6 +89,14 @@ kiss_tcp_opts = [ ), ] +fake_client_opts = [ + cfg.BoolOpt( + "enabled", + default=False, + help="Enable fake client connection.", + ), +] + def register_opts(config): config.register_group(aprs_group) @@ -93,10 +106,14 @@ def register_opts(config): config.register_opts(kiss_serial_opts, group=kiss_serial_group) config.register_opts(kiss_tcp_opts, group=kiss_tcp_group) + config.register_group(fake_client_group) + config.register_opts(fake_client_opts, group=fake_client_group) + def list_opts(): return { aprs_group.name: aprs_opts, kiss_serial_group.name: kiss_serial_opts, kiss_tcp_group.name: kiss_tcp_opts, + fake_client_group.name: fake_client_opts, } diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index beda33d..a95f658 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -29,11 +29,10 @@ PACKET_TYPE_THIRDPARTY = "thirdparty" PACKET_TYPE_UNCOMPRESSED = "uncompressed" -def _int_timestamp(): +def _init_timestamp(): """Build a unix style timestamp integer""" return int(round(time.time())) - def _init_msgNo(): # noqa: N802 """For some reason __post__init doesn't get called. @@ -45,7 +44,7 @@ def _init_msgNo(): # noqa: N802 return c.value -@dataclass +@dataclass(unsafe_hash=True) class Packet(metaclass=abc.ABCMeta): from_call: str to_call: str @@ -53,11 +52,11 @@ class Packet(metaclass=abc.ABCMeta): format: str = None msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 packet_type: str = None - timestamp: float = field(default_factory=_int_timestamp) + timestamp: float = field(default_factory=_init_timestamp) # Holds the raw text string to be sent over the wire # or holds the raw string from input packet raw: str = None - raw_dict: dict = field(repr=False, default_factory=lambda: {}) + raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False) # Built by calling prepare(). raw needs this built first. payload: str = None @@ -89,8 +88,12 @@ class Packet(metaclass=abc.ABCMeta): else: return default + def key(self): + """Build a key for finding this packet in a dict.""" + return f"{self.from_call}:{self.addresse}:{self.msgNo}" + def update_timestamp(self): - self.timestamp = _int_timestamp() + self.timestamp = _init_timestamp() def prepare(self): """Do stuff here that is needed prior to sending over the air.""" @@ -258,16 +261,16 @@ class Packet(metaclass=abc.ABCMeta): return repr -@dataclass +@dataclass(unsafe_hash=True) class PathPacket(Packet): - path: List[str] = field(default_factory=list) + path: List[str] = field(default_factory=list, compare=False) via: str = None def _build_payload(self): raise NotImplementedError -@dataclass +@dataclass(unsafe_hash=True) class AckPacket(PathPacket): response: str = None @@ -279,7 +282,7 @@ class AckPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)}:ack{self.msgNo}" -@dataclass +@dataclass(unsafe_hash=True) class RejectPacket(PathPacket): response: str = None @@ -291,7 +294,7 @@ class RejectPacket(PathPacket): self.payload = f":{self.to_call.ljust(9)} :rej{self.msgNo}" -@dataclass +@dataclass(unsafe_hash=True) class MessagePacket(PathPacket): message_text: str = None @@ -313,7 +316,7 @@ class MessagePacket(PathPacket): ) -@dataclass +@dataclass(unsafe_hash=True) class StatusPacket(PathPacket): status: str = None messagecapable: bool = False @@ -323,7 +326,7 @@ class StatusPacket(PathPacket): raise NotImplementedError -@dataclass +@dataclass(unsafe_hash=True) class GPSPacket(PathPacket): latitude: float = 0.00 longitude: float = 0.00 diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index d42e9de..e44e02e 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,10 +1,11 @@ +from collections import MutableMapping, OrderedDict import logging import threading from oslo_config import cfg import wrapt -from aprsd import stats, utils +from aprsd import stats from aprsd.packets import seen_list @@ -12,31 +13,24 @@ CONF = cfg.CONF LOG = logging.getLogger("APRSD") -class PacketList: - """Class to track all of the packets rx'd and tx'd by aprsd.""" - +class PacketList(MutableMapping): _instance = None lock = threading.Lock() - - packet_list: utils.RingBuffer = utils.RingBuffer(1000) - _total_rx: int = 0 _total_tx: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + cls._maxlen = 1000 + cls.d = OrderedDict() return cls._instance - @wrapt.synchronized(lock) - def __iter__(self): - return iter(self.packet_list) - @wrapt.synchronized(lock) def rx(self, packet): """Add a packet that was received.""" self._total_rx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().rx(packet) @@ -44,13 +38,46 @@ class PacketList: def tx(self, packet): """Add a packet that was received.""" self._total_tx += 1 - self.packet_list.append(packet) + self._add(packet) seen_list.SeenList().update_seen(packet) stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) - def get(self): - return self.packet_list.get() + def add(self, packet): + self._add(packet) + + def _add(self, packet): + key = packet.key() + self[key] = packet + + @property + def maxlen(self): + return self._maxlen + + @wrapt.synchronized(lock) + def find(self, packet): + key = packet.key() + return self.get(key) + + def __getitem__(self, key): + #self.d.move_to_end(key) + return self.d[key] + + def __setitem__(self, key, value): + if key in self.d: + self.d.move_to_end(key) + elif len(self.d) == self.maxlen: + self.d.popitem(last=False) + self.d[key] = value + + def __delitem__(self, key): + del self.d[key] + + def __iter__(self): + return self.d.__iter__() + + def __len__(self): + return len(self.d) @wrapt.synchronized(lock) def total_rx(self): diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 750d944..983bb07 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -70,16 +70,26 @@ class APRSDPluginRXThread(APRSDRXThread): packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") - tracked = packets.PacketTrack().get(packet.msgNo) - if not tracked: + found = False + pkt_list = packets.PacketList() + try: + found = pkt_list.find(packet) + except KeyError: + found = False + + if not found: # If we are in the process of already ack'ing # a packet, we should drop the packet # because it's a dupe within the time that # we send the 3 acks for the packet. - packets.PacketList().rx(packet) + pkt_list.rx(packet) self.packet_queue.put(packet) + elif packet.timestamp - found.timestamp < 60: + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") else: - LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked but older than 60 seconds. processing.") + pkt_list.rx(packet) + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread): From 99a0f877f4b97dcc91cbf9552f72fe9e29ac9ba1 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 28 Sep 2023 12:34:01 -0400 Subject: [PATCH 4/9] pep8 fixes --- aprsd/packets/core.py | 1 + aprsd/packets/packet_list.py | 2 +- aprsd/threads/rx.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index a95f658..a75820b 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -33,6 +33,7 @@ def _init_timestamp(): """Build a unix style timestamp integer""" return int(round(time.time())) + def _init_msgNo(): # noqa: N802 """For some reason __post__init doesn't get called. diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index e44e02e..c07c72c 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -60,7 +60,7 @@ class PacketList(MutableMapping): return self.get(key) def __getitem__(self, key): - #self.d.move_to_end(key) + # self.d.move_to_end(key) return self.d[key] def __setitem__(self, key, value): diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 983bb07..c718f3b 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -85,7 +85,7 @@ class APRSDPluginRXThread(APRSDRXThread): pkt_list.rx(packet) self.packet_queue.put(packet) elif packet.timestamp - found.timestamp < 60: - LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") else: LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked but older than 60 seconds. processing.") pkt_list.rx(packet) From f79b88ec1b44ea66572ee83ef040cb0f81d82549 Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 28 Sep 2023 15:30:54 -0400 Subject: [PATCH 5/9] Fixed import of Mutablemapping python 3.10 moved it to collections.abc --- aprsd/packets/packet_list.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index c07c72c..347cd51 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -1,4 +1,5 @@ -from collections import MutableMapping, OrderedDict +from collections import OrderedDict +from collections.abc import MutableMapping import logging import threading From 9bdfd166fd213983ddd55e57b726bb8fc7eba8b7 Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 29 Sep 2023 10:04:15 -0400 Subject: [PATCH 6/9] Fixed issue with packet tracker and msgNO Counter The packet msgNo field is a string, but is typically is an integer counter to keep track of a specific packet id. The counter was returning an int, but the packet.msgNo is a string. So, when trying to delete a packet from the packet tracker, the key for accessing the packet is the msgNo, which has to be a string. Passing an int, will cause the packet tracker to not find the packet, and hence silently fail. This patch forces the msgNo counter to be a string. --- aprsd/packets/tracker.py | 15 ++++----------- aprsd/utils/counter.py | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index ac5d4b4..b512320 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -62,14 +62,6 @@ class PacketTrack(objectstore.ObjectStoreMixin): def __len__(self): return len(self.data) - @wrapt.synchronized(lock) - def __str__(self): - result = "{" - for key in self.data.keys(): - result += f"{key}: {str(self.data[key])}, " - result += "}" - return result - @wrapt.synchronized(lock) def add(self, packet): key = packet.msgNo @@ -78,13 +70,14 @@ class PacketTrack(objectstore.ObjectStoreMixin): @wrapt.synchronized(lock) def get(self, key): - if key in self.data: - return self.data[key] + return self.data.get(key, None) @wrapt.synchronized(lock) def remove(self, key): - if key in self.data.keys(): + try: del self.data[key] + except KeyError: + pass def restart(self): """Walk the list of messages and restart them if any.""" diff --git a/aprsd/utils/counter.py b/aprsd/utils/counter.py index e423bfb..5f569f4 100644 --- a/aprsd/utils/counter.py +++ b/aprsd/utils/counter.py @@ -37,7 +37,7 @@ class PacketCounter: @property @wrapt.synchronized(lock) def value(self): - return self.val.value + return str(self.val.value) @wrapt.synchronized(lock) def __repr__(self): From 751bbc25149f8623f54e9e853c22191533fda45c Mon Sep 17 00:00:00 2001 From: Hemna Date: Fri, 29 Sep 2023 15:40:42 -0400 Subject: [PATCH 7/9] Fixed another msgNo int issue --- aprsd/cmds/webchat.py | 4 ++-- aprsd/web/chat/static/js/send-message.js | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 57b1d69..ff56a16 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -131,9 +131,9 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): def process_ack_packet(self, packet: packets.AckPacket): super().process_ack_packet(packet) ack_num = packet.get("msgNo") - SentMessages().ack(int(ack_num)) + SentMessages().ack(ack_num) self.socketio.emit( - "ack", SentMessages().get(int(ack_num)), + "ack", SentMessages().get(ack_num), namespace="/sendmsg", ) self.got_ack = True diff --git a/aprsd/web/chat/static/js/send-message.js b/aprsd/web/chat/static/js/send-message.js index 2d93de2..6b2d3f2 100644 --- a/aprsd/web/chat/static/js/send-message.js +++ b/aprsd/web/chat/static/js/send-message.js @@ -34,7 +34,6 @@ function init_chat() { console.log("SENT: "); console.log(msg); if (cleared === false) { - console.log("CLEARING #msgsTabsDiv"); var msgsdiv = $("#msgsTabsDiv"); msgsdiv.html(''); cleared = true; From 59cec1317da8fea28e2c6496f95ce8ea5bf1c73b Mon Sep 17 00:00:00 2001 From: Hemna Date: Mon, 2 Oct 2023 08:42:00 -0400 Subject: [PATCH 8/9] Don't process AckPackets as dupes If we RX an AckPacket, then send it on for processing. There is no need to check for a dupe. --- aprsd/threads/rx.py | 60 ++++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index c718f3b..21f72f2 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -67,29 +67,55 @@ class APRSDPluginRXThread(APRSDRXThread): """ def process_packet(self, *args, **kwargs): + """This handles the processing of an inbound packet. + + When a packet is received by the connected client object, + it sends the raw packet into this function. This function then + decodes the packet via the client, and then processes the packet. + Ack Packets are sent to the PluginProcessPacketThread for processing. + All other packets have to be checked as a dupe, and then only after + we haven't seen this packet before, do we send it to the + PluginProcessPacketThread for processing. + """ packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) packet.log(header="RX") - found = False - pkt_list = packets.PacketList() - try: - found = pkt_list.find(packet) - except KeyError: - found = False - if not found: - # If we are in the process of already ack'ing - # a packet, we should drop the packet - # because it's a dupe within the time that - # we send the 3 acks for the packet. - pkt_list.rx(packet) + if isinstance(packet, packets.AckPacket): + # We don't need to drop AckPackets, those should be + # processed. self.packet_queue.put(packet) - elif packet.timestamp - found.timestamp < 60: - LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") else: - LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked but older than 60 seconds. processing.") - pkt_list.rx(packet) - self.packet_queue.put(packet) + # Make sure we aren't re-processing the same packet + # For RF based APRS Clients we can get duplicate packets + # So we need to track them and not process the dupes. + found = False + pkt_list = packets.PacketList() + try: + # Find the packet in the list of already seen packets + # Based on the packet.key() + found = pkt_list.find(packet) + except KeyError: + found = False + + if not found: + # If we are in the process of already ack'ing + # a packet, we should drop the packet + # because it's a dupe within the time that + # we send the 3 acks for the packet. + pkt_list.rx(packet) + self.packet_queue.put(packet) + elif packet.timestamp - found.timestamp < 60: + # If the packet came in within 60 seconds of the + # Last time seeing the packet, then we drop it as a dupe. + LOG.warning(f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.") + else: + LOG.warning( + f"Packet {packet.from_call}:{packet.msgNo} already tracked " + "but older than 60 seconds. processing.", + ) + pkt_list.rx(packet) + self.packet_queue.put(packet) class APRSDProcessPacketThread(APRSDThread): From 544600a96b6a23fc5999c1be117511998a2228b2 Mon Sep 17 00:00:00 2001 From: Hemna Date: Tue, 3 Oct 2023 16:01:43 -0400 Subject: [PATCH 9/9] Make Packet objects hashable This patch makes the packet key a property of the Packet object and makes packet objects comparable and hashable. --- aprsd/packets/__init__.py | 4 +- aprsd/packets/core.py | 72 +++++++++++++++++------------------- aprsd/packets/packet_list.py | 6 +-- aprsd/threads/rx.py | 2 +- 4 files changed, 38 insertions(+), 46 deletions(-) diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py index 66236d8..7fab8bb 100644 --- a/aprsd/packets/__init__.py +++ b/aprsd/packets/__init__.py @@ -1,6 +1,6 @@ from aprsd.packets.core import ( # noqa: F401 - AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, PathPacket, - RejectPacket, StatusPacket, WeatherPacket, + AckPacket, GPSPacket, MessagePacket, MicEPacket, Packet, RejectPacket, + StatusPacket, WeatherPacket, ) from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.packets.seen_list import SeenList # noqa: F401 diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index a75820b..1b3dd8c 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -47,26 +47,28 @@ def _init_msgNo(): # noqa: N802 @dataclass(unsafe_hash=True) class Packet(metaclass=abc.ABCMeta): - from_call: str - to_call: str - addresse: str = None - format: str = None + from_call: str = field(default=None) + to_call: str = field(default=None) + addresse: str = field(default=None) + format: str = field(default=None) msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 - packet_type: str = None - timestamp: float = field(default_factory=_init_timestamp) + packet_type: str = field(default=None) + timestamp: float = field(default_factory=_init_timestamp, compare=False, hash=False) # Holds the raw text string to be sent over the wire # or holds the raw string from input packet - raw: str = None - raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False) + raw: str = field(default=None, compare=False, hash=False) + raw_dict: dict = field(repr=False, default_factory=lambda: {}, compare=False, hash=False) # Built by calling prepare(). raw needs this built first. - payload: str = None + payload: str = field(default=None) # Fields related to sending packets out - send_count: int = field(repr=False, default=0) - retry_count: int = field(repr=False, default=3) - last_send_time: datetime.timedelta = field(repr=False, default=None) + send_count: int = field(repr=False, default=0, compare=False, hash=False) + retry_count: int = field(repr=False, default=3, compare=False, hash=False) + last_send_time: datetime.timedelta = field(repr=False, default=None, compare=False, hash=False) # Do we allow this packet to be saved to send later? - allow_delay: bool = field(repr=False, default=True) + allow_delay: bool = field(repr=False, default=True, compare=False, hash=False) + path: List[str] = field(default_factory=list, compare=False, hash=False) + via: str = field(default=None, compare=False, hash=False) def __post__init__(self): LOG.warning(f"POST INIT {self}") @@ -89,6 +91,7 @@ class Packet(metaclass=abc.ABCMeta): else: return default + @property def key(self): """Build a key for finding this packet in a dict.""" return f"{self.from_call}:{self.addresse}:{self.msgNo}" @@ -263,17 +266,8 @@ class Packet(metaclass=abc.ABCMeta): @dataclass(unsafe_hash=True) -class PathPacket(Packet): - path: List[str] = field(default_factory=list, compare=False) - via: str = None - - def _build_payload(self): - raise NotImplementedError - - -@dataclass(unsafe_hash=True) -class AckPacket(PathPacket): - response: str = None +class AckPacket(Packet): + response: str = field(default=None) def __post__init__(self): if self.response: @@ -284,8 +278,8 @@ class AckPacket(PathPacket): @dataclass(unsafe_hash=True) -class RejectPacket(PathPacket): - response: str = None +class RejectPacket(Packet): + response: str = field(default=None) def __post__init__(self): if self.response: @@ -296,8 +290,8 @@ class RejectPacket(PathPacket): @dataclass(unsafe_hash=True) -class MessagePacket(PathPacket): - message_text: str = None +class MessagePacket(Packet): + message_text: str = field(default=None) def _filter_for_send(self) -> str: """Filter and format message string for FCC.""" @@ -318,23 +312,23 @@ class MessagePacket(PathPacket): @dataclass(unsafe_hash=True) -class StatusPacket(PathPacket): - status: str = None - messagecapable: bool = False - comment: str = None +class StatusPacket(Packet): + status: str = field(default=None) + messagecapable: bool = field(default=False) + comment: str = field(default=None) def _build_payload(self): raise NotImplementedError @dataclass(unsafe_hash=True) -class GPSPacket(PathPacket): - latitude: float = 0.00 - longitude: float = 0.00 - altitude: float = 0.00 - rng: float = 0.00 - posambiguity: int = 0 - comment: str = None +class GPSPacket(Packet): + latitude: float = field(default=0.00) + longitude: float = field(default=0.00) + altitude: float = field(default=0.00) + rng: float = field(default=0.00) + posambiguity: int = field(default=0) + comment: str = field(default=None) symbol: str = field(default="l") symbol_table: str = field(default="/") diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index 347cd51..a6dc6f7 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -48,8 +48,7 @@ class PacketList(MutableMapping): self._add(packet) def _add(self, packet): - key = packet.key() - self[key] = packet + self[packet.key] = packet @property def maxlen(self): @@ -57,8 +56,7 @@ class PacketList(MutableMapping): @wrapt.synchronized(lock) def find(self, packet): - key = packet.key() - return self.get(key) + return self.get(packet.key) def __getitem__(self, key): # self.d.move_to_end(key) diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 21f72f2..01ea308 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -93,7 +93,7 @@ class APRSDPluginRXThread(APRSDRXThread): pkt_list = packets.PacketList() try: # Find the packet in the list of already seen packets - # Based on the packet.key() + # Based on the packet.key found = pkt_list.find(packet) except KeyError: found = False