From d3281cff0b9ca8b3f6648d6978f1489dca04e989 Mon Sep 17 00:00:00 2001 From: Walter Boring Date: Wed, 13 May 2026 11:44:49 -0400 Subject: [PATCH] test: add tests for beacon/ack flood prevention and scheduler timing guards Tests cover: - BeaconPacket skipped in PacketTrack.tx() (fire-and-forget) - AckPacket send_count not reset when same ack already tracked - Heavy traffic scenario with 5 digi paths for same message - Scheduler timing guards prevent threadpool race conditions - Scheduler cleanup of max-retry packets - MessagePacket still allows re-send (existing behavior preserved) --- tests/packets/test_tracker.py | 136 +++++++++++++++++++++- tests/threads/test_tx.py | 205 ++++++++++++++++++++++++++++++++++ 2 files changed, 340 insertions(+), 1 deletion(-) diff --git a/tests/packets/test_tracker.py b/tests/packets/test_tracker.py index c59c96c..bb44624 100644 --- a/tests/packets/test_tracker.py +++ b/tests/packets/test_tracker.py @@ -1,6 +1,6 @@ import unittest -from aprsd.packets import tracker +from aprsd.packets import core, tracker from tests import fake @@ -218,3 +218,137 @@ class TestPacketTrack(unittest.TestCase): pt.tx(fake.fake_packet(msg_number='456')) self.assertEqual(len(pt), 2) + + def test_tx_skips_beacon_packet(self): + """Test tx() does not track BeaconPackets. + + BeaconPackets are fire-and-forget — they never receive an ack, + so tracking them would cause the scheduler to retransmit them + as unwanted duplicates flooding RF. + """ + pt = tracker.PacketTrack() + beacon = core.BeaconPacket( + from_call='KFAKE', + to_call='APRS', + latitude=38.0, + longitude=-121.0, + comment='Test Beacon', + ) + beacon.prepare(create_msg_number=True) + initial_total = pt.total_tracked + + pt.tx(beacon) + + # Beacon should NOT be tracked + self.assertEqual(len(pt), 0) + self.assertEqual(pt.total_tracked, initial_total) + + def test_tx_beacon_not_tracked_even_with_retry_count(self): + """Test tx() skips BeaconPacket regardless of retry_count setting.""" + pt = tracker.PacketTrack() + beacon = core.BeaconPacket( + from_call='KFAKE', + to_call='APDW16', + latitude=38.0, + longitude=-121.0, + comment='WebChat Beacon', + ) + beacon.retry_count = 3 # Even with retries set, don't track + beacon.prepare(create_msg_number=True) + + pt.tx(beacon) + + self.assertEqual(len(pt), 0) + + def test_tx_ack_not_reset_when_already_tracked(self): + """Test tx() does not reset send_count for an ack already being tracked. + + When the same message arrives via multiple digipeater paths, each + copy triggers an ack send. The tracker must NOT reset send_count + on the existing ack, otherwise the retry counter restarts and + floods RF with duplicate acks. + """ + pt = tracker.PacketTrack() + + # First ack for msgNo '8817' + ack1 = core.AckPacket( + from_call='KM6LYW', + to_call='KM6LYW-9', + msgNo='8817', + ) + pt.tx(ack1) + self.assertIn('8817', pt.data) + self.assertEqual(pt.data['8817'].send_count, 0) + + # Simulate ack being partially sent (scheduler incremented send_count) + pt.data['8817'].send_count = 2 + + # Second ack for the same msgNo (from a digi copy of the message) + ack2 = core.AckPacket( + from_call='KM6LYW', + to_call='KM6LYW-9', + msgNo='8817', + ) + pt.tx(ack2) + + # send_count must NOT be reset to 0 + self.assertEqual(pt.data['8817'].send_count, 2) + # total_tracked should not have incremented again + self.assertEqual(pt.total_tracked, 1) + + def test_tx_ack_tracked_on_first_occurrence(self): + """Test tx() properly tracks an ack on first occurrence.""" + pt = tracker.PacketTrack() + ack = core.AckPacket( + from_call='KM6LYW', + to_call='KM6LYW-9', + msgNo='100', + ) + pt.tx(ack) + + self.assertIn('100', pt.data) + self.assertEqual(pt.data['100'].send_count, 0) + self.assertEqual(pt.total_tracked, 1) + + def test_tx_message_packet_still_resets_on_duplicate(self): + """Test that non-ack packets still get reset if sent again. + + MessagePackets may legitimately need to be re-sent with fresh + retry state (e.g., user re-sends a message). + """ + pt = tracker.PacketTrack() + pkt = fake.fake_packet(msg_number='999') + pt.tx(pkt) + pt.data['999'].send_count = 2 + + # Re-sending the same message should reset + pkt2 = fake.fake_packet(msg_number='999') + pt.tx(pkt2) + + self.assertEqual(pt.data['999'].send_count, 0) + + def test_heavy_traffic_multiple_digi_paths(self): + """Simulate heavy traffic: same message arrives via 5 digipeater paths. + + Each arrival triggers an ack. Only the first ack should be tracked. + Subsequent ack sends for the same msgNo must not reset the tracker, + preventing an ack flood on RF. + """ + pt = tracker.PacketTrack() + + # Simulate 5 copies of the same message arriving via different paths + for i in range(5): + ack = core.AckPacket( + from_call='KM6LYW', + to_call='KM6LYW-9', + msgNo='8817', + ) + pt.tx(ack) + + # After first add, simulate partial sending + if i == 0: + pt.data['8817'].send_count = 1 + + # Only tracked once, send_count preserved from after first send + self.assertEqual(pt.total_tracked, 1) + self.assertEqual(pt.data['8817'].send_count, 1) diff --git a/tests/threads/test_tx.py b/tests/threads/test_tx.py index 1c9730d..3b89aa5 100644 --- a/tests/threads/test_tx.py +++ b/tests/threads/test_tx.py @@ -822,3 +822,208 @@ class TestBeaconSendThread(unittest.TestCase): mock_log.error.assert_called() mock_client_class.return_value.reset.assert_called() thread.stop() + + +class TestSchedulerTimingGuards(unittest.TestCase): + """Tests for scheduler timing guards that prevent threadpool race conditions. + + These tests verify that the scheduler does NOT submit workers to the + threadpool when a packet was recently sent, preventing the race where + multiple workers fire before send_count is incremented. + """ + + def setUp(self): + """Set up test fixtures.""" + from oslo_config import cfg + + CONF = cfg.CONF + CONF.default_ack_send_count = 3 + tracker.PacketTrack._instance = None + tracker.PacketTrack.data = {} + tracker.PacketTrack.total_tracked = 0 + + def tearDown(self): + """Clean up after tests.""" + tracker.PacketTrack._instance = None + tracker.PacketTrack.data = {} + tracker.PacketTrack.total_tracked = 0 + + def test_ack_scheduler_skips_recently_sent(self): + """AckSendSchedulerThread must not re-submit if sent < 31 seconds ago. + + This prevents the threadpool race where multiple workers fire for + the same ack before send_count is incremented, causing rapid-fire + duplicate acks on RF. + """ + scheduler = tx.AckSendSchedulerThread(max_workers=2) + try: + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 1 + ack_packet.last_send_time = int(round(time.time())) # Just sent + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['12'] + mock_tracker.get.return_value = ack_packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + # Should NOT submit — sent too recently + mock_submit.assert_not_called() + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_ack_scheduler_submits_after_31_seconds(self): + """AckSendSchedulerThread submits worker after 31 second cooldown.""" + scheduler = tx.AckSendSchedulerThread(max_workers=2) + try: + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 1 + ack_packet.last_send_time = int(round(time.time())) - 32 # 32 seconds ago + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['12'] + mock_tracker.get.return_value = ack_packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + # Should submit — enough time has passed + mock_submit.assert_called_once() + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_ack_scheduler_submits_first_send(self): + """AckSendSchedulerThread submits worker on first send (no last_send_time).""" + scheduler = tx.AckSendSchedulerThread(max_workers=2) + try: + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 0 + ack_packet.last_send_time = None # Never sent + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['12'] + mock_tracker.get.return_value = ack_packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + # Should submit — first time, no last_send_time + mock_submit.assert_called_once() + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_packet_scheduler_skips_recently_sent(self): + """PacketSendSchedulerThread must not re-submit if sent recently. + + Similar to ack scheduler, prevents threadpool race for message + packets where multiple workers could fire before send_count updates. + """ + scheduler = tx.PacketSendSchedulerThread(max_workers=2) + try: + packet = fake.fake_packet(msg_number='123') + packet.send_count = 0 + packet.retry_count = 3 + packet.last_send_time = int(round(time.time())) # Just sent + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + # Should NOT submit — sent too recently + mock_submit.assert_not_called() + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_packet_scheduler_submits_after_backoff(self): + """PacketSendSchedulerThread submits after exponential backoff elapses.""" + scheduler = tx.PacketSendSchedulerThread(max_workers=2) + try: + packet = fake.fake_packet(msg_number='123') + packet.send_count = 1 # Second send: backoff = (1+1)*31 = 62s + packet.retry_count = 3 + packet.last_send_time = int(round(time.time())) - 63 # 63 seconds ago + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + # Should submit — backoff period has elapsed + mock_submit.assert_called_once() + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_ack_scheduler_cleans_up_max_retries(self): + """AckSendSchedulerThread removes packets that hit max retries.""" + scheduler = tx.AckSendSchedulerThread(max_workers=2) + try: + ack_packet = fake.fake_ack_packet() + ack_packet.send_count = 3 # At max + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['12'] + mock_tracker.get.return_value = ack_packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + mock_submit.assert_not_called() + # Should have called remove to clean up + mock_tracker.remove.assert_called_once_with('12') + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False) + + def test_packet_scheduler_cleans_up_max_retries(self): + """PacketSendSchedulerThread removes packets that hit max retries.""" + scheduler = tx.PacketSendSchedulerThread(max_workers=2) + try: + packet = fake.fake_packet(msg_number='123') + packet.send_count = 3 + packet.retry_count = 3 + + mock_tracker = mock.MagicMock() + mock_tracker.keys.return_value = ['123'] + mock_tracker.get.return_value = packet + with mock.patch( + 'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker + ): + with mock.patch.object(scheduler.executor, 'submit') as mock_submit: + result = scheduler.loop() + + self.assertTrue(result) + mock_submit.assert_not_called() + # Should have called remove to clean up + mock_tracker.remove.assert_called_once_with('123') + finally: + scheduler.stop() + scheduler.executor.shutdown(wait=False)