1
0
mirror of https://github.com/craigerl/aprsd.git synced 2025-08-04 14:32:24 -04:00

Merge pull request #13 from hemna/test

Update tox environment to fix formatting python errors
This commit is contained in:
Craig Lamparter 2020-12-11 06:19:16 -08:00 committed by GitHub
commit 40472ca7d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 750 additions and 391 deletions

22
.github/workflows/python.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: python
on: [push]
jobs:
tox:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [2.7, 3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install tox tox-gh-actions
- name: Test with tox
run: tox

View File

@ -14,6 +14,4 @@
import pbr.version import pbr.version
__version__ = pbr.version.VersionInfo("aprsd").version_string()
__version__ = pbr.version.VersionInfo(
'aprsd').version_string()

View File

@ -1,33 +1,27 @@
import argparse import argparse
import logging import logging
import socketserver
import sys import sys
import time import time
import socketserver
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
from aprsd import utils from aprsd import utils
# command line args # command line args
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--loglevel", parser.add_argument(
default='DEBUG', "--loglevel",
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], default="DEBUG",
help="The log level to use for aprsd.log") choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
parser.add_argument("--quiet", help="The log level to use for aprsd.log",
action='store_true', )
help="Don't log to stdout") parser.add_argument("--quiet", action="store_true", help="Don't log to stdout")
parser.add_argument("--port", parser.add_argument("--port", default=9099, type=int, help="The port to listen on .")
default=9099, parser.add_argument("--ip", default="127.0.0.1", help="The IP to listen on ")
type=int,
help="The port to listen on .")
parser.add_argument("--ip",
default='127.0.0.1',
help="The IP to listen on ")
CONFIG = None CONFIG = None
LOG = logging.getLogger('ARPSSERVER') LOG = logging.getLogger("ARPSSERVER")
# Setup the logging faciility # Setup the logging faciility
@ -36,22 +30,19 @@ LOG = logging.getLogger('ARPSSERVER')
def setup_logging(args): def setup_logging(args):
global LOG global LOG
levels = { levels = {
'CRITICAL': logging.CRITICAL, "CRITICAL": logging.CRITICAL,
'ERROR': logging.ERROR, "ERROR": logging.ERROR,
'WARNING': logging.WARNING, "WARNING": logging.WARNING,
'INFO': logging.INFO, "INFO": logging.INFO,
'DEBUG': logging.DEBUG} "DEBUG": logging.DEBUG,
}
log_level = levels[args.loglevel] log_level = levels[args.loglevel]
LOG.setLevel(log_level) LOG.setLevel(log_level)
log_format = ("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]" log_format = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]" " %(message)s"
" %(message)s") date_format = "%m/%d/%Y %I:%M:%S %p"
date_format = '%m/%d/%Y %I:%M:%S %p' log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
log_formatter = logging.Formatter(fmt=log_format, fh = RotatingFileHandler("aprs-server.log", maxBytes=(10248576 * 5), backupCount=4)
datefmt=date_format)
fh = RotatingFileHandler('aprs-server.log',
maxBytes=(10248576 * 5),
backupCount=4)
fh.setFormatter(log_formatter) fh.setFormatter(log_formatter)
LOG.addHandler(fh) LOG.addHandler(fh)
@ -62,7 +53,6 @@ def setup_logging(args):
class MyAPRSTCPHandler(socketserver.BaseRequestHandler): class MyAPRSTCPHandler(socketserver.BaseRequestHandler):
def handle(self): def handle(self):
# self.request is the TCP socket connected to the client # self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip() self.data = self.request.recv(1024).strip()
@ -81,8 +71,8 @@ def main():
CONFIG = utils.parse_config(args) CONFIG = utils.parse_config(args)
ip = CONFIG['aprs']['host'] ip = CONFIG["aprs"]["host"]
port = CONFIG['aprs']['port'] port = CONFIG["aprs"]["port"]
LOG.info("Start server listening on %s:%s" % (args.ip, args.port)) LOG.info("Start server listening on %s:%s" % (args.ip, args.port))
with socketserver.TCPServer((ip, port), MyAPRSTCPHandler) as server: with socketserver.TCPServer((ip, port), MyAPRSTCPHandler) as server:

View File

