From e37f99a6ddbb32e5d1a59f30c7179bb81451fc6a Mon Sep 17 00:00:00 2001 From: Hemna Date: Sun, 18 Dec 2022 21:44:23 -0500 Subject: [PATCH] reworked collecting and reporting stats This is the start of the cleanup of reporting of packet stats --- aprsd/cmds/send_message.py | 3 +- aprsd/cmds/webchat.py | 1 - aprsd/flask.py | 11 ++- aprsd/packets/core.py | 31 +++++-- aprsd/packets/packet_list.py | 20 +++-- aprsd/packets/tracker.py | 2 - aprsd/stats.py | 130 ++++++++++++---------------- aprsd/threads/keep_alive.py | 8 +- aprsd/threads/rx.py | 4 +- aprsd/threads/tx.py | 35 +++++--- aprsd/utils/json.py | 60 +++++++++++++ aprsd/web/admin/static/js/charts.js | 3 +- aprsd/web/admin/static/js/main.js | 40 +++++---- 13 files changed, 208 insertions(+), 140 deletions(-) create mode 100644 aprsd/utils/json.py diff --git a/aprsd/cmds/send_message.py b/aprsd/cmds/send_message.py index 08a236f..469901d 100644 --- a/aprsd/cmds/send_message.py +++ b/aprsd/cmds/send_message.py @@ -100,7 +100,8 @@ def send_message( global got_ack, got_response cl = client.factory.create() packet = cl.decode_packet(packet) - packet.log("RX_PKT") + packets.PacketList().rx(packet) + packet.log("RX") # LOG.debug("Got packet back {}".format(packet)) if isinstance(packet, packets.AckPacket): got_ack = True diff --git a/aprsd/cmds/webchat.py b/aprsd/cmds/webchat.py index 17ca3b5..d0c0b9b 100644 --- a/aprsd/cmds/webchat.py +++ b/aprsd/cmds/webchat.py @@ -185,7 +185,6 @@ class WebChatProcessPacketThread(rx.APRSDProcessPacketThread): fromcall = packet.from_call packets.PacketList().rx(packet) - stats.APRSDStats().msgs_rx_inc() message = packet.get("message_text", None) msg = { "id": 0, diff --git a/aprsd/flask.py b/aprsd/flask.py index 2b85857..8f4ddf6 100644 --- a/aprsd/flask.py +++ b/aprsd/flask.py @@ -380,7 +380,12 @@ class APRSDFlask(flask_classful.FlaskView): @auth.login_required def packets(self): packet_list = packets.PacketList().get() - return json.dumps(packet_list) + tmp_list = [] + for pkt in packet_list: + tmp_list.append(pkt.json) + + LOG.info(f"PACKETS {tmp_list}") + return json.dumps(tmp_list) @auth.login_required def plugins(self): @@ -420,8 +425,8 @@ class APRSDFlask(flask_classful.FlaskView): stats_dict["aprsd"]["watch_list"] = new_list packet_list = packets.PacketList() - rx = packet_list.total_received() - tx = packet_list.total_sent() + rx = packet_list.total_rx() + tx = packet_list.total_tx() stats_dict["packets"] = { "sent": tx, "received": rx, diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index c476472..db359d2 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -1,6 +1,7 @@ import abc from dataclasses import asdict, dataclass, field import datetime +import json import logging import re import time @@ -9,9 +10,11 @@ from typing import List import dacite -from aprsd import client, stats +from aprsd import client +from aprsd.packets.packet_list import PacketList # noqa: F401 from aprsd.threads import tx from aprsd.utils import counter +from aprsd.utils import json as aprsd_json LOG = logging.getLogger("APRSD") @@ -57,16 +60,26 @@ class Packet(metaclass=abc.ABCMeta): raw_dict: dict = field(repr=False, default_factory=lambda: {}) # Fields related to sending packets out - send_count: int = field(repr=False, default=1) + send_count: int = field(repr=False, default=0) retry_count: int = field(repr=False, default=3) last_send_time: datetime.timedelta = field(repr=False, default=None) - last_send_attempt: int = field(repr=False, default=0) # Do we allow this packet to be saved to send later? allow_delay: bool = field(repr=False, default=True) def __post__init__(self): LOG.warning(f"POST INIT {self}") + @property + def __dict__(self): + return asdict(self) + + @property + def json(self): + """ + get the json formated string + """ + return json.dumps(self.__dict__, cls=aprsd_json.EnhancedJSONEncoder) + def get(self, key, default=None): """Emulate a getter on a dict.""" if hasattr(self, key): @@ -122,13 +135,13 @@ class Packet(metaclass=abc.ABCMeta): log_list = ["\n"] name = self.__class__.__name__ if header: - if isinstance(self, AckPacket) and "tx" in header.lower(): + if "tx" in header.lower(): log_list.append( - f"{header}____________({name}__" - f"TX:{self.send_count} of {self.retry_count})", + f"{header}________({name} " + f"TX:{self.send_count+1} of {self.retry_count})", ) else: - log_list.append(f"{header}____________({name})") + log_list.append(f"{header}________({name})") # log_list.append(f" Packet : {self.__class__.__name__}") log_list.append(f" Raw : {self.raw}") if self.to_call: @@ -148,7 +161,7 @@ class Packet(metaclass=abc.ABCMeta): if self.msgNo: log_list.append(f" Msg # : {self.msgNo}") - log_list.append(f"{header}____________({name})") + log_list.append(f"{header}________({name})") LOG.info("\n".join(log_list)) LOG.debug(self) @@ -178,7 +191,7 @@ class Packet(metaclass=abc.ABCMeta): cl = client.factory.create().client self.log(header="TX Message Direct") cl.send(self.raw) - stats.APRSDStats().msgs_tx_inc() + PacketList().tx(self) @dataclass diff --git a/aprsd/packets/packet_list.py b/aprsd/packets/packet_list.py index db850c6..d9c2c94 100644 --- a/aprsd/packets/packet_list.py +++ b/aprsd/packets/packet_list.py @@ -3,7 +3,7 @@ import threading import wrapt -from aprsd import utils +from aprsd import stats, utils from aprsd.packets import seen_list @@ -19,8 +19,8 @@ class PacketList: packet_list: utils.RingBuffer = utils.RingBuffer(1000) - total_recv: int = 0 - total_tx: int = 0 + _total_rx: int = 0 + _total_tx: int = 0 def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -43,25 +43,27 @@ class PacketList: @wrapt.synchronized(lock) def rx(self, packet): """Add a packet that was received.""" - self.total_recv += 1 + self._total_rx += 1 self.packet_list.append(packet) seen_list.SeenList().update_seen(packet) + stats.APRSDStats().rx(packet) @wrapt.synchronized(lock) def tx(self, packet): """Add a packet that was received.""" - self.total_tx += 1 + self._total_tx += 1 self.packet_list.append(packet) seen_list.SeenList().update_seen(packet) + stats.APRSDStats().tx(packet) @wrapt.synchronized(lock) def get(self): return self.packet_list.get() @wrapt.synchronized(lock) - def total_received(self): - return self.total_recv + def total_rx(self): + return self._total_rx @wrapt.synchronized(lock) - def total_sent(self): - return self.total_tx + def total_tx(self): + return self._total_tx diff --git a/aprsd/packets/tracker.py b/aprsd/packets/tracker.py index 8ec7b67..8d3d931 100644 --- a/aprsd/packets/tracker.py +++ b/aprsd/packets/tracker.py @@ -3,7 +3,6 @@ import threading import wrapt -from aprsd import stats from aprsd.utils import objectstore @@ -76,7 +75,6 @@ class PacketTrack(objectstore.ObjectStoreMixin): def add(self, packet): key = int(packet.msgNo) self.data[key] = packet - stats.APRSDStats().msgs_tracked_inc() self.total_tracked += 1 @wrapt.synchronized(lock) diff --git a/aprsd/stats.py b/aprsd/stats.py index cc8dc8f..91b9b8c 100644 --- a/aprsd/stats.py +++ b/aprsd/stats.py @@ -21,15 +21,6 @@ class APRSDStats: _aprsis_server = None _aprsis_keepalive = None - _msgs_tracked = 0 - _msgs_tx = 0 - _msgs_rx = 0 - - _msgs_mice_rx = 0 - - _ack_tx = 0 - _ack_rx = 0 - _email_thread_last_time = None _email_tx = 0 _email_rx = 0 @@ -37,6 +28,37 @@ class APRSDStats: _mem_current = 0 _mem_peak = 0 + _pkt_cnt = { + "Packet": { + "tx": 0, + "rx": 0, + }, + "AckPacket": { + "tx": 0, + "rx": 0, + }, + "GPSPacket": { + "tx": 0, + "rx": 0, + }, + "StatusPacket": { + "tx": 0, + "rx": 0, + }, + "MicEPacket": { + "tx": 0, + "rx": 0, + }, + "MessagePacket": { + "tx": 0, + "rx": 0, + }, + "WeatherPacket": { + "tx": 0, + "rx": 0, + }, + } + def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) @@ -90,67 +112,18 @@ class APRSDStats: def set_aprsis_keepalive(self): self._aprsis_keepalive = datetime.datetime.now() - def rx_packet(self, packet): - if isinstance(packet, packets.MessagePacket): - self.msgs_rx_inc() - elif isinstance(packet, packets.MicEPacket): - self.msgs_mice_inc() - elif isinstance(packet, packets.AckPacket): - self.ack_rx_inc() + def rx(self, packet): + type = packet.__class__.__name__ + self._pkt_cnt[type]["rx"] += 1 - @wrapt.synchronized(lock) - @property - def msgs_tx(self): - return self._msgs_tx - - @wrapt.synchronized(lock) - def msgs_tx_inc(self): - self._msgs_tx += 1 - - @wrapt.synchronized(lock) - @property - def msgs_rx(self): - return self._msgs_rx - - @wrapt.synchronized(lock) - def msgs_rx_inc(self): - self._msgs_rx += 1 - - @wrapt.synchronized(lock) - @property - def msgs_mice_rx(self): - return self._msgs_mice_rx - - @wrapt.synchronized(lock) - def msgs_mice_inc(self): - self._msgs_mice_rx += 1 - - @wrapt.synchronized(lock) - @property - def ack_tx(self): - return self._ack_tx - - @wrapt.synchronized(lock) - def ack_tx_inc(self): - self._ack_tx += 1 - - @wrapt.synchronized(lock) - @property - def ack_rx(self): - return self._ack_rx - - @wrapt.synchronized(lock) - def ack_rx_inc(self): - self._ack_rx += 1 + def tx(self, packet): + type = packet.__class__.__name__ + self._pkt_cnt[type]["tx"] += 1 @wrapt.synchronized(lock) @property def msgs_tracked(self): - return self._msgs_tracked - - @wrapt.synchronized(lock) - def msgs_tracked_inc(self): - self._msgs_tracked += 1 + return packets.PacketTrack().total_tracked @wrapt.synchronized(lock) @property @@ -212,11 +185,13 @@ class APRSDStats: wl = packets.WatchList() sl = packets.SeenList() + pl = packets.PacketList() stats = { "aprsd": { "version": aprsd.__version__, "uptime": utils.strfdelta(self.uptime), + "callsign": self.config["aprsd"]["callsign"], "memory_current": int(self.memory), "memory_current_str": utils.human_size(self.memory), "memory_peak": int(self.memory_peak), @@ -229,18 +204,20 @@ class APRSDStats: "callsign": self.config["aprs"]["login"], "last_update": last_aprsis_keepalive, }, + "packets": { + "tracked": int(pl.total_tx() + pl.total_rx()), + "sent": int(pl.total_tx()), + "received": int(pl.total_rx()), + }, "messages": { - "tracked": int(self.msgs_tracked), - "sent": int(self.msgs_tx), - "recieved": int(self.msgs_rx), - "ack_sent": int(self.ack_tx), - "ack_recieved": int(self.ack_rx), - "mic-e recieved": int(self.msgs_mice_rx), + "sent": self._pkt_cnt["MessagePacket"]["tx"], + "received": self._pkt_cnt["MessagePacket"]["tx"], + "ack_sent": self._pkt_cnt["AckPacket"]["tx"], }, "email": { "enabled": self.config["aprsd"]["email"]["enabled"], "sent": int(self._email_tx), - "recieved": int(self._email_rx), + "received": int(self._email_rx), "thread_last_update": last_update, }, "plugins": plugin_stats, @@ -248,15 +225,16 @@ class APRSDStats: return stats def __str__(self): + pl = packets.PacketList() return ( "Uptime:{} Msgs TX:{} RX:{} " "ACK: TX:{} RX:{} " "Email TX:{} RX:{} LastLoop:{} ".format( self.uptime, - self._msgs_tx, - self._msgs_rx, - self._ack_tx, - self._ack_rx, + pl.total_tx(), + pl.total_rx(), + self._pkt_cnt["AckPacket"]["tx"], + self._pkt_cnt["AckPacket"]["rx"], self._email_tx, self._email_rx, self._email_thread_last_time, diff --git a/aprsd/threads/keep_alive.py b/aprsd/threads/keep_alive.py index 4813663..9c8007f 100644 --- a/aprsd/threads/keep_alive.py +++ b/aprsd/threads/keep_alive.py @@ -56,11 +56,11 @@ class KeepAliveThread(APRSDThread): ).format( login, utils.strfdelta(stats_obj.uptime), - pl.total_recv, - pl.total_tx, + pl.total_rx(), + pl.total_tx(), tracked_packets, - stats_obj.msgs_tx, - stats_obj.msgs_rx, + stats_obj._pkt_cnt["MessagePacket"]["tx"], + stats_obj._pkt_cnt["MessagePacket"]["rx"], last_msg_time, email_thread_time, utils.human_size(current), diff --git a/aprsd/threads/rx.py b/aprsd/threads/rx.py index 3ff8c9e..173b024 100644 --- a/aprsd/threads/rx.py +++ b/aprsd/threads/rx.py @@ -5,7 +5,7 @@ import time import aprslib -from aprsd import client, packets, plugin, stats +from aprsd import client, packets, plugin from aprsd.threads import APRSDThread @@ -91,7 +91,6 @@ class APRSDProcessPacketThread(APRSDThread): LOG.info(f"Got ack for message {ack_num}") pkt_tracker = packets.PacketTrack() pkt_tracker.remove(ack_num) - stats.APRSDStats().ack_rx_inc() return def loop(self): @@ -130,7 +129,6 @@ class APRSDProcessPacketThread(APRSDThread): if isinstance(packet, packets.MessagePacket): if to_call and to_call.lower() == our_call: # It's a MessagePacket and it's for us! - stats.APRSDStats().msgs_rx_inc() # let any threads do their thing, then ack # send an ack last ack_pkt = packets.AckPacket( diff --git a/aprsd/threads/tx.py b/aprsd/threads/tx.py index b8fca07..ac3651a 100644 --- a/aprsd/threads/tx.py +++ b/aprsd/threads/tx.py @@ -2,7 +2,7 @@ import datetime import logging import time -from aprsd import client, stats +from aprsd import client from aprsd import threads as aprsd_threads from aprsd.packets import packet_list, tracker @@ -36,14 +36,23 @@ class SendPacketThread(aprsd_threads.APRSDThread): if not packet: # The message has been removed from the tracking queue # So it got acked and we are done. - LOG.info("Message Send Complete via Ack.") + LOG.info( + f"{packet.__class__.__name__}" + f"({packet.msgNo}) " + "Message Send Complete via Ack.", + ) return False else: send_now = False - if packet.last_send_attempt == packet.retry_count: + if packet.send_count == packet.retry_count: # we reached the send limit, don't send again # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Message Send Complete. Max attempts reached.") + LOG.info( + f"{packet.__class__.__name__} " + f"({packet.msgNo}) " + "Message Send Complete. Max attempts reached" + f" {packet.retry_count}", + ) if not packet.allow_delay: pkt_tracker.remove(packet.msgNo) return False @@ -52,7 +61,7 @@ class SendPacketThread(aprsd_threads.APRSDThread): if packet.last_send_time: # Message has a last send time tracking now = datetime.datetime.now() - sleeptime = (packet.last_send_attempt + 1) * 31 + sleeptime = (packet.send_count + 1) * 31 delta = now - packet.last_send_time if delta > datetime.timedelta(seconds=sleeptime): # It's time to try to send it again @@ -66,10 +75,9 @@ class SendPacketThread(aprsd_threads.APRSDThread): packet.log("TX") cl = client.factory.create().client cl.send(packet.raw) - stats.APRSDStats().msgs_tx_inc() packet_list.PacketList().tx(packet) packet.last_send_time = datetime.datetime.now() - packet.last_send_attempt += 1 + packet.send_count += 1 time.sleep(1) # Make sure we get called again. @@ -87,10 +95,15 @@ class SendAckThread(aprsd_threads.APRSDThread): def loop(self): """Separate thread to send acks with retries.""" send_now = False - if self.packet.last_send_attempt == self.packet.retry_count: + if self.packet.send_count == self.packet.retry_count: # we reached the send limit, don't send again # TODO(hemna) - Need to put this in a delayed queue? - LOG.info("Ack Send Complete. Max attempts reached.") + LOG.info( + f"{self.packet.__class__.__name__}" + f"({self.packet.msgNo}) " + "Send Complete. Max attempts reached" + f" {self.packet.retry_count}", + ) return False if self.packet.last_send_time: @@ -113,10 +126,8 @@ class SendAckThread(aprsd_threads.APRSDThread): cl = client.factory.create().client self.packet.log("TX") cl.send(self.packet.raw) - self.packet.send_count += 1 - stats.APRSDStats().ack_tx_inc() packet_list.PacketList().tx(self.packet) - self.packet.last_send_attempt += 1 + self.packet.send_count += 1 self.packet.last_send_time = datetime.datetime.now() time.sleep(1) diff --git a/aprsd/utils/json.py b/aprsd/utils/json.py new file mode 100644 index 0000000..8876738 --- /dev/null +++ b/aprsd/utils/json.py @@ -0,0 +1,60 @@ +import datetime +import decimal +import json +import sys + + +class EnhancedJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime.datetime): + args = ( + "year", "month", "day", "hour", "minute", + "second", "microsecond", + ) + return { + "__type__": "datetime.datetime", + "args": [getattr(obj, a) for a in args], + } + elif isinstance(obj, datetime.date): + args = ("year", "month", "day") + return { + "__type__": "datetime.date", + "args": [getattr(obj, a) for a in args], + } + elif isinstance(obj, datetime.time): + args = ("hour", "minute", "second", "microsecond") + return { + "__type__": "datetime.time", + "args": [getattr(obj, a) for a in args], + } + elif isinstance(obj, datetime.timedelta): + args = ("days", "seconds", "microseconds") + return { + "__type__": "datetime.timedelta", + "args": [getattr(obj, a) for a in args], + } + elif isinstance(obj, decimal.Decimal): + return { + "__type__": "decimal.Decimal", + "args": [str(obj)], + } + else: + return super().default(obj) + + +class EnhancedJSONDecoder(json.JSONDecoder): + + def __init__(self, *args, **kwargs): + super().__init__( + *args, object_hook=self.object_hook, + **kwargs, + ) + + def object_hook(self, d): + if "__type__" not in d: + return d + o = sys.modules[__name__] + for e in d["__type__"].split("."): + o = getattr(o, e) + args, kwargs = d.get("args", ()), d.get("kwargs", {}) + return o(*args, **kwargs) diff --git a/aprsd/web/admin/static/js/charts.js b/aprsd/web/admin/static/js/charts.js index 9d4ed65..4ceb6d7 100644 --- a/aprsd/web/admin/static/js/charts.js +++ b/aprsd/web/admin/static/js/charts.js @@ -219,6 +219,7 @@ function updateQuadData(chart, label, first, second, third, fourth) { } function update_stats( data ) { + our_callsign = data["stats"]["aprsd"]["callsign"]; $("#version").text( data["stats"]["aprsd"]["version"] ); $("#aprs_connection").html( data["aprs_connection"] ); $("#uptime").text( "uptime: " + data["stats"]["aprsd"]["uptime"] ); @@ -226,7 +227,7 @@ function update_stats( data ) { $("#jsonstats").html(html_pretty); short_time = data["time"].split(/\s(.+)/)[1]; updateDualData(packets_chart, short_time, data["stats"]["packets"]["sent"], data["stats"]["packets"]["received"]); - updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["recieved"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]); + updateQuadData(message_chart, short_time, data["stats"]["messages"]["sent"], data["stats"]["messages"]["received"], data["stats"]["messages"]["ack_sent"], data["stats"]["messages"]["ack_recieved"]); updateDualData(email_chart, short_time, data["stats"]["email"]["sent"], data["stats"]["email"]["recieved"]); updateDualData(memory_chart, short_time, data["stats"]["aprsd"]["memory_peak"], data["stats"]["aprsd"]["memory_current"]); } diff --git a/aprsd/web/admin/static/js/main.js b/aprsd/web/admin/static/js/main.js index 40b3942..491b894 100644 --- a/aprsd/web/admin/static/js/main.js +++ b/aprsd/web/admin/static/js/main.js @@ -1,5 +1,6 @@ // watchlist is a dict of ham callsign => symbol, packets var watchlist = {}; +var our_callsign = ""; function aprs_img(item, x_offset, y_offset) { var x = x_offset * -16; @@ -107,34 +108,35 @@ function update_packets( data ) { packetsdiv.html('') } jQuery.each(data, function(i, val) { - update_watchlist_from_packet(val['from'], val); - if ( packet_list.hasOwnProperty(val["ts"]) == false ) { + pkt = JSON.parse(val); + update_watchlist_from_packet(pkt['from_call'], pkt); + if ( packet_list.hasOwnProperty(val["timestamp"]) == false ) { // Store the packet - packet_list[val["ts"]] = val; - ts_str = val["ts"].toString(); - ts = ts_str.split(".")[0]*1000; - var d = new Date(ts).toLocaleDateString("en-US") - var t = new Date(ts).toLocaleTimeString("en-US") - if (val.hasOwnProperty('from') == false) { - from = val['fromcall'] - title_id = 'title_tx' + packet_list[pkt["timestamp"]] = pkt; + //ts_str = val["timestamp"].toString(); + //ts = ts_str.split(".")[0]*1000; + ts = pkt["timestamp"] + var d = new Date(ts).toLocaleDateString("en-US"); + var t = new Date(ts).toLocaleTimeString("en-US"); + var from_call = pkt['from_call']; + if (from_call == our_callsign) { + title_id = 'title_tx'; } else { - from = val['from'] - title_id = 'title_rx' + title_id = 'title_rx'; } - var from_to = d + " " + t + "    " + from + " > " + var from_to = d + " " + t + "    " + from_call + " > " if (val.hasOwnProperty('addresse')) { - from_to = from_to + val['addresse'] - } else if (val.hasOwnProperty('tocall')) { - from_to = from_to + val['tocall'] - } else if (val.hasOwnProperty('format') && val['format'] == 'mic-e') { + from_to = from_to + pkt['addresse'] + } else if (pkt.hasOwnProperty('to_call')) { + from_to = from_to + pkt['to_call'] + } else if (pkt.hasOwnProperty('format') && pkt['format'] == 'mic-e') { from_to = from_to + "Mic-E" } - from_to = from_to + "  -  " + val['raw'] + from_to = from_to + "  -  " + pkt['raw'] - json_pretty = Prism.highlight(JSON.stringify(val, null, '\t'), Prism.languages.json, 'json'); + json_pretty = Prism.highlight(JSON.stringify(pkt, null, '\t'), Prism.languages.json, 'json'); pkt_html = '
' + from_to + '
' + json_pretty + '

' packetsdiv.prepend(pkt_html); }