diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py
index 8ab4ff1..4784e3a 100644
--- a/aprsd/cmds/listen.py
+++ b/aprsd/cmds/listen.py
@@ -13,7 +13,6 @@ import click
from loguru import logger
from oslo_config import cfg
from rich.console import Console
-from typing import Union
# local imports here
import aprsd
@@ -21,21 +20,18 @@ from aprsd import cli_helper, packets, plugin, threads, utils
from aprsd.client import client_factory
from aprsd.main import cli
from aprsd.packets import collector as packet_collector
+from aprsd.packets import core, seen_list
from aprsd.packets import log as packet_log
-from aprsd.packets import seen_list
-from aprsd.packets import core
-from aprsd.packets.filters import dupe_filter
-from aprsd.packets.filters import packet_type
from aprsd.packets.filter import PacketFilter
+from aprsd.packets.filters import dupe_filter, packet_type
from aprsd.stats import collector
from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread
from aprsd.threads.aprsd import APRSDThread
-from aprsd.utils import singleton
# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
-LOG = logging.getLogger("APRSD")
+LOG = logging.getLogger('APRSD')
CONF = cfg.CONF
LOGU = logger
console = Console()
@@ -43,9 +39,9 @@ console = Console()
def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all()
- if "subprocess" not in str(frame):
+ if 'subprocess' not in str(frame):
LOG.info(
- "Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
+ 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
datetime.datetime.now(),
),
)
@@ -60,14 +56,14 @@ class APRSDListenProcessThread(rx.APRSDFilterThread):
packet_queue,
packet_filter=None,
plugin_manager=None,
- enabled_plugins=[],
+ enabled_plugins=None,
log_packets=False,
):
- super().__init__("ListenProcThread", packet_queue)
+ super().__init__('ListenProcThread', packet_queue)
self.packet_filter = packet_filter
self.plugin_manager = plugin_manager
if self.plugin_manager:
- LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}")
+ LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}')
self.log_packets = log_packets
def print_packet(self, packet):
@@ -85,7 +81,7 @@ class ListenStatsThread(APRSDThread):
"""Log the stats from the PacketList."""
def __init__(self):
- super().__init__("PacketStatsLog")
+ super().__init__('PacketStatsLog')
self._last_total_rx = 0
self.period = 31
@@ -93,27 +89,27 @@ class ListenStatsThread(APRSDThread):
if self.loop_count % self.period == 0:
# log the stats every 10 seconds
stats_json = collector.Collector().collect()
- stats = stats_json["PacketList"]
- total_rx = stats["rx"]
- packet_count = len(stats["packets"])
+ stats = stats_json['PacketList']
+ total_rx = stats['rx']
+ packet_count = len(stats['packets'])
rx_delta = total_rx - self._last_total_rx
- rate = rx_delta / self.period
+ rate = rx_delta / self.period
# Log summary stats
LOGU.opt(colors=True).info(
- f"RX Rate: {rate:.2f} pps "
- f"Total RX: {total_rx} "
- f"RX Last {self.period} secs: {rx_delta} "
- f"Packets in PacketListStats: {packet_count}",
+ f'RX Rate: {rate:.2f} pps '
+ f'Total RX: {total_rx} '
+ f'RX Last {self.period} secs: {rx_delta} '
+ f'Packets in PacketListStats: {packet_count}',
)
self._last_total_rx = total_rx
# Log individual type stats
- for k, v in stats["types"].items():
- thread_hex = f"fg {utils.hex_from_name(k)}"
+ for k, v in stats['types'].items():
+ thread_hex = f'fg {utils.hex_from_name(k)}'
LOGU.opt(colors=True).info(
- f"<{thread_hex}>{k:<15}{thread_hex}> "
- f"RX: {v['rx']} TX: {v['tx']}",
+ f'<{thread_hex}>{k:<15}{thread_hex}> '
+ f'RX: {v["rx"]} TX: {v["tx"]}',
)
time.sleep(1)
@@ -123,19 +119,19 @@ class ListenStatsThread(APRSDThread):
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
- "--aprs-login",
- envvar="APRS_LOGIN",
+ '--aprs-login',
+ envvar='APRS_LOGIN',
show_envvar=True,
- help="What callsign to send the message from.",
+ help='What callsign to send the message from.',
)
@click.option(
- "--aprs-password",
- envvar="APRS_PASSWORD",
+ '--aprs-password',
+ envvar='APRS_PASSWORD',
show_envvar=True,
- help="the APRS-IS password for APRS_LOGIN",
+ help='the APRS-IS password for APRS_LOGIN',
)
@click.option(
- "--packet-filter",
+ '--packet-filter',
type=click.Choice(
[
packets.AckPacket.__name__,
@@ -154,35 +150,35 @@ class ListenStatsThread(APRSDThread):
),
multiple=True,
default=[],
- help="Filter by packet type",
+ help='Filter by packet type',
)
@click.option(
- "--enable-plugin",
+ '--enable-plugin',
multiple=True,
- help="Enable a plugin. This is the name of the file in the plugins directory.",
+ help='Enable a plugin. This is the name of the file in the plugins directory.',
)
@click.option(
- "--load-plugins",
+ '--load-plugins',
default=False,
is_flag=True,
- help="Load plugins as enabled in aprsd.conf ?",
+ help='Load plugins as enabled in aprsd.conf ?',
)
@click.argument(
- "filter",
+ 'filter',
nargs=-1,
required=True,
)
@click.option(
- "--log-packets",
+ '--log-packets',
default=False,
is_flag=True,
- help="Log incoming packets.",
+ help='Log incoming packets.',
)
@click.option(
- "--enable-packet-stats",
+ '--enable-packet-stats',
default=False,
is_flag=True,
- help="Enable packet stats periodic logging.",
+ help='Enable packet stats periodic logging.',
)
@click.pass_context
@cli_helper.process_standard_options
@@ -212,41 +208,41 @@ def listen(
if not aprs_login:
click.echo(ctx.get_help())
- click.echo("")
- ctx.fail("Must set --aprs-login or APRS_LOGIN")
+ click.echo('')
+ ctx.fail('Must set --aprs-login or APRS_LOGIN')
ctx.exit()
if not aprs_password:
click.echo(ctx.get_help())
- click.echo("")
- ctx.fail("Must set --aprs-password or APRS_PASSWORD")
+ click.echo('')
+ ctx.fail('Must set --aprs-password or APRS_PASSWORD')
ctx.exit()
# CONF.aprs_network.login = aprs_login
# config["aprs"]["password"] = aprs_password
- LOG.info(f"APRSD Listen Started version: {aprsd.__version__}")
+ LOG.info(f'APRSD Listen Started version: {aprsd.__version__}')
CONF.log_opt_values(LOG, logging.DEBUG)
collector.Collector()
# Try and load saved MsgTrack list
- LOG.debug("Loading saved MsgTrack object.")
+ LOG.debug('Loading saved MsgTrack object.')
# Initialize the client factory and create
# The correct client object ready for use
# Make sure we have 1 client transport enabled
if not client_factory.is_client_enabled():
- LOG.error("No Clients are enabled in config.")
+ LOG.error('No Clients are enabled in config.')
sys.exit(-1)
# Creates the client object
- LOG.info("Creating client connection")
+ LOG.info('Creating client connection')
aprs_client = client_factory.create()
LOG.info(aprs_client)
if not aprs_client.login_success:
# We failed to login, will just quit!
- msg = f"Login Failure: {aprs_client.login_failure}"
+ msg = f'Login Failure: {aprs_client.login_failure}'
LOG.error(msg)
print(msg)
sys.exit(-1)
@@ -263,16 +259,16 @@ def listen(
# we don't want the dupe filter to run here.
PacketFilter().unregister(dupe_filter.DupePacketFilter)
if packet_filter:
- LOG.info("Enabling packet filtering for {packet_filter}")
+ LOG.info('Enabling packet filtering for {packet_filter}')
packet_type.PacketTypeFilter().set_allow_list(packet_filter)
PacketFilter().register(packet_type.PacketTypeFilter)
else:
- LOG.info("No packet filtering enabled.")
+ LOG.info('No packet filtering enabled.')
pm = None
if load_plugins:
pm = plugin.PluginManager()
- LOG.info("Loading plugins")
+ LOG.info('Loading plugins')
pm.setup_plugins(load_help_plugin=False)
elif enable_plugin:
pm = plugin.PluginManager()
@@ -283,22 +279,21 @@ def listen(
else:
LOG.warning(
"Not Loading any plugins use --load-plugins to load what's "
- "defined in the config file.",
+ 'defined in the config file.',
)
if pm:
for p in pm.get_plugins():
- LOG.info("Loaded plugin %s", p.__class__.__name__)
+ LOG.info('Loaded plugin %s', p.__class__.__name__)
stats = stats_thread.APRSDStatsStoreThread()
stats.start()
- LOG.debug("Start APRSDRxThread")
+ LOG.debug('Start APRSDRxThread')
rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue)
rx_thread.start()
-
- LOG.debug("Create APRSDListenProcessThread")
+ LOG.debug('Create APRSDListenProcessThread')
listen_thread = APRSDListenProcessThread(
packet_queue=threads.packet_queue,
packet_filter=packet_filter,
@@ -306,14 +301,14 @@ def listen(
enabled_plugins=enable_plugin,
log_packets=log_packets,
)
- LOG.debug("Start APRSDListenProcessThread")
+ LOG.debug('Start APRSDListenProcessThread')
listen_thread.start()
if enable_packet_stats:
listen_stats = ListenStatsThread()
listen_stats.start()
keepalive_thread.start()
- LOG.debug("keepalive Join")
+ LOG.debug('keepalive Join')
keepalive_thread.join()
rx_thread.join()
listen_thread.join()
diff --git a/aprsd/packets/filters/dupe_filter.py b/aprsd/packets/filters/dupe_filter.py
index e0fa363..0839fcf 100644
--- a/aprsd/packets/filters/dupe_filter.py
+++ b/aprsd/packets/filters/dupe_filter.py
@@ -1,28 +1,27 @@
import logging
-from typing import Union
+from typing import Union
from oslo_config import cfg
-from aprsd.packets import core
from aprsd import packets
-from aprsd.utils import trace
-
+from aprsd.packets import core
CONF = cfg.CONF
-LOG = logging.getLogger("APRSD")
+LOG = logging.getLogger('APRSD')
class DupePacketFilter:
"""This is a packet filter to detect duplicate packets.
This Uses the PacketList object to see if a packet exists
- already. If it does exist in the PacketList, then we need to
+ already. If it does exist in the PacketList, then we need to
check the flag on the packet to see if it's been processed before.
- If the packet has been processed already within the allowed
+ If the packet has been processed already within the allowed
timeframe, then it's a dupe.
"""
+
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
- #LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
+ # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
"""Filter a packet out if it's already been seen and processed."""
if isinstance(packet, core.AckPacket):
# We don't need to drop AckPackets, those should be
@@ -51,7 +50,7 @@ class DupePacketFilter:
if not found:
# We haven't seen this packet before, so we process it.
return packet
-
+
if not packet.processed:
# We haven't processed this packet through the plugins.
return packet
@@ -59,11 +58,11 @@ class DupePacketFilter:
# If the packet came in within N seconds of the
# Last time seeing the packet, then we drop it as a dupe.
LOG.warning(
- f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
+ f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
)
else:
LOG.warning(
- f"Packet {packet.from_call}:{packet.msgNo} already tracked "
- f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
+ f'Packet {packet.from_call}:{packet.msgNo} already tracked '
+ f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
)
return packet
diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py
index 1088daf..b995f8c 100644
--- a/aprsd/threads/rx.py
+++ b/aprsd/threads/rx.py
@@ -8,12 +8,9 @@ from oslo_config import cfg
from aprsd import packets, plugin
from aprsd.client import client_factory
-from aprsd.packets import collector
+from aprsd.packets import collector, filter
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
-from aprsd.utils import trace
-from aprsd.packets import filter
-from aprsd.packets.filters import dupe_filter
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
@@ -24,13 +21,14 @@ class APRSDRXThread(APRSDThread):
A packet is received in the main loop and then sent to the
process_packet method, which sends the packet through the collector
- to track the packet for stats, and then put into the packet queue
- for processing in a separate thread.
+ to track the packet for stats, and then put into the packet queue
+ for processing in a separate thread.
"""
+
_client = None
# This is the queue that packets are sent to for processing.
- # We process packets in a separate thread to help prevent
+ # We process packets in a separate thread to help prevent
# getting blocked by the APRS server trying to send us packets.
packet_queue = None
@@ -139,7 +137,6 @@ class APRSDRXThread(APRSDThread):
class APRSDFilterThread(APRSDThread):
-
def __init__(self, thread_name, packet_queue):
super().__init__(thread_name)
self.packet_queue = packet_queue
@@ -149,13 +146,13 @@ class APRSDFilterThread(APRSDThread):
if not filter.PacketFilter().filter(packet):
return None
return packet
-
+
def print_packet(self, packet):
"""Allow a child of this class to override this.
This is helpful if for whatever reason the child class
doesn't want to log packets.
-
+
"""
packet_log.log(packet)
@@ -174,7 +171,7 @@ class APRSDFilterThread(APRSDThread):
class APRSDProcessPacketThread(APRSDFilterThread):
"""Base class for processing received packets after they have been filtered.
- Packets are received from the client, then filtered for dupes,
+ Packets are received from the client, then filtered for dupes,
then sent to the packet queue. This thread pulls packets from
the packet queue for processing.
@@ -184,7 +181,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
for processing."""
def __init__(self, packet_queue):
- super().__init__("ProcessPKT", packet_queue=packet_queue)
+ super().__init__('ProcessPKT', packet_queue=packet_queue)
if not CONF.enable_sending_ack_packets:
LOG.warning(
'Sending ack packets is disabled, messages will not be acknowledged.',
@@ -210,7 +207,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
def process_packet(self, packet):
"""Process a packet received from aprs-is server."""
- LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
+ LOG.debug(f'ProcessPKT-LOOP {self.loop_count}')
# set this now as we are going to process it.
# This is used during dupe checking, so set it early