1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-07-30 12:22:27 -04:00

Compare commits

..

No commits in common. "4fd64a3c250594cdba4d9957c04616f84788b4a2" and "101904ca77d816ae9e70bc7d22e6d8516fc3c5ce" have entirely different histories.

14 changed files with 188 additions and 405 deletions

6
.gitignore vendored
View File

@ -60,9 +60,3 @@ AUTHORS
Makefile.venv Makefile.venv
# Copilot # Copilot
.DS_Store .DS_Store
.python-version
.fleet
.vscode
.envrc
.doit.db

View File

@ -18,13 +18,12 @@ class KISS3Client:
# date for last time we heard from the server # date for last time we heard from the server
aprsd_keepalive = datetime.datetime.now() aprsd_keepalive = datetime.datetime.now()
_connected = False
def __init__(self): def __init__(self):
self.setup() self.setup()
def is_alive(self): def is_alive(self):
return self._connected return True
def setup(self): def setup(self):
# we can be TCP kiss or Serial kiss # we can be TCP kiss or Serial kiss
@ -57,33 +56,17 @@ class KISS3Client:
self.path = CONF.kiss_tcp.path self.path = CONF.kiss_tcp.path
LOG.debug('Starting KISS interface connection') LOG.debug('Starting KISS interface connection')
try:
self.kiss.start() self.kiss.start()
if self.kiss.protocol.transport.is_closing():
LOG.warning('KISS transport is closing, not setting consumer callback')
self._connected = False
else:
self._connected = True
except Exception:
LOG.error('Failed to start KISS interface.')
self._connected = False
@trace.trace @trace.trace
def stop(self): def stop(self):
if not self._connected:
# do nothing since we aren't connected
return
try: try:
self.kiss.stop() self.kiss.stop()
self.kiss.loop.call_soon_threadsafe( self.kiss.loop.call_soon_threadsafe(
self.kiss.protocol.transport.close, self.kiss.protocol.transport.close,
) )
except Exception: except Exception as ex:
LOG.error('Failed to stop KISS interface.') LOG.exception(ex)
def close(self):
self.stop()
def set_filter(self, filter): def set_filter(self, filter):
# This does nothing right now. # This does nothing right now.
@ -103,14 +86,8 @@ class KISS3Client:
LOG.exception(ex) LOG.exception(ex)
def consumer(self, callback): def consumer(self, callback):
if not self._connected:
raise Exception('KISS transport is not connected')
self._parse_callback = callback self._parse_callback = callback
if not self.kiss.protocol.transport.is_closing(): self.kiss.read(callback=self.parse_frame, min_frames=None)
self.kiss.read(callback=self.parse_frame, min_frames=1)
else:
self._connected = False
def send(self, packet): def send(self, packet):
"""Send an APRS Message object.""" """Send an APRS Message object."""

View File

@ -140,4 +140,3 @@ class KISSClient(base.APRSClient):
except Exception as ex: except Exception as ex:
LOG.error(f'Consumer failed {ex}') LOG.error(f'Consumer failed {ex}')
LOG.error(ex) LOG.error(ex)
raise ex

View File

