From c201c93b5d01f73eb2a308d309103e114e5f6ff1 Mon Sep 17 00:00:00 2001 From: Hemna Date: Sat, 17 Dec 2022 18:00:26 -0500 Subject: [PATCH] Cleaned up packet transmit class attributes This patch cleans up the Packet class attributes used to keep track of how many times packets have been sent and the last time they were sent. This is used by the PacketTracker and the tx threads for transmitting packets --- aprsd/packets/core.py | 68 +++++++++++++++++++++++++---------------- aprsd/plugins/notify.py | 3 +- aprsd/threads/rx.py | 3 +- aprsd/threads/tx.py | 48 ++++++++++++++++------------- 4 files changed, 70 insertions(+), 52 deletions(-) diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 64f3da6..c476472 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -31,31 +31,41 @@ def _int_timestamp(): return int(round(time.time())) -@dataclass() +def _init_msgNo(): # noqa: N802 + """For some reason __post__init doesn't get called. + + So in order to initialize the msgNo field in the packet + we use this workaround. + """ + c = counter.PacketCounter() + c.increment() + return c.value + + +@dataclass class Packet(metaclass=abc.ABCMeta): from_call: str to_call: str addresse: str = None format: str = None - msgNo: str = None # noqa: N815 + msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 packet_type: str = None timestamp: float = field(default_factory=_int_timestamp) + # Holds the raw text string to be sent over the wire + # or holds the raw string from input packet raw: str = None - _raw_dict: dict = field(repr=False, default_factory=lambda: {}) - _retry_count = 3 - _last_send_time = 0 - _last_send_attempt = 0 + raw_dict: dict = field(repr=False, default_factory=lambda: {}) + + # Fields related to sending packets out + send_count: int = field(repr=False, default=1) + retry_count: int = field(repr=False, default=3) + last_send_time: datetime.timedelta = field(repr=False, default=None) + last_send_attempt: int = field(repr=False, default=0) # Do we allow this packet to be saved to send later? - _allow_delay = True + allow_delay: bool = field(repr=False, default=True) - _transport = None - _raw_message = None - - def __post__init(self): - if not self.msgNo: - c = counter.PacketCounter() - c.increment() - self.msgNo = c.value + def __post__init__(self): + LOG.warning(f"POST INIT {self}") def get(self, key, default=None): """Emulate a getter on a dict.""" @@ -76,7 +86,7 @@ class Packet(metaclass=abc.ABCMeta): @staticmethod def factory(raw_packet): raw = raw_packet - raw["_raw_dict"] = raw.copy() + raw["raw_dict"] = raw.copy() translate_fields = { "from": "from_call", "to": "to_call", @@ -110,15 +120,16 @@ class Packet(metaclass=abc.ABCMeta): """LOG a packet to the logfile.""" asdict(self) log_list = ["\n"] + name = self.__class__.__name__ if header: - if isinstance(self, AckPacket): + if isinstance(self, AckPacket) and "tx" in header.lower(): log_list.append( - f"{header} ___________" - f"(TX:{self._send_count} of {self._retry_count})", + f"{header}____________({name}__" + f"TX:{self.send_count} of {self.retry_count})", ) else: - log_list.append(f"{header} _______________") - log_list.append(f" Packet : {self.__class__.__name__}") + log_list.append(f"{header}____________({name})") + # log_list.append(f" Packet : {self.__class__.__name__}") log_list.append(f" Raw : {self.raw}") if self.to_call: log_list.append(f" To : {self.to_call}") @@ -137,7 +148,7 @@ class Packet(metaclass=abc.ABCMeta): if self.msgNo: log_list.append(f" Msg # : {self.msgNo}") - log_list.append(f"{header} _______________ Complete") + log_list.append(f"{header}____________({name})") LOG.info("\n".join(log_list)) LOG.debug(self) @@ -165,12 +176,12 @@ class Packet(metaclass=abc.ABCMeta): cl = aprsis_client else: cl = client.factory.create().client - self.log(header="Sending Message Direct") + self.log(header="TX Message Direct") cl.send(self.raw) stats.APRSDStats().msgs_tx_inc() -@dataclass() +@dataclass class PathPacket(Packet): path: List[str] = field(default_factory=list) via: str = None @@ -179,10 +190,13 @@ class PathPacket(Packet): raise NotImplementedError -@dataclass() +@dataclass class AckPacket(PathPacket): response: str = None - _send_count = 1 + + def __post__init__(self): + if self.response: + LOG.warning("Response set!") def _build_raw(self): """Build the self.raw which is what is sent over the air.""" @@ -200,7 +214,7 @@ class AckPacket(PathPacket): thread.start() -@dataclass() +@dataclass class MessagePacket(PathPacket): message_text: str = None diff --git a/aprsd/plugins/notify.py b/aprsd/plugins/notify.py index 18dc1ef..15a9b22 100644 --- a/aprsd/plugins/notify.py +++ b/aprsd/plugins/notify.py @@ -43,8 +43,9 @@ class NotifySeenPlugin(plugin.APRSDWatchListPluginBase): message_text=( f"{fromcall} was just seen by type:'{packet_type}'" ), + allow_delay=False, ) - pkt._allow_delay = False + # pkt.allow_delay = False return pkt else: LOG.debug("fromcall and notify_callsign are the same, not notifying") diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 134c12c..796aac3 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -66,7 +66,7 @@ class APRSDPluginRXThread(APRSDRXThread): def process_packet(self, *args, **kwargs): packet = self._client.decode_packet(*args, **kwargs) # LOG.debug(raw) - packet.log(header="RX Packet") + packet.log(header="RX") thread = APRSDPluginProcessPacketThread( config=self.config, packet=packet, @@ -92,7 +92,6 @@ class APRSDProcessPacketThread(APRSDThread): def process_ack_packet(self, packet): ack_num = packet.msgNo LOG.info(f"Got ack for message {ack_num}") - packet.log("RXACK") pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) stats.APRSDStats().ack_rx_inc() diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index 8df838c..9f88b55 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -11,6 +11,8 @@ LOG = logging.getLogger("APRSD") class SendPacketThread(aprsd_threads.APRSDThread): + loop_count: int = 1 + def __init__(self, packet): self.packet = packet name = self.packet.raw[:5] @@ -19,7 +21,6 @@ class SendPacketThread(aprsd_threads.APRSDThread): pkt_tracker.add(packet) def loop(self): - LOG.debug("TX Loop") """Loop until a message is acked or it gets delayed. We only sleep for 5 seconds between each loop run, so @@ -39,20 +40,20 @@ class SendPacketThread(aprsd_threads.APRSDThread): return False else: send_now = False - if packet._last_send_attempt == packet._retry_count: + if packet.last_send_attempt == packet.retry_count: # we reached the send limit, don't send again # TODO(hemna) - Need to put this in a delayed queue? LOG.info("Message Send Complete. Max attempts reached.") - if not packet._allow_delay: + if not packet.allow_delay: pkt_tracker.remove(packet.msgNo) return False # Message is still outstanding and needs to be acked. - if packet._last_send_time: + if packet.last_send_time: # Message has a last send time tracking now = datetime.datetime.now() - sleeptime = (packet._last_send_attempt + 1) * 31 - delta = now - packet._last_send_time + sleeptime = (packet.last_send_attempt + 1) * 31 + delta = now - packet.last_send_time if delta > datetime.timedelta(seconds=sleeptime): # It's time to try to send it again send_now = True @@ -62,59 +63,62 @@ class SendPacketThread(aprsd_threads.APRSDThread): if send_now: # no attempt time, so lets send it, and start # tracking the time. - packet.log("Sending Message") + packet.log("TX") cl = client.factory.create().client cl.send(packet.raw) stats.APRSDStats().msgs_tx_inc() packet_list.PacketList().add(packet) - packet._last_send_time = datetime.datetime.now() - packet._last_send_attempt += 1 + packet.last_send_time = datetime.datetime.now() + packet.last_send_attempt += 1 - time.sleep(5) + time.sleep(1) # Make sure we get called again. + self.loop_count += 1 return True class SendAckThread(aprsd_threads.APRSDThread): + loop_count: int = 1 + def __init__(self, packet): self.packet = packet super().__init__(f"SendAck-{self.packet.msgNo}") - self._loop_cnt = 1 def loop(self): """Separate thread to send acks with retries.""" send_now = False - if self.packet._last_send_attempt == self.packet._retry_count: + if self.packet.last_send_attempt == self.packet.retry_count: # we reached the send limit, don't send again # TODO(hemna) - Need to put this in a delayed queue? LOG.info("Ack Send Complete. Max attempts reached.") return False - if self.packet._last_send_time: + if self.packet.last_send_time: # Message has a last send time tracking now = datetime.datetime.now() # aprs duplicate detection is 30 secs? # (21 only sends first, 28 skips middle) - sleeptime = 31 - delta = now - self.packet._last_send_time - if delta > datetime.timedelta(seconds=sleeptime): + sleep_time = 31 + delta = now - self.packet.last_send_time + if delta > datetime.timedelta(seconds=sleep_time): # It's time to try to send it again send_now = True - elif self._loop_cnt % 5 == 0: + elif self.loop_count % 10 == 0: LOG.debug(f"Still wating. {delta}") else: send_now = True if send_now: cl = client.factory.create().client - self.packet.log("Sending ACK") + self.packet.log("TX") cl.send(self.packet.raw) - self.packet._send_count += 1 + self.packet.send_count += 1 stats.APRSDStats().ack_tx_inc() packet_list.PacketList().add(self.packet) - self.packet._last_send_attempt += 1 - self.packet._last_send_time = datetime.datetime.now() + self.packet.last_send_attempt += 1 + self.packet.last_send_time = datetime.datetime.now() + time.sleep(1) - self._loop_cnt += 1 + self.loop_count += 1 return True