diff --git a/aprsd/main.py b/aprsd/main.py
index 347893c..4766611 100644
--- a/aprsd/main.py
+++ b/aprsd/main.py
@@ -24,7 +24,6 @@ import datetime
import importlib.metadata as imp
import logging
import sys
-import time
from importlib.metadata import version as metadata_version
import click
@@ -76,14 +75,17 @@ def main():
def signal_handler(sig, frame):
click.echo('signal_handler: called')
collector.Collector().stop_all()
- threads.APRSDThreadList().stop_all()
+ thread_list = threads.APRSDThreadList()
+ thread_list.stop_all()
+
if 'subprocess' not in str(frame):
LOG.info(
- 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
+ 'Ctrl+C, Sending all threads exit! {}'.format(
datetime.datetime.now(),
),
)
- time.sleep(1.5)
+ # Wait for non-daemon threads to finish gracefully
+ thread_list.join_non_daemon(timeout=5.0)
try:
packets.PacketTrack().save()
packets.WatchList().save()
@@ -93,8 +95,6 @@ def signal_handler(sig, frame):
except Exception as e:
LOG.error(f'Failed to save data: {e}')
sys.exit(0)
- # signal.signal(signal.SIGTERM, sys.exit(0))
- # sys.exit(0)
@cli.command()
diff --git a/aprsd/threads/aprsd.py b/aprsd/threads/aprsd.py
index 77e60cf..2377770 100644
--- a/aprsd/threads/aprsd.py
+++ b/aprsd/threads/aprsd.py
@@ -2,8 +2,6 @@ import abc
import datetime
import logging
import threading
-import time
-from typing import List
import wrapt
@@ -13,21 +11,26 @@ LOG = logging.getLogger('APRSD')
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
"""Base class for all threads in APRSD."""
+ # Class attributes - subclasses override as needed
+ daemon = True # Most threads are daemon threads
+ period = 1 # Default wait period in seconds
loop_count = 1
_pause = False
- thread_stop = False
def __init__(self, name):
super().__init__(name=name)
- self.thread_stop = False
+        # Class attr shadows Thread.daemon property, so sync _daemonic too
+        self.daemon = self._daemonic = self.__class__.daemon
+ # Set period from class attribute (can be overridden in __init__)
+ self.period = self.__class__.period
+ self._shutdown_event = threading.Event()
+ self.loop_count = 0
APRSDThreadList().add(self)
self._last_loop = datetime.datetime.now()
def _should_quit(self):
- """see if we have a quit message from the global queue."""
- if self.thread_stop:
- return True
- return False
+ """Check if thread should exit."""
+ return self._shutdown_event.is_set()
def pause(self):
"""Logically pause the processing of the main loop."""
@@ -40,11 +43,24 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
self._pause = False
def stop(self):
+ """Signal thread to stop. Returns immediately."""
LOG.debug(f"Stopping thread '{self.name}'")
- self.thread_stop = True
+ self._shutdown_event.set()
+
+ def wait(self, timeout: float | None = None) -> bool:
+ """Wait for shutdown signal or timeout.
+
+ Args:
+ timeout: Seconds to wait. Defaults to self.period.
+
+ Returns:
+ True if shutdown was signaled, False if timeout expired.
+ """
+ wait_time = timeout if timeout is not None else self.period
+ return self._shutdown_event.wait(timeout=wait_time)
@abc.abstractmethod
- def loop(self):
+ def loop(self) -> bool:
pass
def _cleanup(self):
@@ -64,7 +80,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
LOG.debug('Starting')
while not self._should_quit():
if self._pause:
- time.sleep(1)
+ self.wait(timeout=1)
else:
self.loop_count += 1
can_loop = self.loop()
@@ -81,7 +97,7 @@ class APRSDThreadList:
_instance = None
- threads_list: List[APRSDThread] = []
+ threads_list: list[APRSDThread] = []
lock = threading.Lock()
def __new__(cls, *args, **kwargs):
@@ -167,3 +183,15 @@ class APRSDThreadList:
@wrapt.synchronized(lock)
def __len__(self):
return len(self.threads_list)
+
+    def join_non_daemon(self, timeout: float = 5.0):
+        """Wait for non-daemon threads to finish (timeout secs per thread).
+
+        Snapshot under the lock, then join OUTSIDE it: exiting threads call
+        the lock-synchronized remove(), so joining while holding the lock
+        would stall every join for its full timeout.
+        """
+        with self.lock:
+            to_join = [t for t in self.threads_list if not t.daemon and t.is_alive()]
+        for th in to_join:
+            LOG.info(f'Waiting for non-daemon thread {th.name} to finish')
+            th.join(timeout=timeout)
diff --git a/aprsd/threads/keepalive.py b/aprsd/threads/keepalive.py
index af552bd..7b4848a 100644
--- a/aprsd/threads/keepalive.py
+++ b/aprsd/threads/keepalive.py
@@ -1,6 +1,5 @@
import datetime
import logging
-import time
import tracemalloc
from loguru import logger
@@ -20,6 +19,7 @@ LOGU = logger
class KeepAliveThread(APRSDThread):
cntr = 0
checker_time = datetime.datetime.now()
+ period = 60
def __init__(self):
tracemalloc.start()
@@ -28,81 +28,80 @@ class KeepAliveThread(APRSDThread):
self.max_delta = datetime.timedelta(**max_timeout)
def loop(self):
- if self.loop_count % 60 == 0:
- stats_json = collector.Collector().collect()
- pl = packets.PacketList()
- thread_list = APRSDThreadList()
- now = datetime.datetime.now()
+ stats_json = collector.Collector().collect()
+ pl = packets.PacketList()
+ thread_list = APRSDThreadList()
+ now = datetime.datetime.now()
- if (
- 'APRSClientStats' in stats_json
- and stats_json['APRSClientStats'].get('transport') == 'aprsis'
- ):
- if stats_json['APRSClientStats'].get('server_keepalive'):
- last_msg_time = utils.strfdelta(
- now - stats_json['APRSClientStats']['server_keepalive']
- )
- else:
- last_msg_time = 'N/A'
+ if (
+ 'APRSClientStats' in stats_json
+ and stats_json['APRSClientStats'].get('transport') == 'aprsis'
+ ):
+ if stats_json['APRSClientStats'].get('server_keepalive'):
+ last_msg_time = utils.strfdelta(
+ now - stats_json['APRSClientStats']['server_keepalive']
+ )
else:
last_msg_time = 'N/A'
+ else:
+ last_msg_time = 'N/A'
- tracked_packets = stats_json['PacketTrack']['total_tracked']
- tx_msg = 0
- rx_msg = 0
- if 'PacketList' in stats_json:
- msg_packets = stats_json['PacketList'].get('MessagePacket')
- if msg_packets:
- tx_msg = msg_packets.get('tx', 0)
- rx_msg = msg_packets.get('rx', 0)
+ tracked_packets = stats_json['PacketTrack']['total_tracked']
+ tx_msg = 0
+ rx_msg = 0
+ if 'PacketList' in stats_json:
+ msg_packets = stats_json['PacketList'].get('MessagePacket')
+ if msg_packets:
+ tx_msg = msg_packets.get('tx', 0)
+ rx_msg = msg_packets.get('rx', 0)
- keepalive = (
- '{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
- 'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
- ).format(
- stats_json['APRSDStats']['callsign'],
- stats_json['APRSDStats']['uptime'],
- pl.total_rx(),
- pl.total_tx(),
- tracked_packets,
- tx_msg,
- rx_msg,
- last_msg_time,
- stats_json['APRSDStats']['memory_current_str'],
- stats_json['APRSDStats']['memory_peak_str'],
- len(thread_list),
- aprsd_log.logging_queue.qsize(),
- )
- LOG.info(keepalive)
- if 'APRSDThreadList' in stats_json:
- thread_list = stats_json['APRSDThreadList']
- for thread_name in thread_list:
- thread = thread_list[thread_name]
- alive = thread['alive']
- age = thread['age']
- key = thread['name']
- if not alive:
- LOG.error(f'Thread {thread}')
+ keepalive = (
+ '{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
+ 'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
+ ).format(
+ stats_json['APRSDStats']['callsign'],
+ stats_json['APRSDStats']['uptime'],
+ pl.total_rx(),
+ pl.total_tx(),
+ tracked_packets,
+ tx_msg,
+ rx_msg,
+ last_msg_time,
+ stats_json['APRSDStats']['memory_current_str'],
+ stats_json['APRSDStats']['memory_peak_str'],
+ len(thread_list),
+ aprsd_log.logging_queue.qsize(),
+ )
+ LOG.info(keepalive)
+ if 'APRSDThreadList' in stats_json:
+ thread_list = stats_json['APRSDThreadList']
+ for thread_name in thread_list:
+ thread = thread_list[thread_name]
+ alive = thread['alive']
+ age = thread['age']
+ key = thread['name']
+ if not alive:
+ LOG.error(f'Thread {thread}')
- thread_hex = f'fg {utils.hex_from_name(key)}'
- t_name = f'<{thread_hex}>{key:<15}{thread_hex}>'
- thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
- LOGU.opt(colors=True).info(thread_msg)
- # LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")
+ thread_hex = f'fg {utils.hex_from_name(key)}'
+ t_name = f'<{thread_hex}>{key:<15}{thread_hex}>'
+ thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
+ LOGU.opt(colors=True).info(thread_msg)
+ # LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")
- # Go through the registered keepalive collectors
- # and check them as well as call log.
- collect = keepalive_collector.KeepAliveCollector()
- collect.check()
- collect.log()
+ # Go through the registered keepalive collectors
+ # and check them as well as call log.
+ collect = keepalive_collector.KeepAliveCollector()
+ collect.check()
+ collect.log()
- # Check version every day
- delta = now - self.checker_time
- if delta > datetime.timedelta(hours=24):
- self.checker_time = now
- level, msg = utils._check_version()
- if level:
- LOG.warning(msg)
- self.cntr += 1
- time.sleep(1)
+ # Check version every day
+ delta = now - self.checker_time
+ if delta > datetime.timedelta(hours=24):
+ self.checker_time = now
+ level, msg = utils._check_version()
+ if level:
+ LOG.warning(msg)
+ self.cntr += 1
+ self.wait()
return True
diff --git a/aprsd/threads/registry.py b/aprsd/threads/registry.py
index 7139821..24765ee 100644
--- a/aprsd/threads/registry.py
+++ b/aprsd/threads/registry.py
@@ -1,5 +1,4 @@
import logging
-import time
import requests
from oslo_config import cfg
@@ -14,11 +13,9 @@ LOG = logging.getLogger('APRSD')
class APRSRegistryThread(aprsd_threads.APRSDThread):
"""This sends service information to the configured APRS Registry."""
- _loop_cnt: int = 1
-
def __init__(self):
super().__init__('APRSRegistryThread')
- self._loop_cnt = 1
+ self.period = CONF.aprs_registry.frequency_seconds
if not CONF.aprs_registry.enabled:
LOG.error(
'APRS Registry is not enabled. ',
@@ -34,24 +31,21 @@ class APRSRegistryThread(aprsd_threads.APRSDThread):
)
def loop(self):
- # Only call the registry every N seconds
- if self._loop_cnt % CONF.aprs_registry.frequency_seconds == 0:
- info = {
- 'callsign': CONF.callsign,
- 'owner_callsign': CONF.owner_callsign,
- 'description': CONF.aprs_registry.description,
- 'service_website': CONF.aprs_registry.service_website,
- 'software': f'APRSD version {aprsd.__version__} '
- 'https://github.com/craigerl/aprsd',
- }
- try:
- requests.post(
- f'{CONF.aprs_registry.registry_url}',
- json=info,
- )
- except Exception as e:
- LOG.error(f'Failed to send registry info: {e}')
+ info = {
+ 'callsign': CONF.callsign,
+ 'owner_callsign': CONF.owner_callsign,
+ 'description': CONF.aprs_registry.description,
+ 'service_website': CONF.aprs_registry.service_website,
+ 'software': f'APRSD version {aprsd.__version__} '
+ 'https://github.com/craigerl/aprsd',
+ }
+ try:
+ requests.post(
+ f'{CONF.aprs_registry.registry_url}',
+ json=info,
+ )
+ except Exception as e:
+ LOG.error(f'Failed to send registry info: {e}')
- time.sleep(1)
- self._loop_cnt += 1
+ self.wait()
return True
diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py
index 21a7614..2485b16 100644
--- a/aprsd/threads/rx.py
+++ b/aprsd/threads/rx.py
@@ -1,7 +1,6 @@
import abc
import logging
import queue
-import time
import aprslib
from oslo_config import cfg
@@ -43,19 +42,19 @@ class APRSDRXThread(APRSDThread):
self.packet_queue = packet_queue
def stop(self):
- self.thread_stop = True
+        super().stop()
if self._client:
self._client.close()
def loop(self):
if not self._client:
self._client = APRSDClient()
- time.sleep(1)
+ self.wait(timeout=1)
return True
if not self._client.is_alive:
self._client = APRSDClient()
- time.sleep(1)
+ self.wait(timeout=1)
return True
# setup the consumer of messages and block until a messages
@@ -82,12 +81,14 @@ class APRSDRXThread(APRSDThread):
# This will cause a reconnect, next time client.get_client()
# is called
self._client.reset()
- time.sleep(5)
+ if self.wait(timeout=5):
+ return False
except Exception as ex:
LOG.exception(ex)
LOG.error('Resetting connection and trying again.')
self._client.reset()
- time.sleep(5)
+ if self.wait(timeout=5):
+ return False
return True
def process_packet(self, *args, **kwargs):
@@ -153,7 +154,7 @@ class APRSDFilterThread(APRSDThread):
def loop(self):
try:
- pkt = self.packet_queue.get(timeout=1)
+ pkt = self.packet_queue.get(timeout=self.period)
self.packet_count += 1
# We use the client here, because the specific
# driver may need to decode the packet differently.
diff --git a/aprsd/threads/stats.py b/aprsd/threads/stats.py
index e2d3f5e..91eb00c 100644
--- a/aprsd/threads/stats.py
+++ b/aprsd/threads/stats.py
@@ -31,20 +31,19 @@ class StatsStore(objectstore.ObjectStoreMixin):
class APRSDStatsStoreThread(APRSDThread):
"""Save APRSD Stats to disk periodically."""
- # how often in seconds to write the file
- save_interval = 10
+ daemon = False
+ period = 10
def __init__(self):
super().__init__('StatsStore')
def loop(self):
- if self.loop_count % self.save_interval == 0:
- stats = collector.Collector().collect()
- ss = StatsStore()
- ss.add(stats)
- ss.save()
+ stats = collector.Collector().collect()
+ ss = StatsStore()
+ ss.add(stats)
+ ss.save()
- time.sleep(1)
+ self.wait()
return True
@@ -64,143 +63,140 @@ class APRSDPushStatsThread(APRSDThread):
self.send_packetlist = send_packetlist
def loop(self):
- if self.loop_count % self.period == 0:
- stats_json = collector.Collector().collect(serializable=True)
- url = f'{self.push_url}/stats'
- headers = {'Content-Type': 'application/json'}
- # Remove the PacketList section to reduce payload size
- if not self.send_packetlist:
- if 'PacketList' in stats_json:
- del stats_json['PacketList']['packets']
+ stats_json = collector.Collector().collect(serializable=True)
+ url = f'{self.push_url}/stats'
+ headers = {'Content-Type': 'application/json'}
+ # Remove the PacketList section to reduce payload size
+ if not self.send_packetlist:
+ if 'PacketList' in stats_json:
+ del stats_json['PacketList']['packets']
- now = datetime.datetime.now()
- time_format = '%m-%d-%Y %H:%M:%S'
- stats = {
- 'time': now.strftime(time_format),
- 'stats': stats_json,
- }
+ now = datetime.datetime.now()
+ time_format = '%m-%d-%Y %H:%M:%S'
+ stats = {
+ 'time': now.strftime(time_format),
+ 'stats': stats_json,
+ }
- try:
- response = requests.post(url, json=stats, headers=headers, timeout=5)
- response.raise_for_status()
+ try:
+ response = requests.post(url, json=stats, headers=headers, timeout=5)
+ response.raise_for_status()
- if response.status_code == 200:
- LOGU.info(f'Successfully pushed stats to {self.push_url}')
- else:
- LOGU.warning(
- f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
- )
+ if response.status_code == 200:
+ LOGU.info(f'Successfully pushed stats to {self.push_url}')
+ else:
+ LOGU.warning(
+ f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
+ )
- except requests.exceptions.RequestException as e:
- LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
- except Exception as e:
- LOGU.error(f'Unexpected error in stats push: {e}')
+ except requests.exceptions.RequestException as e:
+ LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
+ except Exception as e:
+ LOGU.error(f'Unexpected error in stats push: {e}')
- time.sleep(1)
+ self.wait()
return True
class StatsLogThread(APRSDThread):
"""Log the stats from the PacketList."""
+ period = 10
+
def __init__(self):
super().__init__('PacketStatsLog')
self._last_total_rx = 0
- self.period = 10
self.start_time = time.time()
def loop(self):
- if self.loop_count % self.period == 0:
- # log the stats every 10 seconds
- stats_json = collector.Collector().collect(serializable=True)
- stats = stats_json['PacketList']
- total_rx = stats['rx']
- rx_delta = total_rx - self._last_total_rx
- rate = rx_delta / self.period
+ # log the stats every 10 seconds
+ stats_json = collector.Collector().collect(serializable=True)
+ stats = stats_json['PacketList']
+ total_rx = stats['rx']
+ rx_delta = total_rx - self._last_total_rx
+ rate = rx_delta / self.period
- # Get unique callsigns count from SeenList stats
- seen_list_instance = seen_list.SeenList()
- # stats() returns data while holding lock internally, so copy it immediately
- seen_list_stats = seen_list_instance.stats()
- seen_list_instance.save()
- # Copy the stats to avoid holding references to locked data
- seen_list_stats = seen_list_stats.copy()
- unique_callsigns_count = len(seen_list_stats)
+ # Get unique callsigns count from SeenList stats
+ seen_list_instance = seen_list.SeenList()
+ # stats() returns data while holding lock internally, so copy it immediately
+ seen_list_stats = seen_list_instance.stats()
+ seen_list_instance.save()
+ # Copy the stats to avoid holding references to locked data
+ seen_list_stats = seen_list_stats.copy()
+ unique_callsigns_count = len(seen_list_stats)
- # Calculate uptime
- elapsed = time.time() - self.start_time
- elapsed_minutes = elapsed / 60
- elapsed_hours = elapsed / 3600
+ # Calculate uptime
+ elapsed = time.time() - self.start_time
+ elapsed_minutes = elapsed / 60
+ elapsed_hours = elapsed / 3600
- # Log summary stats
- LOGU.opt(colors=True).info(
- f'RX Rate: {rate:.2f} pps '
- f'Total RX: {total_rx} '
- f'RX Last {self.period} secs: {rx_delta} '
+ # Log summary stats
+ LOGU.opt(colors=True).info(
+ f'RX Rate: {rate:.2f} pps '
+ f'Total RX: {total_rx} '
+ f'RX Last {self.period} secs: {rx_delta} '
+ )
+ LOGU.opt(colors=True).info(
+ f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) '
+ f'Unique Callsigns: {unique_callsigns_count}',
+ )
+ self._last_total_rx = total_rx
+
+ # Log individual type stats, sorted by RX count (descending)
+ sorted_types = sorted(
+ stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
+ )
+ for k, v in sorted_types:
+ # Calculate percentage of this packet type compared to total RX
+ percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
+ # Format values first, then apply colors
+ packet_type_str = f'{k:<15}'
+ rx_count_str = f'{v["rx"]:6d}'
+ tx_count_str = f'{v["tx"]:6d}'
+ percentage_str = f'{percentage:5.1f}%'
+ # Use different colors for RX count based on threshold (matching mqtt_injest.py)
+ rx_color_tag = (
+ 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
)
LOGU.opt(colors=True).info(
- f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) '
- f'Unique Callsigns: {unique_callsigns_count}',
+ f' {packet_type_str}: '
+ f'<{rx_color_tag}>RX: {rx_count_str}{rx_color_tag}> '
+ f'TX: {tx_count_str} '
+ f'({percentage_str})',
)
- self._last_total_rx = total_rx
- # Log individual type stats, sorted by RX count (descending)
- sorted_types = sorted(
- stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
- )
- for k, v in sorted_types:
- # Calculate percentage of this packet type compared to total RX
- percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
- # Format values first, then apply colors
- packet_type_str = f'{k:<15}'
- rx_count_str = f'{v["rx"]:6d}'
- tx_count_str = f'{v["tx"]:6d}'
- percentage_str = f'{percentage:5.1f}%'
- # Use different colors for RX count based on threshold (matching mqtt_injest.py)
- rx_color_tag = (
- 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
- )
+ # Extract callsign counts from seen_list stats
+ callsign_counts = {}
+ for callsign, data in seen_list_stats.items():
+ if isinstance(data, dict) and 'count' in data:
+ callsign_counts[callsign] = data['count']
+
+ # Sort callsigns by packet count (descending) and get top 10
+ sorted_callsigns = sorted(
+ callsign_counts.items(), key=lambda x: x[1], reverse=True
+ )[:10]
+
+ # Log top 10 callsigns
+ if sorted_callsigns:
+ LOGU.opt(colors=True).info('Top 10 Callsigns by Packet Count:')
+ total_ranks = len(sorted_callsigns)
+ for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
+ # Calculate percentage of this callsign compared to total RX
+ percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
+ # Use different colors based on rank: most packets (rank 1) = red,
+ # least packets (last rank) = green, middle = yellow
+ if rank == 1:
+ count_color_tag = 'red'
+ elif rank == total_ranks:
+ count_color_tag = 'green'
+ else:
+ count_color_tag = 'yellow'
LOGU.opt(colors=True).info(
- f' {packet_type_str}: '
- f'<{rx_color_tag}>RX: {rx_count_str}{rx_color_tag}> '
- f'TX: {tx_count_str} '
- f'({percentage_str})',
+ f' {rank:2d}. '
+ f'{callsign:<12}: '
+ f'<{count_color_tag}>{count:6d} packets{count_color_tag}> '
+ f'({percentage:5.1f}%)',
)
- # Extract callsign counts from seen_list stats
- callsign_counts = {}
- for callsign, data in seen_list_stats.items():
- if isinstance(data, dict) and 'count' in data:
- callsign_counts[callsign] = data['count']
-
- # Sort callsigns by packet count (descending) and get top 10
- sorted_callsigns = sorted(
- callsign_counts.items(), key=lambda x: x[1], reverse=True
- )[:10]
-
- # Log top 10 callsigns
- if sorted_callsigns:
- LOGU.opt(colors=True).info(
- 'Top 10 Callsigns by Packet Count:'
- )
- total_ranks = len(sorted_callsigns)
- for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
- # Calculate percentage of this callsign compared to total RX
- percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
- # Use different colors based on rank: most packets (rank 1) = red,
- # least packets (last rank) = green, middle = yellow
- if rank == 1:
- count_color_tag = 'red'
- elif rank == total_ranks:
- count_color_tag = 'green'
- else:
- count_color_tag = 'yellow'
- LOGU.opt(colors=True).info(
- f' {rank:2d}. '
- f'{callsign:<12}: '
- f'<{count_color_tag}>{count:6d} packets{count_color_tag}> '
- f'({percentage:5.1f}%)',
- )
-
- time.sleep(1)
+ self.wait()
return True
diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py
index e6d9ce6..a3a3619 100644
--- a/aprsd/threads/tx.py
+++ b/aprsd/threads/tx.py
@@ -241,6 +241,8 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
separate thread for each packet.
"""
+ daemon = False # Non-daemon for graceful packet handling
+
def __init__(self, max_workers=5):
super().__init__('PacketSendSchedulerThread')
self.executor = ThreadPoolExecutor(
@@ -272,7 +274,7 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
# The worker will check timing and send if needed
self.executor.submit(_send_packet_worker, msg_no)
- time.sleep(1) # Check every second
+ self.wait() # Check every period (default 1 second)
return True
def _cleanup(self):
@@ -289,6 +291,8 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
separate thread for each ack.
"""
+ daemon = False # Non-daemon for graceful ACK handling
+
def __init__(self, max_workers=3):
super().__init__('AckSendSchedulerThread')
self.executor = ThreadPoolExecutor(
@@ -320,7 +324,7 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
# Submit send task to threadpool
self.executor.submit(_send_ack_worker, msg_no, self.max_retries)
- time.sleep(1) # Check every second
+ self.wait() # Check every period (default 1 second)
return True
def _cleanup(self):
@@ -330,8 +334,6 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
class SendPacketThread(aprsd_threads.APRSDThread):
- loop_count: int = 1
-
def __init__(self, packet):
self.packet = packet
super().__init__(f'TX-{packet.to_call}-{self.packet.msgNo}')
@@ -401,14 +403,12 @@ class SendPacketThread(aprsd_threads.APRSDThread):
if sent:
packet.send_count += 1
- time.sleep(1)
+ self.wait()
# Make sure we get called again.
- self.loop_count += 1
return True
class SendAckThread(aprsd_threads.APRSDThread):
- loop_count: int = 1
max_retries = 3
def __init__(self, packet):
@@ -462,8 +462,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
self.packet.last_send_time = int(round(time.time()))
- time.sleep(1)
- self.loop_count += 1
+ self.wait()
return True
@@ -473,11 +472,9 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
Settings are in the [DEFAULT] section of the config file.
"""
- _loop_cnt: int = 1
-
def __init__(self):
super().__init__('BeaconSendThread')
- self._loop_cnt = 1
+ self.period = CONF.beacon_interval
# Make sure Latitude and Longitude are set.
if not CONF.latitude or not CONF.longitude:
LOG.error(
@@ -491,25 +488,23 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
)
def loop(self):
- # Only dump out the stats every N seconds
- if self._loop_cnt % CONF.beacon_interval == 0:
- pkt = core.BeaconPacket(
- from_call=CONF.callsign,
- to_call='APRS',
- latitude=float(CONF.latitude),
- longitude=float(CONF.longitude),
- comment='APRSD GPS Beacon',
- symbol=CONF.beacon_symbol,
- )
- try:
- # Only send it once
- pkt.retry_count = 1
- send(pkt, direct=True)
- except Exception as e:
- LOG.error(f'Failed to send beacon: {e}')
- APRSDClient().reset()
- time.sleep(5)
+ pkt = core.BeaconPacket(
+ from_call=CONF.callsign,
+ to_call='APRS',
+ latitude=float(CONF.latitude),
+ longitude=float(CONF.longitude),
+ comment='APRSD GPS Beacon',
+ symbol=CONF.beacon_symbol,
+ )
+ try:
+ # Only send it once
+ pkt.retry_count = 1
+ send(pkt, direct=True)
+ except Exception as e:
+ LOG.error(f'Failed to send beacon: {e}')
+ APRSDClient().reset()
+ if self.wait(timeout=5):
+ return False
- self._loop_cnt += 1
- time.sleep(1)
+ self.wait()
return True
diff --git a/docs/superpowers/plans/2026-03-24-daemon-threads-event-refactor.md b/docs/superpowers/plans/2026-03-24-daemon-threads-event-refactor.md
new file mode 100644
index 0000000..cefb319
--- /dev/null
+++ b/docs/superpowers/plans/2026-03-24-daemon-threads-event-refactor.md
@@ -0,0 +1,1355 @@
+# Daemon Threads and Event-Based Timing Implementation Plan
+
+> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Refactor all APRSD thread classes to use daemon threads and `threading.Event()` for interruptible waits and cleaner periodic timing.
+
+**Architecture:** Modify `APRSDThread` base class to add `daemon` and `period` class attributes, replace boolean `thread_stop` with `threading.Event`, add `wait()` method. Update all 15 subclasses to use new pattern.
+
+**Tech Stack:** Python threading, threading.Event, existing APRSD thread infrastructure
+
+---
+
+## File Structure
+
+| File | Action | Responsibility |
+|------|--------|----------------|
+| `aprsd/threads/aprsd.py` | Modify | Base class + ThreadList changes |
+| `aprsd/threads/keepalive.py` | Modify | KeepAliveThread period migration |
+| `aprsd/threads/stats.py` | Modify | 3 stats threads, APRSDStatsStoreThread non-daemon |
+| `aprsd/threads/rx.py` | Modify | 4 RX threads with queue-based waits |
+| `aprsd/threads/tx.py` | Modify | 5 TX threads, 2 schedulers non-daemon |
+| `aprsd/threads/registry.py` | Modify | APRSRegistryThread period from config |
+| `aprsd/cmds/listen.py` | Modify | APRSDListenProcessThread (inherits from APRSDFilterThread) |
+| `aprsd/main.py` | Modify | Signal handler update |
+| `tests/threads/test_aprsd_thread.py` | Modify | Update existing tests for Event-based API |
+
+---
+
+## Chunk 1: Base Class Refactor
+
+### Task 1: Update APRSDThread base class
+
+**Files:**
+- Modify: `aprsd/threads/aprsd.py:13-76`
+- Test: `tests/threads/test_aprsd_thread.py`
+
+- [ ] **Step 1: Write failing test for daemon attribute**
+
+**Note:** The existing test file already has a `TestThread` fixture class with a `should_loop` parameter. Tests in this plan use that existing fixture.
+
+Add to `tests/threads/test_aprsd_thread.py`:
+
+```python
+def test_daemon_attribute_default(self):
+ """Test that daemon attribute defaults to True."""
+ thread = TestThread('DaemonTest')
+ self.assertTrue(thread.daemon)
+
+
+def test_daemon_attribute_override(self):
+ """Test that daemon attribute can be overridden via class attribute."""
+ class NonDaemonThread(APRSDThread):
+ daemon = False
+ def loop(self):
+ return False
+
+ thread = NonDaemonThread('NonDaemonTest')
+ self.assertFalse(thread.daemon)
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `pytest tests/threads/test_aprsd_thread.py::TestAPRSDThread::test_daemon_attribute_default -v`
+Expected: FAIL with `AssertionError` (daemon is False by default currently)
+
+- [ ] **Step 3: Write failing test for period attribute**
+
+Add to `tests/threads/test_aprsd_thread.py`:
+
+```python
+def test_period_attribute_default(self):
+ """Test that period attribute defaults to 1."""
+ thread = TestThread('PeriodTest')
+ self.assertEqual(thread.period, 1)
+
+
+def test_period_attribute_override(self):
+ """Test that period attribute can be overridden via class attribute."""
+ class LongPeriodThread(APRSDThread):
+ period = 60
+ def loop(self):
+ return False
+
+ thread = LongPeriodThread('LongPeriodTest')
+ self.assertEqual(thread.period, 60)
+```
+
+- [ ] **Step 4: Run test to verify it fails**
+
+Run: `pytest tests/threads/test_aprsd_thread.py::TestAPRSDThread::test_period_attribute_default -v`
+Expected: FAIL with `AttributeError: 'TestThread' object has no attribute 'period'`
+
+- [ ] **Step 5: Write failing test for _shutdown_event and wait()**
+
+Add to `tests/threads/test_aprsd_thread.py`:
+
+```python
+def test_shutdown_event_exists(self):
+ """Test that _shutdown_event is created."""
+ thread = TestThread('EventTest')
+ self.assertIsInstance(thread._shutdown_event, threading.Event)
+ self.assertFalse(thread._shutdown_event.is_set())
+
+
+def test_wait_returns_false_on_timeout(self):
+ """Test that wait() returns False when timeout expires."""
+ thread = TestThread('WaitTimeoutTest')
+ start = time.time()
+ result = thread.wait(timeout=0.1)
+ elapsed = time.time() - start
+ self.assertFalse(result)
+ self.assertGreaterEqual(elapsed, 0.1)
+
+
+def test_wait_returns_true_when_stopped(self):
+ """Test that wait() returns True immediately when stop() was called."""
+ thread = TestThread('WaitStopTest')
+ thread.stop()
+ start = time.time()
+ result = thread.wait(timeout=10) # Would timeout in 10s if not stopped
+ elapsed = time.time() - start
+ self.assertTrue(result)
+ self.assertLess(elapsed, 1) # Should return immediately
+
+
+def test_wait_uses_period_by_default(self):
+ """Test that wait() uses self.period when no timeout specified."""
+ class ShortPeriodThread(APRSDThread):
+ period = 0.1
+ def loop(self):
+ return False
+
+ thread = ShortPeriodThread('ShortPeriodTest')
+ start = time.time()
+ result = thread.wait()
+ elapsed = time.time() - start
+ self.assertFalse(result)
+ self.assertGreaterEqual(elapsed, 0.1)
+ self.assertLess(elapsed, 0.5)
+```
+
+- [ ] **Step 6: Run tests to verify they fail**
+
+Run: `pytest tests/threads/test_aprsd_thread.py -k "shutdown_event or wait_" -v`
+Expected: FAIL with `AttributeError`
+
+- [ ] **Step 7: Implement base class changes**
+
+Update `aprsd/threads/aprsd.py`:
+
+```python
+import abc
+import datetime
+import logging
+import threading
+import time
+from typing import List
+
+import wrapt
+
+LOG = logging.getLogger('APRSD')
+
+
+class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
+ """Base class for all threads in APRSD."""
+
+ # Class attributes - subclasses override as needed
+ daemon = True # Most threads are daemon threads
+ period = 1 # Default wait period in seconds
+    loop_count = 1  # NOTE: __init__ resets this to 0; run() increments it before each loop()
+ _pause = False
+
+ def __init__(self, name):
+ super().__init__(name=name)
+ # Set daemon from class attribute
+ self.daemon = self.__class__.daemon
+ # Set period from class attribute (can be overridden in __init__)
+ self.period = self.__class__.period
+ self._shutdown_event = threading.Event()
+ self.loop_count = 0
+ APRSDThreadList().add(self)
+ self._last_loop = datetime.datetime.now()
+
+ def _should_quit(self):
+ """Check if thread should exit."""
+ return self._shutdown_event.is_set()
+
+ def pause(self):
+ """Logically pause the processing of the main loop."""
+ LOG.debug(f"Pausing thread '{self.name}' loop_count {self.loop_count}")
+ self._pause = True
+
+ def unpause(self):
+ """Logically resume processing of the main loop."""
+ LOG.debug(f"Resuming thread '{self.name}' loop_count {self.loop_count}")
+ self._pause = False
+
+ def stop(self):
+ """Signal thread to stop. Returns immediately."""
+ LOG.debug(f"Stopping thread '{self.name}'")
+ self._shutdown_event.set()
+
+ def wait(self, timeout: float | None = None) -> bool:
+ """Wait for shutdown signal or timeout.
+
+ Args:
+ timeout: Seconds to wait. Defaults to self.period.
+
+ Returns:
+ True if shutdown was signaled, False if timeout expired.
+ """
+ wait_time = timeout if timeout is not None else self.period
+ return self._shutdown_event.wait(timeout=wait_time)
+
+ @abc.abstractmethod
+ def loop(self):
+ pass
+
+ def _cleanup(self):
+ """Add code to subclass to do any cleanup"""
+
+ def __str__(self):
+ out = (
+ f'Thread <{self.__class__.__name__}({self.name}) Alive? {self.is_alive()}>'
+ )
+ return out
+
+ def loop_age(self):
+ """How old is the last loop call?"""
+ return datetime.datetime.now() - self._last_loop
+
+ def run(self):
+ LOG.debug('Starting')
+ while not self._should_quit():
+ if self._pause:
+ self.wait(timeout=1)
+ else:
+ self.loop_count += 1
+ can_loop = self.loop()
+ self._last_loop = datetime.datetime.now()
+ if not can_loop:
+ self.stop()
+ self._cleanup()
+ APRSDThreadList().remove(self)
+ LOG.debug('Exiting')
+```
+
+- [ ] **Step 8: Run base class tests to verify they pass**
+
+Run: `pytest tests/threads/test_aprsd_thread.py::TestAPRSDThread -v`
+Expected: PASS
+
+- [ ] **Step 9: Update existing tests that reference thread_stop**
+
+Update `tests/threads/test_aprsd_thread.py` - change references from `thread_stop` to `_shutdown_event.is_set()`:
+
+```python
+def test_init(self):
+ """Test thread initialization."""
+ thread = TestThread('TestThread1')
+ self.assertEqual(thread.name, 'TestThread1')
+ self.assertFalse(thread._shutdown_event.is_set())
+ self.assertFalse(thread._pause)
+ # Note: loop_count starts at 0 now (was 1)
+ self.assertEqual(thread.loop_count, 0)
+
+ # Should be registered in thread list
+ thread_list = APRSDThreadList()
+ self.assertIn(thread, thread_list.threads_list)
+
+
+def test_should_quit(self):
+ """Test _should_quit() method."""
+ thread = TestThread('TestThread2')
+ self.assertFalse(thread._should_quit())
+
+ thread._shutdown_event.set()
+ self.assertTrue(thread._should_quit())
+
+
+def test_stop(self):
+ """Test stop() method."""
+ thread = TestThread('TestThread4')
+ self.assertFalse(thread._shutdown_event.is_set())
+
+ thread.stop()
+ self.assertTrue(thread._shutdown_event.is_set())
+
+
+def test_stop_all(self):
+ """Test stop_all() method."""
+ thread_list = APRSDThreadList()
+ thread1 = TestThread('TestThread8')
+ thread2 = TestThread('TestThread9')
+ thread_list.add(thread1)
+ thread_list.add(thread2)
+
+ thread_list.stop_all()
+ self.assertTrue(thread1._shutdown_event.is_set())
+ self.assertTrue(thread2._shutdown_event.is_set())
+```
+
+- [ ] **Step 10: Run all base class tests**
+
+Run: `pytest tests/threads/test_aprsd_thread.py -v`
+Expected: PASS
+
+- [ ] **Step 11: Commit base class changes**
+
+```bash
+git add aprsd/threads/aprsd.py tests/threads/test_aprsd_thread.py
+git commit -m "refactor(threads): add daemon, period, Event-based shutdown to APRSDThread
+
+- Add daemon=True class attribute (subclasses override to False)
+- Add period=1 class attribute for wait interval
+- Replace thread_stop boolean with _shutdown_event (threading.Event)
+- Add wait() method for interruptible sleeps
+- Update tests for new Event-based API
+
+BREAKING: thread_stop boolean replaced with _shutdown_event.
+Code checking thread.thread_stop directly must use thread._shutdown_event.is_set()"
+```
+
+---
+
+### Task 2: Add join_non_daemon to APRSDThreadList
+
+**Files:**
+- Modify: `aprsd/threads/aprsd.py:79-169`
+- Test: `tests/threads/test_aprsd_thread.py`
+
+- [ ] **Step 1: Write failing test for join_non_daemon**
+
+Add to `tests/threads/test_aprsd_thread.py`:
+
+```python
+def test_join_non_daemon(self):
+ """Test join_non_daemon() waits for non-daemon threads."""
+ class NonDaemonTestThread(APRSDThread):
+ daemon = False
+ def __init__(self, name):
+ super().__init__(name)
+ self.finished = False
+
+ def loop(self):
+ time.sleep(0.2)
+ self.finished = True
+ return False
+
+ thread_list = APRSDThreadList()
+ thread = NonDaemonTestThread('NonDaemonJoinTest')
+ thread_list.add(thread)
+ thread.start()
+
+ # Stop triggers the event, thread should finish its loop then exit
+ thread.stop()
+ thread_list.join_non_daemon(timeout=5.0)
+
+ self.assertTrue(thread.finished or not thread.is_alive())
+
+
+def test_join_non_daemon_skips_daemon_threads(self):
+ """Test join_non_daemon() does not wait for daemon threads."""
+ thread_list = APRSDThreadList()
+ # Clear existing threads
+ thread_list.threads_list = []
+
+ # Create a daemon thread that loops forever
+ thread = TestThread('DaemonSkipTest', should_loop=True)
+ thread_list.add(thread)
+ thread.start()
+
+ # This should return quickly since it's a daemon thread
+ start = time.time()
+ thread_list.join_non_daemon(timeout=0.1)
+ elapsed = time.time() - start
+
+ self.assertLess(elapsed, 0.5) # Should not wait for daemon
+
+ # Cleanup
+ thread.stop()
+ thread.join(timeout=1)
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `pytest tests/threads/test_aprsd_thread.py::TestAPRSDThreadList::test_join_non_daemon -v`
+Expected: FAIL with `AttributeError: 'APRSDThreadList' object has no attribute 'join_non_daemon'`
+
+- [ ] **Step 3: Implement join_non_daemon method**
+
+Add to `APRSDThreadList` class in `aprsd/threads/aprsd.py`:
+
+```python
+def join_non_daemon(self, timeout: float = 5.0):
+    """Wait up to *timeout* seconds for each live non-daemon thread.
+
+    Snapshot under the lock, but join() OUTSIDE it: an exiting thread
+    calls APRSDThreadList().remove(self), which synchronizes on the
+    same lock, so holding it while joining would stall every join
+    until the timeout expired (self-deadlock on shutdown).
+    """
+    with self.lock:
+        waiting = [t for t in self.threads_list if not t.daemon and t.is_alive()]
+    for th in waiting:
+        LOG.info(f'Waiting for non-daemon thread {th.name} to finish')
+        th.join(timeout=timeout)
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest tests/threads/test_aprsd_thread.py::TestAPRSDThreadList::test_join_non_daemon -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit ThreadList changes**
+
+```bash
+git add aprsd/threads/aprsd.py tests/threads/test_aprsd_thread.py
+git commit -m "feat(threads): add join_non_daemon() to APRSDThreadList
+
+Allows graceful shutdown by waiting for non-daemon threads to complete
+while allowing daemon threads to be terminated immediately."
+```
+
+---
+
+## Chunk 2: Stats and Keepalive Thread Migration
+
+### Task 3: Migrate KeepAliveThread
+
+**Files:**
+- Modify: `aprsd/threads/keepalive.py`
+
+- [ ] **Step 1: Update KeepAliveThread to use period**
+
+Replace the loop method in `aprsd/threads/keepalive.py`:
+
+```python
+class KeepAliveThread(APRSDThread):
+ cntr = 0
+ checker_time = datetime.datetime.now()
+ period = 60 # Run keepalive every 60 seconds
+
+ def __init__(self):
+ tracemalloc.start()
+ super().__init__('KeepAlive')
+ max_timeout = {'hours': 0.0, 'minutes': 2, 'seconds': 0}
+ self.max_delta = datetime.timedelta(**max_timeout)
+
+ def loop(self):
+ stats_json = collector.Collector().collect()
+ pl = packets.PacketList()
+ thread_list = APRSDThreadList()
+ now = datetime.datetime.now()
+
+ if (
+ 'APRSClientStats' in stats_json
+ and stats_json['APRSClientStats'].get('transport') == 'aprsis'
+ ):
+ if stats_json['APRSClientStats'].get('server_keepalive'):
+ last_msg_time = utils.strfdelta(
+ now - stats_json['APRSClientStats']['server_keepalive']
+ )
+ else:
+ last_msg_time = 'N/A'
+ else:
+ last_msg_time = 'N/A'
+
+ tracked_packets = stats_json['PacketTrack']['total_tracked']
+ tx_msg = 0
+ rx_msg = 0
+ if 'PacketList' in stats_json:
+ msg_packets = stats_json['PacketList'].get('MessagePacket')
+ if msg_packets:
+ tx_msg = msg_packets.get('tx', 0)
+ rx_msg = msg_packets.get('rx', 0)
+
+ keepalive = (
+ '{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
+ 'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
+ ).format(
+ stats_json['APRSDStats']['callsign'],
+ stats_json['APRSDStats']['uptime'],
+ pl.total_rx(),
+ pl.total_tx(),
+ tracked_packets,
+ tx_msg,
+ rx_msg,
+ last_msg_time,
+ stats_json['APRSDStats']['memory_current_str'],
+ stats_json['APRSDStats']['memory_peak_str'],
+ len(thread_list),
+ aprsd_log.logging_queue.qsize(),
+ )
+ LOG.info(keepalive)
+ if 'APRSDThreadList' in stats_json:
+ thread_list = stats_json['APRSDThreadList']
+ for thread_name in thread_list:
+ thread = thread_list[thread_name]
+ alive = thread['alive']
+ age = thread['age']
+ key = thread['name']
+ if not alive:
+ LOG.error(f'Thread {thread}')
+
+ thread_hex = f'fg {utils.hex_from_name(key)}'
+            t_name = f'<{thread_hex}>{key:<15}</{thread_hex}>'
+ thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
+ LOGU.opt(colors=True).info(thread_msg)
+
+ # Go through the registered keepalive collectors
+ # and check them as well as call log.
+ collect = keepalive_collector.KeepAliveCollector()
+ collect.check()
+ collect.log()
+
+ # Check version every day
+ delta = now - self.checker_time
+ if delta > datetime.timedelta(hours=24):
+ self.checker_time = now
+ level, msg = utils._check_version()
+ if level:
+ LOG.warning(msg)
+ self.cntr += 1
+
+ self.wait() # Wait for period (60s) or shutdown signal
+ return True
+```
+
+- [ ] **Step 2: Run existing tests**
+
+Run: `pytest tests/ -k keepalive -v`
+Expected: PASS (or skip if no specific keepalive tests)
+
+- [ ] **Step 3: Commit KeepAliveThread changes**
+
+```bash
+git add aprsd/threads/keepalive.py
+git commit -m "refactor(threads): migrate KeepAliveThread to Event-based timing
+
+- Set period=60 class attribute
+- Remove counter-based conditional (loop_count % 60)
+- Replace time.sleep(1) with self.wait()"
+```
+
+---
+
+### Task 4: Migrate Stats Threads
+
+**Files:**
+- Modify: `aprsd/threads/stats.py`
+
+- [ ] **Step 1: Update APRSDStatsStoreThread (non-daemon)**
+
+```python
+class APRSDStatsStoreThread(APRSDThread):
+ """Save APRSD Stats to disk periodically."""
+
+ daemon = False # Non-daemon for graceful disk writes
+ period = 10 # Save every 10 seconds
+
+ def __init__(self):
+ super().__init__('StatsStore')
+
+ def loop(self):
+ stats = collector.Collector().collect()
+ ss = StatsStore()
+ ss.add(stats)
+ ss.save()
+
+ self.wait() # Wait for period (10s) or shutdown signal
+ return True
+```
+
+- [ ] **Step 2: Update APRSDPushStatsThread**
+
+```python
+class APRSDPushStatsThread(APRSDThread):
+ """Push the local stats to a remote API."""
+
+ def __init__(
+ self, push_url=None, frequency_seconds=None, send_packetlist: bool = False
+ ):
+ super().__init__('PushStats')
+ self.push_url = push_url if push_url else CONF.push_stats.push_url
+ # Set period from config
+ self.period = (
+ frequency_seconds
+ if frequency_seconds
+ else CONF.push_stats.frequency_seconds
+ )
+ self.send_packetlist = send_packetlist
+
+ def loop(self):
+ stats_json = collector.Collector().collect(serializable=True)
+ url = f'{self.push_url}/stats'
+ headers = {'Content-Type': 'application/json'}
+ # Remove the PacketList section to reduce payload size
+ if not self.send_packetlist:
+ if 'PacketList' in stats_json:
+ del stats_json['PacketList']['packets']
+
+ now = datetime.datetime.now()
+ time_format = '%m-%d-%Y %H:%M:%S'
+ stats = {
+ 'time': now.strftime(time_format),
+ 'stats': stats_json,
+ }
+
+ try:
+ response = requests.post(url, json=stats, headers=headers, timeout=5)
+ response.raise_for_status()
+
+ if response.status_code == 200:
+ LOGU.info(f'Successfully pushed stats to {self.push_url}')
+ else:
+ LOGU.warning(
+ f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
+ )
+
+ except requests.exceptions.RequestException as e:
+ LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
+ except Exception as e:
+ LOGU.error(f'Unexpected error in stats push: {e}')
+
+ self.wait() # Wait for period or shutdown signal
+ return True
+```
+
+- [ ] **Step 3: Update StatsLogThread**
+
+```python
+class StatsLogThread(APRSDThread):
+ """Log the stats from the PacketList."""
+
+ period = 10 # Log every 10 seconds
+
+ def __init__(self):
+ super().__init__('PacketStatsLog')
+ self._last_total_rx = 0
+ self.start_time = time.time()
+
+ def loop(self):
+ # log the stats every period seconds
+ stats_json = collector.Collector().collect(serializable=True)
+ stats = stats_json['PacketList']
+ total_rx = stats['rx']
+ rx_delta = total_rx - self._last_total_rx
+ rate = rx_delta / self.period
+
+ # Get unique callsigns count from SeenList stats
+ seen_list_instance = seen_list.SeenList()
+ seen_list_stats = seen_list_instance.stats()
+ seen_list_instance.save()
+ seen_list_stats = seen_list_stats.copy()
+ unique_callsigns_count = len(seen_list_stats)
+
+ # Calculate uptime
+ elapsed = time.time() - self.start_time
+ elapsed_minutes = elapsed / 60
+ elapsed_hours = elapsed / 3600
+
+ # Log summary stats
+ LOGU.opt(colors=True).info(
+ f'RX Rate: {rate:.2f} pps '
+ f'Total RX: {total_rx} '
+ f'RX Last {self.period} secs: {rx_delta} '
+ )
+ LOGU.opt(colors=True).info(
+ f'Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h) '
+ f'Unique Callsigns: {unique_callsigns_count}',
+ )
+ self._last_total_rx = total_rx
+
+ # Log individual type stats, sorted by RX count (descending)
+ sorted_types = sorted(
+ stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
+ )
+ for k, v in sorted_types:
+ percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
+ packet_type_str = f'{k:<15}'
+ rx_count_str = f'{v["rx"]:6d}'
+ tx_count_str = f'{v["tx"]:6d}'
+ percentage_str = f'{percentage:5.1f}%'
+ rx_color_tag = (
+ 'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
+ )
+ LOGU.opt(colors=True).info(
+ f' {packet_type_str}: '
+            f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
+ f'TX: {tx_count_str} '
+ f'({percentage_str})',
+ )
+
+ # Extract callsign counts from seen_list stats
+ callsign_counts = {}
+ for callsign, data in seen_list_stats.items():
+ if isinstance(data, dict) and 'count' in data:
+ callsign_counts[callsign] = data['count']
+
+ # Sort callsigns by packet count (descending) and get top 10
+ sorted_callsigns = sorted(
+ callsign_counts.items(), key=lambda x: x[1], reverse=True
+ )[:10]
+
+ # Log top 10 callsigns
+ if sorted_callsigns:
+ LOGU.opt(colors=True).info(
+ 'Top 10 Callsigns by Packet Count:'
+ )
+ total_ranks = len(sorted_callsigns)
+ for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
+ percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
+ if rank == 1:
+ count_color_tag = 'red'
+ elif rank == total_ranks:
+ count_color_tag = 'green'
+ else:
+ count_color_tag = 'yellow'
+ LOGU.opt(colors=True).info(
+ f' {rank:2d}. '
+ f'{callsign:<12}: '
+                f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
+ f'({percentage:5.1f}%)',
+ )
+
+ self.wait() # Wait for period (10s) or shutdown signal
+ return True
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit stats thread changes**
+
+```bash
+git add aprsd/threads/stats.py
+git commit -m "refactor(threads): migrate stats threads to Event-based timing
+
+- APRSDStatsStoreThread: daemon=False, period=10
+- APRSDPushStatsThread: period from config
+- StatsLogThread: period=10
+- Remove counter-based conditionals
+- Replace time.sleep(1) with self.wait()"
+```
+
+---
+
+## Chunk 3: RX Thread Migration
+
+### Task 5: Migrate RX Threads
+
+**Files:**
+- Modify: `aprsd/threads/rx.py`
+
+- [ ] **Step 1: Update APRSDRXThread**
+
+```python
+class APRSDRXThread(APRSDThread):
+ """Thread to receive packets from the APRS Client."""
+
+ _client = None
+ packet_queue = None
+ pkt_count = 0
+
+ def __init__(self, packet_queue: queue.Queue):
+ super().__init__('RX_PKT')
+ self.packet_queue = packet_queue
+
+ def stop(self):
+ self._shutdown_event.set()
+ if self._client:
+ self._client.close()
+
+ def loop(self):
+ if not self._client:
+ self._client = APRSDClient()
+ self.wait(timeout=1)
+ return True
+
+ if not self._client.is_alive:
+ self._client = APRSDClient()
+ self.wait(timeout=1)
+ return True
+
+ try:
+ self._client.consumer(
+ self.process_packet,
+ raw=True,
+ )
+ except (
+ aprslib.exceptions.ConnectionDrop,
+ aprslib.exceptions.ConnectionError,
+ ):
+ LOG.error('Connection dropped, reconnecting')
+ self._client.reset()
+ if self.wait(timeout=5): # Interruptible 5s wait
+ return False # Shutdown signaled
+ except Exception as ex:
+ LOG.exception(ex)
+ LOG.error('Resetting connection and trying again.')
+ self._client.reset()
+ if self.wait(timeout=5): # Interruptible 5s wait
+ return False # Shutdown signaled
+ return True
+
+ def process_packet(self, *args, **kwargs):
+ """Put the raw packet on the queue."""
+ if args:
+ data = args[0]
+ elif 'raw' in kwargs:
+ data = kwargs['raw']
+ elif 'frame' in kwargs:
+ data = kwargs['frame']
+ elif 'packet' in kwargs:
+ data = kwargs['packet']
+ else:
+ LOG.warning('No frame received to process?!?!')
+ return
+
+ self.pkt_count += 1
+ self.packet_queue.put(data)
+```
+
+- [ ] **Step 2: Update APRSDFilterThread (queue-based wait)**
+
+```python
+class APRSDFilterThread(APRSDThread):
+ """Thread to filter packets on the packet queue."""
+
+ def __init__(self, thread_name: str, packet_queue: queue.Queue):
+ super().__init__(thread_name)
+ self.packet_queue = packet_queue
+ self.packet_count = 0
+ self._client = APRSDClient()
+
+ def filter_packet(self, packet: type[core.Packet]) -> type[core.Packet] | None:
+ if not filter.PacketFilter().filter(packet):
+ return None
+ return packet
+
+ def print_packet(self, packet):
+ packet_log.log(packet, packet_count=self.packet_count)
+
+ def loop(self):
+ try:
+ # Queue timeout serves as interruptible wait
+ pkt = self.packet_queue.get(timeout=self.period)
+ self.packet_count += 1
+ packet = self._client.decode_packet(pkt)
+ if not packet:
+ LOG.debug(f'Packet failed to parse. "{pkt}"')
+ return True
+ self.print_packet(packet)
+ if packet:
+ if self.filter_packet(packet):
+ collector.PacketCollector().rx(packet)
+ self.process_packet(packet)
+ except queue.Empty:
+ pass # Normal timeout, loop will check _should_quit
+ return True
+```
+
+Note: `APRSDProcessPacketThread` and `APRSDPluginProcessPacketThread` extend `APRSDFilterThread` and inherit the queue-based wait pattern. They don't need changes to their `loop()` method as they override `process_packet()` and `process_our_message_packet()`.
+
+- [ ] **Step 3: Verify APRSDListenProcessThread (listen.py) inherits changes**
+
+`APRSDListenProcessThread` in `aprsd/cmds/listen.py` extends `APRSDFilterThread`. It inherits the queue-based wait pattern automatically. No code changes needed in `listen.py` - verify the class still works by inspection:
+
+```python
+# aprsd/cmds/listen.py - NO CHANGES NEEDED
+# APRSDListenProcessThread extends APRSDFilterThread which now uses:
+# queue.get(timeout=self.period) for interruptible wait
+# The daemon=True default is inherited from base APRSDThread
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit RX thread changes**
+
+```bash
+git add aprsd/threads/rx.py
+git commit -m "refactor(threads): migrate RX threads to Event-based timing
+
+- APRSDRXThread: use wait(timeout=5) for error recovery
+- APRSDFilterThread: use queue.get(timeout=period) as interruptible wait
+- APRSDListenProcessThread inherits changes from APRSDFilterThread
+- Remove time.sleep() calls"
+```
+
+---
+
+## Chunk 4: TX and Registry Thread Migration
+
+### Task 6: Migrate TX Threads
+
+**Files:**
+- Modify: `aprsd/threads/tx.py`
+
+- [ ] **Step 1: Update PacketSendSchedulerThread (non-daemon)**
+
+```python
+class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
+ """Scheduler thread that uses a threadpool to send packets."""
+
+ daemon = False # Non-daemon for graceful packet handling
+
+ def __init__(self, max_workers=5):
+ super().__init__('PacketSendSchedulerThread')
+ self.executor = ThreadPoolExecutor(
+ max_workers=max_workers, thread_name_prefix='PacketSendWorker'
+ )
+ self.max_workers = max_workers
+
+ def loop(self):
+ """Check all tracked packets and submit send tasks to threadpool."""
+ pkt_tracker = tracker.PacketTrack()
+
+ for msg_no in list(pkt_tracker.keys()):
+ packet = pkt_tracker.get(msg_no)
+ if not packet:
+ continue
+
+ if isinstance(packet, core.AckPacket):
+ continue
+
+ if packet.send_count >= packet.retry_count:
+ continue
+
+ self.executor.submit(_send_packet_worker, msg_no)
+
+ self.wait() # Wait for period (1s) or shutdown signal
+ return True
+
+ def _cleanup(self):
+ """Cleanup threadpool executor on thread shutdown."""
+ LOG.debug('Shutting down PacketSendSchedulerThread executor')
+ self.executor.shutdown(wait=True)
+```
+
+- [ ] **Step 2: Update AckSendSchedulerThread (non-daemon)**
+
+```python
+class AckSendSchedulerThread(aprsd_threads.APRSDThread):
+ """Scheduler thread that uses a threadpool to send ack packets."""
+
+ daemon = False # Non-daemon for graceful ACK handling
+
+ def __init__(self, max_workers=3):
+ super().__init__('AckSendSchedulerThread')
+ self.executor = ThreadPoolExecutor(
+ max_workers=max_workers, thread_name_prefix='AckSendWorker'
+ )
+ self.max_workers = max_workers
+ self.max_retries = CONF.default_ack_send_count
+
+ def loop(self):
+ """Check all tracked ack packets and submit send tasks to threadpool."""
+ pkt_tracker = tracker.PacketTrack()
+
+ for msg_no in list(pkt_tracker.keys()):
+ packet = pkt_tracker.get(msg_no)
+ if not packet:
+ continue
+
+ if not isinstance(packet, core.AckPacket):
+ continue
+
+ if packet.send_count >= self.max_retries:
+ continue
+
+ self.executor.submit(_send_ack_worker, msg_no, self.max_retries)
+
+ self.wait() # Wait for period (1s) or shutdown signal
+ return True
+
+ def _cleanup(self):
+ """Cleanup threadpool executor on thread shutdown."""
+ LOG.debug('Shutting down AckSendSchedulerThread executor')
+ self.executor.shutdown(wait=True)
+```
+
+- [ ] **Step 3: Update SendPacketThread**
+
+```python
+class SendPacketThread(aprsd_threads.APRSDThread):
+
+ def __init__(self, packet):
+ self.packet = packet
+ super().__init__(f'TX-{packet.to_call}-{self.packet.msgNo}')
+
+ def loop(self):
+ """Loop until a message is acked or max retries reached."""
+ pkt_tracker = tracker.PacketTrack()
+ packet = pkt_tracker.get(self.packet.msgNo)
+ if not packet:
+ LOG.info(
+ f'{self.packet.__class__.__name__}'
+ f'({self.packet.msgNo}) '
+ 'Message Send Complete via Ack.',
+ )
+ return False
+
+ send_now = False
+ if packet.send_count >= packet.retry_count:
+ LOG.info(
+ f'{packet.__class__.__name__} '
+ f'({packet.msgNo}) '
+ 'Message Send Complete. Max attempts reached'
+ f' {packet.retry_count}',
+ )
+ pkt_tracker.remove(packet.msgNo)
+ return False
+
+ if packet.last_send_time:
+ now = int(round(time.time()))
+ sleeptime = (packet.send_count + 1) * 31
+ delta = now - packet.last_send_time
+ if delta > sleeptime:
+ send_now = True
+ else:
+ send_now = True
+
+ if send_now:
+ packet.last_send_time = int(round(time.time()))
+ sent = False
+ try:
+ sent = _send_direct(packet)
+ except Exception as ex:
+ LOG.error(f'Failed to send packet: {packet}')
+ LOG.error(ex)
+ else:
+ if sent:
+ packet.send_count += 1
+
+ self.wait() # Wait for period (1s) or shutdown signal
+ return True
+```
+
+- [ ] **Step 4: Update SendAckThread**
+
+```python
+class SendAckThread(aprsd_threads.APRSDThread):
+ max_retries = 3
+
+ def __init__(self, packet):
+ self.packet = packet
+ super().__init__(f'TXAck-{packet.to_call}-{self.packet.msgNo}')
+ self.max_retries = CONF.default_ack_send_count
+
+ def loop(self):
+ """Separate thread to send acks with retries."""
+ send_now = False
+ if self.packet.send_count == self.max_retries:
+ LOG.debug(
+ f'{self.packet.__class__.__name__}'
+ f'({self.packet.msgNo}) '
+ 'Send Complete. Max attempts reached'
+ f' {self.max_retries}',
+ )
+ return False
+
+ if self.packet.last_send_time:
+ now = int(round(time.time()))
+ sleep_time = 31
+ delta = now - self.packet.last_send_time
+ if delta > sleep_time:
+ send_now = True
+ else:
+ send_now = True
+
+ if send_now:
+ sent = False
+ try:
+ sent = _send_direct(self.packet)
+ except Exception:
+ LOG.error(f'Failed to send packet: {self.packet}')
+ else:
+ if sent:
+ self.packet.send_count += 1
+
+ self.packet.last_send_time = int(round(time.time()))
+
+ self.wait() # Wait for period (1s) or shutdown signal
+ return True
+```
+
+- [ ] **Step 5: Update BeaconSendThread**
+
+```python
+class BeaconSendThread(aprsd_threads.APRSDThread):
+ """Thread that sends a GPS beacon packet periodically."""
+
+ def __init__(self):
+ super().__init__('BeaconSendThread')
+ # Set period from config
+ self.period = CONF.beacon_interval
+ if not CONF.latitude or not CONF.longitude:
+ LOG.error(
+ 'Latitude and Longitude are not set in the config file.'
+ 'Beacon will not be sent and thread is STOPPED.',
+ )
+ self.stop()
+ LOG.info(
+ 'Beacon thread is running and will send '
+ f'beacons every {CONF.beacon_interval} seconds.',
+ )
+
+ def loop(self):
+ pkt = core.BeaconPacket(
+ from_call=CONF.callsign,
+ to_call='APRS',
+ latitude=float(CONF.latitude),
+ longitude=float(CONF.longitude),
+ comment='APRSD GPS Beacon',
+ symbol=CONF.beacon_symbol,
+ )
+ try:
+ pkt.retry_count = 1
+ send(pkt, direct=True)
+ except Exception as e:
+ LOG.error(f'Failed to send beacon: {e}')
+ APRSDClient().reset()
+ if self.wait(timeout=5): # Interruptible error recovery wait
+ return False
+
+ self.wait() # Wait for beacon_interval or shutdown signal
+ return True
+```
+
+- [ ] **Step 6: Run tests**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 7: Commit TX thread changes**
+
+```bash
+git add aprsd/threads/tx.py
+git commit -m "refactor(threads): migrate TX threads to Event-based timing
+
+- PacketSendSchedulerThread: daemon=False
+- AckSendSchedulerThread: daemon=False
+- SendPacketThread, SendAckThread, BeaconSendThread: daemon=True (default)
+- BeaconSendThread: period from CONF.beacon_interval
+- Replace time.sleep(1) with self.wait()
+- Remove counter-based timing (_loop_cnt)"
+```
+
+---
+
+### Task 7: Migrate APRSRegistryThread
+
+**Files:**
+- Modify: `aprsd/threads/registry.py`
+
+- [ ] **Step 1: Update APRSRegistryThread**
+
+```python
+class APRSRegistryThread(aprsd_threads.APRSDThread):
+ """This sends service information to the configured APRS Registry."""
+
+ def __init__(self):
+ super().__init__('APRSRegistryThread')
+ # Set period from config
+ self.period = CONF.aprs_registry.frequency_seconds
+ if not CONF.aprs_registry.enabled:
+ LOG.error('APRS Registry is not enabled.')
+ LOG.error('APRS Registry thread is STOPPING.')
+ self.stop()
+ LOG.info(
+ 'APRS Registry thread is running and will send '
+ f'info every {CONF.aprs_registry.frequency_seconds} seconds '
+ f'to {CONF.aprs_registry.registry_url}.',
+ )
+
+ def loop(self):
+ info = {
+ 'callsign': CONF.callsign,
+ 'owner_callsign': CONF.owner_callsign,
+ 'description': CONF.aprs_registry.description,
+ 'service_website': CONF.aprs_registry.service_website,
+ 'software': f'APRSD version {aprsd.__version__} '
+ 'https://github.com/craigerl/aprsd',
+ }
+ try:
+ requests.post(
+ f'{CONF.aprs_registry.registry_url}',
+ json=info,
+ )
+ except Exception as e:
+ LOG.error(f'Failed to send registry info: {e}')
+
+ self.wait() # Wait for frequency_seconds or shutdown signal
+ return True
+```
+
+- [ ] **Step 2: Run tests**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 3: Commit registry thread changes**
+
+```bash
+git add aprsd/threads/registry.py
+git commit -m "refactor(threads): migrate APRSRegistryThread to Event-based timing
+
+- Set period from CONF.aprs_registry.frequency_seconds
+- Remove counter-based timing (_loop_cnt)
+- Replace time.sleep(1) with self.wait()"
+```
+
+---
+
+## Chunk 5: Signal Handler and Final Cleanup
+
+### Task 8: Update Signal Handler
+
+**Files:**
+- Modify: `aprsd/main.py:76-97`
+
+- [ ] **Step 1: Update signal_handler function**
+
+```python
+def signal_handler(sig, frame):
+ click.echo('signal_handler: called')
+ collector.Collector().stop_all()
+ thread_list = threads.APRSDThreadList()
+ thread_list.stop_all()
+
+ if 'subprocess' not in str(frame):
+ LOG.info(
+ 'Ctrl+C, Sending all threads exit! {}'.format(
+ datetime.datetime.now(),
+ ),
+ )
+ # Wait for non-daemon threads to finish gracefully
+ thread_list.join_non_daemon(timeout=5.0)
+ try:
+ packets.PacketTrack().save()
+ packets.WatchList().save()
+ packets.SeenList().save()
+ packets.PacketList().save()
+ collector.Collector().collect()
+ except Exception as e:
+ LOG.error(f'Failed to save data: {e}')
+ sys.exit(0)
+```
+
+- [ ] **Step 2: Run full test suite**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 3: Commit signal handler changes**
+
+```bash
+git add aprsd/main.py
+git commit -m "refactor(main): update signal handler for Event-based thread shutdown
+
+- Replace time.sleep(1.5) with join_non_daemon(timeout=5.0)
+- Non-daemon threads get graceful shutdown
+- Daemon threads terminate immediately on exit"
+```
+
+---
+
+### Task 9: Remove unused imports
+
+**Files:**
+- Modify: `aprsd/threads/keepalive.py`
+- Modify: `aprsd/threads/stats.py`
+- Modify: `aprsd/threads/rx.py`
+- Modify: `aprsd/threads/tx.py`
+- Modify: `aprsd/threads/registry.py`
+
+- [ ] **Step 1: Remove `import time` where no longer needed**
+
+Check each file and remove `import time` if `time.sleep` is no longer used. Keep it if `time.time()` is still used.
+
+Files to check:
+- `keepalive.py`: Remove `import time` (no longer used)
+- `stats.py`: Keep `import time` (uses `time.time()`)
+- `rx.py`: Remove `import time` (no longer used)
+- `tx.py`: Keep `import time` (uses `time.time()`)
+- `registry.py`: Remove `import time` (no longer used)
+
+- [ ] **Step 2: Run linter**
+
+Run: `tox -e lint`
+Expected: PASS
+
+- [ ] **Step 3: Run full test suite**
+
+Run: `pytest tests/ -v`
+Expected: PASS
+
+- [ ] **Step 4: Commit cleanup**
+
+```bash
+git add aprsd/threads/keepalive.py aprsd/threads/rx.py aprsd/threads/registry.py
+git commit -m "chore(threads): remove unused time imports"
+```
+
+---
+
+### Task 10: Final Verification
+
+- [ ] **Step 1: Run full test suite with coverage**
+
+Run: `pytest tests/ --cov=aprsd -v`
+Expected: PASS with good coverage
+
+- [ ] **Step 2: Run type checking**
+
+Run: `tox -e type-check`
+Expected: PASS (or existing errors only)
+
+- [ ] **Step 3: Manual smoke test**
+
+Verify the server command loads correctly (a `--help` invocation exercises imports and CLI wiring without actually starting threads):
+```bash
+aprsd server --help
+```
+
+- [ ] **Step 4: Create summary commit**
+
+If any fixes were needed during verification, commit them.
+
+---
+
+## Summary
+
+**Total Tasks:** 10
+**Total Steps:** ~45
+
+**Files Modified:**
+- `aprsd/threads/aprsd.py` (base class + ThreadList)
+- `aprsd/threads/keepalive.py`
+- `aprsd/threads/stats.py`
+- `aprsd/threads/rx.py`
+- `aprsd/threads/tx.py`
+- `aprsd/threads/registry.py`
+- `aprsd/main.py`
+- `tests/threads/test_aprsd_thread.py`
+
+**Key Changes:**
+1. Base class: `daemon`, `period` attributes, `_shutdown_event`, `wait()` method
+2. Non-daemon threads: `PacketSendSchedulerThread`, `AckSendSchedulerThread`, `APRSDStatsStoreThread`
+3. All counter-based timing replaced with period-based `wait()`
+4. Signal handler uses `join_non_daemon()` instead of `sleep(1.5)`
diff --git a/docs/superpowers/specs/2026-03-24-daemon-threads-event-refactor-design.md b/docs/superpowers/specs/2026-03-24-daemon-threads-event-refactor-design.md
new file mode 100644
index 0000000..4c3c2a1
--- /dev/null
+++ b/docs/superpowers/specs/2026-03-24-daemon-threads-event-refactor-design.md
@@ -0,0 +1,259 @@
+# Daemon Threads and Event-Based Timing Refactor
+
+**Date:** 2026-03-24
+**Status:** Draft
+**Scope:** All APRSD thread classes
+
+## Overview
+
+Refactor all APRSD thread classes to use daemon threads and replace counter-based sleep patterns with `threading.Event()` for interruptible waits and cleaner periodic timing.
+
+## Goals
+
+1. **Faster shutdown** — threads respond to shutdown signals immediately instead of waiting for `time.sleep()` to complete
+2. **Cleaner periodic timing** — replace counter-based timing (`loop_count % 60`) with explicit period-based waits
+3. **Proper daemon semantics** — most threads become daemon threads; critical I/O threads remain non-daemon for graceful shutdown
+
+## Current State
+
+- **15 thread classes** extend `APRSDThread` base class (3 non-daemon + 12 daemon, see tables below)
+- All use `time.sleep(1)` with counter-based conditionals for periodic work
+- Shutdown via boolean `thread_stop` flag, polled in while loop
+- No threads set as daemon — all block program exit
+- No `threading.Event()` usage anywhere
+
+### Current Problems
+
+1. Shutdown delay: up to 1-5 seconds waiting for sleep to finish
+2. Counter math is fragile and obscures intent
+3. Non-daemon threads prevent clean program exit
+
+## Design
+
+### Base Class Changes
+
+File: `aprsd/threads/aprsd.py`
+
+```python
+class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
+ # Class attributes (subclasses override as needed)
+ daemon = True # Most threads are daemon
+ period = 1 # Default wait period in seconds
+
+ def __init__(self, name):
+ super().__init__(name=name)
+ self.daemon = self.__class__.daemon
+ self._shutdown_event = threading.Event()
+ self.loop_count = 0 # Retained for debugging
+ self._last_loop = datetime.datetime.now()
+ APRSDThreadList().add(self)
+
+ def _should_quit(self) -> bool:
+ """Check if thread should exit."""
+ return self._shutdown_event.is_set()
+
+ def stop(self):
+ """Signal thread to stop. Returns immediately."""
+ self._shutdown_event.set()
+
+ def wait(self, timeout: float | None = None) -> bool:
+ """Wait for shutdown signal or timeout.
+
+ Args:
+ timeout: Seconds to wait. Defaults to self.period.
+
+ Returns:
+ True if shutdown was signaled, False if timeout expired.
+ """
+ wait_time = timeout if timeout is not None else self.period
+ return self._shutdown_event.wait(timeout=wait_time)
+
+ def run(self):
+ while not self._should_quit():
+ self.loop_count += 1
+ self._last_loop = datetime.datetime.now()
+ if not self.loop():
+ break
+ APRSDThreadList().remove(self)
+```
+
+### Daemon vs Non-Daemon Threads
+
+**Non-daemon threads (3)** — require graceful shutdown for I/O operations:
+
+| Thread | File | Reason |
+|--------|------|--------|
+| `PacketSendSchedulerThread` | tx.py | Manages packet send queue |
+| `AckSendSchedulerThread` | tx.py | Manages ACK send queue |
+| `APRSDStatsStoreThread` | stats.py | Writes stats to disk |
+
+**Daemon threads (12)** — can be terminated immediately:
+
+- `APRSDRXThread`, `APRSDFilterThread`, `APRSDProcessPacketThread`, `APRSDPluginProcessPacketThread`
+- `APRSDPushStatsThread`, `StatsLogThread`
+- `KeepAliveThread`, `APRSRegistryThread`
+- `SendPacketThread`, `SendAckThread`, `BeaconSendThread`
+- `APRSDListenProcessThread` (listen command)
+
+### Subclass Migration Pattern
+
+**Before:**
+```python
+class KeepAliveThread(APRSDThread):
+ def loop(self):
+ if self.loop_count % 60 == 0:
+ self._do_keepalive_work()
+ time.sleep(1)
+ return True
+```
+
+**After:**
+```python
+class KeepAliveThread(APRSDThread):
+ period = 60
+
+ def loop(self):
+ self._do_keepalive_work()
+ self.wait()
+ return True
+```
+
+### Thread Periods
+
+| Thread | New `period` | Source |
+|--------|--------------|--------|
+| `KeepAliveThread` | 60 | Fixed |
+| `APRSDStatsStoreThread` | 10 | Fixed |
+| `APRSDPushStatsThread` | config | `CONF.push_stats.period` |
+| `StatsLogThread` | 10 | Fixed |
+| `APRSRegistryThread` | config | `CONF.aprs_registry.frequency_seconds` |
+| `BeaconSendThread` | config | `CONF.beacon_interval` |
+| `APRSDRXThread` | 1 | Default |
+| `APRSDFilterThread` | 1 | Default |
+| `APRSDProcessPacketThread` | 1 | Default |
+| `APRSDPluginProcessPacketThread` | 1 | Default |
+| `SendPacketThread` | 1 | Default |
+| `SendAckThread` | 1 | Default |
+| `PacketSendSchedulerThread` | 1 | Default |
+| `AckSendSchedulerThread` | 1 | Default |
+| `APRSDListenProcessThread` | 1 | Default |
+
+Config-based periods are set in `__init__` or `setup()`. Note: `setup()` is called by subclasses in their `__init__` before the thread starts; the base class does not call it automatically.
+```python
+class APRSRegistryThread(APRSDThread):
+ period = 1 # Default
+
+ def setup(self):
+ self.period = CONF.aprs_registry.frequency_seconds
+```
+
+### ThreadList Changes
+
+File: `aprsd/threads/aprsd.py`
+
+```python
+class APRSDThreadList:
+ def stop_all(self):
+ """Signal all threads to stop."""
+ with self.lock:
+ for th in self.threads_list:
+ th.stop()
+
+ def join_non_daemon(self, timeout: float = 5.0):
+ """Wait for non-daemon threads to complete gracefully."""
+ with self.lock:
+ for th in self.threads_list:
+ if not th.daemon and th.is_alive():
+ th.join(timeout=timeout)
+```
+
+### Shutdown Handler Changes
+
+File: `aprsd/main.py`
+
+```python
+def signal_handler(sig, frame):
+ LOG.info("Shutdown signal received")
+ thread_list = threads.APRSDThreadList()
+ thread_list.stop_all()
+ thread_list.join_non_daemon(timeout=5.0)
+ # Daemon threads killed automatically on exit
+```
+
+### Queue-Based Threads
+
+Threads that block on queues use queue timeout as interruptible wait:
+
+```python
+class APRSDFilterThread(APRSDThread):
+ period = 1
+
+ def loop(self):
+ try:
+ packet = self.queue.get(timeout=self.period)
+ self._process(packet)
+ except queue.Empty:
+ pass # Timeout, loop checks _should_quit
+ return True
+```
+
+### Error Recovery Waits
+
+Threads needing longer waits for error recovery use explicit timeout:
+
+```python
+class APRSDRXThread(APRSDThread):
+ period = 1
+
+ def loop(self):
+ try:
+ self._process_packets()
+ except ConnectionError:
+ LOG.error("Connection lost, retrying in 5s")
+ if self.wait(timeout=5):
+ return False # Shutdown signaled
+ self.wait()
+ return True
+```
+
+## Files to Modify
+
+1. `aprsd/threads/aprsd.py` — Base class and ThreadList
+2. `aprsd/threads/rx.py` — RX thread classes (4)
+3. `aprsd/threads/tx.py` — TX thread classes (5)
+4. `aprsd/threads/stats.py` — Stats thread classes (3)
+5. `aprsd/threads/keepalive.py` — KeepAliveThread
+6. `aprsd/threads/registry.py` — APRSRegistryThread
+7. `aprsd/main.py` — Signal handler
+8. `aprsd/cmds/listen.py` — APRSDListenProcessThread
+
+## Testing Strategy
+
+1. **Unit tests for base class:**
+ - `wait()` returns `True` immediately when event is already set
+ - `wait(timeout=5)` returns `False` after 5 seconds if event not set
+ - `stop()` causes `_should_quit()` to return `True`
+ - `daemon` attribute is set correctly from class attribute
+
+2. **Integration tests:**
+ - Shutdown completes in <1s for daemon-only scenarios
+ - Non-daemon threads get up to 5s grace period
+ - `join_non_daemon()` respects timeout parameter
+
+3. **Manual testing:**
+ - Send SIGINT during operation, verify clean exit
+ - Verify no "thread still running" warnings on shutdown
+
+4. **Existing test updates:**
+ - Update any tests that mock `thread_stop` to mock `_shutdown_event` instead
+ - Update tests that check `time.sleep` calls to check `wait()` calls
+
+## Backwards Compatibility
+
+- `loop_count` retained for debugging/logging
+- `_should_quit()` method signature unchanged
+- Default `period=1` matches current 1-second sleep behavior
+
+## Rollout
+
+Single PR with all changes — the refactor is atomic and affects thread behavior globally.
diff --git a/tests/threads/test_aprsd_thread.py b/tests/threads/test_aprsd_thread.py
index b18cfb5..f9c0048 100644
--- a/tests/threads/test_aprsd_thread.py
+++ b/tests/threads/test_aprsd_thread.py
@@ -1,3 +1,4 @@
+import datetime
import threading
import time
import unittest
@@ -42,11 +43,9 @@ class TestAPRSDThread(unittest.TestCase):
"""Test thread initialization."""
thread = TestThread('TestThread1')
self.assertEqual(thread.name, 'TestThread1')
- self.assertFalse(thread.thread_stop)
+ self.assertFalse(thread._shutdown_event.is_set())
self.assertFalse(thread._pause)
- self.assertEqual(thread.loop_count, 1)
-
- # Should be registered in thread list
+ self.assertEqual(thread.loop_count, 0) # Was 1, now starts at 0
thread_list = APRSDThreadList()
self.assertIn(thread, thread_list.threads_list)
@@ -54,8 +53,7 @@ class TestAPRSDThread(unittest.TestCase):
"""Test _should_quit() method."""
thread = TestThread('TestThread2')
self.assertFalse(thread._should_quit())
-
- thread.thread_stop = True
+ thread._shutdown_event.set()
self.assertTrue(thread._should_quit())
def test_pause_unpause(self):
@@ -72,20 +70,93 @@ class TestAPRSDThread(unittest.TestCase):
def test_stop(self):
"""Test stop() method."""
thread = TestThread('TestThread4')
- self.assertFalse(thread.thread_stop)
-
+ self.assertFalse(thread._shutdown_event.is_set())
thread.stop()
- self.assertTrue(thread.thread_stop)
+ self.assertTrue(thread._shutdown_event.is_set())
def test_loop_age(self):
"""Test loop_age() method."""
- import datetime
-
thread = TestThread('TestThread5')
age = thread.loop_age()
self.assertIsInstance(age, datetime.timedelta)
self.assertGreaterEqual(age.total_seconds(), 0)
+ def test_daemon_attribute_default(self):
+ """Test that daemon attribute defaults to True."""
+ thread = TestThread('DaemonTest')
+ self.assertTrue(thread.daemon)
+
+ def test_daemon_attribute_override(self):
+ """Test that daemon attribute can be overridden via class attribute."""
+
+ class NonDaemonThread(APRSDThread):
+ daemon = False
+
+ def loop(self):
+ return False
+
+ thread = NonDaemonThread('NonDaemonTest')
+ self.assertFalse(thread.daemon)
+
+ def test_period_attribute_default(self):
+ """Test that period attribute defaults to 1."""
+ thread = TestThread('PeriodTest')
+ self.assertEqual(thread.period, 1)
+
+ def test_period_attribute_override(self):
+ """Test that period attribute can be overridden via class attribute."""
+
+ class LongPeriodThread(APRSDThread):
+ period = 60
+
+ def loop(self):
+ return False
+
+ thread = LongPeriodThread('LongPeriodTest')
+ self.assertEqual(thread.period, 60)
+
+ def test_shutdown_event_exists(self):
+ """Test that _shutdown_event is created."""
+ thread = TestThread('EventTest')
+ self.assertIsInstance(thread._shutdown_event, threading.Event)
+ self.assertFalse(thread._shutdown_event.is_set())
+
+ def test_wait_returns_false_on_timeout(self):
+ """Test that wait() returns False when timeout expires."""
+ thread = TestThread('WaitTimeoutTest')
+ start = time.time()
+ result = thread.wait(timeout=0.1)
+ elapsed = time.time() - start
+ self.assertFalse(result)
+ self.assertGreaterEqual(elapsed, 0.1)
+
+ def test_wait_returns_true_when_stopped(self):
+ """Test that wait() returns True immediately when stop() was called."""
+ thread = TestThread('WaitStopTest')
+ thread.stop()
+ start = time.time()
+ result = thread.wait(timeout=10)
+ elapsed = time.time() - start
+ self.assertTrue(result)
+ self.assertLess(elapsed, 1)
+
+ def test_wait_uses_period_by_default(self):
+ """Test that wait() uses self.period when no timeout specified."""
+
+ class ShortPeriodThread(APRSDThread):
+ period = 0.1
+
+ def loop(self):
+ return False
+
+ thread = ShortPeriodThread('ShortPeriodTest')
+ start = time.time()
+ result = thread.wait()
+ elapsed = time.time() - start
+ self.assertFalse(result)
+ self.assertGreaterEqual(elapsed, 0.1)
+ self.assertLess(elapsed, 0.5)
+
def test_str(self):
"""Test __str__() method."""
thread = TestThread('TestThread6')
@@ -253,10 +324,9 @@ class TestAPRSDThreadList(unittest.TestCase):
thread2 = TestThread('TestThread9')
thread_list.add(thread1)
thread_list.add(thread2)
-
thread_list.stop_all()
- self.assertTrue(thread1.thread_stop)
- self.assertTrue(thread2.thread_stop)
+ self.assertTrue(thread1._shutdown_event.is_set())
+ self.assertTrue(thread2._shutdown_event.is_set())
def test_pause_all(self):
"""Test pause_all() method."""
@@ -334,3 +404,51 @@ class TestAPRSDThreadList(unittest.TestCase):
# Should handle concurrent access without errors
self.assertGreaterEqual(len(thread_list), 0)
+
+ def test_join_non_daemon(self):
+ """Test join_non_daemon() waits for non-daemon threads."""
+
+ class NonDaemonTestThread(APRSDThread):
+ daemon = False
+
+ def __init__(self, name):
+ super().__init__(name)
+ self.finished = False
+
+ def loop(self):
+ time.sleep(0.2)
+ self.finished = True
+ return False
+
+ thread_list = APRSDThreadList()
+ thread = NonDaemonTestThread('NonDaemonJoinTest')
+ thread_list.add(thread)
+ thread.start()
+
+ # Stop triggers the event, thread should finish its loop then exit
+ thread.stop()
+ thread_list.join_non_daemon(timeout=5.0)
+
+ self.assertTrue(thread.finished or not thread.is_alive())
+
+ def test_join_non_daemon_skips_daemon_threads(self):
+ """Test join_non_daemon() does not wait for daemon threads."""
+ thread_list = APRSDThreadList()
+ # Clear existing threads
+ thread_list.threads_list = []
+
+ # Create a daemon thread that loops forever
+ thread = TestThread('DaemonSkipTest', should_loop=True)
+ thread_list.add(thread)
+ thread.start()
+
+ # This should return quickly since it's a daemon thread
+ start = time.time()
+ thread_list.join_non_daemon(timeout=0.1)
+ elapsed = time.time() - start
+
+ self.assertLess(elapsed, 0.5) # Should not wait for daemon
+
+ # Cleanup
+ thread.stop()
+ thread.join(timeout=1)
diff --git a/tests/threads/test_rx.py b/tests/threads/test_rx.py
index f2b49d5..4c18059 100644
--- a/tests/threads/test_rx.py
+++ b/tests/threads/test_rx.py
@@ -15,17 +15,18 @@ class TestAPRSDRXThread(unittest.TestCase):
self.packet_queue = queue.Queue()
self.rx_thread = rx.APRSDRXThread(self.packet_queue)
self.rx_thread.pkt_count = 0 # Reset packet count
- # Mock time.sleep to speed up tests
- self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
- self.mock_sleep = self.sleep_patcher.start()
+ # Mock self.wait to speed up tests
+ self.wait_patcher = mock.patch.object(
+ self.rx_thread, 'wait', return_value=False
+ )
+ self.mock_wait = self.wait_patcher.start()
def tearDown(self):
"""Clean up after tests."""
+ self.wait_patcher.stop()
self.rx_thread.stop()
if self.rx_thread.is_alive():
self.rx_thread.join(timeout=1)
- # Stop the sleep patcher
- self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""
@@ -39,13 +40,13 @@ class TestAPRSDRXThread(unittest.TestCase):
self.rx_thread._client = mock.MagicMock()
self.rx_thread.stop()
- self.assertTrue(self.rx_thread.thread_stop)
+ self.assertTrue(self.rx_thread._shutdown_event.is_set())
self.rx_thread._client.close.assert_called()
def test_stop_no_client(self):
"""Test stop() when client is None."""
self.rx_thread.stop()
- self.assertTrue(self.rx_thread.thread_stop)
+ self.assertTrue(self.rx_thread._shutdown_event.is_set())
def test_loop_no_client(self):
"""Test loop() when client is None."""
@@ -237,18 +238,18 @@ class TestAPRSDFilterThread(unittest.TestCase):
"""Process packet - required by base class."""
pass
+ # Mock APRSDClient to avoid config requirements
+ self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
+ self.mock_client = self.client_patcher.start()
+
self.filter_thread = TestFilterThread('TestFilterThread', self.packet_queue)
- # Mock time.sleep to speed up tests
- self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
- self.mock_sleep = self.sleep_patcher.start()
def tearDown(self):
"""Clean up after tests."""
+ self.client_patcher.stop()
self.filter_thread.stop()
if self.filter_thread.is_alive():
self.filter_thread.join(timeout=1)
- # Stop the sleep patcher
- self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""
@@ -330,18 +331,18 @@ class TestAPRSDProcessPacketThread(unittest.TestCase):
def process_our_message_packet(self, packet):
pass
+ # Mock APRSDClient to avoid config requirements
+ self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
+ self.mock_client = self.client_patcher.start()
+
self.process_thread = ConcreteProcessThread(self.packet_queue)
- # Mock time.sleep to speed up tests
- self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
- self.mock_sleep = self.sleep_patcher.start()
def tearDown(self):
"""Clean up after tests."""
+ self.client_patcher.stop()
self.process_thread.stop()
if self.process_thread.is_alive():
self.process_thread.join(timeout=1)
- # Stop the sleep patcher
- self.sleep_patcher.stop()
def test_init(self):
"""Test initialization."""
diff --git a/tests/threads/test_stats.py b/tests/threads/test_stats.py
index 0bffa8a..d23fa3b 100644
--- a/tests/threads/test_stats.py
+++ b/tests/threads/test_stats.py
@@ -74,26 +74,25 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
"""Test APRSDStatsStoreThread initialization."""
thread = APRSDStatsStoreThread()
self.assertEqual(thread.name, 'StatsStore')
- self.assertEqual(thread.save_interval, 10)
+ self.assertEqual(thread.period, 10)
+ self.assertFalse(thread.daemon)
self.assertTrue(hasattr(thread, 'loop_count'))
def test_loop_with_save(self):
- """Test loop method when save interval is reached."""
+ """Test loop method saves stats every call."""
thread = APRSDStatsStoreThread()
# Mock the collector and save methods
with (
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
+ mock.patch.object(thread, 'wait'),
):
# Setup mock collector to return some stats
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.return_value = {'test': 'data'}
mock_collector_class.return_value = mock_collector_instance
- # Set loop_count to match save interval
- thread.loop_count = 10
-
# Call loop
result = thread.loop()
@@ -104,45 +103,43 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
mock_collector_instance.collect.assert_called_once()
mock_save.assert_called_once()
- def test_loop_without_save(self):
- """Test loop method when save interval is not reached."""
+ def test_loop_calls_wait(self):
+ """Test loop method calls wait() at the end."""
thread = APRSDStatsStoreThread()
# Mock the collector and save methods
with (
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
- mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
+ mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save'),
+ mock.patch.object(thread, 'wait') as mock_wait,
):
# Setup mock collector to return some stats
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.return_value = {'test': 'data'}
mock_collector_class.return_value = mock_collector_instance
- # Set loop_count to not match save interval
- thread.loop_count = 1
-
# Call loop
result = thread.loop()
# Should return True (continue looping)
self.assertTrue(result)
- # Should not have called save
- mock_save.assert_not_called()
+ # Should have called wait
+ mock_wait.assert_called_once()
def test_loop_with_exception(self):
"""Test loop method when an exception occurs."""
thread = APRSDStatsStoreThread()
# Mock the collector to raise an exception
- with mock.patch('aprsd.stats.collector.Collector') as mock_collector_class:
+ with (
+ mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
+ mock.patch.object(thread, 'wait'),
+ ):
mock_collector_instance = mock.Mock()
mock_collector_instance.collect.side_effect = RuntimeError('Test exception')
mock_collector_class.return_value = mock_collector_instance
- # Set loop_count to match save interval
- thread.loop_count = 10
-
# Should raise the exception
with self.assertRaises(RuntimeError):
thread.loop()
@@ -177,24 +174,31 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
self.assertEqual(thread.period, 15)
self.assertFalse(thread.send_packetlist)
- def test_loop_skips_push_when_period_not_reached(self):
- """Test loop does not POST when loop_count not divisible by period."""
+ def test_loop_pushes_stats_every_call(self):
+ """Test loop POSTs stats on every call (timing controlled by wait)."""
thread = APRSDPushStatsThread(
push_url='https://example.com',
frequency_seconds=10,
)
- thread.loop_count = 3 # 3 % 10 != 0
with (
mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
+ mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
+ mock_collector.return_value.collect.return_value = {}
+ mock_dt.datetime.now.return_value.strftime.return_value = (
+ '01-01-2025 12:00:00'
+ )
+ mock_post.return_value.status_code = 200
+ mock_post.return_value.raise_for_status = mock.Mock()
+
result = thread.loop()
self.assertTrue(result)
- mock_collector.return_value.collect.assert_not_called()
- mock_post.assert_not_called()
+ mock_collector.return_value.collect.assert_called_once_with(serializable=True)
+ mock_post.assert_called_once()
def test_loop_pushes_stats_and_removes_packetlist_by_default(self):
"""Test loop collects stats, POSTs to url/stats, and strips PacketList.packets."""
@@ -203,7 +207,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=False,
)
- thread.loop_count = 10
collected = {
'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1},
@@ -215,7 +218,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected
@@ -247,7 +250,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=True,
)
- thread.loop_count = 10
collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}}
@@ -256,7 +258,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected
@@ -276,14 +278,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
- thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@@ -306,14 +307,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
- thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@@ -337,14 +337,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
- thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@@ -368,14 +367,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
push_url='https://example.com',
frequency_seconds=10,
)
- thread.loop_count = 10
with (
mock.patch(
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
):
@@ -398,7 +396,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
frequency_seconds=10,
send_packetlist=False,
)
- thread.loop_count = 10
collected = {'Only': 'data', 'No': 'PacketList'}
@@ -407,7 +404,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
'aprsd.threads.stats.collector.Collector'
) as mock_collector_class,
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
- mock.patch('aprsd.threads.stats.time.sleep'),
+ mock.patch.object(thread, 'wait'),
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
):
mock_collector_class.return_value.collect.return_value = collected
diff --git a/tests/threads/test_tx.py b/tests/threads/test_tx.py
index 190cad6..1c9730d 100644
--- a/tests/threads/test_tx.py
+++ b/tests/threads/test_tx.py
@@ -596,9 +596,13 @@ class TestSendPacketThread(unittest.TestCase):
tracker.PacketTrack._instance = None
self.packet = fake.fake_packet(msg_number='123')
self.thread = tx.SendPacketThread(self.packet)
+ # Mock wait to speed up tests
+ self.wait_patcher = mock.patch.object(self.thread, 'wait', return_value=False)
+ self.mock_wait = self.wait_patcher.start()
def tearDown(self):
"""Clean up after tests."""
+ self.wait_patcher.stop()
self.thread.stop()
if self.thread.is_alive():
self.thread.join(timeout=1)
@@ -608,7 +612,8 @@ class TestSendPacketThread(unittest.TestCase):
"""Test initialization."""
self.assertEqual(self.thread.packet, self.packet)
self.assertIn('TX-', self.thread.name)
- self.assertEqual(self.thread.loop_count, 1)
+ # loop_count starts at 0 from base class, incremented in run()
+ self.assertEqual(self.thread.loop_count, 0)
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
def test_loop_packet_acked(self, mock_tracker_class):
@@ -761,7 +766,8 @@ class TestBeaconSendThread(unittest.TestCase):
"""Test initialization."""
thread = tx.BeaconSendThread()
self.assertEqual(thread.name, 'BeaconSendThread')
- self.assertEqual(thread._loop_cnt, 1)
+ self.assertEqual(thread.period, 10) # Uses CONF.beacon_interval
+ thread.stop()
def test_init_no_coordinates(self):
"""Test initialization without coordinates."""
@@ -772,39 +778,27 @@ class TestBeaconSendThread(unittest.TestCase):
CONF.longitude = None
thread = tx.BeaconSendThread()
- self.assertTrue(thread.thread_stop)
+ self.assertTrue(thread._shutdown_event.is_set())
+ thread.stop()
@mock.patch('aprsd.threads.tx.send')
def test_loop_send_beacon(self, mock_send):
- """Test loop() sends beacon at interval."""
+ """Test loop() sends beacon."""
from oslo_config import cfg
CONF = cfg.CONF
CONF.beacon_interval = 1
+ CONF.latitude = 40.7128
+ CONF.longitude = -74.0060
thread = tx.BeaconSendThread()
- thread._loop_cnt = 1
+ # Mock wait to return False (no shutdown)
+ with mock.patch.object(thread, 'wait', return_value=False):
+ result = thread.loop()
- result = thread.loop()
-
- self.assertTrue(result)
- mock_send.assert_called()
-
- @mock.patch('aprsd.threads.tx.send')
- def test_loop_not_time(self, mock_send):
- """Test loop() doesn't send before interval."""
- from oslo_config import cfg
-
- CONF = cfg.CONF
- CONF.beacon_interval = 10
-
- thread = tx.BeaconSendThread()
- thread._loop_cnt = 5
-
- result = thread.loop()
-
- self.assertTrue(result)
- mock_send.assert_not_called()
+ self.assertTrue(result)
+ mock_send.assert_called()
+ thread.stop()
@mock.patch('aprsd.threads.tx.send')
@mock.patch('aprsd.threads.tx.APRSDClient')
@@ -814,13 +808,17 @@ class TestBeaconSendThread(unittest.TestCase):
CONF = cfg.CONF
CONF.beacon_interval = 1
+ CONF.latitude = 40.7128
+ CONF.longitude = -74.0060
thread = tx.BeaconSendThread()
- thread._loop_cnt = 1
mock_send.side_effect = Exception('Send error')
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
- result = thread.loop()
- self.assertTrue(result)
- mock_log.error.assert_called()
- mock_client_class.return_value.reset.assert_called()
+ # Mock wait to return False (no shutdown signaled during error wait)
+ with mock.patch.object(thread, 'wait', return_value=False):
+ result = thread.loop()
+ self.assertTrue(result)
+ mock_log.error.assert_called()
+ mock_client_class.return_value.reset.assert_called()
+ thread.stop()
diff --git a/uv.lock b/uv.lock
index c339a9b..fa199b2 100644
--- a/uv.lock
+++ b/uv.lock
@@ -38,7 +38,6 @@ dependencies = [
{ name = "rfc3986" },
{ name = "rich" },
{ name = "rush" },
- { name = "setuptools" },
{ name = "stevedore" },
{ name = "thesmuggler" },
{ name = "timeago" },
@@ -82,7 +81,7 @@ type = [
[package.metadata]
requires-dist = [
- { name = "aprslib", git = "https://github.com/hemna/aprs-python.git?rev=09cd7a2829a2e9d28ee1566881c843cc4769e590" },
+ { name = "aprslib", git = "https://github.com/hemna/aprs-python.git?rev=telemetry" },
{ name = "attrs", specifier = "==25.4.0" },
{ name = "ax253", specifier = "==0.1.5.post1" },
{ name = "bitarray", specifier = "==3.8.0" },
@@ -97,7 +96,7 @@ requires-dist = [
{ name = "kiss3", specifier = "==8.0.0" },
{ name = "loguru", specifier = "==0.7.3" },
{ name = "markdown-it-py", specifier = "==4.0.0" },
- { name = "marshmallow", specifier = "==3.26.1" },
+ { name = "marshmallow", specifier = "==3.26.2" },
{ name = "mdurl", specifier = "==0.1.2" },
{ name = "mypy", marker = "extra == 'dev'" },
{ name = "mypy", marker = "extra == 'type'" },
@@ -126,7 +125,6 @@ requires-dist = [
{ name = "rich", specifier = "==14.2.0" },
{ name = "ruff", marker = "extra == 'dev'" },
{ name = "rush", specifier = "==2021.4.0" },
- { name = "setuptools", specifier = "==80.9.0" },
{ name = "stevedore", specifier = "==5.6.0" },
{ name = "thesmuggler", specifier = "==1.0.1" },
{ name = "timeago", specifier = "==1.0.16" },
@@ -152,7 +150,7 @@ provides-extras = ["dev", "tests", "type"]
[[package]]
name = "aprslib"
version = "0.7.2"
-source = { git = "https://github.com/hemna/aprs-python.git?rev=09cd7a2829a2e9d28ee1566881c843cc4769e590#09cd7a2829a2e9d28ee1566881c843cc4769e590" }
+source = { git = "https://github.com/hemna/aprs-python.git?rev=telemetry#09cd7a2829a2e9d28ee1566881c843cc4769e590" }
[[package]]
name = "attrs"
@@ -671,14 +669,14 @@ wheels = [
[[package]]
name = "marshmallow"
-version = "3.26.1"
+version = "3.26.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "packaging" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/ab/5e/5e53d26b42ab75491cda89b871dab9e97c840bf12c63ec58a1919710cd06/marshmallow-3.26.1.tar.gz", hash = "sha256:e6d8affb6cb61d39d26402096dc0aee12d5a26d490a121f118d2e81dc0719dc6", size = 221825, upload-time = "2025-02-03T15:32:25.093Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/55/79/de6c16cc902f4fc372236926b0ce2ab7845268dcc30fb2fbb7f71b418631/marshmallow-3.26.2.tar.gz", hash = "sha256:bbe2adb5a03e6e3571b573f42527c6fe926e17467833660bebd11593ab8dfd57", size = 222095, upload-time = "2025-12-22T06:53:53.309Z" }
wheels = [
- { url = "https://files.pythonhosted.org/packages/34/75/51952c7b2d3873b44a0028b1bd26a25078c18f92f256608e8d1dc61b39fd/marshmallow-3.26.1-py3-none-any.whl", hash = "sha256:3350409f20a70a7e4e11a27661187b77cdcaeb20abca41c1454fe33636bea09c", size = 50878, upload-time = "2025-02-03T15:32:22.295Z" },
+ { url = "https://files.pythonhosted.org/packages/be/2f/5108cb3ee4ba6501748c4908b908e55f42a5b66245b4cfe0c99326e1ef6e/marshmallow-3.26.2-py3-none-any.whl", hash = "sha256:013fa8a3c4c276c24d26d84ce934dc964e2aa794345a0f8c7e5a7191482c8a73", size = 50964, upload-time = "2025-12-22T06:53:51.801Z" },
]
[[package]]