@ -20,10 +20,8 @@ from aprsd import cli_helper, packets, plugin, threads, utils
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.main import cli from aprsd.main import cli
from aprsd.packets import collector as packet_collector from aprsd.packets import collector as packet_collector
from aprsd.packets import core, seen_list
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.packets.filter import PacketFilter from aprsd.packets import seen_list
from aprsd.packets.filters import dupe_filter, packet_type
from aprsd.stats import collector from aprsd.stats import collector
from aprsd.threads import keepalive, rx from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread from aprsd.threads import stats as stats_thread
@ -31,7 +29,7 @@ from aprsd.threads.aprsd import APRSDThread
# setup the global logger # setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10 # log.basicConfig(level=log.DEBUG) # level=10
LOG = logging.getLogger('APRSD') LOG = logging.getLogger("APRSD")
CONF = cfg.CONF CONF = cfg.CONF
LOGU = logger LOGU = logger
console = Console() console = Console()
@ -39,9 +37,9 @@ console = Console()
def signal_handler(sig, frame): def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all() threads.APRSDThreadList().stop_all()
if 'subprocess' not in str(frame): if "subprocess" not in str(frame):
LOG.info( LOG.info(
'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format( "Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
datetime.datetime.now(), datetime.datetime.now(),
), ),
) )
@ -50,66 +48,90 @@ def signal_handler(sig, frame):
collector.Collector().collect() collector.Collector().collect()
class APRSDListenProcessThread(rx.APRSDFilterThread): class APRSDListenThread(rx.APRSDRXThread):
def __init__( def __init__(
self, self,
packet_queue, packet_queue,
packet_filter=None, packet_filter=None,
plugin_manager=None, plugin_manager=None,
enabled_plugins=None, enabled_plugins=[],
log_packets=False, log_packets=False,
): ):
super().__init__('ListenProcThread', packet_queue) super().__init__(packet_queue)
self.packet_filter = packet_filter self.packet_filter = packet_filter
self.plugin_manager = plugin_manager self.plugin_manager = plugin_manager
if self.plugin_manager: if self.plugin_manager:
LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}') LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}")
self.log_packets = log_packets self.log_packets = log_packets
def print_packet(self, packet): def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
filters = {
packets.Packet.__name__: packets.Packet,
packets.AckPacket.__name__: packets.AckPacket,
packets.BeaconPacket.__name__: packets.BeaconPacket,
packets.GPSPacket.__name__: packets.GPSPacket,
packets.MessagePacket.__name__: packets.MessagePacket,
packets.MicEPacket.__name__: packets.MicEPacket,
packets.ObjectPacket.__name__: packets.ObjectPacket,
packets.StatusPacket.__name__: packets.StatusPacket,
packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
packets.WeatherPacket.__name__: packets.WeatherPacket,
packets.UnknownPacket.__name__: packets.UnknownPacket,
}
if self.packet_filter:
filter_class = filters[self.packet_filter]
if isinstance(packet, filter_class):
if self.log_packets:
packet_log.log(packet)
if self.plugin_manager:
# Don't do anything with the reply
# This is the listen only command.
self.plugin_manager.run(packet)
else:
if self.log_packets: if self.log_packets:
packet_log.log(packet) packet_log.log(packet)
def process_packet(self, packet: type[core.Packet]):
if self.plugin_manager: if self.plugin_manager:
# Don't do anything with the reply. # Don't do anything with the reply.
# This is the listen only command. # This is the listen only command.
self.plugin_manager.run(packet) self.plugin_manager.run(packet)
packet_collector.PacketCollector().rx(packet)
class ListenStatsThread(APRSDThread): class ListenStatsThread(APRSDThread):
"""Log the stats from the PacketList.""" """Log the stats from the PacketList."""
def __init__(self): def __init__(self):
super().__init__('PacketStatsLog') super().__init__("PacketStatsLog")
self._last_total_rx = 0 self._last_total_rx = 0
self.period = 31
def loop(self): def loop(self):
if self.loop_count % self.period == 0: if self.loop_count % 10 == 0:
# log the stats every 10 seconds # log the stats every 10 seconds
stats_json = collector.Collector().collect() stats_json = collector.Collector().collect()
stats = stats_json['PacketList'] stats = stats_json["PacketList"]
total_rx = stats['rx'] total_rx = stats["rx"]
packet_count = len(stats['packets']) packet_count = len(stats["packets"])
rx_delta = total_rx - self._last_total_rx rx_delta = total_rx - self._last_total_rx
rate = rx_delta / self.period rate = rx_delta / 10
# Log summary stats # Log summary stats
LOGU.opt(colors=True).info( LOGU.opt(colors=True).info(
f'<green>RX Rate: {rate:.2f} pps</green> ' f"<green>RX Rate: {rate} pps</green> "
f'<yellow>Total RX: {total_rx}</yellow> ' f"<yellow>Total RX: {total_rx}</yellow> "
f'<red>RX Last {self.period} secs: {rx_delta}</red> ' f"<red>RX Last 10 secs: {rx_delta}</red> "
f'<white>Packets in PacketListStats: {packet_count}</white>', f"<white>Packets in PacketList: {packet_count}</white>",
) )
self._last_total_rx = total_rx self._last_total_rx = total_rx
# Log individual type stats # Log individual type stats
for k, v in stats['types'].items(): for k, v in stats["types"].items():
thread_hex = f'fg {utils.hex_from_name(k)}' thread_hex = f"fg {utils.hex_from_name(k)}"
LOGU.opt(colors=True).info( LOGU.opt(colors=True).info(
f'<{thread_hex}>{k:<15}</{thread_hex}> ' f"<{thread_hex}>{k:<15}</{thread_hex}> "
f'<blue>RX: {v["rx"]}</blue> <red>TX: {v["tx"]}</red>', f"<blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>",
) )
time.sleep(1) time.sleep(1)
@ -119,19 +141,19 @@ class ListenStatsThread(APRSDThread):
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
'--aprs-login', "--aprs-login",
envvar='APRS_LOGIN', envvar="APRS_LOGIN",
show_envvar=True, show_envvar=True,
help='What callsign to send the message from.', help="What callsign to send the message from.",
) )
@click.option( @click.option(
'--aprs-password', "--aprs-password",
envvar='APRS_PASSWORD', envvar="APRS_PASSWORD",
show_envvar=True, show_envvar=True,
help='the APRS-IS password for APRS_LOGIN', help="the APRS-IS password for APRS_LOGIN",
) )
@click.option( @click.option(
'--packet-filter', "--packet-filter",
type=click.Choice( type=click.Choice(
[ [
packets.AckPacket.__name__, packets.AckPacket.__name__,
@ -148,37 +170,35 @@ class ListenStatsThread(APRSDThread):
], ],
case_sensitive=False, case_sensitive=False,
), ),
multiple=True, help="Filter by packet type",
default=[],
help='Filter by packet type',
) )
@click.option( @click.option(
'--enable-plugin', "--enable-plugin",
multiple=True, multiple=True,
help='Enable a plugin. This is the name of the file in the plugins directory.', help="Enable a plugin. This is the name of the file in the plugins directory.",
) )
@click.option( @click.option(
'--load-plugins', "--load-plugins",
default=False, default=False,
is_flag=True, is_flag=True,
help='Load plugins as enabled in aprsd.conf ?', help="Load plugins as enabled in aprsd.conf ?",
) )
@click.argument( @click.argument(
'filter', "filter",
nargs=-1, nargs=-1,
required=True, required=True,
) )
@click.option( @click.option(
'--log-packets', "--log-packets",
default=False, default=False,
is_flag=True, is_flag=True,
help='Log incoming packets.', help="Log incoming packets.",
) )
@click.option( @click.option(
'--enable-packet-stats', "--enable-packet-stats",
default=False, default=False,
is_flag=True, is_flag=True,
help='Enable packet stats periodic logging.', help="Enable packet stats periodic logging.",
) )
@click.pass_context @click.pass_context
@cli_helper.process_standard_options @cli_helper.process_standard_options
@ -208,46 +228,46 @@ def listen(
if not aprs_login: if not aprs_login:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo('') click.echo("")
ctx.fail('Must set --aprs-login or APRS_LOGIN') ctx.fail("Must set --aprs-login or APRS_LOGIN")
ctx.exit() ctx.exit()
if not aprs_password: if not aprs_password:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo('') click.echo("")
ctx.fail('Must set --aprs-password or APRS_PASSWORD') ctx.fail("Must set --aprs-password or APRS_PASSWORD")
ctx.exit() ctx.exit()
# CONF.aprs_network.login = aprs_login # CONF.aprs_network.login = aprs_login
# config["aprs"]["password"] = aprs_password # config["aprs"]["password"] = aprs_password
LOG.info(f'APRSD Listen Started version: {aprsd.__version__}') LOG.info(f"APRSD Listen Started version: {aprsd.__version__}")
CONF.log_opt_values(LOG, logging.DEBUG) CONF.log_opt_values(LOG, logging.DEBUG)
collector.Collector() collector.Collector()
# Try and load saved MsgTrack list # Try and load saved MsgTrack list
LOG.debug('Loading saved MsgTrack object.') LOG.debug("Loading saved MsgTrack object.")
# Initialize the client factory and create # Initialize the client factory and create
# The correct client object ready for use # The correct client object ready for use
# Make sure we have 1 client transport enabled # Make sure we have 1 client transport enabled
if not client_factory.is_client_enabled(): if not client_factory.is_client_enabled():
LOG.error('No Clients are enabled in config.') LOG.error("No Clients are enabled in config.")
sys.exit(-1) sys.exit(-1)
# Creates the client object # Creates the client object
LOG.info('Creating client connection') LOG.info("Creating client connection")
aprs_client = client_factory.create() aprs_client = client_factory.create()
LOG.info(aprs_client) LOG.info(aprs_client)
if not aprs_client.login_success: if not aprs_client.login_success:
# We failed to login, will just quit! # We failed to login, will just quit!
msg = f'Login Failure: {aprs_client.login_failure}' msg = f"Login Failure: {aprs_client.login_failure}"
LOG.error(msg) LOG.error(msg)
print(msg) print(msg)
sys.exit(-1) sys.exit(-1)
LOG.debug(f"Filter messages on aprsis server by '{filter}'") LOG.debug(f"Filter by '{filter}'")
aprs_client.set_filter(filter) aprs_client.set_filter(filter)
keepalive_thread = keepalive.KeepAliveThread() keepalive_thread = keepalive.KeepAliveThread()
@ -256,19 +276,10 @@ def listen(
# just deregister the class from the packet collector # just deregister the class from the packet collector
packet_collector.PacketCollector().unregister(seen_list.SeenList) packet_collector.PacketCollector().unregister(seen_list.SeenList)
# we don't want the dupe filter to run here.
PacketFilter().unregister(dupe_filter.DupePacketFilter)
if packet_filter:
LOG.info('Enabling packet filtering for {packet_filter}')
packet_type.PacketTypeFilter().set_allow_list(packet_filter)
PacketFilter().register(packet_type.PacketTypeFilter)
else:
LOG.info('No packet filtering enabled.')
pm = None pm = None
if load_plugins: if load_plugins:
pm = plugin.PluginManager() pm = plugin.PluginManager()
LOG.info('Loading plugins') LOG.info("Loading plugins")
pm.setup_plugins(load_help_plugin=False) pm.setup_plugins(load_help_plugin=False)
elif enable_plugin: elif enable_plugin:
pm = plugin.PluginManager() pm = plugin.PluginManager()
@ -279,37 +290,33 @@ def listen(
else: else:
LOG.warning( LOG.warning(
"Not Loading any plugins use --load-plugins to load what's " "Not Loading any plugins use --load-plugins to load what's "
'defined in the config file.', "defined in the config file.",
) )
if pm: if pm:
for p in pm.get_plugins(): for p in pm.get_plugins():
LOG.info('Loaded plugin %s', p.__class__.__name__) LOG.info("Loaded plugin %s", p.__class__.__name__)
stats = stats_thread.APRSDStatsStoreThread() stats = stats_thread.APRSDStatsStoreThread()
stats.start() stats.start()
LOG.debug('Start APRSDRxThread') LOG.debug("Create APRSDListenThread")
rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue) listen_thread = APRSDListenThread(
rx_thread.start()
LOG.debug('Create APRSDListenProcessThread')
listen_thread = APRSDListenProcessThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
packet_filter=packet_filter, packet_filter=packet_filter,
plugin_manager=pm, plugin_manager=pm,
enabled_plugins=enable_plugin, enabled_plugins=enable_plugin,
log_packets=log_packets, log_packets=log_packets,
) )
LOG.debug('Start APRSDListenProcessThread') LOG.debug("Start APRSDListenThread")
listen_thread.start() listen_thread.start()
if enable_packet_stats: if enable_packet_stats:
listen_stats = ListenStatsThread() listen_stats = ListenStatsThread()
listen_stats.start() listen_stats.start()
keepalive_thread.start() keepalive_thread.start()
LOG.debug('keepalive Join') LOG.debug("keepalive Join")
keepalive_thread.join() keepalive_thread.join()
rx_thread.join() LOG.debug("listen_thread Join")
listen_thread.join() listen_thread.join()
stats.join() stats.join()

View File

@ -147,7 +147,7 @@ def server(ctx, flush):
server_threads.register(keepalive.KeepAliveThread()) server_threads.register(keepalive.KeepAliveThread())
server_threads.register(stats_thread.APRSDStatsStoreThread()) server_threads.register(stats_thread.APRSDStatsStoreThread())
server_threads.register( server_threads.register(
rx.APRSDRXThread( rx.APRSDPluginRXThread(
packet_queue=threads.packet_queue, packet_queue=threads.packet_queue,
), ),
) )

View File

@ -15,8 +15,6 @@ from aprsd.packets.core import ( # noqa: F401
WeatherPacket, WeatherPacket,
factory, factory,
) )
from aprsd.packets.filter import PacketFilter
from aprsd.packets.filters.dupe_filter import DupePacketFilter
from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401 from aprsd.packets.seen_list import SeenList # noqa: F401
from aprsd.packets.tracker import PacketTrack # noqa: F401 from aprsd.packets.tracker import PacketTrack # noqa: F401
@ -28,9 +26,5 @@ collector.PacketCollector().register(SeenList)
collector.PacketCollector().register(PacketTrack) collector.PacketCollector().register(PacketTrack)
collector.PacketCollector().register(WatchList) collector.PacketCollector().register(WatchList)
# Register all the packet filters for normal processing
# For specific commands you can deregister these if you don't want them.
PacketFilter().register(DupePacketFilter)
NULL_MESSAGE = -1 NULL_MESSAGE = -1

