1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-10-25 18:10:24 -04:00

reworked threading

This patch reworks the threading code for processing
messages.   This patch also extends the aprslib IS class
to allow us to stop processing the consumer packets when
someone hits CTRL-C correctly.  Alloing the app to exit.
This commit is contained in:
Hemna 2020-12-24 12:39:48 -05:00
parent 9768003c2a
commit f65707cb8c
4 changed files with 297 additions and 187 deletions

View File

@ -1,4 +1,6 @@
import logging
import select
import socket
import time
import aprslib
@ -45,7 +47,7 @@ class Client(object):
while not connected:
try:
LOG.info("Creating aprslib client")
aprs_client = aprslib.IS(user, passwd=password, host=host, port=port)
aprs_client = Aprsdis(user, passwd=password, host=host, port=port)
# Force the logging to be the same
aprs_client.logger = LOG
aprs_client.connect()
@ -60,6 +62,63 @@ class Client(object):
return aprs_client
class Aprsdis(aprslib.IS):
"""Extend the aprslib class so we can exit properly."""
# flag to tell us to stop
thread_stop = False
# timeout in seconds
select_timeout = 10
def stop(self):
self.thread_stop = True
def _socket_readlines(self, blocking=False):
"""
Generator for complete lines, received from the server
"""
try:
self.sock.setblocking(0)
except socket.error as e:
self.logger.error("socket error when setblocking(0): %s" % str(e))
raise aprslib.ConnectionDrop("connection dropped")
while not self.thread_stop:
short_buf = b""
newline = b"\r\n"
# set a select timeout, so we get a chance to exit
# when user hits CTRL-C
readable, writable, exceptional = select.select(
[self.sock], [], [], self.select_timeout
)
if not readable:
self.logger.info("nothing to read")
continue
try:
short_buf = self.sock.recv(4096)
# sock.recv returns empty if the connection drops
if not short_buf:
self.logger.error("socket.recv(): returned empty")
raise aprslib.ConnectionDrop("connection dropped")
except socket.error as e:
# self.logger.error("socket error on recv(): %s" % str(e))
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break
self.buf += short_buf
while newline in self.buf:
line, self.buf = self.buf.split(newline, 1)
yield line
def get_client():
cl = Client()
return cl.client

View File

