From e9e7e6b59f9f93f3f09142e56407bc87603a44cb Mon Sep 17 00:00:00 2001 From: Hemna Date: Thu, 30 Jan 2025 10:44:17 -0800 Subject: [PATCH] Fixed some pep8 failures. --- aprsd/cmds/listen.py | 117 +++++++++++++-------------- aprsd/packets/filters/dupe_filter.py | 23 +++--- aprsd/threads/rx.py | 23 +++--- 3 files changed, 77 insertions(+), 86 deletions(-) diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py index 8ab4ff1..4784e3a 100644 --- a/aprsd/cmds/listen.py +++ b/aprsd/cmds/listen.py @@ -13,7 +13,6 @@ import click from loguru import logger from oslo_config import cfg from rich.console import Console -from typing import Union # local imports here import aprsd @@ -21,21 +20,18 @@ from aprsd import cli_helper, packets, plugin, threads, utils from aprsd.client import client_factory from aprsd.main import cli from aprsd.packets import collector as packet_collector +from aprsd.packets import core, seen_list from aprsd.packets import log as packet_log -from aprsd.packets import seen_list -from aprsd.packets import core -from aprsd.packets.filters import dupe_filter -from aprsd.packets.filters import packet_type from aprsd.packets.filter import PacketFilter +from aprsd.packets.filters import dupe_filter, packet_type from aprsd.stats import collector from aprsd.threads import keepalive, rx from aprsd.threads import stats as stats_thread from aprsd.threads.aprsd import APRSDThread -from aprsd.utils import singleton # setup the global logger # log.basicConfig(level=log.DEBUG) # level=10 -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') CONF = cfg.CONF LOGU = logger console = Console() @@ -43,9 +39,9 @@ console = Console() def signal_handler(sig, frame): threads.APRSDThreadList().stop_all() - if "subprocess" not in str(frame): + if 'subprocess' not in str(frame): LOG.info( - "Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format( + 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format( datetime.datetime.now(), ), ) @@ -60,14 +56,14 @@ class APRSDListenProcessThread(rx.APRSDFilterThread): packet_queue, packet_filter=None, plugin_manager=None, - enabled_plugins=[], + enabled_plugins=None, log_packets=False, ): - super().__init__("ListenProcThread", packet_queue) + super().__init__('ListenProcThread', packet_queue) self.packet_filter = packet_filter self.plugin_manager = plugin_manager if self.plugin_manager: - LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}") + LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}') self.log_packets = log_packets def print_packet(self, packet): @@ -85,7 +81,7 @@ class ListenStatsThread(APRSDThread): """Log the stats from the PacketList.""" def __init__(self): - super().__init__("PacketStatsLog") + super().__init__('PacketStatsLog') self._last_total_rx = 0 self.period = 31 @@ -93,27 +89,27 @@ class ListenStatsThread(APRSDThread): if self.loop_count % self.period == 0: # log the stats every 10 seconds stats_json = collector.Collector().collect() - stats = stats_json["PacketList"] - total_rx = stats["rx"] - packet_count = len(stats["packets"]) + stats = stats_json['PacketList'] + total_rx = stats['rx'] + packet_count = len(stats['packets']) rx_delta = total_rx - self._last_total_rx - rate = rx_delta / self.period + rate = rx_delta / self.period # Log summary stats LOGU.opt(colors=True).info( - f"RX Rate: {rate:.2f} pps " - f"Total RX: {total_rx} " - f"RX Last {self.period} secs: {rx_delta} " - f"Packets in PacketListStats: {packet_count}", + f'RX Rate: {rate:.2f} pps ' + f'Total RX: {total_rx} ' + f'RX Last {self.period} secs: {rx_delta} ' + f'Packets in PacketListStats: {packet_count}', ) self._last_total_rx = total_rx # Log individual type stats - for k, v in stats["types"].items(): - thread_hex = f"fg {utils.hex_from_name(k)}" + for k, v in stats['types'].items(): + thread_hex = f'fg {utils.hex_from_name(k)}' LOGU.opt(colors=True).info( - f"<{thread_hex}>{k:<15} " - f"RX: {v['rx']} TX: {v['tx']}", + f'<{thread_hex}>{k:<15} ' + f'RX: {v["rx"]} TX: {v["tx"]}', ) time.sleep(1) @@ -123,19 +119,19 @@ class ListenStatsThread(APRSDThread): @cli.command() @cli_helper.add_options(cli_helper.common_options) @click.option( - "--aprs-login", - envvar="APRS_LOGIN", + '--aprs-login', + envvar='APRS_LOGIN', show_envvar=True, - help="What callsign to send the message from.", + help='What callsign to send the message from.', ) @click.option( - "--aprs-password", - envvar="APRS_PASSWORD", + '--aprs-password', + envvar='APRS_PASSWORD', show_envvar=True, - help="the APRS-IS password for APRS_LOGIN", + help='the APRS-IS password for APRS_LOGIN', ) @click.option( - "--packet-filter", + '--packet-filter', type=click.Choice( [ packets.AckPacket.__name__, @@ -154,35 +150,35 @@ class ListenStatsThread(APRSDThread): ), multiple=True, default=[], - help="Filter by packet type", + help='Filter by packet type', ) @click.option( - "--enable-plugin", + '--enable-plugin', multiple=True, - help="Enable a plugin. This is the name of the file in the plugins directory.", + help='Enable a plugin. This is the name of the file in the plugins directory.', ) @click.option( - "--load-plugins", + '--load-plugins', default=False, is_flag=True, - help="Load plugins as enabled in aprsd.conf ?", + help='Load plugins as enabled in aprsd.conf ?', ) @click.argument( - "filter", + 'filter', nargs=-1, required=True, ) @click.option( - "--log-packets", + '--log-packets', default=False, is_flag=True, - help="Log incoming packets.", + help='Log incoming packets.', ) @click.option( - "--enable-packet-stats", + '--enable-packet-stats', default=False, is_flag=True, - help="Enable packet stats periodic logging.", + help='Enable packet stats periodic logging.', ) @click.pass_context @cli_helper.process_standard_options @@ -212,41 +208,41 @@ def listen( if not aprs_login: click.echo(ctx.get_help()) - click.echo("") - ctx.fail("Must set --aprs-login or APRS_LOGIN") + click.echo('') + ctx.fail('Must set --aprs-login or APRS_LOGIN') ctx.exit() if not aprs_password: click.echo(ctx.get_help()) - click.echo("") - ctx.fail("Must set --aprs-password or APRS_PASSWORD") + click.echo('') + ctx.fail('Must set --aprs-password or APRS_PASSWORD') ctx.exit() # CONF.aprs_network.login = aprs_login # config["aprs"]["password"] = aprs_password - LOG.info(f"APRSD Listen Started version: {aprsd.__version__}") + LOG.info(f'APRSD Listen Started version: {aprsd.__version__}') CONF.log_opt_values(LOG, logging.DEBUG) collector.Collector() # Try and load saved MsgTrack list - LOG.debug("Loading saved MsgTrack object.") + LOG.debug('Loading saved MsgTrack object.') # Initialize the client factory and create # The correct client object ready for use # Make sure we have 1 client transport enabled if not client_factory.is_client_enabled(): - LOG.error("No Clients are enabled in config.") + LOG.error('No Clients are enabled in config.') sys.exit(-1) # Creates the client object - LOG.info("Creating client connection") + LOG.info('Creating client connection') aprs_client = client_factory.create() LOG.info(aprs_client) if not aprs_client.login_success: # We failed to login, will just quit! - msg = f"Login Failure: {aprs_client.login_failure}" + msg = f'Login Failure: {aprs_client.login_failure}' LOG.error(msg) print(msg) sys.exit(-1) @@ -263,16 +259,16 @@ def listen( # we don't want the dupe filter to run here. PacketFilter().unregister(dupe_filter.DupePacketFilter) if packet_filter: - LOG.info("Enabling packet filtering for {packet_filter}") + LOG.info('Enabling packet filtering for {packet_filter}') packet_type.PacketTypeFilter().set_allow_list(packet_filter) PacketFilter().register(packet_type.PacketTypeFilter) else: - LOG.info("No packet filtering enabled.") + LOG.info('No packet filtering enabled.') pm = None if load_plugins: pm = plugin.PluginManager() - LOG.info("Loading plugins") + LOG.info('Loading plugins') pm.setup_plugins(load_help_plugin=False) elif enable_plugin: pm = plugin.PluginManager() @@ -283,22 +279,21 @@ def listen( else: LOG.warning( "Not Loading any plugins use --load-plugins to load what's " - "defined in the config file.", + 'defined in the config file.', ) if pm: for p in pm.get_plugins(): - LOG.info("Loaded plugin %s", p.__class__.__name__) + LOG.info('Loaded plugin %s', p.__class__.__name__) stats = stats_thread.APRSDStatsStoreThread() stats.start() - LOG.debug("Start APRSDRxThread") + LOG.debug('Start APRSDRxThread') rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue) rx_thread.start() - - LOG.debug("Create APRSDListenProcessThread") + LOG.debug('Create APRSDListenProcessThread') listen_thread = APRSDListenProcessThread( packet_queue=threads.packet_queue, packet_filter=packet_filter, @@ -306,14 +301,14 @@ def listen( enabled_plugins=enable_plugin, log_packets=log_packets, ) - LOG.debug("Start APRSDListenProcessThread") + LOG.debug('Start APRSDListenProcessThread') listen_thread.start() if enable_packet_stats: listen_stats = ListenStatsThread() listen_stats.start() keepalive_thread.start() - LOG.debug("keepalive Join") + LOG.debug('keepalive Join') keepalive_thread.join() rx_thread.join() listen_thread.join() diff --git a/aprsd/packets/filters/dupe_filter.py b/aprsd/packets/filters/dupe_filter.py index e0fa363..0839fcf 100644 --- a/aprsd/packets/filters/dupe_filter.py +++ b/aprsd/packets/filters/dupe_filter.py @@ -1,28 +1,27 @@ import logging -from typing import Union +from typing import Union from oslo_config import cfg -from aprsd.packets import core from aprsd import packets -from aprsd.utils import trace - +from aprsd.packets import core CONF = cfg.CONF -LOG = logging.getLogger("APRSD") +LOG = logging.getLogger('APRSD') class DupePacketFilter: """This is a packet filter to detect duplicate packets. This Uses the PacketList object to see if a packet exists - already. If it does exist in the PacketList, then we need to + already. If it does exist in the PacketList, then we need to check the flag on the packet to see if it's been processed before. - If the packet has been processed already within the allowed + If the packet has been processed already within the allowed timeframe, then it's a dupe. """ + def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]: - #LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") + # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}") """Filter a packet out if it's already been seen and processed.""" if isinstance(packet, core.AckPacket): # We don't need to drop AckPackets, those should be @@ -51,7 +50,7 @@ class DupePacketFilter: if not found: # We haven't seen this packet before, so we process it. return packet - + if not packet.processed: # We haven't processed this packet through the plugins. return packet @@ -59,11 +58,11 @@ class DupePacketFilter: # If the packet came in within N seconds of the # Last time seeing the packet, then we drop it as a dupe. LOG.warning( - f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping." + f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.' ) else: LOG.warning( - f"Packet {packet.from_call}:{packet.msgNo} already tracked " - f"but older than {CONF.packet_dupe_timeout} seconds. processing.", + f'Packet {packet.from_call}:{packet.msgNo} already tracked ' + f'but older than {CONF.packet_dupe_timeout} seconds. processing.', ) return packet diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 1088daf..b995f8c 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -8,12 +8,9 @@ from oslo_config import cfg from aprsd import packets, plugin from aprsd.client import client_factory -from aprsd.packets import collector +from aprsd.packets import collector, filter from aprsd.packets import log as packet_log from aprsd.threads import APRSDThread, tx -from aprsd.utils import trace -from aprsd.packets import filter -from aprsd.packets.filters import dupe_filter CONF = cfg.CONF LOG = logging.getLogger('APRSD') @@ -24,13 +21,14 @@ class APRSDRXThread(APRSDThread): A packet is received in the main loop and then sent to the process_packet method, which sends the packet through the collector - to track the packet for stats, and then put into the packet queue - for processing in a separate thread. + to track the packet for stats, and then put into the packet queue + for processing in a separate thread. """ + _client = None # This is the queue that packets are sent to for processing. - # We process packets in a separate thread to help prevent + # We process packets in a separate thread to help prevent # getting blocked by the APRS server trying to send us packets. packet_queue = None @@ -139,7 +137,6 @@ class APRSDRXThread(APRSDThread): class APRSDFilterThread(APRSDThread): - def __init__(self, thread_name, packet_queue): super().__init__(thread_name) self.packet_queue = packet_queue @@ -149,13 +146,13 @@ class APRSDFilterThread(APRSDThread): if not filter.PacketFilter().filter(packet): return None return packet - + def print_packet(self, packet): """Allow a child of this class to override this. This is helpful if for whatever reason the child class doesn't want to log packets. - + """ packet_log.log(packet) @@ -174,7 +171,7 @@ class APRSDFilterThread(APRSDThread): class APRSDProcessPacketThread(APRSDFilterThread): """Base class for processing received packets after they have been filtered. - Packets are received from the client, then filtered for dupes, + Packets are received from the client, then filtered for dupes, then sent to the packet queue. This thread pulls packets from the packet queue for processing. @@ -184,7 +181,7 @@ class APRSDProcessPacketThread(APRSDFilterThread): for processing.""" def __init__(self, packet_queue): - super().__init__("ProcessPKT", packet_queue=packet_queue) + super().__init__('ProcessPKT', packet_queue=packet_queue) if not CONF.enable_sending_ack_packets: LOG.warning( 'Sending ack packets is disabled, messages will not be acknowledged.', @@ -210,7 +207,7 @@ class APRSDProcessPacketThread(APRSDFilterThread): def process_packet(self, packet): """Process a packet received from aprs-is server.""" - LOG.debug(f"ProcessPKT-LOOP {self.loop_count}") + LOG.debug(f'ProcessPKT-LOOP {self.loop_count}') # set this now as we are going to process it. # This is used during dupe checking, so set it early