View File

@ -106,8 +106,6 @@ class Packet:
last_send_time: float = field(repr=False, default=0, compare=False, hash=False) last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
# Was the packet acked? # Was the packet acked?
acked: bool = field(repr=False, default=False, compare=False, hash=False) acked: bool = field(repr=False, default=False, compare=False, hash=False)
# Was the packet previously processed (for dupe checking)
processed: bool = field(repr=False, default=False, compare=False, hash=False)
# Do we allow this packet to be saved to send later? # Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False) allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
@ -188,11 +186,12 @@ class Packet:
def __repr__(self) -> str: def __repr__(self) -> str:
"""Build the repr version of the packet.""" """Build the repr version of the packet."""
return ( repr = (
f"{self.__class__.__name__}:" f"{self.__class__.__name__}:"
f" From: {self.from_call} " f" From: {self.from_call} "
f" To: {self.to_call}" f" To: {self.to_call}"
) )
return repr
@dataclass_json @dataclass_json
@ -695,8 +694,6 @@ class UnknownPacket:
path: List[str] = field(default_factory=list, compare=False, hash=False) path: List[str] = field(default_factory=list, compare=False, hash=False)
packet_type: Optional[str] = field(default=None) packet_type: Optional[str] = field(default=None)
via: Optional[str] = field(default=None, compare=False, hash=False) via: Optional[str] = field(default=None, compare=False, hash=False)
# Was the packet previously processed (for dupe checking)
processed: bool = field(repr=False, default=False, compare=False, hash=False)
@property @property
def key(self) -> str: def key(self) -> str:

