Adding inbound OBP Processing
This commit is contained in:
		
							parent
							
								
									364ead81ec
								
							
						
					
					
						commit
						877e1c6c65
					
				
							
								
								
									
										126
									
								
								bridge.py
									
									
									
									
									
								
							
							
						
						
									
										126
									
								
								bridge.py
									
									
									
									
									
								
							@ -413,10 +413,136 @@ class routerOBP(OPENBRIDGE):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def unit_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data):
 | 
					    def unit_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _frame_type, _dtype_vseq, _stream_id, _data):
 | 
				
			||||||
 | 
					        global UNIT_MAP
 | 
				
			||||||
        pkt_time = time()
 | 
					        pkt_time = time()
 | 
				
			||||||
        dmrpkt = _data[20:53]
 | 
					        dmrpkt = _data[20:53]
 | 
				
			||||||
        _bits = _data[15]
 | 
					        _bits = _data[15]
 | 
				
			||||||
 
 | 
					 
 | 
				
			||||||
 | 
					        # Make/update this unit in the UNIT_MAP cache
 | 
				
			||||||
 | 
					        UNIT_MAP[_rf_src] = (self.name, pkt_time)
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # Is this a new call stream?
 | 
				
			||||||
 | 
					        if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # Collision in progress, bail out!
 | 
				
			||||||
 | 
					            if (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM) and (pkt_time < (self.STATUS[_slot]['RX_TIME'] + STREAM_TO)) and (_rf_src != self.STATUS[_slot]['RX_RFS']):
 | 
				
			||||||
 | 
					                logger.warning('(%s) Packet received with STREAM ID: %s <FROM> SUB: %s PEER: %s <TO> UNIT %s, SLOT %s collided with existing call', self._system, int_id(_stream_id), int_id(_rf_src), int_id(_peer_id), int_id(_dst_id), _slot)
 | 
				
			||||||
 | 
					                return
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					            # Create a destination list for the call:
 | 
				
			||||||
 | 
					            if _dst_id in UNIT_MAP:
 | 
				
			||||||
 | 
					                if UNIT_MAP[_dst_id][0] != self._system:
 | 
				
			||||||
 | 
					                    self._targets = [UNIT_MAP[_dst_id][0]]
 | 
				
			||||||
 | 
					                    _target_route = UNIT_MAP[_dst_id][0]
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    self._targets = []
 | 
				
			||||||
 | 
					                    logger.debug('UNIT call to a subscriber on the same system, send nothing')
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                self._targets = list(systems)
 | 
				
			||||||
 | 
					                self._targets.remove(self._system)
 | 
				
			||||||
 | 
					                _target_route = 'FLOOD'
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # This is a new call stream, so log & report
 | 
				
			||||||
 | 
					            self.STATUS[_slot]['RX_START'] = pkt_time
 | 
				
			||||||
 | 
					            logger.info('(%s) *UNIT CALL START* STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT: %s (%s), TS: %s, FORWARD: %s', \
 | 
				
			||||||
 | 
					                    self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, _target_route)
 | 
				
			||||||
 | 
					            if CONFIG['REPORTS']['REPORT']:
 | 
				
			||||||
 | 
					                self._report.send_bridgeEvent('UNIT VOICE,START,RX,{},{},{},{},{},{},{}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), _target_route).encode(encoding='utf-8', errors='ignore'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for _target in self._targets:
 | 
				
			||||||
 | 
					            _target_status = systems[_target].STATUS
 | 
				
			||||||
 | 
					            _target_system = self._CONFIG['SYSTEMS'][_target]
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            if self._CONFIG['SYSTEMS'][_target]['MODE'] == 'OPENBRIDGE':
 | 
				
			||||||
 | 
					                if (_stream_id not in _target_status):
 | 
				
			||||||
 | 
					                    # This is a new call stream on the target
 | 
				
			||||||
 | 
					                    _target_status[_stream_id] = {
 | 
				
			||||||
 | 
					                        'START':     pkt_time,
 | 
				
			||||||
 | 
					                        'CONTENTION':False,
 | 
				
			||||||
 | 
					                        'RFS':       _rf_src,
 | 
				
			||||||
 | 
					                        'TGID':      _dst_id,
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    logger.info('(%s) Unit call bridged to OBP System: %s TS: %s, TGID: %s', self._system, _target, _slot, int_id(_dst_id))
 | 
				
			||||||
 | 
					                    if CONFIG['REPORTS']['REPORT']:
 | 
				
			||||||
 | 
					                        systems[_target]._report.send_bridgeEvent('UNIT VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Record the time of this packet so we can later identify a stale stream
 | 
				
			||||||
 | 
					                _target_status[_stream_id]['LAST'] = pkt_time
 | 
				
			||||||
 | 
					                # Clear the TS bit -- all OpenBridge streams are effectively on TS1
 | 
				
			||||||
 | 
					                _tmp_bits = _bits & ~(1 << 7)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Assemble transmit HBP packet
 | 
				
			||||||
 | 
					                _tmp_data = b''.join([_data[:15], _tmp_bits.to_bytes(1, 'big'), _data[16:20]])
 | 
				
			||||||
 | 
					                _data = b''.join([_tmp_data, dmrpkt])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                # BEGIN STANDARD CONTENTION HANDLING
 | 
				
			||||||
 | 
					                #
 | 
				
			||||||
 | 
					                # The rules for each of the 4 "ifs" below are listed here for readability. The Frame To Send is:
 | 
				
			||||||
 | 
					                #   From a different group than last RX from this HBSystem, but it has been less than Group Hangtime
 | 
				
			||||||
 | 
					                #   From a different group than last TX to this HBSystem, but it has been less than Group Hangtime
 | 
				
			||||||
 | 
					                #   From the same group as the last RX from this HBSystem, but from a different subscriber, and it has been less than stream timeout
 | 
				
			||||||
 | 
					                #   From the same group as the last TX to this HBSystem, but from a different subscriber, and it has been less than stream timeout
 | 
				
			||||||
 | 
					                # The "continue" at the end of each means the next iteration of the for loop that tests for matching rules
 | 
				
			||||||
 | 
					                #
 | 
				
			||||||
 | 
					                if ((_dst_id != _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < _target_system['GROUP_HANGTIME'])):
 | 
				
			||||||
 | 
					                    if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
 | 
				
			||||||
 | 
					                        logger.info('(%s) Call not routed to destination %s, target active or in group hangtime: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['RX_TGID']))
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                if ((_dst_id != _target_status[_slot]['TX_TGID']) and ((pkt_time - _target_status[_slot]['TX_TIME']) < _target_system['GROUP_HANGTIME'])):
 | 
				
			||||||
 | 
					                    if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
 | 
				
			||||||
 | 
					                        logger.info('(%s) Call not routed to destination %s, target in group hangtime: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['TX_TGID']))
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                if (_dst_id == _target_status[_slot]['RX_TGID']) and ((pkt_time - _target_status[_slot]['RX_TIME']) < STREAM_TO):
 | 
				
			||||||
 | 
					                    if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
 | 
				
			||||||
 | 
					                        logger.info('(%s) Call not routed to destination %s, matching call already active on target: HBSystem: %s, TS: %s, DEST: %s', self._system, int_id(_dst_id), _target, _slot, int_id(_target_status[_slot]['RX_TGID']))
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					                if (_dst_id == _target_status[_slot]['TX_TGID']) and (_rf_src != _target_status[_slot]['TX_RFS']) and ((pkt_time - _target_status[_slot]['TX_TIME']) < STREAM_TO):
 | 
				
			||||||
 | 
					                    if _frame_type == HBPF_DATA_SYNC and _dtype_vseq == HBPF_SLT_VHEAD and self.STATUS[_slot]['RX_STREAM_ID'] != _stream_id:
 | 
				
			||||||
 | 
					                        logger.info('(%s) Call not routed for subscriber %s, call route in progress on target: HBSystem: %s, TS: %s, DEST: %s, SUB: %s', self._system, int_id(_rf_src), _target, _slot, int_id(_target_status[_slot]['TX_TGID']), int_id(_target_status[_slot]['TX_RFS']))
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Record target information if this is a new call stream?
 | 
				
			||||||
 | 
					                if (_stream_id != self.STATUS[_slot]['RX_STREAM_ID']):
 | 
				
			||||||
 | 
					                    # Record the DST TGID and Stream ID
 | 
				
			||||||
 | 
					                    _target_status[_slot]['TX_START'] = pkt_time
 | 
				
			||||||
 | 
					                    _target_status[_slot]['TX_TGID'] = _dst_id
 | 
				
			||||||
 | 
					                    _target_status[_slot]['TX_STREAM_ID'] = _stream_id
 | 
				
			||||||
 | 
					                    _target_status[_slot]['TX_RFS'] = _rf_src
 | 
				
			||||||
 | 
					                    _target_status[_slot]['TX_PEER'] = _peer_id
 | 
				
			||||||
 | 
					                    
 | 
				
			||||||
 | 
					                    logger.info('(%s) Unit call bridged to HBP System: %s TS: %s, UNIT: %s', self._system, _target, _slot, int_id(_dst_id))
 | 
				
			||||||
 | 
					                    if CONFIG['REPORTS']['REPORT']:
 | 
				
			||||||
 | 
					                       systems[_target]._report.send_bridgeEvent('UNIT VOICE,START,TX,{},{},{},{},{},{}'.format(_target, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id)).encode(encoding='utf-8', errors='ignore'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Set other values for the contention handler to test next time there is a frame to forward
 | 
				
			||||||
 | 
					                _target_status[_slot]['TX_TIME'] = pkt_time
 | 
				
			||||||
 | 
					                _target_status[_slot]['TX_TYPE'] = _dtype_vseq
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            #send the call:
 | 
				
			||||||
 | 
					            systems[_target].send_system(_data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # Final actions - Is this a voice terminator?
 | 
				
			||||||
 | 
					        if (_frame_type == HBPF_DATA_SYNC) and (_dtype_vseq == HBPF_SLT_VTERM) and (self.STATUS[_slot]['RX_TYPE'] != HBPF_SLT_VTERM):
 | 
				
			||||||
 | 
					            self._targets = []
 | 
				
			||||||
 | 
					            call_duration = pkt_time - self.STATUS[_slot]['RX_START']
 | 
				
			||||||
 | 
					            logger.info('(%s) *UNIT CALL END*   STREAM ID: %s SUB: %s (%s) PEER: %s (%s) UNIT %s (%s), TS %s, Duration: %.2f', \
 | 
				
			||||||
 | 
					                    self._system, int_id(_stream_id), get_alias(_rf_src, subscriber_ids), int_id(_rf_src), get_alias(_peer_id, peer_ids), int_id(_peer_id), get_alias(_dst_id, talkgroup_ids), int_id(_dst_id), _slot, call_duration)
 | 
				
			||||||
 | 
					            if CONFIG['REPORTS']['REPORT']:
 | 
				
			||||||
 | 
					               self._report.send_bridgeEvent('UNIT VOICE,END,RX,{},{},{},{},{},{},{:.2f}'.format(self._system, int_id(_stream_id), int_id(_peer_id), int_id(_rf_src), _slot, int_id(_dst_id), call_duration).encode(encoding='utf-8', errors='ignore'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Mark status variables for use later
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_PEER']      = _peer_id
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_SEQ']       = _seq
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_RFS']       = _rf_src
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_TYPE']      = _dtype_vseq
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_TGID']      = _dst_id
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_TIME']      = pkt_time
 | 
				
			||||||
 | 
					        self.STATUS[_slot]['RX_STREAM_ID'] = _stream_id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
 | 
					    def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user