mirror of
https://github.com/craigerl/aprsd.git
synced 2025-06-25 21:45:25 -04:00
Compare commits
8 Commits
101904ca77
...
4fd64a3c25
Author | SHA1 | Date | |
---|---|---|---|
4fd64a3c25 | |||
361663e7d2 | |||
fd517b3218 | |||
e9e7e6b59f | |||
52dac7e0a0 | |||
b6da0ebb0d | |||
d82a81a2c3 | |||
6cd7e99713 |
6
.gitignore
vendored
6
.gitignore
vendored
@ -60,3 +60,9 @@ AUTHORS
|
|||||||
Makefile.venv
|
Makefile.venv
|
||||||
# Copilot
|
# Copilot
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
||||||
|
.python-version
|
||||||
|
.fleet
|
||||||
|
.vscode
|
||||||
|
.envrc
|
||||||
|
.doit.db
|
||||||
|
@ -18,12 +18,13 @@ class KISS3Client:
|
|||||||
|
|
||||||
# date for last time we heard from the server
|
# date for last time we heard from the server
|
||||||
aprsd_keepalive = datetime.datetime.now()
|
aprsd_keepalive = datetime.datetime.now()
|
||||||
|
_connected = False
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.setup()
|
self.setup()
|
||||||
|
|
||||||
def is_alive(self):
|
def is_alive(self):
|
||||||
return True
|
return self._connected
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
# we can be TCP kiss or Serial kiss
|
# we can be TCP kiss or Serial kiss
|
||||||
@ -56,17 +57,33 @@ class KISS3Client:
|
|||||||
self.path = CONF.kiss_tcp.path
|
self.path = CONF.kiss_tcp.path
|
||||||
|
|
||||||
LOG.debug('Starting KISS interface connection')
|
LOG.debug('Starting KISS interface connection')
|
||||||
|
try:
|
||||||
self.kiss.start()
|
self.kiss.start()
|
||||||
|
if self.kiss.protocol.transport.is_closing():
|
||||||
|
LOG.warning('KISS transport is closing, not setting consumer callback')
|
||||||
|
self._connected = False
|
||||||
|
else:
|
||||||
|
self._connected = True
|
||||||
|
except Exception:
|
||||||
|
LOG.error('Failed to start KISS interface.')
|
||||||
|
self._connected = False
|
||||||
|
|
||||||
@trace.trace
|
@trace.trace
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
if not self._connected:
|
||||||
|
# do nothing since we aren't connected
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.kiss.stop()
|
self.kiss.stop()
|
||||||
self.kiss.loop.call_soon_threadsafe(
|
self.kiss.loop.call_soon_threadsafe(
|
||||||
self.kiss.protocol.transport.close,
|
self.kiss.protocol.transport.close,
|
||||||
)
|
)
|
||||||
except Exception as ex:
|
except Exception:
|
||||||
LOG.exception(ex)
|
LOG.error('Failed to stop KISS interface.')
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.stop()
|
||||||
|
|
||||||
def set_filter(self, filter):
|
def set_filter(self, filter):
|
||||||
# This does nothing right now.
|
# This does nothing right now.
|
||||||
@ -86,8 +103,14 @@ class KISS3Client:
|
|||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
|
|
||||||
def consumer(self, callback):
|
def consumer(self, callback):
|
||||||
|
if not self._connected:
|
||||||
|
raise Exception('KISS transport is not connected')
|
||||||
|
|
||||||
self._parse_callback = callback
|
self._parse_callback = callback
|
||||||
self.kiss.read(callback=self.parse_frame, min_frames=None)
|
if not self.kiss.protocol.transport.is_closing():
|
||||||
|
self.kiss.read(callback=self.parse_frame, min_frames=1)
|
||||||
|
else:
|
||||||
|
self._connected = False
|
||||||
|
|
||||||
def send(self, packet):
|
def send(self, packet):
|
||||||
"""Send an APRS Message object."""
|
"""Send an APRS Message object."""
|
||||||
|
@ -140,3 +140,4 @@ class KISSClient(base.APRSClient):
|
|||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.error(f'Consumer failed {ex}')
|
LOG.error(f'Consumer failed {ex}')
|
||||||
LOG.error(ex)
|
LOG.error(ex)
|
||||||
|
raise ex
|
||||||
|
@ -20,8 +20,10 @@ from aprsd import cli_helper, packets, plugin, threads, utils
|
|||||||
from aprsd.client import client_factory
|
from aprsd.client import client_factory
|
||||||
from aprsd.main import cli
|
from aprsd.main import cli
|
||||||
from aprsd.packets import collector as packet_collector
|
from aprsd.packets import collector as packet_collector
|
||||||
|
from aprsd.packets import core, seen_list
|
||||||
from aprsd.packets import log as packet_log
|
from aprsd.packets import log as packet_log
|
||||||
from aprsd.packets import seen_list
|
from aprsd.packets.filter import PacketFilter
|
||||||
|
from aprsd.packets.filters import dupe_filter, packet_type
|
||||||
from aprsd.stats import collector
|
from aprsd.stats import collector
|
||||||
from aprsd.threads import keepalive, rx
|
from aprsd.threads import keepalive, rx
|
||||||
from aprsd.threads import stats as stats_thread
|
from aprsd.threads import stats as stats_thread
|
||||||
@ -29,7 +31,7 @@ from aprsd.threads.aprsd import APRSDThread
|
|||||||
|
|
||||||
# setup the global logger
|
# setup the global logger
|
||||||
# log.basicConfig(level=log.DEBUG) # level=10
|
# log.basicConfig(level=log.DEBUG) # level=10
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger('APRSD')
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOGU = logger
|
LOGU = logger
|
||||||
console = Console()
|
console = Console()
|
||||||
@ -37,9 +39,9 @@ console = Console()
|
|||||||
|
|
||||||
def signal_handler(sig, frame):
|
def signal_handler(sig, frame):
|
||||||
threads.APRSDThreadList().stop_all()
|
threads.APRSDThreadList().stop_all()
|
||||||
if "subprocess" not in str(frame):
|
if 'subprocess' not in str(frame):
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}".format(
|
'Ctrl+C, Sending all threads exit! Can take up to 10 seconds {}'.format(
|
||||||
datetime.datetime.now(),
|
datetime.datetime.now(),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
@ -48,90 +50,66 @@ def signal_handler(sig, frame):
|
|||||||
collector.Collector().collect()
|
collector.Collector().collect()
|
||||||
|
|
||||||
|
|
||||||
class APRSDListenThread(rx.APRSDRXThread):
|
class APRSDListenProcessThread(rx.APRSDFilterThread):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
packet_queue,
|
packet_queue,
|
||||||
packet_filter=None,
|
packet_filter=None,
|
||||||
plugin_manager=None,
|
plugin_manager=None,
|
||||||
enabled_plugins=[],
|
enabled_plugins=None,
|
||||||
log_packets=False,
|
log_packets=False,
|
||||||
):
|
):
|
||||||
super().__init__(packet_queue)
|
super().__init__('ListenProcThread', packet_queue)
|
||||||
self.packet_filter = packet_filter
|
self.packet_filter = packet_filter
|
||||||
self.plugin_manager = plugin_manager
|
self.plugin_manager = plugin_manager
|
||||||
if self.plugin_manager:
|
if self.plugin_manager:
|
||||||
LOG.info(f"Plugins {self.plugin_manager.get_message_plugins()}")
|
LOG.info(f'Plugins {self.plugin_manager.get_message_plugins()}')
|
||||||
self.log_packets = log_packets
|
self.log_packets = log_packets
|
||||||
|
|
||||||
def process_packet(self, *args, **kwargs):
|
def print_packet(self, packet):
|
||||||
packet = self._client.decode_packet(*args, **kwargs)
|
if self.log_packets:
|
||||||
filters = {
|
packet_log.log(packet)
|
||||||
packets.Packet.__name__: packets.Packet,
|
|
||||||
packets.AckPacket.__name__: packets.AckPacket,
|
|
||||||
packets.BeaconPacket.__name__: packets.BeaconPacket,
|
|
||||||
packets.GPSPacket.__name__: packets.GPSPacket,
|
|
||||||
packets.MessagePacket.__name__: packets.MessagePacket,
|
|
||||||
packets.MicEPacket.__name__: packets.MicEPacket,
|
|
||||||
packets.ObjectPacket.__name__: packets.ObjectPacket,
|
|
||||||
packets.StatusPacket.__name__: packets.StatusPacket,
|
|
||||||
packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
|
|
||||||
packets.WeatherPacket.__name__: packets.WeatherPacket,
|
|
||||||
packets.UnknownPacket.__name__: packets.UnknownPacket,
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.packet_filter:
|
def process_packet(self, packet: type[core.Packet]):
|
||||||
filter_class = filters[self.packet_filter]
|
|
||||||
if isinstance(packet, filter_class):
|
|
||||||
if self.log_packets:
|
|
||||||
packet_log.log(packet)
|
|
||||||
if self.plugin_manager:
|
|
||||||
# Don't do anything with the reply
|
|
||||||
# This is the listen only command.
|
|
||||||
self.plugin_manager.run(packet)
|
|
||||||
else:
|
|
||||||
if self.log_packets:
|
|
||||||
packet_log.log(packet)
|
|
||||||
if self.plugin_manager:
|
if self.plugin_manager:
|
||||||
# Don't do anything with the reply.
|
# Don't do anything with the reply.
|
||||||
# This is the listen only command.
|
# This is the listen only command.
|
||||||
self.plugin_manager.run(packet)
|
self.plugin_manager.run(packet)
|
||||||
|
|
||||||
packet_collector.PacketCollector().rx(packet)
|
|
||||||
|
|
||||||
|
|
||||||
class ListenStatsThread(APRSDThread):
|
class ListenStatsThread(APRSDThread):
|
||||||
"""Log the stats from the PacketList."""
|
"""Log the stats from the PacketList."""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("PacketStatsLog")
|
super().__init__('PacketStatsLog')
|
||||||
self._last_total_rx = 0
|
self._last_total_rx = 0
|
||||||
|
self.period = 31
|
||||||
|
|
||||||
def loop(self):
|
def loop(self):
|
||||||
if self.loop_count % 10 == 0:
|
if self.loop_count % self.period == 0:
|
||||||
# log the stats every 10 seconds
|
# log the stats every 10 seconds
|
||||||
stats_json = collector.Collector().collect()
|
stats_json = collector.Collector().collect()
|
||||||
stats = stats_json["PacketList"]
|
stats = stats_json['PacketList']
|
||||||
total_rx = stats["rx"]
|
total_rx = stats['rx']
|
||||||
packet_count = len(stats["packets"])
|
packet_count = len(stats['packets'])
|
||||||
rx_delta = total_rx - self._last_total_rx
|
rx_delta = total_rx - self._last_total_rx
|
||||||
rate = rx_delta / 10
|
rate = rx_delta / self.period
|
||||||
|
|
||||||
# Log summary stats
|
# Log summary stats
|
||||||
LOGU.opt(colors=True).info(
|
LOGU.opt(colors=True).info(
|
||||||
f"<green>RX Rate: {rate} pps</green> "
|
f'<green>RX Rate: {rate:.2f} pps</green> '
|
||||||
f"<yellow>Total RX: {total_rx}</yellow> "
|
f'<yellow>Total RX: {total_rx}</yellow> '
|
||||||
f"<red>RX Last 10 secs: {rx_delta}</red> "
|
f'<red>RX Last {self.period} secs: {rx_delta}</red> '
|
||||||
f"<white>Packets in PacketList: {packet_count}</white>",
|
f'<white>Packets in PacketListStats: {packet_count}</white>',
|
||||||
)
|
)
|
||||||
self._last_total_rx = total_rx
|
self._last_total_rx = total_rx
|
||||||
|
|
||||||
# Log individual type stats
|
# Log individual type stats
|
||||||
for k, v in stats["types"].items():
|
for k, v in stats['types'].items():
|
||||||
thread_hex = f"fg {utils.hex_from_name(k)}"
|
thread_hex = f'fg {utils.hex_from_name(k)}'
|
||||||
LOGU.opt(colors=True).info(
|
LOGU.opt(colors=True).info(
|
||||||
f"<{thread_hex}>{k:<15}</{thread_hex}> "
|
f'<{thread_hex}>{k:<15}</{thread_hex}> '
|
||||||
f"<blue>RX: {v['rx']}</blue> <red>TX: {v['tx']}</red>",
|
f'<blue>RX: {v["rx"]}</blue> <red>TX: {v["tx"]}</red>',
|
||||||
)
|
)
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
@ -141,19 +119,19 @@ class ListenStatsThread(APRSDThread):
|
|||||||
@cli.command()
|
@cli.command()
|
||||||
@cli_helper.add_options(cli_helper.common_options)
|
@cli_helper.add_options(cli_helper.common_options)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--aprs-login",
|
'--aprs-login',
|
||||||
envvar="APRS_LOGIN",
|
envvar='APRS_LOGIN',
|
||||||
show_envvar=True,
|
show_envvar=True,
|
||||||
help="What callsign to send the message from.",
|
help='What callsign to send the message from.',
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--aprs-password",
|
'--aprs-password',
|
||||||
envvar="APRS_PASSWORD",
|
envvar='APRS_PASSWORD',
|
||||||
show_envvar=True,
|
show_envvar=True,
|
||||||
help="the APRS-IS password for APRS_LOGIN",
|
help='the APRS-IS password for APRS_LOGIN',
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--packet-filter",
|
'--packet-filter',
|
||||||
type=click.Choice(
|
type=click.Choice(
|
||||||
[
|
[
|
||||||
packets.AckPacket.__name__,
|
packets.AckPacket.__name__,
|
||||||
@ -170,35 +148,37 @@ class ListenStatsThread(APRSDThread):
|
|||||||
],
|
],
|
||||||
case_sensitive=False,
|
case_sensitive=False,
|
||||||
),
|
),
|
||||||
help="Filter by packet type",
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--enable-plugin",
|
|
||||||
multiple=True,
|
multiple=True,
|
||||||
help="Enable a plugin. This is the name of the file in the plugins directory.",
|
default=[],
|
||||||
|
help='Filter by packet type',
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--load-plugins",
|
'--enable-plugin',
|
||||||
|
multiple=True,
|
||||||
|
help='Enable a plugin. This is the name of the file in the plugins directory.',
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
'--load-plugins',
|
||||||
default=False,
|
default=False,
|
||||||
is_flag=True,
|
is_flag=True,
|
||||||
help="Load plugins as enabled in aprsd.conf ?",
|
help='Load plugins as enabled in aprsd.conf ?',
|
||||||
)
|
)
|
||||||
@click.argument(
|
@click.argument(
|
||||||
"filter",
|
'filter',
|
||||||
nargs=-1,
|
nargs=-1,
|
||||||
required=True,
|
required=True,
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--log-packets",
|
'--log-packets',
|
||||||
default=False,
|
default=False,
|
||||||
is_flag=True,
|
is_flag=True,
|
||||||
help="Log incoming packets.",
|
help='Log incoming packets.',
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--enable-packet-stats",
|
'--enable-packet-stats',
|
||||||
default=False,
|
default=False,
|
||||||
is_flag=True,
|
is_flag=True,
|
||||||
help="Enable packet stats periodic logging.",
|
help='Enable packet stats periodic logging.',
|
||||||
)
|
)
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
@cli_helper.process_standard_options
|
@cli_helper.process_standard_options
|
||||||
@ -228,46 +208,46 @@ def listen(
|
|||||||
|
|
||||||
if not aprs_login:
|
if not aprs_login:
|
||||||
click.echo(ctx.get_help())
|
click.echo(ctx.get_help())
|
||||||
click.echo("")
|
click.echo('')
|
||||||
ctx.fail("Must set --aprs-login or APRS_LOGIN")
|
ctx.fail('Must set --aprs-login or APRS_LOGIN')
|
||||||
ctx.exit()
|
ctx.exit()
|
||||||
|
|
||||||
if not aprs_password:
|
if not aprs_password:
|
||||||
click.echo(ctx.get_help())
|
click.echo(ctx.get_help())
|
||||||
click.echo("")
|
click.echo('')
|
||||||
ctx.fail("Must set --aprs-password or APRS_PASSWORD")
|
ctx.fail('Must set --aprs-password or APRS_PASSWORD')
|
||||||
ctx.exit()
|
ctx.exit()
|
||||||
|
|
||||||
# CONF.aprs_network.login = aprs_login
|
# CONF.aprs_network.login = aprs_login
|
||||||
# config["aprs"]["password"] = aprs_password
|
# config["aprs"]["password"] = aprs_password
|
||||||
|
|
||||||
LOG.info(f"APRSD Listen Started version: {aprsd.__version__}")
|
LOG.info(f'APRSD Listen Started version: {aprsd.__version__}')
|
||||||
|
|
||||||
CONF.log_opt_values(LOG, logging.DEBUG)
|
CONF.log_opt_values(LOG, logging.DEBUG)
|
||||||
collector.Collector()
|
collector.Collector()
|
||||||
|
|
||||||
# Try and load saved MsgTrack list
|
# Try and load saved MsgTrack list
|
||||||
LOG.debug("Loading saved MsgTrack object.")
|
LOG.debug('Loading saved MsgTrack object.')
|
||||||
|
|
||||||
# Initialize the client factory and create
|
# Initialize the client factory and create
|
||||||
# The correct client object ready for use
|
# The correct client object ready for use
|
||||||
# Make sure we have 1 client transport enabled
|
# Make sure we have 1 client transport enabled
|
||||||
if not client_factory.is_client_enabled():
|
if not client_factory.is_client_enabled():
|
||||||
LOG.error("No Clients are enabled in config.")
|
LOG.error('No Clients are enabled in config.')
|
||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
# Creates the client object
|
# Creates the client object
|
||||||
LOG.info("Creating client connection")
|
LOG.info('Creating client connection')
|
||||||
aprs_client = client_factory.create()
|
aprs_client = client_factory.create()
|
||||||
LOG.info(aprs_client)
|
LOG.info(aprs_client)
|
||||||
if not aprs_client.login_success:
|
if not aprs_client.login_success:
|
||||||
# We failed to login, will just quit!
|
# We failed to login, will just quit!
|
||||||
msg = f"Login Failure: {aprs_client.login_failure}"
|
msg = f'Login Failure: {aprs_client.login_failure}'
|
||||||
LOG.error(msg)
|
LOG.error(msg)
|
||||||
print(msg)
|
print(msg)
|
||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
|
|
||||||
LOG.debug(f"Filter by '{filter}'")
|
LOG.debug(f"Filter messages on aprsis server by '{filter}'")
|
||||||
aprs_client.set_filter(filter)
|
aprs_client.set_filter(filter)
|
||||||
|
|
||||||
keepalive_thread = keepalive.KeepAliveThread()
|
keepalive_thread = keepalive.KeepAliveThread()
|
||||||
@ -276,10 +256,19 @@ def listen(
|
|||||||
# just deregister the class from the packet collector
|
# just deregister the class from the packet collector
|
||||||
packet_collector.PacketCollector().unregister(seen_list.SeenList)
|
packet_collector.PacketCollector().unregister(seen_list.SeenList)
|
||||||
|
|
||||||
|
# we don't want the dupe filter to run here.
|
||||||
|
PacketFilter().unregister(dupe_filter.DupePacketFilter)
|
||||||
|
if packet_filter:
|
||||||
|
LOG.info('Enabling packet filtering for {packet_filter}')
|
||||||
|
packet_type.PacketTypeFilter().set_allow_list(packet_filter)
|
||||||
|
PacketFilter().register(packet_type.PacketTypeFilter)
|
||||||
|
else:
|
||||||
|
LOG.info('No packet filtering enabled.')
|
||||||
|
|
||||||
pm = None
|
pm = None
|
||||||
if load_plugins:
|
if load_plugins:
|
||||||
pm = plugin.PluginManager()
|
pm = plugin.PluginManager()
|
||||||
LOG.info("Loading plugins")
|
LOG.info('Loading plugins')
|
||||||
pm.setup_plugins(load_help_plugin=False)
|
pm.setup_plugins(load_help_plugin=False)
|
||||||
elif enable_plugin:
|
elif enable_plugin:
|
||||||
pm = plugin.PluginManager()
|
pm = plugin.PluginManager()
|
||||||
@ -290,33 +279,37 @@ def listen(
|
|||||||
else:
|
else:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
"Not Loading any plugins use --load-plugins to load what's "
|
"Not Loading any plugins use --load-plugins to load what's "
|
||||||
"defined in the config file.",
|
'defined in the config file.',
|
||||||
)
|
)
|
||||||
|
|
||||||
if pm:
|
if pm:
|
||||||
for p in pm.get_plugins():
|
for p in pm.get_plugins():
|
||||||
LOG.info("Loaded plugin %s", p.__class__.__name__)
|
LOG.info('Loaded plugin %s', p.__class__.__name__)
|
||||||
|
|
||||||
stats = stats_thread.APRSDStatsStoreThread()
|
stats = stats_thread.APRSDStatsStoreThread()
|
||||||
stats.start()
|
stats.start()
|
||||||
|
|
||||||
LOG.debug("Create APRSDListenThread")
|
LOG.debug('Start APRSDRxThread')
|
||||||
listen_thread = APRSDListenThread(
|
rx_thread = rx.APRSDRXThread(packet_queue=threads.packet_queue)
|
||||||
|
rx_thread.start()
|
||||||
|
|
||||||
|
LOG.debug('Create APRSDListenProcessThread')
|
||||||
|
listen_thread = APRSDListenProcessThread(
|
||||||
packet_queue=threads.packet_queue,
|
packet_queue=threads.packet_queue,
|
||||||
packet_filter=packet_filter,
|
packet_filter=packet_filter,
|
||||||
plugin_manager=pm,
|
plugin_manager=pm,
|
||||||
enabled_plugins=enable_plugin,
|
enabled_plugins=enable_plugin,
|
||||||
log_packets=log_packets,
|
log_packets=log_packets,
|
||||||
)
|
)
|
||||||
LOG.debug("Start APRSDListenThread")
|
LOG.debug('Start APRSDListenProcessThread')
|
||||||
listen_thread.start()
|
listen_thread.start()
|
||||||
if enable_packet_stats:
|
if enable_packet_stats:
|
||||||
listen_stats = ListenStatsThread()
|
listen_stats = ListenStatsThread()
|
||||||
listen_stats.start()
|
listen_stats.start()
|
||||||
|
|
||||||
keepalive_thread.start()
|
keepalive_thread.start()
|
||||||
LOG.debug("keepalive Join")
|
LOG.debug('keepalive Join')
|
||||||
keepalive_thread.join()
|
keepalive_thread.join()
|
||||||
LOG.debug("listen_thread Join")
|
rx_thread.join()
|
||||||
listen_thread.join()
|
listen_thread.join()
|
||||||
stats.join()
|
stats.join()
|
||||||
|
@ -147,7 +147,7 @@ def server(ctx, flush):
|
|||||||
server_threads.register(keepalive.KeepAliveThread())
|
server_threads.register(keepalive.KeepAliveThread())
|
||||||
server_threads.register(stats_thread.APRSDStatsStoreThread())
|
server_threads.register(stats_thread.APRSDStatsStoreThread())
|
||||||
server_threads.register(
|
server_threads.register(
|
||||||
rx.APRSDPluginRXThread(
|
rx.APRSDRXThread(
|
||||||
packet_queue=threads.packet_queue,
|
packet_queue=threads.packet_queue,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -15,6 +15,8 @@ from aprsd.packets.core import ( # noqa: F401
|
|||||||
WeatherPacket,
|
WeatherPacket,
|
||||||
factory,
|
factory,
|
||||||
)
|
)
|
||||||
|
from aprsd.packets.filter import PacketFilter
|
||||||
|
from aprsd.packets.filters.dupe_filter import DupePacketFilter
|
||||||
from aprsd.packets.packet_list import PacketList # noqa: F401
|
from aprsd.packets.packet_list import PacketList # noqa: F401
|
||||||
from aprsd.packets.seen_list import SeenList # noqa: F401
|
from aprsd.packets.seen_list import SeenList # noqa: F401
|
||||||
from aprsd.packets.tracker import PacketTrack # noqa: F401
|
from aprsd.packets.tracker import PacketTrack # noqa: F401
|
||||||
@ -26,5 +28,9 @@ collector.PacketCollector().register(SeenList)
|
|||||||
collector.PacketCollector().register(PacketTrack)
|
collector.PacketCollector().register(PacketTrack)
|
||||||
collector.PacketCollector().register(WatchList)
|
collector.PacketCollector().register(WatchList)
|
||||||
|
|
||||||
|
# Register all the packet filters for normal processing
|
||||||
|
# For specific commands you can deregister these if you don't want them.
|
||||||
|
PacketFilter().register(DupePacketFilter)
|
||||||
|
|
||||||
|
|
||||||
NULL_MESSAGE = -1
|
NULL_MESSAGE = -1
|
||||||
|
@ -106,6 +106,8 @@ class Packet:
|
|||||||
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
|
last_send_time: float = field(repr=False, default=0, compare=False, hash=False)
|
||||||
# Was the packet acked?
|
# Was the packet acked?
|
||||||
acked: bool = field(repr=False, default=False, compare=False, hash=False)
|
acked: bool = field(repr=False, default=False, compare=False, hash=False)
|
||||||
|
# Was the packet previously processed (for dupe checking)
|
||||||
|
processed: bool = field(repr=False, default=False, compare=False, hash=False)
|
||||||
|
|
||||||
# Do we allow this packet to be saved to send later?
|
# Do we allow this packet to be saved to send later?
|
||||||
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
|
allow_delay: bool = field(repr=False, default=True, compare=False, hash=False)
|
||||||
@ -186,12 +188,11 @@ class Packet:
|
|||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
"""Build the repr version of the packet."""
|
"""Build the repr version of the packet."""
|
||||||
repr = (
|
return (
|
||||||
f"{self.__class__.__name__}:"
|
f"{self.__class__.__name__}:"
|
||||||
f" From: {self.from_call} "
|
f" From: {self.from_call} "
|
||||||
f" To: {self.to_call}"
|
f" To: {self.to_call}"
|
||||||
)
|
)
|
||||||
return repr
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass_json
|
@dataclass_json
|
||||||
@ -694,6 +695,8 @@ class UnknownPacket:
|
|||||||
path: List[str] = field(default_factory=list, compare=False, hash=False)
|
path: List[str] = field(default_factory=list, compare=False, hash=False)
|
||||||
packet_type: Optional[str] = field(default=None)
|
packet_type: Optional[str] = field(default=None)
|
||||||
via: Optional[str] = field(default=None, compare=False, hash=False)
|
via: Optional[str] = field(default=None, compare=False, hash=False)
|
||||||
|
# Was the packet previously processed (for dupe checking)
|
||||||
|
processed: bool = field(repr=False, default=False, compare=False, hash=False)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def key(self) -> str:
|
def key(self) -> str:
|
||||||
|
58
aprsd/packets/filter.py
Normal file
58
aprsd/packets/filter.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Callable, Protocol, runtime_checkable, Union, Dict
|
||||||
|
|
||||||
|
from aprsd.packets import core
|
||||||
|
from aprsd.utils import singleton
|
||||||
|
|
||||||
|
LOG = logging.getLogger("APRSD")
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class PacketFilterProtocol(Protocol):
|
||||||
|
"""Protocol API for a packet filter class.
|
||||||
|
"""
|
||||||
|
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
|
||||||
|
"""When we get a packet from the network.
|
||||||
|
|
||||||
|
Return a Packet object if the filter passes. Return None if the
|
||||||
|
Packet is filtered out.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
@singleton
|
||||||
|
class PacketFilter:
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.filters: Dict[str, Callable] = {}
|
||||||
|
|
||||||
|
def register(self, packet_filter: Callable) -> None:
|
||||||
|
if not isinstance(packet_filter, PacketFilterProtocol):
|
||||||
|
raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
|
||||||
|
|
||||||
|
if packet_filter not in self.filters:
|
||||||
|
self.filters[packet_filter] = packet_filter()
|
||||||
|
|
||||||
|
def unregister(self, packet_filter: Callable) -> None:
|
||||||
|
if not isinstance(packet_filter, PacketFilterProtocol):
|
||||||
|
raise TypeError(f"class {packet_filter} is not a PacketFilterProtocol object")
|
||||||
|
if packet_filter in self.filters:
|
||||||
|
del self.filters[packet_filter]
|
||||||
|
|
||||||
|
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
|
||||||
|
"""Run through each of the filters.
|
||||||
|
|
||||||
|
This will step through each registered filter class
|
||||||
|
and call filter on it.
|
||||||
|
|
||||||
|
If the filter object returns None, we are done filtering.
|
||||||
|
If the filter object returns the packet, we continue filtering.
|
||||||
|
"""
|
||||||
|
for packet_filter in self.filters:
|
||||||
|
try:
|
||||||
|
if not self.filters[packet_filter].filter(packet):
|
||||||
|
LOG.debug(f"{self.filters[packet_filter].__class__.__name__} dropped {packet.__class__.__name__}:{packet.human_info}")
|
||||||
|
return None
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.error(f"{packet_filter.__clas__.__name__} failed filtering packet {packet.__class__.__name__} : {ex}")
|
||||||
|
return packet
|
0
aprsd/packets/filters/__init__.py
Normal file
0
aprsd/packets/filters/__init__.py
Normal file
68
aprsd/packets/filters/dupe_filter.py
Normal file
68
aprsd/packets/filters/dupe_filter.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from aprsd import packets
|
||||||
|
from aprsd.packets import core
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger('APRSD')
|
||||||
|
|
||||||
|
|
||||||
|
class DupePacketFilter:
|
||||||
|
"""This is a packet filter to detect duplicate packets.
|
||||||
|
|
||||||
|
This Uses the PacketList object to see if a packet exists
|
||||||
|
already. If it does exist in the PacketList, then we need to
|
||||||
|
check the flag on the packet to see if it's been processed before.
|
||||||
|
If the packet has been processed already within the allowed
|
||||||
|
timeframe, then it's a dupe.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
|
||||||
|
# LOG.debug(f"{self.__class__.__name__}.filter called for packet {packet}")
|
||||||
|
"""Filter a packet out if it's already been seen and processed."""
|
||||||
|
if isinstance(packet, core.AckPacket):
|
||||||
|
# We don't need to drop AckPackets, those should be
|
||||||
|
# processed.
|
||||||
|
# Send the AckPacket to the queue for processing elsewhere.
|
||||||
|
return packet
|
||||||
|
else:
|
||||||
|
# Make sure we aren't re-processing the same packet
|
||||||
|
# For RF based APRS Clients we can get duplicate packets
|
||||||
|
# So we need to track them and not process the dupes.
|
||||||
|
pkt_list = packets.PacketList()
|
||||||
|
found = False
|
||||||
|
try:
|
||||||
|
# Find the packet in the list of already seen packets
|
||||||
|
# Based on the packet.key
|
||||||
|
found = pkt_list.find(packet)
|
||||||
|
if not packet.msgNo:
|
||||||
|
# If the packet doesn't have a message id
|
||||||
|
# then there is no reliable way to detect
|
||||||
|
# if it's a dupe, so we just pass it on.
|
||||||
|
# it shouldn't get acked either.
|
||||||
|
found = False
|
||||||
|
except KeyError:
|
||||||
|
found = False
|
||||||
|
|
||||||
|
if not found:
|
||||||
|
# We haven't seen this packet before, so we process it.
|
||||||
|
return packet
|
||||||
|
|
||||||
|
if not packet.processed:
|
||||||
|
# We haven't processed this packet through the plugins.
|
||||||
|
return packet
|
||||||
|
elif packet.timestamp - found.timestamp < CONF.packet_dupe_timeout:
|
||||||
|
# If the packet came in within N seconds of the
|
||||||
|
# Last time seeing the packet, then we drop it as a dupe.
|
||||||
|
LOG.warning(
|
||||||
|
f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
LOG.warning(
|
||||||
|
f'Packet {packet.from_call}:{packet.msgNo} already tracked '
|
||||||
|
f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
|
||||||
|
)
|
||||||
|
return packet
|
53
aprsd/packets/filters/packet_type.py
Normal file
53
aprsd/packets/filters/packet_type.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from aprsd import packets
|
||||||
|
from aprsd.packets import core
|
||||||
|
from aprsd.utils import singleton
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger('APRSD')
|
||||||
|
|
||||||
|
|
||||||
|
@singleton
|
||||||
|
class PacketTypeFilter:
|
||||||
|
"""This filter is used to filter out packets that don't match a specific type.
|
||||||
|
|
||||||
|
To use this, register it with the PacketFilter class,
|
||||||
|
then instante it and call set_allow_list() with a list of packet types
|
||||||
|
you want to allow to pass the filtering. All other packets will be
|
||||||
|
filtered out.
|
||||||
|
"""
|
||||||
|
|
||||||
|
filters = {
|
||||||
|
packets.Packet.__name__: packets.Packet,
|
||||||
|
packets.AckPacket.__name__: packets.AckPacket,
|
||||||
|
packets.BeaconPacket.__name__: packets.BeaconPacket,
|
||||||
|
packets.GPSPacket.__name__: packets.GPSPacket,
|
||||||
|
packets.MessagePacket.__name__: packets.MessagePacket,
|
||||||
|
packets.MicEPacket.__name__: packets.MicEPacket,
|
||||||
|
packets.ObjectPacket.__name__: packets.ObjectPacket,
|
||||||
|
packets.StatusPacket.__name__: packets.StatusPacket,
|
||||||
|
packets.ThirdPartyPacket.__name__: packets.ThirdPartyPacket,
|
||||||
|
packets.WeatherPacket.__name__: packets.WeatherPacket,
|
||||||
|
packets.UnknownPacket.__name__: packets.UnknownPacket,
|
||||||
|
}
|
||||||
|
|
||||||
|
allow_list = ()
|
||||||
|
|
||||||
|
def set_allow_list(self, filter_list):
|
||||||
|
tmp_list = []
|
||||||
|
for filter in filter_list:
|
||||||
|
LOG.warning(
|
||||||
|
f'Setting filter {filter} : {self.filters[filter]} to tmp {tmp_list}'
|
||||||
|
)
|
||||||
|
tmp_list.append(self.filters[filter])
|
||||||
|
self.allow_list = tuple(tmp_list)
|
||||||
|
|
||||||
|
def filter(self, packet: type[core.Packet]) -> Union[type[core.Packet], None]:
|
||||||
|
"""Only allow packets of certain types to filter through."""
|
||||||
|
if self.allow_list:
|
||||||
|
if isinstance(packet, self.allow_list):
|
||||||
|
return packet
|
@ -4,9 +4,8 @@ import queue
|
|||||||
# aprsd.threads
|
# aprsd.threads
|
||||||
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
|
from .aprsd import APRSDThread, APRSDThreadList # noqa: F401
|
||||||
from .rx import ( # noqa: F401
|
from .rx import ( # noqa: F401
|
||||||
APRSDDupeRXThread,
|
|
||||||
APRSDProcessPacketThread,
|
APRSDProcessPacketThread,
|
||||||
APRSDRXThread,
|
APRSDRXThread,
|
||||||
)
|
)
|
||||||
|
|
||||||
packet_queue = queue.Queue(maxsize=20)
|
packet_queue = queue.Queue(maxsize=500)
|
||||||
|
@ -8,20 +8,32 @@ from oslo_config import cfg
|
|||||||
|
|
||||||
from aprsd import packets, plugin
|
from aprsd import packets, plugin
|
||||||
from aprsd.client import client_factory
|
from aprsd.client import client_factory
|
||||||
from aprsd.packets import collector
|
from aprsd.packets import collector, filter
|
||||||
from aprsd.packets import log as packet_log
|
from aprsd.packets import log as packet_log
|
||||||
from aprsd.threads import APRSDThread, tx
|
from aprsd.threads import APRSDThread, tx
|
||||||
from aprsd.utils import trace
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger('APRSD')
|
||||||
|
|
||||||
|
|
||||||
class APRSDRXThread(APRSDThread):
|
class APRSDRXThread(APRSDThread):
|
||||||
|
"""Main Class to connect to an APRS Client and recieve packets.
|
||||||
|
|
||||||
|
A packet is received in the main loop and then sent to the
|
||||||
|
process_packet method, which sends the packet through the collector
|
||||||
|
to track the packet for stats, and then put into the packet queue
|
||||||
|
for processing in a separate thread.
|
||||||
|
"""
|
||||||
|
|
||||||
_client = None
|
_client = None
|
||||||
|
|
||||||
|
# This is the queue that packets are sent to for processing.
|
||||||
|
# We process packets in a separate thread to help prevent
|
||||||
|
# getting blocked by the APRS server trying to send us packets.
|
||||||
|
packet_queue = None
|
||||||
|
|
||||||
def __init__(self, packet_queue):
|
def __init__(self, packet_queue):
|
||||||
super().__init__("RX_PKT")
|
super().__init__('RX_PKT')
|
||||||
self.packet_queue = packet_queue
|
self.packet_queue = packet_queue
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@ -52,7 +64,7 @@ class APRSDRXThread(APRSDThread):
|
|||||||
# kwargs. :(
|
# kwargs. :(
|
||||||
# https://github.com/rossengeorgiev/aprs-python/pull/56
|
# https://github.com/rossengeorgiev/aprs-python/pull/56
|
||||||
self._client.consumer(
|
self._client.consumer(
|
||||||
self._process_packet,
|
self.process_packet,
|
||||||
raw=False,
|
raw=False,
|
||||||
blocking=False,
|
blocking=False,
|
||||||
)
|
)
|
||||||
@ -60,7 +72,7 @@ class APRSDRXThread(APRSDThread):
|
|||||||
aprslib.exceptions.ConnectionDrop,
|
aprslib.exceptions.ConnectionDrop,
|
||||||
aprslib.exceptions.ConnectionError,
|
aprslib.exceptions.ConnectionError,
|
||||||
):
|
):
|
||||||
LOG.error("Connection dropped, reconnecting")
|
LOG.error('Connection dropped, reconnecting')
|
||||||
# Force the deletion of the client object connected to aprs
|
# Force the deletion of the client object connected to aprs
|
||||||
# This will cause a reconnect, next time client.get_client()
|
# This will cause a reconnect, next time client.get_client()
|
||||||
# is called
|
# is called
|
||||||
@ -68,45 +80,18 @@ class APRSDRXThread(APRSDThread):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
except Exception:
|
except Exception:
|
||||||
# LOG.exception(ex)
|
# LOG.exception(ex)
|
||||||
LOG.error("Resetting connection and trying again.")
|
LOG.error('Resetting connection and trying again.')
|
||||||
self._client.reset()
|
self._client.reset()
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
# Continue to loop
|
|
||||||
time.sleep(1)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _process_packet(self, *args, **kwargs):
|
|
||||||
"""Intermediate callback so we can update the keepalive time."""
|
|
||||||
# Now call the 'real' packet processing for a RX'x packet
|
|
||||||
self.process_packet(*args, **kwargs)
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def process_packet(self, *args, **kwargs):
|
def process_packet(self, *args, **kwargs):
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class APRSDDupeRXThread(APRSDRXThread):
|
|
||||||
"""Process received packets.
|
|
||||||
|
|
||||||
This is the main APRSD Server command thread that
|
|
||||||
receives packets and makes sure the packet
|
|
||||||
hasn't been seen previously before sending it on
|
|
||||||
to be processed.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@trace.trace
|
|
||||||
def process_packet(self, *args, **kwargs):
|
|
||||||
"""This handles the processing of an inbound packet.
|
|
||||||
|
|
||||||
When a packet is received by the connected client object,
|
|
||||||
it sends the raw packet into this function. This function then
|
|
||||||
decodes the packet via the client, and then processes the packet.
|
|
||||||
Ack Packets are sent to the PluginProcessPacketThread for processing.
|
|
||||||
All other packets have to be checked as a dupe, and then only after
|
|
||||||
we haven't seen this packet before, do we send it to the
|
|
||||||
PluginProcessPacketThread for processing.
|
|
||||||
"""
|
|
||||||
packet = self._client.decode_packet(*args, **kwargs)
|
packet = self._client.decode_packet(*args, **kwargs)
|
||||||
|
if not packet:
|
||||||
|
LOG.error(
|
||||||
|
'No packet received from decode_packet. Most likely a failure to parse'
|
||||||
|
)
|
||||||
|
return
|
||||||
packet_log.log(packet)
|
packet_log.log(packet)
|
||||||
pkt_list = packets.PacketList()
|
pkt_list = packets.PacketList()
|
||||||
|
|
||||||
@ -140,26 +125,55 @@ class APRSDDupeRXThread(APRSDRXThread):
|
|||||||
# If the packet came in within N seconds of the
|
# If the packet came in within N seconds of the
|
||||||
# Last time seeing the packet, then we drop it as a dupe.
|
# Last time seeing the packet, then we drop it as a dupe.
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
f"Packet {packet.from_call}:{packet.msgNo} already tracked, dropping."
|
f'Packet {packet.from_call}:{packet.msgNo} already tracked, dropping.'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
f"Packet {packet.from_call}:{packet.msgNo} already tracked "
|
f'Packet {packet.from_call}:{packet.msgNo} already tracked '
|
||||||
f"but older than {CONF.packet_dupe_timeout} seconds. processing.",
|
f'but older than {CONF.packet_dupe_timeout} seconds. processing.',
|
||||||
)
|
)
|
||||||
collector.PacketCollector().rx(packet)
|
collector.PacketCollector().rx(packet)
|
||||||
self.packet_queue.put(packet)
|
self.packet_queue.put(packet)
|
||||||
|
|
||||||
|
|
||||||
class APRSDPluginRXThread(APRSDDupeRXThread):
|
class APRSDFilterThread(APRSDThread):
|
||||||
""" "Process received packets.
|
def __init__(self, thread_name, packet_queue):
|
||||||
|
super().__init__(thread_name)
|
||||||
|
self.packet_queue = packet_queue
|
||||||
|
|
||||||
|
def filter_packet(self, packet):
|
||||||
|
# Do any packet filtering prior to processing
|
||||||
|
if not filter.PacketFilter().filter(packet):
|
||||||
|
return None
|
||||||
|
return packet
|
||||||
|
|
||||||
|
def print_packet(self, packet):
|
||||||
|
"""Allow a child of this class to override this.
|
||||||
|
|
||||||
|
This is helpful if for whatever reason the child class
|
||||||
|
doesn't want to log packets.
|
||||||
|
|
||||||
For backwards compatibility, we keep the APRSDPluginRXThread.
|
|
||||||
"""
|
"""
|
||||||
|
packet_log.log(packet)
|
||||||
|
|
||||||
|
def loop(self):
|
||||||
|
try:
|
||||||
|
packet = self.packet_queue.get(timeout=1)
|
||||||
|
self.print_packet(packet)
|
||||||
|
if packet:
|
||||||
|
if self.filter_packet(packet):
|
||||||
|
self.process_packet(packet)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class APRSDProcessPacketThread(APRSDThread):
|
class APRSDProcessPacketThread(APRSDFilterThread):
|
||||||
"""Base class for processing received packets.
|
"""Base class for processing received packets after they have been filtered.
|
||||||
|
|
||||||
|
Packets are received from the client, then filtered for dupes,
|
||||||
|
then sent to the packet queue. This thread pulls packets from
|
||||||
|
the packet queue for processing.
|
||||||
|
|
||||||
This is the base class for processing packets coming from
|
This is the base class for processing packets coming from
|
||||||
the consumer. This base class handles sending ack packets and
|
the consumer. This base class handles sending ack packets and
|
||||||
@ -167,44 +181,38 @@ class APRSDProcessPacketThread(APRSDThread):
|
|||||||
for processing."""
|
for processing."""
|
||||||
|
|
||||||
def __init__(self, packet_queue):
|
def __init__(self, packet_queue):
|
||||||
self.packet_queue = packet_queue
|
super().__init__('ProcessPKT', packet_queue=packet_queue)
|
||||||
super().__init__("ProcessPKT")
|
|
||||||
if not CONF.enable_sending_ack_packets:
|
if not CONF.enable_sending_ack_packets:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
"Sending ack packets is disabled, messages "
|
'Sending ack packets is disabled, messages will not be acknowledged.',
|
||||||
"will not be acknowledged.",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def process_ack_packet(self, packet):
|
def process_ack_packet(self, packet):
|
||||||
"""We got an ack for a message, no need to resend it."""
|
"""We got an ack for a message, no need to resend it."""
|
||||||
ack_num = packet.msgNo
|
ack_num = packet.msgNo
|
||||||
LOG.debug(f"Got ack for message {ack_num}")
|
LOG.debug(f'Got ack for message {ack_num}')
|
||||||
collector.PacketCollector().rx(packet)
|
collector.PacketCollector().rx(packet)
|
||||||
|
|
||||||
def process_piggyback_ack(self, packet):
|
def process_piggyback_ack(self, packet):
|
||||||
"""We got an ack embedded in a packet."""
|
"""We got an ack embedded in a packet."""
|
||||||
ack_num = packet.ackMsgNo
|
ack_num = packet.ackMsgNo
|
||||||
LOG.debug(f"Got PiggyBackAck for message {ack_num}")
|
LOG.debug(f'Got PiggyBackAck for message {ack_num}')
|
||||||
collector.PacketCollector().rx(packet)
|
collector.PacketCollector().rx(packet)
|
||||||
|
|
||||||
def process_reject_packet(self, packet):
|
def process_reject_packet(self, packet):
|
||||||
"""We got a reject message for a packet. Stop sending the message."""
|
"""We got a reject message for a packet. Stop sending the message."""
|
||||||
ack_num = packet.msgNo
|
ack_num = packet.msgNo
|
||||||
LOG.debug(f"Got REJECT for message {ack_num}")
|
LOG.debug(f'Got REJECT for message {ack_num}')
|
||||||
collector.PacketCollector().rx(packet)
|
collector.PacketCollector().rx(packet)
|
||||||
|
|
||||||
def loop(self):
|
|
||||||
try:
|
|
||||||
packet = self.packet_queue.get(timeout=1)
|
|
||||||
if packet:
|
|
||||||
self.process_packet(packet)
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
return True
|
|
||||||
|
|
||||||
def process_packet(self, packet):
|
def process_packet(self, packet):
|
||||||
"""Process a packet received from aprs-is server."""
|
"""Process a packet received from aprs-is server."""
|
||||||
LOG.debug(f"ProcessPKT-LOOP {self.loop_count}")
|
LOG.debug(f'ProcessPKT-LOOP {self.loop_count}')
|
||||||
|
|
||||||
|
# set this now as we are going to process it.
|
||||||
|
# This is used during dupe checking, so set it early
|
||||||
|
packet.processed = True
|
||||||
|
|
||||||
our_call = CONF.callsign.lower()
|
our_call = CONF.callsign.lower()
|
||||||
|
|
||||||
from_call = packet.from_call
|
from_call = packet.from_call
|
||||||
@ -227,7 +235,7 @@ class APRSDProcessPacketThread(APRSDThread):
|
|||||||
):
|
):
|
||||||
self.process_reject_packet(packet)
|
self.process_reject_packet(packet)
|
||||||
else:
|
else:
|
||||||
if hasattr(packet, "ackMsgNo") and packet.ackMsgNo:
|
if hasattr(packet, 'ackMsgNo') and packet.ackMsgNo:
|
||||||
# we got an ack embedded in this packet
|
# we got an ack embedded in this packet
|
||||||
# we need to handle the ack
|
# we need to handle the ack
|
||||||
self.process_piggyback_ack(packet)
|
self.process_piggyback_ack(packet)
|
||||||
@ -267,7 +275,7 @@ class APRSDProcessPacketThread(APRSDThread):
|
|||||||
if not for_us:
|
if not for_us:
|
||||||
LOG.info("Got a packet meant for someone else '{packet.to_call}'")
|
LOG.info("Got a packet meant for someone else '{packet.to_call}'")
|
||||||
else:
|
else:
|
||||||
LOG.info("Got a non AckPacket/MessagePacket")
|
LOG.info('Got a non AckPacket/MessagePacket')
|
||||||
|
|
||||||
|
|
||||||
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
||||||
@ -287,7 +295,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|||||||
tx.send(subreply)
|
tx.send(subreply)
|
||||||
else:
|
else:
|
||||||
wl = CONF.watch_list
|
wl = CONF.watch_list
|
||||||
to_call = wl["alert_callsign"]
|
to_call = wl['alert_callsign']
|
||||||
tx.send(
|
tx.send(
|
||||||
packets.MessagePacket(
|
packets.MessagePacket(
|
||||||
from_call=CONF.callsign,
|
from_call=CONF.callsign,
|
||||||
@ -299,7 +307,7 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|||||||
# We have a message based object.
|
# We have a message based object.
|
||||||
tx.send(reply)
|
tx.send(reply)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.error("Plugin failed!!!")
|
LOG.error('Plugin failed!!!')
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
|
|
||||||
def process_our_message_packet(self, packet):
|
def process_our_message_packet(self, packet):
|
||||||
@ -355,11 +363,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|||||||
if to_call == CONF.callsign and not replied:
|
if to_call == CONF.callsign and not replied:
|
||||||
# Tailor the messages accordingly
|
# Tailor the messages accordingly
|
||||||
if CONF.load_help_plugin:
|
if CONF.load_help_plugin:
|
||||||
LOG.warning("Sending help!")
|
LOG.warning('Sending help!')
|
||||||
message_text = "Unknown command! Send 'help' message for help"
|
message_text = "Unknown command! Send 'help' message for help"
|
||||||
else:
|
else:
|
||||||
LOG.warning("Unknown command!")
|
LOG.warning('Unknown command!')
|
||||||
message_text = "Unknown command!"
|
message_text = 'Unknown command!'
|
||||||
|
|
||||||
tx.send(
|
tx.send(
|
||||||
packets.MessagePacket(
|
packets.MessagePacket(
|
||||||
@ -369,11 +377,11 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.error("Plugin failed!!!")
|
LOG.error('Plugin failed!!!')
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
# Do we need to send a reply?
|
# Do we need to send a reply?
|
||||||
if to_call == CONF.callsign:
|
if to_call == CONF.callsign:
|
||||||
reply = "A Plugin failed! try again?"
|
reply = 'A Plugin failed! try again?'
|
||||||
tx.send(
|
tx.send(
|
||||||
packets.MessagePacket(
|
packets.MessagePacket(
|
||||||
from_call=CONF.callsign,
|
from_call=CONF.callsign,
|
||||||
@ -382,4 +390,4 @@ class APRSDPluginProcessPacketThread(APRSDProcessPacketThread):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
LOG.debug("Completed process_our_message_packet")
|
LOG.debug('Completed process_our_message_packet')
|
||||||
|
@ -6,9 +6,8 @@ import threading
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger("APRSD")
|
LOG = logging.getLogger('APRSD')
|
||||||
|
|
||||||
|
|
||||||
class ObjectStoreMixin:
|
class ObjectStoreMixin:
|
||||||
@ -63,7 +62,7 @@ class ObjectStoreMixin:
|
|||||||
def _save_filename(self):
|
def _save_filename(self):
|
||||||
save_location = CONF.save_location
|
save_location = CONF.save_location
|
||||||
|
|
||||||
return "{}/{}.p".format(
|
return '{}/{}.p'.format(
|
||||||
save_location,
|
save_location,
|
||||||
self.__class__.__name__.lower(),
|
self.__class__.__name__.lower(),
|
||||||
)
|
)
|
||||||
@ -75,13 +74,13 @@ class ObjectStoreMixin:
|
|||||||
self._init_store()
|
self._init_store()
|
||||||
save_filename = self._save_filename()
|
save_filename = self._save_filename()
|
||||||
if len(self) > 0:
|
if len(self) > 0:
|
||||||
LOG.info(
|
LOG.debug(
|
||||||
f"{self.__class__.__name__}::Saving"
|
f'{self.__class__.__name__}::Saving'
|
||||||
f" {len(self)} entries to disk at "
|
f' {len(self)} entries to disk at '
|
||||||
f"{save_filename}",
|
f'{save_filename}',
|
||||||
)
|
)
|
||||||
with self.lock:
|
with self.lock:
|
||||||
with open(save_filename, "wb+") as fp:
|
with open(save_filename, 'wb+') as fp:
|
||||||
pickle.dump(self.data, fp)
|
pickle.dump(self.data, fp)
|
||||||
else:
|
else:
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
@ -97,21 +96,21 @@ class ObjectStoreMixin:
|
|||||||
return
|
return
|
||||||
if os.path.exists(self._save_filename()):
|
if os.path.exists(self._save_filename()):
|
||||||
try:
|
try:
|
||||||
with open(self._save_filename(), "rb") as fp:
|
with open(self._save_filename(), 'rb') as fp:
|
||||||
raw = pickle.load(fp)
|
raw = pickle.load(fp)
|
||||||
if raw:
|
if raw:
|
||||||
self.data = raw
|
self.data = raw
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
f"{self.__class__.__name__}::Loaded {len(self)} entries from disk.",
|
f'{self.__class__.__name__}::Loaded {len(self)} entries from disk.',
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
LOG.debug(f"{self.__class__.__name__}::No data to load.")
|
LOG.debug(f'{self.__class__.__name__}::No data to load.')
|
||||||
except (pickle.UnpicklingError, Exception) as ex:
|
except (pickle.UnpicklingError, Exception) as ex:
|
||||||
LOG.error(f"Failed to UnPickle {self._save_filename()}")
|
LOG.error(f'Failed to UnPickle {self._save_filename()}')
|
||||||
LOG.error(ex)
|
LOG.error(ex)
|
||||||
self.data = {}
|
self.data = {}
|
||||||
else:
|
else:
|
||||||
LOG.debug(f"{self.__class__.__name__}::No save file found.")
|
LOG.debug(f'{self.__class__.__name__}::No save file found.')
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
"""Nuke the old pickle file that stored the old results from last aprsd run."""
|
"""Nuke the old pickle file that stored the old results from last aprsd run."""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user