1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-08-03 05:52:26 -04:00

Refactored client classes

This patch completely refactors and simplifies how the clients
are created and used.  There is no need now to have a separate
KISSRXThread.  Since all the custom work for the KISS client is
encapsulated in the kiss client itself, the same RX thread and
callback mechanism works for both the APRSIS client and KISS Client
objects.  There is also no need to determine which transport
(aprsis vs kiss) is being used at runtime by any of the messages
objects.  The same API works for both APRSIS and KISS Client objects
This commit is contained in:
Hemna 2021-09-17 09:32:30 -04:00
parent 23e3876e7b
commit 270be947b5
8 changed files with 409 additions and 437 deletions

View File

@ -1,26 +1,30 @@
import abc
import logging import logging
import select
import time import time
import aprslib import aprslib
from aprslib import is_py3 from aprslib.exceptions import LoginError
from aprslib.exceptions import (
ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError,
UnknownFormat,
)
import aprsd from aprsd import trace
from aprsd import stats from aprsd.clients import aprsis, kiss
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
TRANSPORT_APRSIS = "aprsis"
TRANSPORT_TCPKISS = "tcpkiss"
TRANSPORT_SERIALKISS = "serialkiss"
# Main must create this from the ClientFactory
# object such that it's populated with the
# Correct config
factory = None
class Client: class Client:
"""Singleton client class that constructs the aprslib connection.""" """Singleton client class that constructs the aprslib connection."""
_instance = None _instance = None
aprs_client = None _client = None
config = None config = None
connected = False connected = False
@ -40,14 +44,49 @@ class Client:
@property @property
def client(self): def client(self):
if not self.aprs_client: if not self._client:
self.aprs_client = self.setup_connection() self._client = self.setup_connection()
return self.aprs_client return self._client
def reset(self): def reset(self):
"""Call this to force a rebuild/reconnect.""" """Call this to force a rebuild/reconnect."""
del self.aprs_client del self._client
@abc.abstractmethod
def setup_connection(self):
pass
@staticmethod
@abc.abstractmethod
def is_enabled(config):
pass
@staticmethod
@abc.abstractmethod
def transport(config):
pass
@abc.abstractmethod
def decode_packet(self, *args, **kwargs):
pass
class APRSISClient(Client):
@staticmethod
def is_enabled(config):
# Defaults to True if the enabled flag is non existent
return config["aprs"].get("enabled", True)
@staticmethod
def transport(config):
return TRANSPORT_APRSIS
def decode_packet(self, *args, **kwargs):
"""APRS lib already decodes this."""
return args[0]
@trace.trace
def setup_connection(self): def setup_connection(self):
user = self.config["aprs"]["login"] user = self.config["aprs"]["login"]
password = self.config["aprs"]["password"] password = self.config["aprs"]["password"]
@ -55,10 +94,11 @@ class Client:
port = self.config["aprs"].get("port", 14580) port = self.config["aprs"].get("port", 14580)
connected = False connected = False
backoff = 1 backoff = 1
aprs_client = None
while not connected: while not connected:
try: try:
LOG.info("Creating aprslib client") LOG.info("Creating aprslib client")
aprs_client = Aprsdis(user, passwd=password, host=host, port=port) aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port)
# Force the logging to be the same # Force the logging to be the same
aprs_client.logger = LOG aprs_client.logger = LOG
aprs_client.connect() aprs_client.connect()
@ -77,200 +117,96 @@ class Client:
return aprs_client return aprs_client
class Aprsdis(aprslib.IS): class KISSClient(Client):
"""Extend the aprslib class so we can exit properly."""
# flag to tell us to stop @staticmethod
thread_stop = False def is_enabled(config):
"""Return if tcp or serial KISS is enabled."""
if "kiss" not in config:
return False
# timeout in seconds if "serial" in config["kiss"]:
select_timeout = 1 if config["kiss"]["serial"].get("enabled", False):
return True
def stop(self): if "tcp" in config["kiss"]:
self.thread_stop = True if config["kiss"]["tcp"].get("enabled", False):
LOG.info("Shutdown Aprsdis client.") return True
def send(self, msg): @staticmethod
"""Send an APRS Message object.""" def transport(config):
line = str(msg) if "serial" in config["kiss"]:
self.sendall(line) if config["kiss"]["serial"].get("enabled", False):
return TRANSPORT_SERIALKISS
def _socket_readlines(self, blocking=False): if "tcp" in config["kiss"]:
""" if config["kiss"]["tcp"].get("enabled", False):
Generator for complete lines, received from the server return TRANSPORT_TCPKISS
"""
try:
self.sock.setblocking(0)
except OSError as e:
self.logger.error(f"socket error when setblocking(0): {str(e)}")
raise aprslib.ConnectionDrop("connection dropped")
while not self.thread_stop: def decode_packet(self, *args, **kwargs):
short_buf = b"" """We get a frame, which has to be decoded."""
newline = b"\r\n" frame = kwargs["frame"]
LOG.debug(f"Got an APRS Frame '{frame}'")
# try and nuke the * from the fromcall sign.
frame.header._source._ch = False
payload = str(frame.payload.decode())
msg = f"{str(frame.header)}:{payload}"
# msg = frame.tnc2
LOG.debug(f"Decoding {msg}")
# set a select timeout, so we get a chance to exit packet = aprslib.parse(msg)
# when user hits CTRL-C return packet
readable, writable, exceptional = select.select(
[self.sock],
[],
[],
self.select_timeout,
)
if not readable:
if not blocking:
break
else:
continue
try: @trace.trace
short_buf = self.sock.recv(4096) def setup_connection(self):
ax25client = kiss.Aioax25Client(self.config)
# sock.recv returns empty if the connection drops return ax25client
if not short_buf:
if not blocking:
# We could just not be blocking, so empty is expected
continue
else:
self.logger.error("socket.recv(): returned empty")
raise aprslib.ConnectionDrop("connection dropped")
except OSError as e:
# self.logger.error("socket error on recv(): %s" % str(e))
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break
self.buf += short_buf
while newline in self.buf:
line, self.buf = self.buf.split(newline, 1)
yield line
def _send_login(self):
"""
Sends login string to server
"""
login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n"
login_str = login_str.format(
self.callsign,
self.passwd,
(" filter " + self.filter) if self.filter != "" else "",
aprsd.__version__,
)
self.logger.info("Sending login information")
try:
self._sendall(login_str)
self.sock.settimeout(5)
test = self.sock.recv(len(login_str) + 100)
if is_py3:
test = test.decode("latin-1")
test = test.rstrip()
self.logger.debug("Server: %s", test)
a, b, callsign, status, e = test.split(" ", 4)
s = e.split(",")
if len(s):
server_string = s[0].replace("server ", "")
else:
server_string = e.replace("server ", "")
self.logger.info(f"Connected to {server_string}")
self.server_string = server_string
stats.APRSDStats().set_aprsis_server(server_string)
if callsign == "":
raise LoginError("Server responded with empty callsign???")
if callsign != self.callsign:
raise LoginError(f"Server: {test}")
if status != "verified," and self.passwd != "-1":
raise LoginError("Password is incorrect")
if self.passwd == "-1":
self.logger.info("Login successful (receive only)")
else:
self.logger.info("Login successful")
except LoginError as e:
self.logger.error(str(e))
self.close()
raise
except Exception as e:
self.close()
self.logger.error(f"Failed to login '{e}'")
raise LoginError("Failed to login")
def consumer(self, callback, blocking=True, immortal=False, raw=False):
"""
When a position sentence is received, it will be passed to the callback function
blocking: if true (default), runs forever, otherwise will return after one sentence
You can still exit the loop, by raising StopIteration in the callback function
immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions
if false (default), consumer will return
raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse()
"""
if not self._connected:
raise ConnectionError("not connected to a server")
line = b""
while True and not self.thread_stop:
try:
for line in self._socket_readlines(blocking):
if line[0:1] != b"#":
if raw:
callback(line)
else:
callback(self._parse(line))
else:
self.logger.debug("Server: %s", line.decode("utf8"))
stats.APRSDStats().set_aprsis_keepalive()
except ParseError as exp:
self.logger.log(
11,
"%s\n Packet: %s",
exp,
exp.packet,
)
except UnknownFormat as exp:
self.logger.log(
9,
"%s\n Packet: %s",
exp,
exp.packet,
)
except LoginError as exp:
self.logger.error("%s: %s", exp.__class__.__name__, exp)
except (KeyboardInterrupt, SystemExit):
raise
except (ConnectionDrop, ConnectionError):
self.close()
if not immortal:
raise
else:
self.connect(blocking=blocking)
continue
except GenericError:
pass
except StopIteration:
break
except Exception:
self.logger.error("APRS Packet: %s", line)
raise
if not blocking:
break
def get_client(): class ClientFactory:
cl = Client() _instance = None
return cl.client
def __new__(cls, *args, **kwargs):
"""This magic turns this into a singleton."""
if cls._instance is None:
cls._instance = super().__new__(cls)
# Put any initialization here.
return cls._instance
def __init__(self, config):
self.config = config
self._builders = {}
def register(self, key, builder):
self._builders[key] = builder
def create(self, key=None):
if not key:
if APRSISClient.is_enabled(self.config):
key = TRANSPORT_APRSIS
elif KISSClient.is_enabled(self.config):
key = KISSClient.transport(self.config)
LOG.debug(f"GET client {key}")
builder = self._builders.get(key)
if not builder:
raise ValueError(key)
return builder(self.config)
def is_client_enabled(self):
"""Make sure at least one client is enabled."""
enabled = False
for key in self._builders.keys():
enabled |= self._builders[key].is_enabled(self.config)
return enabled
@staticmethod
def setup(config):
"""Create and register all possible client objects."""
global factory
factory = ClientFactory(config)
factory.register(TRANSPORT_APRSIS, APRSISClient)
factory.register(TRANSPORT_TCPKISS, KISSClient)
factory.register(TRANSPORT_SERIALKISS, KISSClient)

