diff --git a/bridge.py b/bridge.py index 48a3487..7cb6d04 100755 --- a/bridge.py +++ b/bridge.py @@ -148,7 +148,7 @@ def rule_timer_loop(): report_server.send_clients(b'bridge updated') -# run this every 10 seconds to trim orphaned stream ids +## run this every 10 seconds to trim orphaned stream ids def stream_trimmer_loop(): logger.debug('(ROUTER) Trimming inactive stream IDs from system lists') _now = time() @@ -166,6 +166,9 @@ def stream_trimmer_loop(): system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_RFS']), int_id(_slot['RX_TGID']), slot, _slot['RX_TIME'] - _slot['RX_START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(_slot['RX_STREAM_ID']), int_id(_slot['RX_PEER']), int_id(_slot['RX_RFS']), slot, int_id(_slot['RX_TGID']), _slot['RX_TIME'] - _slot['RX_START']).encode(encoding='utf-8', errors='ignore')) + #Null stream_id - for loop control + if _slot['RX_TIME'] < _now - 60: + _slot['RX_STREAM_ID'] = b'\x00' # TX slot check if _slot['TX_TYPE'] != HBPF_SLT_VTERM and _slot['TX_TIME'] < _now - 5: @@ -179,23 +182,60 @@ def stream_trimmer_loop(): # We can't delete items from a dicationry that's being iterated, so we have to make a temporarly list of entrys to remove later if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': remove_list = [] + fin_list = [] for stream_id in systems[system].STATUS: - try: - if systems[system].STATUS[stream_id]['LAST'] < _now - 5: - remove_list.append(stream_id) - except: - logger.debug("(%s) Keyerror - stream trimmer Stream ID: %s Start: %s Contention: %s RFS: %s TGID: %s",stream_id,systems[system].STATUS[stream_id]['START'],systems[system].STATUS[stream_id]['CONTENTION'],systems[system].STATUS[stream_id]['RFS'],int_id(systems[system].STATUS[stream_id]['TGID'])) - systems[system].STATUS[stream_id]['LAST'] = _now + + #if stream already marked as finished, just remove it + if '_fin' in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 180: + logger.info('(%s) *FINISHED STREAM* STREAM ID: %s',system, int_id(stream_id)) + fin_list.append(stream_id) continue - for stream_id in remove_list: - if stream_id in systems[system].STATUS: + + #try: + if '_to' not in systems[system].STATUS[stream_id] and '_fin' not in systems[system].STATUS[stream_id] and systems[system].STATUS[stream_id]['LAST'] < _now - 5: _stream = systems[system].STATUS[stream_id] _sysconfig = CONFIG['SYSTEMS'][system] + #systems[system].STATUS[stream_id]['_fin'] = True logger.info('(%s) *TIME OUT* STREAM ID: %s SUB: %s PEER: %s TGID: %s TS 1 Duration: %.2f', \ system, int_id(stream_id), get_alias(int_id(_stream['RFS']), subscriber_ids), get_alias(int_id(_sysconfig['NETWORK_ID']), peer_ids), get_alias(int_id(_stream['TGID']), talkgroup_ids), _stream['LAST'] - _stream['START']) if CONFIG['REPORTS']['REPORT']: systems[system]._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(system, int_id(stream_id), int_id(_sysconfig['NETWORK_ID']), int_id(_stream['RFS']), 1, int_id(_stream['TGID']), _stream['LAST'] - _stream['START']).encode(encoding='utf-8', errors='ignore')) + systems[system].STATUS[stream_id]['_to'] = True + continue + #except: + #logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id) + #systems[system].STATUS[stream_id]['LAST'] = _now + #continue + + + try: + if systems[system].STATUS[stream_id]['LAST'] < _now - 180: + remove_list.append(stream_id) + except: + logger.warning("(%s) Keyerror - stream trimmer Stream ID: %s",system,stream_id) + systems[system].STATUS[stream_id]['LAST'] = _now + continue + + #remove finished + for stream_id in fin_list: + removed = systems[system].STATUS.pop(stream_id) + + for stream_id in remove_list: + if stream_id in systems[system].STATUS: + _stream = systems[system].STATUS[stream_id] + _sysconfig = CONFIG['SYSTEMS'][system] + removed = systems[system].STATUS.pop(stream_id) + + try: + _bcsq_remove = [] + for tgid in _sysconfig['_bcsq']: + if _sysconfig['_bcsq'][tgid] == stream_id: + _bcsq_remove.append(tgid) + for bcrm in _bcsq_remove: + removed = _sysconfig['_bcsq'].pop(bcrm) + except KeyError: + pass else: logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) @@ -213,18 +253,6 @@ class routerOBP(OPENBRIDGE): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] - - #Handle inbound duplicates - if _seq == True and _seq == self._lastSeq: - logger.debug("%s) Duplicate sequence number %s, disgarding",self._system,_seq) - return - #Inbound out-of-order packets - elif _seq == True and (_seq != 1) and (_seq < self._lastSeq): - logger.debug("%s) Out of order packet - last sequence number %s, this sequence number %s, disgarding",self._system,self._lastSeq,_seq) - return - #Inbound missed packets - elif _seq == True and _seq > (self._lastSeq+1): - logger.debug("(%s) Missed packet - last sequence number %s, this sequence number %s",self._system,self._lastSeq,_seq) if _call_type == 'group': # Is this a new call stream? @@ -235,6 +263,8 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, + 'lastSeq': False, + 'lastData': False } # If we can, use the LC from the voice header as to keep all options intact @@ -252,6 +282,63 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,START,RX,{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore')) + else: + + #Finished stream handling# + if '_fin' in self.STATUS[_stream_id]: + if '_finlog' not in self.STATUS[_stream_id]: + logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) + self.STATUS[_stream_id]['_finlog'] = True + return + + #LoopControl# + for system in systems: + if system == self._system: + continue + if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + return + else: + #if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']: + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + if 'LOOPLOG' not in self.STATUS[_stream_id] or not self.STATUS[_stream_id]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_stream_id]['LOOPLOG'] = True + self.STATUS[_stream_id]['LAST'] = pkt_time + + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_stream_id]: + systems[self._system].send_bcsq(_dst_id,_stream_id) + #logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_stream_id]['_bcsq'] = True + return + + + #Duplicate handling# + #Duplicate complete packet + if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: + logger.warning("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) + return + #Handle inbound duplicates + if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: + logger.warning("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s",self._system,_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound out-of-order packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): + logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound missed packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): + logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + + #Save this sequence number + self.STATUS[_stream_id]['lastSeq'] = _seq + #Save this packet + self.STATUS[_stream_id]['lastData'] = _data self.STATUS[_stream_id]['LAST'] = pkt_time @@ -408,10 +495,11 @@ class routerOBP(OPENBRIDGE): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) - removed = self.STATUS.pop(_stream_id) - logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id)) - if not removed: - selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) + self.STATUS[_stream_id]['_fin'] = True + #removed = self.STATUS.pop(_stream_id) + #logger.debug('(%s) OpenBridge sourced call stream end, remove terminated Stream ID: %s', self._system, int_id(_stream_id)) + #if not removed: + #selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) #Reset sequence number self._lastSeq = False @@ -449,7 +537,10 @@ class routerHBP(HBSYSTEM): 2: b'\x00', 3: b'\x00', 4: b'\x00', - } + }, + 'lastSeq': False, + 'lastData': False + }, 2: { 'RX_START': time(), @@ -475,7 +566,9 @@ class routerHBP(HBSYSTEM): 2: b'\x00', 3: b'\x00', 4: b'\x00', - } + }, + 'lastSeq': False, + 'lastData': False } } @@ -509,6 +602,56 @@ class routerHBP(HBSYSTEM): else: self.STATUS[_slot]['RX_LC'] = LC_OPT + _dst_id + _rf_src + #LoopControl# + for system in systems: + if system == self._system: + continue + if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_slot]['LOOPLOG'] = True + self.STATUS[_slot]['LAST'] = pkt_time + return + else: + #if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']: + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_slot]['LOOPLOG'] = True + self.STATUS[_slot]['LAST'] = pkt_time + + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_slot]: + systems[self._system].send_bcsq(_dst_id,_stream_id) + #logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_slot]['_bcsq'] = True + return + + #Duplicate handling# + #Duplicate complete packet + if self.STATUS[_slot]['lastData'] and self.STATUS[_slot]['lastData'] == _data and _seq > 1: + logger.warning("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) + return + #Handle inbound duplicates + if _seq and _seq == self.STATUS[_slot]['lastSeq']: + logger.warning("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s",self._system,_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound out-of-order packets + if _seq and self.STATUS[_slot]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_slot]['lastSeq']): + logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound missed packets + if _seq and self.STATUS[_slot]['lastSeq'] and _seq > (self.STATUS[_slot]['lastSeq']+1): + logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + + #Save this sequence number + self.STATUS[_slot]['lastSeq'] = _seq + #Save this packet + self.STATUS[_slot]['lastData'] = _data + + + for _bridge in BRIDGES: for _system in BRIDGES[_bridge]: diff --git a/bridge_master.py b/bridge_master.py index 69e5e2a..938419d 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -179,7 +179,7 @@ def make_single_bridge(_tgid,_sourcesystem,_slot,_tmout): BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 2, 'TGID': _tgid,'ACTIVE': False,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) - if _system[0:3] == 'OBP' and int_id(_tgid) >= 89: + if _system[0:3] == 'OBP' and int_id(_tgid) >= 79: BRIDGES[_tgid_s].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) #Make static bridge - used for on-the-fly relay bridges @@ -265,7 +265,7 @@ def make_single_reflector(_tgid,_tmout,_sourcesystem): BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': True,'TIMEOUT': _tmout * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time() + (_tmout * 60)}) else: BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 2, 'TGID': bytes_3(9),'ACTIVE': False,'TIMEOUT': CONFIG['SYSTEMS'][_system]['DEFAULT_UA_TIMER'] * 60,'TO_TYPE': 'ON','OFF': [],'ON': [_tgid,],'RESET': [], 'TIMER': time()}) - if _system[0:3] == 'OBP' and int_id(_tgid) >= 89: + if _system[0:3] == 'OBP' and int_id(_tgid) >= 79: BRIDGES[_bridge].append({'SYSTEM': _system, 'TS': 1, 'TGID': _tgid,'ACTIVE': True,'TIMEOUT': '','TO_TYPE': 'NONE','OFF': [],'ON': [],'RESET': [], 'TIMER': time()}) def remove_bridge_system(system): @@ -1078,10 +1078,7 @@ class routerOBP(OPENBRIDGE): def __init__(self, _name, _config, _report): OPENBRIDGE.__init__(self, _name, _config, _report) self.STATUS = {} - - #Store last sequence number - self._lastSeq = False - + def to_target(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data, pkt_time, dmrpkt, _bits,_bridge,_system,_noOBP,sysIgnore): _sysIgnore = sysIgnore for _target in BRIDGES[_bridge]: @@ -1251,20 +1248,7 @@ class routerOBP(OPENBRIDGE): pkt_time = time() dmrpkt = _data[20:53] _bits = _data[15] - - #Handle inbound duplicates - if _seq == True and _seq == self._lastSeq: - logger.debug("%s) Duplicate sequence number %s, disgarding",self._system,_seq) - return - #Inbound out-of-order packets - elif _seq == True and (_seq != 1) and (_seq < self._lastSeq): - logger.debug("%s) Out of order packet - last sequence number %s, this sequence number %s, disgarding",self._system,self._lastSeq,_seq) - return - #Inbound missed packets - elif _seq == True and _seq > (self._lastSeq+1): - logger.debug("(%s) Missed packet - last sequence number %s, this sequence number %s",self._system,self._lastSeq,_seq) - - + if _call_type == 'group': # Is this a new call stream? if (_stream_id not in self.STATUS): @@ -1275,7 +1259,10 @@ class routerOBP(OPENBRIDGE): 'CONTENTION':False, 'RFS': _rf_src, 'TGID': _dst_id, - '1ST': True + '1ST': True, + 'lastSeq': False, + 'lastData': False + } # If we can, use the LC from the voice header as to keep all options intact @@ -1296,15 +1283,14 @@ class routerOBP(OPENBRIDGE): else: - - + #Finished stream handling# if '_fin' in self.STATUS[_stream_id]: if '_finlog' not in self.STATUS[_stream_id]: logger.warning("(%s) OBP *LoopControl* STREAM ID: %s ALREADY FINISHED FROM THIS SOURCE, IGNORING",self._system, int_id(_stream_id)) self.STATUS[_stream_id]['_finlog'] = True return - + #LoopControl# for system in systems: if system == self._system: continue @@ -1328,15 +1314,34 @@ class routerOBP(OPENBRIDGE): systems[self._system].send_bcsq(_dst_id,_stream_id) #logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id)) self.STATUS[_stream_id]['_bcsq'] = True - - return + + #Duplicate handling# + #Duplicate complete packet + if self.STATUS[_stream_id]['lastData'] and self.STATUS[_stream_id]['lastData'] == _data and _seq > 1: + logger.warning("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) + return + #Handle inbound duplicates + if _seq and _seq == self.STATUS[_stream_id]['lastSeq']: + logger.warning("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s",self._system,_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound out-of-order packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_stream_id]['lastSeq']): + logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound missed packets + if _seq and self.STATUS[_stream_id]['lastSeq'] and _seq > (self.STATUS[_stream_id]['lastSeq']+1): + logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_stream_id]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + + #Save this sequence number + self.STATUS[_stream_id]['lastSeq'] = _seq + #Save this packet + self.STATUS[_stream_id]['lastData'] = _data + self.STATUS[_stream_id]['LAST'] = pkt_time - #Save this sequence number - self._lastSeq = _seq #Create STAT bridge for unknown TG if CONFIG['GLOBAL']['GEN_STAT_BRIDGES']: @@ -1368,7 +1373,7 @@ class routerOBP(OPENBRIDGE): #selflogger.error('(%s) *CALL END* STREAM ID: %s NOT IN LIST -- THIS IS A REAL PROBLEM', self._system, int_id(_stream_id)) #Reset sequence number - self._lastSeq = False + self.STATUS[_stream_id]['lastSeq'] = False class routerHBP(HBSYSTEM): @@ -1402,7 +1407,9 @@ class routerHBP(HBSYSTEM): 2: b'\x00', 3: b'\x00', 4: b'\x00', - } + }, + 'lastSeq': False, + 'lastData': False }, 2: { 'RX_START': time(), @@ -1428,7 +1435,9 @@ class routerHBP(HBSYSTEM): 2: b'\x00', 3: b'\x00', 4: b'\x00', - } + }, + 'lastSeq': False, + 'lastData': False } } @@ -1764,10 +1773,57 @@ class routerHBP(HBSYSTEM): self.STATUS[_slot]['RX_LC'] = LC_OPT + _dst_id + _rf_src #Create default bridge for unknown TG - if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and (str(int_id(_dst_id)) not in BRIDGES): + if int_id(_dst_id) >= 5 and int_id(_dst_id) != 9 and int_id(_dst_id) != 4000 and int_id(_dst_id) != 5000 and (str(int_id(_dst_id)) not in BRIDGES): logger.info('(%s) Bridge for TG %s does not exist. Creating as User Activated. Timeout %s',self._system, int_id(_dst_id),CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) make_single_bridge(_dst_id,self._system,_slot,CONFIG['SYSTEMS'][self._system]['DEFAULT_UA_TIMER']) - + + #LoopControl# + for system in systems: + if system == self._system: + continue + if CONFIG['SYSTEMS'][system]['MODE'] != 'OPENBRIDGE': + for _sysslot in systems[system].STATUS: + if 'RX_STREAM_ID' in systems[system].STATUS[_sysslot] and _stream_id == systems[system].STATUS[_sysslot]['RX_STREAM_ID']: + if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST HBP: %s, STREAM ID: %s, TG: %s, TS: %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id),_sysslot) + self.STATUS[_slot]['LOOPLOG'] = True + self.STATUS[_slot]['LAST'] = pkt_time + return + else: + #if _stream_id in systems[system].STATUS and systems[system].STATUS[_stream_id]['START'] <= self.STATUS[_stream_id]['START']: + if _stream_id in systems[system].STATUS and '1ST' in systems[system].STATUS[_stream_id] and systems[system].STATUS[_stream_id]['TGID'] == _dst_id: + if 'LOOPLOG' not in self.STATUS[_slot] or not self.STATUS[_slot]['LOOPLOG']: + logger.warning("(%s) OBP *LoopControl* FIRST OBP %s, STREAM ID: %s, TG %s, IGNORE THIS SOURCE",self._system, system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_slot]['LOOPLOG'] = True + self.STATUS[_slot]['LAST'] = pkt_time + + if CONFIG['SYSTEMS'][self._system]['ENHANCED_OBP'] and '_bcsq' not in self.STATUS[_slot]: + systems[self._system].send_bcsq(_dst_id,_stream_id) + #logger.warning("(%s) OBP *BridgeControl* Sent BCSQ , STREAM ID: %s, TG %s",self._system, int_id(_stream_id), int_id(_dst_id)) + self.STATUS[_slot]['_bcsq'] = True + return + + #Duplicate handling# + #Duplicate complete packet + if self.STATUS[_slot]['lastData'] and self.STATUS[_slot]['lastData'] == _data and _seq > 1: + logger.warning("(%s) *PacketControl* last packet is a complete duplicate of the previous one, disgarding. Stream ID:, %s TGID: %s",self._system,int_id(_stream_id),int_id(_dst_id)) + return + #Handle inbound duplicates + if _seq and _seq == self.STATUS[_slot]['lastSeq']: + logger.warning("(%s) *PacketControl* Duplicate sequence number %s, disgarding. Stream ID:, %s TGID: %s",self._system,_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound out-of-order packets + if _seq and self.STATUS[_slot]['lastSeq'] and (_seq != 1) and (_seq < self.STATUS[_slot]['lastSeq']): + logger.warning("%s) *PacketControl* Out of order packet - last SEQ: %s, this SEQ: %s, disgarding. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + return + #Inbound missed packets + if _seq and self.STATUS[_slot]['lastSeq'] and _seq > (self.STATUS[_slot]['lastSeq']+1): + logger.warning("(%s) *PacketControl* Missed packet(s) - last SEQ: %s, this SEQ: %s. Stream ID:, %s TGID: %s ",self._system,self.STATUS[_slot]['lastSeq'],_seq,int_id(_stream_id),int_id(_dst_id)) + + #Save this sequence number + self.STATUS[_slot]['lastSeq'] = _seq + #Save this packet + self.STATUS[_slot]['lastData'] = _data _sysIgnore = [] for _bridge in BRIDGES: @@ -1792,6 +1848,10 @@ class routerHBP(HBSYSTEM): self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration) if CONFIG['REPORTS']['REPORT']: self._report.send_bridgeEvent('GROUP VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore')) + + #Reset back to False + self.STATUS[_slot]['lastSeq'] = False + self.STATUS[_slot]['lastData'] = False # # Begin in-band signalling for call end. This has nothign to do with routing traffic directly. @@ -2088,8 +2148,8 @@ if __name__ == '__main__': for system in CONFIG['SYSTEMS']: if CONFIG['SYSTEMS'][system]['ENABLED']: - if CONFIG['SYSTEMS'][system]['MODE'] == 'XLXPEER' or CONFIG['SYSTEMS'][system]['MODE'] == 'PEER': - logger.warning('(GLOBAL) system %s not started - XLXPEER and PEER connections currently unsupported ', system) + if CONFIG['SYSTEMS'][system]['MODE'] == 'XLXPEER': + logger.warning('(GLOBAL) system %s not started - XLXPEER connections currently unsupported ', system) continue if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': systems[system] = routerOBP(system, CONFIG, report_server) diff --git a/hblink.py b/hblink.py index a3243d0..299ce0b 100755 --- a/hblink.py +++ b/hblink.py @@ -589,7 +589,7 @@ class HBSYSTEM(DatagramProtocol): _this_peer['PACKAGE_ID'] = _data[262:302] self.send_peer(_peer_id, b''.join([RPTACK, _peer_id])) - logger.info('(%s) Peer %s (%s) has sent repeater configuration, Package ID: %s, Software ID: %s', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID'],self._peers[_peer_id]['PACKAGE_ID'].decode().rstrip(),self._peers[_peer_id]['SOFTWARE_ID'].decode().rstrip()) + logger.info('(%s) Peer %s (%s) has sent repeater configuration, Package ID: %s, Software ID: %s, Desc: %s', self._system, _this_peer['CALLSIGN'], _this_peer['RADIO_ID'],self._peers[_peer_id]['PACKAGE_ID'].decode().rstrip(),self._peers[_peer_id]['SOFTWARE_ID'].decode().rstrip(),self._peers[_peer_id]['DESCRIPTION'].decode().rstrip()) else: self.transport.write(b''.join([MSTNAK, _peer_id]), _sockaddr) logger.warning('(%s) Peer info from Radio ID that has not logged in: %s', self._system, int_id(_peer_id)) @@ -636,7 +636,8 @@ class HBSYSTEM(DatagramProtocol): _peer_id = _data[11:15] if self._config['LOOSE'] or _peer_id == self._config['RADIO_ID']: # Validate the Radio_ID unless using loose validation - _seq = _data[4:5] + #_seq = _data[4:5] + _seq = _data[4] _rf_src = _data[5:8] _dst_id = _data[8:11] _bits = _data[15]