@ -21,12 +21,9 @@
#
# python included libs
import concurrent.futures
import functools
import logging
import os
import queue
import random
import signal
import sys
import threading
@ -41,7 +38,7 @@ import yaml
# local imports here
import aprsd
from aprsd import client, email, messaging, plugin, utils
from aprsd import client, email, messaging, plugin, threads, utils
# setup the global logger
# logging.basicConfig(level=logging.DEBUG) # level=10
@ -57,9 +54,7 @@ LOG_LEVELS = {
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
# Global threading event to trigger stopping all threads
# When user quits app via CTRL-C
event = None
server_threads = []
# localization, please edit:
# HOST = "noam.aprs2.net" # north america tier2 servers round robin
@ -151,7 +146,11 @@ def signal_handler(signal, frame):
global event
LOG.info("Ctrl+C, Sending all threads exit!")
for th in server_threads:
th.stop()
sys.exit(0) # thread ignores this
# end signal_handler
@ -179,108 +178,6 @@ def setup_logging(config, loglevel, quiet):
LOG.addHandler(sh)
def process_ack_packet(packet, config):
ack_num = packet.get("msgNo")
LOG.info("Got ack for message {}".format(ack_num))
messaging.log_message(
"ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"]
)
messaging.ack_dict.update({int(ack_num): 1})
return
def process_mic_e_packet(packet, config):
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
messaging.log_packet(packet)
return
def process_message_packet(packet, config, tx_msg_queue):
LOG.info("Got a message packet")
fromcall = packet["from"]
message = packet.get("message_text", None)
msg_id = packet.get("msgNo", None)
if not msg_id:
msg_id = "0"
messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id
)
found_command = False
# Get singleton of the PM
pm = plugin.PluginManager()
try:
results = pm.run(fromcall=fromcall, message=message, ack=msg_id)
for reply in results:
found_command = True
# A plugin can return a null message flag which signals
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a usage string
if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply))
# msg = {"fromcall": fromcall, "msg": reply}
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
if not found_command:
plugins = pm.get_plugins()
names = [x.command_name for x in plugins]
names.sort()
reply = "Usage: {}".format(", ".join(names))
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
except Exception as ex:
LOG.exception("Plugin failed!!!", ex)
reply = "A Plugin failed! try again?"
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(config["aprs"]["login"],
fromcall, reply)
tx_msg_queue.put(msg)
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(config["aprs"]["login"], fromcall, msg_id=msg_id)
ack.send()
LOG.debug("Packet processing complete")
def process_packet(packet, config=None, msg_queues=None, event=None):
"""Process a packet recieved from aprs-is server."""
LOG.debug("Process packet! {}".format(msg_queues))
try:
LOG.debug("Got message: {}".format(packet))
msg = packet.get("message_text", None)
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
if msg_format == "message" and msg:
# we want to send the message through the
# plugins
process_message_packet(packet, config, msg_queues["tx"])
return
elif msg_response == "ack":
process_ack_packet(packet, config)
return
if msg_format == "mic-e":
# process a mic-e packet
process_mic_e_packet(packet, config)
return
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
LOG.exception("Failed to parse packet from aprs-is", exp)
@main.command()
def sample_config():
"""This dumps the config to stdout."""
@ -345,7 +242,6 @@ def send_message(
setup_logging(config, loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__))
message_number = random.randint(1, 90)
if type(command) is tuple:
command = " ".join(command)
LOG.info("Sending Command '{}'".format(command))
@ -368,12 +264,17 @@ def send_message(
fromcall = packet["from"]
msg_number = packet.get("msgNo", "0")
messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_number
"Received Message",
packet["raw"],
message,
fromcall=fromcall,
ack=msg_number,
)
got_response = True
# Send the ack back?
ack = messaging.AckMessage(config["aprs"]["login"],
fromcall, msg_id=msg_number)
ack = messaging.AckMessage(
config["aprs"]["login"], fromcall, msg_id=msg_number
)
ack.send_direct()
if got_ack and got_response:
@ -405,16 +306,6 @@ def send_message(
cl.reset()
def tx_msg_thread(msg_queues=None, event=None):
"""Thread to handle sending any messages outbound."""
LOG.info("TX_MSG_THREAD")
while not event.is_set() or not msg_queues["tx"].empty():
msg = msg_queues["tx"].get()
LOG.info("TXQ: got message '{}'".format(msg))
msg.send()
#messaging.send_message(msg["fromcall"], msg["msg"])
# main() ###
@main.command()
@click.option(
@ -474,55 +365,24 @@ def server(loglevel, quiet, disable_validation, config_file):
# Create the initial PM singleton and Register plugins
plugin_manager = plugin.PluginManager(config)
plugin_manager.setup_plugins()
cl = client.Client(config)
client.Client(config)
rx_msg_queue = queue.Queue(maxsize=20)
tx_msg_queue = queue.Queue(maxsize=20)
msg_queues = {"rx": rx_msg_queue,
"tx": tx_msg_queue,}
rx_msg = threading.Thread(
target=rx_msg_thread, name="RX_msg", kwargs={'msg_queues':msg_queues,
'event': event}
)
tx_msg = threading.Thread(
target=tx_msg_thread, name="TX_msg", kwargs={'msg_queues':msg_queues,
'event': event}
)
rx_msg.start()
tx_msg.start()
while True:
# Now use the helper which uses the singleton
aprs_client = client.get_client()
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
process_partial = functools.partial(process_packet,
msg_queues=msg_queues,
event=event, config=config)
aprs_client.consumer(process_partial, raw=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
client.Client().reset()
msg_queues = {
"rx": rx_msg_queue,
"tx": tx_msg_queue,
}
rx_thread = threads.APRSDRXThread(msg_queues=msg_queues, config=config)
tx_thread = threads.APRSDTXThread(msg_queues=msg_queues, config=config)
# TODO(hemna): add EmailThread
server_threads.append(rx_thread)
server_threads.append(tx_thread)
rx_thread.start()
tx_thread.start()
LOG.info("APRSD Exiting.")
tx_msg.join()
sys.exit(0)
# setup and run the main blocking loop

View File

@ -1,11 +1,11 @@
import abc
import logging
from multiprocessing import RawValue
import pprint
import re
import threading
import time
import uuid
from multiprocessing import RawValue
from aprsd import client
@ -20,6 +20,7 @@ ack_dict = {}
# and it's ok, but don't send a usage string back
NULL_MESSAGE = -1
class MessageCounter(object):
"""
Global message id counter class.
@ -31,13 +32,14 @@ class MessageCounter(object):
from the MessageCounter.
"""
_instance = None
def __new__(cls, *args, **kwargs):
"""Make this a singleton class."""
if cls._instance is None:
cls._instance = super(MessageCounter, cls).__new__(cls)
cls._instance.val = RawValue('i', 1)
cls._instance.val = RawValue("i", 1)
cls._instance.lock = threading.Lock()
return cls._instance
@ -61,6 +63,7 @@ class MessageCounter(object):
class Message(object, metaclass=abc.ABCMeta):
"""Base Message Class."""
# The message id to send over the air
id = 0
@ -102,13 +105,15 @@ class TextMessage(Message):
def __repr__(self):
"""Build raw string to send over the air."""
return "{}>APRS::{}:{}{{{}\n".format(
self.fromcall, self.tocall.ljust(9),
self._filter_for_send(), str(self.id),)
self.fromcall,
self.tocall.ljust(9),
self._filter_for_send(),
str(self.id),
)
def __str__(self):
return "From({}) TO({}) - Message({}): '{}'".format(
self.fromcall, self.tocall,
self.id, self.message
self.fromcall, self.tocall, self.id, self.message
)
def ack(self):
@ -162,7 +167,8 @@ class TextMessage(Message):
ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict))
LOG.debug(
"DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
"DEBUG: Cleared ack dictionary, ack_dict length is now %s."
% len(ack_dict)
)
ack_dict[self.id] = 0 # clear ack for this message number
@ -173,8 +179,11 @@ class TextMessage(Message):
"""Send a message without a separate thread."""
cl = client.get_client()
log_message(
"Sending Message Direct", repr(self).rstrip("\n"), self.message, tocall=self.tocall,
fromcall=self.fromcall
"Sending Message Direct",
repr(self).rstrip("\n"),
self.message,
tocall=self.tocall,
fromcall=self.fromcall,
)
cl.sendall(repr(self))
@ -182,19 +191,16 @@ class TextMessage(Message):
class AckMessage(Message):
"""Class for building Acks and sending them."""
def __init__(self, fromcall, tocall, msg_id):
super(AckMessage, self).__init__(fromcall, tocall, msg_id=msg_id)
def __repr__(self):
return "{}>APRS::{}:ack{}\n".format(
self.fromcall, self.tocall.ljust(9), self.id)
self.fromcall, self.tocall.ljust(9), self.id
)
def __str__(self):
return "From({}) TO({}) Ack ({})".format(
self.fromcall, self.tocall,
self.id
)
return "From({}) TO({}) Ack ({})".format(self.fromcall, self.tocall, self.id)
def send_thread(self):
"""Separate thread to send acks with retries."""
@ -216,10 +222,9 @@ class AckMessage(Message):
def send(self):
LOG.debug("Send ACK({}:{}) to radio.".format(self.tocall, self.id))
thread = threading.Thread(
target=self.send_thread, name="send_ack"
)
thread = threading.Thread(target=self.send_thread, name="send_ack")
thread.start()
# end send_ack()
def send_direct(self):

