1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-06-15 04:42:26 -04:00

Fixed some pep8 failures.

This commit is contained in:
Hemna 2025-01-30 10:44:17 -08:00
parent 52dac7e0a0
commit e9e7e6b59f
3 changed files with 77 additions and 86 deletions

View File

@ -13,7 +13,6 @@ import click
from loguru import logger from loguru import logger
from oslo_config import cfg from oslo_config import cfg
from rich.console import Console from rich.console import Console
from typing import Union
# local imports here # local imports here
import aprsd import aprsd
@ -21,21 +20,18 @@ from aprsd import cli_helper, packets, plugin, threads, utils
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector from aprsd.packets import collector as packet_collector
from aprsd.packets import core, seen_list
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.packets import seen_list
from aprsd.packets import core
from aprsd.packets.filters import dupe_filter
from aprsd.packets.filters import packet_type
from aprsd.packets.filter import PacketFilter from aprsd.packets.filter import PacketFilter
from aprsd.packets.filters import dupe_filter, packet_type
from aprsd.stats import collector from aprsd.stats import collector
from aprsd.threads import keepalive, rx from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread from aprsd.threads import stats as stats_thread
from aprsd.threads.aprsd import APRSDThread from aprsd.threads.aprsd import APRSDThread
from aprsd.utils import singleton
# setup the global logger # setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10 # log.basicConfig(level=log.DEBUG) # level=10
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
CONF = cfg.CONF CONF = cfg.CONF
LOGU = logger LOGU = logger
console = Console() console = Console()
@ -43,9 +39,9 @@ console = Console()
def signal_handler(sig, frame): def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all() threads.APRSDThreadList().stop_all()
if "subprocess" not in str(frame): if 'subprocess' not in str(frame):
LOG.info( LOG.info(
"Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format( 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
datetime.datetime.now(), datetime.datetime.now(),
), ),
) )
@ -60,14 +56,14 @@ class APRSDListenProcessThread(rx.APRSDFilterThread):
packet_queue, packet_queue,
packet_filter=None, packet_filter=None,
plugin_manager=None, plugin_manager=None,
enabled_plugins=[], enabled_plugins=None,
log_packets=False, log_packets=False,
): ):
super().__init__("ListenProcThread", packet_queue) super().__init__('ListenProcThread', packet_queue)
self.packet_filter = packet_filter self.packet_filter = packet_filter
self.plugin_manager = plugin_manager self.plugin_manager = plugin_manager
if self.plugin_manager: if self.plugin_manager:
LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}") LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}')
self.log_packets = log_packets self.log_packets = log_packets
def print_packet(self, packet): def print_packet(self, packet):
@ -85,7 +81,7 @@ class ListenStatsThread(APRSDThread):
"""Log the stats from the PacketList.""" """Log the stats from the PacketList."""
def __init__(self): def __init__(self):
super().__init__("PacketStatsLog") super().__init__('PacketStatsLog')
self._last_total_rx = 0 self._last_total_rx = 0
self.period = 31 self.period = 31
@ -93,27 +89,27 @@ class ListenStatsThread(APRSDThread):
if self.loop_count % self.period == 0: if self.loop_count % self.period == 0:
# log the stats every 10 seconds # log the stats every 10 seconds
stats_json = collector.Collector().collect() stats_json = collector.Collector().collect()
stats = stats_json["PacketList"] stats = stats_json['PacketList']
total_rx = stats["rx"] total_rx = stats['rx']
packet_count = len(stats["packets"]) packet_count = len(stats['packets'])
rx_delta = total_rx - self._last_total_rx rx_delta = total_rx - self._last_total_rx
rate = rx_delta / self.period rate = rx_delta / self.period
# Log summary stats # Log summary stats
LOGU.opt(colors=True).info( LOGU.opt(colors=True).info(
f"<green>RX Rate: {rate:.2f} pps</green> " f'<green>RX Rate: {rate:.2f} pps</green> '
f"<yellow>Total RX: {total_rx}</yellow> " f'<yellow>Total RX: {total_rx}</yellow> '
f"<red>RX Last {self.period} secs: {rx_delta}</red> " f'<red>RX Last {self.period} secs: {rx_delta}</red> '
f"<white>Packets in PacketListStats: {packet_count}</white>", f'<white>Packets in PacketListStats: {packet_count}</white>',
) )
self._last_total_rx = total_rx self._last_total_rx = total_rx
# Log individual type stats # Log individual type stats
for k, v in stats["types"].items(): for k, v in stats['types'].items():
thread_hex = f"fg {utils.hex_from_name(k)}" thread_hex = f'fg {utils.hex_from_name(k)}'
LOGU.opt(colors=True).info( LOGU.opt(colors=True).info(
f"<{thread_hex}>{k:<15}</{thread_hex}> " f'<{thread_hex}>{k:<15}</{thread_hex}> '
f"<blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>", f'<blue>RX: {v["rx"]}</blue> <red>TX: {v["tx"]}</red>',
) )
time.sleep(1) time.sleep(1)
@ -123,19 +119,19 @@ class ListenStatsThread(APRSDThread):
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
"--aprs-login", '--aprs-login',
envvar="APRS_LOGIN", envvar='APRS_LOGIN',
show_envvar=True, show_envvar=True,
help="What callsign to send the message from.", help='What callsign to send the message from.',
) )
@click.option( @click.option(
"--aprs-password", '--aprs-password',
envvar="APRS_PASSWORD", envvar='APRS_PASSWORD',
show_envvar=True, show_envvar=True,
help="the APRS-IS password for APRS_LOGIN", help='the APRS-IS password for APRS_LOGIN',
) )
@click.option( @click.option(
"--packet-filter", '--packet-filter',
type=click.Choice( type=click.Choice(
[ [
packets.AckPacket.__name__, packets.AckPacket.__name__,
@ -154,35 +150,35 @@ class ListenStatsThread(APRSDThread):
), ),
multiple=True, multiple=True,
default=[], default=[],
help="Filter by packet type", help='Filter by packet type',
) )
@click.option( @click.option(
"--enable-plugin", '--enable-plugin',
multiple=True, multiple=True,
help="Enable a plugin. This is the name of the file in the plugins directory.", help='Enable a plugin. This is the name of the file in the plugins directory.',
) )
@click.option( @click.option(
"--load-plugins", '--load-plugins',
default=False, default=False,
is_flag=True, is_flag=True,
help="Load plugins as enabled in aprsd.conf ?", help='Load plugins as enabled in aprsd.conf ?',
) )
@click.argument( @click.argument(
"filter", 'filter',
nargs=-1, nargs=-1,
required=True, required=True,
) )
@click.option( @click.option(
"--log-packets", '--log-packets',
default=False, default=False,
is_flag=True, is_flag=True,
help="Log incoming packets.", help='Log incoming packets.',
) )
@click.option( @click.option(
"--enable-packet-stats", '--enable-packet-stats',
default=False, default=False,
is_flag=True, is_flag=True,
help="Enable packet stats periodic logging.", help='Enable packet stats periodic logging.',
) )
@click.pass_context @click.pass_context
@cli_helper.process_standard_options @cli_helper.process_standard_options
@ -212,41 +208,41 @@ def listen(
if not aprs_login: if not aprs_login:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo("") click.echo('')
ctx.fail("Must set --aprs-login or APRS_LOGIN") ctx.fail('Must set --aprs-login or APRS_LOGIN')
ctx.exit() ctx.exit()
if not aprs_password: if not aprs_password:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo("") click.echo('')
ctx.fail("Must set --aprs-password or APRS_PASSWORD") ctx.fail('Must set --aprs-password or APRS_PASSWORD')
ctx.exit() ctx.exit()
# CONF.aprs_network.login = aprs_login # CONF.aprs_network.login = aprs_login
# config["aprs"]["password"] = aprs_password # config["aprs"]["password"] = aprs_password
LOG.info(f"APRSD Listen Started version: {aprsd.__version__}") LOG.info(f'APRSD Listen Started version: {aprsd.__version__}')
CONF.log_opt_values(LOG, logging.DEBUG) CONF.log_opt_values(LOG, logging.DEBUG)
collector.Collector() collector.Collector()
# Try and load saved MsgTrack list # Try and load saved MsgTrack list
LOG.debug("Loading saved MsgTrack object.") LOG.debug('Loading saved MsgTrack object.')
# Initialize the client factory and create # Initialize the client factory and create
# The correct client object ready for use # The correct client object ready for use
# Make sure we have 1 client transport enabled # Make sure we have 1 client transport enabled
if not client_factory.is_client_enabled(): if not client_factory.is_client_enabled():
LOG.error("No Clients are enabled in config.") LOG.error('No Clients are enabled in config.')
sys.exit(-1) sys.exit(-1)
# Creates the client object # Creates the client object
LOG.info("Creating client connection") LOG.info('Creating client connection')
aprs_client = client_factory.create() aprs_client = client_factory.create()
LOG.info(aprs_client) LOG.info(aprs_client)
if not aprs_client.login_success: if not aprs_client.login_success:
# We failed to login, will just quit! # We failed to login, will just quit!
msg = f"Login Failure: {aprs_client.login_failure}" msg = f'Login Failure: {aprs_client.login_failure}'
LOG.error(msg) LOG.error(msg)
print(msg) print(msg)
sys.exit(-1) sys.exit(-1)
@ -263,16 +259,16 @@ def listen(
# we don't want the dupe filter to run here. # we don't want the dupe filter to run here.
PacketFilter().unregister(dupe_filter.DupePacketFilter) PacketFilter().unregister(dupe_filter.DupePacketFilter)
if packet_filter: if packet_filter:
LOG.info("Enabling packet filtering for {packet_filter}") LOG.info('Enabling packet filtering for {packet_filter}')
packet_type.PacketTypeFilter().set_allow_list(packet_filter) packet_type.PacketTypeFilter().set_allow_list(packet_filter)
PacketFilter().register(packet_type.PacketTypeFilter) PacketFilter().register(packet_type.PacketTypeFilter)
else: else:
LOG.info("No packet filtering enabled.") LOG.info('No packet filtering enabled.')
pm = None pm = None
if load_plugins: if load_plugins:
pm = plugin.PluginManager() pm = plugin.PluginManager()
LOG.info("Loading plugins") LOG.info('Loading plugins')
pm.setup_plugins(load_help_plugin=False) pm.setup_plugins(load_help_plugin=False)
elif enable_plugin: elif enable_plugin:
pm = plugin.PluginManager() pm = plugin.PluginManager()
@ -283,22 +279,21 @@ def listen(
else: else:
LOG.warning( LOG.warning(
"Not Loading any plugins use --load-plugins to load what's " "Not Loading any plugins use --load-plugins to load what's "
"defined in the config file.", 'defined in the config file.',
) )
if pm: if pm:
for p in pm.get_plugins(): for p in pm.get_plugins():
LOG.info("Loaded plugin %s", p.__class__.__name__) LOG.info('Loaded plugin %s', p.__class__.__name__)
stats = stats_thread.APRSDStatsStoreThread() stats = stats_thread.APRSDStatsStoreThread()
stats.start() stats.start()
LOG.debug("Start APRSDRxThread") LOG.debug('Start APRSDRxThread')
rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue) rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue)
rx_thread.start() rx_thread.start()
LOG.debug('Create APRSDListenProcessThread')
LOG.debug("Create APRSDListenProcessThread")
listen_thread = APRSDListenProcessThread( listen_thread = APRSDListenProcessThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
packet_filter=packet_filter, packet_filter=packet_filter,
@ -306,14 +301,14 @@ def listen(
enabled_plugins=enable_plugin, enabled_plugins=enable_plugin,
log_packets=log_packets, log_packets=log_packets,
) )
LOG.debug("Start APRSDListenProcessThread") LOG.debug('Start APRSDListenProcessThread')
listen_thread.start() listen_thread.start()
if enable_packet_stats: if enable_packet_stats:
listen_stats = ListenStatsThread() listen_stats = ListenStatsThread()
listen_stats.start() listen_stats.start()
keepalive_thread.start() keepalive_thread.start()
LOG.debug("keepalive Join") LOG.debug('keepalive Join')
keepalive_thread.join() keepalive_thread.join()
rx_thread.join() rx_thread.join()
listen_thread.join() listen_thread.join()

View File

@ -1,28 +1,27 @@
import logging import logging
from typing import Union from typing import Union
from oslo_config import cfg from oslo_config import cfg
from aprsd.packets import core
from aprsd import packets from aprsd import packets
from aprsd.utils import trace from aprsd.packets import core
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
class DupePacketFilter: class DupePacketFilter:
"""This is a packet filter to detect duplicate packets. """This is a packet filter to detect duplicate packets.
This Uses the PacketList object to see if a packet exists This Uses the PacketList object to see if a packet exists
already. If it does exist in the PacketList, then we need to already. If it does exist in the PacketList, then we need to
check the flag on the packet to see if it's been processed before. check the flag on the packet to see if it's been processed before.
If the packet has been processed already within the allowed If the packet has been processed already within the allowed
timeframe, then it's a dupe. timeframe, then it's a dupe.
""" """
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
#LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
"""Filter a packet out if it's already been seen and processed.""" """Filter a packet out if it's already been seen and processed."""
if isinstance(packet, core.AckPacket): if isinstance(packet, core.AckPacket):
# We don't need to drop AckPackets, those should be # We don't need to drop AckPackets, those should be
@ -51,7 +50,7 @@ class DupePacketFilter:
if not found: if not found:
# We haven't seen this packet before, so we process it. # We haven't seen this packet before, so we process it.
return packet return packet
if not packet.processed: if not packet.processed:
# We haven't processed this packet through the plugins. # We haven't processed this packet through the plugins.
return packet return packet
@ -59,11 +58,11 @@ class DupePacketFilter:
# If the packet came in within N seconds of the # If the packet came in within N seconds of the
# Last time seeing the packet, then we drop it as a dupe. # Last time seeing the packet, then we drop it as a dupe.
LOG.warning( LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping." f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
) )
else: else:
LOG.warning( LOG.warning(
f"Packet {packet.from_call}:{packet.msgNo} already tracked " f'Packet {packet.from_call}:{packet.msgNo} already tracked '
f"but older than {CONF.packet_dupe_timeout} seconds. processing.", f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
) )
return packet return packet

View File

@ -8,12 +8,9 @@ from oslo_config import cfg
from aprsd import packets, plugin from aprsd import packets, plugin
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.packets import collector from aprsd.packets import collector, filter
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx from aprsd.threads import APRSDThread, tx
from aprsd.utils import trace
from aprsd.packets import filter
from aprsd.packets.filters import dupe_filter
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger('APRSD') LOG = logging.getLogger('APRSD')
@ -24,13 +21,14 @@ class APRSDRXThread(APRSDThread):
A packet is received in the main loop and then sent to the A packet is received in the main loop and then sent to the
process_packet method, which sends the packet through the collector process_packet method, which sends the packet through the collector
to track the packet for stats, and then put into the packet queue to track the packet for stats, and then put into the packet queue
for processing in a separate thread. for processing in a separate thread.
""" """
_client = None _client = None
# This is the queue that packets are sent to for processing. # This is the queue that packets are sent to for processing.
# We process packets in a separate thread to help prevent # We process packets in a separate thread to help prevent
# getting blocked by the APRS server trying to send us packets. # getting blocked by the APRS server trying to send us packets.
packet_queue = None packet_queue = None
@ -139,7 +137,6 @@ class APRSDRXThread(APRSDThread):
class APRSDFilterThread(APRSDThread): class APRSDFilterThread(APRSDThread):
def __init__(self, thread_name, packet_queue): def __init__(self, thread_name, packet_queue):
super().__init__(thread_name) super().__init__(thread_name)
self.packet_queue = packet_queue self.packet_queue = packet_queue
@ -149,13 +146,13 @@ class APRSDFilterThread(APRSDThread):
if not filter.PacketFilter().filter(packet): if not filter.PacketFilter().filter(packet):
return None return None
return packet return packet
def print_packet(self, packet): def print_packet(self, packet):
"""Allow a child of this class to override this. """Allow a child of this class to override this.
This is helpful if for whatever reason the child class This is helpful if for whatever reason the child class
doesn't want to log packets. doesn't want to log packets.
""" """
packet_log.log(packet) packet_log.log(packet)
@ -174,7 +171,7 @@ class APRSDFilterThread(APRSDThread):
class APRSDProcessPacketThread(APRSDFilterThread): class APRSDProcessPacketThread(APRSDFilterThread):
"""Base class for processing received packets after they have been filtered. """Base class for processing received packets after they have been filtered.
Packets are received from the client, then filtered for dupes, Packets are received from the client, then filtered for dupes,
then sent to the packet queue. This thread pulls packets from then sent to the packet queue. This thread pulls packets from
the packet queue for processing. the packet queue for processing.
@ -184,7 +181,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
for processing.""" for processing."""
def __init__(self, packet_queue): def __init__(self, packet_queue):
super().__init__("ProcessPKT", packet_queue=packet_queue) super().__init__('ProcessPKT', packet_queue=packet_queue)
if not CONF.enable_sending_ack_packets: if not CONF.enable_sending_ack_packets:
LOG.warning( LOG.warning(
'Sending ack packets is disabled, messages will not be acknowledged.', 'Sending ack packets is disabled, messages will not be acknowledged.',
@ -210,7 +207,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
def process_packet(self, packet): def process_packet(self, packet):
"""Process a packet received from aprs-is server.""" """Process a packet received from aprs-is server."""
LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") LOG.debug(f'ProcessPKT-LOOP {self.loop_count}')
# set this now as we are going to process it. # set this now as we are going to process it.
# This is used during dupe checking, so set it early # This is used during dupe checking, so set it early