diff --git a/aprsd/client.py b/aprsd/client.py index 3e8e198..3b7ee0e 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -1,26 +1,30 @@ +import abc import logging -import select import time import aprslib -from aprslib import is_py3 -from aprslib.exceptions import ( - ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError, - UnknownFormat, -) +from aprslib.exceptions import LoginError -import aprsd -from aprsd import stats +from aprsd import trace +from aprsd.clients import aprsis, kiss LOG = logging.getLogger("APRSD") +TRANSPORT_APRSIS = "aprsis" +TRANSPORT_TCPKISS = "tcpkiss" +TRANSPORT_SERIALKISS = "serialkiss" + +# Main must create this from the ClientFactory +# object such that it's populated with the +# Correct config +factory = None class Client: """Singleton client class that constructs the aprslib connection.""" _instance = None - aprs_client = None + _client = None config = None connected = False @@ -40,14 +44,49 @@ class Client: @property def client(self): - if not self.aprs_client: - self.aprs_client = self.setup_connection() - return self.aprs_client + if not self._client: + self._client = self.setup_connection() + return self._client def reset(self): """Call this to force a rebuild/reconnect.""" - del self.aprs_client + del self._client + @abc.abstractmethod + def setup_connection(self): + pass + + @staticmethod + @abc.abstractmethod + def is_enabled(config): + pass + + @staticmethod + @abc.abstractmethod + def transport(config): + pass + + @abc.abstractmethod + def decode_packet(self, *args, **kwargs): + pass + + +class APRSISClient(Client): + + @staticmethod + def is_enabled(config): + # Defaults to True if the enabled flag is non existent + return config["aprs"].get("enabled", True) + + @staticmethod + def transport(config): + return TRANSPORT_APRSIS + + def decode_packet(self, *args, **kwargs): + """APRS lib already decodes this.""" + return args[0] + + @trace.trace def setup_connection(self): user = self.config["aprs"]["login"] password = self.config["aprs"]["password"] @@ -55,10 +94,11 @@ class Client: port = self.config["aprs"].get("port", 14580) connected = False backoff = 1 + aprs_client = None while not connected: try: LOG.info("Creating aprslib client") - aprs_client = Aprsdis(user, passwd=password, host=host, port=port) + aprs_client = aprsis.Aprsdis(user, passwd=password, host=host, port=port) # Force the logging to be the same aprs_client.logger = LOG aprs_client.connect() @@ -77,200 +117,96 @@ class Client: return aprs_client -class Aprsdis(aprslib.IS): - """Extend the aprslib class so we can exit properly.""" +class KISSClient(Client): - # flag to tell us to stop - thread_stop = False + @staticmethod + def is_enabled(config): + """Return if tcp or serial KISS is enabled.""" + if "kiss" not in config: + return False - # timeout in seconds - select_timeout = 1 + if "serial" in config["kiss"]: + if config["kiss"]["serial"].get("enabled", False): + return True - def stop(self): - self.thread_stop = True - LOG.info("Shutdown Aprsdis client.") + if "tcp" in config["kiss"]: + if config["kiss"]["tcp"].get("enabled", False): + return True - def send(self, msg): - """Send an APRS Message object.""" - line = str(msg) - self.sendall(line) + @staticmethod + def transport(config): + if "serial" in config["kiss"]: + if config["kiss"]["serial"].get("enabled", False): + return TRANSPORT_SERIALKISS - def _socket_readlines(self, blocking=False): - """ - Generator for complete lines, received from the server - """ - try: - self.sock.setblocking(0) - except OSError as e: - self.logger.error(f"socket error when setblocking(0): {str(e)}") - raise aprslib.ConnectionDrop("connection dropped") + if "tcp" in config["kiss"]: + if config["kiss"]["tcp"].get("enabled", False): + return TRANSPORT_TCPKISS - while not self.thread_stop: - short_buf = b"" - newline = b"\r\n" + def decode_packet(self, *args, **kwargs): + """We get a frame, which has to be decoded.""" + frame = kwargs["frame"] + LOG.debug(f"Got an APRS Frame '{frame}'") + # try and nuke the * from the fromcall sign. + frame.header._source._ch = False + payload = str(frame.payload.decode()) + msg = f"{str(frame.header)}:{payload}" + # msg = frame.tnc2 + LOG.debug(f"Decoding {msg}") - # set a select timeout, so we get a chance to exit - # when user hits CTRL-C - readable, writable, exceptional = select.select( - [self.sock], - [], - [], - self.select_timeout, - ) - if not readable: - if not blocking: - break - else: - continue + packet = aprslib.parse(msg) + return packet - try: - short_buf = self.sock.recv(4096) - - # sock.recv returns empty if the connection drops - if not short_buf: - if not blocking: - # We could just not be blocking, so empty is expected - continue - else: - self.logger.error("socket.recv(): returned empty") - raise aprslib.ConnectionDrop("connection dropped") - except OSError as e: - # self.logger.error("socket error on recv(): %s" % str(e)) - if "Resource temporarily unavailable" in str(e): - if not blocking: - if len(self.buf) == 0: - break - - self.buf += short_buf - - while newline in self.buf: - line, self.buf = self.buf.split(newline, 1) - - yield line - - def _send_login(self): - """ - Sends login string to server - """ - login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n" - login_str = login_str.format( - self.callsign, - self.passwd, - (" filter " + self.filter) if self.filter != "" else "", - aprsd.__version__, - ) - - self.logger.info("Sending login information") - - try: - self._sendall(login_str) - self.sock.settimeout(5) - test = self.sock.recv(len(login_str) + 100) - if is_py3: - test = test.decode("latin-1") - test = test.rstrip() - - self.logger.debug("Server: %s", test) - - a, b, callsign, status, e = test.split(" ", 4) - s = e.split(",") - if len(s): - server_string = s[0].replace("server ", "") - else: - server_string = e.replace("server ", "") - - self.logger.info(f"Connected to {server_string}") - self.server_string = server_string - stats.APRSDStats().set_aprsis_server(server_string) - - if callsign == "": - raise LoginError("Server responded with empty callsign???") - if callsign != self.callsign: - raise LoginError(f"Server: {test}") - if status != "verified," and self.passwd != "-1": - raise LoginError("Password is incorrect") - - if self.passwd == "-1": - self.logger.info("Login successful (receive only)") - else: - self.logger.info("Login successful") - - except LoginError as e: - self.logger.error(str(e)) - self.close() - raise - except Exception as e: - self.close() - self.logger.error(f"Failed to login '{e}'") - raise LoginError("Failed to login") - - def consumer(self, callback, blocking=True, immortal=False, raw=False): - """ - When a position sentence is received, it will be passed to the callback function - - blocking: if true (default), runs forever, otherwise will return after one sentence - You can still exit the loop, by raising StopIteration in the callback function - - immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions - if false (default), consumer will return - - raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse() - """ - - if not self._connected: - raise ConnectionError("not connected to a server") - - line = b"" - - while True and not self.thread_stop: - try: - for line in self._socket_readlines(blocking): - if line[0:1] != b"#": - if raw: - callback(line) - else: - callback(self._parse(line)) - else: - self.logger.debug("Server: %s", line.decode("utf8")) - stats.APRSDStats().set_aprsis_keepalive() - except ParseError as exp: - self.logger.log( - 11, - "%s\n Packet: %s", - exp, - exp.packet, - ) - except UnknownFormat as exp: - self.logger.log( - 9, - "%s\n Packet: %s", - exp, - exp.packet, - ) - except LoginError as exp: - self.logger.error("%s: %s", exp.__class__.__name__, exp) - except (KeyboardInterrupt, SystemExit): - raise - except (ConnectionDrop, ConnectionError): - self.close() - - if not immortal: - raise - else: - self.connect(blocking=blocking) - continue - except GenericError: - pass - except StopIteration: - break - except Exception: - self.logger.error("APRS Packet: %s", line) - raise - - if not blocking: - break + @trace.trace + def setup_connection(self): + ax25client = kiss.Aioax25Client(self.config) + return ax25client -def get_client(): - cl = Client() - return cl.client +class ClientFactory: + _instance = None + + def __new__(cls, *args, **kwargs): + """This magic turns this into a singleton.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Put any initialization here. + return cls._instance + + def __init__(self, config): + self.config = config + self._builders = {} + + def register(self, key, builder): + self._builders[key] = builder + + def create(self, key=None): + if not key: + if APRSISClient.is_enabled(self.config): + key = TRANSPORT_APRSIS + elif KISSClient.is_enabled(self.config): + key = KISSClient.transport(self.config) + + LOG.debug(f"GET client {key}") + builder = self._builders.get(key) + if not builder: + raise ValueError(key) + return builder(self.config) + + def is_client_enabled(self): + """Make sure at least one client is enabled.""" + enabled = False + for key in self._builders.keys(): + enabled |= self._builders[key].is_enabled(self.config) + + return enabled + + @staticmethod + def setup(config): + """Create and register all possible client objects.""" + global factory + + factory = ClientFactory(config) + factory.register(TRANSPORT_APRSIS, APRSISClient) + factory.register(TRANSPORT_TCPKISS, KISSClient) + factory.register(TRANSPORT_SERIALKISS, KISSClient) diff --git a/aprsd/clients/aprsis.py b/aprsd/clients/aprsis.py new file mode 100644 index 0000000..ac7bdac --- /dev/null +++ b/aprsd/clients/aprsis.py @@ -0,0 +1,209 @@ +import logging +import select + +import aprslib +from aprslib import is_py3 +from aprslib.exceptions import ( + ConnectionDrop, ConnectionError, GenericError, LoginError, ParseError, + UnknownFormat, +) + +import aprsd +from aprsd import stats + + +LOG = logging.getLogger("APRSD") + + +class Aprsdis(aprslib.IS): + """Extend the aprslib class so we can exit properly.""" + + # flag to tell us to stop + thread_stop = False + + # timeout in seconds + select_timeout = 1 + + def stop(self): + self.thread_stop = True + LOG.info("Shutdown Aprsdis client.") + + def send(self, msg): + """Send an APRS Message object.""" + line = str(msg) + self.sendall(line) + + def _socket_readlines(self, blocking=False): + """ + Generator for complete lines, received from the server + """ + try: + self.sock.setblocking(0) + except OSError as e: + self.logger.error(f"socket error when setblocking(0): {str(e)}") + raise aprslib.ConnectionDrop("connection dropped") + + while not self.thread_stop: + short_buf = b"" + newline = b"\r\n" + + # set a select timeout, so we get a chance to exit + # when user hits CTRL-C + readable, writable, exceptional = select.select( + [self.sock], + [], + [], + self.select_timeout, + ) + if not readable: + if not blocking: + break + else: + continue + + try: + short_buf = self.sock.recv(4096) + + # sock.recv returns empty if the connection drops + if not short_buf: + if not blocking: + # We could just not be blocking, so empty is expected + continue + else: + self.logger.error("socket.recv(): returned empty") + raise aprslib.ConnectionDrop("connection dropped") + except OSError as e: + # self.logger.error("socket error on recv(): %s" % str(e)) + if "Resource temporarily unavailable" in str(e): + if not blocking: + if len(self.buf) == 0: + break + + self.buf += short_buf + + while newline in self.buf: + line, self.buf = self.buf.split(newline, 1) + + yield line + + def _send_login(self): + """ + Sends login string to server + """ + login_str = "user {0} pass {1} vers github.com/craigerl/aprsd {3}{2}\r\n" + login_str = login_str.format( + self.callsign, + self.passwd, + (" filter " + self.filter) if self.filter != "" else "", + aprsd.__version__, + ) + + self.logger.info("Sending login information") + + try: + self._sendall(login_str) + self.sock.settimeout(5) + test = self.sock.recv(len(login_str) + 100) + if is_py3: + test = test.decode("latin-1") + test = test.rstrip() + + self.logger.debug("Server: %s", test) + + a, b, callsign, status, e = test.split(" ", 4) + s = e.split(",") + if len(s): + server_string = s[0].replace("server ", "") + else: + server_string = e.replace("server ", "") + + self.logger.info(f"Connected to {server_string}") + self.server_string = server_string + stats.APRSDStats().set_aprsis_server(server_string) + + if callsign == "": + raise LoginError("Server responded with empty callsign???") + if callsign != self.callsign: + raise LoginError(f"Server: {test}") + if status != "verified," and self.passwd != "-1": + raise LoginError("Password is incorrect") + + if self.passwd == "-1": + self.logger.info("Login successful (receive only)") + else: + self.logger.info("Login successful") + + except LoginError as e: + self.logger.error(str(e)) + self.close() + raise + except Exception as e: + self.close() + self.logger.error(f"Failed to login '{e}'") + raise LoginError("Failed to login") + + def consumer(self, callback, blocking=True, immortal=False, raw=False): + """ + When a position sentence is received, it will be passed to the callback function + + blocking: if true (default), runs forever, otherwise will return after one sentence + You can still exit the loop, by raising StopIteration in the callback function + + immortal: When true, consumer will try to reconnect and stop propagation of Parse exceptions + if false (default), consumer will return + + raw: when true, raw packet is passed to callback, otherwise the result from aprs.parse() + """ + + if not self._connected: + raise ConnectionError("not connected to a server") + + line = b"" + + while True and not self.thread_stop: + try: + for line in self._socket_readlines(blocking): + if line[0:1] != b"#": + if raw: + callback(line) + else: + callback(self._parse(line)) + else: + self.logger.debug("Server: %s", line.decode("utf8")) + stats.APRSDStats().set_aprsis_keepalive() + except ParseError as exp: + self.logger.log( + 11, + "%s\n Packet: %s", + exp, + exp.packet, + ) + except UnknownFormat as exp: + self.logger.log( + 9, + "%s\n Packet: %s", + exp, + exp.packet, + ) + except LoginError as exp: + self.logger.error("%s: %s", exp.__class__.__name__, exp) + except (KeyboardInterrupt, SystemExit): + raise + except (ConnectionDrop, ConnectionError): + self.close() + + if not immortal: + raise + else: + self.connect(blocking=blocking) + continue + except GenericError: + pass + except StopIteration: + break + except Exception: + self.logger.error("APRS Packet: %s", line) + raise + + if not blocking: + break diff --git a/aprsd/kissclient.py b/aprsd/clients/kiss.py similarity index 54% rename from aprsd/kissclient.py rename to aprsd/clients/kiss.py index bed12c2..7ea1ce6 100644 --- a/aprsd/kissclient.py +++ b/aprsd/clients/kiss.py @@ -5,83 +5,20 @@ from aioax25 import interface from aioax25 import kiss as kiss from aioax25.aprs import APRSInterface -from aprsd import trace - -TRANSPORT_TCPKISS = "tcpkiss" -TRANSPORT_SERIALKISS = "serialkiss" LOG = logging.getLogger("APRSD") -class KISSClient: - - _instance = None - config = None - ax25client = None - loop = None - - def __new__(cls, *args, **kwargs): - """Singleton for this class.""" - if cls._instance is None: - cls._instance = super().__new__(cls) - # initialize shit here - return cls._instance - - def __init__(self, config=None): - if config: - self.config = config - - @staticmethod - def kiss_enabled(config): - """Return if tcp or serial KISS is enabled.""" - if "kiss" not in config: - return False - - if "serial" in config["kiss"]: - if config["kiss"]["serial"].get("enabled", False): - return True - - if "tcp" in config["kiss"]: - if config["kiss"]["tcp"].get("enabled", False): - return True - - @staticmethod - def transport(config): - if "serial" in config["kiss"]: - if config["kiss"]["serial"].get("enabled", False): - return TRANSPORT_SERIALKISS - - if "tcp" in config["kiss"]: - if config["kiss"]["tcp"].get("enabled", False): - return TRANSPORT_TCPKISS - - @property - def client(self): - if not self.ax25client: - self.ax25client = self.setup_connection() - return self.ax25client - - def reset(self): - """Call this to fore a rebuild/reconnect.""" - self.ax25client.stop() - del self.ax25client - - @trace.trace - def setup_connection(self): - ax25client = Aioax25Client(self.config) - LOG.debug("Complete") - return ax25client - - class Aioax25Client: def __init__(self, config): self.config = config + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.loop = asyncio.get_event_loop() self.setup() def setup(self): # we can be TCP kiss or Serial kiss - - self.loop = asyncio.get_event_loop() if "serial" in self.config["kiss"] and self.config["kiss"]["serial"].get( "enabled", False, @@ -131,10 +68,20 @@ class Aioax25Client: self.kissdev._close() self.loop.stop() - def consumer(self, callback, callsign=None): - if not callsign: - callsign = self.config["ham"]["callsign"] - self.aprsint.bind(callback=callback, callsign="WB4BOR", ssid=12, regex=False) + def set_filter(self, filter): + # This does nothing right now. + pass + + def consumer(self, callback, blocking=True, immortal=False, raw=False): + callsign = self.config["kiss"]["callsign"] + call = callsign.split("-") + if len(call) > 1: + callsign = call[0] + ssid = int(call[1]) + else: + ssid = 0 + self.aprsint.bind(callback=callback, callsign=callsign, ssid=ssid, regex=False) + self.loop.run_forever() def send(self, msg): """Send an APRS Message object.""" @@ -145,8 +92,3 @@ class Aioax25Client: path=["WIDE1-1", "WIDE2-1"], oneshot=True, ) - - -def get_client(): - cl = KISSClient() - return cl.client diff --git a/aprsd/flask.py b/aprsd/flask.py index ede31db..ce5ab36 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -19,7 +19,8 @@ from werkzeug.security import check_password_hash, generate_password_hash import aprsd from aprsd import client from aprsd import config as aprsd_config -from aprsd import kissclient, messaging, packets, plugin, stats, threads, utils +from aprsd import messaging, packets, plugin, stats, threads, utils +from aprsd.clients import aprsis LOG = logging.getLogger("APRSD") @@ -136,7 +137,8 @@ class SendMessageThread(threads.APRSDThread): while not connected: try: LOG.info("Creating aprslib client") - aprs_client = client.Aprsdis( + + aprs_client = aprsis.Aprsdis( user, passwd=password, host=host, @@ -312,16 +314,16 @@ class APRSDFlask(flask_classful.FlaskView): ) else: # We might be connected to a KISS socket? - if kissclient.KISSClient.kiss_enabled(self.config): - transport = kissclient.KISSClient.transport(self.config) - if transport == kissclient.TRANSPORT_TCPKISS: + if client.KISSClient.kiss_enabled(self.config): + transport = client.KISSClient.transport(self.config) + if transport == client.TRANSPORT_TCPKISS: aprs_connection = ( "TCPKISS://{}:{}".format( self.config["kiss"]["tcp"]["host"], self.config["kiss"]["tcp"]["port"], ) ) - elif transport == kissclient.TRANSPORT_SERIALKISS: + elif transport == client.TRANSPORT_SERIALKISS: aprs_connection = ( "SerialKISS://{}@{} baud".format( self.config["kiss"]["serial"]["device"], diff --git a/aprsd/main.py b/aprsd/main.py index 91b4816..3bebc80 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -37,7 +37,7 @@ import click_completion # local imports here import aprsd from aprsd import ( - flask, kissclient, messaging, packets, plugin, stats, threads, trace, utils, + flask, messaging, packets, plugin, stats, threads, trace, utils, ) from aprsd import client from aprsd import config as aprsd_config @@ -463,23 +463,13 @@ def server( trace.setup_tracing(["method", "api"]) stats.APRSDStats(config) - if config["aprs"].get("enabled", True): - try: - cl = client.Client(config) - cl.client - except LoginError: - sys.exit(-1) - - rx_thread = threads.APRSDRXThread( - msg_queues=threads.msg_queues, - config=config, - ) - rx_thread.start() - else: - LOG.info( - "APRS network connection Not Enabled in config. This is" - " for setups without internet connectivity.", - ) + # Initialize the client factory and create + # The correct client object ready for use + client.ClientFactory.setup(config) + # Make sure we have 1 client transport enabled + if not client.factory.is_client_enabled(): + LOG.error("No Clients are enabled in config.") + sys.exit(-1) # Create the initial PM singleton and Register plugins plugin_manager = plugin.PluginManager(config) @@ -497,13 +487,11 @@ def server( packets.PacketList(config=config) packets.WatchList(config=config) - if kissclient.KISSClient.kiss_enabled(config): - kcl = kissclient.KISSClient(config=config) - # This initializes the client object. - kcl.client - - kissrx_thread = threads.KISSRXThread(msg_queues=threads.msg_queues, config=config) - kissrx_thread.start() + rx_thread = threads.APRSDRXThread( + msg_queues=threads.msg_queues, + config=config, + ) + rx_thread.start() messaging.MsgTrack().restart() diff --git a/aprsd/messaging.py b/aprsd/messaging.py index 1a983a1..f80574c 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -11,7 +11,7 @@ import time from aprsd import client from aprsd import config as aprsd_config -from aprsd import kissclient, packets, stats, threads, trace +from aprsd import packets, stats, threads LOG = logging.getLogger("APRSD") @@ -20,10 +20,6 @@ LOG = logging.getLogger("APRSD") # and it's ok, but don't send a usage string back NULL_MESSAGE = -1 -MESSAGE_TRANSPORT_TCPKISS = "tcpkiss" -MESSAGE_TRANSPORT_SERIALKISS = "serialkiss" -MESSAGE_TRANSPORT_APRSIS = "aprsis" - class MsgTrack: """Class to keep track of outstanding text messages. @@ -36,13 +32,6 @@ class MsgTrack: automatically adds itself to this class. When the ack is recieved from the radio, the message object is removed from this class. - - # TODO(hemna) - When aprsd is asked to quit this class should be serialized and - saved to disk/db to keep track of the state of outstanding messages. - When aprsd is started, it should try and fetch the saved state, - and reloaded to a live state. - """ _instance = None @@ -241,7 +230,6 @@ class Message(metaclass=abc.ABCMeta): fromcall, tocall, msg_id=None, - transport=MESSAGE_TRANSPORT_APRSIS, ): self.fromcall = fromcall self.tocall = tocall @@ -250,18 +238,11 @@ class Message(metaclass=abc.ABCMeta): c.increment() msg_id = c.value self.id = msg_id - self.transport = transport @abc.abstractmethod def send(self): """Child class must declare.""" - def get_transport(self): - if self.transport == MESSAGE_TRANSPORT_APRSIS: - return client.get_client() - elif self.transport == MESSAGE_TRANSPORT_TCPKISS: - return kissclient.get_client() - class RawMessage(Message): """Send a raw message. @@ -273,8 +254,8 @@ class RawMessage(Message): message = None - def __init__(self, message, transport=MESSAGE_TRANSPORT_APRSIS): - super().__init__(None, None, msg_id=None, transport=transport) + def __init__(self, message): + super().__init__(None, None, msg_id=None) self.message = message def dict(self): @@ -303,7 +284,7 @@ class RawMessage(Message): def send_direct(self, aprsis_client=None): """Send a message without a separate thread.""" - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending Message Direct", str(self).rstrip("\n"), @@ -312,7 +293,7 @@ class RawMessage(Message): fromcall=self.fromcall, ) cl.send(self) - stats.APRSDStats().msgs_sent_inc() + stats.APRSDStats().msgs_tx_inc() class TextMessage(Message): @@ -327,9 +308,8 @@ class TextMessage(Message): message, msg_id=None, allow_delay=True, - transport=MESSAGE_TRANSPORT_APRSIS, ): - super().__init__(fromcall, tocall, msg_id, transport=transport) + super().__init__(fromcall, tocall, msg_id) self.message = message # do we try and save this message for later if we don't get # an ack? Some messages we don't want to do this ever. @@ -386,7 +366,7 @@ class TextMessage(Message): if aprsis_client: cl = aprsis_client else: - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending Message Direct", str(self).rstrip("\n"), @@ -424,7 +404,6 @@ class SendMessageThread(threads.APRSDThread): LOG.info("Message Send Complete via Ack.") return False else: - cl = msg.get_transport() send_now = False if msg.last_send_attempt == msg.retry_count: # we reached the send limit, don't send again @@ -455,6 +434,7 @@ class SendMessageThread(threads.APRSDThread): retry_number=msg.last_send_attempt, msg_num=msg.id, ) + cl = client.factory.create().client cl.send(msg) stats.APRSDStats().msgs_tx_inc() packets.PacketList().add(msg.dict()) @@ -469,8 +449,8 @@ class SendMessageThread(threads.APRSDThread): class AckMessage(Message): """Class for building Acks and sending them.""" - def __init__(self, fromcall, tocall, msg_id, transport=MESSAGE_TRANSPORT_APRSIS): - super().__init__(fromcall, tocall, msg_id=msg_id, transport=transport) + def __init__(self, fromcall, tocall, msg_id): + super().__init__(fromcall, tocall, msg_id=msg_id) def dict(self): now = datetime.datetime.now() @@ -509,7 +489,7 @@ class AckMessage(Message): if aprsis_client: cl = aprsis_client else: - cl = self.get_transport() + cl = client.factory.create().client log_message( "Sending ack", str(self).rstrip("\n"), @@ -526,10 +506,8 @@ class SendAckThread(threads.APRSDThread): self.ack = ack super().__init__(f"SendAck-{self.ack.id}") - @trace.trace def loop(self): """Separate thread to send acks with retries.""" - LOG.debug("SendAckThread loop start") send_now = False if self.ack.last_send_attempt == self.ack.retry_count: # we reached the send limit, don't send again @@ -554,7 +532,7 @@ class SendAckThread(threads.APRSDThread): send_now = True if send_now: - cl = self.ack.get_transport() + cl = client.factory.create().client log_message( "Sending ack", str(self.ack).rstrip("\n"), diff --git a/aprsd/plugin.py b/aprsd/plugin.py index fe5efee..d7fb0bc 100644 --- a/aprsd/plugin.py +++ b/aprsd/plugin.py @@ -158,7 +158,7 @@ class APRSDWatchListPluginBase(APRSDPluginBase, metaclass=abc.ABCMeta): ) # make sure the timeout is set or this doesn't work if watch_list: - aprs_client = client.get_client() + aprs_client = client.factory.create().client filter_str = "b/{}".format("/".join(watch_list)) aprs_client.set_filter(filter_str) else: diff --git a/aprsd/threads.py b/aprsd/threads.py index 1132c0a..8cf75f0 100644 --- a/aprsd/threads.py +++ b/aprsd/threads.py @@ -8,7 +8,7 @@ import tracemalloc import aprslib -from aprsd import client, kissclient, messaging, packets, plugin, stats, utils +from aprsd import client, messaging, packets, plugin, stats, utils LOG = logging.getLogger("APRSD") @@ -137,9 +137,9 @@ class KeepAliveThread(APRSDThread): if delta > self.max_delta: # We haven't gotten a keepalive from aprs-is in a while # reset the connection.a - if not kissclient.KISSClient.kiss_enabled(self.config): + if not client.KISSClient.is_enabled(self.config): LOG.warning("Resetting connection to APRS-IS.") - client.Client().reset() + client.factory.create().reset() # Check version every hour delta = now - self.checker_time @@ -158,13 +158,13 @@ class APRSDRXThread(APRSDThread): super().__init__("RX_MSG") self.msg_queues = msg_queues self.config = config + self._client = client.factory.create() def stop(self): self.thread_stop = True - client.get_client().stop() + client.factory.create().client.stop() def loop(self): - aprs_client = client.get_client() # setup the consumer of messages and block until a messages try: @@ -177,7 +177,9 @@ class APRSDRXThread(APRSDThread): # and the aprslib developer didn't want to allow a PR to add # kwargs. :( # https://github.com/rossengeorgiev/aprs-python/pull/56 - aprs_client.consumer(self.process_packet, raw=False, blocking=False) + self._client.client.consumer( + self.process_packet, raw=False, blocking=False, + ) except aprslib.exceptions.ConnectionDrop: LOG.error("Connection dropped, reconnecting") @@ -185,21 +187,21 @@ class APRSDRXThread(APRSDThread): # Force the deletion of the client object connected to aprs # This will cause a reconnect, next time client.get_client() # is called - client.Client().reset() + self._client.reset() # Continue to loop return True - def process_packet(self, packet): + def process_packet(self, *args, **kwargs): + packet = self._client.decode_packet(*args, **kwargs) thread = APRSDProcessPacketThread(packet=packet, config=self.config) thread.start() class APRSDProcessPacketThread(APRSDThread): - def __init__(self, packet, config, transport="aprsis"): + def __init__(self, packet, config): self.packet = packet self.config = config - self.transport = transport name = self.packet["raw"][:10] super().__init__(f"RX_PACKET-{name}") @@ -254,7 +256,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, msg_id=msg_id, - transport=self.transport, ) ack.send() @@ -275,7 +276,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, subreply, - transport=self.transport, ) msg.send() elif isinstance(reply, messaging.Message): @@ -296,7 +296,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() @@ -309,7 +308,6 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() except Exception as ex: @@ -321,88 +319,7 @@ class APRSDProcessPacketThread(APRSDThread): self.config["aprs"]["login"], fromcall, reply, - transport=self.transport, ) msg.send() LOG.debug("Packet processing complete") - - -class APRSDTXThread(APRSDThread): - def __init__(self, msg_queues, config): - super().__init__("TX_MSG") - self.msg_queues = msg_queues - self.config = config - - def loop(self): - try: - msg = self.msg_queues["tx"].get(timeout=1) - msg.send() - except queue.Empty: - pass - # Continue to loop - return True - - -class KISSRXThread(APRSDThread): - """Thread that connects to direwolf's TCPKISS interface. - - All Packets are processed and sent back out the direwolf - interface instead of the aprs-is server. - - """ - - def __init__(self, msg_queues, config): - super().__init__("KISSRX_MSG") - self.msg_queues = msg_queues - self.config = config - - def stop(self): - self.thread_stop = True - kissclient.get_client().stop() - - def loop(self): - kiss_client = kissclient.get_client() - - # setup the consumer of messages and block until a messages - try: - # This will register a packet consumer with aprslib - # When new packets come in the consumer will process - # the packet - - # Do a partial here because the consumer signature doesn't allow - # For kwargs to be passed in to the consumer func we declare - # and the aprslib developer didn't want to allow a PR to add - # kwargs. :( - # https://github.com/rossengeorgiev/aprs-python/pull/56 - kiss_client.consumer(self.process_packet, callsign=self.config["kiss"]["callsign"]) - kiss_client.loop.run_forever() - - except aprslib.exceptions.ConnectionDrop: - LOG.error("Connection dropped, reconnecting") - time.sleep(5) - # Force the deletion of the client object connected to aprs - # This will cause a reconnect, next time client.get_client() - # is called - client.Client().reset() - # Continue to loop - - def process_packet(self, interface, frame): - """Process a packet recieved from aprs-is server.""" - - LOG.debug(f"Got an APRS Frame '{frame}'") - # try and nuke the * from the fromcall sign. - frame.header._source._ch = False - payload = str(frame.payload.decode()) - msg = f"{str(frame.header)}:{payload}" - # msg = frame.tnc2 - LOG.debug(f"Decoding {msg}") - - packet = aprslib.parse(msg) - LOG.debug(packet) - thread = APRSDProcessPacketThread( - packet=packet, config=self.config, - transport=messaging.MESSAGE_TRANSPORT_TCPKISS, - ) - thread.start() - return