View File

@ -1,58 +0,0 @@
import logging
from typing import Callable, Protocol, runtime_checkable, Union, Dict
from aprsd.packets import core
from aprsd.utils import singleton
LOG = logging.getLogger("APRSD")
@runtime_checkable
class PacketFilterProtocol(Protocol):
    """Structural interface that every packet filter class must satisfy.

    Registered filter classes are instantiated by ``PacketFilter`` and their
    ``filter`` method is invoked for each inbound packet.
    """

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Inspect one packet received from the network.

        Returns:
            The packet itself when it passes the filter, or ``None`` when
            the packet should be dropped from further processing.
        """
        ...
@singleton
class PacketFilter:
    """Singleton registry of packet filters applied to inbound packets.

    Filter classes are registered (not instances); a single instance of each
    is created on registration and kept in ``self.filters`` keyed by the
    class itself.  Filters run in registration (insertion) order.
    """

    def __init__(self):
        # Maps filter class -> filter instance, in registration order.
        self.filters: Dict[Callable, Callable] = {}

    def register(self, packet_filter: Callable) -> None:
        """Register a filter class.

        Raises:
            TypeError: if the class does not implement PacketFilterProtocol.
        """
        if not isinstance(packet_filter, PacketFilterProtocol):
            raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
        if packet_filter not in self.filters:
            self.filters[packet_filter] = packet_filter()

    def unregister(self, packet_filter: Callable) -> None:
        """Remove a previously registered filter class (no-op if absent).

        Raises:
            TypeError: if the class does not implement PacketFilterProtocol.
        """
        if not isinstance(packet_filter, PacketFilterProtocol):
            raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
        if packet_filter in self.filters:
            del self.filters[packet_filter]

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Run the packet through each registered filter.

        This will step through each registered filter class and call
        filter on it.

        If a filter object returns None, filtering stops and the packet is
        dropped.  If the filter object returns the packet, we continue
        filtering.  A filter that raises is logged and skipped, so one
        broken filter cannot drop packets on its own.
        """
        for packet_filter in self.filters:
            try:
                if not self.filters[packet_filter].filter(packet):
                    LOG.debug(
                        f"{self.filters[packet_filter].__class__.__name__} dropped "
                        f"{packet.__class__.__name__}:{packet.human_info}"
                    )
                    return None
            except Exception as ex:
                # BUG FIX: original read `packet_filter.__clas__.__name__`,
                # which itself raised AttributeError inside this handler.
                # The dict key is the filter class, so its name is __name__.
                LOG.error(
                    f"{packet_filter.__name__} failed filtering packet "
                    f"{packet.__class__.__name__} : {ex}"
                )
        return packet

