mirror of
https://github.com/craigerl/aprsd.git
synced 2026-03-30 19:55:44 -04:00
Merge pull request #219 from craigerl/feature-daemon-threads-event-refactor
Refactor threads to use daemon threads and Event-based timing
This commit is contained in:
commit
f2526efe1d
@ -24,7 +24,6 @@ import datetime
|
||||
import importlib.metadata as imp
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from importlib.metadata import version as metadata_version
|
||||
|
||||
import click
|
||||
@ -76,14 +75,17 @@ def main():
|
||||
def signal_handler(sig, frame):
|
||||
click.echo('signal_handler: called')
|
||||
collector.Collector().stop_all()
|
||||
threads.APRSDThreadList().stop_all()
|
||||
thread_list = threads.APRSDThreadList()
|
||||
thread_list.stop_all()
|
||||
|
||||
if 'subprocess' not in str(frame):
|
||||
LOG.info(
|
||||
'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
|
||||
'Ctrl+C, Sending all threads exit! {}'.format(
|
||||
datetime.datetime.now(),
|
||||
),
|
||||
)
|
||||
time.sleep(1.5)
|
||||
# Wait for non-daemon threads to finish gracefully
|
||||
thread_list.join_non_daemon(timeout=5.0)
|
||||
try:
|
||||
packets.PacketTrack().save()
|
||||
packets.WatchList().save()
|
||||
@ -93,8 +95,6 @@ def signal_handler(sig, frame):
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to save data: {e}')
|
||||
sys.exit(0)
|
||||
# signal.signal(signal.SIGTERM, sys.exit(0))
|
||||
# sys.exit(0)
|
||||
|
||||
|
||||
@cli.command()
|
||||
|
||||
@ -2,8 +2,6 @@ import abc
|
||||
import datetime
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import wrapt
|
||||
|
||||
@ -13,21 +11,26 @@ LOG = logging.getLogger('APRSD')
|
||||
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||
"""Base class for all threads in APRSD."""
|
||||
|
||||
# Class attributes - subclasses override as needed
|
||||
daemon = True # Most threads are daemon threads
|
||||
period = 1 # Default wait period in seconds
|
||||
loop_count = 1
|
||||
_pause = False
|
||||
thread_stop = False
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name=name)
|
||||
self.thread_stop = False
|
||||
# Set daemon from class attribute
|
||||
self.daemon = self.__class__.daemon
|
||||
# Set period from class attribute (can be overridden in __init__)
|
||||
self.period = self.__class__.period
|
||||
self._shutdown_event = threading.Event()
|
||||
self.loop_count = 0
|
||||
APRSDThreadList().add(self)
|
||||
self._last_loop = datetime.datetime.now()
|
||||
|
||||
def _should_quit(self):
|
||||
"""see if we have a quit message from the global queue."""
|
||||
if self.thread_stop:
|
||||
return True
|
||||
return False
|
||||
"""Check if thread should exit."""
|
||||
return self._shutdown_event.is_set()
|
||||
|
||||
def pause(self):
|
||||
"""Logically pause the processing of the main loop."""
|
||||
@ -40,11 +43,24 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||
self._pause = False
|
||||
|
||||
def stop(self):
|
||||
"""Signal thread to stop. Returns immediately."""
|
||||
LOG.debug(f"Stopping thread '{self.name}'")
|
||||
self.thread_stop = True
|
||||
self._shutdown_event.set()
|
||||
|
||||
def wait(self, timeout: float | None = None) -> bool:
|
||||
"""Wait for shutdown signal or timeout.
|
||||
|
||||
Args:
|
||||
timeout: Seconds to wait. Defaults to self.period.
|
||||
|
||||
Returns:
|
||||
True if shutdown was signaled, False if timeout expired.
|
||||
"""
|
||||
wait_time = timeout if timeout is not None else self.period
|
||||
return self._shutdown_event.wait(timeout=wait_time)
|
||||
|
||||
@abc.abstractmethod
|
||||
def loop(self):
|
||||
def loop(self) -> bool:
|
||||
pass
|
||||
|
||||
def _cleanup(self):
|
||||
@ -64,7 +80,7 @@ class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||
LOG.debug('Starting')
|
||||
while not self._should_quit():
|
||||
if self._pause:
|
||||
time.sleep(1)
|
||||
self.wait(timeout=1)
|
||||
else:
|
||||
self.loop_count += 1
|
||||
can_loop = self.loop()
|
||||
@ -81,7 +97,7 @@ class APRSDThreadList:
|
||||
|
||||
_instance = None
|
||||
|
||||
threads_list: List[APRSDThread] = []
|
||||
threads_list: list[APRSDThread] = []
|
||||
lock = threading.Lock()
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
@ -167,3 +183,15 @@ class APRSDThreadList:
|
||||
@wrapt.synchronized(lock)
|
||||
def __len__(self):
|
||||
return len(self.threads_list)
|
||||
|
||||
@wrapt.synchronized(lock)
|
||||
def join_non_daemon(self, timeout: float = 5.0):
|
||||
"""Wait for non-daemon threads to complete gracefully.
|
||||
|
||||
Args:
|
||||
timeout: Maximum seconds to wait per thread.
|
||||
"""
|
||||
for th in self.threads_list:
|
||||
if not th.daemon and th.is_alive():
|
||||
LOG.info(f'Waiting for non-daemon thread {th.name} to finish')
|
||||
th.join(timeout=timeout)
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
import tracemalloc
|
||||
|
||||
from loguru import logger
|
||||
@ -20,6 +19,7 @@ LOGU = logger
|
||||
class KeepAliveThread(APRSDThread):
|
||||
cntr = 0
|
||||
checker_time = datetime.datetime.now()
|
||||
period = 60
|
||||
|
||||
def __init__(self):
|
||||
tracemalloc.start()
|
||||
@ -28,81 +28,80 @@ class KeepAliveThread(APRSDThread):
|
||||
self.max_delta = datetime.timedelta(**max_timeout)
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % 60 == 0:
|
||||
stats_json = collector.Collector().collect()
|
||||
pl = packets.PacketList()
|
||||
thread_list = APRSDThreadList()
|
||||
now = datetime.datetime.now()
|
||||
stats_json = collector.Collector().collect()
|
||||
pl = packets.PacketList()
|
||||
thread_list = APRSDThreadList()
|
||||
now = datetime.datetime.now()
|
||||
|
||||
if (
|
||||
'APRSClientStats' in stats_json
|
||||
and stats_json['APRSClientStats'].get('transport') == 'aprsis'
|
||||
):
|
||||
if stats_json['APRSClientStats'].get('server_keepalive'):
|
||||
last_msg_time = utils.strfdelta(
|
||||
now - stats_json['APRSClientStats']['server_keepalive']
|
||||
)
|
||||
else:
|
||||
last_msg_time = 'N/A'
|
||||
if (
|
||||
'APRSClientStats' in stats_json
|
||||
and stats_json['APRSClientStats'].get('transport') == 'aprsis'
|
||||
):
|
||||
if stats_json['APRSClientStats'].get('server_keepalive'):
|
||||
last_msg_time = utils.strfdelta(
|
||||
now - stats_json['APRSClientStats']['server_keepalive']
|
||||
)
|
||||
else:
|
||||
last_msg_time = 'N/A'
|
||||
else:
|
||||
last_msg_time = 'N/A'
|
||||
|
||||
tracked_packets = stats_json['PacketTrack']['total_tracked']
|
||||
tx_msg = 0
|
||||
rx_msg = 0
|
||||
if 'PacketList' in stats_json:
|
||||
msg_packets = stats_json['PacketList'].get('MessagePacket')
|
||||
if msg_packets:
|
||||
tx_msg = msg_packets.get('tx', 0)
|
||||
rx_msg = msg_packets.get('rx', 0)
|
||||
tracked_packets = stats_json['PacketTrack']['total_tracked']
|
||||
tx_msg = 0
|
||||
rx_msg = 0
|
||||
if 'PacketList' in stats_json:
|
||||
msg_packets = stats_json['PacketList'].get('MessagePacket')
|
||||
if msg_packets:
|
||||
tx_msg = msg_packets.get('tx', 0)
|
||||
rx_msg = msg_packets.get('rx', 0)
|
||||
|
||||
keepalive = (
|
||||
'{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
|
||||
'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
|
||||
).format(
|
||||
stats_json['APRSDStats']['callsign'],
|
||||
stats_json['APRSDStats']['uptime'],
|
||||
pl.total_rx(),
|
||||
pl.total_tx(),
|
||||
tracked_packets,
|
||||
tx_msg,
|
||||
rx_msg,
|
||||
last_msg_time,
|
||||
stats_json['APRSDStats']['memory_current_str'],
|
||||
stats_json['APRSDStats']['memory_peak_str'],
|
||||
len(thread_list),
|
||||
aprsd_log.logging_queue.qsize(),
|
||||
)
|
||||
LOG.info(keepalive)
|
||||
if 'APRSDThreadList' in stats_json:
|
||||
thread_list = stats_json['APRSDThreadList']
|
||||
for thread_name in thread_list:
|
||||
thread = thread_list[thread_name]
|
||||
alive = thread['alive']
|
||||
age = thread['age']
|
||||
key = thread['name']
|
||||
if not alive:
|
||||
LOG.error(f'Thread {thread}')
|
||||
keepalive = (
|
||||
'{} - Uptime {} RX:{} TX:{} Tracker:{} Msgs TX:{} RX:{} '
|
||||
'Last:{} - RAM Current:{} Peak:{} Threads:{} LoggingQueue:{}'
|
||||
).format(
|
||||
stats_json['APRSDStats']['callsign'],
|
||||
stats_json['APRSDStats']['uptime'],
|
||||
pl.total_rx(),
|
||||
pl.total_tx(),
|
||||
tracked_packets,
|
||||
tx_msg,
|
||||
rx_msg,
|
||||
last_msg_time,
|
||||
stats_json['APRSDStats']['memory_current_str'],
|
||||
stats_json['APRSDStats']['memory_peak_str'],
|
||||
len(thread_list),
|
||||
aprsd_log.logging_queue.qsize(),
|
||||
)
|
||||
LOG.info(keepalive)
|
||||
if 'APRSDThreadList' in stats_json:
|
||||
thread_list = stats_json['APRSDThreadList']
|
||||
for thread_name in thread_list:
|
||||
thread = thread_list[thread_name]
|
||||
alive = thread['alive']
|
||||
age = thread['age']
|
||||
key = thread['name']
|
||||
if not alive:
|
||||
LOG.error(f'Thread {thread}')
|
||||
|
||||
thread_hex = f'fg {utils.hex_from_name(key)}'
|
||||
t_name = f'<{thread_hex}>{key:<15}</{thread_hex}>'
|
||||
thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
|
||||
LOGU.opt(colors=True).info(thread_msg)
|
||||
# LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")
|
||||
thread_hex = f'fg {utils.hex_from_name(key)}'
|
||||
t_name = f'<{thread_hex}>{key:<15}</{thread_hex}>'
|
||||
thread_msg = f'{t_name} Alive? {str(alive): <5} {str(age): <20}'
|
||||
LOGU.opt(colors=True).info(thread_msg)
|
||||
# LOG.info(f"{key: <15} Alive? {str(alive): <5} {str(age): <20}")
|
||||
|
||||
# Go through the registered keepalive collectors
|
||||
# and check them as well as call log.
|
||||
collect = keepalive_collector.KeepAliveCollector()
|
||||
collect.check()
|
||||
collect.log()
|
||||
# Go through the registered keepalive collectors
|
||||
# and check them as well as call log.
|
||||
collect = keepalive_collector.KeepAliveCollector()
|
||||
collect.check()
|
||||
collect.log()
|
||||
|
||||
# Check version every day
|
||||
delta = now - self.checker_time
|
||||
if delta > datetime.timedelta(hours=24):
|
||||
self.checker_time = now
|
||||
level, msg = utils._check_version()
|
||||
if level:
|
||||
LOG.warning(msg)
|
||||
self.cntr += 1
|
||||
time.sleep(1)
|
||||
# Check version every day
|
||||
delta = now - self.checker_time
|
||||
if delta > datetime.timedelta(hours=24):
|
||||
self.checker_time = now
|
||||
level, msg = utils._check_version()
|
||||
if level:
|
||||
LOG.warning(msg)
|
||||
self.cntr += 1
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
import requests
|
||||
from oslo_config import cfg
|
||||
@ -14,11 +13,9 @@ LOG = logging.getLogger('APRSD')
|
||||
class APRSRegistryThread(aprsd_threads.APRSDThread):
|
||||
"""This sends service information to the configured APRS Registry."""
|
||||
|
||||
_loop_cnt: int = 1
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('APRSRegistryThread')
|
||||
self._loop_cnt = 1
|
||||
self.period = CONF.aprs_registry.frequency_seconds
|
||||
if not CONF.aprs_registry.enabled:
|
||||
LOG.error(
|
||||
'APRS Registry is not enabled. ',
|
||||
@ -34,24 +31,21 @@ class APRSRegistryThread(aprsd_threads.APRSDThread):
|
||||
)
|
||||
|
||||
def loop(self):
|
||||
# Only call the registry every N seconds
|
||||
if self._loop_cnt % CONF.aprs_registry.frequency_seconds == 0:
|
||||
info = {
|
||||
'callsign': CONF.callsign,
|
||||
'owner_callsign': CONF.owner_callsign,
|
||||
'description': CONF.aprs_registry.description,
|
||||
'service_website': CONF.aprs_registry.service_website,
|
||||
'software': f'APRSD version {aprsd.__version__} '
|
||||
'https://github.com/craigerl/aprsd',
|
||||
}
|
||||
try:
|
||||
requests.post(
|
||||
f'{CONF.aprs_registry.registry_url}',
|
||||
json=info,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send registry info: {e}')
|
||||
info = {
|
||||
'callsign': CONF.callsign,
|
||||
'owner_callsign': CONF.owner_callsign,
|
||||
'description': CONF.aprs_registry.description,
|
||||
'service_website': CONF.aprs_registry.service_website,
|
||||
'software': f'APRSD version {aprsd.__version__} '
|
||||
'https://github.com/craigerl/aprsd',
|
||||
}
|
||||
try:
|
||||
requests.post(
|
||||
f'{CONF.aprs_registry.registry_url}',
|
||||
json=info,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send registry info: {e}')
|
||||
|
||||
time.sleep(1)
|
||||
self._loop_cnt += 1
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import abc
|
||||
import logging
|
||||
import queue
|
||||
import time
|
||||
|
||||
import aprslib
|
||||
from oslo_config import cfg
|
||||
@ -43,19 +42,19 @@ class APRSDRXThread(APRSDThread):
|
||||
self.packet_queue = packet_queue
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
self._shutdown_event.set()
|
||||
if self._client:
|
||||
self._client.close()
|
||||
|
||||
def loop(self):
|
||||
if not self._client:
|
||||
self._client = APRSDClient()
|
||||
time.sleep(1)
|
||||
self.wait(timeout=1)
|
||||
return True
|
||||
|
||||
if not self._client.is_alive:
|
||||
self._client = APRSDClient()
|
||||
time.sleep(1)
|
||||
self.wait(timeout=1)
|
||||
return True
|
||||
|
||||
# setup the consumer of messages and block until a messages
|
||||
@ -82,12 +81,14 @@ class APRSDRXThread(APRSDThread):
|
||||
# This will cause a reconnect, next time client.get_client()
|
||||
# is called
|
||||
self._client.reset()
|
||||
time.sleep(5)
|
||||
if self.wait(timeout=5):
|
||||
return False
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
LOG.error('Resetting connection and trying again.')
|
||||
self._client.reset()
|
||||
time.sleep(5)
|
||||
if self.wait(timeout=5):
|
||||
return False
|
||||
return True
|
||||
|
||||
def process_packet(self, *args, **kwargs):
|
||||
@ -153,7 +154,7 @@ class APRSDFilterThread(APRSDThread):
|
||||
|
||||
def loop(self):
|
||||
try:
|
||||
pkt = self.packet_queue.get(timeout=1)
|
||||
pkt = self.packet_queue.get(timeout=self.period)
|
||||
self.packet_count += 1
|
||||
# We use the client here, because the specific
|
||||
# driver may need to decode the packet differently.
|
||||
|
||||
@ -31,20 +31,19 @@ class StatsStore(objectstore.ObjectStoreMixin):
|
||||
class APRSDStatsStoreThread(APRSDThread):
|
||||
"""Save APRSD Stats to disk periodically."""
|
||||
|
||||
# how often in seconds to write the file
|
||||
save_interval = 10
|
||||
daemon = False
|
||||
period = 10
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('StatsStore')
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.save_interval == 0:
|
||||
stats = collector.Collector().collect()
|
||||
ss = StatsStore()
|
||||
ss.add(stats)
|
||||
ss.save()
|
||||
stats = collector.Collector().collect()
|
||||
ss = StatsStore()
|
||||
ss.add(stats)
|
||||
ss.save()
|
||||
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
|
||||
@ -64,143 +63,140 @@ class APRSDPushStatsThread(APRSDThread):
|
||||
self.send_packetlist = send_packetlist
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.period == 0:
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
url = f'{self.push_url}/stats'
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
# Remove the PacketList section to reduce payload size
|
||||
if not self.send_packetlist:
|
||||
if 'PacketList' in stats_json:
|
||||
del stats_json['PacketList']['packets']
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
url = f'{self.push_url}/stats'
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
# Remove the PacketList section to reduce payload size
|
||||
if not self.send_packetlist:
|
||||
if 'PacketList' in stats_json:
|
||||
del stats_json['PacketList']['packets']
|
||||
|
||||
now = datetime.datetime.now()
|
||||
time_format = '%m-%d-%Y %H:%M:%S'
|
||||
stats = {
|
||||
'time': now.strftime(time_format),
|
||||
'stats': stats_json,
|
||||
}
|
||||
now = datetime.datetime.now()
|
||||
time_format = '%m-%d-%Y %H:%M:%S'
|
||||
stats = {
|
||||
'time': now.strftime(time_format),
|
||||
'stats': stats_json,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(url, json=stats, headers=headers, timeout=5)
|
||||
response.raise_for_status()
|
||||
try:
|
||||
response = requests.post(url, json=stats, headers=headers, timeout=5)
|
||||
response.raise_for_status()
|
||||
|
||||
if response.status_code == 200:
|
||||
LOGU.info(f'Successfully pushed stats to {self.push_url}')
|
||||
else:
|
||||
LOGU.warning(
|
||||
f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
|
||||
)
|
||||
if response.status_code == 200:
|
||||
LOGU.info(f'Successfully pushed stats to {self.push_url}')
|
||||
else:
|
||||
LOGU.warning(
|
||||
f'Failed to push stats to {self.push_url}: HTTP {response.status_code}'
|
||||
)
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
|
||||
except Exception as e:
|
||||
LOGU.error(f'Unexpected error in stats push: {e}')
|
||||
except requests.exceptions.RequestException as e:
|
||||
LOGU.error(f'Error pushing stats to {self.push_url}: {e}')
|
||||
except Exception as e:
|
||||
LOGU.error(f'Unexpected error in stats push: {e}')
|
||||
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
|
||||
class StatsLogThread(APRSDThread):
|
||||
"""Log the stats from the PacketList."""
|
||||
|
||||
period = 10
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('PacketStatsLog')
|
||||
self._last_total_rx = 0
|
||||
self.period = 10
|
||||
self.start_time = time.time()
|
||||
|
||||
def loop(self):
|
||||
if self.loop_count % self.period == 0:
|
||||
# log the stats every 10 seconds
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
stats = stats_json['PacketList']
|
||||
total_rx = stats['rx']
|
||||
rx_delta = total_rx - self._last_total_rx
|
||||
rate = rx_delta / self.period
|
||||
# log the stats every 10 seconds
|
||||
stats_json = collector.Collector().collect(serializable=True)
|
||||
stats = stats_json['PacketList']
|
||||
total_rx = stats['rx']
|
||||
rx_delta = total_rx - self._last_total_rx
|
||||
rate = rx_delta / self.period
|
||||
|
||||
# Get unique callsigns count from SeenList stats
|
||||
seen_list_instance = seen_list.SeenList()
|
||||
# stats() returns data while holding lock internally, so copy it immediately
|
||||
seen_list_stats = seen_list_instance.stats()
|
||||
seen_list_instance.save()
|
||||
# Copy the stats to avoid holding references to locked data
|
||||
seen_list_stats = seen_list_stats.copy()
|
||||
unique_callsigns_count = len(seen_list_stats)
|
||||
# Get unique callsigns count from SeenList stats
|
||||
seen_list_instance = seen_list.SeenList()
|
||||
# stats() returns data while holding lock internally, so copy it immediately
|
||||
seen_list_stats = seen_list_instance.stats()
|
||||
seen_list_instance.save()
|
||||
# Copy the stats to avoid holding references to locked data
|
||||
seen_list_stats = seen_list_stats.copy()
|
||||
unique_callsigns_count = len(seen_list_stats)
|
||||
|
||||
# Calculate uptime
|
||||
elapsed = time.time() - self.start_time
|
||||
elapsed_minutes = elapsed / 60
|
||||
elapsed_hours = elapsed / 3600
|
||||
# Calculate uptime
|
||||
elapsed = time.time() - self.start_time
|
||||
elapsed_minutes = elapsed / 60
|
||||
elapsed_hours = elapsed / 3600
|
||||
|
||||
# Log summary stats
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<green>RX Rate: {rate:.2f} pps</green> '
|
||||
f'<yellow>Total RX: {total_rx}</yellow> '
|
||||
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
|
||||
# Log summary stats
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<green>RX Rate: {rate:.2f} pps</green> '
|
||||
f'<yellow>Total RX: {total_rx}</yellow> '
|
||||
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
|
||||
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
|
||||
)
|
||||
self._last_total_rx = total_rx
|
||||
|
||||
# Log individual type stats, sorted by RX count (descending)
|
||||
sorted_types = sorted(
|
||||
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
|
||||
)
|
||||
for k, v in sorted_types:
|
||||
# Calculate percentage of this packet type compared to total RX
|
||||
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Format values first, then apply colors
|
||||
packet_type_str = f'{k:<15}'
|
||||
rx_count_str = f'{v["rx"]:6d}'
|
||||
tx_count_str = f'{v["tx"]:6d}'
|
||||
percentage_str = f'{percentage:5.1f}%'
|
||||
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
|
||||
rx_color_tag = (
|
||||
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
|
||||
)
|
||||
LOGU.opt(colors=True).info(
|
||||
f'<cyan>Uptime: {elapsed:.0f}s ({elapsed_minutes:.1f}m / {elapsed_hours:.2f}h)</cyan> '
|
||||
f'<magenta>Unique Callsigns: {unique_callsigns_count}</magenta>',
|
||||
f' <cyan>{packet_type_str}</cyan>: '
|
||||
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
|
||||
f'<red>TX: {tx_count_str}</red> '
|
||||
f'<magenta>({percentage_str})</magenta>',
|
||||
)
|
||||
self._last_total_rx = total_rx
|
||||
|
||||
# Log individual type stats, sorted by RX count (descending)
|
||||
sorted_types = sorted(
|
||||
stats['types'].items(), key=lambda x: x[1]['rx'], reverse=True
|
||||
)
|
||||
for k, v in sorted_types:
|
||||
# Calculate percentage of this packet type compared to total RX
|
||||
percentage = (v['rx'] / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Format values first, then apply colors
|
||||
packet_type_str = f'{k:<15}'
|
||||
rx_count_str = f'{v["rx"]:6d}'
|
||||
tx_count_str = f'{v["tx"]:6d}'
|
||||
percentage_str = f'{percentage:5.1f}%'
|
||||
# Use different colors for RX count based on threshold (matching mqtt_injest.py)
|
||||
rx_color_tag = (
|
||||
'green' if v['rx'] > 100 else 'yellow' if v['rx'] > 10 else 'red'
|
||||
)
|
||||
# Extract callsign counts from seen_list stats
|
||||
callsign_counts = {}
|
||||
for callsign, data in seen_list_stats.items():
|
||||
if isinstance(data, dict) and 'count' in data:
|
||||
callsign_counts[callsign] = data['count']
|
||||
|
||||
# Sort callsigns by packet count (descending) and get top 10
|
||||
sorted_callsigns = sorted(
|
||||
callsign_counts.items(), key=lambda x: x[1], reverse=True
|
||||
)[:10]
|
||||
|
||||
# Log top 10 callsigns
|
||||
if sorted_callsigns:
|
||||
LOGU.opt(colors=True).info('<cyan>Top 10 Callsigns by Packet Count:</cyan>')
|
||||
total_ranks = len(sorted_callsigns)
|
||||
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
|
||||
# Calculate percentage of this callsign compared to total RX
|
||||
percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Use different colors based on rank: most packets (rank 1) = red,
|
||||
# least packets (last rank) = green, middle = yellow
|
||||
if rank == 1:
|
||||
count_color_tag = 'red'
|
||||
elif rank == total_ranks:
|
||||
count_color_tag = 'green'
|
||||
else:
|
||||
count_color_tag = 'yellow'
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{packet_type_str}</cyan>: '
|
||||
f'<{rx_color_tag}>RX: {rx_count_str}</{rx_color_tag}> '
|
||||
f'<red>TX: {tx_count_str}</red> '
|
||||
f'<magenta>({percentage_str})</magenta>',
|
||||
f' <cyan>{rank:2d}.</cyan> '
|
||||
f'<white>{callsign:<12}</white>: '
|
||||
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
|
||||
f'<magenta>({percentage:5.1f}%)</magenta>',
|
||||
)
|
||||
|
||||
# Extract callsign counts from seen_list stats
|
||||
callsign_counts = {}
|
||||
for callsign, data in seen_list_stats.items():
|
||||
if isinstance(data, dict) and 'count' in data:
|
||||
callsign_counts[callsign] = data['count']
|
||||
|
||||
# Sort callsigns by packet count (descending) and get top 10
|
||||
sorted_callsigns = sorted(
|
||||
callsign_counts.items(), key=lambda x: x[1], reverse=True
|
||||
)[:10]
|
||||
|
||||
# Log top 10 callsigns
|
||||
if sorted_callsigns:
|
||||
LOGU.opt(colors=True).info(
|
||||
'<cyan>Top 10 Callsigns by Packet Count:</cyan>'
|
||||
)
|
||||
total_ranks = len(sorted_callsigns)
|
||||
for rank, (callsign, count) in enumerate(sorted_callsigns, 1):
|
||||
# Calculate percentage of this callsign compared to total RX
|
||||
percentage = (count / total_rx * 100) if total_rx > 0 else 0.0
|
||||
# Use different colors based on rank: most packets (rank 1) = red,
|
||||
# least packets (last rank) = green, middle = yellow
|
||||
if rank == 1:
|
||||
count_color_tag = 'red'
|
||||
elif rank == total_ranks:
|
||||
count_color_tag = 'green'
|
||||
else:
|
||||
count_color_tag = 'yellow'
|
||||
LOGU.opt(colors=True).info(
|
||||
f' <cyan>{rank:2d}.</cyan> '
|
||||
f'<white>{callsign:<12}</white>: '
|
||||
f'<{count_color_tag}>{count:6d} packets</{count_color_tag}> '
|
||||
f'<magenta>({percentage:5.1f}%)</magenta>',
|
||||
)
|
||||
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
@ -241,6 +241,8 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
separate thread for each packet.
|
||||
"""
|
||||
|
||||
daemon = False # Non-daemon for graceful packet handling
|
||||
|
||||
def __init__(self, max_workers=5):
|
||||
super().__init__('PacketSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
@ -272,7 +274,7 @@ class PacketSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
# The worker will check timing and send if needed
|
||||
self.executor.submit(_send_packet_worker, msg_no)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
self.wait() # Check every period (default 1 second)
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
@ -289,6 +291,8 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
separate thread for each ack.
|
||||
"""
|
||||
|
||||
daemon = False # Non-daemon for graceful ACK handling
|
||||
|
||||
def __init__(self, max_workers=3):
|
||||
super().__init__('AckSendSchedulerThread')
|
||||
self.executor = ThreadPoolExecutor(
|
||||
@ -320,7 +324,7 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
# Submit send task to threadpool
|
||||
self.executor.submit(_send_ack_worker, msg_no, self.max_retries)
|
||||
|
||||
time.sleep(1) # Check every second
|
||||
self.wait() # Check every period (default 1 second)
|
||||
return True
|
||||
|
||||
def _cleanup(self):
|
||||
@ -330,8 +334,6 @@ class AckSendSchedulerThread(aprsd_threads.APRSDThread):
|
||||
|
||||
|
||||
class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
loop_count: int = 1
|
||||
|
||||
def __init__(self, packet):
|
||||
self.packet = packet
|
||||
super().__init__(f'TX-{packet.to_call}-{self.packet.msgNo}')
|
||||
@ -401,14 +403,12 @@ class SendPacketThread(aprsd_threads.APRSDThread):
|
||||
if sent:
|
||||
packet.send_count += 1
|
||||
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
# Make sure we get called again.
|
||||
self.loop_count += 1
|
||||
return True
|
||||
|
||||
|
||||
class SendAckThread(aprsd_threads.APRSDThread):
|
||||
loop_count: int = 1
|
||||
max_retries = 3
|
||||
|
||||
def __init__(self, packet):
|
||||
@ -462,8 +462,7 @@ class SendAckThread(aprsd_threads.APRSDThread):
|
||||
|
||||
self.packet.last_send_time = int(round(time.time()))
|
||||
|
||||
time.sleep(1)
|
||||
self.loop_count += 1
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
|
||||
@ -473,11 +472,9 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
|
||||
Settings are in the [DEFAULT] section of the config file.
|
||||
"""
|
||||
|
||||
_loop_cnt: int = 1
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('BeaconSendThread')
|
||||
self._loop_cnt = 1
|
||||
self.period = CONF.beacon_interval
|
||||
# Make sure Latitude and Longitude are set.
|
||||
if not CONF.latitude or not CONF.longitude:
|
||||
LOG.error(
|
||||
@ -491,25 +488,23 @@ class BeaconSendThread(aprsd_threads.APRSDThread):
|
||||
)
|
||||
|
||||
def loop(self):
|
||||
# Only dump out the stats every N seconds
|
||||
if self._loop_cnt % CONF.beacon_interval == 0:
|
||||
pkt = core.BeaconPacket(
|
||||
from_call=CONF.callsign,
|
||||
to_call='APRS',
|
||||
latitude=float(CONF.latitude),
|
||||
longitude=float(CONF.longitude),
|
||||
comment='APRSD GPS Beacon',
|
||||
symbol=CONF.beacon_symbol,
|
||||
)
|
||||
try:
|
||||
# Only send it once
|
||||
pkt.retry_count = 1
|
||||
send(pkt, direct=True)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send beacon: {e}')
|
||||
APRSDClient().reset()
|
||||
time.sleep(5)
|
||||
pkt = core.BeaconPacket(
|
||||
from_call=CONF.callsign,
|
||||
to_call='APRS',
|
||||
latitude=float(CONF.latitude),
|
||||
longitude=float(CONF.longitude),
|
||||
comment='APRSD GPS Beacon',
|
||||
symbol=CONF.beacon_symbol,
|
||||
)
|
||||
try:
|
||||
# Only send it once
|
||||
pkt.retry_count = 1
|
||||
send(pkt, direct=True)
|
||||
except Exception as e:
|
||||
LOG.error(f'Failed to send beacon: {e}')
|
||||
APRSDClient().reset()
|
||||
if self.wait(timeout=5):
|
||||
return False
|
||||
|
||||
self._loop_cnt += 1
|
||||
time.sleep(1)
|
||||
self.wait()
|
||||
return True
|
||||
|
||||
1355
docs/superpowers/plans/2026-03-24-daemon-threads-event-refactor.md
Normal file
1355
docs/superpowers/plans/2026-03-24-daemon-threads-event-refactor.md
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,259 @@
|
||||
# Daemon Threads and Event-Based Timing Refactor
|
||||
|
||||
**Date:** 2026-03-24
|
||||
**Status:** Draft
|
||||
**Scope:** All APRSD thread classes
|
||||
|
||||
## Overview
|
||||
|
||||
Refactor all APRSD thread classes to use daemon threads and replace counter-based sleep patterns with `threading.Event()` for interruptible waits and cleaner periodic timing.
|
||||
|
||||
## Goals
|
||||
|
||||
1. **Faster shutdown** — threads respond to shutdown signals immediately instead of waiting for `time.sleep()` to complete
|
||||
2. **Cleaner periodic timing** — replace counter-based timing (`loop_count % 60`) with explicit period-based waits
|
||||
3. **Proper daemon semantics** — most threads become daemon threads; critical I/O threads remain non-daemon for graceful shutdown
|
||||
|
||||
## Current State
|
||||
|
||||
- **14 thread classes** extend `APRSDThread` base class
|
||||
- All use `time.sleep(1)` with counter-based conditionals for periodic work
|
||||
- Shutdown via boolean `thread_stop` flag, polled in while loop
|
||||
- No threads set as daemon — all block program exit
|
||||
- No `threading.Event()` usage anywhere
|
||||
|
||||
### Current Problems
|
||||
|
||||
1. Shutdown delay: up to 1-5 seconds waiting for sleep to finish
|
||||
2. Counter math is fragile and obscures intent
|
||||
3. Non-daemon threads prevent clean program exit
|
||||
|
||||
## Design
|
||||
|
||||
### Base Class Changes
|
||||
|
||||
File: `aprsd/threads/aprsd.py`
|
||||
|
||||
```python
|
||||
class APRSDThread(threading.Thread, metaclass=abc.ABCMeta):
|
||||
# Class attributes (subclasses override as needed)
|
||||
daemon = True # Most threads are daemon
|
||||
period = 1 # Default wait period in seconds
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name=name)
|
||||
self.daemon = self.__class__.daemon
|
||||
self._shutdown_event = threading.Event()
|
||||
self.loop_count = 0 # Retained for debugging
|
||||
self._last_loop = datetime.datetime.now()
|
||||
APRSDThreadList().add(self)
|
||||
|
||||
def _should_quit(self) -> bool:
|
||||
"""Check if thread should exit."""
|
||||
return self._shutdown_event.is_set()
|
||||
|
||||
def stop(self):
|
||||
"""Signal thread to stop. Returns immediately."""
|
||||
self._shutdown_event.set()
|
||||
|
||||
def wait(self, timeout: float | None = None) -> bool:
|
||||
"""Wait for shutdown signal or timeout.
|
||||
|
||||
Args:
|
||||
timeout: Seconds to wait. Defaults to self.period.
|
||||
|
||||
Returns:
|
||||
True if shutdown was signaled, False if timeout expired.
|
||||
"""
|
||||
wait_time = timeout if timeout is not None else self.period
|
||||
return self._shutdown_event.wait(timeout=wait_time)
|
||||
|
||||
def run(self):
|
||||
while not self._should_quit():
|
||||
self.loop_count += 1
|
||||
self._last_loop = datetime.datetime.now()
|
||||
if not self.loop():
|
||||
break
|
||||
APRSDThreadList().remove(self)
|
||||
```
|
||||
|
||||
### Daemon vs Non-Daemon Threads
|
||||
|
||||
**Non-daemon threads (3)** — require graceful shutdown for I/O operations:
|
||||
|
||||
| Thread | File | Reason |
|
||||
|--------|------|--------|
|
||||
| `PacketSendSchedulerThread` | tx.py | Manages packet send queue |
|
||||
| `AckSendSchedulerThread` | tx.py | Manages ACK send queue |
|
||||
| `APRSDStatsStoreThread` | stats.py | Writes stats to disk |
|
||||
|
||||
**Daemon threads (12)** — can be terminated immediately:
|
||||
|
||||
- `APRSDRXThread`, `APRSDFilterThread`, `APRSDProcessPacketThread`, `APRSDPluginProcessPacketThread`
|
||||
- `APRSDPushStatsThread`, `StatsLogThread`
|
||||
- `KeepAliveThread`, `APRSRegistryThread`
|
||||
- `SendPacketThread`, `SendAckThread`, `BeaconSendThread`
|
||||
- `APRSDListenProcessThread` (listen command)
|
||||
|
||||
### Subclass Migration Pattern
|
||||
|
||||
**Before:**
|
||||
```python
|
||||
class KeepAliveThread(APRSDThread):
|
||||
def loop(self):
|
||||
if self.loop_count % 60 == 0:
|
||||
self._do_keepalive_work()
|
||||
time.sleep(1)
|
||||
return True
|
||||
```
|
||||
|
||||
**After:**
|
||||
```python
|
||||
class KeepAliveThread(APRSDThread):
|
||||
period = 60
|
||||
|
||||
def loop(self):
|
||||
self._do_keepalive_work()
|
||||
self.wait()
|
||||
return True
|
||||
```
|
||||
|
||||
### Thread Periods
|
||||
|
||||
| Thread | New `period` | Source |
|
||||
|--------|--------------|--------|
|
||||
| `KeepAliveThread` | 60 | Fixed |
|
||||
| `APRSDStatsStoreThread` | 10 | Fixed |
|
||||
| `APRSDPushStatsThread` | config | `CONF.push_stats.period` |
|
||||
| `StatsLogThread` | 10 | Fixed |
|
||||
| `APRSRegistryThread` | config | `CONF.aprs_registry.frequency_seconds` |
|
||||
| `BeaconSendThread` | config | `CONF.beacon_interval` |
|
||||
| `APRSDRXThread` | 1 | Default |
|
||||
| `APRSDFilterThread` | 1 | Default |
|
||||
| `APRSDProcessPacketThread` | 1 | Default |
|
||||
| `APRSDPluginProcessPacketThread` | 1 | Default |
|
||||
| `SendPacketThread` | 1 | Default |
|
||||
| `SendAckThread` | 1 | Default |
|
||||
| `PacketSendSchedulerThread` | 1 | Default |
|
||||
| `AckSendSchedulerThread` | 1 | Default |
|
||||
| `APRSDListenProcessThread` | 1 | Default |
|
||||
|
||||
Config-based periods are set in `__init__` or `setup()`. Note: `setup()` is called by subclasses in their `__init__` before the thread starts; the base class does not call it automatically.
|
||||
```python
|
||||
class APRSRegistryThread(APRSDThread):
|
||||
period = 1 # Default
|
||||
|
||||
def setup(self):
|
||||
self.period = CONF.aprs_registry.frequency_seconds
|
||||
```
|
||||
|
||||
### ThreadList Changes
|
||||
|
||||
File: `aprsd/threads/aprsd.py`
|
||||
|
||||
```python
|
||||
class APRSDThreadList:
|
||||
def stop_all(self):
|
||||
"""Signal all threads to stop."""
|
||||
with self.lock:
|
||||
for th in self.threads_list:
|
||||
th.stop()
|
||||
|
||||
def join_non_daemon(self, timeout: float = 5.0):
|
||||
"""Wait for non-daemon threads to complete gracefully."""
|
||||
with self.lock:
|
||||
for th in self.threads_list:
|
||||
if not th.daemon and th.is_alive():
|
||||
th.join(timeout=timeout)
|
||||
```
|
||||
|
||||
### Shutdown Handler Changes
|
||||
|
||||
File: `aprsd/main.py`
|
||||
|
||||
```python
|
||||
def signal_handler(sig, frame):
|
||||
LOG.info("Shutdown signal received")
|
||||
thread_list = threads.APRSDThreadList()
|
||||
thread_list.stop_all()
|
||||
thread_list.join_non_daemon(timeout=5.0)
|
||||
# Daemon threads killed automatically on exit
|
||||
```
|
||||
|
||||
### Queue-Based Threads
|
||||
|
||||
Threads that block on queues use queue timeout as interruptible wait:
|
||||
|
||||
```python
|
||||
class APRSDFilterThread(APRSDThread):
|
||||
period = 1
|
||||
|
||||
def loop(self):
|
||||
try:
|
||||
packet = self.queue.get(timeout=self.period)
|
||||
self._process(packet)
|
||||
except queue.Empty:
|
||||
pass # Timeout, loop checks _should_quit
|
||||
return True
|
||||
```
|
||||
|
||||
### Error Recovery Waits
|
||||
|
||||
Threads needing longer waits for error recovery use explicit timeout:
|
||||
|
||||
```python
|
||||
class APRSDRXThread(APRSDThread):
|
||||
period = 1
|
||||
|
||||
def loop(self):
|
||||
try:
|
||||
self._process_packets()
|
||||
except ConnectionError:
|
||||
LOG.error("Connection lost, retrying in 5s")
|
||||
if self.wait(timeout=5):
|
||||
return False # Shutdown signaled
|
||||
self.wait()
|
||||
return True
|
||||
```
|
||||
|
||||
## Files to Modify
|
||||
|
||||
1. `aprsd/threads/aprsd.py` — Base class and ThreadList
|
||||
2. `aprsd/threads/rx.py` — RX thread classes (4)
|
||||
3. `aprsd/threads/tx.py` — TX thread classes (5)
|
||||
4. `aprsd/threads/stats.py` — Stats thread classes (3)
|
||||
5. `aprsd/threads/keepalive.py` — KeepAliveThread
|
||||
6. `aprsd/threads/registry.py` — APRSRegistryThread
|
||||
7. `aprsd/main.py` — Signal handler
|
||||
8. `aprsd/cmds/listen.py` — APRSDListenProcessThread
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
1. **Unit tests for base class:**
|
||||
- `wait()` returns `True` immediately when event is already set
|
||||
- `wait(timeout=5)` returns `False` after 5 seconds if event not set
|
||||
- `stop()` causes `_should_quit()` to return `True`
|
||||
- `daemon` attribute is set correctly from class attribute
|
||||
|
||||
2. **Integration tests:**
|
||||
- Shutdown completes in <1s for daemon-only scenarios
|
||||
- Non-daemon threads get up to 5s grace period
|
||||
- `join_non_daemon()` respects timeout parameter
|
||||
|
||||
3. **Manual testing:**
|
||||
- Send SIGINT during operation, verify clean exit
|
||||
- Verify no "thread still running" warnings on shutdown
|
||||
|
||||
4. **Existing test updates:**
|
||||
- Update any tests that mock `thread_stop` to mock `_shutdown_event` instead
|
||||
- Update tests that check `time.sleep` calls to check `wait()` calls
|
||||
|
||||
## Backwards Compatibility
|
||||
|
||||
- `loop_count` retained for debugging/logging
|
||||
- `_should_quit()` method signature unchanged
|
||||
- Default `period=1` matches current 1-second sleep behavior
|
||||
|
||||
## Rollout
|
||||
|
||||
Single PR with all changes — the refactor is atomic and affects thread behavior globally.
|
||||
@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
@ -42,11 +43,9 @@ class TestAPRSDThread(unittest.TestCase):
|
||||
"""Test thread initialization."""
|
||||
thread = TestThread('TestThread1')
|
||||
self.assertEqual(thread.name, 'TestThread1')
|
||||
self.assertFalse(thread.thread_stop)
|
||||
self.assertFalse(thread._shutdown_event.is_set())
|
||||
self.assertFalse(thread._pause)
|
||||
self.assertEqual(thread.loop_count, 1)
|
||||
|
||||
# Should be registered in thread list
|
||||
self.assertEqual(thread.loop_count, 0) # Was 1, now starts at 0
|
||||
thread_list = APRSDThreadList()
|
||||
self.assertIn(thread, thread_list.threads_list)
|
||||
|
||||
@ -54,8 +53,7 @@ class TestAPRSDThread(unittest.TestCase):
|
||||
"""Test _should_quit() method."""
|
||||
thread = TestThread('TestThread2')
|
||||
self.assertFalse(thread._should_quit())
|
||||
|
||||
thread.thread_stop = True
|
||||
thread._shutdown_event.set()
|
||||
self.assertTrue(thread._should_quit())
|
||||
|
||||
def test_pause_unpause(self):
|
||||
@ -72,20 +70,93 @@ class TestAPRSDThread(unittest.TestCase):
|
||||
def test_stop(self):
|
||||
"""Test stop() method."""
|
||||
thread = TestThread('TestThread4')
|
||||
self.assertFalse(thread.thread_stop)
|
||||
|
||||
self.assertFalse(thread._shutdown_event.is_set())
|
||||
thread.stop()
|
||||
self.assertTrue(thread.thread_stop)
|
||||
self.assertTrue(thread._shutdown_event.is_set())
|
||||
|
||||
def test_loop_age(self):
|
||||
"""Test loop_age() method."""
|
||||
import datetime
|
||||
|
||||
thread = TestThread('TestThread5')
|
||||
age = thread.loop_age()
|
||||
self.assertIsInstance(age, datetime.timedelta)
|
||||
self.assertGreaterEqual(age.total_seconds(), 0)
|
||||
|
||||
def test_daemon_attribute_default(self):
|
||||
"""Test that daemon attribute defaults to True."""
|
||||
thread = TestThread('DaemonTest')
|
||||
self.assertTrue(thread.daemon)
|
||||
|
||||
def test_daemon_attribute_override(self):
|
||||
"""Test that daemon attribute can be overridden via class attribute."""
|
||||
|
||||
class NonDaemonThread(APRSDThread):
|
||||
daemon = False
|
||||
|
||||
def loop(self):
|
||||
return False
|
||||
|
||||
thread = NonDaemonThread('NonDaemonTest')
|
||||
self.assertFalse(thread.daemon)
|
||||
|
||||
def test_period_attribute_default(self):
|
||||
"""Test that period attribute defaults to 1."""
|
||||
thread = TestThread('PeriodTest')
|
||||
self.assertEqual(thread.period, 1)
|
||||
|
||||
def test_period_attribute_override(self):
|
||||
"""Test that period attribute can be overridden via class attribute."""
|
||||
|
||||
class LongPeriodThread(APRSDThread):
|
||||
period = 60
|
||||
|
||||
def loop(self):
|
||||
return False
|
||||
|
||||
thread = LongPeriodThread('LongPeriodTest')
|
||||
self.assertEqual(thread.period, 60)
|
||||
|
||||
def test_shutdown_event_exists(self):
|
||||
"""Test that _shutdown_event is created."""
|
||||
thread = TestThread('EventTest')
|
||||
self.assertIsInstance(thread._shutdown_event, threading.Event)
|
||||
self.assertFalse(thread._shutdown_event.is_set())
|
||||
|
||||
def test_wait_returns_false_on_timeout(self):
|
||||
"""Test that wait() returns False when timeout expires."""
|
||||
thread = TestThread('WaitTimeoutTest')
|
||||
start = time.time()
|
||||
result = thread.wait(timeout=0.1)
|
||||
elapsed = time.time() - start
|
||||
self.assertFalse(result)
|
||||
self.assertGreaterEqual(elapsed, 0.1)
|
||||
|
||||
def test_wait_returns_true_when_stopped(self):
|
||||
"""Test that wait() returns True immediately when stop() was called."""
|
||||
thread = TestThread('WaitStopTest')
|
||||
thread.stop()
|
||||
start = time.time()
|
||||
result = thread.wait(timeout=10)
|
||||
elapsed = time.time() - start
|
||||
self.assertTrue(result)
|
||||
self.assertLess(elapsed, 1)
|
||||
|
||||
def test_wait_uses_period_by_default(self):
|
||||
"""Test that wait() uses self.period when no timeout specified."""
|
||||
|
||||
class ShortPeriodThread(APRSDThread):
|
||||
period = 0.1
|
||||
|
||||
def loop(self):
|
||||
return False
|
||||
|
||||
thread = ShortPeriodThread('ShortPeriodTest')
|
||||
start = time.time()
|
||||
result = thread.wait()
|
||||
elapsed = time.time() - start
|
||||
self.assertFalse(result)
|
||||
self.assertGreaterEqual(elapsed, 0.1)
|
||||
self.assertLess(elapsed, 0.5)
|
||||
|
||||
def test_str(self):
|
||||
"""Test __str__() method."""
|
||||
thread = TestThread('TestThread6')
|
||||
@ -253,10 +324,9 @@ class TestAPRSDThreadList(unittest.TestCase):
|
||||
thread2 = TestThread('TestThread9')
|
||||
thread_list.add(thread1)
|
||||
thread_list.add(thread2)
|
||||
|
||||
thread_list.stop_all()
|
||||
self.assertTrue(thread1.thread_stop)
|
||||
self.assertTrue(thread2.thread_stop)
|
||||
self.assertTrue(thread1._shutdown_event.is_set())
|
||||
self.assertTrue(thread2._shutdown_event.is_set())
|
||||
|
||||
def test_pause_all(self):
|
||||
"""Test pause_all() method."""
|
||||
@ -334,3 +404,51 @@ class TestAPRSDThreadList(unittest.TestCase):
|
||||
|
||||
# Should handle concurrent access without errors
|
||||
self.assertGreaterEqual(len(thread_list), 0)
|
||||
|
||||
def test_join_non_daemon(self):
|
||||
"""Test join_non_daemon() waits for non-daemon threads."""
|
||||
|
||||
class NonDaemonTestThread(APRSDThread):
|
||||
daemon = False
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.finished = False
|
||||
|
||||
def loop(self):
|
||||
time.sleep(0.2)
|
||||
self.finished = True
|
||||
return False
|
||||
|
||||
thread_list = APRSDThreadList()
|
||||
thread = NonDaemonTestThread('NonDaemonJoinTest')
|
||||
thread_list.add(thread)
|
||||
thread.start()
|
||||
|
||||
# Stop triggers the event, thread should finish its loop then exit
|
||||
thread.stop()
|
||||
thread_list.join_non_daemon(timeout=5.0)
|
||||
|
||||
self.assertTrue(thread.finished or not thread.is_alive())
|
||||
|
||||
def test_join_non_daemon_skips_daemon_threads(self):
|
||||
"""Test join_non_daemon() does not wait for daemon threads."""
|
||||
thread_list = APRSDThreadList()
|
||||
# Clear existing threads
|
||||
thread_list.threads_list = []
|
||||
|
||||
# Create a daemon thread that loops forever
|
||||
thread = TestThread('DaemonSkipTest', should_loop=True)
|
||||
thread_list.add(thread)
|
||||
thread.start()
|
||||
|
||||
# This should return quickly since it's a daemon thread
|
||||
start = time.time()
|
||||
thread_list.join_non_daemon(timeout=0.1)
|
||||
elapsed = time.time() - start
|
||||
|
||||
self.assertLess(elapsed, 0.5) # Should not wait for daemon
|
||||
|
||||
# Cleanup
|
||||
thread.stop()
|
||||
thread.join(timeout=1)
|
||||
|
||||
@ -15,17 +15,18 @@ class TestAPRSDRXThread(unittest.TestCase):
|
||||
self.packet_queue = queue.Queue()
|
||||
self.rx_thread = rx.APRSDRXThread(self.packet_queue)
|
||||
self.rx_thread.pkt_count = 0 # Reset packet count
|
||||
# Mock time.sleep to speed up tests
|
||||
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
|
||||
self.mock_sleep = self.sleep_patcher.start()
|
||||
# Mock self.wait to speed up tests
|
||||
self.wait_patcher = mock.patch.object(
|
||||
self.rx_thread, 'wait', return_value=False
|
||||
)
|
||||
self.mock_wait = self.wait_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.wait_patcher.stop()
|
||||
self.rx_thread.stop()
|
||||
if self.rx_thread.is_alive():
|
||||
self.rx_thread.join(timeout=1)
|
||||
# Stop the sleep patcher
|
||||
self.sleep_patcher.stop()
|
||||
|
||||
def test_init(self):
|
||||
"""Test initialization."""
|
||||
@ -39,13 +40,13 @@ class TestAPRSDRXThread(unittest.TestCase):
|
||||
self.rx_thread._client = mock.MagicMock()
|
||||
self.rx_thread.stop()
|
||||
|
||||
self.assertTrue(self.rx_thread.thread_stop)
|
||||
self.assertTrue(self.rx_thread._shutdown_event.is_set())
|
||||
self.rx_thread._client.close.assert_called()
|
||||
|
||||
def test_stop_no_client(self):
|
||||
"""Test stop() when client is None."""
|
||||
self.rx_thread.stop()
|
||||
self.assertTrue(self.rx_thread.thread_stop)
|
||||
self.assertTrue(self.rx_thread._shutdown_event.is_set())
|
||||
|
||||
def test_loop_no_client(self):
|
||||
"""Test loop() when client is None."""
|
||||
@ -237,18 +238,18 @@ class TestAPRSDFilterThread(unittest.TestCase):
|
||||
"""Process packet - required by base class."""
|
||||
pass
|
||||
|
||||
# Mock APRSDClient to avoid config requirements
|
||||
self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
|
||||
self.mock_client = self.client_patcher.start()
|
||||
|
||||
self.filter_thread = TestFilterThread('TestFilterThread', self.packet_queue)
|
||||
# Mock time.sleep to speed up tests
|
||||
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
|
||||
self.mock_sleep = self.sleep_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.client_patcher.stop()
|
||||
self.filter_thread.stop()
|
||||
if self.filter_thread.is_alive():
|
||||
self.filter_thread.join(timeout=1)
|
||||
# Stop the sleep patcher
|
||||
self.sleep_patcher.stop()
|
||||
|
||||
def test_init(self):
|
||||
"""Test initialization."""
|
||||
@ -330,18 +331,18 @@ class TestAPRSDProcessPacketThread(unittest.TestCase):
|
||||
def process_our_message_packet(self, packet):
|
||||
pass
|
||||
|
||||
# Mock APRSDClient to avoid config requirements
|
||||
self.client_patcher = mock.patch('aprsd.threads.rx.APRSDClient')
|
||||
self.mock_client = self.client_patcher.start()
|
||||
|
||||
self.process_thread = ConcreteProcessThread(self.packet_queue)
|
||||
# Mock time.sleep to speed up tests
|
||||
self.sleep_patcher = mock.patch('aprsd.threads.rx.time.sleep')
|
||||
self.mock_sleep = self.sleep_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.client_patcher.stop()
|
||||
self.process_thread.stop()
|
||||
if self.process_thread.is_alive():
|
||||
self.process_thread.join(timeout=1)
|
||||
# Stop the sleep patcher
|
||||
self.sleep_patcher.stop()
|
||||
|
||||
def test_init(self):
|
||||
"""Test initialization."""
|
||||
|
||||
@ -74,26 +74,25 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
|
||||
"""Test APRSDStatsStoreThread initialization."""
|
||||
thread = APRSDStatsStoreThread()
|
||||
self.assertEqual(thread.name, 'StatsStore')
|
||||
self.assertEqual(thread.save_interval, 10)
|
||||
self.assertEqual(thread.period, 10)
|
||||
self.assertFalse(thread.daemon)
|
||||
self.assertTrue(hasattr(thread, 'loop_count'))
|
||||
|
||||
def test_loop_with_save(self):
|
||||
"""Test loop method when save interval is reached."""
|
||||
"""Test loop method saves stats every call."""
|
||||
thread = APRSDStatsStoreThread()
|
||||
|
||||
# Mock the collector and save methods
|
||||
with (
|
||||
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
|
||||
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
|
||||
mock.patch.object(thread, 'wait'),
|
||||
):
|
||||
# Setup mock collector to return some stats
|
||||
mock_collector_instance = mock.Mock()
|
||||
mock_collector_instance.collect.return_value = {'test': 'data'}
|
||||
mock_collector_class.return_value = mock_collector_instance
|
||||
|
||||
# Set loop_count to match save interval
|
||||
thread.loop_count = 10
|
||||
|
||||
# Call loop
|
||||
result = thread.loop()
|
||||
|
||||
@ -104,45 +103,43 @@ class TestAPRSDStatsStoreThread(unittest.TestCase):
|
||||
mock_collector_instance.collect.assert_called_once()
|
||||
mock_save.assert_called_once()
|
||||
|
||||
def test_loop_without_save(self):
|
||||
"""Test loop method when save interval is not reached."""
|
||||
def test_loop_calls_wait(self):
|
||||
"""Test loop method calls wait() at the end."""
|
||||
thread = APRSDStatsStoreThread()
|
||||
|
||||
# Mock the collector and save methods
|
||||
with (
|
||||
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
|
||||
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save') as mock_save,
|
||||
mock.patch('aprsd.utils.objectstore.ObjectStoreMixin.save'),
|
||||
mock.patch.object(thread, 'wait') as mock_wait,
|
||||
):
|
||||
# Setup mock collector to return some stats
|
||||
mock_collector_instance = mock.Mock()
|
||||
mock_collector_instance.collect.return_value = {'test': 'data'}
|
||||
mock_collector_class.return_value = mock_collector_instance
|
||||
|
||||
# Set loop_count to not match save interval
|
||||
thread.loop_count = 1
|
||||
|
||||
# Call loop
|
||||
result = thread.loop()
|
||||
|
||||
# Should return True (continue looping)
|
||||
self.assertTrue(result)
|
||||
|
||||
# Should not have called save
|
||||
mock_save.assert_not_called()
|
||||
# Should have called wait
|
||||
mock_wait.assert_called_once()
|
||||
|
||||
def test_loop_with_exception(self):
|
||||
"""Test loop method when an exception occurs."""
|
||||
thread = APRSDStatsStoreThread()
|
||||
|
||||
# Mock the collector to raise an exception
|
||||
with mock.patch('aprsd.stats.collector.Collector') as mock_collector_class:
|
||||
with (
|
||||
mock.patch('aprsd.stats.collector.Collector') as mock_collector_class,
|
||||
mock.patch.object(thread, 'wait'),
|
||||
):
|
||||
mock_collector_instance = mock.Mock()
|
||||
mock_collector_instance.collect.side_effect = RuntimeError('Test exception')
|
||||
mock_collector_class.return_value = mock_collector_instance
|
||||
|
||||
# Set loop_count to match save interval
|
||||
thread.loop_count = 10
|
||||
|
||||
# Should raise the exception
|
||||
with self.assertRaises(RuntimeError):
|
||||
thread.loop()
|
||||
@ -177,24 +174,31 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
self.assertEqual(thread.period, 15)
|
||||
self.assertFalse(thread.send_packetlist)
|
||||
|
||||
def test_loop_skips_push_when_period_not_reached(self):
|
||||
"""Test loop does not POST when loop_count not divisible by period."""
|
||||
def test_loop_pushes_stats_every_call(self):
|
||||
"""Test loop POSTs stats on every call (timing controlled by wait)."""
|
||||
thread = APRSDPushStatsThread(
|
||||
push_url='https://example.com',
|
||||
frequency_seconds=10,
|
||||
)
|
||||
thread.loop_count = 3 # 3 % 10 != 0
|
||||
|
||||
with (
|
||||
mock.patch('aprsd.threads.stats.collector.Collector') as mock_collector,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
):
|
||||
mock_collector.return_value.collect.return_value = {}
|
||||
mock_dt.datetime.now.return_value.strftime.return_value = (
|
||||
'01-01-2025 12:00:00'
|
||||
)
|
||||
mock_post.return_value.status_code = 200
|
||||
mock_post.return_value.raise_for_status = mock.Mock()
|
||||
|
||||
result = thread.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_collector.return_value.collect.assert_not_called()
|
||||
mock_post.assert_not_called()
|
||||
mock_collector.return_value.collect.assert_called_once_with(serializable=True)
|
||||
mock_post.assert_called_once()
|
||||
|
||||
def test_loop_pushes_stats_and_removes_packetlist_by_default(self):
|
||||
"""Test loop collects stats, POSTs to url/stats, and strips PacketList.packets."""
|
||||
@ -203,7 +207,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
frequency_seconds=10,
|
||||
send_packetlist=False,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
collected = {
|
||||
'PacketList': {'packets': [1, 2, 3], 'rx': 5, 'tx': 1},
|
||||
@ -215,7 +218,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
):
|
||||
mock_collector_class.return_value.collect.return_value = collected
|
||||
@ -247,7 +250,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
frequency_seconds=10,
|
||||
send_packetlist=True,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
collected = {'PacketList': {'packets': [1, 2, 3], 'rx': 5}}
|
||||
|
||||
@ -256,7 +258,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
):
|
||||
mock_collector_class.return_value.collect.return_value = collected
|
||||
@ -276,14 +278,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
push_url='https://example.com',
|
||||
frequency_seconds=10,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
|
||||
):
|
||||
@ -306,14 +307,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
push_url='https://example.com',
|
||||
frequency_seconds=10,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
|
||||
):
|
||||
@ -337,14 +337,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
push_url='https://example.com',
|
||||
frequency_seconds=10,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
|
||||
):
|
||||
@ -368,14 +367,13 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
push_url='https://example.com',
|
||||
frequency_seconds=10,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
mock.patch('aprsd.threads.stats.LOGU') as mock_logu,
|
||||
):
|
||||
@ -398,7 +396,6 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
frequency_seconds=10,
|
||||
send_packetlist=False,
|
||||
)
|
||||
thread.loop_count = 10
|
||||
|
||||
collected = {'Only': 'data', 'No': 'PacketList'}
|
||||
|
||||
@ -407,7 +404,7 @@ class TestAPRSDPushStatsThread(unittest.TestCase):
|
||||
'aprsd.threads.stats.collector.Collector'
|
||||
) as mock_collector_class,
|
||||
mock.patch('aprsd.threads.stats.requests.post') as mock_post,
|
||||
mock.patch('aprsd.threads.stats.time.sleep'),
|
||||
mock.patch.object(thread, 'wait'),
|
||||
mock.patch('aprsd.threads.stats.datetime') as mock_dt,
|
||||
):
|
||||
mock_collector_class.return_value.collect.return_value = collected
|
||||
|
||||
@ -596,9 +596,13 @@ class TestSendPacketThread(unittest.TestCase):
|
||||
tracker.PacketTrack._instance = None
|
||||
self.packet = fake.fake_packet(msg_number='123')
|
||||
self.thread = tx.SendPacketThread(self.packet)
|
||||
# Mock wait to speed up tests
|
||||
self.wait_patcher = mock.patch.object(self.thread, 'wait', return_value=False)
|
||||
self.mock_wait = self.wait_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.wait_patcher.stop()
|
||||
self.thread.stop()
|
||||
if self.thread.is_alive():
|
||||
self.thread.join(timeout=1)
|
||||
@ -608,7 +612,8 @@ class TestSendPacketThread(unittest.TestCase):
|
||||
"""Test initialization."""
|
||||
self.assertEqual(self.thread.packet, self.packet)
|
||||
self.assertIn('TX-', self.thread.name)
|
||||
self.assertEqual(self.thread.loop_count, 1)
|
||||
# loop_count starts at 0 from base class, incremented in run()
|
||||
self.assertEqual(self.thread.loop_count, 0)
|
||||
|
||||
@mock.patch('aprsd.threads.tx.tracker.PacketTrack')
|
||||
def test_loop_packet_acked(self, mock_tracker_class):
|
||||
@ -761,7 +766,8 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
"""Test initialization."""
|
||||
thread = tx.BeaconSendThread()
|
||||
self.assertEqual(thread.name, 'BeaconSendThread')
|
||||
self.assertEqual(thread._loop_cnt, 1)
|
||||
self.assertEqual(thread.period, 10) # Uses CONF.beacon_interval
|
||||
thread.stop()
|
||||
|
||||
def test_init_no_coordinates(self):
|
||||
"""Test initialization without coordinates."""
|
||||
@ -772,39 +778,27 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
CONF.longitude = None
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
self.assertTrue(thread.thread_stop)
|
||||
self.assertTrue(thread._shutdown_event.is_set())
|
||||
thread.stop()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
def test_loop_send_beacon(self, mock_send):
|
||||
"""Test loop() sends beacon at interval."""
|
||||
"""Test loop() sends beacon."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 1
|
||||
CONF.latitude = 40.7128
|
||||
CONF.longitude = -74.0060
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 1
|
||||
# Mock wait to return False (no shutdown)
|
||||
with mock.patch.object(thread, 'wait', return_value=False):
|
||||
result = thread.loop()
|
||||
|
||||
result = thread.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_called()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
def test_loop_not_time(self, mock_send):
|
||||
"""Test loop() doesn't send before interval."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 10
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 5
|
||||
|
||||
result = thread.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_not_called()
|
||||
self.assertTrue(result)
|
||||
mock_send.assert_called()
|
||||
thread.stop()
|
||||
|
||||
@mock.patch('aprsd.threads.tx.send')
|
||||
@mock.patch('aprsd.threads.tx.APRSDClient')
|
||||
@ -814,13 +808,17 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.beacon_interval = 1
|
||||
CONF.latitude = 40.7128
|
||||
CONF.longitude = -74.0060
|
||||
|
||||
thread = tx.BeaconSendThread()
|
||||
thread._loop_cnt = 1
|
||||
mock_send.side_effect = Exception('Send error')
|
||||
|
||||
with mock.patch('aprsd.threads.tx.LOG') as mock_log:
|
||||
result = thread.loop()
|
||||
self.assertTrue(result)
|
||||
mock_log.error.assert_called()
|
||||
mock_client_class.return_value.reset.assert_called()
|
||||
# Mock wait to return False (no shutdown signaled during error wait)
|
||||
with mock.patch.object(thread, 'wait', return_value=False):
|
||||
result = thread.loop()
|
||||
self.assertTrue(result)
|
||||
mock_log.error.assert_called()
|
||||
mock_client_class.return_value.reset.assert_called()
|
||||
thread.stop()
|
||||
|
||||
14
uv.lock
generated
14
uv.lock
generated
@ -38,7 +38,6 @@ dependencies = [
|
||||
{ name = "rfc3986" },
|
||||
{ name = "rich" },
|
||||
{ name = "rush" },
|
||||
{ name = "setuptools" },
|
||||
{ name = "stevedore" },
|
||||
{ name = "thesmuggler" },
|
||||
{ name = "timeago" },
|
||||
@ -82,7 +81,7 @@ type = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "aprslib", git = "https://github.com/hemna/aprs-python.git?rev=09cd7a2829a2e9d28ee1566881c843cc4769e590" },
|
||||
{ name = "aprslib", git = "https://github.com/hemna/aprs-python.git?rev=telemetry" },
|
||||
{ name = "attrs", specifier = "==25.4.0" },
|
||||
{ name = "ax253", specifier = "==0.1.5.post1" },
|
||||
{ name = "bitarray", specifier = "==3.8.0" },
|
||||
@ -97,7 +96,7 @@ requires-dist = [
|
||||
{ name = "kiss3", specifier = "==8.0.0" },
|
||||
{ name = "loguru", specifier = "==0.7.3" },
|
||||
{ name = "markdown-it-py", specifier = "==4.0.0" },
|
||||
{ name = "marshmallow", specifier = "==3.26.1" },
|
||||
{ name = "marshmallow", specifier = "==3.26.2" },
|
||||
{ name = "mdurl", specifier = "==0.1.2" },
|
||||
{ name = "mypy", marker = "extra == 'dev'" },
|
||||
{ name = "mypy", marker = "extra == 'type'" },
|
||||
@ -126,7 +125,6 @@ requires-dist = [
|
||||
{ name = "rich", specifier = "==14.2.0" },
|
||||
{ name = "ruff", marker = "extra == 'dev'" },
|
||||
{ name = "rush", specifier = "==2021.4.0" },
|
||||
{ name = "setuptools", specifier = "==80.9.0" },
|
||||
{ name = "stevedore", specifier = "==5.6.0" },
|
||||
{ name = "thesmuggler", specifier = "==1.0.1" },
|
||||
{ name = "timeago", specifier = "==1.0.16" },
|
||||
@ -152,7 +150,7 @@ provides-extras = ["dev", "tests", "type"]
|
||||
[[package]]
|
||||
name = "aprslib"
|
||||
version = "0.7.2"
|
||||
source = { git = "https://github.com/hemna/aprs-python.git?rev=09cd7a2829a2e9d28ee1566881c843cc4769e590#09cd7a2829a2e9d28ee1566881c843cc4769e590" }
|
||||
source = { git = "https://github.com/hemna/aprs-python.git?rev=telemetry#09cd7a2829a2e9d28ee1566881c843cc4769e590" }
|
||||
|
||||
[[package]]
|
||||
name = "attrs"
|
||||
@ -671,14 +669,14 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "marshmallow"
|
||||
version = "3.26.1"
|
||||
version = "3.26.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "packaging" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ab/5e/5e53d26b42ab75491cda89b871dab9e97c840bf12c63ec58a1919710cd06/marshmallow-3.26.1.tar.gz", hash = "sha256:e6d8affb6cb61d39d26402096dc0aee12d5a26d490a121f118d2e81dc0719dc6", size = 221825, upload-time = "2025-02-03T15:32:25.093Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/55/79/de6c16cc902f4fc372236926b0ce2ab7845268dcc30fb2fbb7f71b418631/marshmallow-3.26.2.tar.gz", hash = "sha256:bbe2adb5a03e6e3571b573f42527c6fe926e17467833660bebd11593ab8dfd57", size = 222095, upload-time = "2025-12-22T06:53:53.309Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/34/75/51952c7b2d3873b44a0028b1bd26a25078c18f92f256608e8d1dc61b39fd/marshmallow-3.26.1-py3-none-any.whl", hash = "sha256:3350409f20a70a7e4e11a27661187b77cdcaeb20abca41c1454fe33636bea09c", size = 50878, upload-time = "2025-02-03T15:32:22.295Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/be/2f/5108cb3ee4ba6501748c4908b908e55f42a5b66245b4cfe0c99326e1ef6e/marshmallow-3.26.2-py3-none-any.whl", hash = "sha256:013fa8a3c4c276c24d26d84ce934dc964e2aa794345a0f8c7e5a7191482c8a73", size = 50964, upload-time = "2025-12-22T06:53:51.801Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user