1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-07-12 13:45:16 -04:00

some cleanup

This commit is contained in:
Hemna 2025-01-10 17:09:42 -05:00
parent e4f82d6054
commit e332d7c9d0
5 changed files with 299 additions and 295 deletions

View File

@ -11,7 +11,7 @@ from aprsd.packets import core
from aprsd.utils import trace from aprsd.utils import trace
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass): class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
@ -24,12 +24,12 @@ class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
path = [] path = []
def __init__(self): def __init__(self):
LOG.info("Starting APRSDFakeClient client.") LOG.info('Starting APRSDFakeClient client.')
self.path = ["WIDE1-1", "WIDE2-1"] self.path = ['WIDE1-1', 'WIDE2-1']
def stop(self): def stop(self):
self.thread_stop = True self.thread_stop = True
LOG.info("Shutdown APRSDFakeClient client.") LOG.info('Shutdown APRSDFakeClient client.')
def is_alive(self): def is_alive(self):
"""If the connection is alive or not.""" """If the connection is alive or not."""
@ -38,35 +38,31 @@ class APRSDFakeClient(metaclass=trace.TraceWrapperMetaclass):
@wrapt.synchronized(lock) @wrapt.synchronized(lock)
def send(self, packet: core.Packet): def send(self, packet: core.Packet):
"""Send an APRS Message object.""" """Send an APRS Message object."""
LOG.info(f"Sending packet: {packet}") LOG.info(f'Sending packet: {packet}')
payload = None payload = None
if isinstance(packet, core.Packet): if isinstance(packet, core.Packet):
packet.prepare() packet.prepare()
payload = packet.payload.encode("US-ASCII") payload = packet.payload.encode('US-ASCII')
if packet.path:
packet.path
else:
self.path
else: else:
msg_payload = f"{packet.raw}{{{str(packet.msgNo)}" msg_payload = f'{packet.raw}{{{str(packet.msgNo)}'
payload = ( payload = (
":{:<9}:{}".format( ':{:<9}:{}'.format(
packet.to_call, packet.to_call,
msg_payload, msg_payload,
) )
).encode("US-ASCII") ).encode('US-ASCII')
LOG.debug( LOG.debug(
f"FAKE::Send '{payload}' TO '{packet.to_call}' From " f"FAKE::Send '{payload}' TO '{packet.to_call}' From "
f"'{packet.from_call}' with PATH \"{self.path}\"", f'\'{packet.from_call}\' with PATH "{self.path}"',
) )
def consumer(self, callback, blocking=False, immortal=False, raw=False): def consumer(self, callback, blocking=False, immortal=False, raw=False):
LOG.debug("Start non blocking FAKE consumer") LOG.debug('Start non blocking FAKE consumer')
# Generate packets here? # Generate packets here?
raw = "GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW" raw = 'GTOWN>APDW16,WIDE1-1,WIDE2-1:}KM6LYW-9>APZ100,TCPIP,GTOWN*::KM6LYW :KM6LYW: 19 Miles SW'
pkt_raw = aprslib.parse(raw) pkt_raw = aprslib.parse(raw)
pkt = core.factory(pkt_raw) pkt = core.factory(pkt_raw)
callback(packet=pkt) callback(packet=pkt)
LOG.debug(f"END blocking FAKE consumer {self}") LOG.debug(f'END blocking FAKE consumer {self}')
time.sleep(8) time.sleep(8)

View File