186
aprsd/threads.py Normal file
View File

@ -0,0 +1,186 @@
import logging
import queue
import threading
import time
import aprslib
from aprsd import client, messaging, plugin
LOG = logging.getLogger("APRSD")
class APRSDThread(threading.Thread):
def __init__(self, name, msg_queues, config):
super(APRSDThread, self).__init__(name=name)
self.msg_queues = msg_queues
self.config = config
self.thread_stop = False
def stop(self):
self.thread_stop = True
def run(self):
while not self.thread_stop:
self._run()
class APRSDRXThread(APRSDThread):
def __init__(self, msg_queues, config):
super(APRSDRXThread, self).__init__("RX_MSG", msg_queues, config)
self.thread_stop = False
def stop(self):
self.thread_stop = True
self.aprs.stop()
def callback(self, packet):
try:
packet = aprslib.parse(packet)
print(packet)
except (aprslib.ParseError, aprslib.UnknownFormat):
pass
def run(self):
LOG.info("Starting")
while not self.thread_stop:
aprs_client = client.get_client()
# setup the consumer of messages and block until a messages
try:
# This will register a packet consumer with aprslib
# When new packets come in the consumer will process
# the packet
# Do a partial here because the consumer signature doesn't allow
# For kwargs to be passed in to the consumer func we declare
# and the aprslib developer didn't want to allow a PR to add
# kwargs. :(
# https://github.com/rossengeorgiev/aprs-python/pull/56
aprs_client.consumer(self.process_packet, raw=False, blocking=False)
except aprslib.exceptions.ConnectionDrop:
LOG.error("Connection dropped, reconnecting")
time.sleep(5)
# Force the deletion of the client object connected to aprs
# This will cause a reconnect, next time client.get_client()
# is called
client.Client().reset()
LOG.info("Exiting ")
def process_ack_packet(self, packet):
ack_num = packet.get("msgNo")
LOG.info("Got ack for message {}".format(ack_num))
messaging.log_message(
"ACK", packet["raw"], None, ack=ack_num, fromcall=packet["from"]
)
messaging.ack_dict.update({int(ack_num): 1})
return
def process_mic_e_packet(self, packet):
LOG.info("Mic-E Packet detected. Currenlty unsupported.")
messaging.log_packet(packet)
return
def process_message_packet(self, packet):
LOG.info("Got a message packet")
fromcall = packet["from"]
message = packet.get("message_text", None)
msg_id = packet.get("msgNo", None)
if not msg_id:
msg_id = "0"
messaging.log_message(
"Received Message", packet["raw"], message, fromcall=fromcall, ack=msg_id
)
found_command = False
# Get singleton of the PM
pm = plugin.PluginManager()
try:
results = pm.run(fromcall=fromcall, message=message, ack=msg_id)
for reply in results:
found_command = True
# A plugin can return a null message flag which signals
# us that they processed the message correctly, but have
# nothing to reply with, so we avoid replying with a usage string
if reply is not messaging.NULL_MESSAGE:
LOG.debug("Sending '{}'".format(reply))
# msg = {"fromcall": fromcall, "msg": reply}
msg = messaging.TextMessage(
self.config["aprs"]["login"], fromcall, reply
)
self.msg_queues["tx"].put(msg)
else:
LOG.debug("Got NULL MESSAGE from plugin")
if not found_command:
plugins = pm.get_plugins()
names = [x.command_name for x in plugins]
names.sort()
reply = "Usage: {}".format(", ".join(names))
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(
self.config["aprs"]["login"], fromcall, reply
)
self.msg_queues["tx"].put(msg)
except Exception as ex:
LOG.exception("Plugin failed!!!", ex)
reply = "A Plugin failed! try again?"
# messaging.send_message(fromcall, reply)
msg = messaging.TextMessage(self.config["aprs"]["login"], fromcall, reply)
self.msg_queues["tx"].put(msg)
# let any threads do their thing, then ack
# send an ack last
ack = messaging.AckMessage(
self.config["aprs"]["login"], fromcall, msg_id=msg_id
)
ack.send()
LOG.debug("Packet processing complete")
def process_packet(self, packet):
"""Process a packet recieved from aprs-is server."""
LOG.debug("Process packet! {}".format(self.msg_queues))
try:
LOG.debug("Got message: {}".format(packet))
msg = packet.get("message_text", None)
msg_format = packet.get("format", None)
msg_response = packet.get("response", None)
if msg_format == "message" and msg:
# we want to send the message through the
# plugins
self.process_message_packet(packet)
return
elif msg_response == "ack":
self.process_ack_packet(packet)
return
if msg_format == "mic-e":
# process a mic-e packet
self.process_mic_e_packet(packet)
return
except (aprslib.ParseError, aprslib.UnknownFormat) as exp:
LOG.exception("Failed to parse packet from aprs-is", exp)
class APRSDTXThread(APRSDThread):
def __init__(self, msg_queues, config):
super(APRSDTXThread, self).__init__("TX_MSG", msg_queues, config)
def run(self):
LOG.info("Starting")
while not self.thread_stop:
try:
msg = self.msg_queues["tx"].get(timeout=0.1)
LOG.info("TXQ: got message '{}'".format(msg))
msg.send()
except queue.Empty:
pass
LOG.info("Exiting ")