View File

@ -1,68 +0,0 @@
import logging
from typing import Union
from oslo_config import cfg
from aprsd import packets
from aprsd.packets import core
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
class DupePacketFilter:
    """This is a packet filter to detect duplicate packets.

    This Uses the PacketList object to see if a packet exists
    already. If it does exist in the PacketList, then we need to
    check the flag on the packet to see if it's been processed before.
    If the packet has been processed already within the allowed
    timeframe, then it's a dupe.
    """

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
        """Filter a packet out if it's already been seen and processed.

        Returns the packet when it should continue through the pipeline,
        or None (implicitly) when it is a recent, already-processed dupe.
        """
        if isinstance(packet, core.AckPacket):
            # We don't need to drop AckPackets, those should be
            # processed.
            # Send the AckPacket to the queue for processing elsewhere.
            return packet
        else:
            # Make sure we aren't re-processing the same packet
            # For RF based APRS Clients we can get duplicate packets
            # So we need to track them and not process the dupes.
            pkt_list = packets.PacketList()
            found = False
            try:
                # Find the packet in the list of already seen packets
                # Based on the packet.key
                found = pkt_list.find(packet)
                if not packet.msgNo:
                    # If the packet doesn't have a message id
                    # then there is no reliable way to detect
                    # if it's a dupe, so we just pass it on.
                    # it shouldn't get acked either.
                    found = False
            except KeyError:
                # Packet key not present in the list -> never seen before.
                found = False
            if not found:
                # We haven't seen this packet before, so we process it.
                return packet
            if not packet.processed:
                # We haven't processed this packet through the plugins.
                return packet
            elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
                # If the packet came in within N seconds of the
                # Last time seeing the packet, then we drop it as a dupe.
                # NOTE(review): timestamps presumably in seconds — confirm
                # against Packet.timestamp's definition.
                LOG.warning(
                    f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
                )
            else:
                # Seen before, but outside the dupe window: process again.
                LOG.warning(
                    f'Packet {packet.from_call}:{packet.msgNo} already tracked '
                    f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
                )
                return packet

View File

