diff --git a/.gitignore b/.gitignore
index 831722e..5944341 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,9 @@ AUTHORS
Makefile.venv
# Copilot
.DS_Store
+
+.python-version
+.fleet
+.vscode
+.envrc
+.doit.db
diff --git a/aprsd/cmds/listen.py b/aprsd/cmds/listen.py
index d949e3e..4784e3a 100644
--- a/aprsd/cmds/listen.py
+++ b/aprsd/cmds/listen.py
@@ -20,8 +20,10 @@ from aprsd import cli_helper, packets, plugin, threads, utils
from aprsd.client import client_factory
from aprsd.main import cli
from aprsd.packets import collector as packet_collector
+from aprsd.packets import core, seen_list
from aprsd.packets import log as packet_log
-from aprsd.packets import seen_list
+from aprsd.packets.filter import PacketFilter
+from aprsd.packets.filters import dupe_filter, packet_type
from aprsd.stats import collector
from aprsd.threads import keepalive, rx
from aprsd.threads import stats as stats_thread
@@ -29,7 +31,7 @@ from aprsd.threads.aprsd import APRSDThread
# setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10
-LOG = logging.getLogger("APRSD")
+LOG = logging.getLogger('APRSD')
CONF = cfg.CONF
LOGU = logger
console = Console()
@@ -37,9 +39,9 @@ console = Console()
def signal_handler(sig, frame):
threads.APRSDThreadList().stop_all()
- if "subprocess" not in str(frame):
+ if 'subprocess' not in str(frame):
LOG.info(
- "Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
+ 'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
datetime.datetime.now(),
),
)
@@ -48,90 +50,66 @@ def signal_handler(sig, frame):
collector.Collector().collect()
-class APRSDListenThread(rx.APRSDRXThread):
+class APRSDListenProcessThread(rx.APRSDFilterThread):
def __init__(
self,
packet_queue,
packet_filter=None,
plugin_manager=None,
- enabled_plugins=[],
+ enabled_plugins=None,
log_packets=False,
):
- super().__init__(packet_queue)
+ super().__init__('ListenProcThread', packet_queue)
self.packet_filter = packet_filter
self.plugin_manager = plugin_manager
if self.plugin_manager:
- LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}")
+ LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}')
self.log_packets = log_packets
- def process_packet(self, *args, **kwargs):
- packet = self._client.decode_packet(*args, **kwargs)
- filters = {
- packets.Packet.__name__: packets.Packet,
- packets.AckPacket.__name__: packets.AckPacket,
- packets.BeaconPacket.__name__: packets.BeaconPacket,
- packets.GPSPacket.__name__: packets.GPSPacket,
- packets.MessagePacket.__name__: packets.MessagePacket,
- packets.MicEPacket.__name__: packets.MicEPacket,
- packets.ObjectPacket.__name__: packets.ObjectPacket,
- packets.StatusPacket.__name__: packets.StatusPacket,
- packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
- packets.WeatherPacket.__name__: packets.WeatherPacket,
- packets.UnknownPacket.__name__: packets.UnknownPacket,
- }
+ def print_packet(self, packet):
+ if self.log_packets:
+ packet_log.log(packet)
- if self.packet_filter:
- filter_class = filters[self.packet_filter]
- if isinstance(packet, filter_class):
- if self.log_packets:
- packet_log.log(packet)
- if self.plugin_manager:
- # Don't do anything with the reply
- # This is the listen only command.
- self.plugin_manager.run(packet)
- else:
- if self.log_packets:
- packet_log.log(packet)
- if self.plugin_manager:
- # Don't do anything with the reply.
- # This is the listen only command.
- self.plugin_manager.run(packet)
-
- packet_collector.PacketCollector().rx(packet)
+ def process_packet(self, packet: type[core.Packet]):
+ if self.plugin_manager:
+ # Don't do anything with the reply.
+ # This is the listen only command.
+ self.plugin_manager.run(packet)
class ListenStatsThread(APRSDThread):
"""Log the stats from the PacketList."""
def __init__(self):
- super().__init__("PacketStatsLog")
+ super().__init__('PacketStatsLog')
self._last_total_rx = 0
+ self.period = 31
def loop(self):
- if self.loop_count % 10 == 0:
+ if self.loop_count % self.period == 0:
# log the stats every 10 seconds
stats_json = collector.Collector().collect()
- stats = stats_json["PacketList"]
- total_rx = stats["rx"]
- packet_count = len(stats["packets"])
+ stats = stats_json['PacketList']
+ total_rx = stats['rx']
+ packet_count = len(stats['packets'])
rx_delta = total_rx - self._last_total_rx
- rate = rx_delta / 10
+ rate = rx_delta / self.period
# Log summary stats
LOGU.opt(colors=True).info(
- f"RX Rate: {rate} pps "
- f"Total RX: {total_rx} "
- f"RX Last 10 secs: {rx_delta} "
- f"Packets in PacketList: {packet_count}",
+ f'RX Rate: {rate:.2f} pps '
+ f'Total RX: {total_rx} '
+ f'RX Last {self.period} secs: {rx_delta} '
+ f'Packets in PacketListStats: {packet_count}',
)
self._last_total_rx = total_rx
# Log individual type stats
- for k, v in stats["types"].items():
- thread_hex = f"fg {utils.hex_from_name(k)}"
+ for k, v in stats['types'].items():
+ thread_hex = f'fg {utils.hex_from_name(k)}'
LOGU.opt(colors=True).info(
- f"<{thread_hex}>{k:<15}{thread_hex}> "
- f"RX: {v['rx']} TX: {v['tx']}",
+ f'<{thread_hex}>{k:<15}{thread_hex}> '
+ f'RX: {v["rx"]} TX: {v["tx"]}',
)
time.sleep(1)
@@ -141,19 +119,19 @@ class ListenStatsThread(APRSDThread):
@cli.command()
@cli_helper.add_options(cli_helper.common_options)
@click.option(
- "--aprs-login",
- envvar="APRS_LOGIN",
+ '--aprs-login',
+ envvar='APRS_LOGIN',
show_envvar=True,
- help="What callsign to send the message from.",
+ help='What callsign to send the message from.',
)
@click.option(
- "--aprs-password",
- envvar="APRS_PASSWORD",
+ '--aprs-password',
+ envvar='APRS_PASSWORD',
show_envvar=True,
- help="the APRS-IS password for APRS_LOGIN",
+ help='the APRS-IS password for APRS_LOGIN',
)
@click.option(
- "--packet-filter",
+ '--packet-filter',
type=click.Choice(
[
packets.AckPacket.__name__,
@@ -170,35 +148,37 @@ class ListenStatsThread(APRSDThread):
],
case_sensitive=False,
),
- help="Filter by packet type",
-)
-@click.option(
- "--enable-plugin",
multiple=True,
- help="Enable a plugin. This is the name of the file in the plugins directory.",
+ default=[],
+ help='Filter by packet type',
)
@click.option(
- "--load-plugins",
+ '--enable-plugin',
+ multiple=True,
+ help='Enable a plugin. This is the name of the file in the plugins directory.',
+)
+@click.option(
+ '--load-plugins',
default=False,
is_flag=True,
- help="Load plugins as enabled in aprsd.conf ?",
+ help='Load plugins as enabled in aprsd.conf ?',
)
@click.argument(
- "filter",
+ 'filter',
nargs=-1,
required=True,
)
@click.option(
- "--log-packets",
+ '--log-packets',
default=False,
is_flag=True,
- help="Log incoming packets.",
+ help='Log incoming packets.',
)
@click.option(
- "--enable-packet-stats",
+ '--enable-packet-stats',
default=False,
is_flag=True,
- help="Enable packet stats periodic logging.",
+ help='Enable packet stats periodic logging.',
)
@click.pass_context
@cli_helper.process_standard_options
@@ -228,46 +208,46 @@ def listen(
if not aprs_login:
click.echo(ctx.get_help())
- click.echo("")
- ctx.fail("Must set --aprs-login or APRS_LOGIN")
+ click.echo('')
+ ctx.fail('Must set --aprs-login or APRS_LOGIN')
ctx.exit()
if not aprs_password:
click.echo(ctx.get_help())
- click.echo("")
- ctx.fail("Must set --aprs-password or APRS_PASSWORD")
+ click.echo('')
+ ctx.fail('Must set --aprs-password or APRS_PASSWORD')
ctx.exit()
# CONF.aprs_network.login = aprs_login
# config["aprs"]["password"] = aprs_password
- LOG.info(f"APRSD Listen Started version: {aprsd.__version__}")
+ LOG.info(f'APRSD Listen Started version: {aprsd.__version__}')
CONF.log_opt_values(LOG, logging.DEBUG)
collector.Collector()
# Try and load saved MsgTrack list
- LOG.debug("Loading saved MsgTrack object.")
+ LOG.debug('Loading saved MsgTrack object.')
# Initialize the client factory and create
# The correct client object ready for use
# Make sure we have 1 client transport enabled
if not client_factory.is_client_enabled():
- LOG.error("No Clients are enabled in config.")
+ LOG.error('No Clients are enabled in config.')
sys.exit(-1)
# Creates the client object
- LOG.info("Creating client connection")
+ LOG.info('Creating client connection')
aprs_client = client_factory.create()
LOG.info(aprs_client)
if not aprs_client.login_success:
# We failed to login, will just quit!
- msg = f"Login Failure: {aprs_client.login_failure}"
+ msg = f'Login Failure: {aprs_client.login_failure}'
LOG.error(msg)
print(msg)
sys.exit(-1)
- LOG.debug(f"Filter by '{filter}'")
+ LOG.debug(f"Filter messages on aprsis server by '{filter}'")
aprs_client.set_filter(filter)
keepalive_thread = keepalive.KeepAliveThread()
@@ -276,10 +256,19 @@ def listen(
# just deregister the class from the packet collector
packet_collector.PacketCollector().unregister(seen_list.SeenList)
+ # we don't want the dupe filter to run here.
+ PacketFilter().unregister(dupe_filter.DupePacketFilter)
+ if packet_filter:
+        LOG.info(f'Enabling packet filtering for {packet_filter}')
+ packet_type.PacketTypeFilter().set_allow_list(packet_filter)
+ PacketFilter().register(packet_type.PacketTypeFilter)
+ else:
+ LOG.info('No packet filtering enabled.')
+
pm = None
if load_plugins:
pm = plugin.PluginManager()
- LOG.info("Loading plugins")
+ LOG.info('Loading plugins')
pm.setup_plugins(load_help_plugin=False)
elif enable_plugin:
pm = plugin.PluginManager()
@@ -290,33 +279,37 @@ def listen(
else:
LOG.warning(
"Not Loading any plugins use --load-plugins to load what's "
- "defined in the config file.",
+ 'defined in the config file.',
)
if pm:
for p in pm.get_plugins():
- LOG.info("Loaded plugin %s", p.__class__.__name__)
+ LOG.info('Loaded plugin %s', p.__class__.__name__)
stats = stats_thread.APRSDStatsStoreThread()
stats.start()
- LOG.debug("Create APRSDListenThread")
- listen_thread = APRSDListenThread(
+ LOG.debug('Start APRSDRxThread')
+ rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue)
+ rx_thread.start()
+
+ LOG.debug('Create APRSDListenProcessThread')
+ listen_thread = APRSDListenProcessThread(
packet_queue=threads.packet_queue,
packet_filter=packet_filter,
plugin_manager=pm,
enabled_plugins=enable_plugin,
log_packets=log_packets,
)
- LOG.debug("Start APRSDListenThread")
+ LOG.debug('Start APRSDListenProcessThread')
listen_thread.start()
if enable_packet_stats:
listen_stats = ListenStatsThread()
listen_stats.start()
keepalive_thread.start()
- LOG.debug("keepalive Join")
+ LOG.debug('keepalive Join')
keepalive_thread.join()
- LOG.debug("listen_thread Join")
+ rx_thread.join()
listen_thread.join()
stats.join()
diff --git a/aprsd/cmds/server.py b/aprsd/cmds/server.py
index 3c812d2..f4ad975 100644
--- a/aprsd/cmds/server.py
+++ b/aprsd/cmds/server.py
@@ -147,7 +147,7 @@ def server(ctx, flush):
server_threads.register(keepalive.KeepAliveThread())
server_threads.register(stats_thread.APRSDStatsStoreThread())
server_threads.register(
- rx.APRSDPluginRXThread(
+ rx.APRSDRXThread(
packet_queue=threads.packet_queue,
),
)
diff --git a/aprsd/packets/__init__.py b/aprsd/packets/__init__.py
index 62760fb..b5d854c 100644
--- a/aprsd/packets/__init__.py
+++ b/aprsd/packets/__init__.py
@@ -15,6 +15,8 @@ from aprsd.packets.core import ( # noqa: F401
WeatherPacket,
factory,
)
+from aprsd.packets.filter import PacketFilter
+from aprsd.packets.filters.dupe_filter import DupePacketFilter
from aprsd.packets.packet_list import PacketList # noqa: F401
from aprsd.packets.seen_list import SeenList # noqa: F401
from aprsd.packets.tracker import PacketTrack # noqa: F401
@@ -26,5 +28,9 @@ collector.PacketCollector().register(SeenList)
collector.PacketCollector().register(PacketTrack)
collector.PacketCollector().register(WatchList)
+# Register all the packet filters for normal processing
+# For specific commands you can deregister these if you don't want them.
+PacketFilter().register(DupePacketFilter)
+
NULL_MESSAGE = -1
diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py
index 18a3c91..804db00 100644
--- a/aprsd/packets/core.py
+++ b/aprsd/packets/core.py
@@ -106,6 +106,8 @@ class Packet:
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
# Was the packet acked?
acked: bool = field(repr=False, default=False, compare=False, hash=False)
+ # Was the packet previously processed (for dupe checking)
+ processed: bool = field(repr=False, default=False, compare=False, hash=False)
# Do we allow this packet to be saved to send later?
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
@@ -186,12 +188,11 @@ class Packet:
def __repr__(self) -> str:
"""Build the repr version of the packet."""
- repr = (
+ return (
f"{self.__class__.__name__}:"
f" From: {self.from_call} "
f" To: {self.to_call}"
)
- return repr
@dataclass_json
@@ -694,6 +695,8 @@ class UnknownPacket:
path: List[str] = field(default_factory=list, compare=False, hash=False)
packet_type: Optional[str] = field(default=None)
via: Optional[str] = field(default=None, compare=False, hash=False)
+ # Was the packet previously processed (for dupe checking)
+ processed: bool = field(repr=False, default=False, compare=False, hash=False)
@property
def key(self) -> str:
diff --git a/aprsd/packets/filter.py b/aprsd/packets/filter.py
new file mode 100644
index 0000000..152366b
--- /dev/null
+++ b/aprsd/packets/filter.py
@@ -0,0 +1,58 @@
+import logging
+from typing import Callable, Protocol, runtime_checkable, Union, Dict
+
+from aprsd.packets import core
+from aprsd.utils import singleton
+
+LOG = logging.getLogger("APRSD")
+
+
+@runtime_checkable
+class PacketFilterProtocol(Protocol):
+ """Protocol API for a packet filter class.
+ """
+ def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
+ """When we get a packet from the network.
+
+ Return a Packet object if the filter passes. Return None if the
+ Packet is filtered out.
+ """
+ ...
+
+
+@singleton
+class PacketFilter:
+
+ def __init__(self):
+ self.filters: Dict[str, Callable] = {}
+
+ def register(self, packet_filter: Callable) -> None:
+ if not isinstance(packet_filter, PacketFilterProtocol):
+ raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
+
+ if packet_filter not in self.filters:
+ self.filters[packet_filter] = packet_filter()
+
+ def unregister(self, packet_filter: Callable) -> None:
+ if not isinstance(packet_filter, PacketFilterProtocol):
+ raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
+ if packet_filter in self.filters:
+ del self.filters[packet_filter]
+
+ def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
+ """Run through each of the filters.
+
+ This will step through each registered filter class
+ and call filter on it.
+
+ If the filter object returns None, we are done filtering.
+ If the filter object returns the packet, we continue filtering.
+ """
+ for packet_filter in self.filters:
+ try:
+ if not self.filters[packet_filter].filter(packet):
+ LOG.debug(f"{self.filters[packet_filter].__class__.__name__} dropped {packet.__class__.__name__}:{packet.human_info}")
+ return None
+ except Exception as ex:
+                LOG.error(f"{packet_filter.__class__.__name__} failed filtering packet {packet.__class__.__name__} : {ex}")
+ return packet
diff --git a/aprsd/packets/filters/__init__.py b/aprsd/packets/filters/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/aprsd/packets/filters/dupe_filter.py b/aprsd/packets/filters/dupe_filter.py
new file mode 100644
index 0000000..0839fcf
--- /dev/null
+++ b/aprsd/packets/filters/dupe_filter.py
@@ -0,0 +1,68 @@
+import logging
+from typing import Union
+
+from oslo_config import cfg
+
+from aprsd import packets
+from aprsd.packets import core
+
+CONF = cfg.CONF
+LOG = logging.getLogger('APRSD')
+
+
+class DupePacketFilter:
+ """This is a packet filter to detect duplicate packets.
+
+ This Uses the PacketList object to see if a packet exists
+ already. If it does exist in the PacketList, then we need to
+ check the flag on the packet to see if it's been processed before.
+ If the packet has been processed already within the allowed
+ timeframe, then it's a dupe.
+ """
+
+ def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
+ # LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
+ """Filter a packet out if it's already been seen and processed."""
+ if isinstance(packet, core.AckPacket):
+ # We don't need to drop AckPackets, those should be
+ # processed.
+ # Send the AckPacket to the queue for processing elsewhere.
+ return packet
+ else:
+ # Make sure we aren't re-processing the same packet
+ # For RF based APRS Clients we can get duplicate packets
+ # So we need to track them and not process the dupes.
+ pkt_list = packets.PacketList()
+ found = False
+ try:
+ # Find the packet in the list of already seen packets
+ # Based on the packet.key
+ found = pkt_list.find(packet)
+ if not packet.msgNo:
+ # If the packet doesn't have a message id
+ # then there is no reliable way to detect
+ # if it's a dupe, so we just pass it on.
+ # it shouldn't get acked either.
+ found = False
+ except KeyError:
+ found = False
+
+ if not found:
+ # We haven't seen this packet before, so we process it.
+ return packet
+
+ if not packet.processed:
+ # We haven't processed this packet through the plugins.
+ return packet
+ elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
+ # If the packet came in within N seconds of the
+ # Last time seeing the packet, then we drop it as a dupe.
+ LOG.warning(
+ f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
+ )
+ else:
+ LOG.warning(
+ f'Packet {packet.from_call}:{packet.msgNo} already tracked '
+ f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
+ )
+ return packet
diff --git a/aprsd/packets/filters/packet_type.py b/aprsd/packets/filters/packet_type.py
new file mode 100644
index 0000000..8cdeb31
--- /dev/null
+++ b/aprsd/packets/filters/packet_type.py
@@ -0,0 +1,53 @@
+import logging
+from typing import Union
+
+from oslo_config import cfg
+
+from aprsd import packets
+from aprsd.packets import core
+from aprsd.utils import singleton
+
+CONF = cfg.CONF
+LOG = logging.getLogger('APRSD')
+
+
+@singleton
+class PacketTypeFilter:
+ """This filter is used to filter out packets that don't match a specific type.
+
+ To use this, register it with the PacketFilter class,
+ then instante it and call set_allow_list() with a list of packet types
+ you want to allow to pass the filtering. All other packets will be
+ filtered out.
+ """
+
+ filters = {
+ packets.Packet.__name__: packets.Packet,
+ packets.AckPacket.__name__: packets.AckPacket,
+ packets.BeaconPacket.__name__: packets.BeaconPacket,
+ packets.GPSPacket.__name__: packets.GPSPacket,
+ packets.MessagePacket.__name__: packets.MessagePacket,
+ packets.MicEPacket.__name__: packets.MicEPacket,
+ packets.ObjectPacket.__name__: packets.ObjectPacket,
+ packets.StatusPacket.__name__: packets.StatusPacket,
+ packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
+ packets.WeatherPacket.__name__: packets.WeatherPacket,
+ packets.UnknownPacket.__name__: packets.UnknownPacket,
+ }
+
+ allow_list = ()
+
+ def set_allow_list(self, filter_list):
+ tmp_list = []
+ for filter in filter_list:
+ LOG.warning(
+ f'Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}'
+ )
+ tmp_list.append(self.filters[filter])
+ self.allow_list = tuple(tmp_list)
+
+ def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
+ """Only allow packets of certain types to filter through."""
+ if self.allow_list:
+ if isinstance(packet, self.allow_list):
+ return packet
diff --git a/aprsd/threads/__init__.py b/aprsd/threads/__init__.py
index eea5d2e..7946b6d 100644
--- a/aprsd/threads/__init__.py
+++ b/aprsd/threads/__init__.py
@@ -4,9 +4,8 @@ import queue
# aprsd.threads
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
from .rx import ( # noqa: F401
- APRSDDupeRXThread,
APRSDProcessPacketThread,
APRSDRXThread,
)
-packet_queue = queue.Queue(maxsize=20)
+packet_queue = queue.Queue(maxsize=500)
diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py
index d0244f9..b995f8c 100644
--- a/aprsd/threads/rx.py
+++ b/aprsd/threads/rx.py
@@ -8,18 +8,30 @@ from oslo_config import cfg
from aprsd import packets, plugin
from aprsd.client import client_factory
-from aprsd.packets import collector
+from aprsd.packets import collector, filter
from aprsd.packets import log as packet_log
from aprsd.threads import APRSDThread, tx
-from aprsd.utils import trace
CONF = cfg.CONF
LOG = logging.getLogger('APRSD')
class APRSDRXThread(APRSDThread):
+ """Main Class to connect to an APRS Client and recieve packets.
+
+ A packet is received in the main loop and then sent to the
+ process_packet method, which sends the packet through the collector
+ to track the packet for stats, and then put into the packet queue
+ for processing in a separate thread.
+ """
+
_client = None
+ # This is the queue that packets are sent to for processing.
+ # We process packets in a separate thread to help prevent
+ # getting blocked by the APRS server trying to send us packets.
+ packet_queue = None
+
def __init__(self, packet_queue):
super().__init__('RX_PKT')
self.packet_queue = packet_queue
@@ -52,7 +64,7 @@ class APRSDRXThread(APRSDThread):
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
self._client.consumer(
- self._process_packet,
+ self.process_packet,
raw=False,
blocking=False,
)
@@ -73,37 +85,7 @@ class APRSDRXThread(APRSDThread):
time.sleep(5)
return True
- def _process_packet(self, *args, **kwargs):
- """Intermediate callback so we can update the keepalive time."""
- # Now call the 'real' packet processing for a RX'x packet
- self.process_packet(*args, **kwargs)
-
- @abc.abstractmethod
def process_packet(self, *args, **kwargs):
- pass
-
-
-class APRSDDupeRXThread(APRSDRXThread):
- """Process received packets.
-
- This is the main APRSD Server command thread that
- receives packets and makes sure the packet
- hasn't been seen previously before sending it on
- to be processed.
- """
-
- @trace.trace
- def process_packet(self, *args, **kwargs):
- """This handles the processing of an inbound packet.
-
- When a packet is received by the connected client object,
- it sends the raw packet into this function. This function then
- decodes the packet via the client, and then processes the packet.
- Ack Packets are sent to the PluginProcessPacketThread for processing.
- All other packets have to be checked as a dupe, and then only after
- we haven't seen this packet before, do we send it to the
- PluginProcessPacketThread for processing.
- """
packet = self._client.decode_packet(*args, **kwargs)
if not packet:
LOG.error(
@@ -154,15 +136,44 @@ class APRSDDupeRXThread(APRSDRXThread):
self.packet_queue.put(packet)
-class APRSDPluginRXThread(APRSDDupeRXThread):
- """ "Process received packets.
+class APRSDFilterThread(APRSDThread):
+ def __init__(self, thread_name, packet_queue):
+ super().__init__(thread_name)
+ self.packet_queue = packet_queue
- For backwards compatibility, we keep the APRSDPluginRXThread.
- """
+ def filter_packet(self, packet):
+ # Do any packet filtering prior to processing
+ if not filter.PacketFilter().filter(packet):
+ return None
+ return packet
+
+ def print_packet(self, packet):
+ """Allow a child of this class to override this.
+
+ This is helpful if for whatever reason the child class
+ doesn't want to log packets.
+
+ """
+ packet_log.log(packet)
+
+ def loop(self):
+ try:
+ packet = self.packet_queue.get(timeout=1)
+            if packet:
+                self.print_packet(packet)
+                if self.filter_packet(packet):
+                    self.process_packet(packet)
+ except queue.Empty:
+ pass
+ return True
-class APRSDProcessPacketThread(APRSDThread):
- """Base class for processing received packets.
+class APRSDProcessPacketThread(APRSDFilterThread):
+ """Base class for processing received packets after they have been filtered.
+
+ Packets are received from the client, then filtered for dupes,
+ then sent to the packet queue. This thread pulls packets from
+ the packet queue for processing.
This is the base class for processing packets coming from
the consumer. This base class handles sending ack packets and
@@ -170,8 +181,7 @@ class APRSDProcessPacketThread(APRSDThread):
for processing."""
def __init__(self, packet_queue):
- self.packet_queue = packet_queue
- super().__init__('ProcessPKT')
+ super().__init__('ProcessPKT', packet_queue=packet_queue)
if not CONF.enable_sending_ack_packets:
LOG.warning(
'Sending ack packets is disabled, messages will not be acknowledged.',
@@ -195,18 +205,14 @@ class APRSDProcessPacketThread(APRSDThread):
LOG.debug(f'Got REJECT for message {ack_num}')
collector.PacketCollector().rx(packet)
- def loop(self):
- try:
- packet = self.packet_queue.get(timeout=1)
- if packet:
- self.process_packet(packet)
- except queue.Empty:
- pass
- return True
-
def process_packet(self, packet):
"""Process a packet received from aprs-is server."""
LOG.debug(f'ProcessPKT-LOOP {self.loop_count}')
+
+ # set this now as we are going to process it.
+ # This is used during dupe checking, so set it early
+ packet.processed = True
+
our_call = CONF.callsign.lower()
from_call = packet.from_call
diff --git a/aprsd/utils/objectstore.py b/aprsd/utils/objectstore.py
index b04f6e6..be494f0 100644
--- a/aprsd/utils/objectstore.py
+++ b/aprsd/utils/objectstore.py
@@ -6,9 +6,8 @@ import threading
from oslo_config import cfg
-
CONF = cfg.CONF
-LOG = logging.getLogger("APRSD")
+LOG = logging.getLogger('APRSD')
class ObjectStoreMixin:
@@ -63,7 +62,7 @@ class ObjectStoreMixin:
def _save_filename(self):
save_location = CONF.save_location
- return "{}/{}.p".format(
+ return '{}/{}.p'.format(
save_location,
self.__class__.__name__.lower(),
)
@@ -75,13 +74,13 @@ class ObjectStoreMixin:
self._init_store()
save_filename = self._save_filename()
if len(self) > 0:
- LOG.info(
- f"{self.__class__.__name__}::Saving"
- f" {len(self)} entries to disk at "
- f"{save_filename}",
+ LOG.debug(
+ f'{self.__class__.__name__}::Saving'
+ f' {len(self)} entries to disk at '
+ f'{save_filename}',
)
with self.lock:
- with open(save_filename, "wb+") as fp:
+ with open(save_filename, 'wb+') as fp:
pickle.dump(self.data, fp)
else:
LOG.debug(
@@ -97,21 +96,21 @@ class ObjectStoreMixin:
return
if os.path.exists(self._save_filename()):
try:
- with open(self._save_filename(), "rb") as fp:
+ with open(self._save_filename(), 'rb') as fp:
raw = pickle.load(fp)
if raw:
self.data = raw
LOG.debug(
- f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
+ f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.',
)
else:
- LOG.debug(f"{self.__class__.__name__}::No data to load.")
+ LOG.debug(f'{self.__class__.__name__}::No data to load.')
except (pickle.UnpicklingError, Exception) as ex:
- LOG.error(f"Failed to UnPickle {self._save_filename()}")
+ LOG.error(f'Failed to UnPickle {self._save_filename()}')
LOG.error(ex)
self.data = {}
else:
- LOG.debug(f"{self.__class__.__name__}::No save file found.")
+ LOG.debug(f'{self.__class__.__name__}::No save file found.')
def flush(self):
"""Nuke the old pickle file that stored the old results from last aprsd run."""