1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-06-14 20:32:25 -04:00

Fixed a problem with WeatherPacket

WeatherPacket was calling self.filter_for_send, which doesn't
exist.  It's self._filter_for_send.
This commit is contained in:
Hemna 2025-04-07 18:41:32 -04:00
parent 1c39546bb9
commit 5469610779

View File

@ -19,26 +19,26 @@ from loguru import logger
from aprsd.utils import counter from aprsd.utils import counter
# For mypy to be happy # For mypy to be happy
A = TypeVar("A", bound="DataClassJsonMixin") A = TypeVar('A', bound='DataClassJsonMixin')
Json = Union[dict, list, str, int, float, bool, None] Json = Union[dict, list, str, int, float, bool, None]
LOG = logging.getLogger() LOG = logging.getLogger()
LOGU = logger LOGU = logger
PACKET_TYPE_BULLETIN = "bulletin" PACKET_TYPE_BULLETIN = 'bulletin'
PACKET_TYPE_MESSAGE = "message" PACKET_TYPE_MESSAGE = 'message'
PACKET_TYPE_ACK = "ack" PACKET_TYPE_ACK = 'ack'
PACKET_TYPE_REJECT = "reject" PACKET_TYPE_REJECT = 'reject'
PACKET_TYPE_MICE = "mic-e" PACKET_TYPE_MICE = 'mic-e'
PACKET_TYPE_WX = "wx" PACKET_TYPE_WX = 'wx'
PACKET_TYPE_WEATHER = "weather" PACKET_TYPE_WEATHER = 'weather'
PACKET_TYPE_OBJECT = "object" PACKET_TYPE_OBJECT = 'object'
PACKET_TYPE_UNKNOWN = "unknown" PACKET_TYPE_UNKNOWN = 'unknown'
PACKET_TYPE_STATUS = "status" PACKET_TYPE_STATUS = 'status'
PACKET_TYPE_BEACON = "beacon" PACKET_TYPE_BEACON = 'beacon'
PACKET_TYPE_THIRDPARTY = "thirdparty" PACKET_TYPE_THIRDPARTY = 'thirdparty'
PACKET_TYPE_TELEMETRY = "telemetry-message" PACKET_TYPE_TELEMETRY = 'telemetry-message'
PACKET_TYPE_UNCOMPRESSED = "uncompressed" PACKET_TYPE_UNCOMPRESSED = 'uncompressed'
NO_DATE = datetime(1900, 10, 24) NO_DATE = datetime(1900, 10, 24)
@ -67,14 +67,14 @@ def _init_msgNo(): # noqa: N802
def _translate_fields(raw: dict) -> dict: def _translate_fields(raw: dict) -> dict:
# Direct key checks instead of iteration # Direct key checks instead of iteration
if "from" in raw: if 'from' in raw:
raw["from_call"] = raw.pop("from") raw['from_call'] = raw.pop('from')
if "to" in raw: if 'to' in raw:
raw["to_call"] = raw.pop("to") raw['to_call'] = raw.pop('to')
# addresse overrides to_call # addresse overrides to_call
if "addresse" in raw: if 'addresse' in raw:
raw["to_call"] = raw["addresse"] raw['to_call'] = raw['addresse']
return raw return raw
@ -82,7 +82,7 @@ def _translate_fields(raw: dict) -> dict:
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class Packet: class Packet:
_type: str = field(default="Packet", hash=False) _type: str = field(default='Packet', hash=False)
from_call: Optional[str] = field(default=None) from_call: Optional[str] = field(default=None)
to_call: Optional[str] = field(default=None) to_call: Optional[str] = field(default=None)
addresse: Optional[str] = field(default=None) addresse: Optional[str] = field(default=None)
@ -120,7 +120,7 @@ class Packet:
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.addresse}:{self.msgNo}" return f'{self.from_call}:{self.addresse}:{self.msgNo}'
def update_timestamp(self) -> None: def update_timestamp(self) -> None:
self.timestamp = _init_timestamp() self.timestamp = _init_timestamp()
@ -133,7 +133,7 @@ class Packet:
the human readable payload. the human readable payload.
""" """
self.prepare() self.prepare()
msg = self._filter_for_send(self.raw).rstrip("\n") msg = self._filter_for_send(self.raw).rstrip('\n')
return msg return msg
def prepare(self, create_msg_number=False) -> None: def prepare(self, create_msg_number=False) -> None:
@ -152,11 +152,11 @@ class Packet:
) )
# The base packet class has no real payload # The base packet class has no real payload
self.payload = f":{self.to_call.ljust(9)}" self.payload = f':{self.to_call.ljust(9)}'
def _build_raw(self) -> None: def _build_raw(self) -> None:
"""Build the self.raw which is what is sent over the air.""" """Build the self.raw which is what is sent over the air."""
self.raw = "{}>APZ100:{}".format( self.raw = '{}>APZ100:{}'.format(
self.from_call, self.from_call,
self.payload, self.payload,
) )
@ -168,13 +168,13 @@ class Packet:
# 67 displays 64 on the ftm400. (+3 {01 suffix) # 67 displays 64 on the ftm400. (+3 {01 suffix)
# feature req: break long ones into two msgs # feature req: break long ones into two msgs
if not msg: if not msg:
return "" return ''
message = msg[:67] message = msg[:67]
# We all miss George Carlin # We all miss George Carlin
return re.sub( return re.sub(
"fuck|shit|cunt|piss|cock|bitch", 'fuck|shit|cunt|piss|cock|bitch',
"****", '****',
message, message,
flags=re.IGNORECASE, flags=re.IGNORECASE,
) )
@ -183,100 +183,98 @@ class Packet:
"""Show the raw version of the packet""" """Show the raw version of the packet"""
self.prepare() self.prepare()
if not self.raw: if not self.raw:
raise ValueError("self.raw is unset") raise ValueError('self.raw is unset')
return self.raw return self.raw
def __repr__(self) -> str: def __repr__(self) -> str:
"""Build the repr version of the packet.""" """Build the repr version of the packet."""
return ( return (
f"{self.__class__.__name__}:" f'{self.__class__.__name__}: From: {self.from_call} To: {self.to_call}'
f" From: {self.from_call} "
f" To: {self.to_call}"
) )
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class AckPacket(Packet): class AckPacket(Packet):
_type: str = field(default="AckPacket", hash=False) _type: str = field(default='AckPacket', hash=False)
def _build_payload(self): def _build_payload(self):
self.payload = f":{self.to_call: <9}:ack{self.msgNo}" self.payload = f':{self.to_call: <9}:ack{self.msgNo}'
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class BulletinPacket(Packet): class BulletinPacket(Packet):
_type: str = "BulletinPacket" _type: str = 'BulletinPacket'
# Holds the encapsulated packet # Holds the encapsulated packet
bid: Optional[str] = field(default="1") bid: Optional[str] = field(default='1')
message_text: Optional[str] = field(default=None) message_text: Optional[str] = field(default=None)
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
return f"{self.from_call}:BLN{self.bid}" return f'{self.from_call}:BLN{self.bid}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
return f"BLN{self.bid} {self.message_text}" return f'BLN{self.bid} {self.message_text}'
def _build_payload(self) -> None: def _build_payload(self) -> None:
self.payload = f":BLN{self.bid:<9}" f":{self.message_text}" self.payload = f':BLN{self.bid:<9}:{self.message_text}'
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class RejectPacket(Packet): class RejectPacket(Packet):
_type: str = field(default="RejectPacket", hash=False) _type: str = field(default='RejectPacket', hash=False)
response: Optional[str] = field(default=None) response: Optional[str] = field(default=None)
def __post__init__(self): def __post__init__(self):
if self.response: if self.response:
LOG.warning("Response set!") LOG.warning('Response set!')
def _build_payload(self): def _build_payload(self):
self.payload = f":{self.to_call: <9}:rej{self.msgNo}" self.payload = f':{self.to_call: <9}:rej{self.msgNo}'
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class MessagePacket(Packet): class MessagePacket(Packet):
_type: str = field(default="MessagePacket", hash=False) _type: str = field(default='MessagePacket', hash=False)
message_text: Optional[str] = field(default=None) message_text: Optional[str] = field(default=None)
@property @property
def human_info(self) -> str: def human_info(self) -> str:
self.prepare() self.prepare()
return self._filter_for_send(self.message_text).rstrip("\n") return self._filter_for_send(self.message_text).rstrip('\n')
def _build_payload(self): def _build_payload(self):
if self.msgNo: if self.msgNo:
self.payload = ":{}:{}{{{}".format( self.payload = ':{}:{}{{{}'.format(
self.to_call.ljust(9), self.to_call.ljust(9),
self._filter_for_send(self.message_text).rstrip("\n"), self._filter_for_send(self.message_text).rstrip('\n'),
str(self.msgNo), str(self.msgNo),
) )
else: else:
self.payload = ":{}:{}".format( self.payload = ':{}:{}'.format(
self.to_call.ljust(9), self.to_call.ljust(9),
self._filter_for_send(self.message_text).rstrip("\n"), self._filter_for_send(self.message_text).rstrip('\n'),
) )
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class StatusPacket(Packet): class StatusPacket(Packet):
_type: str = field(default="StatusPacket", hash=False) _type: str = field(default='StatusPacket', hash=False)
status: Optional[str] = field(default=None) status: Optional[str] = field(default=None)
messagecapable: bool = field(default=False) messagecapable: bool = field(default=False)
comment: Optional[str] = field(default=None) comment: Optional[str] = field(default=None)
raw_timestamp: Optional[str] = field(default=None) raw_timestamp: Optional[str] = field(default=None)
def _build_payload(self): def _build_payload(self):
self.payload = ":{}:{}{{{}".format( self.payload = ':{}:{}{{{}'.format(
self.to_call.ljust(9), self.to_call.ljust(9),
self._filter_for_send(self.status).rstrip("\n"), self._filter_for_send(self.status).rstrip('\n'),
str(self.msgNo), str(self.msgNo),
) )
@ -289,7 +287,7 @@ class StatusPacket(Packet):
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class GPSPacket(Packet): class GPSPacket(Packet):
_type: str = field(default="GPSPacket", hash=False) _type: str = field(default='GPSPacket', hash=False)
latitude: float = field(default=0.00) latitude: float = field(default=0.00)
longitude: float = field(default=0.00) longitude: float = field(default=0.00)
altitude: float = field(default=0.00) altitude: float = field(default=0.00)
@ -297,8 +295,8 @@ class GPSPacket(Packet):
posambiguity: int = field(default=0) posambiguity: int = field(default=0)
messagecapable: bool = field(default=False) messagecapable: bool = field(default=False)
comment: Optional[str] = field(default=None) comment: Optional[str] = field(default=None)
symbol: str = field(default="l") symbol: str = field(default='l')
symbol_table: str = field(default="/") symbol_table: str = field(default='/')
raw_timestamp: Optional[str] = field(default=None) raw_timestamp: Optional[str] = field(default=None)
object_name: Optional[str] = field(default=None) object_name: Optional[str] = field(default=None)
object_format: Optional[str] = field(default=None) object_format: Optional[str] = field(default=None)
@ -318,7 +316,7 @@ class GPSPacket(Packet):
def _build_time_zulu(self): def _build_time_zulu(self):
"""Build the timestamp in UTC/zulu.""" """Build the timestamp in UTC/zulu."""
if self.timestamp: if self.timestamp:
return datetime.utcfromtimestamp(self.timestamp).strftime("%d%H%M") return datetime.utcfromtimestamp(self.timestamp).strftime('%d%H%M')
def _build_payload(self): def _build_payload(self):
"""The payload is the non headers portion of the packet.""" """The payload is the non headers portion of the packet."""
@ -326,7 +324,7 @@ class GPSPacket(Packet):
lat = aprslib_util.latitude_to_ddm(self.latitude) lat = aprslib_util.latitude_to_ddm(self.latitude)
long = aprslib_util.longitude_to_ddm(self.longitude) long = aprslib_util.longitude_to_ddm(self.longitude)
payload = [ payload = [
"@" if self.timestamp else "!", '@' if self.timestamp else '!',
time_zulu, time_zulu,
lat, lat,
self.symbol_table, self.symbol_table,
@ -337,34 +335,34 @@ class GPSPacket(Packet):
if self.comment: if self.comment:
payload.append(self._filter_for_send(self.comment)) payload.append(self._filter_for_send(self.comment))
self.payload = "".join(payload) self.payload = ''.join(payload)
def _build_raw(self): def _build_raw(self):
self.raw = f"{self.from_call}>{self.to_call},WIDE2-1:" f"{self.payload}" self.raw = f'{self.from_call}>{self.to_call},WIDE2-1:{self.payload}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_str = [] h_str = []
h_str.append(f"Lat:{self.latitude:03.3f}") h_str.append(f'Lat:{self.latitude:03.3f}')
h_str.append(f"Lon:{self.longitude:03.3f}") h_str.append(f'Lon:{self.longitude:03.3f}')
if self.altitude: if self.altitude:
h_str.append(f"Altitude {self.altitude:03.0f}") h_str.append(f'Altitude {self.altitude:03.0f}')
if self.speed: if self.speed:
h_str.append(f"Speed {self.speed:03.0f}MPH") h_str.append(f'Speed {self.speed:03.0f}MPH')
if self.course: if self.course:
h_str.append(f"Course {self.course:03.0f}") h_str.append(f'Course {self.course:03.0f}')
if self.rng: if self.rng:
h_str.append(f"RNG {self.rng:03.0f}") h_str.append(f'RNG {self.rng:03.0f}')
if self.phg: if self.phg:
h_str.append(f"PHG {self.phg}") h_str.append(f'PHG {self.phg}')
return " ".join(h_str) return ' '.join(h_str)
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class BeaconPacket(GPSPacket): class BeaconPacket(GPSPacket):
_type: str = field(default="BeaconPacket", hash=False) _type: str = field(default='BeaconPacket', hash=False)
def _build_payload(self): def _build_payload(self):
"""The payload is the non headers portion of the packet.""" """The payload is the non headers portion of the packet."""
@ -372,38 +370,38 @@ class BeaconPacket(GPSPacket):
lat = aprslib_util.latitude_to_ddm(self.latitude) lat = aprslib_util.latitude_to_ddm(self.latitude)
lon = aprslib_util.longitude_to_ddm(self.longitude) lon = aprslib_util.longitude_to_ddm(self.longitude)
self.payload = f"@{time_zulu}z{lat}{self.symbol_table}" f"{lon}" self.payload = f'@{time_zulu}z{lat}{self.symbol_table}{lon}'
if self.comment: if self.comment:
comment = self._filter_for_send(self.comment) comment = self._filter_for_send(self.comment)
self.payload = f"{self.payload}{self.symbol}{comment}" self.payload = f'{self.payload}{self.symbol}{comment}'
else: else:
self.payload = f"{self.payload}{self.symbol}APRSD Beacon" self.payload = f'{self.payload}{self.symbol}APRSD Beacon'
def _build_raw(self): def _build_raw(self):
self.raw = f"{self.from_call}>APZ100:" f"{self.payload}" self.raw = f'{self.from_call}>APZ100:{self.payload}'
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
if self.raw_timestamp: if self.raw_timestamp:
return f"{self.from_call}:{self.raw_timestamp}" return f'{self.from_call}:{self.raw_timestamp}'
else: else:
return f"{self.from_call}:{self.human_info.replace(' ', '')}" return f'{self.from_call}:{self.human_info.replace(" ", "")}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_str = [] h_str = []
h_str.append(f"Lat:{self.latitude:03.3f}") h_str.append(f'Lat:{self.latitude:03.3f}')
h_str.append(f"Lon:{self.longitude:03.3f}") h_str.append(f'Lon:{self.longitude:03.3f}')
h_str.append(f"{self.comment}") h_str.append(f'{self.comment}')
return " ".join(h_str) return ' '.join(h_str)
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class MicEPacket(GPSPacket): class MicEPacket(GPSPacket):
_type: str = field(default="MicEPacket", hash=False) _type: str = field(default='MicEPacket', hash=False)
messagecapable: bool = False messagecapable: bool = False
mbits: Optional[str] = None mbits: Optional[str] = None
mtype: Optional[str] = None mtype: Optional[str] = None
@ -416,18 +414,18 @@ class MicEPacket(GPSPacket):
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.human_info.replace(' ', '')}" return f'{self.from_call}:{self.human_info.replace(" ", "")}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_info = super().human_info h_info = super().human_info
return f"{h_info} {self.mbits} mbits" return f'{h_info} {self.mbits} mbits'
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class TelemetryPacket(GPSPacket): class TelemetryPacket(GPSPacket):
_type: str = field(default="TelemetryPacket", hash=False) _type: str = field(default='TelemetryPacket', hash=False)
messagecapable: bool = False messagecapable: bool = False
mbits: Optional[str] = None mbits: Optional[str] = None
mtype: Optional[str] = None mtype: Optional[str] = None
@ -443,23 +441,23 @@ class TelemetryPacket(GPSPacket):
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
if self.raw_timestamp: if self.raw_timestamp:
return f"{self.from_call}:{self.raw_timestamp}" return f'{self.from_call}:{self.raw_timestamp}'
else: else:
return f"{self.from_call}:{self.human_info.replace(' ', '')}" return f'{self.from_call}:{self.human_info.replace(" ", "")}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_info = super().human_info h_info = super().human_info
return f"{h_info} {self.telemetry}" return f'{h_info} {self.telemetry}'
@dataclass_json @dataclass_json
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class ObjectPacket(GPSPacket): class ObjectPacket(GPSPacket):
_type: str = field(default="ObjectPacket", hash=False) _type: str = field(default='ObjectPacket', hash=False)
alive: bool = True alive: bool = True
raw_timestamp: Optional[str] = None raw_timestamp: Optional[str] = None
symbol: str = field(default="r") symbol: str = field(default='r')
# in MPH # in MPH
speed: float = 0.00 speed: float = 0.00
# 0 to 360 # 0 to 360
@ -470,11 +468,11 @@ class ObjectPacket(GPSPacket):
lat = aprslib_util.latitude_to_ddm(self.latitude) lat = aprslib_util.latitude_to_ddm(self.latitude)
long = aprslib_util.longitude_to_ddm(self.longitude) long = aprslib_util.longitude_to_ddm(self.longitude)
self.payload = f"*{time_zulu}z{lat}{self.symbol_table}" f"{long}{self.symbol}" self.payload = f'*{time_zulu}z{lat}{self.symbol_table}{long}{self.symbol}'
if self.comment: if self.comment:
comment = self._filter_for_send(self.comment) comment = self._filter_for_send(self.comment)
self.payload = f"{self.payload}{comment}" self.payload = f'{self.payload}{comment}'
def _build_raw(self): def _build_raw(self):
""" """
@ -487,18 +485,18 @@ class ObjectPacket(GPSPacket):
The frequency, uplink_tone, offset is part of the comment The frequency, uplink_tone, offset is part of the comment
""" """
self.raw = f"{self.from_call}>APZ100:;{self.to_call:9s}" f"{self.payload}" self.raw = f'{self.from_call}>APZ100:;{self.to_call:9s}{self.payload}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_info = super().human_info h_info = super().human_info
return f"{h_info} {self.comment}" return f'{h_info} {self.comment}'
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class WeatherPacket(GPSPacket, DataClassJsonMixin): class WeatherPacket(GPSPacket, DataClassJsonMixin):
_type: str = field(default="WeatherPacket", hash=False) _type: str = field(default='WeatherPacket', hash=False)
symbol: str = "_" symbol: str = '_'
wind_speed: float = 0.00 wind_speed: float = 0.00
wind_direction: int = 0 wind_direction: int = 0
wind_gust: float = 0.00 wind_gust: float = 0.00
@ -516,8 +514,8 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin):
speed: Optional[float] = field(default=None) speed: Optional[float] = field(default=None)
def _translate(self, raw: dict) -> dict: def _translate(self, raw: dict) -> dict:
for key in raw["weather"]: for key in raw['weather']:
raw[key] = raw["weather"][key] raw[key] = raw['weather'][key]
# If we have the broken aprslib, then we need to # If we have the broken aprslib, then we need to
# Convert the course and speed to wind_speed and wind_direction # Convert the course and speed to wind_speed and wind_direction
@ -525,36 +523,36 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin):
# https://github.com/rossengeorgiev/aprs-python/issues/80 # https://github.com/rossengeorgiev/aprs-python/issues/80
# Wind speed and course is option in the SPEC. # Wind speed and course is option in the SPEC.
# For some reason aprslib multiplies the speed by 1.852. # For some reason aprslib multiplies the speed by 1.852.
if "wind_speed" not in raw and "wind_direction" not in raw: if 'wind_speed' not in raw and 'wind_direction' not in raw:
# Most likely this is the broken aprslib # Most likely this is the broken aprslib
# So we need to convert the wind_gust speed # So we need to convert the wind_gust speed
raw["wind_gust"] = round(raw.get("wind_gust", 0) / 0.44704, 3) raw['wind_gust'] = round(raw.get('wind_gust', 0) / 0.44704, 3)
if "wind_speed" not in raw: if 'wind_speed' not in raw:
wind_speed = raw.get("speed") wind_speed = raw.get('speed')
if wind_speed: if wind_speed:
raw["wind_speed"] = round(wind_speed / 1.852, 3) raw['wind_speed'] = round(wind_speed / 1.852, 3)
raw["weather"]["wind_speed"] = raw["wind_speed"] raw['weather']['wind_speed'] = raw['wind_speed']
if "speed" in raw: if 'speed' in raw:
del raw["speed"] del raw['speed']
# Let's adjust the rain numbers as well, since it's wrong # Let's adjust the rain numbers as well, since it's wrong
raw["rain_1h"] = round((raw.get("rain_1h", 0) / 0.254) * 0.01, 3) raw['rain_1h'] = round((raw.get('rain_1h', 0) / 0.254) * 0.01, 3)
raw["weather"]["rain_1h"] = raw["rain_1h"] raw['weather']['rain_1h'] = raw['rain_1h']
raw["rain_24h"] = round((raw.get("rain_24h", 0) / 0.254) * 0.01, 3) raw['rain_24h'] = round((raw.get('rain_24h', 0) / 0.254) * 0.01, 3)
raw["weather"]["rain_24h"] = raw["rain_24h"] raw['weather']['rain_24h'] = raw['rain_24h']
raw["rain_since_midnight"] = round( raw['rain_since_midnight'] = round(
(raw.get("rain_since_midnight", 0) / 0.254) * 0.01, 3 (raw.get('rain_since_midnight', 0) / 0.254) * 0.01, 3
) )
raw["weather"]["rain_since_midnight"] = raw["rain_since_midnight"] raw['weather']['rain_since_midnight'] = raw['rain_since_midnight']
if "wind_direction" not in raw: if 'wind_direction' not in raw:
wind_direction = raw.get("course") wind_direction = raw.get('course')
if wind_direction: if wind_direction:
raw["wind_direction"] = wind_direction raw['wind_direction'] = wind_direction
raw["weather"]["wind_direction"] = raw["wind_direction"] raw['weather']['wind_direction'] = raw['wind_direction']
if "course" in raw: if 'course' in raw:
del raw["course"] del raw['course']
del raw["weather"] del raw['weather']
return raw return raw
@classmethod @classmethod
@ -567,20 +565,20 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin):
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
if self.raw_timestamp: if self.raw_timestamp:
return f"{self.from_call}:{self.raw_timestamp}" return f'{self.from_call}:{self.raw_timestamp}'
elif self.wx_raw_timestamp: elif self.wx_raw_timestamp:
return f"{self.from_call}:{self.wx_raw_timestamp}" return f'{self.from_call}:{self.wx_raw_timestamp}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
h_str = [] h_str = []
h_str.append(f"Temp {self.temperature:03.0f}F") h_str.append(f'Temp {self.temperature:03.0f}F')
h_str.append(f"Humidity {self.humidity}%") h_str.append(f'Humidity {self.humidity}%')
h_str.append(f"Wind {self.wind_speed:03.0f}MPH@{self.wind_direction}") h_str.append(f'Wind {self.wind_speed:03.0f}MPH@{self.wind_direction}')
h_str.append(f"Pressure {self.pressure}mb") h_str.append(f'Pressure {self.pressure}mb')
h_str.append(f"Rain {self.rain_24h}in/24hr") h_str.append(f'Rain {self.rain_24h}in/24hr')
return " ".join(h_str) return ' '.join(h_str)
def _build_payload(self): def _build_payload(self):
"""Build an uncompressed weather packet """Build an uncompressed weather packet
@ -610,49 +608,49 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin):
time_zulu = self._build_time_zulu() time_zulu = self._build_time_zulu()
contents = [ contents = [
f"@{time_zulu}z{self.latitude}{self.symbol_table}", f'@{time_zulu}z{self.latitude}{self.symbol_table}',
f"{self.longitude}{self.symbol}", f'{self.longitude}{self.symbol}',
f"{self.wind_direction:03d}", f'{self.wind_direction:03d}',
# Speed = sustained 1 minute wind speed in mph # Speed = sustained 1 minute wind speed in mph
f"{self.symbol_table}", f'{self.symbol_table}',
f"{self.wind_speed:03.0f}", f'{self.wind_speed:03.0f}',
# wind gust (peak wind speed in mph in the last 5 minutes) # wind gust (peak wind speed in mph in the last 5 minutes)
f"g{self.wind_gust:03.0f}", f'g{self.wind_gust:03.0f}',
# Temperature in degrees F # Temperature in degrees F
f"t{self.temperature:03.0f}", f't{self.temperature:03.0f}',
# Rainfall (in hundredths of an inch) in the last hour # Rainfall (in hundredths of an inch) in the last hour
f"r{self.rain_1h * 100:03.0f}", f'r{self.rain_1h * 100:03.0f}',
# Rainfall (in hundredths of an inch) in last 24 hours # Rainfall (in hundredths of an inch) in last 24 hours
f"p{self.rain_24h * 100:03.0f}", f'p{self.rain_24h * 100:03.0f}',
# Rainfall (in hundredths of an inch) since midnigt # Rainfall (in hundredths of an inch) since midnigt
f"P{self.rain_since_midnight * 100:03.0f}", f'P{self.rain_since_midnight * 100:03.0f}',
# Humidity # Humidity
f"h{self.humidity:02d}", f'h{self.humidity:02d}',
# Barometric pressure (in tenths of millibars/tenths of hPascal) # Barometric pressure (in tenths of millibars/tenths of hPascal)
f"b{self.pressure:05.0f}", f'b{self.pressure:05.0f}',
] ]
if self.comment: if self.comment:
comment = self.filter_for_send(self.comment) comment = self._filter_for_send(self.comment)
contents.append(comment) contents.append(comment)
self.payload = "".join(contents) self.payload = ''.join(contents)
def _build_raw(self): def _build_raw(self):
self.raw = f"{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:" f"{self.payload}" self.raw = f'{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:{self.payload}'
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class ThirdPartyPacket(Packet, DataClassJsonMixin): class ThirdPartyPacket(Packet, DataClassJsonMixin):
_type: str = "ThirdPartyPacket" _type: str = 'ThirdPartyPacket'
# Holds the encapsulated packet # Holds the encapsulated packet
subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False) subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False)
def __repr__(self): def __repr__(self):
"""Build the repr version of the packet.""" """Build the repr version of the packet."""
repr_str = ( repr_str = (
f"{self.__class__.__name__}:" f'{self.__class__.__name__}:'
f" From: {self.from_call} " f' From: {self.from_call} '
f" To: {self.to_call} " f' To: {self.to_call} '
f" Subpacket: {repr(self.subpacket)}" f' Subpacket: {repr(self.subpacket)}'
) )
return repr_str return repr_str
@ -666,12 +664,12 @@ class ThirdPartyPacket(Packet, DataClassJsonMixin):
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.subpacket.key}" return f'{self.from_call}:{self.subpacket.key}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
sub_info = self.subpacket.human_info sub_info = self.subpacket.human_info
return f"{self.from_call}->{self.to_call} {sub_info}" return f'{self.from_call}->{self.to_call} {sub_info}'
@dataclass_json(undefined=Undefined.INCLUDE) @dataclass_json(undefined=Undefined.INCLUDE)
@ -683,7 +681,7 @@ class UnknownPacket:
""" """
unknown_fields: CatchAll unknown_fields: CatchAll
_type: str = "UnknownPacket" _type: str = 'UnknownPacket'
from_call: Optional[str] = field(default=None) from_call: Optional[str] = field(default=None)
to_call: Optional[str] = field(default=None) to_call: Optional[str] = field(default=None)
msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 msgNo: str = field(default_factory=_init_msgNo) # noqa: N815
@ -701,7 +699,7 @@ class UnknownPacket:
@property @property
def key(self) -> str: def key(self) -> str:
"""Build a key for finding this packet in a dict.""" """Build a key for finding this packet in a dict."""
return f"{self.from_call}:{self.packet_type}:{self.to_call}" return f'{self.from_call}:{self.packet_type}:{self.to_call}'
@property @property
def human_info(self) -> str: def human_info(self) -> str:
@ -728,20 +726,20 @@ TYPE_LOOKUP: dict[str, type[Packet]] = {
def get_packet_type(packet: dict) -> str: def get_packet_type(packet: dict) -> str:
"""Decode the packet type from the packet.""" """Decode the packet type from the packet."""
pkt_format = packet.get("format") pkt_format = packet.get('format')
msg_response = packet.get("response") msg_response = packet.get('response')
packet_type = PACKET_TYPE_UNKNOWN packet_type = PACKET_TYPE_UNKNOWN
if pkt_format == "message" and msg_response == "ack": if pkt_format == 'message' and msg_response == 'ack':
packet_type = PACKET_TYPE_ACK packet_type = PACKET_TYPE_ACK
elif pkt_format == "message" and msg_response == "rej": elif pkt_format == 'message' and msg_response == 'rej':
packet_type = PACKET_TYPE_REJECT packet_type = PACKET_TYPE_REJECT
elif pkt_format == "message": elif pkt_format == 'message':
packet_type = PACKET_TYPE_MESSAGE packet_type = PACKET_TYPE_MESSAGE
elif pkt_format == "mic-e": elif pkt_format == 'mic-e':
packet_type = PACKET_TYPE_MICE packet_type = PACKET_TYPE_MICE
elif pkt_format == "object": elif pkt_format == 'object':
packet_type = PACKET_TYPE_OBJECT packet_type = PACKET_TYPE_OBJECT
elif pkt_format == "status": elif pkt_format == 'status':
packet_type = PACKET_TYPE_STATUS packet_type = PACKET_TYPE_STATUS
elif pkt_format == PACKET_TYPE_BULLETIN: elif pkt_format == PACKET_TYPE_BULLETIN:
packet_type = PACKET_TYPE_BULLETIN packet_type = PACKET_TYPE_BULLETIN
@ -752,13 +750,13 @@ def get_packet_type(packet: dict) -> str:
elif pkt_format == PACKET_TYPE_WX: elif pkt_format == PACKET_TYPE_WX:
packet_type = PACKET_TYPE_WEATHER packet_type = PACKET_TYPE_WEATHER
elif pkt_format == PACKET_TYPE_UNCOMPRESSED: elif pkt_format == PACKET_TYPE_UNCOMPRESSED:
if packet.get("symbol") == "_": if packet.get('symbol') == '_':
packet_type = PACKET_TYPE_WEATHER packet_type = PACKET_TYPE_WEATHER
elif pkt_format == PACKET_TYPE_THIRDPARTY: elif pkt_format == PACKET_TYPE_THIRDPARTY:
packet_type = PACKET_TYPE_THIRDPARTY packet_type = PACKET_TYPE_THIRDPARTY
if packet_type == PACKET_TYPE_UNKNOWN: if packet_type == PACKET_TYPE_UNKNOWN:
if "latitude" in packet: if 'latitude' in packet:
packet_type = PACKET_TYPE_BEACON packet_type = PACKET_TYPE_BEACON
else: else:
packet_type = PACKET_TYPE_UNKNOWN packet_type = PACKET_TYPE_UNKNOWN
@ -780,32 +778,32 @@ def is_mice_packet(packet: dict[Any, Any]) -> bool:
def factory(raw_packet: dict[Any, Any]) -> type[Packet]: def factory(raw_packet: dict[Any, Any]) -> type[Packet]:
"""Factory method to create a packet from a raw packet string.""" """Factory method to create a packet from a raw packet string."""
raw = raw_packet raw = raw_packet
if "_type" in raw: if '_type' in raw:
cls = globals()[raw["_type"]] cls = globals()[raw['_type']]
return cls.from_dict(raw) return cls.from_dict(raw)
raw["raw_dict"] = raw.copy() raw['raw_dict'] = raw.copy()
raw = _translate_fields(raw) raw = _translate_fields(raw)
packet_type = get_packet_type(raw) packet_type = get_packet_type(raw)
raw["packet_type"] = packet_type raw['packet_type'] = packet_type
packet_class = TYPE_LOOKUP[packet_type] packet_class = TYPE_LOOKUP[packet_type]
if packet_type == PACKET_TYPE_WX: if packet_type == PACKET_TYPE_WX:
# the weather information is in a dict # the weather information is in a dict
# this brings those values out to the outer dict # this brings those values out to the outer dict
packet_class = WeatherPacket packet_class = WeatherPacket
elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw: elif packet_type == PACKET_TYPE_OBJECT and 'weather' in raw:
packet_class = WeatherPacket packet_class = WeatherPacket
elif packet_type == PACKET_TYPE_UNKNOWN: elif packet_type == PACKET_TYPE_UNKNOWN:
# Try and figure it out here # Try and figure it out here
if "latitude" in raw: if 'latitude' in raw:
packet_class = GPSPacket packet_class = GPSPacket
else: else:
# LOG.warning(raw) # LOG.warning(raw)
packet_class = UnknownPacket packet_class = UnknownPacket
raw.get("addresse", raw.get("to_call")) raw.get('addresse', raw.get('to_call'))
# TODO: Find a global way to enable/disable this # TODO: Find a global way to enable/disable this
# LOGU.opt(colors=True).info( # LOGU.opt(colors=True).info(