209
aprsd/clients/aprsis.py Normal file
View File

@ -0,0 +1,209 @@
import logging
import select
import aprslib
from aprslib import is_py3
from aprslib.exceptions import (
ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError,
UnknownFormat,
)
import aprsd
from aprsd import stats
LOG = logging.getLogger("APRSD")
class Aprsdis(aprslib.IS):
"""Extend the aprslib class so we can exit properly."""
# flag to tell us to stop
thread_stop = False
# timeout in seconds
select_timeout = 1
def stop(self):
self.thread_stop = True
LOG.info("Shutdown Aprsdis client.")
def send(self, msg):
"""Send an APRS Message object."""
line = str(msg)
self.sendall(line)
def _socket_readlines(self, blocking=False):
"""
Generator for complete lines, received from the server
"""
try:
self.sock.setblocking(0)
except OSError as e:
self.logger.error(f"socket error when setblocking(0): {str(e)}")
raise aprslib.ConnectionDrop("connection dropped")
while not self.thread_stop:
short_buf = b""
newline = b"\r\n"
# set a select timeout, so we get a chance to exit
# when user hits CTRL-C
readable, writable, exceptional = select.select(
[self.sock],
[],
[],
self.select_timeout,
)
if not readable:
if not blocking:
break
else:
continue
try:
short_buf = self.sock.recv(4096)
# sock.recv returns empty if the connection drops
if not short_buf:
if not blocking:
# We could just not be blocking, so empty is expected
continue
else:
self.logger.error("socket.recv(): returned empty")
raise aprslib.ConnectionDrop("connection dropped")
except OSError as e:
# self.logger.error("socket error on recv(): %s" % str(e))
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break
self.buf += short_buf
while newline in self.buf:
line, self.buf = self.buf.split(newline, 1)
yield line
def _send_login(self):
"""
Sends login string to server
"""
login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n"
login_str = login_str.format(
self.callsign,
self.passwd,
(" filter " + self.filter) if self.filter != "" else "",
aprsd.__version__,
)
self.logger.info("Sending login information")
try:
self._sendall(login_str)
self.sock.settimeout(5)
test = self.sock.recv(len(login_str) + 100)
if is_py3:
test = test.decode("latin-1")
test = test.rstrip()
self.logger.debug("Server: %s", test)
a, b, callsign, status, e = test.split(" ", 4)
s = e.split(",")
if len(s):
server_string = s[0].replace("server ", "")
else:
server_string = e.replace("server ", "")
self.logger.info(f"Connected to {server_string}")
self.server_string = server_string
stats.APRSDStats().set_aprsis_server(server_string)
if callsign == "":
raise LoginError("Server responded with empty callsign???")
if callsign != self.callsign:
raise LoginError(f"Server: {test}")
if status != "verified," and self.passwd != "-1":
raise LoginError("Password is incorrect")
if self.passwd == "-1":
self.logger.info("Login successful (receive only)")
else:
self.logger.info("Login successful")
except LoginError as e:
self.logger.error(str(e))
self.close()
raise
except Exception as e:
self.close()
self.logger.error(f"Failed to login '{e}'")
raise LoginError("Failed to login")
def consumer(self, callback, blocking=True, immortal=False, raw=False):
"""
When a position sentence is received, it will be passed to the callback function
blocking: if true (default), runs forever, otherwise will return after one sentence
You can still exit the loop, by raising StopIteration in the callback function
immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions
if false (default), consumer will return
raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse()
"""
if not self._connected:
raise ConnectionError("not connected to a server")
line = b""
while True and not self.thread_stop:
try:
for line in self._socket_readlines(blocking):
if line[0:1] != b"#":
if raw:
callback(line)
else:
callback(self._parse(line))
else:
self.logger.debug("Server: %s", line.decode("utf8"))
stats.APRSDStats().set_aprsis_keepalive()
except ParseError as exp:
self.logger.log(
11,
"%s\n Packet: %s",
exp,
exp.packet,
)
except UnknownFormat as exp:
self.logger.log(
9,
"%s\n Packet: %s",
exp,
exp.packet,
)
except LoginError as exp:
self.logger.error("%s: %s", exp.__class__.__name__, exp)
except (KeyboardInterrupt, SystemExit):
raise
except (ConnectionDrop, ConnectionError):
self.close()
if not immortal:
raise
else:
self.connect(blocking=blocking)
continue
except GenericError:
pass
except StopIteration:
break
except Exception:
self.logger.error("APRS Packet: %s", line)
raise
if not blocking:
break

