diff --git a/bridge.py b/bridge.py index 8ecb769..dc7d91f 100755 --- a/bridge.py +++ b/bridge.py @@ -338,6 +338,16 @@ def config_reports(_config, _factory): return report_server +# Send data to all OBP connections that have an encryption key. Data such as subscribers are sent to other HBNet servers. +def svrd_send_all(_svrd_data): + _svrd_packet = SVRD + for system in CONFIG['SYSTEMS']: + if CONFIG['SYSTEMS'][system]['ENABLED']: + if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': + if CONFIG['SYSTEMS'][system]['ENCRYPTION_KEY'] != b'': + systems[system].send_system(_svrd_packet + _svrd_data) +## pass + # Import Bridging rules # Note: A stanza *must* exist for any MASTER or CLIENT configured in the main @@ -405,6 +415,8 @@ def rule_timer_loop(unit_flood_time): else: logger.debug('(ROUTER) Conference Bridge NO ACTION: System: %s, Bridge: %s, TS: %s, TGID: %s', _system['SYSTEM'], _bridge, _system['TS'], int_id(_system['TGID'])) + for unit in UNIT_MAP: + svrd_send_all(b'UNIT' + unit) _then = _now - unit_flood_time remove_list = [] for unit in UNIT_MAP: @@ -423,10 +435,11 @@ def rule_timer_loop(unit_flood_time): # run this every 10 seconds to trim orphaned stream ids def stream_trimmer_loop(): + print(UNIT_MAP) ping(CONFIG) logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() - + for system in systems: # HBP systems, master and peer if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': @@ -482,6 +495,12 @@ class routerOBP(OPENBRIDGE): # list of self._targets for unit (subscriber, private) calls self._targets = [] + def svrd_received(self, _mode, _data): + logger.info('SVRD Received. Mode: ' + str(_mode) + ' Data: ' + str(_data)) + if _mode == b'UNIT': + UNIT_MAP[_data] = (self._system, time()) + + def group_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data): pkt_time = time() dmrpkt = _data[20:53] @@ -684,6 +703,9 @@ class routerOBP(OPENBRIDGE): # Make/update this unit in the UNIT_MAP cache UNIT_MAP[_rf_src] = (self.name, pkt_time) + + # Send update to all OpenBridge connections +## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time) # Is this a new call stream? @@ -910,6 +932,9 @@ class routerHBP(HBSYSTEM): # Make/update an entry in the UNIT_MAP for this subscriber UNIT_MAP[_rf_src] = (self.name, pkt_time) + # Update other servers via OBP +## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time) + # Is this a new call stream? if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']): if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']): @@ -917,6 +942,10 @@ class routerHBP(HBSYSTEM): return # This is a new call stream + + # Send subscriber ID over OBP + svrd_send_all(b'UNIT' + _rf_src) + self.STATUS[_slot]['RX_START'] = pkt_time logger.info('(%s) *GROUP CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) TGID %s (%s), TS %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) @@ -1186,6 +1215,9 @@ class routerHBP(HBSYSTEM): # Make/update this unit in the UNIT_MAP cache UNIT_MAP[_rf_src] = (self.name, pkt_time) + + # Update other servers via OBP +## svrd_send_all(b'UNIT' + _rf_src) # + b'TIME' + pkt_time) # Is this a new call stream? @@ -1208,6 +1240,7 @@ class routerHBP(HBSYSTEM): self._targets.remove(self._system) # This is a new call stream, so log & report + svrd_send_all(b'UNIT' + _rf_src) self.STATUS[_slot]['RX_START'] = pkt_time logger.info('(%s) *UNIT CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT: %s (%s), TS: %s, FORWARD: %s', \ self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, self._targets) diff --git a/data_gateway.py b/data_gateway.py index ecdfffa..50a174c 100644 --- a/data_gateway.py +++ b/data_gateway.py @@ -111,6 +111,7 @@ __email__ = 'n0mjs@me.com' hdr_type = '' btf = -1 ssid = '' +UNIT_MAP = {} # From dmr_utils3, modified to decode entire packet. Works for 1/2 rate coded data. def decode_full(_data): @@ -889,6 +890,30 @@ def data_que_send(): except Exception as e: logger.info(e) +# the APRS RX process +def aprs_rx(aprs_rx_login, aprs_passcode, aprs_server, aprs_port, aprs_filter, user_ssid): + global AIS + AIS = aprslib.IS(aprs_rx_login, passwd=int(aprs_passcode), host=aprs_server, port=int(aprs_port)) + user_settings = ast.literal_eval(os.popen('cat ' + user_settings_file).read()) + AIS.set_filter(aprs_filter)#parser.get('DATA_CONFIG', 'APRS_FILTER')) + try: + if 'N0CALL' in aprs_callsign: + logger.info() + logger.info('APRS callsighn set to N0CALL, not connecting to APRS-IS') + logger.info() + pass + else: + AIS.connect() + print('Connecting to APRS-IS') + if int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME']) == 0: + logger.info('APRS beacon disabled') + if int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME']) != 0: + aprs_beacon=task.LoopingCall(aprs_beacon_send) + aprs_beacon.start(int(CONFIG['DATA_CONFIG']['IGATE_BEACON_TIME'])*60) + AIS.consumer(aprs_process, raw=True, immortal=False) + except Exception as e: + logger.info(e) + ##### DMR data function #### def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): # Capture data headers @@ -1142,8 +1167,22 @@ def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _fr ###### -##call_type = 'unit' +def rule_timer_loop(): + global UNIT_MAP + logger.debug('(ROUTER) routerHBP Rule timer loop started') + _now = time() + _then = _now - 60 + remove_list = [] + for unit in UNIT_MAP: + if UNIT_MAP[unit][1] < (_then): + remove_list.append(unit) + for unit in remove_list: + del UNIT_MAP[unit] + + logger.debug('Removed unit(s) %s from UNIT_MAP', remove_list) + + class OBP(OPENBRIDGE): def __init__(self, _name, _config, _report): @@ -1151,9 +1190,13 @@ class OBP(OPENBRIDGE): def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + UNIT_MAP[_rf_src] = (self._system, time()) print('OBP RCVD') data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) -## pass + + def svrd_received(self, _mode, _data): + if _mode == b'UNIT': + UNIT_MAP[_data] = (self._system, time()) class HBP(HBSYSTEM): @@ -1162,6 +1205,7 @@ class HBP(HBSYSTEM): HBSYSTEM.__init__(self, _name, _config, _report) def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data): + UNIT_MAP[_rf_src] = (self._system, time()) print('MMDVM RCVD') data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) ## pass @@ -1307,5 +1351,17 @@ if __name__ == '__main__': logger.error('(GLOBAL) STOPPING REACTOR TO AVOID MEMORY LEAK: Unhandled error in timed loop.\n %s', failure) reactor.stop() + # Initialize the rule timer -- this if for user activated stuff + rule_timer_task = task.LoopingCall(rule_timer_loop) + rule_timer = rule_timer_task.start(60) + rule_timer.addErrback(loopingErrHandle) + if 'N0CALL' in aprs_callsign: + logger.info('APRS callsighn set to N0CALL, packet not sent.') + pass + else: + aprs_thread = threading.Thread(target=aprs_rx, args=(aprs_callsign, aprs_passcode, aprs_server, aprs_port, aprs_filter, user_ssid,)) + aprs_thread.daemon = True + aprs_thread.start() + reactor.run() diff --git a/hblink.py b/hblink.py index bad3c85..5ab2bae 100755 --- a/hblink.py +++ b/hblink.py @@ -172,6 +172,7 @@ class OPENBRIDGE(DatagramProtocol): # logger.debug('(%s) TX Packet to OpenBridge %s:%s -- %s', self._system, self._config['TARGET_IP'], self._config['TARGET_PORT'], ahex(_packet)) # Special Server Data packet, encrypted using frenet, send elif _packet[:4] == SVRD: + print(_packet) _enc_pkt = encrypt_packet(self._config['ENCRYPTION_KEY'], _packet) _packet = b'SVRD' + _enc_pkt self.transport.write(_packet, (self._config['TARGET_IP'], self._config['TARGET_PORT'])) @@ -183,6 +184,9 @@ class OPENBRIDGE(DatagramProtocol): pass #print(int_id(_peer_id), int_id(_rf_src), int_id(_dst_id), int_id(_seq), _slot, _call_type, _frame_type, repr(_dtype_vseq), int_id(_stream_id)) + def svrd_received(self, _mode, _data): + pass + def datagramReceived(self, _packet, _sockaddr): # Keep This Line Commented Unless HEAVILY Debugging! ## logger.debug('(%s) RX packet from %s -- %s', self._system, _sockaddr, ahex(_packet)) @@ -196,10 +200,10 @@ class OPENBRIDGE(DatagramProtocol): _data = _packet[:53] _hash = _packet[53:] _ckhs = hmac_new(self._config['PASSPHRASE'],_data,sha1).digest() - print(ahex(_ckhs)) - print(ahex(_hash)) +## print(ahex(_ckhs)) +## print(ahex(_hash)) - print(compare_digest(_hash, _ckhs)) +## print(compare_digest(_hash, _ckhs)) if compare_digest(_hash, _ckhs) and _sockaddr == self._config['TARGET_SOCK']: _peer_id = _data[11:15] @@ -261,6 +265,7 @@ class OPENBRIDGE(DatagramProtocol): # Server Data packet, decrypt and process it. elif _packet[:4] == SVRD: _d_pkt = decrypt_packet(self._config['ENCRYPTION_KEY'], _packet[4:]) +## logger.info('SVRD Received: ' + str(_d_pkt)) # DMR Data packet, sent via SVRD if _d_pkt[:4] == b'DATA': @@ -282,6 +287,8 @@ class OPENBRIDGE(DatagramProtocol): _dtype_vseq = (_bits & 0xF) # data, 1=voice header, 2=voice terminator; voice, 0=burst A ... 5=burst F _stream_id = _data[16:20] self.dmrd_received(_peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data) + else: + self.svrd_received(_d_pkt[:4], _d_pkt[4:]) #************************************************ # HB MASTER CLASS