1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-06-14 04:12:26 -04:00

Added threads.service

This is just a handy wrapper to inject all the threads a service wants
to start and then start them all at once, and then join them all.
This commit is contained in:
Hemna 2025-02-28 17:16:53 -05:00
parent 0fa5b07d4b
commit c1c89fd2c2
2 changed files with 70 additions and 63 deletions

View File

@ -12,59 +12,24 @@ from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector from aprsd.packets import collector as packet_collector
from aprsd.packets import seen_list from aprsd.packets import seen_list
from aprsd.threads import aprsd as aprsd_threads from aprsd.threads import keepalive, registry, rx, service, tx
from aprsd.threads import keepalive, registry, rx, tx
from aprsd.threads import stats as stats_thread from aprsd.threads import stats as stats_thread
from aprsd.utils import singleton
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
@singleton
class ServerThreads:
"""Registry for threads that the server command runs.
This enables extensions to register a thread to run during
the server command.
"""
def __init__(self):
self.threads: list[aprsd_threads.APRSDThread] = []
def register(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f"Thread {thread} is not an APRSDThread")
self.threads.append(thread)
def unregister(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f"Thread {thread} is not an APRSDThread")
self.threads.remove(thread)
def start(self):
"""Start all threads in the list."""
for thread in self.threads:
thread.start()
def join(self):
"""Join all the threads in the list"""
for thread in self.threads:
thread.join()
# main() ### # main() ###
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
"-f", '-f',
"--flush", '--flush',
"flush", 'flush',
is_flag=True, is_flag=True,
show_default=True, show_default=True,
default=False, default=False,
help="Flush out all old aged messages on disk.", help='Flush out all old aged messages on disk.',
) )
@click.pass_context @click.pass_context
@cli_helper.process_standard_options @cli_helper.process_standard_options
@ -73,37 +38,37 @@ def server(ctx, flush):
signal.signal(signal.SIGINT, aprsd_main.signal_handler) signal.signal(signal.SIGINT, aprsd_main.signal_handler)
signal.signal(signal.SIGTERM, aprsd_main.signal_handler) signal.signal(signal.SIGTERM, aprsd_main.signal_handler)
server_threads = ServerThreads() service_threads = service.ServiceThreads()
level, msg = utils._check_version() level, msg = utils._check_version()
if level: if level:
LOG.warning(msg) LOG.warning(msg)
else: else:
LOG.info(msg) LOG.info(msg)
LOG.info(f"APRSD Started version: {aprsd.__version__}") LOG.info(f'APRSD Started version: {aprsd.__version__}')
# Initialize the client factory and create # Initialize the client factory and create
# The correct client object ready for use # The correct client object ready for use
if not client_factory.is_client_enabled(): if not client_factory.is_client_enabled():
LOG.error("No Clients are enabled in config.") LOG.error('No Clients are enabled in config.')
sys.exit(-1) sys.exit(-1)
# Make sure we have 1 client transport enabled # Make sure we have 1 client transport enabled
if not client_factory.is_client_enabled(): if not client_factory.is_client_enabled():
LOG.error("No Clients are enabled in config.") LOG.error('No Clients are enabled in config.')
sys.exit(-1) sys.exit(-1)
if not client_factory.is_client_configured(): if not client_factory.is_client_configured():
LOG.error("APRS client is not properly configured in config file.") LOG.error('APRS client is not properly configured in config file.')
sys.exit(-1) sys.exit(-1)
# Creates the client object # Creates the client object
LOG.info("Creating client connection") LOG.info('Creating client connection')
aprs_client = client_factory.create() aprs_client = client_factory.create()
LOG.info(aprs_client) LOG.info(aprs_client)
if not aprs_client.login_success: if not aprs_client.login_success:
# We failed to login, will just quit! # We failed to login, will just quit!
msg = f"Login Failure: {aprs_client.login_failure}" msg = f'Login Failure: {aprs_client.login_failure}'
LOG.error(msg) LOG.error(msg)
print(msg) print(msg)
sys.exit(-1) sys.exit(-1)
@ -114,7 +79,7 @@ def server(ctx, flush):
# We register plugins first here so we can register each # We register plugins first here so we can register each
# plugins config options, so we can dump them all in the # plugins config options, so we can dump them all in the
# log file output. # log file output.
LOG.info("Loading Plugin Manager and registering plugins") LOG.info('Loading Plugin Manager and registering plugins')
plugin_manager = plugin.PluginManager() plugin_manager = plugin.PluginManager()
plugin_manager.setup_plugins(load_help_plugin=CONF.load_help_plugin) plugin_manager.setup_plugins(load_help_plugin=CONF.load_help_plugin)
@ -122,10 +87,10 @@ def server(ctx, flush):
CONF.log_opt_values(LOG, logging.DEBUG) CONF.log_opt_values(LOG, logging.DEBUG)
message_plugins = plugin_manager.get_message_plugins() message_plugins = plugin_manager.get_message_plugins()
watchlist_plugins = plugin_manager.get_watchlist_plugins() watchlist_plugins = plugin_manager.get_watchlist_plugins()
LOG.info("Message Plugins enabled and running:") LOG.info('Message Plugins enabled and running:')
for p in message_plugins: for p in message_plugins:
LOG.info(p) LOG.info(p)
LOG.info("Watchlist Plugins enabled and running:") LOG.info('Watchlist Plugins enabled and running:')
for p in watchlist_plugins: for p in watchlist_plugins:
LOG.info(p) LOG.info(p)
@ -135,37 +100,37 @@ def server(ctx, flush):
# Now load the msgTrack from disk if any # Now load the msgTrack from disk if any
if flush: if flush:
LOG.debug("Flushing All packet tracking objects.") LOG.debug('Flushing All packet tracking objects.')
packet_collector.PacketCollector().flush() packet_collector.PacketCollector().flush()
else: else:
# Try and load saved MsgTrack list # Try and load saved MsgTrack list
LOG.debug("Loading saved packet tracking data.") LOG.debug('Loading saved packet tracking data.')
packet_collector.PacketCollector().load() packet_collector.PacketCollector().load()
# Now start all the main processing threads. # Now start all the main processing threads.
server_threads.register(keepalive.KeepAliveThread()) service_threads.register(keepalive.KeepAliveThread())
server_threads.register(stats_thread.APRSDStatsStoreThread()) service_threads.register(stats_thread.APRSDStatsStoreThread())
server_threads.register( service_threads.register(
rx.APRSDRXThread( rx.APRSDRXThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
), ),
) )
server_threads.register( service_threads.register(
rx.APRSDPluginProcessPacketThread( rx.APRSDPluginProcessPacketThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
), ),
) )
if CONF.enable_beacon: if CONF.enable_beacon:
LOG.info("Beacon Enabled. Starting Beacon thread.") LOG.info('Beacon Enabled. Starting Beacon thread.')
server_threads.register(tx.BeaconSendThread()) service_threads.register(tx.BeaconSendThread())
if CONF.aprs_registry.enabled: if CONF.aprs_registry.enabled:
LOG.info("Registry Enabled. Starting Registry thread.") LOG.info('Registry Enabled. Starting Registry thread.')
server_threads.register(registry.APRSRegistryThread()) service_threads.register(registry.APRSRegistryThread())
server_threads.start() service_threads.start()
server_threads.join() service_threads.join()
return 0 return 0

42
aprsd/threads/service.py Normal file
View File

@ -0,0 +1,42 @@
# aprsd/aprsd/threads/service.py
#
# This module is used to register threads that the service command runs.
#
# The service command is used to start and stop the APRS service.
# This is a mechanism to register threads that the service or command
# needs to run, and then start stop them as needed.
from aprsd.threads import aprsd as aprsd_threads
from aprsd.utils import singleton
@singleton
class ServiceThreads:
"""Registry for threads that the service command runs.
This enables extensions to register a thread to run during
the service command.
"""
def __init__(self):
self.threads: list[aprsd_threads.APRSDThread] = []
def register(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f'Thread {thread} is not an APRSDThread')
self.threads.append(thread)
def unregister(self, thread: aprsd_threads.APRSDThread):
if not isinstance(thread, aprsd_threads.APRSDThread):
raise TypeError(f'Thread {thread} is not an APRSDThread')
self.threads.remove(thread)
def start(self):
"""Start all threads in the list."""
for thread in self.threads:
thread.start()
def join(self):
"""Join all the threads in the list"""
for thread in self.threads:
thread.join()