' + json_pretty + '
diff --git a/aprsd/client.py b/aprsd/client.py index a2d5e45..dd1e248 100644 --- a/aprsd/client.py +++ b/aprsd/client.py @@ -38,6 +38,11 @@ class Client: if config: self.config = config + def new(self): + obj = super().__new__(Client) + obj.config = self.config + return obj + @property def client(self): if not self.aprs_client: @@ -118,15 +123,22 @@ class Aprsdis(aprslib.IS): self.select_timeout, ) if not readable: - continue + if not blocking: + break + else: + continue try: short_buf = self.sock.recv(4096) # sock.recv returns empty if the connection drops if not short_buf: - self.logger.error("socket.recv(): returned empty") - raise aprslib.ConnectionDrop("connection dropped") + if not blocking: + # We could just not be blocking, so empty is expected + continue + else: + self.logger.error("socket.recv(): returned empty") + raise aprslib.ConnectionDrop("connection dropped") except OSError as e: # self.logger.error("socket error on recv(): %s" % str(e)) if "Resource temporarily unavailable" in str(e): @@ -215,7 +227,7 @@ class Aprsdis(aprslib.IS): line = b"" - while True: + while True and not self.thread_stop: try: for line in self._socket_readlines(blocking): if line[0:1] != b"#": diff --git a/aprsd/flask.py b/aprsd/flask.py index e850899..bf4a81c 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -4,14 +4,22 @@ import logging from logging import NullHandler from logging.handlers import RotatingFileHandler import sys +import threading +import time +import aprslib +from aprslib.exceptions import LoginError import flask +from flask import request import flask_classful from flask_httpauth import HTTPBasicAuth +from flask_socketio import Namespace, SocketIO from werkzeug.security import check_password_hash, generate_password_hash import aprsd -from aprsd import kissclient, messaging, packets, plugin, stats, utils +from aprsd import ( + client, kissclient, messaging, packets, plugin, stats, threads, utils, +) LOG = logging.getLogger("APRSD") @@ -20,6 +28,72 @@ auth = HTTPBasicAuth() users = None +class SentMessages: + _instance = None + lock = None + + msgs = {} + + def __new__(cls, *args, **kwargs): + """This magic turns this into a singleton.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Put any initialization here. + cls.lock = threading.Lock() + return cls._instance + + def add(self, msg): + with self.lock: + self.msgs[msg.id] = self._create(msg.id) + self.msgs[msg.id]["from"] = msg.fromcall + self.msgs[msg.id]["to"] = msg.tocall + self.msgs[msg.id]["message"] = msg.message.rstrip("\n") + self.msgs[msg.id]["raw"] = str(msg).rstrip("\n") + + def _create(self, id): + return { + "id": id, + "ts": time.time(), + "ack": False, + "from": None, + "to": None, + "raw": None, + "message": None, + "status": None, + "last_update": None, + "reply": None, + } + + def __len__(self): + with self.lock: + return len(self.msgs.keys()) + + def get(self, id): + with self.lock: + if id in self.msgs: + return self.msgs[id] + + def get_all(self): + with self.lock: + return self.msgs + + def set_status(self, id, status): + with self.lock: + self.msgs[id]["last_update"] = str(datetime.datetime.now()) + self.msgs[id]["status"] = status + + def ack(self, id): + """The message got an ack!""" + with self.lock: + self.msgs[id]["last_update"] = str(datetime.datetime.now()) + self.msgs[id]["ack"] = True + + def reply(self, id, packet): + """We got a packet back from the sent message.""" + with self.lock: + self.msgs[id]["reply"] = packet + + # HTTPBasicAuth doesn't work on a class method. # This has to be out here. Rely on the APRSDFlask # class to initialize the users from the config @@ -31,6 +105,172 @@ def verify_password(username, password): return username +class SendMessageThread(threads.APRSDThread): + """Thread for sending a message from web.""" + + aprsis_client = None + request = None + got_ack = False + got_reply = False + + def __init__(self, config, info, msg, namespace): + self.config = config + self.request = info + self.msg = msg + self.namespace = namespace + self.start_time = datetime.datetime.now() + msg = "({} -> {}) : {}".format( + info["from"], + info["to"], + info["message"], + ) + super().__init__(f"WEB_SEND_MSG-{msg}") + + def setup_connection(self): + user = self.request["from"] + password = self.request["password"] + host = self.config["aprs"].get("host", "rotate.aprs.net") + port = self.config["aprs"].get("port", 14580) + connected = False + backoff = 1 + while not connected: + try: + LOG.info("Creating aprslib client") + aprs_client = client.Aprsdis( + user, + passwd=password, + host=host, + port=port, + ) + # Force the logging to be the same + aprs_client.logger = LOG + aprs_client.connect() + connected = True + backoff = 1 + except LoginError as e: + LOG.error(f"Failed to login to APRS-IS Server '{e}'") + connected = False + raise e + except Exception as e: + LOG.error(f"Unable to connect to APRS-IS server. '{e}' ") + time.sleep(backoff) + backoff = backoff * 2 + continue + LOG.debug(f"Logging in to APRS-IS with user '{user}'") + return aprs_client + + def run(self): + LOG.debug("Starting") + from_call = self.request["from"] + self.request["password"] + to_call = self.request["to"] + message = self.request["message"] + LOG.info( + "From: '{}' To: '{}' Send '{}'".format( + from_call, + to_call, + message, + ), + ) + + try: + self.aprs_client = self.setup_connection() + except LoginError as e: + f"Failed to setup Connection {e}" + + self.msg.send_direct(aprsis_client=self.aprs_client) + SentMessages().set_status(self.msg.id, "Sent") + + while not self.thread_stop: + can_loop = self.loop() + if not can_loop: + self.stop() + threads.APRSDThreadList().remove(self) + LOG.debug("Exiting") + + def rx_packet(self, packet): + global socketio + # LOG.debug("Got packet back {}".format(packet)) + resp = packet.get("response", None) + if resp == "ack": + ack_num = packet.get("msgNo") + LOG.info(f"We got ack for our sent message {ack_num}") + messaging.log_packet(packet) + SentMessages().ack(self.msg.id) + socketio.emit( + "ack", SentMessages().get(self.msg.id), + namespace="/ws", + ) + stats.APRSDStats().ack_rx_inc() + self.got_ack = True + if self.request["wait_reply"] == "0" or self.got_reply: + # We aren't waiting for a reply, so we can bail + self.stop() + self.thread_stop = self.aprs_client.thread_stop = True + else: + packets.PacketList().add(packet) + stats.APRSDStats().msgs_rx_inc() + message = packet.get("message_text", None) + fromcall = packet["from"] + msg_number = packet.get("msgNo", "0") + messaging.log_message( + "Received Message", + packet["raw"], + message, + fromcall=fromcall, + ack=msg_number, + ) + SentMessages().reply(self.msg.id, packet) + SentMessages().set_status(self.msg.id, "Got Reply") + socketio.emit( + "reply", SentMessages().get(self.msg.id), + namespace="/ws", + ) + + # Send the ack back? + ack = messaging.AckMessage( + self.request["from"], + fromcall, + msg_id=msg_number, + ) + ack.send_direct() + SentMessages().set_status(self.msg.id, "Ack Sent") + + # Now we can exit, since we are done. + self.got_reply = True + if self.got_ack: + self.stop() + self.thread_stop = self.aprs_client.thread_stop = True + + def loop(self): + # we have a general time limit expecting results of + # around 120 seconds before we exit + now = datetime.datetime.now() + start_delta = str(now - self.start_time) + delta = utils.parse_delta_str(start_delta) + d = datetime.timedelta(**delta) + max_timeout = {"hours": 0.0, "minutes": 1, "seconds": 0} + max_delta = datetime.timedelta(**max_timeout) + if d > max_delta: + LOG.error("XXXXXX Haven't completed everything in 60 seconds. BAIL!") + return False + + if self.got_ack and self.got_reply: + LOG.warning("We got everything already. BAIL") + return False + + try: + # This will register a packet consumer with aprslib + # When new packets come in the consumer will process + # the packet + self.aprs_client.consumer(self.rx_packet, raw=False, blocking=False) + except aprslib.exceptions.ConnectionDrop: + LOG.error("Connection dropped.") + return False + + return True + + class APRSDFlask(flask_classful.FlaskView): config = None @@ -115,6 +355,23 @@ class APRSDFlask(flask_classful.FlaskView): return flask.render_template("messages.html", messages=json.dumps(msgs)) + @auth.login_required + def send_message_status(self): + LOG.debug(request) + msgs = SentMessages() + info = msgs.get_all() + return json.dumps(info) + + @auth.login_required + def send_message(self): + LOG.debug(request) + if request.method == "GET": + return flask.render_template( + "send-message.html", + callsign=self.config["aprs"]["login"], + version=aprsd.__version__, + ) + @auth.login_required def packets(self): packet_list = packets.PacketList().get() @@ -177,6 +434,59 @@ class APRSDFlask(flask_classful.FlaskView): return json.dumps(self._stats()) +class SendMessageNamespace(Namespace): + _config = None + got_ack = False + reply_sent = False + msg = None + request = None + + def __init__(self, namespace=None, config=None): + self._config = config + super().__init__(namespace) + + def on_connect(self): + global socketio + LOG.debug("Web socket connected") + socketio.emit( + "connected", {"data": "Lets dance"}, + namespace="/ws", + ) + + def on_disconnect(self): + LOG.debug("WS Disconnected") + + def on_send(self, data): + global socketio + LOG.debug(f"WS: on_send {data}") + self.request = data + msg = messaging.TextMessage( + data["from"], data["to"], + data["message"], + ) + self.msg = msg + msgs = SentMessages() + msgs.add(msg) + msgs.set_status(msg.id, "Sending") + socketio.emit( + "sent", SentMessages().get(self.msg.id), + namespace="/ws", + ) + + socketio.start_background_task(self._start, self._config, data, msg, self) + LOG.warning("WS: on_send: exit") + + def _start(self, config, data, msg, namespace): + msg_thread = SendMessageThread(self._config, data, msg, self) + msg_thread.start() + + def handle_message(self, data): + LOG.debug(f"WS Data {data}") + + def handle_json(self, data): + LOG.debug(f"WS json {data}") + + def setup_logging(config, flask_app, loglevel, quiet): flask_log = logging.getLogger("werkzeug") @@ -211,6 +521,8 @@ def setup_logging(config, flask_app, loglevel, quiet): def init_flask(config, loglevel, quiet): + global socketio + flask_app = flask.Flask( "aprsd", static_url_path="/static", @@ -224,6 +536,17 @@ def init_flask(config, loglevel, quiet): flask_app.route("/stats", methods=["GET"])(server.stats) flask_app.route("/messages", methods=["GET"])(server.messages) flask_app.route("/packets", methods=["GET"])(server.packets) + flask_app.route("/send-message", methods=["GET"])(server.send_message) + flask_app.route("/send-message-status", methods=["GET"])(server.send_message_status) flask_app.route("/save", methods=["GET"])(server.save) flask_app.route("/plugins", methods=["GET"])(server.plugins) - return flask_app + + socketio = SocketIO( + flask_app, logger=False, engineio_logger=False, + async_mode="threading", + ) + # import eventlet + # eventlet.monkey_patch() + + socketio.on_namespace(SendMessageNamespace("/ws", config=config)) + return socketio, flask_app diff --git a/aprsd/main.py b/aprsd/main.py index bd9a711..30b2d84 100644 --- a/aprsd/main.py +++ b/aprsd/main.py @@ -513,8 +513,9 @@ def server( if web_enabled: flask_enabled = True - app = flask.init_flask(config, loglevel, quiet) - app.run( + (socketio, app) = flask.init_flask(config, loglevel, quiet) + socketio.run( + app, host=config["aprsd"]["web"]["host"], port=config["aprsd"]["web"]["port"], ) diff --git a/aprsd/messaging.py b/aprsd/messaging.py index f53ce0e..0fe54f6 100644 --- a/aprsd/messaging.py +++ b/aprsd/messaging.py @@ -299,7 +299,7 @@ class RawMessage(Message): thread = SendMessageThread(message=self) thread.start() - def send_direct(self): + def send_direct(self, aprsis_client=None): """Send a message without a separate thread.""" cl = self.get_transport() log_message( @@ -379,7 +379,7 @@ class TextMessage(Message): thread = SendMessageThread(message=self) thread.start() - def send_direct(self): + def send_direct(self, aprsis_client=None): """Send a message without a separate thread.""" cl = self.get_transport() log_message( @@ -391,6 +391,7 @@ class TextMessage(Message): ) cl.send(self) stats.APRSDStats().msgs_tx_inc() + packets.PacketList().add(self.dict()) class SendMessageThread(threads.APRSDThread): @@ -498,7 +499,7 @@ class AckMessage(Message): thread = SendAckThread(self) thread.start() - def send_direct(self): + def send_direct(self, aprsis_client=None): """Send an ack message without a separate thread.""" cl = self.get_transport() log_message( diff --git a/aprsd/packets.py b/aprsd/packets.py index ef5e3f5..8e80432 100644 --- a/aprsd/packets.py +++ b/aprsd/packets.py @@ -69,6 +69,7 @@ class WatchList: _instance = None callsigns = {} + config = None def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -97,7 +98,7 @@ class WatchList: } def is_enabled(self): - if "watch_list" in self.config["aprsd"]: + if self.config and "watch_list" in self.config["aprsd"]: return self.config["aprsd"]["watch_list"].get("enabled", False) else: return False diff --git a/aprsd/web/static/js/send-message.js b/aprsd/web/static/js/send-message.js new file mode 100644 index 0000000..812cafc --- /dev/null +++ b/aprsd/web/static/js/send-message.js @@ -0,0 +1,145 @@ +msgs_list = {}; +var cleared = false; + +function size_dict(d){c=0; for (i in d) ++c; return c} + +function init_messages() { + const socket = io("/ws"); + socket.on('connect', function () { + console.log("Connected to socketio"); + }); + socket.on('connected', function(msg) { + console.log("Connected!"); + console.log(msg); + }); + + socket.on("sent", function(msg) { + if (cleared == false) { + var msgsdiv = $("#msgsDiv"); + msgsdiv.html('') + cleared = true + } + add_msg(msg); + }); + + socket.on("ack", function(msg) { + update_msg(msg); + }); + socket.on("reply", function(msg) { + update_msg(msg); + }); + + $("#sendform").submit(function(event) { + event.preventDefault(); + + var $checkboxes = $(this).find('input[type=checkbox]'); + + //loop through the checkboxes and change to hidden fields + $checkboxes.each(function() { + if ($(this)[0].checked) { + $(this).attr('type', 'hidden'); + $(this).val(1); + } else { + $(this).attr('type', 'hidden'); + $(this).val(0); + } + }); + + msg = {'from': $('#from').val(), + 'password': $('#password').val(), + 'to': $('#to').val(), + 'message': $('#message').val(), + 'wait_reply': $('#wait_reply').val(), + } + + socket.emit("send", msg); + + //loop through the checkboxes and change to hidden fields + $checkboxes.each(function() { + $(this).attr('type', 'checkbox'); + }); + }); +} + +function add_msg(msg) { + var msgsdiv = $("#msgsDiv"); + + ts_str = msg["ts"].toString(); + ts = ts_str.split(".")[0]*1000; + var d = new Date(ts).toLocaleDateString("en-US") + var t = new Date(ts).toLocaleTimeString("en-US") + + from = msg['from'] + title_id = 'title_tx' + var from_to = d + " " + t + " " + from + " > " + + if (msg.hasOwnProperty('to')) { + from_to = from_to + msg['to'] + } + from_to = from_to + " - " + msg['message'] + + id = ts_str.split('.')[0] + pretty_id = "pretty_" + id + loader_id = "loader_" + id + ack_id = "ack_" + id + reply_id = "reply_" + id + span_id = "span_" + id + json_pretty = Prism.highlight(JSON.stringify(msg, null, '\t'), Prism.languages.json, 'json'); + msg_html = '
' + json_pretty + '