From 54696107797c3d788e5d0b2dad164e1a7cf439f2 Mon Sep 17 00:00:00 2001 From: Hemna Date: Mon, 7 Apr 2025 18:41:32 -0400 Subject: [PATCH] Fixed a problem with WeatherPacket WeatherPacket was calling self.filter_for_send, which doesn't exist. It's self._filter_for_send. --- aprsd/packets/core.py | 330 +++++++++++++++++++++--------------------- 1 file changed, 164 insertions(+), 166 deletions(-) diff --git a/aprsd/packets/core.py b/aprsd/packets/core.py index 804db00..b5cda0c 100644 --- a/aprsd/packets/core.py +++ b/aprsd/packets/core.py @@ -19,26 +19,26 @@ from loguru import logger from aprsd.utils import counter # For mypy to be happy -A = TypeVar("A", bound="DataClassJsonMixin") +A = TypeVar('A', bound='DataClassJsonMixin') Json = Union[dict, list, str, int, float, bool, None] LOG = logging.getLogger() LOGU = logger -PACKET_TYPE_BULLETIN = "bulletin" -PACKET_TYPE_MESSAGE = "message" -PACKET_TYPE_ACK = "ack" -PACKET_TYPE_REJECT = "reject" -PACKET_TYPE_MICE = "mic-e" -PACKET_TYPE_WX = "wx" -PACKET_TYPE_WEATHER = "weather" -PACKET_TYPE_OBJECT = "object" -PACKET_TYPE_UNKNOWN = "unknown" -PACKET_TYPE_STATUS = "status" -PACKET_TYPE_BEACON = "beacon" -PACKET_TYPE_THIRDPARTY = "thirdparty" -PACKET_TYPE_TELEMETRY = "telemetry-message" -PACKET_TYPE_UNCOMPRESSED = "uncompressed" +PACKET_TYPE_BULLETIN = 'bulletin' +PACKET_TYPE_MESSAGE = 'message' +PACKET_TYPE_ACK = 'ack' +PACKET_TYPE_REJECT = 'reject' +PACKET_TYPE_MICE = 'mic-e' +PACKET_TYPE_WX = 'wx' +PACKET_TYPE_WEATHER = 'weather' +PACKET_TYPE_OBJECT = 'object' +PACKET_TYPE_UNKNOWN = 'unknown' +PACKET_TYPE_STATUS = 'status' +PACKET_TYPE_BEACON = 'beacon' +PACKET_TYPE_THIRDPARTY = 'thirdparty' +PACKET_TYPE_TELEMETRY = 'telemetry-message' +PACKET_TYPE_UNCOMPRESSED = 'uncompressed' NO_DATE = datetime(1900, 10, 24) @@ -67,14 +67,14 @@ def _init_msgNo(): # noqa: N802 def _translate_fields(raw: dict) -> dict: # Direct key checks instead of iteration - if "from" in raw: - raw["from_call"] = raw.pop("from") - if "to" in raw: - raw["to_call"] = raw.pop("to") + if 'from' in raw: + raw['from_call'] = raw.pop('from') + if 'to' in raw: + raw['to_call'] = raw.pop('to') # addresse overrides to_call - if "addresse" in raw: - raw["to_call"] = raw["addresse"] + if 'addresse' in raw: + raw['to_call'] = raw['addresse'] return raw @@ -82,7 +82,7 @@ def _translate_fields(raw: dict) -> dict: @dataclass_json @dataclass(unsafe_hash=True) class Packet: - _type: str = field(default="Packet", hash=False) + _type: str = field(default='Packet', hash=False) from_call: Optional[str] = field(default=None) to_call: Optional[str] = field(default=None) addresse: Optional[str] = field(default=None) @@ -120,7 +120,7 @@ class Packet: @property def key(self) -> str: """Build a key for finding this packet in a dict.""" - return f"{self.from_call}:{self.addresse}:{self.msgNo}" + return f'{self.from_call}:{self.addresse}:{self.msgNo}' def update_timestamp(self) -> None: self.timestamp = _init_timestamp() @@ -133,7 +133,7 @@ class Packet: the human readable payload. """ self.prepare() - msg = self._filter_for_send(self.raw).rstrip("\n") + msg = self._filter_for_send(self.raw).rstrip('\n') return msg def prepare(self, create_msg_number=False) -> None: @@ -152,11 +152,11 @@ class Packet: ) # The base packet class has no real payload - self.payload = f":{self.to_call.ljust(9)}" + self.payload = f':{self.to_call.ljust(9)}' def _build_raw(self) -> None: """Build the self.raw which is what is sent over the air.""" - self.raw = "{}>APZ100:{}".format( + self.raw = '{}>APZ100:{}'.format( self.from_call, self.payload, ) @@ -168,13 +168,13 @@ class Packet: # 67 displays 64 on the ftm400. (+3 {01 suffix) # feature req: break long ones into two msgs if not msg: - return "" + return '' message = msg[:67] # We all miss George Carlin return re.sub( - "fuck|shit|cunt|piss|cock|bitch", - "****", + 'fuck|shit|cunt|piss|cock|bitch', + '****', message, flags=re.IGNORECASE, ) @@ -183,100 +183,98 @@ class Packet: """Show the raw version of the packet""" self.prepare() if not self.raw: - raise ValueError("self.raw is unset") + raise ValueError('self.raw is unset') return self.raw def __repr__(self) -> str: """Build the repr version of the packet.""" return ( - f"{self.__class__.__name__}:" - f" From: {self.from_call} " - f" To: {self.to_call}" + f'{self.__class__.__name__}: From: {self.from_call} To: {self.to_call}' ) @dataclass_json @dataclass(unsafe_hash=True) class AckPacket(Packet): - _type: str = field(default="AckPacket", hash=False) + _type: str = field(default='AckPacket', hash=False) def _build_payload(self): - self.payload = f":{self.to_call: <9}:ack{self.msgNo}" + self.payload = f':{self.to_call: <9}:ack{self.msgNo}' @dataclass_json @dataclass(unsafe_hash=True) class BulletinPacket(Packet): - _type: str = "BulletinPacket" + _type: str = 'BulletinPacket' # Holds the encapsulated packet - bid: Optional[str] = field(default="1") + bid: Optional[str] = field(default='1') message_text: Optional[str] = field(default=None) @property def key(self) -> str: """Build a key for finding this packet in a dict.""" - return f"{self.from_call}:BLN{self.bid}" + return f'{self.from_call}:BLN{self.bid}' @property def human_info(self) -> str: - return f"BLN{self.bid} {self.message_text}" + return f'BLN{self.bid} {self.message_text}' def _build_payload(self) -> None: - self.payload = f":BLN{self.bid:<9}" f":{self.message_text}" + self.payload = f':BLN{self.bid:<9}:{self.message_text}' @dataclass_json @dataclass(unsafe_hash=True) class RejectPacket(Packet): - _type: str = field(default="RejectPacket", hash=False) + _type: str = field(default='RejectPacket', hash=False) response: Optional[str] = field(default=None) def __post__init__(self): if self.response: - LOG.warning("Response set!") + LOG.warning('Response set!') def _build_payload(self): - self.payload = f":{self.to_call: <9}:rej{self.msgNo}" + self.payload = f':{self.to_call: <9}:rej{self.msgNo}' @dataclass_json @dataclass(unsafe_hash=True) class MessagePacket(Packet): - _type: str = field(default="MessagePacket", hash=False) + _type: str = field(default='MessagePacket', hash=False) message_text: Optional[str] = field(default=None) @property def human_info(self) -> str: self.prepare() - return self._filter_for_send(self.message_text).rstrip("\n") + return self._filter_for_send(self.message_text).rstrip('\n') def _build_payload(self): if self.msgNo: - self.payload = ":{}:{}{{{}".format( + self.payload = ':{}:{}{{{}'.format( self.to_call.ljust(9), - self._filter_for_send(self.message_text).rstrip("\n"), + self._filter_for_send(self.message_text).rstrip('\n'), str(self.msgNo), ) else: - self.payload = ":{}:{}".format( + self.payload = ':{}:{}'.format( self.to_call.ljust(9), - self._filter_for_send(self.message_text).rstrip("\n"), + self._filter_for_send(self.message_text).rstrip('\n'), ) @dataclass_json @dataclass(unsafe_hash=True) class StatusPacket(Packet): - _type: str = field(default="StatusPacket", hash=False) + _type: str = field(default='StatusPacket', hash=False) status: Optional[str] = field(default=None) messagecapable: bool = field(default=False) comment: Optional[str] = field(default=None) raw_timestamp: Optional[str] = field(default=None) def _build_payload(self): - self.payload = ":{}:{}{{{}".format( + self.payload = ':{}:{}{{{}'.format( self.to_call.ljust(9), - self._filter_for_send(self.status).rstrip("\n"), + self._filter_for_send(self.status).rstrip('\n'), str(self.msgNo), ) @@ -289,7 +287,7 @@ class StatusPacket(Packet): @dataclass_json @dataclass(unsafe_hash=True) class GPSPacket(Packet): - _type: str = field(default="GPSPacket", hash=False) + _type: str = field(default='GPSPacket', hash=False) latitude: float = field(default=0.00) longitude: float = field(default=0.00) altitude: float = field(default=0.00) @@ -297,8 +295,8 @@ class GPSPacket(Packet): posambiguity: int = field(default=0) messagecapable: bool = field(default=False) comment: Optional[str] = field(default=None) - symbol: str = field(default="l") - symbol_table: str = field(default="/") + symbol: str = field(default='l') + symbol_table: str = field(default='/') raw_timestamp: Optional[str] = field(default=None) object_name: Optional[str] = field(default=None) object_format: Optional[str] = field(default=None) @@ -318,7 +316,7 @@ class GPSPacket(Packet): def _build_time_zulu(self): """Build the timestamp in UTC/zulu.""" if self.timestamp: - return datetime.utcfromtimestamp(self.timestamp).strftime("%d%H%M") + return datetime.utcfromtimestamp(self.timestamp).strftime('%d%H%M') def _build_payload(self): """The payload is the non headers portion of the packet.""" @@ -326,7 +324,7 @@ class GPSPacket(Packet): lat = aprslib_util.latitude_to_ddm(self.latitude) long = aprslib_util.longitude_to_ddm(self.longitude) payload = [ - "@" if self.timestamp else "!", + '@' if self.timestamp else '!', time_zulu, lat, self.symbol_table, @@ -337,34 +335,34 @@ class GPSPacket(Packet): if self.comment: payload.append(self._filter_for_send(self.comment)) - self.payload = "".join(payload) + self.payload = ''.join(payload) def _build_raw(self): - self.raw = f"{self.from_call}>{self.to_call},WIDE2-1:" f"{self.payload}" + self.raw = f'{self.from_call}>{self.to_call},WIDE2-1:{self.payload}' @property def human_info(self) -> str: h_str = [] - h_str.append(f"Lat:{self.latitude:03.3f}") - h_str.append(f"Lon:{self.longitude:03.3f}") + h_str.append(f'Lat:{self.latitude:03.3f}') + h_str.append(f'Lon:{self.longitude:03.3f}') if self.altitude: - h_str.append(f"Altitude {self.altitude:03.0f}") + h_str.append(f'Altitude {self.altitude:03.0f}') if self.speed: - h_str.append(f"Speed {self.speed:03.0f}MPH") + h_str.append(f'Speed {self.speed:03.0f}MPH') if self.course: - h_str.append(f"Course {self.course:03.0f}") + h_str.append(f'Course {self.course:03.0f}') if self.rng: - h_str.append(f"RNG {self.rng:03.0f}") + h_str.append(f'RNG {self.rng:03.0f}') if self.phg: - h_str.append(f"PHG {self.phg}") + h_str.append(f'PHG {self.phg}') - return " ".join(h_str) + return ' '.join(h_str) @dataclass_json @dataclass(unsafe_hash=True) class BeaconPacket(GPSPacket): - _type: str = field(default="BeaconPacket", hash=False) + _type: str = field(default='BeaconPacket', hash=False) def _build_payload(self): """The payload is the non headers portion of the packet.""" @@ -372,38 +370,38 @@ class BeaconPacket(GPSPacket): lat = aprslib_util.latitude_to_ddm(self.latitude) lon = aprslib_util.longitude_to_ddm(self.longitude) - self.payload = f"@{time_zulu}z{lat}{self.symbol_table}" f"{lon}" + self.payload = f'@{time_zulu}z{lat}{self.symbol_table}{lon}' if self.comment: comment = self._filter_for_send(self.comment) - self.payload = f"{self.payload}{self.symbol}{comment}" + self.payload = f'{self.payload}{self.symbol}{comment}' else: - self.payload = f"{self.payload}{self.symbol}APRSD Beacon" + self.payload = f'{self.payload}{self.symbol}APRSD Beacon' def _build_raw(self): - self.raw = f"{self.from_call}>APZ100:" f"{self.payload}" + self.raw = f'{self.from_call}>APZ100:{self.payload}' @property def key(self) -> str: """Build a key for finding this packet in a dict.""" if self.raw_timestamp: - return f"{self.from_call}:{self.raw_timestamp}" + return f'{self.from_call}:{self.raw_timestamp}' else: - return f"{self.from_call}:{self.human_info.replace(' ', '')}" + return f'{self.from_call}:{self.human_info.replace(" ", "")}' @property def human_info(self) -> str: h_str = [] - h_str.append(f"Lat:{self.latitude:03.3f}") - h_str.append(f"Lon:{self.longitude:03.3f}") - h_str.append(f"{self.comment}") - return " ".join(h_str) + h_str.append(f'Lat:{self.latitude:03.3f}') + h_str.append(f'Lon:{self.longitude:03.3f}') + h_str.append(f'{self.comment}') + return ' '.join(h_str) @dataclass_json @dataclass(unsafe_hash=True) class MicEPacket(GPSPacket): - _type: str = field(default="MicEPacket", hash=False) + _type: str = field(default='MicEPacket', hash=False) messagecapable: bool = False mbits: Optional[str] = None mtype: Optional[str] = None @@ -416,18 +414,18 @@ class MicEPacket(GPSPacket): @property def key(self) -> str: """Build a key for finding this packet in a dict.""" - return f"{self.from_call}:{self.human_info.replace(' ', '')}" + return f'{self.from_call}:{self.human_info.replace(" ", "")}' @property def human_info(self) -> str: h_info = super().human_info - return f"{h_info} {self.mbits} mbits" + return f'{h_info} {self.mbits} mbits' @dataclass_json @dataclass(unsafe_hash=True) class TelemetryPacket(GPSPacket): - _type: str = field(default="TelemetryPacket", hash=False) + _type: str = field(default='TelemetryPacket', hash=False) messagecapable: bool = False mbits: Optional[str] = None mtype: Optional[str] = None @@ -443,23 +441,23 @@ class TelemetryPacket(GPSPacket): def key(self) -> str: """Build a key for finding this packet in a dict.""" if self.raw_timestamp: - return f"{self.from_call}:{self.raw_timestamp}" + return f'{self.from_call}:{self.raw_timestamp}' else: - return f"{self.from_call}:{self.human_info.replace(' ', '')}" + return f'{self.from_call}:{self.human_info.replace(" ", "")}' @property def human_info(self) -> str: h_info = super().human_info - return f"{h_info} {self.telemetry}" + return f'{h_info} {self.telemetry}' @dataclass_json @dataclass(unsafe_hash=True) class ObjectPacket(GPSPacket): - _type: str = field(default="ObjectPacket", hash=False) + _type: str = field(default='ObjectPacket', hash=False) alive: bool = True raw_timestamp: Optional[str] = None - symbol: str = field(default="r") + symbol: str = field(default='r') # in MPH speed: float = 0.00 # 0 to 360 @@ -470,11 +468,11 @@ class ObjectPacket(GPSPacket): lat = aprslib_util.latitude_to_ddm(self.latitude) long = aprslib_util.longitude_to_ddm(self.longitude) - self.payload = f"*{time_zulu}z{lat}{self.symbol_table}" f"{long}{self.symbol}" + self.payload = f'*{time_zulu}z{lat}{self.symbol_table}{long}{self.symbol}' if self.comment: comment = self._filter_for_send(self.comment) - self.payload = f"{self.payload}{comment}" + self.payload = f'{self.payload}{comment}' def _build_raw(self): """ @@ -487,18 +485,18 @@ class ObjectPacket(GPSPacket): The frequency, uplink_tone, offset is part of the comment """ - self.raw = f"{self.from_call}>APZ100:;{self.to_call:9s}" f"{self.payload}" + self.raw = f'{self.from_call}>APZ100:;{self.to_call:9s}{self.payload}' @property def human_info(self) -> str: h_info = super().human_info - return f"{h_info} {self.comment}" + return f'{h_info} {self.comment}' @dataclass(unsafe_hash=True) class WeatherPacket(GPSPacket, DataClassJsonMixin): - _type: str = field(default="WeatherPacket", hash=False) - symbol: str = "_" + _type: str = field(default='WeatherPacket', hash=False) + symbol: str = '_' wind_speed: float = 0.00 wind_direction: int = 0 wind_gust: float = 0.00 @@ -516,8 +514,8 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): speed: Optional[float] = field(default=None) def _translate(self, raw: dict) -> dict: - for key in raw["weather"]: - raw[key] = raw["weather"][key] + for key in raw['weather']: + raw[key] = raw['weather'][key] # If we have the broken aprslib, then we need to # Convert the course and speed to wind_speed and wind_direction @@ -525,36 +523,36 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): # https://github.com/rossengeorgiev/aprs-python/issues/80 # Wind speed and course is option in the SPEC. # For some reason aprslib multiplies the speed by 1.852. - if "wind_speed" not in raw and "wind_direction" not in raw: + if 'wind_speed' not in raw and 'wind_direction' not in raw: # Most likely this is the broken aprslib # So we need to convert the wind_gust speed - raw["wind_gust"] = round(raw.get("wind_gust", 0) / 0.44704, 3) - if "wind_speed" not in raw: - wind_speed = raw.get("speed") + raw['wind_gust'] = round(raw.get('wind_gust', 0) / 0.44704, 3) + if 'wind_speed' not in raw: + wind_speed = raw.get('speed') if wind_speed: - raw["wind_speed"] = round(wind_speed / 1.852, 3) - raw["weather"]["wind_speed"] = raw["wind_speed"] - if "speed" in raw: - del raw["speed"] + raw['wind_speed'] = round(wind_speed / 1.852, 3) + raw['weather']['wind_speed'] = raw['wind_speed'] + if 'speed' in raw: + del raw['speed'] # Let's adjust the rain numbers as well, since it's wrong - raw["rain_1h"] = round((raw.get("rain_1h", 0) / 0.254) * 0.01, 3) - raw["weather"]["rain_1h"] = raw["rain_1h"] - raw["rain_24h"] = round((raw.get("rain_24h", 0) / 0.254) * 0.01, 3) - raw["weather"]["rain_24h"] = raw["rain_24h"] - raw["rain_since_midnight"] = round( - (raw.get("rain_since_midnight", 0) / 0.254) * 0.01, 3 + raw['rain_1h'] = round((raw.get('rain_1h', 0) / 0.254) * 0.01, 3) + raw['weather']['rain_1h'] = raw['rain_1h'] + raw['rain_24h'] = round((raw.get('rain_24h', 0) / 0.254) * 0.01, 3) + raw['weather']['rain_24h'] = raw['rain_24h'] + raw['rain_since_midnight'] = round( + (raw.get('rain_since_midnight', 0) / 0.254) * 0.01, 3 ) - raw["weather"]["rain_since_midnight"] = raw["rain_since_midnight"] + raw['weather']['rain_since_midnight'] = raw['rain_since_midnight'] - if "wind_direction" not in raw: - wind_direction = raw.get("course") + if 'wind_direction' not in raw: + wind_direction = raw.get('course') if wind_direction: - raw["wind_direction"] = wind_direction - raw["weather"]["wind_direction"] = raw["wind_direction"] - if "course" in raw: - del raw["course"] + raw['wind_direction'] = wind_direction + raw['weather']['wind_direction'] = raw['wind_direction'] + if 'course' in raw: + del raw['course'] - del raw["weather"] + del raw['weather'] return raw @classmethod @@ -567,20 +565,20 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): def key(self) -> str: """Build a key for finding this packet in a dict.""" if self.raw_timestamp: - return f"{self.from_call}:{self.raw_timestamp}" + return f'{self.from_call}:{self.raw_timestamp}' elif self.wx_raw_timestamp: - return f"{self.from_call}:{self.wx_raw_timestamp}" + return f'{self.from_call}:{self.wx_raw_timestamp}' @property def human_info(self) -> str: h_str = [] - h_str.append(f"Temp {self.temperature:03.0f}F") - h_str.append(f"Humidity {self.humidity}%") - h_str.append(f"Wind {self.wind_speed:03.0f}MPH@{self.wind_direction}") - h_str.append(f"Pressure {self.pressure}mb") - h_str.append(f"Rain {self.rain_24h}in/24hr") + h_str.append(f'Temp {self.temperature:03.0f}F') + h_str.append(f'Humidity {self.humidity}%') + h_str.append(f'Wind {self.wind_speed:03.0f}MPH@{self.wind_direction}') + h_str.append(f'Pressure {self.pressure}mb') + h_str.append(f'Rain {self.rain_24h}in/24hr') - return " ".join(h_str) + return ' '.join(h_str) def _build_payload(self): """Build an uncompressed weather packet @@ -610,49 +608,49 @@ class WeatherPacket(GPSPacket, DataClassJsonMixin): time_zulu = self._build_time_zulu() contents = [ - f"@{time_zulu}z{self.latitude}{self.symbol_table}", - f"{self.longitude}{self.symbol}", - f"{self.wind_direction:03d}", + f'@{time_zulu}z{self.latitude}{self.symbol_table}', + f'{self.longitude}{self.symbol}', + f'{self.wind_direction:03d}', # Speed = sustained 1 minute wind speed in mph - f"{self.symbol_table}", - f"{self.wind_speed:03.0f}", + f'{self.symbol_table}', + f'{self.wind_speed:03.0f}', # wind gust (peak wind speed in mph in the last 5 minutes) - f"g{self.wind_gust:03.0f}", + f'g{self.wind_gust:03.0f}', # Temperature in degrees F - f"t{self.temperature:03.0f}", + f't{self.temperature:03.0f}', # Rainfall (in hundredths of an inch) in the last hour - f"r{self.rain_1h * 100:03.0f}", + f'r{self.rain_1h * 100:03.0f}', # Rainfall (in hundredths of an inch) in last 24 hours - f"p{self.rain_24h * 100:03.0f}", + f'p{self.rain_24h * 100:03.0f}', # Rainfall (in hundredths of an inch) since midnigt - f"P{self.rain_since_midnight * 100:03.0f}", + f'P{self.rain_since_midnight * 100:03.0f}', # Humidity - f"h{self.humidity:02d}", + f'h{self.humidity:02d}', # Barometric pressure (in tenths of millibars/tenths of hPascal) - f"b{self.pressure:05.0f}", + f'b{self.pressure:05.0f}', ] if self.comment: - comment = self.filter_for_send(self.comment) + comment = self._filter_for_send(self.comment) contents.append(comment) - self.payload = "".join(contents) + self.payload = ''.join(contents) def _build_raw(self): - self.raw = f"{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:" f"{self.payload}" + self.raw = f'{self.from_call}>{self.to_call},WIDE1-1,WIDE2-1:{self.payload}' @dataclass(unsafe_hash=True) class ThirdPartyPacket(Packet, DataClassJsonMixin): - _type: str = "ThirdPartyPacket" + _type: str = 'ThirdPartyPacket' # Holds the encapsulated packet subpacket: Optional[type[Packet]] = field(default=None, compare=True, hash=False) def __repr__(self): """Build the repr version of the packet.""" repr_str = ( - f"{self.__class__.__name__}:" - f" From: {self.from_call} " - f" To: {self.to_call} " - f" Subpacket: {repr(self.subpacket)}" + f'{self.__class__.__name__}:' + f' From: {self.from_call} ' + f' To: {self.to_call} ' + f' Subpacket: {repr(self.subpacket)}' ) return repr_str @@ -666,12 +664,12 @@ class ThirdPartyPacket(Packet, DataClassJsonMixin): @property def key(self) -> str: """Build a key for finding this packet in a dict.""" - return f"{self.from_call}:{self.subpacket.key}" + return f'{self.from_call}:{self.subpacket.key}' @property def human_info(self) -> str: sub_info = self.subpacket.human_info - return f"{self.from_call}->{self.to_call} {sub_info}" + return f'{self.from_call}->{self.to_call} {sub_info}' @dataclass_json(undefined=Undefined.INCLUDE) @@ -683,7 +681,7 @@ class UnknownPacket: """ unknown_fields: CatchAll - _type: str = "UnknownPacket" + _type: str = 'UnknownPacket' from_call: Optional[str] = field(default=None) to_call: Optional[str] = field(default=None) msgNo: str = field(default_factory=_init_msgNo) # noqa: N815 @@ -701,7 +699,7 @@ class UnknownPacket: @property def key(self) -> str: """Build a key for finding this packet in a dict.""" - return f"{self.from_call}:{self.packet_type}:{self.to_call}" + return f'{self.from_call}:{self.packet_type}:{self.to_call}' @property def human_info(self) -> str: @@ -728,20 +726,20 @@ TYPE_LOOKUP: dict[str, type[Packet]] = { def get_packet_type(packet: dict) -> str: """Decode the packet type from the packet.""" - pkt_format = packet.get("format") - msg_response = packet.get("response") + pkt_format = packet.get('format') + msg_response = packet.get('response') packet_type = PACKET_TYPE_UNKNOWN - if pkt_format == "message" and msg_response == "ack": + if pkt_format == 'message' and msg_response == 'ack': packet_type = PACKET_TYPE_ACK - elif pkt_format == "message" and msg_response == "rej": + elif pkt_format == 'message' and msg_response == 'rej': packet_type = PACKET_TYPE_REJECT - elif pkt_format == "message": + elif pkt_format == 'message': packet_type = PACKET_TYPE_MESSAGE - elif pkt_format == "mic-e": + elif pkt_format == 'mic-e': packet_type = PACKET_TYPE_MICE - elif pkt_format == "object": + elif pkt_format == 'object': packet_type = PACKET_TYPE_OBJECT - elif pkt_format == "status": + elif pkt_format == 'status': packet_type = PACKET_TYPE_STATUS elif pkt_format == PACKET_TYPE_BULLETIN: packet_type = PACKET_TYPE_BULLETIN @@ -752,13 +750,13 @@ def get_packet_type(packet: dict) -> str: elif pkt_format == PACKET_TYPE_WX: packet_type = PACKET_TYPE_WEATHER elif pkt_format == PACKET_TYPE_UNCOMPRESSED: - if packet.get("symbol") == "_": + if packet.get('symbol') == '_': packet_type = PACKET_TYPE_WEATHER elif pkt_format == PACKET_TYPE_THIRDPARTY: packet_type = PACKET_TYPE_THIRDPARTY if packet_type == PACKET_TYPE_UNKNOWN: - if "latitude" in packet: + if 'latitude' in packet: packet_type = PACKET_TYPE_BEACON else: packet_type = PACKET_TYPE_UNKNOWN @@ -780,32 +778,32 @@ def is_mice_packet(packet: dict[Any, Any]) -> bool: def factory(raw_packet: dict[Any, Any]) -> type[Packet]: """Factory method to create a packet from a raw packet string.""" raw = raw_packet - if "_type" in raw: - cls = globals()[raw["_type"]] + if '_type' in raw: + cls = globals()[raw['_type']] return cls.from_dict(raw) - raw["raw_dict"] = raw.copy() + raw['raw_dict'] = raw.copy() raw = _translate_fields(raw) packet_type = get_packet_type(raw) - raw["packet_type"] = packet_type + raw['packet_type'] = packet_type packet_class = TYPE_LOOKUP[packet_type] if packet_type == PACKET_TYPE_WX: # the weather information is in a dict # this brings those values out to the outer dict packet_class = WeatherPacket - elif packet_type == PACKET_TYPE_OBJECT and "weather" in raw: + elif packet_type == PACKET_TYPE_OBJECT and 'weather' in raw: packet_class = WeatherPacket elif packet_type == PACKET_TYPE_UNKNOWN: # Try and figure it out here - if "latitude" in raw: + if 'latitude' in raw: packet_class = GPSPacket else: # LOG.warning(raw) packet_class = UnknownPacket - raw.get("addresse", raw.get("to_call")) + raw.get('addresse', raw.get('to_call')) # TODO: Find a global way to enable/disable this # LOGU.opt(colors=True).info(