1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-06-13 20:02:26 -04:00

Remove sleep in main RX thread

We had a bottleneck of pulling down packets as fast as possible,
which was caused by the time.sleep(1) call in the main RX
thread that used to be needed.
This commit is contained in:
Hemna 2025-02-03 13:27:57 -08:00
parent 101904ca77
commit 6cd7e99713

View File

@ -14,14 +14,14 @@ from aprsd.threads import APRSDThread, tx
from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger("APRSD")
LOG = logging.getLogger('APRSD')
class APRSDRXThread(APRSDThread):
_client = None
def __init__(self, packet_queue):
super().__init__("RX_PKT")
super().__init__('RX_PKT')
self.packet_queue = packet_queue
def stop(self):
@ -60,7 +60,7 @@ class APRSDRXThread(APRSDThread):
aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError,
):
LOG.error("Connection dropped, reconnecting")
LOG.error('Connection dropped, reconnecting')
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
@ -68,11 +68,9 @@ class APRSDRXThread(APRSDThread):
time.sleep(5)
except Exception:
# LOG.exception(ex)
LOG.error("Resetting connection and trying again.")
LOG.error('Resetting connection and trying again.')
self._client.reset()
time.sleep(5)
# Continue to loop
time.sleep(1)
return True
def _process_packet(self, *args, **kwargs):
@ -140,12 +138,12 @@ class APRSDDupeRXThread(APRSDRXThread):
# If the packet came in within N seconds of the
# Last time seeing the packet, then we drop it as a dupe.
LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
)
else:
LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
f'Packet {packet.from_call}:{packet.msgNo} already tracked '
f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
)
collector.PacketCollector().rx(packet)
self.packet_queue.put(packet)
@ -168,29 +166,28 @@ class APRSDProcessPacketThread(APRSDThread):
def __init__(self, packet_queue):
self.packet_queue = packet_queue
super().__init__("ProcessPKT")
super().__init__('ProcessPKT')
if not CONF.enable_sending_ack_packets:
LOG.warning(
"Sending ack packets is disabled, messages "
"will not be acknowledged.",
'Sending ack packets is disabled, messages will not be acknowledged.',
)
def process_ack_packet(self, packet):
"""We got an ack for a message, no need to resend it."""
ack_num = packet.msgNo
LOG.debug(f"Got ack for message {ack_num}")
LOG.debug(f'Got ack for message {ack_num}')
collector.PacketCollector().rx(packet)
def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
LOG.debug(f'Got PiggyBackAck for message {ack_num}')
collector.PacketCollector().rx(packet)
def process_reject_packet(self, packet):
"""We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo
LOG.debug(f"Got REJECT for message {ack_num}")
LOG.debug(f'Got REJECT for message {ack_num}')
collector.PacketCollector().rx(packet)
def loop(self):
@ -204,7 +201,7 @@ class APRSDProcessPacketThread(APRSDThread):
def process_packet(self, packet):
"""Process a packet received from aprs-is server."""
LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
LOG.debug(f'ProcessPKT-LOOP {self.loop_count}')
our_call = CONF.callsign.lower()
from_call = packet.from_call
@ -227,7 +224,7 @@ class APRSDProcessPacketThread(APRSDThread):
):
self.process_reject_packet(packet)
else:
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
if hasattr(packet, 'ackMsgNo') and packet.ackMsgNo:
# we got an ack embedded in this packet
# we need to handle the ack
self.process_piggyback_ack(packet)
@ -267,7 +264,7 @@ class APRSDProcessPacketThread(APRSDThread):
if not for_us:
LOG.info("Got a packet meant for someone else '{packet.to_call}'")
else:
LOG.info("Got a non AckPacket/MessagePacket")
LOG.info('Got a non AckPacket/MessagePacket')
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
@ -287,7 +284,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
tx.send(subreply)
else:
wl = CONF.watch_list
to_call = wl["alert_callsign"]
to_call = wl['alert_callsign']
tx.send(
packets.MessagePacket(
from_call=CONF.callsign,
@ -299,7 +296,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
# We have a message based object.
tx.send(reply)
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.error('Plugin failed!!!')
LOG.exception(ex)
def process_our_message_packet(self, packet):
@ -355,11 +352,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
if to_call == CONF.callsign and not replied:
# Tailor the messages accordingly
if CONF.load_help_plugin:
LOG.warning("Sending help!")
LOG.warning('Sending help!')
message_text = "Unknown command! Send 'help' message for help"
else:
LOG.warning("Unknown command!")
message_text = "Unknown command!"
LOG.warning('Unknown command!')
message_text = 'Unknown command!'
tx.send(
packets.MessagePacket(
@ -369,11 +366,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
),
)
except Exception as ex:
LOG.error("Plugin failed!!!")
LOG.error('Plugin failed!!!')
LOG.exception(ex)
# Do we need to send a reply?
if to_call == CONF.callsign:
reply = "A Plugin failed! try again?"
reply = 'A Plugin failed! try again?'
tx.send(
packets.MessagePacket(
from_call=CONF.callsign,
@ -382,4 +379,4 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
),
)
LOG.debug("Completed process_our_message_packet")
LOG.debug('Completed process_our_message_packet')