1
0
mirror of https://github.com/craigerl/aprsd.git synced 2026-01-13 09:07:35 -05:00
aprsd/tests/threads/test_aprsd_thread.py
2025-12-09 17:20:23 -05:00

337 lines
10 KiB
Python

import threading
import time
import unittest
from aprsd.threads.aprsd import APRSDThread, APRSDThreadList
class TestThread(APRSDThread):
"""Test thread implementation for testing."""
def __init__(self, name='TestThread', should_loop=True):
super().__init__(name)
self.should_loop = should_loop
self.loop_called = False
def loop(self):
self.loop_called = True
return self.should_loop
class TestAPRSDThread(unittest.TestCase):
"""Unit tests for the APRSDThread class."""
def setUp(self):
"""Set up test fixtures."""
# Reset singleton instances
APRSDThreadList._instance = None
APRSDThreadList.threads_list = []
def tearDown(self):
"""Clean up after tests."""
# Stop all threads
thread_list = APRSDThreadList()
for thread in list(thread_list.threads_list):
thread.stop()
if thread.is_alive():
thread.join(timeout=1)
APRSDThreadList._instance = None
APRSDThreadList.threads_list = []
def test_init(self):
"""Test thread initialization."""
thread = TestThread('TestThread1')
self.assertEqual(thread.name, 'TestThread1')
self.assertFalse(thread.thread_stop)
self.assertFalse(thread._pause)
self.assertEqual(thread.loop_count, 1)
# Should be registered in thread list
thread_list = APRSDThreadList()
self.assertIn(thread, thread_list.threads_list)
def test_should_quit(self):
"""Test _should_quit() method."""
thread = TestThread('TestThread2')
self.assertFalse(thread._should_quit())
thread.thread_stop = True
self.assertTrue(thread._should_quit())
def test_pause_unpause(self):
"""Test pause() and unpause() methods."""
thread = TestThread('TestThread3')
self.assertFalse(thread._pause)
thread.pause()
self.assertTrue(thread._pause)
thread.unpause()
self.assertFalse(thread._pause)
def test_stop(self):
"""Test stop() method."""
thread = TestThread('TestThread4')
self.assertFalse(thread.thread_stop)
thread.stop()
self.assertTrue(thread.thread_stop)
def test_loop_age(self):
"""Test loop_age() method."""
import datetime
thread = TestThread('TestThread5')
age = thread.loop_age()
self.assertIsInstance(age, datetime.timedelta)
self.assertGreaterEqual(age.total_seconds(), 0)
def test_str(self):
"""Test __str__() method."""
thread = TestThread('TestThread6')
thread_str = str(thread)
self.assertIn('TestThread', thread_str)
self.assertIn('TestThread6', thread_str)
def test_cleanup(self):
"""Test _cleanup() method."""
thread = TestThread('TestThread7')
# Should not raise exception
thread._cleanup()
def test_run_loop(self):
"""Test run() method executes loop."""
thread = TestThread('TestThread8', should_loop=False)
thread.start()
thread.join(timeout=2)
self.assertTrue(thread.loop_called)
self.assertFalse(thread.is_alive())
def test_run_pause(self):
"""Test run() method with pause."""
thread = TestThread('TestThread9', should_loop=True)
thread.pause()
thread.start()
time.sleep(0.1)
thread.stop()
thread.join(timeout=1)
# Should have paused
self.assertFalse(thread.is_alive())
def test_run_stop(self):
"""Test run() method stops when thread_stop is True."""
thread = TestThread('TestThread10', should_loop=True)
thread.start()
time.sleep(0.1)
thread.stop()
thread.join(timeout=1)
self.assertFalse(thread.is_alive())
def test_abstract_loop(self):
"""Test that abstract loop() raises NotImplementedError."""
with self.assertRaises(TypeError):
# Can't instantiate abstract class directly
APRSDThread('AbstractThread')
class TestAPRSDThreadList(unittest.TestCase):
"""Unit tests for the APRSDThreadList class."""
def setUp(self):
"""Set up test fixtures."""
APRSDThreadList._instance = None
APRSDThreadList.threads_list = []
def tearDown(self):
"""Clean up after tests."""
thread_list = APRSDThreadList()
for thread in list(thread_list.threads_list):
thread.stop()
if thread.is_alive():
thread.join(timeout=1)
APRSDThreadList._instance = None
APRSDThreadList.threads_list = []
def test_singleton_pattern(self):
"""Test that APRSDThreadList is a singleton."""
list1 = APRSDThreadList()
list2 = APRSDThreadList()
self.assertIs(list1, list2)
def test_add(self):
"""Test add() method."""
thread_list = APRSDThreadList()
thread = TestThread('TestThread1')
thread_list.add(thread)
self.assertIn(thread, thread_list.threads_list)
def test_remove(self):
"""Test remove() method."""
thread_list = APRSDThreadList()
# Clear any existing threads
thread_list.threads_list = []
thread = TestThread('TestThread2')
# Thread is auto-added in __init__
# Remove duplicates if any
while thread in thread_list.threads_list:
thread_list.remove(thread)
thread_list.add(thread)
thread_list.remove(thread)
self.assertNotIn(thread, thread_list.threads_list)
def test_contains(self):
"""Test __contains__() method."""
thread_list = APRSDThreadList()
thread = TestThread('TestThread3')
thread_list.add(thread)
self.assertIn('TestThread3', thread_list)
self.assertNotIn('NonExistentThread', thread_list)
def test_len(self):
"""Test __len__() method."""
thread_list = APRSDThreadList()
# Clear any existing threads
thread_list.threads_list = []
self.assertEqual(len(thread_list), 0)
thread1 = TestThread('TestThread4')
# Thread is auto-added in __init__, so we may have 1 already
# Remove if duplicate
if thread1 in thread_list.threads_list:
thread_list.remove(thread1)
thread_list.add(thread1)
thread2 = TestThread('TestThread5')
if thread2 in thread_list.threads_list:
thread_list.remove(thread2)
thread_list.add(thread2)
self.assertEqual(len(thread_list), 2)
def test_stats(self):
"""Test stats() method."""
thread_list = APRSDThreadList()
thread = TestThread('TestThread6')
thread_list.add(thread)
stats = thread_list.stats()
self.assertIsInstance(stats, dict)
self.assertIn('TestThread6', stats)
self.assertIn('name', stats['TestThread6'])
self.assertIn('class', stats['TestThread6'])
self.assertIn('alive', stats['TestThread6'])
self.assertIn('age', stats['TestThread6'])
self.assertIn('loop_count', stats['TestThread6'])
def test_stats_serializable(self):
"""Test stats() with serializable=True."""
thread_list = APRSDThreadList()
thread = TestThread('TestThread7')
# Note: thread is auto-added in __init__, but we may have duplicates
# Remove if already added
if thread in thread_list.threads_list:
thread_list.remove(thread)
thread_list.add(thread)
stats = thread_list.stats(serializable=True)
self.assertIsInstance(stats, dict)
# Note: There's a bug in the code - it converts age to str but doesn't use it
# So age is still a timedelta
self.assertIn('TestThread7', stats)
self.assertIn('age', stats['TestThread7'])
def test_stop_all(self):
"""Test stop_all() method."""
thread_list = APRSDThreadList()
thread1 = TestThread('TestThread8')
thread2 = TestThread('TestThread9')
thread_list.add(thread1)
thread_list.add(thread2)
thread_list.stop_all()
self.assertTrue(thread1.thread_stop)
self.assertTrue(thread2.thread_stop)
def test_pause_all(self):
"""Test pause_all() method."""
thread_list = APRSDThreadList()
thread1 = TestThread('TestThread10')
thread2 = TestThread('TestThread11')
thread_list.add(thread1)
thread_list.add(thread2)
thread_list.pause_all()
self.assertTrue(thread1._pause)
self.assertTrue(thread2._pause)
def test_unpause_all(self):
"""Test unpause_all() method."""
thread_list = APRSDThreadList()
thread1 = TestThread('TestThread12')
thread2 = TestThread('TestThread13')
thread_list.add(thread1)
thread_list.add(thread2)
thread1._pause = True
thread2._pause = True
thread_list.unpause_all()
self.assertFalse(thread1._pause)
self.assertFalse(thread2._pause)
def test_info(self):
"""Test info() method."""
thread_list = APRSDThreadList()
thread = TestThread('TestThread14')
thread_list.add(thread)
info = thread_list.info()
self.assertIsInstance(info, dict)
self.assertIn('TestThread', info)
self.assertIn('alive', info['TestThread'])
self.assertIn('age', info['TestThread'])
self.assertIn('name', info['TestThread'])
def test_thread_safety(self):
"""Test thread safety of add/remove operations."""
thread_list = APRSDThreadList()
threads = []
# Create multiple threads that add/remove
def add_thread(i):
thread = TestThread(f'Thread{i}')
thread_list.add(thread)
threads.append(thread)
def remove_thread(thread):
try:
thread_list.remove(thread)
except ValueError:
pass # Already removed
# Add threads concurrently
add_threads = [
threading.Thread(target=add_thread, args=(i,)) for i in range(10)
]
for t in add_threads:
t.start()
for t in add_threads:
t.join()
# Remove threads concurrently
remove_threads = [
threading.Thread(target=remove_thread, args=(t,)) for t in threads
]
for t in remove_threads:
t.start()
for t in remove_threads:
t.join()
# Should handle concurrent access without errors
self.assertGreaterEqual(len(thread_list), 0)