diff --git a/CHANGELOG.md b/CHANGELOG.md index e8648c6..8f1c864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] ### Added - Added Trustee field to qrz command for club callsigns. +- Added alias for `ae7q call` command (`ae7q c`). +- Added ae7q lookup by FRN and Licensee ID, and for trustee records (`ae7q frn, licensee, trustee`). ### Changed - Changelog command to accept a version as argument. - The qrz command can now link to a QRZ page instead of embedding the data with the `--link` flag. diff --git a/exts/ae7q.py b/exts/ae7q.py index e5441b4..bfcbcb6 100644 --- a/exts/ae7q.py +++ b/exts/ae7q.py @@ -10,9 +10,8 @@ Test callsigns: KN8U: active, restricted AB2EE: expired, restricted KE8FGB: assigned once, no restrictions -NA2AAA: unassigned, no records -KC4USA: reserved but has call history -WF4EMA: " +KV4AAA: unassigned, no records +KC4USA: reserved, no call history, *but* has application history """ import discord.ext.commands as commands @@ -34,110 +33,380 @@ class AE7QCog(commands.Cog): if ctx.invoked_subcommand is None: await ctx.send_help(ctx.command) - @_ae7q_lookup.command(name="call", category=cmn.cat.lookup) + @_ae7q_lookup.command(name="call", aliases=["c"], category=cmn.cat.lookup) async def _ae7q_call(self, ctx: commands.Context, callsign: str): - '''Look up the history for a callsign on [ae7q.com](http://ae7q.com/).''' - callsign = callsign.upper() - desc = '' - base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" - embed = cmn.embed_factory(ctx) + '''Look up the history of a callsign on [ae7q.com](http://ae7q.com/).''' + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - async with self.session.get(base_url + callsign) as resp: - if resp.status != 200: - embed.title = "Error in AE7Q call command" - embed.description = 'Could not load AE7Q' - embed.colour = cmn.colours.bad - await ctx.send(embed=embed) - return - page = await resp.text() + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q call command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() - soup = BeautifulSoup(page, features="html.parser") - tables = soup.select("table.Database") + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - for table in tables: - rows = table.find_all("tr") - if len(rows) > 1 and len(rows[0]) > 1: - break - if desc == '': - for row in rows: + table = tables[0] + + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: desc += " ".join(row.getText().split()) desc += '\n' desc = desc.replace(callsign, f'`{callsign}`') - rows = None + table = tables[1] - first_header = ''.join(rows[0].find_all("th")[0].strings) + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - if rows is None or first_header != 'Entity Name': + # catch if the wrong table was selected + if first_header is None or first_header != 'Entity Name': + embed.title = f"AE7Q History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[1:]) + + embed = cmn.embed_factory(ctx) embed.title = f"AE7Q History for {callsign}" - embed.colour = cmn.colours.bad - embed.url = f"{base_url}{callsign}" + embed.colour = cmn.colours.good + embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[1]})' # **Name** (Applicant Type) + body = (f'Class: *{row[2]}*\n' + f'Region: *{row[3]}*\n' + f'Status: *{row[4]}*\n' + f'Granted: *{row[5]}*\n' + f'Effective: *{row[6]}*\n' + f'Cancelled: *{row[7]}*\n' + f'Expires: *{row[8]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + embed.description = desc - embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) - return - table_contents = [] # store your table here - for tr in rows: - if rows.index(tr) == 0: - # first_header = ''.join(tr.find_all("th")[0].strings) - # if first_header == 'Entity Name': - # print('yooooo') - continue - row_cells = [] - for td in tr.find_all('td'): - if td.getText().strip() != '': - row_cells.append(td.getText().strip()) - else: - row_cells.append('-') - if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1: - for i in range(int(td.attrs['colspan']) - 1): - row_cells.append(row_cells[-1]) - for i, cell in enumerate(row_cells): - if cell == '"': - row_cells[i] = table_contents[-1][i] - if len(row_cells) > 1: - table_contents += [row_cells] + @_ae7q_lookup.command(name="trustee", aliases=["t"], category=cmn.cat.lookup) + async def _ae7q_trustee(self, ctx: commands.Context, callsign: str): + '''Look up the licenses for which a licensee is trustee on [ae7q.com](http://ae7q.com/).''' + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) - embed = cmn.embed_factory(ctx) - embed.title = f"AE7Q Records for {callsign}" - embed.colour = cmn.colours.good - embed.url = f"{base_url}{callsign}" + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q trustee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() - for row in table_contents[0:3]: - header = f'**{row[0]}** ({row[1]})' - body = (f'Class: *{row[2]}*\n' - f'Region: *{row[3]}*\n' - f'Status: *{row[4]}*\n' - f'Granted: *{row[5]}*\n' - f'Effective: *{row[6]}*\n' - f'Cancelled: *{row[7]}*\n' - f'Expires: *{row[8]}*') - embed.add_field(name=header, value=body, inline=False) + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] - embed.description = desc - if len(table_contents) > 3: - embed.description += f'\nRecords 1 to 3 of {len(table_contents)}. See ae7q.com for more...' + try: + table = tables[2] if len(tables[0][0]) == 1 else tables[1] + except IndexError: + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return - await ctx.send(embed=embed) + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None - # TODO: write commands for other AE7Q response types? - # @_ae7q_lookup.command(name="trustee") - # async def _ae7q_trustee(self, ctx: commands.Context, callsign: str): - # pass + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("With"): + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return - # @_ae7q_lookup.command(name="applications", aliases=['apps']) - # async def _ae7q_applications(self, ctx: commands.Context, callsign: str): - # pass + table = await process_table(table[2:]) - # @_ae7q_lookup.command(name="frn") - # async def _ae7q_frn(self, ctx: commands.Context, frn: str): - # base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" - # pass + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Trustee History for {callsign}" + embed.colour = cmn.colours.good + embed.url = base_url + callsign - # @_ae7q_lookup.command(name="licensee", aliases=["lic"]) - # async def _ae7q_licensee(self, ctx: commands.Context, frn: str): - # base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" - # pass + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Name** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[4]}*\n' + f'Granted: *{row[5]}*\n' + f'Effective: *{row[6]}*\n' + f'Cancelled: *{row[7]}*\n' + f'Expires: *{row[8]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + + embed.description = desc + + await ctx.send(embed=embed) + + @_ae7q_lookup.command(name="applications", aliases=["a"], category=cmn.cat.lookup) + async def _ae7q_applications(self, ctx: commands.Context, callsign: str): + '''Look up the application history for a callsign on [ae7q.com](http://ae7q.com/).''' + """ + with ctx.typing(): + callsign = callsign.upper() + desc = '' + base_url = "http://ae7q.com/query/data/CallHistory.php?CALL=" + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + callsign) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q applications command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + table = tables[0] + + # find the first table in the page, and use it to make a description + if len(table[0]) == 1: + for row in table: + desc += " ".join(row.getText().split()) + desc += '\n' + desc = desc.replace(callsign, f'`{callsign}`') + + # select the last table to get applications + table = tables[-1] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith("Receipt"): + embed.title = f"AE7Q Application History for {callsign}" + embed.colour = cmn.colours.bad + embed.url = base_url + callsign + embed.description = desc + embed.description += f'\nNo records found for `{callsign}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[1:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q Application History for {callsign}" + embed.colour = cmn.colours.good + embed.url = base_url + callsign + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[1]}** ({row[3]})' # **Name** (Callsign) + body = (f'Received: *{row[0]}*\n' + f'Region: *{row[2]}*\n' + f'Purpose: *{row[5]}*\n' + f'Last Action: *{row[7]}*\n' + f'Application Status: *{row[8]}*\n') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + desc += f'\nRecords 1 to 3 of {len(table)}. See ae7q.com for more...' + + embed.description = desc + + await ctx.send(embed=embed) + """ + pass + + @_ae7q_lookup.command(name="frn", aliases=["f"], category=cmn.cat.lookup) + async def _ae7q_frn(self, ctx: commands.Context, frn: str): + '''Look up the history of an FRN on [ae7q.com](http://ae7q.com/).''' + """ + NOTES: + - 2 tables: callsign history and application history + - If not found: no tables + """ + with ctx.typing(): + base_url = "http://ae7q.com/query/data/FrnHistory.php?FRN=" + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + frn) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q frn command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + if not len(tables): + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' + await ctx.send(embed=embed) + return + + table = tables[0] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With Licensee'): + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.bad + embed.url = base_url + frn + embed.description = f'No records found for FRN `{frn}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q History for FRN {frn}" + embed.colour = cmn.colours.good + embed.url = base_url + frn + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + + await ctx.send(embed=embed) + + @_ae7q_lookup.command(name="licensee", aliases=["l"], category=cmn.cat.lookup) + async def _ae7q_licensee(self, ctx: commands.Context, licensee_id: str): + '''Look up the history of a licensee ID on [ae7q.com](http://ae7q.com/).''' + with ctx.typing(): + licensee_id = licensee_id.upper() + base_url = "http://ae7q.com/query/data/LicenseeIdHistory.php?ID=" + embed = cmn.embed_factory(ctx) + + async with self.session.get(base_url + licensee_id) as resp: + if resp.status != 200: + embed.title = "Error in AE7Q licensee command" + embed.description = 'Could not load AE7Q' + embed.colour = cmn.colours.bad + await ctx.send(embed=embed) + return + page = await resp.text() + + soup = BeautifulSoup(page, features="html.parser") + tables = [[row for row in table.find_all("tr")] for table in soup.select("table.Database")] + + if not len(tables): + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' + await ctx.send(embed=embed) + return + + table = tables[0] + + table_headers = table[0].find_all("th") + first_header = ''.join(table_headers[0].strings) if len(table_headers) > 0 else None + + # catch if the wrong table was selected + if first_header is None or not first_header.startswith('With FCC'): + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.bad + embed.url = base_url + licensee_id + embed.description = f'No records found for Licensee `{licensee_id}`' + await ctx.send(embed=embed) + return + + table = await process_table(table[2:]) + + embed = cmn.embed_factory(ctx) + embed.title = f"AE7Q History for Licensee {licensee_id}" + embed.colour = cmn.colours.good + embed.url = base_url + licensee_id + + # add the first three rows of the table to the embed + for row in table[0:3]: + header = f'**{row[0]}** ({row[3]})' # **Callsign** (Applicant Type) + body = (f'Name: *{row[2]}*\n' + f'Class: *{row[4]}*\n' + f'Region: *{row[1]}*\n' + f'Status: *{row[5]}*\n' + f'Granted: *{row[6]}*\n' + f'Effective: *{row[7]}*\n' + f'Cancelled: *{row[8]}*\n' + f'Expires: *{row[9]}*') + embed.add_field(name=header, value=body, inline=False) + + if len(table) > 3: + embed.description = f'Records 1 to 3 of {len(table)}. See ae7q.com for more...' + + await ctx.send(embed=embed) + + +async def process_table(table: list): + """Processes tables (*not* including headers) and returns the processed table""" + table_contents = [] + for tr in table: + row = [] + for td in tr.find_all('td'): + cell_val = td.getText().strip() + row.append(cell_val if cell_val else '-') + + # take care of columns that span multiple rows by copying the contents rightward + if 'colspan' in td.attrs and int(td.attrs['colspan']) > 1: + for i in range(int(td.attrs['colspan']) - 1): + row.append(row[-1]) + + # get rid of ditto marks by copying the contents from the previous row + for i, cell in enumerate(row): + if cell == "\"": + row[i] = table_contents[-1][i] + # add row to table + table_contents += [row] + return table_contents def setup(bot: commands.Bot):