@ -19,37 +19,49 @@ import time
def fuzzy(hour, minute, degree=1): def fuzzy(hour, minute, degree=1):
'''Implements the fuzzy clock. """Implements the fuzzy clock.
returns the the string that spells out the time - hour:minute returns the the string that spells out the time - hour:minute
Supports two degrees of fuzziness. Set with degree = 1 or degree = 2 Supports two degrees of fuzziness. Set with degree = 1 or degree = 2
When degree = 1, time is in quantum of 5 minutes. When degree = 1, time is in quantum of 5 minutes.
When degree = 2, time is in quantum of 15 minutes.''' When degree = 2, time is in quantum of 15 minutes."""
if degree <= 0 or degree > 2: if degree <= 0 or degree > 2:
print('Please use a degree of 1 or 2. Using fuzziness degree=1') print("Please use a degree of 1 or 2. Using fuzziness degree=1")
degree = 1 degree = 1
begin = 'It\'s ' begin = "It's "
f0 = 'almost ' f0 = "almost "
f1 = 'exactly ' f1 = "exactly "
f2 = 'around ' f2 = "around "
b0 = ' past ' b0 = " past "
b1 = ' to ' b1 = " to "
hourList = ('One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven', 'Eight', hourlist = (
'Nine', 'Ten', 'Eleven', 'Twelve') "One",
"Two",
"Three",
"Four",
"Five",
"Six",
"Seven",
"Eight",
"Nine",
"Ten",
"Eleven",
"Twelve",
)
s1 = s2 = s3 = s4 = '' s1 = s2 = s3 = s4 = ""
base = 5 base = 5
if degree == 1: if degree == 1:
base = 5 base = 5
val = ('Five', 'Ten', 'Quarter', 'Twenty', 'Twenty-Five', 'Half') val = ("Five", "Ten", "Quarter", "Twenty", "Twenty-Five", "Half")
elif degree == 2: elif degree == 2:
base = 15 base = 15
val = ('Quarter', 'Half') val = ("Quarter", "Half")
# to find whether we have to use 'almost', 'exactly' or 'around' # to find whether we have to use 'almost', 'exactly' or 'around'
dmin = minute % base dmin = minute % base
@ -74,20 +86,20 @@ def fuzzy(hour, minute, degree=1):
if minute <= base / 2: if minute <= base / 2:
# Case like "It's around/exactly Ten" # Case like "It's around/exactly Ten"
s2 = s3 = '' s2 = s3 = ""
s4 = hourList[hour - 12 - 1] s4 = hourlist[hour - 12 - 1]
elif minute >= 60 - base / 2: elif minute >= 60 - base / 2:
# Case like "It's almost Ten" # Case like "It's almost Ten"
s2 = s3 = '' s2 = s3 = ""
s4 = hourList[hour - 12] s4 = hourlist[hour - 12]
else: else:
# Other cases with all words, like "It's around Quarter past One" # Other cases with all words, like "It's around Quarter past One"
if minute > 30: if minute > 30:
s3 = b1 # to s3 = b1 # to
s4 = hourList[hour - 12] s4 = hourlist[hour - 12]
else: else:
s3 = b0 # past s3 = b0 # past
s4 = hourList[hour - 12 - 1] s4 = hourlist[hour - 12 - 1]
return begin + s1 + s2 + s3 + s4 return begin + s1 + s2 + s3 + s4
@ -102,17 +114,17 @@ def main():
try: try:
deg = int(sys.argv[1]) deg = int(sys.argv[1])
except Exception: except Exception:
print('Please use a degree of 1 or 2. Using fuzziness degree=1') print("Please use a degree of 1 or 2. Using fuzziness degree=1")
if len(sys.argv) >= 3: if len(sys.argv) >= 3:
tm = sys.argv[2].split(':') tm = sys.argv[2].split(":")
try: try:
h = int(tm[0]) h = int(tm[0])
m = int(tm[1]) m = int(tm[1])
if h < 0 or h > 23 or m < 0 or m > 59: if h < 0 or h > 23 or m < 0 or m > 59:
raise Exception raise Exception
except Exception: except Exception:
print('Bad time entered. Using the system time.') print("Bad time entered. Using the system time.")
h = stm.tm_hour h = stm.tm_hour
m = stm.tm_min m = stm.tm_min
print(fuzzy(h, m, deg)) print(fuzzy(h, m, deg))

View File

@ -23,38 +23,36 @@
# python included libs # python included libs
import datetime import datetime
import email import email
import imaplib
import json import json
import logging import logging
import os import os
import socket
import pprint import pprint
import re import re
import signal import signal
import six
import smtplib import smtplib
import socket
import subprocess import subprocess
import sys import sys
# import telnetlib
import threading import threading
import time import time
#import urllib from email.mime.text import MIMEText
import requests from logging.handlers import RotatingFileHandler
import click import click
import click_completion import click_completion
from email.mime.text import MIMEText
import imapclient import imapclient
import imaplib import requests
from logging.handlers import RotatingFileHandler import six
import yaml
# local imports here # local imports here
import aprsd import aprsd
from aprsd.fuzzyclock import fuzzy
from aprsd import utils from aprsd import utils
from aprsd.fuzzyclock import fuzzy
# setup the global logger # setup the global logger
LOG = logging.getLogger('APRSD') LOG = logging.getLogger("APRSD")
# global for the config yaml # global for the config yaml
CONFIG = None CONFIG = None
@ -100,7 +98,7 @@ message_number = 0
def custom_startswith(string, incomplete): def custom_startswith(string, incomplete):
"""A custom completion match that supports case insensitive matching.""" """A custom completion match that supports case insensitive matching."""
if os.environ.get('_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE'): if os.environ.get("_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE"):
string = string.lower() string = string.lower()
incomplete = incomplete.lower() incomplete = incomplete.lower()
return string.startswith(incomplete) return string.startswith(incomplete)
@ -115,8 +113,11 @@ Available shell types:
\b \b
%s %s
Default type: auto Default type: auto
""" % "\n ".join('{:<12} {}'.format(k, click_completion.core.shells[k]) for k in sorted( """ % "\n ".join(
click_completion.core.shells.keys())) "{:<12} {}".format(k, click_completion.core.shells[k])
for k in sorted(click_completion.core.shells.keys())
)
@click.group(help=cmd_help) @click.group(help=cmd_help)
@click.version_option() @click.version_option()
@ -125,24 +126,48 @@ def main():
@main.command() @main.command()
@click.option('-i', '--case-insensitive/--no-case-insensitive', help="Case insensitive completion") @click.option(
@click.argument('shell', required=False, type=click_completion.DocumentedChoice(click_completion.core.shells)) "-i", "--case-insensitive/--no-case-insensitive", help="Case insensitive completion"
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
def show(shell, case_insensitive): def show(shell, case_insensitive):
"""Show the click-completion-command completion code""" """Show the click-completion-command completion code"""
extra_env = {'_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE': 'ON'} if case_insensitive else {} extra_env = (
{"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
if case_insensitive
else {}
)
click.echo(click_completion.core.get_code(shell, extra_env=extra_env)) click.echo(click_completion.core.get_code(shell, extra_env=extra_env))
@main.command() @main.command()
@click.option('--append/--overwrite', help="Append the completion code to the file", default=None) @click.option(
@click.option('-i', '--case-insensitive/--no-case-insensitive', help="Case insensitive completion") "--append/--overwrite", help="Append the completion code to the file", default=None
@click.argument('shell', required=False, type=click_completion.DocumentedChoice(click_completion.core.shells)) )
@click.argument('path', required=False) @click.option(
"-i", "--case-insensitive/--no-case-insensitive", help="Case insensitive completion"
)
@click.argument(
"shell",
required=False,
type=click_completion.DocumentedChoice(click_completion.core.shells),
)
@click.argument("path", required=False)
def install(append, case_insensitive, shell, path): def install(append, case_insensitive, shell, path):
"""Install the click-completion-command completion""" """Install the click-completion-command completion"""
extra_env = {'_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE': 'ON'} if case_insensitive else {} extra_env = (
shell, path = click_completion.core.install(shell=shell, path=path, append=append, extra_env=extra_env) {"_CLICK_COMPLETION_COMMAND_CASE_INSENSITIVE_COMPLETE": "ON"}
click.echo('%s completion installed in %s' % (shell, path)) if case_insensitive
else {}
)
shell, path = click_completion.core.install(
shell=shell, path=path, append=append, extra_env=extra_env
)
click.echo("%s completion installed in %s" % (shell, path))
def setup_connection(): def setup_connection():
@ -153,10 +178,10 @@ def setup_connection():
try: try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(300) sock.settimeout(300)
sock.connect((CONFIG['aprs']['host'], 14580)) sock.connect((CONFIG["aprs"]["host"], 14580))
connected = True connected = True
LOG.debug("Connected to server: " + CONFIG['aprs']['host']) LOG.debug("Connected to server: " + CONFIG["aprs"]["host"])
sock_file = sock.makefile(mode='r') sock_file = sock.makefile(mode="r")
# sock_file = sock.makefile(mode='r', encoding=None, errors=None, newline=None) # sock_file = sock.makefile(mode='r', encoding=None, errors=None, newline=None)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # disable nagle algorithm
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size # sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 512) # buffer size
@ -166,10 +191,10 @@ def setup_connection():
time.sleep(5) time.sleep(5)
continue continue
# os._exit(1) # os._exit(1)
user = CONFIG['aprs']['login'] user = CONFIG["aprs"]["login"]
password = CONFIG['aprs']['password'] password = CONFIG["aprs"]["password"]
LOG.debug("Logging in to APRS-IS with user '%s'" % user) LOG.debug("Logging in to APRS-IS with user '%s'" % user)
msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)) msg = "user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)
sock.send(msg.encode()) sock.send(msg.encode())
@ -177,11 +202,13 @@ def signal_handler(signal, frame):
LOG.info("Ctrl+C, exiting.") LOG.info("Ctrl+C, exiting.")
# sys.exit(0) # thread ignores this # sys.exit(0) # thread ignores this
os._exit(0) os._exit(0)
# end signal_handler # end signal_handler
def parse_email(msgid, data, server): def parse_email(msgid, data, server):
envelope = data[b'ENVELOPE'] envelope = data[b"ENVELOPE"]
# email address match # email address match
# use raw string to avoid invalid escape secquence errors r"string here" # use raw string to avoid invalid escape secquence errors r"string here"
f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0])) f = re.search(r"([\.\w_-]+@[\.\w_-]+)", str(envelope.from_[0]))
@ -190,15 +217,21 @@ def parse_email(msgid, data, server):
else: else:
from_addr = "noaddr" from_addr = "noaddr"
LOG.debug("Got a message from '{}'".format(from_addr)) LOG.debug("Got a message from '{}'".format(from_addr))
m = server.fetch([msgid], ['RFC822']) m = server.fetch([msgid], ["RFC822"])
msg = email.message_from_string(m[msgid][b'RFC822'].decode(errors='ignore')) msg = email.message_from_string(m[msgid][b"RFC822"].decode(errors="ignore"))
if msg.is_multipart(): if msg.is_multipart():
text = "" text = ""
html = None html = None
# default in case body somehow isn't set below - happened once # default in case body somehow isn't set below - happened once
body = "* unreadable msg received" body = "* unreadable msg received"
for part in msg.get_payload(): # FIXME this uses the last text or html part in the email, want the first, reverse order somehow? for (
if part.get_content_charset() is None: # or BREAK when we hit a text or html? part
) in (
msg.get_payload()
): # FIXME this uses the last text or html part in the email, want the first, reverse order somehow?
if (
part.get_content_charset() is None
): # or BREAK when we hit a text or html?
# We cannot know the character set, # We cannot know the character set,
# so return decoded "something" # so return decoded "something"
text = part.get_payload(decode=True) text = part.get_payload(decode=True)
@ -206,16 +239,15 @@ def parse_email(msgid, data, server):
charset = part.get_content_charset() charset = part.get_content_charset()
if part.get_content_type() == 'text/plain': if part.get_content_type() == "text/plain":
text = six.text_type( text = six.text_type(
part.get_payload(decode=True), str(charset), part.get_payload(decode=True), str(charset), "ignore"
"ignore").encode('utf8', 'replace') ).encode("utf8", "replace")
if part.get_content_type() == 'text/html': if part.get_content_type() == "text/html":
html = six.text_type( html = six.text_type(
part.get_payload(decode=True), part.get_payload(decode=True), str(charset), "ignore"
str(charset), ).encode("utf8", "replace")
"ignore").encode('utf8', 'replace')
if text is not None: if text is not None:
# strip removes white space fore and aft of string # strip removes white space fore and aft of string
@ -226,49 +258,46 @@ def parse_email(msgid, data, server):
# email.uscc.net sends no charset, blows up unicode function below # email.uscc.net sends no charset, blows up unicode function below
if msg.get_content_charset() is None: if msg.get_content_charset() is None:
text = six.text_type( text = six.text_type(
msg.get_payload(decode=True), msg.get_payload(decode=True), "US-ASCII", "ignore"
'US-ASCII', ).encode("utf8", "replace")
'ignore').encode('utf8', 'replace')
else: else:
text = six.text_type( text = six.text_type(
msg.get_payload(decode=True), msg.get_payload(decode=True), msg.get_content_charset(), "ignore"
msg.get_content_charset(), ).encode("utf8", "replace")
'ignore').encode('utf8', 'replace')
body = text.strip() body = text.strip()
# FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 6: ordinal not in range(128) # FIXED: UnicodeDecodeError: 'ascii' codec can't decode byte 0xf0 in position 6: ordinal not in range(128)
# decode with errors='ignore'. be sure to encode it before we return it below, also with errors='ignore' # decode with errors='ignore'. be sure to encode it before we return it below, also with errors='ignore'
try: try:
body = body.decode(errors='ignore') body = body.decode(errors="ignore")
except Exception as e: except Exception as e:
LOG.error("Unicode decode failure: " + str(e)) LOG.error("Unicode decode failure: " + str(e))
LOG.error("Unidoce decode failed: " + str(body)) LOG.error("Unidoce decode failed: " + str(body))
body = "Unreadable unicode msg" body = "Unreadable unicode msg"
# strip all html tags # strip all html tags
body = re.sub('<[^<]+?>', '', body) body = re.sub("<[^<]+?>", "", body)
# strip CR/LF, make it one line, .rstrip fails at this # strip CR/LF, make it one line, .rstrip fails at this
body = body.replace("\n", " ").replace("\r", " ") body = body.replace("\n", " ").replace("\r", " ")
# ascii might be out of range, so encode it, removing any error characters # ascii might be out of range, so encode it, removing any error characters
body = body.encode(errors='ignore') body = body.encode(errors="ignore")
return (body, from_addr) return (body, from_addr)
# end parse_email # end parse_email
def _imap_connect(): def _imap_connect():
imap_port = CONFIG['imap'].get('port', 143) imap_port = CONFIG["imap"].get("port", 143)
use_ssl = CONFIG['imap'].get('use_ssl', False) use_ssl = CONFIG["imap"].get("use_ssl", False)
host = CONFIG['imap']['host'] host = CONFIG["imap"]["host"]
msg = ("{}{}:{}".format( msg = "{}{}:{}".format("TLS " if use_ssl else "", host, imap_port)
'TLS ' if use_ssl else '',
host,
imap_port
))
# LOG.debug("Connect to IMAP host {} with user '{}'". # LOG.debug("Connect to IMAP host {} with user '{}'".
# format(msg, CONFIG['imap']['login'])) # format(msg, CONFIG['imap']['login']))
try: try:
server = imapclient.IMAPClient(CONFIG['imap']['host'], port=imap_port, server = imapclient.IMAPClient(
use_uid=True, ssl=use_ssl) CONFIG["imap"]["host"], port=imap_port, use_uid=True, ssl=use_ssl
)
except Exception: except Exception:
LOG.error("Failed to connect IMAP server") LOG.error("Failed to connect IMAP server")
return return
@ -276,28 +305,25 @@ def _imap_connect():
# LOG.debug("Connected to IMAP host {}".format(msg)) # LOG.debug("Connected to IMAP host {}".format(msg))
try: try:
server.login(CONFIG['imap']['login'], CONFIG['imap']['password']) server.login(CONFIG["imap"]["login"], CONFIG["imap"]["password"])
except (imaplib.IMAP4.error, Exception) as e: except (imaplib.IMAP4.error, Exception) as e:
msg = getattr(e, 'message', repr(e)) msg = getattr(e, "message", repr(e))
LOG.error("Failed to login {}".format(msg)) LOG.error("Failed to login {}".format(msg))
return return
# LOG.debug("Logged in to IMAP, selecting INBOX") # LOG.debug("Logged in to IMAP, selecting INBOX")
server.select_folder('INBOX') server.select_folder("INBOX")
return server return server
def _smtp_connect(): def _smtp_connect():
host = CONFIG['smtp']['host'] host = CONFIG["smtp"]["host"]
smtp_port = CONFIG['smtp']['port'] smtp_port = CONFIG["smtp"]["port"]
use_ssl = CONFIG['smtp'].get('use_ssl', False) use_ssl = CONFIG["smtp"].get("use_ssl", False)
msg = ("{}{}:{}".format( msg = "{}{}:{}".format("SSL " if use_ssl else "", host, smtp_port)
'SSL ' if use_ssl else '', LOG.debug(
host, "Connect to SMTP host {} with user '{}'".format(msg, CONFIG["imap"]["login"])
smtp_port )
))
LOG.debug("Connect to SMTP host {} with user '{}'".
format(msg, CONFIG['imap']['login']))
try: try:
if use_ssl: if use_ssl:
@ -311,7 +337,7 @@ def _smtp_connect():
LOG.debug("Connected to smtp host {}".format(msg)) LOG.debug("Connected to smtp host {}".format(msg))
try: try:
server.login(CONFIG['smtp']['login'], CONFIG['smtp']['password']) server.login(CONFIG["smtp"]["login"], CONFIG["smtp"]["password"])
except Exception: except Exception:
LOG.error("Couldn't connect to SMTP Server") LOG.error("Couldn't connect to SMTP Server")
return return
@ -344,7 +370,7 @@ def resend_email(count, fromcall):
year = date.year year = date.year
today = "%s-%s-%s" % (day, month, year) today = "%s-%s-%s" % (day, month, year)
shortcuts = CONFIG['shortcuts'] shortcuts = CONFIG["shortcuts"]
# swap key/value # swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
@ -354,7 +380,7 @@ def resend_email(count, fromcall):
LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e) LOG.exception("Failed to Connect to IMAP. Cannot resend email ", e)
return return
messages = server.search(['SINCE', today]) messages = server.search(["SINCE", today])
# LOG.debug("%d messages received today" % len(messages)) # LOG.debug("%d messages received today" % len(messages))
msgexists = False msgexists = False
@ -362,7 +388,7 @@ def resend_email(count, fromcall):
messages.sort(reverse=True) messages.sort(reverse=True)
del messages[int(count) :] # only the latest "count" messages del messages[int(count) :] # only the latest "count" messages
for message in messages: for message in messages:
for msgid, data in list(server.fetch(message, ['ENVELOPE']).items()): for msgid, data in list(server.fetch(message, ["ENVELOPE"]).items()):
# one at a time, otherwise order is random # one at a time, otherwise order is random
(body, from_addr) = parse_email(msgid, data, server) (body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client # unset seen flag, will stay bold in email client
@ -384,9 +410,11 @@ def resend_email(count, fromcall):
# thinking this is a duplicate message. # thinking this is a duplicate message.
# The FT1XDR pretty much ignores the aprs message number in this # The FT1XDR pretty much ignores the aprs message number in this
# regard. The FTM400 gets it right. # regard. The FTM400 gets it right.
reply = "No new msg %s:%s:%s" % (str(h).zfill(2), reply = "No new msg %s:%s:%s" % (
str(h).zfill(2),
str(m).zfill(2), str(m).zfill(2),
str(s).zfill(2)) str(s).zfill(2),
)
send_message(fromcall, reply) send_message(fromcall, reply)
# check email more often since we're resending one now # check email more often since we're resending one now
@ -412,7 +440,7 @@ def check_email_thread():
check_email_delay += 1 check_email_delay += 1
LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds") LOG.debug("check_email_delay is " + str(check_email_delay) + " seconds")
shortcuts = CONFIG['shortcuts'] shortcuts = CONFIG["shortcuts"]
# swap key/value # swap key/value
shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()]) shortcuts_inverted = dict([[v, k] for k, v in shortcuts.items()])
@ -431,14 +459,15 @@ def check_email_thread():
if not server: if not server:
continue continue
messages = server.search(['SINCE', today]) messages = server.search(["SINCE", today])
# LOG.debug("{} messages received today".format(len(messages))) # LOG.debug("{} messages received today".format(len(messages)))
for msgid, data in server.fetch(messages, ['ENVELOPE']).items(): for msgid, data in server.fetch(messages, ["ENVELOPE"]).items():
envelope = data[b'ENVELOPE'] envelope = data[b"ENVELOPE"]
# LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date)) # LOG.debug('ID:%d "%s" (%s)' % (msgid, envelope.subject.decode(), envelope.date))
f = re.search(r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", f = re.search(
str(envelope.from_[0])) r"'([[A-a][0-9]_-]+@[[A-a][0-9]_-\.]+)", str(envelope.from_[0])
)
if f is not None: if f is not None:
from_addr = f.group(1) from_addr = f.group(1)
else: else:
@ -447,10 +476,12 @@ def check_email_thread():
# LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid])) # LOG.debug("Message flags/tags: " + str(server.get_flags(msgid)[msgid]))
# if "APRS" not in server.get_flags(msgid)[msgid]: # if "APRS" not in server.get_flags(msgid)[msgid]:
# in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both # in python3, imap tags are unicode. in py2 they're strings. so .decode them to handle both
taglist=[x.decode(errors='ignore') for x in server.get_flags(msgid)[msgid]] taglist = [
x.decode(errors="ignore") for x in server.get_flags(msgid)[msgid]
]
if "APRS" not in taglist: if "APRS" not in taglist:
# if msg not flagged as sent via aprs # if msg not flagged as sent via aprs
server.fetch([msgid], ['RFC822']) server.fetch([msgid], ["RFC822"])
(body, from_addr) = parse_email(msgid, data, server) (body, from_addr) = parse_email(msgid, data, server)
# unset seen flag, will stay bold in email client # unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN]) server.remove_flags(msgid, [imapclient.SEEN])
@ -459,10 +490,10 @@ def check_email_thread():
# reverse lookup of a shortcut # reverse lookup of a shortcut
from_addr = shortcuts_inverted[from_addr] from_addr = shortcuts_inverted[from_addr]
reply = "-" + from_addr + " " + body.decode(errors='ignore') reply = "-" + from_addr + " " + body.decode(errors="ignore")
send_message(CONFIG['ham']['callsign'], reply) send_message(CONFIG["ham"]["callsign"], reply)
# flag message as sent via aprs # flag message as sent via aprs
server.add_flags(msgid, ['APRS']) server.add_flags(msgid, ["APRS"])
# unset seen flag, will stay bold in email client # unset seen flag, will stay bold in email client
server.remove_flags(msgid, [imapclient.SEEN]) server.remove_flags(msgid, [imapclient.SEEN])
# check email more often since we just received an email # check email more often since we just received an email
@ -470,16 +501,16 @@ def check_email_thread():
server.logout() server.logout()
# end check_email() # end check_email()
def send_ack_thread(tocall, ack, retry_count): def send_ack_thread(tocall, ack, retry_count):
tocall = tocall.ljust(9) # pad to nine chars tocall = tocall.ljust(9) # pad to nine chars
line = ("{}>APRS::{}:ack{}\n".format( line = "{}>APRS::{}:ack{}\n".format(CONFIG["aprs"]["login"], tocall, ack)
CONFIG['aprs']['login'], tocall, ack))
for i in range(retry_count, 0, -1): for i in range(retry_count, 0, -1):
LOG.info("Sending ack __________________ Tx({})".format(i)) LOG.info("Sending ack __________________ Tx({})".format(i))
LOG.info("Raw : {}".format(line.rstrip('\n'))) LOG.info("Raw : {}".format(line.rstrip("\n")))
LOG.info("To : {}".format(tocall)) LOG.info("To : {}".format(tocall))
LOG.info("Ack number : {}".format(ack)) LOG.info("Ack number : {}".format(ack))
sock.send(line.encode()) sock.send(line.encode())
@ -492,9 +523,9 @@ def send_ack_thread(tocall, ack, retry_count):
def send_ack(tocall, ack): def send_ack(tocall, ack):
retry_count = 3 retry_count = 3
thread = threading.Thread(target=send_ack_thread, thread = threading.Thread(
name="send_ack", target=send_ack_thread, name="send_ack", args=(tocall, ack, retry_count)
args=(tocall, ack, retry_count)) )
thread.start() thread.start()
return () return ()
# end send_ack() # end send_ack()
@ -505,17 +536,22 @@ def send_message_thread(tocall, message, this_message_number, retry_count):
# line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message # line = (CONFIG['aprs']['login'] + ">APRS::" + tocall + ":" + message
# + "{" + str(this_message_number) + "\n") # + "{" + str(this_message_number) + "\n")
# line = ("{}>APRS::{}:{}{{{}\n".format( CONFIG['aprs']['login'], tocall, message.encode(errors='ignore'), str(this_message_number),)) # line = ("{}>APRS::{}:{}{{{}\n".format( CONFIG['aprs']['login'], tocall, message.encode(errors='ignore'), str(this_message_number),))
line = ("{}>APRS::{}:{}{{{}\n".format( CONFIG['aprs']['login'], tocall, message, str(this_message_number),)) line = "{}>APRS::{}:{}{{{}\n".format(
CONFIG["aprs"]["login"],
tocall,
message,
str(this_message_number),
)
for i in range(retry_count, 0, -1): for i in range(retry_count, 0, -1):
LOG.debug("DEBUG: send_message_thread msg:ack combos are: ") LOG.debug("DEBUG: send_message_thread msg:ack combos are: ")
LOG.debug(pprint.pformat(ack_dict)) LOG.debug(pprint.pformat(ack_dict))
if ack_dict[this_message_number] != 1: if ack_dict[this_message_number] != 1:
LOG.info("Sending message_______________ {}(Tx{})" LOG.info(
.format( "Sending message_______________ {}(Tx{})".format(
str(this_message_number), str(this_message_number), str(i)
str(i) )
)) )
LOG.info("Raw : {}".format(line.rstrip('\n'))) LOG.info("Raw : {}".format(line.rstrip("\n")))
LOG.info("To : {}".format(tocall)) LOG.info("To : {}".format(tocall))
# LOG.info("Message : {}".format(message.encode(errors='ignore'))) # LOG.info("Message : {}".format(message.encode(errors='ignore')))
LOG.info("Message : {}".format(message)) LOG.info("Message : {}".format(message))
@ -539,12 +575,14 @@ def send_message(tocall, message):
message_number += 1 message_number += 1
if len(ack_dict) > 90: if len(ack_dict) > 90:
# empty ack dict if it's really big, could result in key error later # empty ack dict if it's really big, could result in key error later
LOG.debug("DEBUG: Length of ack dictionary is big at %s clearing." % LOG.debug(
len(ack_dict)) "DEBUG: Length of ack dictionary is big at %s clearing." % len(ack_dict)
)
ack_dict.clear() ack_dict.clear()
LOG.debug(pprint.pformat(ack_dict)) LOG.debug(pprint.pformat(ack_dict))
LOG.debug("DEBUG: Cleared ack dictionary, ack_dict length is now %s." % LOG.debug(
len(ack_dict)) "DEBUG: Cleared ack dictionary, ack_dict length is now %s." % len(ack_dict)
)
ack_dict[message_number] = 0 # clear ack for this message number ack_dict[message_number] = 0 # clear ack for this message number
tocall = tocall.ljust(9) # pad to nine chars tocall = tocall.ljust(9) # pad to nine chars
@ -554,25 +592,26 @@ def send_message(tocall, message):
# feature req: break long ones into two msgs # feature req: break long ones into two msgs
message = message[:67] message = message[:67]
# We all miss George Carlin # We all miss George Carlin
message = re.sub('fuck|shit|cunt|piss|cock|bitch', '****', message) message = re.sub("fuck|shit|cunt|piss|cock|bitch", "****", message)
thread = threading.Thread( thread = threading.Thread(
target=send_message_thread, target=send_message_thread,
name="send_message", name="send_message",
args=(tocall, message, message_number, retry_count)) args=(tocall, message, message_number, retry_count),
)
thread.start() thread.start()
return () return ()
# end send_message() # end send_message()
def process_message(line): def process_message(line):
f = re.search('^(.*)>', line) f = re.search("^(.*)>", line)
fromcall = f.group(1) fromcall = f.group(1)
searchstring = '::%s[ ]*:(.*)' % CONFIG['aprs']['login'] searchstring = "::%s[ ]*:(.*)" % CONFIG["aprs"]["login"]
# verify this, callsign is padded out with spaces to colon # verify this, callsign is padded out with spaces to colon
m = re.search(searchstring, line) m = re.search(searchstring, line)
fullmessage = m.group(1) fullmessage = m.group(1)
ack_attached = re.search('(.*){([0-9A-Z]+)', fullmessage) ack_attached = re.search("(.*){([0-9A-Z]+)", fullmessage)
# ack formats include: {1, {AB}, {12 # ack formats include: {1, {AB}, {12
if ack_attached: if ack_attached:
# "{##" suffix means radio wants an ack back # "{##" suffix means radio wants an ack back
@ -599,12 +638,12 @@ def send_email(to_addr, content):
global check_email_delay global check_email_delay
LOG.info("Sending Email_________________") LOG.info("Sending Email_________________")
shortcuts = CONFIG['shortcuts'] shortcuts = CONFIG["shortcuts"]
if to_addr in shortcuts: if to_addr in shortcuts:
LOG.info("To : " + to_addr) LOG.info("To : " + to_addr)
to_addr = shortcuts[to_addr] to_addr = shortcuts[to_addr]
LOG.info(" (" + to_addr + ")") LOG.info(" (" + to_addr + ")")
subject = CONFIG['ham']['callsign'] subject = CONFIG["ham"]["callsign"]
# content = content + "\n\n(NOTE: reply with one line)" # content = content + "\n\n(NOTE: reply with one line)"
LOG.info("Subject : " + subject) LOG.info("Subject : " + subject)
LOG.info("Body : " + content) LOG.info("Body : " + content)
@ -613,20 +652,20 @@ def send_email(to_addr, content):
check_email_delay = 60 check_email_delay = 60
msg = MIMEText(content) msg = MIMEText(content)
msg['Subject'] = subject msg["Subject"] = subject
msg['From'] = CONFIG['smtp']['login'] msg["From"] = CONFIG["smtp"]["login"]
msg['To'] = to_addr msg["To"] = to_addr
server = _smtp_connect() server = _smtp_connect()
if server: if server:
try: try:
server.sendmail(CONFIG['smtp']['login'], [to_addr], msg.as_string()) server.sendmail(CONFIG["smtp"]["login"], [to_addr], msg.as_string())
except Exception as e: except Exception as e:
msg = getattr(e, 'message', repr(e)) msg = getattr(e, "message", repr(e))
LOG.error("Sendmail Error!!!! '{}'", msg) LOG.error("Sendmail Error!!!! '{}'", msg)
server.quit() server.quit()
return(-1) return -1
server.quit() server.quit()
return(0) return 0
# end send_email # end send_email
@ -634,24 +673,22 @@ def send_email(to_addr, content):
# to disable logging to stdout, but still log to file # to disable logging to stdout, but still log to file
# use the --quiet option on the cmdln # use the --quiet option on the cmdln
def setup_logging(loglevel, quiet): def setup_logging(loglevel, quiet):
global LOG
levels = { levels = {
'CRITICAL': logging.CRITICAL, "CRITICAL": logging.CRITICAL,
'ERROR': logging.ERROR, "ERROR": logging.ERROR,
'WARNING': logging.WARNING, "WARNING": logging.WARNING,
'INFO': logging.INFO, "INFO": logging.INFO,
'DEBUG': logging.DEBUG} "DEBUG": logging.DEBUG,
}
log_level = levels[loglevel] log_level = levels[loglevel]
LOG.setLevel(log_level) LOG.setLevel(log_level)
log_format = ("%(asctime)s [%(threadName)-12s] [%(levelname)-5.5s]" log_format = "%(asctime)s [%(threadName)-12s] [%(levelname)-5.5s]" " %(message)s"
" %(message)s") date_format = "%m/%d/%Y %I:%M:%S %p"
date_format = '%m/%d/%Y %I:%M:%S %p' log_formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
log_formatter = logging.Formatter(fmt=log_format, fh = RotatingFileHandler(
datefmt=date_format) CONFIG["aprs"]["logfile"], maxBytes=(10248576 * 5), backupCount=4
fh = RotatingFileHandler(CONFIG['aprs']['logfile'], )
maxBytes=(10248576 * 5),
backupCount=4)
fh.setFormatter(log_formatter) fh.setFormatter(log_formatter)
LOG.addHandler(fh) LOG.addHandler(fh)
@ -661,32 +698,38 @@ def setup_logging(loglevel, quiet):
LOG.addHandler(sh) LOG.addHandler(sh)
@main.command() @main.command()
def sample_config(): def sample_config():
"""This dumps the config to stdout.""" """This dumps the config to stdout."""
print(utils.example_config) click.echo(yaml.dump(utils.DEFAULT_CONFIG_DICT))
# main() ### # main() ###
@main.command() @main.command()
@click.option("--loglevel", @click.option(
default='DEBUG', "--loglevel",
default="DEBUG",
show_default=True, show_default=True,
type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], type=click.Choice(
case_sensitive=False), ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], case_sensitive=False
),
show_choices=True, show_choices=True,
help="The log level to use for aprsd.log") help="The log level to use for aprsd.log",
@click.option("--quiet", )
is_flag=True, @click.option("--quiet", is_flag=True, default=False, help="Don't log to stdout")
default=False, @click.option(
help="Don't log to stdout") "-c",
def server(loglevel, quiet): "--config",
"config_file",
show_default=True,
default=utils.DEFAULT_CONFIG_FILE,
help="The aprsd config file to use for options.",
)
def server(loglevel, quiet, config_file):
"""Start the aprsd server process.""" """Start the aprsd server process."""
global CONFIG global CONFIG
CONFIG = utils.parse_config() CONFIG = utils.parse_config(config_file)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
setup_logging(loglevel, quiet) setup_logging(loglevel, quiet)
LOG.info("APRSD Started version: {}".format(aprsd.__version__)) LOG.info("APRSD Started version: {}".format(aprsd.__version__))
@ -698,19 +741,17 @@ def server(loglevel, quiet):
LOG.error("Failed to validate email config options") LOG.error("Failed to validate email config options")
sys.exit(-1) sys.exit(-1)
user = CONFIG['aprs']['login'] user = CONFIG["aprs"]["login"]
password = CONFIG['aprs']['password'] # password = CONFIG["aprs"]["password"]
# LOG.debug("LOGIN to APRSD with user '%s'" % user) # LOG.debug("LOGIN to APRSD with user '%s'" % user)
# msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__)) # msg = ("user {} pass {} vers aprsd {}\n".format(user, password, aprsd.__version__))
# sock.send(msg.encode()) # sock.send(msg.encode())
time.sleep(2) time.sleep(2)
checkemailthread = threading.Thread(target=check_email_thread, checkemailthread = threading.Thread(
name="check_email", target=check_email_thread, name="check_email", args=()
args=() ) # args must be tuple ) # args must be tuple
checkemailthread.start() checkemailthread.start()
LOG.debug("reset empty line counter") LOG.debug("reset empty line counter")
@ -724,7 +765,7 @@ def server(loglevel, quiet):
line = sock_file.readline().strip() line = sock_file.readline().strip()
LOG.info("APRS-IS: " + line) LOG.info("APRS-IS: " + line)
# is aprs message to us? not beacon, status, empty line, etc # is aprs message to us? not beacon, status, empty line, etc
searchstring = '::%s' % user searchstring = "::%s" % user
if re.search(searchstring, line): if re.search(searchstring, line):
LOG.debug("main: found message addressed to us begin process_message") LOG.debug("main: found message addressed to us begin process_message")
(fromcall, message, ack) = process_message(line) (fromcall, message, ack) = process_message(line)
@ -734,31 +775,36 @@ def server(loglevel, quiet):
# LOG.debug("Noise: " + line) # LOG.debug("Noise: " + line)
# detect closed socket, getting lots of empty lines # detect closed socket, getting lots of empty lines
if len(line.strip()) == 0: if len(line.strip()) == 0:
LOG.debug("Zero line length received. Consecutive empty line count: " + str(empty_line_rx)) LOG.debug(
"Zero line length received. Consecutive empty line count: "
+ str(empty_line_rx)
)
empty_line_rx += 1 empty_line_rx += 1
if empty_line_rx >= 30: if empty_line_rx >= 30:
LOG.debug("Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting.") LOG.debug(
"Excessive empty lines received, socket likely CLOSED_WAIT. Reconnecting."
)
empty_line_rx = 0 empty_line_rx = 0
raise Exception("closed_socket") raise Exception("closed_socket")
continue # line is something we don't care about continue # line is something we don't care about
# ACK (ack##) # ACK (ack##)
if re.search('^ack[0-9]+', message): if re.search("^ack[0-9]+", message):
LOG.debug("ACK") LOG.debug("ACK")
# put message_number:1 in dict to record the ack # put message_number:1 in dict to record the ack
a = re.search('^ack([0-9]+)', message) a = re.search("^ack([0-9]+)", message)
ack_dict.update({int(a.group(1)): 1}) ack_dict.update({int(a.group(1)): 1})
continue # break out of this so we don't ack an ack at the end continue # break out of this so we don't ack an ack at the end
# EMAIL (-) # EMAIL (-)
# is email command # is email command
elif re.search('^-.*', message): elif re.search("^-.*", message):
LOG.debug("EMAIL") LOG.debug("EMAIL")
searchstring = '^' + CONFIG['ham']['callsign'] + '.*' searchstring = "^" + CONFIG["ham"]["callsign"] + ".*"
# only I can do email # only I can do email
if re.search(searchstring, fromcall): if re.search(searchstring, fromcall):
# digits only, first one is number of emails to resend # digits only, first one is number of emails to resend
r = re.search('^-([0-9])[0-9]*$', message) r = re.search("^-([0-9])[0-9]*$", message)
if r is not None: if r is not None:
resend_email(r.group(1), fromcall) resend_email(r.group(1), fromcall)
# -user@address.com body of email # -user@address.com body of email
@ -769,16 +815,18 @@ def server(loglevel, quiet):
to_addr = a.group(1) to_addr = a.group(1)
content = a.group(2) content = a.group(2)
# send recipient link to aprs.fi map # send recipient link to aprs.fi map
if content == 'mapme': if content == "mapme":
content = ( content = (
"Click for my location: http://aprs.fi/{}". "Click for my location: http://aprs.fi/{}".format(
format(CONFIG['ham']['callsign'])) CONFIG["ham"]["callsign"]
)
)
too_soon = 0 too_soon = 0
now = time.time() now = time.time()
# see if we sent this msg number recently # see if we sent this msg number recently
if ack in email_sent_dict: if ack in email_sent_dict:
timedelta = now - email_sent_dict[ack] timedelta = now - email_sent_dict[ack]
if (timedelta < 300): # five minutes if timedelta < 300: # five minutes
too_soon = 1 too_soon = 1
if not too_soon or ack == 0: if not too_soon or ack == 0:
send_result = send_email(to_addr, content) send_result = send_email(to_addr, content)
@ -786,113 +834,175 @@ def server(loglevel, quiet):
send_message(fromcall, "-" + to_addr + " failed") send_message(fromcall, "-" + to_addr + " failed")
else: else:
# send_message(fromcall, "-" + to_addr + " sent") # send_message(fromcall, "-" + to_addr + " sent")
if len(email_sent_dict) > 98: # clear email sent dictionary if somehow goes over 100 if (
LOG.debug("DEBUG: email_sent_dict is big (" + str(len(email_sent_dict)) + ") clearing out.") len(email_sent_dict) > 98
): # clear email sent dictionary if somehow goes over 100
LOG.debug(
"DEBUG: email_sent_dict is big ("
+ str(len(email_sent_dict))
+ ") clearing out."
)
email_sent_dict.clear() email_sent_dict.clear()
email_sent_dict[ack] = now email_sent_dict[ack] = now
else: else:
LOG.info("Email for message number " + ack + " recently sent, not sending again.") LOG.info(
"Email for message number "
+ ack
+ " recently sent, not sending again."
)
else: else:
send_message(fromcall, "Bad email address") send_message(fromcall, "Bad email address")
# TIME (t) # TIME (t)
elif re.search('^[tT]', message): elif re.search("^[tT]", message):
LOG.debug("TIME") LOG.debug("TIME")
stm = time.localtime() stm = time.localtime()
h = stm.tm_hour h = stm.tm_hour
m = stm.tm_min m = stm.tm_min
cur_time = fuzzy(h, m, 1) cur_time = fuzzy(h, m, 1)
reply = cur_time + " (" + str(h) + ":" + str(m).rjust(2, '0') + "PDT)" + " (" + message.rstrip() + ")" reply = (
thread = threading.Thread(target=send_message, cur_time
name="send_message", + " ("
args=(fromcall, reply)) + str(h)
+ ":"
+ str(m).rjust(2, "0")
+ "PDT)"
+ " ("
+ message.rstrip()
+ ")"
)
thread = threading.Thread(
target=send_message, name="send_message", args=(fromcall, reply)
)
thread.start() thread.start()
# FORTUNE (f) # FORTUNE (f)
elif re.search('^[fF]', message): elif re.search("^[fF]", message):
LOG.debug("FORTUNE") LOG.debug("FORTUNE")
process = subprocess.Popen(['/usr/games/fortune', '-s', '-n 60'], stdout=subprocess.PIPE) process = subprocess.Popen(
["/usr/games/fortune", "-s", "-n 60"], stdout=subprocess.PIPE
)
reply = process.communicate()[0] reply = process.communicate()[0]
# send_message(fromcall, reply.rstrip()) # send_message(fromcall, reply.rstrip())
reply = reply.decode(errors='ignore') reply = reply.decode(errors="ignore")
send_message(fromcall, reply.rstrip()) send_message(fromcall, reply.rstrip())
# PING (p) # PING (p)
elif re.search('^[pP]', message): elif re.search("^[pP]", message):
LOG.debug("PING") LOG.debug("PING")
stm = time.localtime() stm = time.localtime()
h = stm.tm_hour h = stm.tm_hour
m = stm.tm_min m = stm.tm_min
s = stm.tm_sec s = stm.tm_sec
reply = "Pong! " + str(h).zfill(2) + ":" + str(m).zfill(2) + ":" + str(s).zfill(2) reply = (
"Pong! "
+ str(h).zfill(2)
+ ":"
+ str(m).zfill(2)
+ ":"
+ str(s).zfill(2)
)
send_message(fromcall, reply.rstrip()) send_message(fromcall, reply.rstrip())
# LOCATION (l) "8 Miles E Auburn CA 1771' 38.91547,-120.99500 0.1h ago" # LOCATION (l) "8 Miles E Auburn CA 1771' 38.91547,-120.99500 0.1h ago"
elif re.search('^[lL]', message): elif re.search("^[lL]", message):
LOG.debug("LOCATION") LOG.debug("LOCATION")
# get last location of a callsign, get descriptive name from weather service # get last location of a callsign, get descriptive name from weather service
try: try:
a = re.search(r"^.*\s+(.*)", message) # optional second argument is a callsign to search a = re.search(
r"^.*\s+(.*)", message
) # optional second argument is a callsign to search
if a is not None: if a is not None:
searchcall = a.group(1) searchcall = a.group(1)
searchcall = searchcall.upper() searchcall = searchcall.upper()
else: else:
searchcall = fromcall # if no second argument, search for calling station searchcall = fromcall # if no second argument, search for calling station
url = "http://api.aprs.fi/api/get?name=" + searchcall + "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json" url = (
#response = urllib.urlopen(url) "http://api.aprs.fi/api/get?name="
+ searchcall
+ "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
)
response = requests.get(url) response = requests.get(url)
# aprs_data = json.loads(response.read()) # aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text) aprs_data = json.loads(response.text)
lat = aprs_data['entries'][0]['lat'] lat = aprs_data["entries"][0]["lat"]
lon = aprs_data['entries'][0]['lng'] lon = aprs_data["entries"][0]["lng"]
try: # altitude not always provided try: # altitude not always provided
alt = aprs_data['entries'][0]['altitude'] alt = aprs_data["entries"][0]["altitude"]
except Exception: except Exception:
alt = 0 alt = 0
altfeet = int(alt * 3.28084) altfeet = int(alt * 3.28084)
aprs_lasttime_seconds = aprs_data['entries'][0]['lasttime'] aprs_lasttime_seconds = aprs_data["entries"][0]["lasttime"]
aprs_lasttime_seconds = aprs_lasttime_seconds.encode('ascii', errors='ignore') # unicode to ascii aprs_lasttime_seconds = aprs_lasttime_seconds.encode(
"ascii", errors="ignore"
) # unicode to ascii
delta_seconds = time.time() - int(aprs_lasttime_seconds) delta_seconds = time.time() - int(aprs_lasttime_seconds)
delta_hours = delta_seconds / 60 / 60 delta_hours = delta_seconds / 60 / 60
url2 = "https://forecast.weather.gov/MapClick.php?lat=" + str(lat) + "&lon=" + str(lon) + "&FcstType=json" url2 = (
#response2 = urllib.urlopen(url2) "https://forecast.weather.gov/MapClick.php?lat="
+ str(lat)
+ "&lon="
+ str(lon)
+ "&FcstType=json"
)
response2 = requests.get(url2) response2 = requests.get(url2)
# wx_data = json.loads(response2.read()) # wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text) wx_data = json.loads(response2.text)
reply = searchcall + ": " + wx_data['location']['areaDescription'] + " " + str(altfeet) + "' " + str(lat) + "," + str(lon) + " " + str("%.1f" % round(delta_hours, 1)) + "h ago" reply = (
searchcall
+ ": "
+ wx_data["location"]["areaDescription"]
+ " "
+ str(altfeet)
+ "' "
+ str(lat)
+ ","
+ str(lon)
+ " "
+ str("%.1f" % round(delta_hours, 1))
+ "h ago"
)
# reply = reply.encode('ascii', errors='ignore') # unicode to ascii # reply = reply.encode('ascii', errors='ignore') # unicode to ascii
send_message(fromcall, reply.rstrip()) send_message(fromcall, reply.rstrip())
except Exception as e: except Exception as e:
LOG.debug("Locate failed with: " + "%s" % str(e)) LOG.debug("Locate failed with: " + "%s" % str(e))
reply = "Unable to find station " + searchcall + ". Sending beacons?" reply = (
"Unable to find station " + searchcall + ". Sending beacons?"
)
send_message(fromcall, reply.rstrip()) send_message(fromcall, reply.rstrip())
# WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain." # WEATHER (w) "42F(68F/48F) Haze. Tonight, Haze then Chance Rain."
elif re.search('^[wW]', message): elif re.search("^[wW]", message):
LOG.debug("WEATHER") LOG.debug("WEATHER")
# get my last location from aprsis then get weather from # get my last location from aprsis then get weather from
# weather service # weather service
try: try:
url = ("http://api.aprs.fi/api/get?" "&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json" "&name=%s" % fromcall) url = (
#response = urllib.urlopen(url) "http://api.aprs.fi/api/get?"
"&what=loc&apikey=104070.f9lE8qg34L8MZF&format=json"
"&name=%s" % fromcall
)
response = requests.get(url) response = requests.get(url)
# aprs_data = json.loads(response.read()) # aprs_data = json.loads(response.read())
aprs_data = json.loads(response.text) aprs_data = json.loads(response.text)
lat = aprs_data['entries'][0]['lat'] lat = aprs_data["entries"][0]["lat"]
lon = aprs_data['entries'][0]['lng'] lon = aprs_data["entries"][0]["lng"]
url2 = ("https://forecast.weather.gov/MapClick.php?lat=%s" "&lon=%s&FcstType=json" % (lat, lon)) url2 = (
#response2 = urllib.urlopen(url2) "https://forecast.weather.gov/MapClick.php?lat=%s"
"&lon=%s&FcstType=json" % (lat, lon)
)
response2 = requests.get(url2) response2 = requests.get(url2)
# wx_data = json.loads(response2.read()) # wx_data = json.loads(response2.read())
wx_data = json.loads(response2.text) wx_data = json.loads(response2.text)
reply = "%sF(%sF/%sF) %s. %s, %s." % ( reply = "%sF(%sF/%sF) %s. %s, %s." % (
wx_data['currentobservation']['Temp'], wx_data["currentobservation"]["Temp"],
wx_data['data']['temperature'][0], wx_data["data"]["temperature"][0],
wx_data['data']['temperature'][1], wx_data["data"]["temperature"][1],
wx_data['data']['weather'][0], wx_data["data"]["weather"][0],
wx_data['time']['startPeriodName'][1], wx_data["time"]["startPeriodName"][1],
wx_data['data']['weather'][1]) wx_data["data"]["weather"][1],
)
LOG.debug("reply: " + reply.rstrip()) LOG.debug("reply: " + reply.rstrip())
send_message(fromcall, reply.rstrip()) send_message(fromcall, reply.rstrip())
except Exception as e: except Exception as e:
@ -915,7 +1025,12 @@ def server(loglevel, quiet):
except Exception as e: except Exception as e:
LOG.error("Error in mainline loop:") LOG.error("Error in mainline loop:")
LOG.error("%s" % str(e)) LOG.error("%s" % str(e))
if ( str(e) == "closed_socket" or str(e) == "timed out" or str(e) == "Temporary failure in name resolution" or str(e) == "Network is unreachable"): if (
str(e) == "closed_socket"
or str(e) == "timed out"
or str(e) == "Temporary failure in name resolution"
or str(e) == "Network is unreachable"
):
LOG.error("Attempting to reconnect.") LOG.error("Attempting to reconnect.")
sock_file.close() sock_file.close()
sock.shutdown(0) sock.shutdown(0)

View File

@ -1,40 +1,44 @@
"""Utilities and helper functions.""" """Utilities and helper functions."""
import logging import errno
import os import os
import sys import sys
import click
import yaml import yaml
# an example of what should be in the ~/.aprsd/config.yml # an example of what should be in the ~/.aprsd/config.yml
example_config = ''' DEFAULT_CONFIG_DICT = {
ham: "ham": {"callsign": "KFART"},
callsign: KFART "aprs": {
"login": "someusername",
"password": "somepassword",
"host": "noam.aprs2.net",
"port": 14580,
"logfile": "/tmp/arsd.log",
},
"shortcuts": {
"aa": "5551239999@vtext.com",
"cl": "craiglamparter@somedomain.org",
"wb": "555309@vtext.com",
},
"smtp": {
"login": "something",
"password": "some lame password",
"host": "imap.gmail.com",
"port": 465,
"use_ssl": False,
},
"imap": {
"login": "imapuser",
"password": "something here too",
"host": "imap.gmail.com",
"port": 993,
"use_ssl": True,
},
}
aprs: DEFAULT_CONFIG_FILE = "~/.config/aprsd/aprsd.yml"
login: someusername
password: password
host: noam.aprs2.net
port: 14580
logfile: /tmp/aprsd.log
shortcuts:
'aa': '5551239999@vtext.com'
'cl': 'craiglamparter@somedomain.org'
'wb': '555309@vtext.com'
smtp:
login: something
password: some lame password
host: imap.gmail.com
port: 465
imap:
login: imapuser
password: something dumb
host: imap.gmail.com
'''
log = logging.getLogger('APRSD')
def env(*vars, **kwargs): def env(*vars, **kwargs):
@ -45,20 +49,56 @@ def env(*vars, **kwargs):
value = os.environ.get(v, None) value = os.environ.get(v, None)
if value: if value:
return value return value
return kwargs.get('default', '') return kwargs.get("default", "")
def get_config(): def mkdir_p(path):
"""This tries to read the yaml config from ~/.aprsd/config.yml.""" """Make directory and have it work in py2 and py3."""
config_file = os.path.expanduser("~/.aprsd/config.yml") try:
if os.path.exists(config_file): os.makedirs(path)
with open(config_file, "r") as stream: except OSError as exc: # Python >= 2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def create_default_config():
"""Create a default config file."""
# make sure the directory location exists
config_file_expanded = os.path.expanduser(DEFAULT_CONFIG_FILE)
config_dir = os.path.dirname(config_file_expanded)
if not os.path.exists(config_dir):
click.echo("Config dir '{}' doesn't exist, creating.".format(config_dir))
mkdir_p(config_dir)
with open(config_file_expanded, "w+") as cf:
yaml.dump(DEFAULT_CONFIG_DICT, cf)
def get_config(config_file):
"""This tries to read the yaml config from <config_file>."""
config_file_expanded = os.path.expanduser(config_file)
if os.path.exists(config_file_expanded):
with open(config_file_expanded, "r") as stream:
config = yaml.load(stream, Loader=yaml.FullLoader) config = yaml.load(stream, Loader=yaml.FullLoader)
return config return config
else: else:
log.critical("%s is missing, please create config file" % config_file) if config_file == DEFAULT_CONFIG_FILE:
print("\nCopy to ~/.aprsd/config.yml and edit\n\nSample config:\n %s" click.echo(
% example_config) "{} is missing, creating config file".format(config_file_expanded)
)
create_default_config()
msg = (
"Default config file created at {}. Please edit with your "
"settings.".format(config_file)
)
click.echo(msg)
else:
# The user provided a config file path different from the
# Default, so we won't try and create it, just bitch and bail.
msg = "Custom config file '{}' is missing.".format(config_file)
click.echo(msg)
sys.exit(-1) sys.exit(-1)
@ -66,42 +106,55 @@ def get_config():
# and consume the settings. # and consume the settings.
# If the required params don't exist, # If the required params don't exist,
# it will look in the environment # it will look in the environment
def parse_config(): def parse_config(config_file):
# for now we still use globals....ugh # for now we still use globals....ugh
global CONFIG, LOG global CONFIG
def fail(msg): def fail(msg):
LOG.critical(msg) click.echo(msg)
sys.exit(-1) sys.exit(-1)
def check_option(config, section, name=None, default=None): def check_option(config, section, name=None, default=None, default_fail=None):
if section in config: if section in config:
if name and name not in config[section]: if name and name not in config[section]:
if not default: if not default:
fail("'%s' was not in '%s' section of config file" % fail(
(name, section)) "'%s' was not in '%s' section of config file" % (name, section)
)
else: else:
config[section][name] = default config[section][name] = default
else:
if (
default_fail
and name in config[section]
and config[section][name] == default_fail
):
# We have to fail and bail if the user hasn't edited
# this config option.
fail("Config file needs to be edited from provided defaults.")
else: else:
fail("'%s' section wasn't in config file" % section) fail("'%s' section wasn't in config file" % section)
return config return config
# Now read the ~/.aprds/config.yml config = get_config(config_file)
config = get_config() check_option(config, "shortcuts")
check_option(config, 'shortcuts') # special check here to make sure user has edited the config file
check_option(config, 'ham', 'callsign') # and changed the ham callsign
check_option(config, 'aprs', 'login') check_option(
check_option(config, 'aprs', 'password') config, "ham", "callsign", default_fail=DEFAULT_CONFIG_DICT["ham"]["callsign"]
check_option(config, 'aprs', 'host') )
check_option(config, 'aprs', 'port') check_option(config, "aprs", "login")
config = check_option(config, 'aprs', 'logfile', './aprsd.log') check_option(config, "aprs", "password")
check_option(config, 'imap', 'host') check_option(config, "aprs", "host")
check_option(config, 'imap', 'login') check_option(config, "aprs", "port")
check_option(config, 'imap', 'password') check_option(config, "aprs", "logfile", "./aprsd.log")
check_option(config, 'smtp', 'host') check_option(config, "imap", "host")
check_option(config, 'smtp', 'port') check_option(config, "imap", "login")
check_option(config, 'smtp', 'login') check_option(config, "imap", "password")
check_option(config, 'smtp', 'password') check_option(config, "smtp", "host")
check_option(config, "smtp", "port")
check_option(config, "smtp", "login")
check_option(config, "smtp", "password")
return config return config
LOG.info("aprsd config loaded")

9
dev-requirements.in Normal file
View File

@ -0,0 +1,9 @@
tox
pytest
pytest-cov
mypy
flake8
pep8-naming
black
isort
Sphinx

60
dev-requirements.txt Normal file
View File

@ -0,0 +1,60 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile dev-requirements.in
#
alabaster==0.7.12 # via sphinx
appdirs==1.4.4 # via black, virtualenv
attrs==20.3.0 # via pytest
babel==2.9.0 # via sphinx
black==20.8b1 # via -r dev-requirements.in
certifi==2020.12.5 # via requests
chardet==3.0.4 # via requests
coverage==5.3 # via pytest-cov
distlib==0.3.1 # via virtualenv
docutils==0.16 # via sphinx
filelock==3.0.12 # via tox, virtualenv
flake8-polyfill==1.0.2 # via pep8-naming
flake8==3.8.4 # via -r dev-requirements.in, flake8-polyfill
idna==2.10 # via requests
imagesize==1.2.0 # via sphinx
iniconfig==1.1.1 # via pytest
isort==5.6.4 # via -r dev-requirements.in
jinja2==2.11.2 # via sphinx
markupsafe==1.1.1 # via jinja2
mccabe==0.6.1 # via flake8
mypy-extensions==0.4.3 # via black, mypy
mypy==0.790 # via -r dev-requirements.in
packaging==20.7 # via pytest, sphinx, tox
pathspec==0.8.1 # via black
pep8-naming==0.11.1 # via -r dev-requirements.in
pluggy==0.13.1 # via pytest, tox
py==1.9.0 # via pytest, tox
pycodestyle==2.6.0 # via flake8
pyflakes==2.2.0 # via flake8
pygments==2.7.3 # via sphinx
pyparsing==2.4.7 # via packaging
pytest-cov==2.10.1 # via -r dev-requirements.in
pytest==6.1.2 # via -r dev-requirements.in, pytest-cov
pytz==2020.4 # via babel
regex==2020.11.13 # via black
requests==2.25.0 # via sphinx
six==1.15.0 # via tox, virtualenv
snowballstemmer==2.0.0 # via sphinx
sphinx==3.3.1 # via -r dev-requirements.in
sphinxcontrib-applehelp==1.0.2 # via sphinx
sphinxcontrib-devhelp==1.0.2 # via sphinx
sphinxcontrib-htmlhelp==1.0.3 # via sphinx
sphinxcontrib-jsmath==1.0.1 # via sphinx
sphinxcontrib-qthelp==1.0.3 # via sphinx
sphinxcontrib-serializinghtml==1.1.4 # via sphinx
toml==0.10.2 # via black, pytest, tox
tox==3.20.1 # via -r dev-requirements.in
typed-ast==1.4.1 # via black, mypy
typing-extensions==3.7.4.3 # via black, mypy
urllib3==1.26.2 # via requests
virtualenv==20.2.2 # via tox
# The following packages are considered to be unsafe in a requirements file:
# setuptools

View File

@ -4,3 +4,4 @@ imapclient
pbr pbr
pyyaml pyyaml
six six
requests

View File

@ -24,6 +24,4 @@ try:
except ImportError: except ImportError:
pass pass
setuptools.setup( setuptools.setup(setup_requires=["pbr"], pbr=True)
setup_requires=['pbr'],
pbr=True)

View File

@ -0,0 +1,3 @@
flake8
pytest
mock

0
tests/__init__.py Normal file
View File

23
tests/test_main.py Normal file
View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
import sys
import unittest
import pytest
from aprsd import main
if sys.version_info >= (3, 2):
from unittest import mock
else:
import mock
class testMain(unittest.TestCase):
@mock.patch("aprsd.main._imap_connect")
@mock.patch("aprsd.main._smtp_connect")
def test_validate_email(self, imap_mock, smtp_mock):
"""Test to make sure we fail."""
imap_mock.return_value = None
smtp_mock.return_value = {"smaiof": "fire"}
main.validate_email()

89
tox.ini
View File

@ -1,26 +1,51 @@
[tox] [tox]
minversion = 1.6 minversion = 2.9.0
skipdist = True skipdist = True
envlist = py27,py36,py37,fast8,pep8,cover,docs skip_missing_interpreters = true
envlist = py{27,36,37,38},pep8,fmt-check
# Activate isolated build environment. tox will use a virtual environment
# to build a source distribution from the source tree. For build tools and
# arguments use the pyproject.toml file as specified in PEP-517 and PEP-518.
isolated_build = true
[testenv] [testenv]
setenv = VIRTUAL_ENV={envdir} setenv = VIRTUAL_ENV={envdir}
usedevelop = True usedevelop = True
install_command = pip install {opts} {packages} install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt -r{toxinidir}/test-requirements.txt
-r{toxinidir}/dev-requirements.txt
commands = commands =
pytest aprsd/main.py {posargs} # Use -bb to enable BytesWarnings as error to catch str/bytes misuse.
# Use -Werror to treat warnings as errors.
# {envpython} -bb -Werror -m pytest \
# --cov="{envsitepackagesdir}/aprsd" --cov-report=html --cov-report=term {posargs}
{envpython} -bb -Werror -m pytest {posargs}
[testenv:cover] [testenv:py27]
setenv = VIRTUAL_ENV={envdir}
usedevelop = True
install_command = pip install {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements-py2.txt
commands = commands =
pytest --cov=aprsd # Use -bb to enable BytesWarnings as error to catch str/bytes misuse.
# Use -Werror to treat warnings as errors.
# {envpython} -bb -Werror -m pytest \
# --cov="{envsitepackagesdir}/aprsd" --cov-report=html --cov-report=term {posargs}
{envpython} -bb -Werror -m pytest
[testenv:docs] [testenv:docs]
deps = -r{toxinidir}/test-requirements.txt deps = -r{toxinidir}/test-requirements.txt
commands = sphinx-build -b html docs/source docs/html commands = sphinx-build -b html docs/source docs/html
[testenv:pep8-27]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements-py2.txt
commands =
flake8 {posargs} aprsd
[testenv:pep8] [testenv:pep8]
commands = commands =
flake8 {posargs} aprsd flake8 {posargs} aprsd
@ -33,7 +58,57 @@ commands =
{toxinidir}/tools/fast8.sh {toxinidir}/tools/fast8.sh
passenv = FAST8_NUM_COMMITS passenv = FAST8_NUM_COMMITS
[testenv:lint]
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
flake8 aprsd
[flake8] [flake8]
max-line-length = 99
show-source = True show-source = True
ignore = E713,E501 ignore = E713,E501,W503
extend-ignore = E203,W503
extend-exclude = venv
exclude = .venv,.git,.tox,dist,doc,.ropeproject exclude = .venv,.git,.tox,dist,doc,.ropeproject
# This is the configuration for the tox-gh-actions plugin for GitHub Actions
# https://github.com/ymyzk/tox-gh-actions
# This section is not needed if not using GitHub Actions for CI.
[gh-actions]
python =
2.7: py27, pep8-27
3.6: py36, pep8, fmt-check
3.7: py38, pep8, fmt-check
3.8: py38, pep8, fmt-check, type-check, docs
3.9: py39
[testenv:fmt]
# This will reformat your code to comply with pep8
# and standard formatting
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
isort .
black .
[testenv:fmt-check]
# Runs a check only on code formatting.
# you can fix imports by running isort standalone
# you can fix code formatting by running black standalone
skip_install = true
deps =
-r{toxinidir}/dev-requirements.txt
commands =
isort --check-only .
black --check .
[testenv:type-check]
skip_install = true
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/dev-requirements.txt
commands =
mypy aprsd