View File

@ -5,83 +5,20 @@ from aioax25 import interface
from aioax25 import kiss as kiss from aioax25 import kiss as kiss
from aioax25.aprs import APRSInterface from aioax25.aprs import APRSInterface
from aprsd import trace
TRANSPORT_TCPKISS = "tcpkiss"
TRANSPORT_SERIALKISS = "serialkiss"
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
class KISSClient:
_instance = None
config = None
ax25client = None
loop = None
def __new__(cls, *args, **kwargs):
"""Singleton for this class."""
if cls._instance is None:
cls._instance = super().__new__(cls)
# initialize shit here
return cls._instance
def __init__(self, config=None):
if config:
self.config = config
@staticmethod
def kiss_enabled(config):
"""Return if tcp or serial KISS is enabled."""
if "kiss" not in config:
return False
if "serial" in config["kiss"]:
if config["kiss"]["serial"].get("enabled", False):
return True
if "tcp" in config["kiss"]:
if config["kiss"]["tcp"].get("enabled", False):
return True
@staticmethod
def transport(config):
if "serial" in config["kiss"]:
if config["kiss"]["serial"].get("enabled", False):
return TRANSPORT_SERIALKISS
if "tcp" in config["kiss"]:
if config["kiss"]["tcp"].get("enabled", False):
return TRANSPORT_TCPKISS
@property
def client(self):
if not self.ax25client:
self.ax25client = self.setup_connection()
return self.ax25client
def reset(self):
"""Call this to fore a rebuild/reconnect."""
self.ax25client.stop()
del self.ax25client
@trace.trace
def setup_connection(self):
ax25client = Aioax25Client(self.config)
LOG.debug("Complete")
return ax25client
class Aioax25Client: class Aioax25Client:
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
self.setup() self.setup()
def setup(self): def setup(self):
# we can be TCP kiss or Serial kiss # we can be TCP kiss or Serial kiss
self.loop = asyncio.get_event_loop()
if "serial" in self.config["kiss"] and self.config["kiss"]["serial"].get( if "serial" in self.config["kiss"] and self.config["kiss"]["serial"].get(
"enabled", "enabled",
False, False,
@ -131,10 +68,20 @@ class Aioax25Client:
self.kissdev._close() self.kissdev._close()
self.loop.stop() self.loop.stop()
def consumer(self, callback, callsign=None): def set_filter(self, filter):
if not callsign: # This does nothing right now.
callsign = self.config["ham"]["callsign"] pass
self.aprsint.bind(callback=callback, callsign="WB4BOR", ssid=12, regex=False)
def consumer(self, callback, blocking=True, immortal=False, raw=False):
callsign = self.config["kiss"]["callsign"]
call = callsign.split("-")
if len(call) > 1:
callsign = call[0]
ssid = int(call[1])
else:
ssid = 0
self.aprsint.bind(callback=callback, callsign=callsign, ssid=ssid, regex=False)
self.loop.run_forever()
def send(self, msg): def send(self, msg):
"""Send an APRS Message object.""" """Send an APRS Message object."""
@ -145,8 +92,3 @@ class Aioax25Client:
path=["WIDE1-1", "WIDE2-1"], path=["WIDE1-1", "WIDE2-1"],
oneshot=True, oneshot=True,
) )
def get_client():
cl = KISSClient()
return cl.client