@ -1,53 +0,0 @@
import logging
from typing import Union
from oslo_config import cfg
from aprsd import packets
from aprsd.packets import core
from aprsd.utils import singleton
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
@singleton
class PacketTypeFilter:
    """This filter is used to filter out packets that don't match a specific type.

    To use this, register it with the PacketFilter class,
    then instante it and call set_allow_list() with a list of packet types
    you want to allow to pass the filtering. All other packets will be
    filtered out.
    """

    # Lookup of packet-type name -> packet class, used to translate the
    # string names given to set_allow_list() into classes for isinstance().
    filters = {
        cls.__name__: cls
        for cls in (
            packets.Packet,
            packets.AckPacket,
            packets.BeaconPacket,
            packets.GPSPacket,
            packets.MessagePacket,
            packets.MicEPacket,
            packets.ObjectPacket,
            packets.StatusPacket,
            packets.ThirdPartyPacket,
            packets.WeatherPacket,
            packets.UnknownPacket,
        )
    }

    # Tuple of packet classes allowed through; empty means nothing passes.
    allow_list = ()

    def set_allow_list(self, filter_list):
        """Translate packet-type names into the tuple of allowed classes."""
        allowed = []
        for name in filter_list:
            LOG.warning(
                f'Setting filter {name} : {self.filters[name]} to tmp {allowed}'
            )
            allowed.append(self.filters[name])
        self.allow_list = tuple(allowed)

    def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
        """Only allow packets of certain types to filter through."""
        if not self.allow_list:
            return None
        if isinstance(packet, self.allow_list):
            return packet
        return None

View File

@ -4,8 +4,9 @@ import queue
# aprsd.threads # aprsd.threads
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401 from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
from .rx import ( # noqa: F401 from .rx import ( # noqa: F401
APRSDDupeRXThread,
APRSDProcessPacketThread, APRSDProcessPacketThread,
APRSDRXThread, APRSDRXThread,
) )
packet_queue = queue.Queue(maxsize=500) packet_queue = queue.Queue(maxsize=20)

View File

