diff --git a/bridge_master.py b/bridge_master.py index 7a9db59..63d9867 100755 --- a/bridge_master.py +++ b/bridge_master.py @@ -42,6 +42,11 @@ import re from twisted.internet.protocol import Factory, Protocol from twisted.protocols.basic import NetstringReceiver from twisted.internet import reactor, task +#We're going to *try* and be thread safe +from twisted.python import threadable +threadable.init(1) + +from threading import Semaphore # Things we import from the main hblink module from hblink import HBSYSTEM, OPENBRIDGE, systems, hblink_handler, reportFactory, REPORT_OPCODES, mk_aliases @@ -264,9 +269,21 @@ def stream_trimmer_loop(): else: logger.error('(%s) Attemped to remove OpenBridge Stream ID %s not in the Stream ID list: %s', system, int_id(stream_id), [id for id in systems[system].STATUS]) +def threadIdent(): + logger.debug('(IDENT) starting ident thread') + reactor.callInThread(ident) + +def threadedMysql(): + logger.debug('(MYSQL) Starting MySQL thread') + if not mysql_sema.acquire(blocking = False): + logger.debug('(MYSQL) Previous thread is still running (can\'t acquire semaphore). Try next iteration') + return + reactor.callInThread(mysql_config_check) + mysql_sema.release() + def ident(): for system in systems: - if CONFIG['SYSTEMS'][system]['MODE'] == 'OPENBRIDGE': + if CONFIG['SYSTEMS'][system]['MODE'] != 'MASTER': continue if CONFIG['SYSTEMS'][system]['VOICE_IDENT'] == True: #We only care about slot 2 - idents go out on slot 2 @@ -281,6 +298,8 @@ def ident(): for character in _systemcs: _say.append(words[character]) _say.append(words['silence']) + #test + #_say.append(AMBEobj.readSingleFile('44xx.ambe')) speech = pkt_gen(bytes_3(16777215), bytes_3(16777215), bytes_4(16777215), 1, _say) sleep(1) @@ -291,7 +310,8 @@ def ident(): break #Packet every 60ms sleep(0.058) - systems[system].send_system(pkt) + #Twisted is not thread safe. We need to call this in the reactor main thread + reactor.callFromThread(systems[system].send_system,pkt) def mysql_config_check(): logger.debug('(MYSQL) Periodic config check') @@ -765,17 +785,24 @@ class routerHBP(HBSYSTEM): _say.append(words[num]) speech = pkt_gen(bytes_3(9), bytes_3(9), bytes_4(9), 1, _say) - - sleep(1) - while True: - try: - pkt = next(speech) - except StopIteration: - break - #Packet every 60ms - sleep(0.058) - self.send_system(pkt) - #print(len(pkt), pkt[4], pkt) + + #Nested function - see below + def sendSpeech(self,speech): + sleep(1) + while True: + try: + pkt = next(speech) + except StopIteration: + break + #Packet every 60ms + sleep(0.058) + #Call the actual packet send in the reactor thread + #as it's not thread safe + reactor.callFromThread(self.send_system,pkt) + #print(len(pkt), pkt[4], pkt) + + #call speech in a thread as it contains sleep() and hence could block the reactor + reactor.callInThread(sendSpeech,self,speech) # Mark status variables for use later self.STATUS[_slot]['RX_PEER'] = _peer_id @@ -1210,13 +1237,16 @@ if __name__ == '__main__': stream_trimmer.addErrback(loopingErrHandle) # Ident - ident_task = task.LoopingCall(ident) + #This runs in a thread so as not to block the reactor + ident_task = task.LoopingCall(threadIdent) ident = ident_task.start(900) ident.addErrback(loopingErrHandle) #Mysql config checker + #This runs in a thread so as not to block the reactor if CONFIG['MYSQL']['USE_MYSQL'] == True: - mysql_task = task.LoopingCall(mysql_config_check) + mysql_sema = Semaphore(value=1) + mysql_task = task.LoopingCall(threadedMysql) mysql = mysql_task.start(60) mysql.addErrback(loopingErrHandle) diff --git a/read_ambe.py b/read_ambe.py index 0981570..4a3ed86 100644 --- a/read_ambe.py +++ b/read_ambe.py @@ -63,6 +63,7 @@ class readAMBE: ]) return _wordBADict + #Read a single ambe file from the audio directory def readSingleFile(self,filename): ambeBytearray = {} _wordBitarray = bitarray(endian='big')