View File

@ -19,7 +19,8 @@ from werkzeug.security import check_password_hash, generate_password_hash
import aprsd import aprsd
from aprsd import client from aprsd import client
from aprsd import config as aprsd_config from aprsd import config as aprsd_config
from aprsd import kissclient, messaging, packets, plugin, stats, threads, utils from aprsd import messaging, packets, plugin, stats, threads, utils
from aprsd.clients import aprsis
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -136,7 +137,8 @@ class SendMessageThread(threads.APRSDThread):
while not connected: while not connected:
try: try:
LOG.info("Creating aprslib client") LOG.info("Creating aprslib client")
aprs_client = client.Aprsdis(
aprs_client = aprsis.Aprsdis(
user, user,
passwd=password, passwd=password,
host=host, host=host,
@ -312,16 +314,16 @@ class APRSDFlask(flask_classful.FlaskView):
) )
else: else:
# We might be connected to a KISS socket? # We might be connected to a KISS socket?
if kissclient.KISSClient.kiss_enabled(self.config): if client.KISSClient.kiss_enabled(self.config):
transport = kissclient.KISSClient.transport(self.config) transport = client.KISSClient.transport(self.config)
if transport == kissclient.TRANSPORT_TCPKISS: if transport == client.TRANSPORT_TCPKISS:
aprs_connection = ( aprs_connection = (
"TCPKISS://{}:{}".format( "TCPKISS://{}:{}".format(
self.config["kiss"]["tcp"]["host"], self.config["kiss"]["tcp"]["host"],
self.config["kiss"]["tcp"]["port"], self.config["kiss"]["tcp"]["port"],
) )
) )
elif transport == kissclient.TRANSPORT_SERIALKISS: elif transport == client.TRANSPORT_SERIALKISS:
aprs_connection = ( aprs_connection = (
"SerialKISS://{}@{} baud".format( "SerialKISS://{}@{} baud".format(
self.config["kiss"]["serial"]["device"], self.config["kiss"]["serial"]["device"],

View File

@ -37,7 +37,7 @@ import click_completion
# local imports here # local imports here
import aprsd import aprsd
from aprsd import ( from aprsd import (
flask, kissclient, messaging, packets, plugin, stats, threads, trace, utils, flask, messaging, packets, plugin, stats, threads, trace, utils,
) )
from aprsd import client from aprsd import client
from aprsd import config as aprsd_config from aprsd import config as aprsd_config
@ -463,23 +463,13 @@ def server(
trace.setup_tracing(["method", "api"]) trace.setup_tracing(["method", "api"])
stats.APRSDStats(config) stats.APRSDStats(config)
if config["aprs"].get("enabled", True): # Initialize the client factory and create
try: # The correct client object ready for use
cl = client.Client(config) client.ClientFactory.setup(config)
cl.client # Make sure we have 1 client transport enabled
except LoginError: if not client.factory.is_client_enabled():
sys.exit(-1) LOG.error("No Clients are enabled in config.")
sys.exit(-1)
rx_thread = threads.APRSDRXThread(
msg_queues=threads.msg_queues,
config=config,
)
rx_thread.start()
else:
LOG.info(
"APRS network connection Not Enabled in config. This is"
" for setups without internet connectivity.",
)
# Create the initial PM singleton and Register plugins # Create the initial PM singleton and Register plugins
plugin_manager = plugin.PluginManager(config) plugin_manager = plugin.PluginManager(config)
@ -497,13 +487,11 @@ def server(
packets.PacketList(config=config) packets.PacketList(config=config)
packets.WatchList(config=config) packets.WatchList(config=config)
if kissclient.KISSClient.kiss_enabled(config): rx_thread = threads.APRSDRXThread(
kcl = kissclient.KISSClient(config=config) msg_queues=threads.msg_queues,
# This initializes the client object. config=config,
kcl.client )
rx_thread.start()
kissrx_thread = threads.KISSRXThread(msg_queues=threads.msg_queues, config=config)
kissrx_thread.start()
messaging.MsgTrack().restart() messaging.MsgTrack().restart()

View File

@ -11,7 +11,7 @@ import time
from aprsd import client from aprsd import client
from aprsd import config as aprsd_config from aprsd import config as aprsd_config
from aprsd import kissclient, packets, stats, threads, trace from aprsd import packets, stats, threads
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -20,10 +20,6 @@ LOG = logging.getLogger("APRSD")
# and it's ok, but don't send a usage string back # and it's ok, but don't send a usage string back
NULL_MESSAGE = -1 NULL_MESSAGE = -1
MESSAGE_TRANSPORT_TCPKISS = "tcpkiss"
MESSAGE_TRANSPORT_SERIALKISS = "serialkiss"
MESSAGE_TRANSPORT_APRSIS = "aprsis"
class MsgTrack: class MsgTrack:
"""Class to keep track of outstanding text messages. """Class to keep track of outstanding text messages.
@ -36,13 +32,6 @@ class MsgTrack:
automatically adds itself to this class. When the ack is automatically adds itself to this class. When the ack is
recieved from the radio, the message object is removed from recieved from the radio, the message object is removed from
this class. this class.
# TODO(hemna)
When aprsd is asked to quit this class should be serialized and
saved to disk/db to keep track of the state of outstanding messages.
When aprsd is started, it should try and fetch the saved state,
and reloaded to a live state.
""" """
_instance = None _instance = None
@ -241,7 +230,6 @@ class Message(metaclass=abc.ABCMeta):
fromcall, fromcall,
tocall, tocall,
msg_id=None, msg_id=None,
transport=MESSAGE_TRANSPORT_APRSIS,
): ):
self.fromcall = fromcall self.fromcall = fromcall
self.tocall = tocall self.tocall = tocall
@ -250,18 +238,11 @@ class Message(metaclass=abc.ABCMeta):
c.increment() c.increment()
msg_id = c.value msg_id = c.value
self.id = msg_id self.id = msg_id
self.transport = transport
@abc.abstractmethod @abc.abstractmethod
def send(self): def send(self):
"""Child class must declare.""" """Child class must declare."""
def get_transport(self):
if self.transport == MESSAGE_TRANSPORT_APRSIS:
return client.get_client()
elif self.transport == MESSAGE_TRANSPORT_TCPKISS:
return kissclient.get_client()
class RawMessage(Message): class RawMessage(Message):
"""Send a raw message. """Send a raw message.
@ -273,8 +254,8 @@ class RawMessage(Message):
message = None message = None
def __init__(self, message, transport=MESSAGE_TRANSPORT_APRSIS): def __init__(self, message):
super().__init__(None, None, msg_id=None, transport=transport) super().__init__(None, None, msg_id=None)
self.message = message self.message = message
def dict(self): def dict(self):
@ -303,7 +284,7 @@ class RawMessage(Message):
def send_direct(self, aprsis_client=None): def send_direct(self, aprsis_client=None):
"""Send a message without a separate thread.""" """Send a message without a separate thread."""
cl = self.get_transport() cl = client.factory.create().client
log_message( log_message(
"Sending Message Direct", "Sending Message Direct",
str(self).rstrip("\n"), str(self).rstrip("\n"),
@ -312,7 +293,7 @@ class RawMessage(Message):
fromcall=self.fromcall, fromcall=self.fromcall,
) )
cl.send(self) cl.send(self)
stats.APRSDStats().msgs_sent_inc() stats.APRSDStats().msgs_tx_inc()
class TextMessage(Message): class TextMessage(Message):
@ -327,9 +308,8 @@ class TextMessage(Message):
message, message,
msg_id=None, msg_id=None,
allow_delay=True, allow_delay=True,
transport=MESSAGE_TRANSPORT_APRSIS,
): ):
super().__init__(fromcall, tocall, msg_id, transport=transport) super().__init__(fromcall, tocall, msg_id)
self.message = message self.message = message
# do we try and save this message for later if we don't get # do we try and save this message for later if we don't get
# an ack? Some messages we don't want to do this ever. # an ack? Some messages we don't want to do this ever.
@ -386,7 +366,7 @@ class TextMessage(Message):
if aprsis_client: if aprsis_client:
cl = aprsis_client cl = aprsis_client
else: else:
cl = self.get_transport() cl = client.factory.create().client
log_message( log_message(
"Sending Message Direct", "Sending Message Direct",
str(self).rstrip("\n"), str(self).rstrip("\n"),
@ -424,7 +404,6 @@ class SendMessageThread(threads.APRSDThread):
LOG.info("Message Send Complete via Ack.") LOG.info("Message Send Complete via Ack.")
return False return False
else: else:
cl = msg.get_transport()
send_now = False send_now = False
if msg.last_send_attempt == msg.retry_count: if msg.last_send_attempt == msg.retry_count:
# we reached the send limit, don't send again # we reached the send limit, don't send again
@ -455,6 +434,7 @@ class SendMessageThread(threads.APRSDThread):
retry_number=msg.last_send_attempt, retry_number=msg.last_send_attempt,
msg_num=msg.id, msg_num=msg.id,
) )
cl = client.factory.create().client
cl.send(msg) cl.send(msg)
stats.APRSDStats().msgs_tx_inc() stats.APRSDStats().msgs_tx_inc()
packets.PacketList().add(msg.dict()) packets.PacketList().add(msg.dict())
@ -469,8 +449,8 @@ class SendMessageThread(threads.APRSDThread):
class AckMessage(Message): class AckMessage(Message):
"""Class for building Acks and sending them.""" """Class for building Acks and sending them."""
def __init__(self, fromcall, tocall, msg_id, transport=MESSAGE_TRANSPORT_APRSIS): def __init__(self, fromcall, tocall, msg_id):
super().__init__(fromcall, tocall, msg_id=msg_id, transport=transport) super().__init__(fromcall, tocall, msg_id=msg_id)
def dict(self): def dict(self):
now = datetime.datetime.now() now = datetime.datetime.now()
@ -509,7 +489,7 @@ class AckMessage(Message):
if aprsis_client: if aprsis_client:
cl = aprsis_client cl = aprsis_client
else: else:
cl = self.get_transport() cl = client.factory.create().client
log_message( log_message(
"Sending ack", "Sending ack",
str(self).rstrip("\n"), str(self).rstrip("\n"),
@ -526,10 +506,8 @@ class SendAckThread(threads.APRSDThread):
self.ack = ack self.ack = ack
super().__init__(f"SendAck-{self.ack.id}") super().__init__(f"SendAck-{self.ack.id}")
@trace.trace
def loop(self): def loop(self):
"""Separate thread to send acks with retries.""" """Separate thread to send acks with retries."""
LOG.debug("SendAckThread loop start")
send_now = False send_now = False
if self.ack.last_send_attempt == self.ack.retry_count: if self.ack.last_send_attempt == self.ack.retry_count:
# we reached the send limit, don't send again # we reached the send limit, don't send again
@ -554,7 +532,7 @@ class SendAckThread(threads.APRSDThread):
send_now = True send_now = True
if send_now: if send_now:
cl = self.ack.get_transport() cl = client.factory.create().client
log_message( log_message(
"Sending ack", "Sending ack",
str(self.ack).rstrip("\n"), str(self.ack).rstrip("\n"),

View File

@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta):
) )
# make sure the timeout is set or this doesn't work # make sure the timeout is set or this doesn't work
if watch_list: if watch_list:
aprs_client = client.get_client() aprs_client = client.factory.create().client
filter_str = "b/{}".format("/".join(watch_list)) filter_str = "b/{}".format("/".join(watch_list))
aprs_client.set_filter(filter_str) aprs_client.set_filter(filter_str)
else: else:

View File

@ -8,7 +8,7 @@ import tracemalloc
import aprslib import aprslib
from aprsd import client, kissclient, messaging, packets, plugin, stats, utils from aprsd import client, messaging, packets, plugin, stats, utils
LOG = logging.getLogger("APRSD") LOG = logging.getLogger("APRSD")
@ -137,9 +137,9 @@ class KeepAliveThread(APRSDThread):
if delta > self.max_delta: if delta > self.max_delta:
# We haven't gotten a keepalive from aprs-is in a while # We haven't gotten a keepalive from aprs-is in a while
# reset the connection.a # reset the connection.a
if not kissclient.KISSClient.kiss_enabled(self.config): if not client.KISSClient.is_enabled(self.config):
LOG.warning("Resetting connection to APRS-IS.") LOG.warning("Resetting connection to APRS-IS.")
client.Client().reset() client.factory.create().reset()
# Check version every hour # Check version every hour
delta = now - self.checker_time delta = now - self.checker_time
@ -158,13 +158,13 @@ class APRSDRXThread(APRSDThread):
super().__init__("RX_MSG") super().__init__("RX_MSG")
self.msg_queues = msg_queues self.msg_queues = msg_queues
self.config = config self.config = config
self._client = client.factory.create()
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
client.get_client().stop() client.factory.create().client.stop()
def loop(self): def loop(self):
aprs_client = client.get_client()
# setup the consumer of messages and block until a messages # setup the consumer of messages and block until a messages
try: try:
@ -177,7 +177,9 @@ class APRSDRXThread(APRSDThread):
# and the aprslib developer didn't want to allow a PR to add # and the aprslib developer didn't want to allow a PR to add
# kwargs. :( # kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56 # https://github.com/rossengeorgiev/aprs-python/pull/56
aprs_client.consumer(self.process_packet, raw=False, blocking=False) self._client.client.consumer(
self.process_packet, raw=False, blocking=False,
)
except aprslib.exceptions.ConnectionDrop: except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting") LOG.error("Connection dropped, reconnecting")
@ -185,21 +187,21 @@ class APRSDRXThread(APRSDThread):
# Force the deletion of the client object connected to aprs # Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client() # This will cause a reconnect, next time client.get_client()
# is called # is called
client.Client().reset() self._client.reset()
# Continue to loop # Continue to loop
return True return True
def process_packet(self, packet): def process_packet(self, *args, **kwargs):
packet = self._client.decode_packet(*args, **kwargs)
thread = APRSDProcessPacketThread(packet=packet, config=self.config) thread = APRSDProcessPacketThread(packet=packet, config=self.config)
thread.start() thread.start()
class APRSDProcessPacketThread(APRSDThread): class APRSDProcessPacketThread(APRSDThread):
def __init__(self, packet, config, transport="aprsis"): def __init__(self, packet, config):
self.packet = packet self.packet = packet
self.config = config self.config = config
self.transport = transport
name = self.packet["raw"][:10] name = self.packet["raw"][:10]
super().__init__(f"RX_PACKET-{name}") super().__init__(f"RX_PACKET-{name}")
@ -254,7 +256,6 @@ class APRSDProcessPacketThread(APRSDThread):
self.config["aprs"]["login"], self.config["aprs"]["login"],
fromcall, fromcall,
msg_id=msg_id, msg_id=msg_id,
transport=self.transport,
) )
ack.send() ack.send()
@ -275,7 +276,6 @@ class APRSDProcessPacketThread(APRSDThread):
self.config["aprs"]["login"], self.config["aprs"]["login"],
fromcall, fromcall,
subreply, subreply,
transport=self.transport,
) )
msg.send() msg.send()
elif isinstance(reply, messaging.Message): elif isinstance(reply, messaging.Message):
@ -296,7 +296,6 @@ class APRSDProcessPacketThread(APRSDThread):
self.config["aprs"]["login"], self.config["aprs"]["login"],
fromcall, fromcall,
reply, reply,
transport=self.transport,
) )
msg.send() msg.send()
@ -309,7 +308,6 @@ class APRSDProcessPacketThread(APRSDThread):
self.config["aprs"]["login"], self.config["aprs"]["login"],
fromcall, fromcall,
reply, reply,
transport=self.transport,
) )
msg.send() msg.send()
except Exception as ex: except Exception as ex:
@ -321,88 +319,7 @@ class APRSDProcessPacketThread(APRSDThread):
self.config["aprs"]["login"], self.config["aprs"]["login"],
fromcall, fromcall,
reply, reply,
transport=self.transport,
) )
msg.send() msg.send()
LOG.debug("Packet processing complete") LOG.debug("Packet processing complete")
class APRSDTXThread(APRSDThread):
def __init__(self, msg_queues, config):
super().__init__("TX_MSG")
self.msg_queues = msg_queues
self.config = config
def loop(self):
try:
msg = self.msg_queues["tx"].get(timeout=1)
msg.send()
except queue.Empty:
pass
# Continue to loop
return True
class KISSRXThread(APRSDThread):
"""Thread that connects to direwolf's TCPKISS interface.
All Packets are processed and sent back out the direwolf
interface instead of the aprs-is server.
"""
def __init__(self, msg_queues, config):
super().__init__("KISSRX_MSG")
self.msg_queues = msg_queues
self.config = config
def stop(self):
self.thread_stop = True
kissclient.get_client().stop()
def loop(self):
kiss_client = kissclient.get_client()
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
kiss_client.consumer(self.process_packet, callsign=self.config["kiss"]["callsign"])
kiss_client.loop.run_forever()
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
client.Client().reset()
# Continue to loop
def process_packet(self, interface, frame):
"""Process a packet recieved from aprs-is server."""
LOG.debug(f"Got an APRS Frame '{frame}'")
# try and nuke the * from the fromcall sign.
frame.header._source._ch = False
payload = str(frame.payload.decode())
msg = f"{str(frame.header)}:{payload}"
# msg = frame.tnc2
LOG.debug(f"Decoding {msg}")
packet = aprslib.parse(msg)
LOG.debug(packet)
thread = APRSDProcessPacketThread(
packet=packet, config=self.config,
transport=messaging.MESSAGE_TRANSPORT_TCPKISS,
)
thread.start()
return