@ -8,32 +8,20 @@ from oslo_config import cfg
from aprsd import packets, plugin from aprsd import packets, plugin
from aprsd.client import client_factory from aprsd.client import client_factory
from aprsd.packets import collector, filter from aprsd.packets import collector
from aprsd.packets import log as packet_log from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx from aprsd.threads import APRSDThread, tx
from aprsd.utils import trace
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger('APRSD') LOG = logging.getLogger("APRSD")
class APRSDRXThread(APRSDThread): class APRSDRXThread(APRSDThread):
"""Main Class to connect to an APRS Client and recieve packets.
A packet is received in the main loop and then sent to the
process_packet method, which sends the packet through the collector
to track the packet for stats, and then put into the packet queue
for processing in a separate thread.
"""
_client = None _client = None
# This is the queue that packets are sent to for processing.
# We process packets in a separate thread to help prevent
# getting blocked by the APRS server trying to send us packets.
packet_queue = None
def __init__(self, packet_queue): def __init__(self, packet_queue):
super().__init__('RX_PKT') super().__init__("RX_PKT")
self.packet_queue = packet_queue self.packet_queue = packet_queue
def stop(self): def stop(self):
@ -64,7 +52,7 @@ class APRSDRXThread(APRSDThread):
# kwargs. :( # kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56 # https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.consumer( self._client.consumer(
self.process_packet, self._process_packet,
raw=False, raw=False,
blocking=False, blocking=False,
) )
@ -72,7 +60,7 @@ class APRSDRXThread(APRSDThread):
aprslib.exceptions.ConnectionDrop, aprslib.exceptions.ConnectionDrop,
aprslib.exceptions.ConnectionError, aprslib.exceptions.ConnectionError,
): ):
LOG.error('Connection dropped, reconnecting') LOG.error("Connection dropped, reconnecting")
# Force the deletion of the client object connected to aprs # Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client() # This will cause a reconnect, next time client.get_client()
# is called # is called
@ -80,18 +68,45 @@ class APRSDRXThread(APRSDThread):
time.sleep(5) time.sleep(5)
except Exception: except Exception:
# LOG.exception(ex) # LOG.exception(ex)
LOG.error('Resetting connection and trying again.') LOG.error("Resetting connection and trying again.")
self._client.reset() self._client.reset()
time.sleep(5) time.sleep(5)
# Continue to loop
time.sleep(1)
return True return True
def _process_packet(self, *args, **kwargs):
"""Intermediate callback so we can update the keepalive time."""
# Now call the 'real' packet processing for a RX'x packet
self.process_packet(*args, **kwargs)
@abc.abstractmethod
def process_packet(self, *args, **kwargs): def process_packet(self, *args, **kwargs):
pass
class APRSDDupeRXThread(APRSDRXThread):
"""Process received packets.
This is the main APRSD Server command thread that
receives packets and makes sure the packet
hasn't been seen previously before sending it on
to be processed.
"""
@trace.trace
def process_packet(self, *args, **kwargs):
"""This handles the processing of an inbound packet.
When a packet is received by the connected client object,
it sends the raw packet into this function. This function then
decodes the packet via the client, and then processes the packet.
Ack Packets are sent to the PluginProcessPacketThread for processing.
All other packets have to be checked as a dupe, and then only after
we haven't seen this packet before, do we send it to the
PluginProcessPacketThread for processing.
"""
packet = self._client.decode_packet(*args, **kwargs) packet = self._client.decode_packet(*args, **kwargs)
if not packet:
LOG.error(
'No packet received from decode_packet. Most likely a failure to parse'
)
return
packet_log.log(packet) packet_log.log(packet)
pkt_list = packets.PacketList() pkt_list = packets.PacketList()
@ -125,55 +140,26 @@ class APRSDRXThread(APRSDThread):
# If the packet came in within N seconds of the # If the packet came in within N seconds of the
# Last time seeing the packet, then we drop it as a dupe. # Last time seeing the packet, then we drop it as a dupe.
LOG.warning( LOG.warning(
f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.' f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
) )
else: else:
LOG.warning( LOG.warning(
f'Packet {packet.from_call}:{packet.msgNo} already tracked ' f"Packet {packet.from_call}:{packet.msgNo} already tracked "
f'but older than {CONF.packet_dupe_timeout} seconds. processing.', f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
) )
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
self.packet_queue.put(packet) self.packet_queue.put(packet)
class APRSDFilterThread(APRSDThread): class APRSDPluginRXThread(APRSDDupeRXThread):
def __init__(self, thread_name, packet_queue): """ "Process received packets.
super().__init__(thread_name)
self.packet_queue = packet_queue
def filter_packet(self, packet):
# Do any packet filtering prior to processing
if not filter.PacketFilter().filter(packet):
return None
return packet
def print_packet(self, packet):
"""Allow a child of this class to override this.
This is helpful if for whatever reason the child class
doesn't want to log packets.
For backwards compatibility, we keep the APRSDPluginRXThread.
""" """
packet_log.log(packet)
def loop(self):
try:
packet = self.packet_queue.get(timeout=1)
self.print_packet(packet)
if packet:
if self.filter_packet(packet):
self.process_packet(packet)
except queue.Empty:
pass
return True
class APRSDProcessPacketThread(APRSDFilterThread): class APRSDProcessPacketThread(APRSDThread):
"""Base class for processing received packets after they have been filtered. """Base class for processing received packets.
Packets are received from the client, then filtered for dupes,
then sent to the packet queue. This thread pulls packets from
the packet queue for processing.
This is the base class for processing packets coming from This is the base class for processing packets coming from
the consumer. This base class handles sending ack packets and the consumer. This base class handles sending ack packets and
@ -181,38 +167,44 @@ class APRSDProcessPacketThread(APRSDFilterThread):
for processing.""" for processing."""
def __init__(self, packet_queue): def __init__(self, packet_queue):
super().__init__('ProcessPKT', packet_queue=packet_queue) self.packet_queue = packet_queue
super().__init__("ProcessPKT")
if not CONF.enable_sending_ack_packets: if not CONF.enable_sending_ack_packets:
LOG.warning( LOG.warning(
'Sending ack packets is disabled, messages will not be acknowledged.', "Sending ack packets is disabled, messages "
"will not be acknowledged.",
) )
def process_ack_packet(self, packet): def process_ack_packet(self, packet):
"""We got an ack for a message, no need to resend it.""" """We got an ack for a message, no need to resend it."""
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f'Got ack for message {ack_num}') LOG.debug(f"Got ack for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
def process_piggyback_ack(self, packet): def process_piggyback_ack(self, packet):
"""We got an ack embedded in a packet.""" """We got an ack embedded in a packet."""
ack_num = packet.ackMsgNo ack_num = packet.ackMsgNo
LOG.debug(f'Got PiggyBackAck for message {ack_num}') LOG.debug(f"Got PiggyBackAck for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
def process_reject_packet(self, packet): def process_reject_packet(self, packet):
"""We got a reject message for a packet. Stop sending the message.""" """We got a reject message for a packet. Stop sending the message."""
ack_num = packet.msgNo ack_num = packet.msgNo
LOG.debug(f'Got REJECT for message {ack_num}') LOG.debug(f"Got REJECT for message {ack_num}")
collector.PacketCollector().rx(packet) collector.PacketCollector().rx(packet)
def loop(self):
try:
packet = self.packet_queue.get(timeout=1)
if packet:
self.process_packet(packet)
except queue.Empty:
pass
return True
def process_packet(self, packet): def process_packet(self, packet):
"""Process a packet received from aprs-is server.""" """Process a packet received from aprs-is server."""
LOG.debug(f'ProcessPKT-LOOP {self.loop_count}') LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
# set this now as we are going to process it.
# This is used during dupe checking, so set it early
packet.processed = True
our_call = CONF.callsign.lower() our_call = CONF.callsign.lower()
from_call = packet.from_call from_call = packet.from_call
@ -235,7 +227,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
): ):
self.process_reject_packet(packet) self.process_reject_packet(packet)
else: else:
if hasattr(packet, 'ackMsgNo') and packet.ackMsgNo: if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
# we got an ack embedded in this packet # we got an ack embedded in this packet
# we need to handle the ack # we need to handle the ack
self.process_piggyback_ack(packet) self.process_piggyback_ack(packet)
@ -275,7 +267,7 @@ class APRSDProcessPacketThread(APRSDFilterThread):
if not for_us: if not for_us:
LOG.info("Got a packet meant for someone else '{packet.to_call}'") LOG.info("Got a packet meant for someone else '{packet.to_call}'")
else: else:
LOG.info('Got a non AckPacket/MessagePacket') LOG.info("Got a non AckPacket/MessagePacket")
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread): class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
@ -295,7 +287,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
tx.send(subreply) tx.send(subreply)
else: else:
wl = CONF.watch_list wl = CONF.watch_list
to_call = wl['alert_callsign'] to_call = wl["alert_callsign"]
tx.send( tx.send(
packets.MessagePacket( packets.MessagePacket(
from_call=CONF.callsign, from_call=CONF.callsign,
@ -307,7 +299,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
# We have a message based object. # We have a message based object.
tx.send(reply) tx.send(reply)
except Exception as ex: except Exception as ex:
LOG.error('Plugin failed!!!') LOG.error("Plugin failed!!!")
LOG.exception(ex) LOG.exception(ex)
def process_our_message_packet(self, packet): def process_our_message_packet(self, packet):
@ -363,11 +355,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
if to_call == CONF.callsign and not replied: if to_call == CONF.callsign and not replied:
# Tailor the messages accordingly # Tailor the messages accordingly
if CONF.load_help_plugin: if CONF.load_help_plugin:
LOG.warning('Sending help!') LOG.warning("Sending help!")
message_text = "Unknown command! Send 'help' message for help" message_text = "Unknown command! Send 'help' message for help"
else: else:
LOG.warning('Unknown command!') LOG.warning("Unknown command!")
message_text = 'Unknown command!' message_text = "Unknown command!"
tx.send( tx.send(
packets.MessagePacket( packets.MessagePacket(
@ -377,11 +369,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
), ),
) )
except Exception as ex: except Exception as ex:
LOG.error('Plugin failed!!!') LOG.error("Plugin failed!!!")
LOG.exception(ex) LOG.exception(ex)
# Do we need to send a reply? # Do we need to send a reply?
if to_call == CONF.callsign: if to_call == CONF.callsign:
reply = 'A Plugin failed! try again?' reply = "A Plugin failed! try again?"
tx.send( tx.send(
packets.MessagePacket( packets.MessagePacket(
from_call=CONF.callsign, from_call=CONF.callsign,
@ -390,4 +382,4 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
), ),
) )
LOG.debug('Completed process_our_message_packet') LOG.debug("Completed process_our_message_packet")

View File

@ -6,8 +6,9 @@ import threading
from oslo_config import cfg from oslo_config import cfg
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger('APRSD') LOG = logging.getLogger("APRSD")
class ObjectStoreMixin: class ObjectStoreMixin:
@ -62,7 +63,7 @@ class ObjectStoreMixin:
def _save_filename(self): def _save_filename(self):
save_location = CONF.save_location save_location = CONF.save_location
return '{}/{}.p'.format( return "{}/{}.p".format(
save_location, save_location,
self.__class__.__name__.lower(), self.__class__.__name__.lower(),
) )
@ -74,13 +75,13 @@ class ObjectStoreMixin:
self._init_store() self._init_store()
save_filename = self._save_filename() save_filename = self._save_filename()
if len(self) > 0: if len(self) > 0:
LOG.debug( LOG.info(
f'{self.__class__.__name__}::Saving' f"{self.__class__.__name__}::Saving"
f' {len(self)} entries to disk at ' f" {len(self)} entries to disk at "
f'{save_filename}', f"{save_filename}",
) )
with self.lock: with self.lock:
with open(save_filename, 'wb+') as fp: with open(save_filename, "wb+") as fp:
pickle.dump(self.data, fp) pickle.dump(self.data, fp)
else: else:
LOG.debug( LOG.debug(
@ -96,21 +97,21 @@ class ObjectStoreMixin:
return return
if os.path.exists(self._save_filename()): if os.path.exists(self._save_filename()):
try: try:
with open(self._save_filename(), 'rb') as fp: with open(self._save_filename(), "rb") as fp:
raw = pickle.load(fp) raw = pickle.load(fp)
if raw: if raw:
self.data = raw self.data = raw
LOG.debug( LOG.debug(
f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.', f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
) )
else: else:
LOG.debug(f'{self.__class__.__name__}::No data to load.') LOG.debug(f"{self.__class__.__name__}::No data to load.")
except (pickle.UnpicklingError, Exception) as ex: except (pickle.UnpicklingError, Exception) as ex:
LOG.error(f'Failed to UnPickle {self._save_filename()}') LOG.error(f"Failed to UnPickle {self._save_filename()}")
LOG.error(ex) LOG.error(ex)
self.data = {} self.data = {}
else: else:
LOG.debug(f'{self.__class__.__name__}::No save file found.') LOG.debug(f"{self.__class__.__name__}::No save file found.")
def flush(self): def flush(self):
"""Nuke the old pickle file that stored the old results from last aprsd run.""" """Nuke the old pickle file that stored the old results from last aprsd run."""