@ -16,11 +16,11 @@ from aprsd.main import cli
from aprsd.utils import trace from aprsd.utils import trace
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@cli.group(help="Development type subcommands", context_settings=CONTEXT_SETTINGS) @cli.group(help='Development type subcommands', context_settings=CONTEXT_SETTINGS)
@click.pass_context @click.pass_context
def dev(ctx): def dev(ctx):
pass pass
@ -29,37 +29,37 @@ def dev(ctx):
@dev.command() @dev.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
"--aprs-login", '--aprs-login',
envvar="APRS_LOGIN", envvar='APRS_LOGIN',
show_envvar=True, show_envvar=True,
help="What callsign to send the message from.", help='What callsign to send the message from.',
) )
@click.option( @click.option(
"-p", '-p',
"--plugin", '--plugin',
"plugin_path", 'plugin_path',
show_default=True, show_default=True,
default=None, default=None,
help="The plugin to run. Ex: aprsd.plugins.ping.PingPlugin", help='The plugin to run. Ex: aprsd.plugins.ping.PingPlugin',
) )
@click.option( @click.option(
"-a", '-a',
"--all", '--all',
"load_all", 'load_all',
show_default=True, show_default=True,
is_flag=True, is_flag=True,
default=False, default=False,
help="Load all the plugins in config?", help='Load all the plugins in config?',
) )
@click.option( @click.option(
"-n", '-n',
"--num", '--num',
"number", 'number',
show_default=True, show_default=True,
default=1, default=1,
help="Number of times to call the plugin", help='Number of times to call the plugin',
) )
@click.argument("message", nargs=-1, required=True) @click.argument('message', nargs=-1, required=True)
@click.pass_context @click.pass_context
@cli_helper.process_standard_options @cli_helper.process_standard_options
def test_plugin( def test_plugin(
@ -76,7 +76,7 @@ def test_plugin(
if not aprs_login: if not aprs_login:
if CONF.aprs_network.login == conf.client.DEFAULT_LOGIN: if CONF.aprs_network.login == conf.client.DEFAULT_LOGIN:
click.echo("Must set --aprs_login or APRS_LOGIN") click.echo('Must set --aprs_login or APRS_LOGIN')
ctx.exit(-1) ctx.exit(-1)
return return
else: else:
@ -86,16 +86,16 @@ def test_plugin(
if not plugin_path: if not plugin_path:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo("") click.echo('')
click.echo("Failed to provide -p option to test a plugin") click.echo('Failed to provide -p option to test a plugin')
ctx.exit(-1) ctx.exit(-1)
return return
if type(message) is tuple: if type(message) is tuple:
message = " ".join(message) message = ' '.join(message)
if CONF.trace_enabled: if CONF.trace_enabled:
trace.setup_tracing(["method", "api"]) trace.setup_tracing(['method', 'api'])
base.APRSClient() base.APRSClient()
@ -105,13 +105,13 @@ def test_plugin(
obj = pm._create_class(plugin_path, plugin.APRSDPluginBase) obj = pm._create_class(plugin_path, plugin.APRSDPluginBase)
if not obj: if not obj:
click.echo(ctx.get_help()) click.echo(ctx.get_help())
click.echo("") click.echo('')
ctx.fail(f"Failed to create object from plugin path '{plugin_path}'") ctx.fail(f"Failed to create object from plugin path '{plugin_path}'")
ctx.exit() ctx.exit()
# Register the plugin they wanted tested. # Register the plugin they wanted tested.
LOG.info( LOG.info(
"Testing plugin {} Version {}".format( 'Testing plugin {} Version {}'.format(
obj.__class__, obj.__class__,
obj.version, obj.version,
), ),
@ -126,7 +126,7 @@ def test_plugin(
) )
LOG.info(f"P'{plugin_path}' F'{fromcall}' C'{message}'") LOG.info(f"P'{plugin_path}' F'{fromcall}' C'{message}'")
for x in range(number): for _ in range(number):
replies = pm.run(packet) replies = pm.run(packet)
# Plugin might have threads, so lets stop them so we can exit. # Plugin might have threads, so lets stop them so we can exit.
# obj.stop_threads() # obj.stop_threads()
@ -147,17 +147,12 @@ def test_plugin(
elif isinstance(reply, packets.Packet): elif isinstance(reply, packets.Packet):
# We have a message based object. # We have a message based object.
LOG.info(reply) LOG.info(reply)
else: elif reply is not packets.NULL_MESSAGE:
# A plugin can return a null message flag which signals LOG.info(
# us that they processed the message correctly, but have packets.MessagePacket(
# nothing to reply with, so we avoid replying with a from_call=CONF.callsign,
# usage string to_call=fromcall,
if reply is not packets.NULL_MESSAGE: message_text=reply,
LOG.info( ),
packets.MessagePacket( )
from_call=CONF.callsign,
to_call=fromcall,
message_text=reply,
),
)
pm.stop() pm.stop()

View File

@ -15,30 +15,30 @@ from aprsd.threads.stats import StatsStore
# setup the global logger # setup the global logger
# log.basicConfig(level=log.DEBUG) # level=10 # log.basicConfig(level=log.DEBUG) # level=10
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
CONF = cfg.CONF CONF = cfg.CONF
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
"--host", '--host',
type=str, type=str,
default=None, default=None,
help="IP address of the remote aprsd admin web ui fetch stats from.", help='IP address of the remote aprsd admin web ui fetch stats from.',
) )
@click.option( @click.option(
"--port", '--port',
type=int, type=int,
default=None, default=None,
help="Port of the remote aprsd web admin interface to fetch stats from.", help='Port of the remote aprsd web admin interface to fetch stats from.',
) )
@click.pass_context @click.pass_context
@cli_helper.process_standard_options @cli_helper.process_standard_options
def fetch_stats(ctx, host, port): def fetch_stats(ctx, host, port):
"""Fetch stats from a APRSD admin web interface.""" """Fetch stats from a APRSD admin web interface."""
console = Console() console = Console()
console.print(f"APRSD Fetch-Stats started version: {aprsd.__version__}") console.print(f'APRSD Fetch-Stats started version: {aprsd.__version__}')
CONF.log_opt_values(LOG, logging.DEBUG) CONF.log_opt_values(LOG, logging.DEBUG)
if not host: if not host:
@ -46,114 +46,110 @@ def fetch_stats(ctx, host, port):
if not port: if not port:
port = CONF.admin.web_port port = CONF.admin.web_port
msg = f"Fetching stats from {host}:{port}" msg = f'Fetching stats from {host}:{port}'
console.print(msg) console.print(msg)
with console.status(msg): with console.status(msg):
response = requests.get(f"http://{host}:{port}/stats", timeout=120) response = requests.get(f'http://{host}:{port}/stats', timeout=120)
if not response: if not response:
console.print( console.print(
f"Failed to fetch stats from {host}:{port}?", f'Failed to fetch stats from {host}:{port}?',
style="bold red", style='bold red',
) )
return return
stats = response.json() stats = response.json()
if not stats: if not stats:
console.print( console.print(
f"Failed to fetch stats from aprsd admin ui at {host}:{port}", f'Failed to fetch stats from aprsd admin ui at {host}:{port}',
style="bold red", style='bold red',
) )
return return
aprsd_title = ( aprsd_title = (
"APRSD " 'APRSD '
f"[bold cyan]v{stats['APRSDStats']['version']}[/] " f'[bold cyan]v{stats["APRSDStats"]["version"]}[/] '
f"Callsign [bold green]{stats['APRSDStats']['callsign']}[/] " f'Callsign [bold green]{stats["APRSDStats"]["callsign"]}[/] '
f"Uptime [bold yellow]{stats['APRSDStats']['uptime']}[/]" f'Uptime [bold yellow]{stats["APRSDStats"]["uptime"]}[/]'
) )
console.rule(f"Stats from {host}:{port}") console.rule(f'Stats from {host}:{port}')
console.print("\n\n") console.print('\n\n')
console.rule(aprsd_title) console.rule(aprsd_title)
# Show the connection to APRS # Show the connection to APRS
# It can be a connection to an APRS-IS server or a local TNC via KISS or KISSTCP # It can be a connection to an APRS-IS server or a local TNC via KISS or KISSTCP
if "aprs-is" in stats: if 'aprs-is' in stats:
title = f"APRS-IS Connection {stats['APRSClientStats']['server_string']}" title = f'APRS-IS Connection {stats["APRSClientStats"]["server_string"]}'
table = Table(title=title) table = Table(title=title)
table.add_column("Key") table.add_column('Key')
table.add_column("Value") table.add_column('Value')
for key, value in stats["APRSClientStats"].items(): for key, value in stats['APRSClientStats'].items():
table.add_row(key, value) table.add_row(key, value)
console.print(table) console.print(table)
threads_table = Table(title="Threads") threads_table = Table(title='Threads')
threads_table.add_column("Name") threads_table.add_column('Name')
threads_table.add_column("Alive?") threads_table.add_column('Alive?')
for name, alive in stats["APRSDThreadList"].items(): for name, alive in stats['APRSDThreadList'].items():
threads_table.add_row(name, str(alive)) threads_table.add_row(name, str(alive))
console.print(threads_table) console.print(threads_table)
packet_totals = Table(title="Packet Totals") packet_totals = Table(title='Packet Totals')
packet_totals.add_column("Key") packet_totals.add_column('Key')
packet_totals.add_column("Value") packet_totals.add_column('Value')
packet_totals.add_row("Total Received", str(stats["PacketList"]["rx"])) packet_totals.add_row('Total Received', str(stats['PacketList']['rx']))
packet_totals.add_row("Total Sent", str(stats["PacketList"]["tx"])) packet_totals.add_row('Total Sent', str(stats['PacketList']['tx']))
console.print(packet_totals) console.print(packet_totals)
# Show each of the packet types # Show each of the packet types
packets_table = Table(title="Packets By Type") packets_table = Table(title='Packets By Type')
packets_table.add_column("Packet Type") packets_table.add_column('Packet Type')
packets_table.add_column("TX") packets_table.add_column('TX')
packets_table.add_column("RX") packets_table.add_column('RX')
for key, value in stats["PacketList"]["packets"].items(): for key, value in stats['PacketList']['packets'].items():
packets_table.add_row(key, str(value["tx"]), str(value["rx"])) packets_table.add_row(key, str(value['tx']), str(value['rx']))
console.print(packets_table) console.print(packets_table)
if "plugins" in stats: if 'plugins' in stats:
count = len(stats["PluginManager"]) count = len(stats['PluginManager'])
plugins_table = Table(title=f"Plugins ({count})") plugins_table = Table(title=f'Plugins ({count})')
plugins_table.add_column("Plugin") plugins_table.add_column('Plugin')
plugins_table.add_column("Enabled") plugins_table.add_column('Enabled')
plugins_table.add_column("Version") plugins_table.add_column('Version')
plugins_table.add_column("TX") plugins_table.add_column('TX')
plugins_table.add_column("RX") plugins_table.add_column('RX')
plugins = stats["PluginManager"] plugins = stats['PluginManager']
for key, value in plugins.items(): for key, _ in plugins.items():
plugins_table.add_row( plugins_table.add_row(
key, key,
str(plugins[key]["enabled"]), str(plugins[key]['enabled']),
plugins[key]["version"], plugins[key]['version'],
str(plugins[key]["tx"]), str(plugins[key]['tx']),
str(plugins[key]["rx"]), str(plugins[key]['rx']),
) )
console.print(plugins_table) console.print(plugins_table)
seen_list = stats.get("SeenList") if seen_list := stats.get('SeenList'):
if seen_list:
count = len(seen_list) count = len(seen_list)
seen_table = Table(title=f"Seen List ({count})") seen_table = Table(title=f'Seen List ({count})')
seen_table.add_column("Callsign") seen_table.add_column('Callsign')
seen_table.add_column("Message Count") seen_table.add_column('Message Count')
seen_table.add_column("Last Heard") seen_table.add_column('Last Heard')
for key, value in seen_list.items(): for key, value in seen_list.items():
seen_table.add_row(key, str(value["count"]), value["last"]) seen_table.add_row(key, str(value['count']), value['last'])
console.print(seen_table) console.print(seen_table)
watch_list = stats.get("WatchList") if watch_list := stats.get('WatchList'):
if watch_list:
count = len(watch_list) count = len(watch_list)
watch_table = Table(title=f"Watch List ({count})") watch_table = Table(title=f'Watch List ({count})')
watch_table.add_column("Callsign") watch_table.add_column('Callsign')
watch_table.add_column("Last Heard") watch_table.add_column('Last Heard')
for key, value in watch_list.items(): for key, value in watch_list.items():
watch_table.add_row(key, value["last"]) watch_table.add_row(key, value['last'])
console.print(watch_table) console.print(watch_table)
@ -161,27 +157,27 @@ def fetch_stats(ctx, host, port):
@cli.command() @cli.command()
@cli_helper.add_options(cli_helper.common_options) @cli_helper.add_options(cli_helper.common_options)
@click.option( @click.option(
"--raw", '--raw',
is_flag=True, is_flag=True,
default=False, default=False,
help="Dump raw stats instead of formatted output.", help='Dump raw stats instead of formatted output.',
) )
@click.option( @click.option(
"--show-section", '--show-section',
default=["All"], default=['All'],
help="Show specific sections of the stats. " help='Show specific sections of the stats. '
" Choices: All, APRSDStats, APRSDThreadList, APRSClientStats," ' Choices: All, APRSDStats, APRSDThreadList, APRSClientStats,'
" PacketList, SeenList, WatchList", ' PacketList, SeenList, WatchList',
multiple=True, multiple=True,
type=click.Choice( type=click.Choice(
[ [
"All", 'All',
"APRSDStats", 'APRSDStats',
"APRSDThreadList", 'APRSDThreadList',
"APRSClientStats", 'APRSClientStats',
"PacketList", 'PacketList',
"SeenList", 'SeenList',
"WatchList", 'WatchList',
], ],
case_sensitive=False, case_sensitive=False,
), ),
@ -191,122 +187,122 @@ def fetch_stats(ctx, host, port):
def dump_stats(ctx, raw, show_section): def dump_stats(ctx, raw, show_section):
"""Dump the current stats from the running APRSD instance.""" """Dump the current stats from the running APRSD instance."""
console = Console() console = Console()
console.print(f"APRSD Dump-Stats started version: {aprsd.__version__}") console.print(f'APRSD Dump-Stats started version: {aprsd.__version__}')
with console.status("Dumping stats"): with console.status('Dumping stats'):
ss = StatsStore() ss = StatsStore()
ss.load() ss.load()
stats = ss.data stats = ss.data
if raw: if raw:
if "All" in show_section: if 'All' in show_section:
console.print(stats) console.print(stats)
return return
else: else:
for section in show_section: for section in show_section:
console.print(f"Dumping {section} section:") console.print(f'Dumping {section} section:')
console.print(stats[section]) console.print(stats[section])
return return
t = Table(title="APRSD Stats") t = Table(title='APRSD Stats')
t.add_column("Key") t.add_column('Key')
t.add_column("Value") t.add_column('Value')
for key, value in stats["APRSDStats"].items(): for key, value in stats['APRSDStats'].items():
t.add_row(key, str(value)) t.add_row(key, str(value))
if "All" in show_section or "APRSDStats" in show_section: if 'All' in show_section or 'APRSDStats' in show_section:
console.print(t) console.print(t)
# Show the thread list # Show the thread list
t = Table(title="Thread List") t = Table(title='Thread List')
t.add_column("Name") t.add_column('Name')
t.add_column("Class") t.add_column('Class')
t.add_column("Alive?") t.add_column('Alive?')
t.add_column("Loop Count") t.add_column('Loop Count')
t.add_column("Age") t.add_column('Age')
for name, value in stats["APRSDThreadList"].items(): for name, value in stats['APRSDThreadList'].items():
t.add_row( t.add_row(
name, name,
value["class"], value['class'],
str(value["alive"]), str(value['alive']),
str(value["loop_count"]), str(value['loop_count']),
str(value["age"]), str(value['age']),
) )
if "All" in show_section or "APRSDThreadList" in show_section: if 'All' in show_section or 'APRSDThreadList' in show_section:
console.print(t) console.print(t)
# Show the plugins # Show the plugins
t = Table(title="Plugin List") t = Table(title='Plugin List')
t.add_column("Name") t.add_column('Name')
t.add_column("Enabled") t.add_column('Enabled')
t.add_column("Version") t.add_column('Version')
t.add_column("TX") t.add_column('TX')
t.add_column("RX") t.add_column('RX')
for name, value in stats["PluginManager"].items(): for name, value in stats['PluginManager'].items():
t.add_row( t.add_row(
name, name,
str(value["enabled"]), str(value['enabled']),
value["version"], value['version'],
str(value["tx"]), str(value['tx']),
str(value["rx"]), str(value['rx']),
) )
if "All" in show_section or "PluginManager" in show_section: if 'All' in show_section or 'PluginManager' in show_section:
console.print(t) console.print(t)
# Now show the client stats # Now show the client stats
t = Table(title="Client Stats") t = Table(title='Client Stats')
t.add_column("Key") t.add_column('Key')
t.add_column("Value") t.add_column('Value')
for key, value in stats["APRSClientStats"].items(): for key, value in stats['APRSClientStats'].items():
t.add_row(key, str(value)) t.add_row(key, str(value))
if "All" in show_section or "APRSClientStats" in show_section: if 'All' in show_section or 'APRSClientStats' in show_section:
console.print(t) console.print(t)
# now show the packet list # now show the packet list
packet_list = stats.get("PacketList") packet_list = stats.get('PacketList')
t = Table(title="Packet List") t = Table(title='Packet List')
t.add_column("Key") t.add_column('Key')
t.add_column("Value") t.add_column('Value')
t.add_row("Total Received", str(packet_list["rx"])) t.add_row('Total Received', str(packet_list['rx']))
t.add_row("Total Sent", str(packet_list["tx"])) t.add_row('Total Sent', str(packet_list['tx']))
if "All" in show_section or "PacketList" in show_section: if 'All' in show_section or 'PacketList' in show_section:
console.print(t) console.print(t)
# now show the seen list # now show the seen list
seen_list = stats.get("SeenList") seen_list = stats.get('SeenList')
sorted_seen_list = sorted( sorted_seen_list = sorted(
seen_list.items(), seen_list.items(),
) )
t = Table(title="Seen List") t = Table(title='Seen List')
t.add_column("Callsign") t.add_column('Callsign')
t.add_column("Message Count") t.add_column('Message Count')
t.add_column("Last Heard") t.add_column('Last Heard')
for key, value in sorted_seen_list: for key, value in sorted_seen_list:
t.add_row( t.add_row(
key, key,
str(value["count"]), str(value['count']),
str(value["last"]), str(value['last']),
) )
if "All" in show_section or "SeenList" in show_section: if 'All' in show_section or 'SeenList' in show_section:
console.print(t) console.print(t)
# now show the watch list # now show the watch list
watch_list = stats.get("WatchList") watch_list = stats.get('WatchList')
sorted_watch_list = sorted( sorted_watch_list = sorted(
watch_list.items(), watch_list.items(),
) )
t = Table(title="Watch List") t = Table(title='Watch List')
t.add_column("Callsign") t.add_column('Callsign')
t.add_column("Last Heard") t.add_column('Last Heard')
for key, value in sorted_watch_list: for key, value in sorted_watch_list:
t.add_row( t.add_row(
key, key,
str(value["last"]), str(value['last']),
) )
if "All" in show_section or "WatchList" in show_section: if 'All' in show_section or 'WatchList' in show_section:
console.print(t) console.print(t)

View File

@ -19,12 +19,12 @@ from aprsd import plugin as aprsd_plugin
from aprsd.main import cli from aprsd.main import cli
from aprsd.plugins import fortune, notify, ping, time, version, weather from aprsd.plugins import fortune, notify, ping, time, version, weather
LOG = logging.getLogger("APRSD") LOG = logging.getLogger('APRSD')
PYPI_URL = "https://pypi.org/search/" PYPI_URL = 'https://pypi.org/search/'
def onerror(name): def onerror(name):
print(f"Error importing module {name}") print(f'Error importing module {name}')
type, value, traceback = sys.exc_info() type, value, traceback = sys.exc_info()
print_tb(traceback) print_tb(traceback)
@ -40,19 +40,19 @@ def is_plugin(obj):
def plugin_type(obj): def plugin_type(obj):
for c in inspect.getmro(obj): for c in inspect.getmro(obj):
if issubclass(c, aprsd_plugin.APRSDRegexCommandPluginBase): if issubclass(c, aprsd_plugin.APRSDRegexCommandPluginBase):
return "RegexCommand" return 'RegexCommand'
if issubclass(c, aprsd_plugin.APRSDWatchListPluginBase): if issubclass(c, aprsd_plugin.APRSDWatchListPluginBase):
return "WatchList" return 'WatchList'
if issubclass(c, aprsd_plugin.APRSDPluginBase): if issubclass(c, aprsd_plugin.APRSDPluginBase):
return "APRSDPluginBase" return 'APRSDPluginBase'
return "Unknown" return 'Unknown'
def walk_package(package): def walk_package(package):
return pkgutil.walk_packages( return pkgutil.walk_packages(
package.__path__, package.__path__,
package.__name__ + ".", package.__name__ + '.',
onerror=onerror, onerror=onerror,
) )
@ -62,23 +62,23 @@ def get_module_info(package_name, module_name, module_path):
return None return None
dir_path = os.path.realpath(module_path) dir_path = os.path.realpath(module_path)
pattern = "*.py" pattern = '*.py'
obj_list = [] obj_list = []
for path, _subdirs, files in os.walk(dir_path): for path, _subdirs, files in os.walk(dir_path):
for name in files: for name in files:
if fnmatch.fnmatch(name, pattern): if fnmatch.fnmatch(name, pattern):
module = smuggle(f"{path}/{name}") module = smuggle(f'{path}/{name}')
for mem_name, obj in inspect.getmembers(module): for mem_name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and is_plugin(obj): if inspect.isclass(obj) and is_plugin(obj):
obj_list.append( obj_list.append(
{ {
"package": package_name, 'package': package_name,
"name": mem_name, 'name': mem_name,
"obj": obj, 'obj': obj,
"version": obj.version, 'version': obj.version,
"path": f"{'.'.join([module_name, obj.__name__])}", 'path': f'{".".join([module_name, obj.__name__])}',
}, },
) )
@ -89,17 +89,17 @@ def _get_installed_aprsd_items():
# installed plugins # installed plugins
plugins = {} plugins = {}
extensions = {} extensions = {}
for finder, name, ispkg in pkgutil.iter_modules(): for _finder, name, ispkg in pkgutil.iter_modules():
if ispkg and name.startswith("aprsd_"): if ispkg and name.startswith('aprsd_'):
module = importlib.import_module(name) module = importlib.import_module(name)
pkgs = walk_package(module) pkgs = walk_package(module)
for pkg in pkgs: for pkg in pkgs:
pkg_info = get_module_info( pkg_info = get_module_info(
module.__name__, pkg.name, module.__path__[0] module.__name__, pkg.name, module.__path__[0]
) )
if "plugin" in name: if 'plugin' in name:
plugins[name] = pkg_info plugins[name] = pkg_info
elif "extension" in name: elif 'extension' in name:
extensions[name] = pkg_info extensions[name] = pkg_info
return plugins, extensions return plugins, extensions
@ -126,57 +126,57 @@ def show_built_in_plugins(console):
cls = entry[1] cls = entry[1]
if issubclass(cls, aprsd_plugin.APRSDPluginBase): if issubclass(cls, aprsd_plugin.APRSDPluginBase):
info = { info = {
"name": cls.__qualname__, 'name': cls.__qualname__,
"path": f"{cls.__module__}.{cls.__qualname__}", 'path': f'{cls.__module__}.{cls.__qualname__}',
"version": cls.version, 'version': cls.version,
"docstring": cls.__doc__, 'docstring': cls.__doc__,
"short_desc": cls.short_description, 'short_desc': cls.short_description,
} }
if issubclass(cls, aprsd_plugin.APRSDRegexCommandPluginBase): if issubclass(cls, aprsd_plugin.APRSDRegexCommandPluginBase):
info["command_regex"] = cls.command_regex info['command_regex'] = cls.command_regex
info["type"] = "RegexCommand" info['type'] = 'RegexCommand'
if issubclass(cls, aprsd_plugin.APRSDWatchListPluginBase): if issubclass(cls, aprsd_plugin.APRSDWatchListPluginBase):
info["type"] = "WatchList" info['type'] = 'WatchList'
plugins.append(info) plugins.append(info)
plugins = sorted(plugins, key=lambda i: i["name"]) plugins = sorted(plugins, key=lambda i: i['name'])
table = Table( table = Table(
title="[not italic]:snake:[/] [bold][magenta]APRSD Built-in Plugins [not italic]:snake:[/]", title='[not italic]:snake:[/] [bold][magenta]APRSD Built-in Plugins [not italic]:snake:[/]',
) )
table.add_column("Plugin Name", style="cyan", no_wrap=True) table.add_column('Plugin Name', style='cyan', no_wrap=True)
table.add_column("Info", style="bold yellow") table.add_column('Info', style='bold yellow')
table.add_column("Type", style="bold green") table.add_column('Type', style='bold green')
table.add_column("Plugin Path", style="bold blue") table.add_column('Plugin Path', style='bold blue')
for entry in plugins: for entry in plugins:
table.add_row(entry["name"], entry["short_desc"], entry["type"], entry["path"]) table.add_row(entry['name'], entry['short_desc'], entry['type'], entry['path'])
console.print(table) console.print(table)
def _get_pypi_packages(): def _get_pypi_packages():
if simple_r := requests.get( if simple_r := requests.get(
"https://pypi.org/simple", 'https://pypi.org/simple',
headers={"Accept": "application/vnd.pypi.simple.v1+json"}, headers={'Accept': 'application/vnd.pypi.simple.v1+json'},
): ):
simple_response = simple_r.json() simple_response = simple_r.json()
else: else:
simple_response = {} simple_response = {}
key = "aprsd" key = 'aprsd'
matches = [ matches = [
p["name"] for p in simple_response["projects"] if p["name"].startswith(key) p['name'] for p in simple_response['projects'] if p['name'].startswith(key)
] ]
packages = [] packages = []
for pkg in matches: for pkg in matches:
# Get info for first match # Get info for first match
if r := requests.get( if r := requests.get(
f"https://pypi.org/pypi/{pkg}/json", f'https://pypi.org/pypi/{pkg}/json',
headers={"Accept": "application/json"}, headers={'Accept': 'application/json'},
): ):
packages.append(r.json()) packages.append(r.json())
@ -187,40 +187,40 @@ def show_pypi_plugins(installed_plugins, console):
packages = _get_pypi_packages() packages = _get_pypi_packages()
title = Text.assemble( title = Text.assemble(
("Pypi.org APRSD Installable Plugin Packages\n\n", "bold magenta"), ('Pypi.org APRSD Installable Plugin Packages\n\n', 'bold magenta'),
("Install any of the following plugins with\n", "bold yellow"), ('Install any of the following plugins with\n', 'bold yellow'),
("'pip install ", "bold white"), ("'pip install ", 'bold white'),
("<Plugin Package Name>'", "cyan"), ("<Plugin Package Name>'", 'cyan'),
) )
table = Table(title=title) table = Table(title=title)
table.add_column("Plugin Package Name", style="cyan", no_wrap=True) table.add_column('Plugin Package Name', style='cyan', no_wrap=True)
table.add_column("Description", style="yellow") table.add_column('Description', style='yellow')
table.add_column("Version", style="yellow", justify="center") table.add_column('Version', style='yellow', justify='center')
table.add_column("Released", style="bold green", justify="center") table.add_column('Released', style='bold green', justify='center')
table.add_column("Installed?", style="red", justify="center") table.add_column('Installed?', style='red', justify='center')
emoji = ":open_file_folder:" emoji = ':open_file_folder:'
for package in packages: for package in packages:
link = package["info"]["package_url"] link = package['info']['package_url']
version = package["info"]["version"] version = package['info']['version']
package_name = package["info"]["name"] package_name = package['info']['name']
description = package["info"]["summary"] description = package['info']['summary']
created = package["releases"][version][0]["upload_time"] created = package['releases'][version][0]['upload_time']
if "aprsd-" not in package_name or "-plugin" not in package_name: if 'aprsd-' not in package_name or '-plugin' not in package_name:
continue continue
under = package_name.replace("-", "_") under = package_name.replace('-', '_')
installed = "Yes" if under in installed_plugins else "No" installed = 'Yes' if under in installed_plugins else 'No'
table.add_row( table.add_row(
f"[link={link}]{emoji}[/link] {package_name}", f'[link={link}]{emoji}[/link] {package_name}',
description, description,
version, version,
created, created,
installed, installed,
) )
console.print("\n") console.print('\n')
console.print(table) console.print(table)
@ -228,39 +228,39 @@ def show_pypi_extensions(installed_extensions, console):
packages = _get_pypi_packages() packages = _get_pypi_packages()
title = Text.assemble( title = Text.assemble(
("Pypi.org APRSD Installable Extension Packages\n\n", "bold magenta"), ('Pypi.org APRSD Installable Extension Packages\n\n', 'bold magenta'),
("Install any of the following extensions by running\n", "bold yellow"), ('Install any of the following extensions by running\n', 'bold yellow'),
("'pip install ", "bold white"), ("'pip install ", 'bold white'),
("<Plugin Package Name>'", "cyan"), ("<Plugin Package Name>'", 'cyan'),
) )
table = Table(title=title) table = Table(title=title)
table.add_column("Extension Package Name", style="cyan", no_wrap=True) table.add_column('Extension Package Name', style='cyan', no_wrap=True)
table.add_column("Description", style="yellow") table.add_column('Description', style='yellow')
table.add_column("Version", style="yellow", justify="center") table.add_column('Version', style='yellow', justify='center')
table.add_column("Released", style="bold green", justify="center") table.add_column('Released', style='bold green', justify='center')
table.add_column("Installed?", style="red", justify="center") table.add_column('Installed?', style='red', justify='center')
emoji = ":open_file_folder:" emoji = ':open_file_folder:'
for package in packages: for package in packages:
link = package["info"]["package_url"] link = package['info']['package_url']
version = package["info"]["version"] version = package['info']['version']
package_name = package["info"]["name"] package_name = package['info']['name']
description = package["info"]["summary"] description = package['info']['summary']
created = package["releases"][version][0]["upload_time"] created = package['releases'][version][0]['upload_time']
if "aprsd-" not in package_name or "-extension" not in package_name: if 'aprsd-' not in package_name or '-extension' not in package_name:
continue continue
under = package_name.replace("-", "_") under = package_name.replace('-', '_')
installed = "Yes" if under in installed_extensions else "No" installed = 'Yes' if under in installed_extensions else 'No'
table.add_row( table.add_row(
f"[link={link}]{emoji}[/link] {package_name}", f'[link={link}]{emoji}[/link] {package_name}',
description, description,
version, version,
created, created,
installed, installed,
) )
console.print("\n") console.print('\n')
console.print(table) console.print(table)
@ -269,24 +269,24 @@ def show_installed_plugins(installed_plugins, console):
return return
table = Table( table = Table(
title="[not italic]:snake:[/] [bold][magenta]APRSD Installed 3rd party Plugins [not italic]:snake:[/]", title='[not italic]:snake:[/] [bold][magenta]APRSD Installed 3rd party Plugins [not italic]:snake:[/]',
) )
table.add_column("Package Name", style=" bold white", no_wrap=True) table.add_column('Package Name', style=' bold white', no_wrap=True)
table.add_column("Plugin Name", style="cyan", no_wrap=True) table.add_column('Plugin Name', style='cyan', no_wrap=True)
table.add_column("Version", style="yellow", justify="center") table.add_column('Version', style='yellow', justify='center')
table.add_column("Type", style="bold green") table.add_column('Type', style='bold green')
table.add_column("Plugin Path", style="bold blue") table.add_column('Plugin Path', style='bold blue')
for name in installed_plugins: for name in installed_plugins:
for plugin in installed_plugins[name]: for plugin in installed_plugins[name]:
table.add_row( table.add_row(
name.replace("_", "-"), name.replace('_', '-'),
plugin["name"], plugin['name'],
plugin["version"], plugin['version'],
plugin_type(plugin["obj"]), plugin_type(plugin['obj']),
plugin["path"], plugin['path'],
) )
console.print("\n") console.print('\n')
console.print(table) console.print(table)
@ -298,14 +298,14 @@ def list_plugins(ctx):
"""List the built in plugins available to APRSD.""" """List the built in plugins available to APRSD."""
console = Console() console = Console()
with console.status("Show Built-in Plugins") as status: with console.status('Show Built-in Plugins') as status:
show_built_in_plugins(console) show_built_in_plugins(console)
status.update("Fetching pypi.org plugins") status.update('Fetching pypi.org plugins')
installed_plugins = get_installed_plugins() installed_plugins = get_installed_plugins()
show_pypi_plugins(installed_plugins, console) show_pypi_plugins(installed_plugins, console)
status.update("Looking for installed APRSD plugins") status.update('Looking for installed APRSD plugins')
show_installed_plugins(installed_plugins, console) show_installed_plugins(installed_plugins, console)
@ -317,9 +317,9 @@ def list_extensions(ctx):
"""List the built in plugins available to APRSD.""" """List the built in plugins available to APRSD."""
console = Console() console = Console()
with console.status("Show APRSD Extensions") as status: with console.status('Show APRSD Extensions') as status:
status.update("Fetching pypi.org APRSD Extensions") status.update('Fetching pypi.org APRSD Extensions')
status.update("Looking for installed APRSD Extensions") status.update('Looking for installed APRSD Extensions')
installed_extensions = get_installed_extensions() installed_extensions = get_installed_extensions()
show_pypi_extensions(installed_extensions, console) show_pypi_extensions(installed_extensions, console)

View File

@ -173,3 +173,20 @@ skip_gitignore = true
branch = true branch = true
[tool.setuptools_scm] [tool.setuptools_scm]
[tool.ruff]
line-length = 88
select = [
"F", # pyflakes rules
"E", # pycodestyle error rules
"W", # pycodestyle warning rules
"B", # flake8-bugbear rules
# "I", # isort rules
]
ignore = [
"E501", # line-too-long
]
[tool.ruff.format]
indent-style = "space"
quote-style = "single"