mirror of
https://github.com/craigerl/aprsd.git
synced 2026-05-15 13:52:43 -04:00
test: add tests for beacon/ack flood prevention and scheduler timing guards
Tests cover: - BeaconPacket skipped in PacketTrack.tx() (fire-and-forget) - AckPacket send_count not reset when same ack already tracked - Heavy traffic scenario with 5 digi paths for same message - Scheduler timing guards prevent threadpool race conditions - Scheduler cleanup of max-retry packets - MessagePacket still allows re-send (existing behavior preserved)
This commit is contained in:
parent
d8134c4531
commit
d3281cff0b
@ -1,6 +1,6 @@
|
||||
import unittest
|
||||
|
||||
from aprsd.packets import tracker
|
||||
from aprsd.packets import core, tracker
|
||||
from tests import fake
|
||||
|
||||
|
||||
@ -218,3 +218,137 @@ class TestPacketTrack(unittest.TestCase):
|
||||
|
||||
pt.tx(fake.fake_packet(msg_number='456'))
|
||||
self.assertEqual(len(pt), 2)
|
||||
|
||||
def test_tx_skips_beacon_packet(self):
|
||||
"""Test tx() does not track BeaconPackets.
|
||||
|
||||
BeaconPackets are fire-and-forget — they never receive an ack,
|
||||
so tracking them would cause the scheduler to retransmit them
|
||||
as unwanted duplicates flooding RF.
|
||||
"""
|
||||
pt = tracker.PacketTrack()
|
||||
beacon = core.BeaconPacket(
|
||||
from_call='KFAKE',
|
||||
to_call='APRS',
|
||||
latitude=38.0,
|
||||
longitude=-121.0,
|
||||
comment='Test Beacon',
|
||||
)
|
||||
beacon.prepare(create_msg_number=True)
|
||||
initial_total = pt.total_tracked
|
||||
|
||||
pt.tx(beacon)
|
||||
|
||||
# Beacon should NOT be tracked
|
||||
self.assertEqual(len(pt), 0)
|
||||
self.assertEqual(pt.total_tracked, initial_total)
|
||||
|
||||
def test_tx_beacon_not_tracked_even_with_retry_count(self):
|
||||
"""Test tx() skips BeaconPacket regardless of retry_count setting."""
|
||||
pt = tracker.PacketTrack()
|
||||
beacon = core.BeaconPacket(
|
||||
from_call='KFAKE',
|
||||
to_call='APDW16',
|
||||
latitude=38.0,
|
||||
longitude=-121.0,
|
||||
comment='WebChat Beacon',
|
||||
)
|
||||
beacon.retry_count = 3 # Even with retries set, don't track
|
||||
beacon.prepare(create_msg_number=True)
|
||||
|
||||
pt.tx(beacon)
|
||||
|
||||
self.assertEqual(len(pt), 0)
|
||||
|
||||
def test_tx_ack_not_reset_when_already_tracked(self):
|
||||
"""Test tx() does not reset send_count for an ack already being tracked.
|
||||
|
||||
When the same message arrives via multiple digipeater paths, each
|
||||
copy triggers an ack send. The tracker must NOT reset send_count
|
||||
on the existing ack, otherwise the retry counter restarts and
|
||||
floods RF with duplicate acks.
|
||||
"""
|
||||
pt = tracker.PacketTrack()
|
||||
|
||||
# First ack for msgNo '8817'
|
||||
ack1 = core.AckPacket(
|
||||
from_call='KM6LYW',
|
||||
to_call='KM6LYW-9',
|
||||
msgNo='8817',
|
||||
)
|
||||
pt.tx(ack1)
|
||||
self.assertIn('8817', pt.data)
|
||||
self.assertEqual(pt.data['8817'].send_count, 0)
|
||||
|
||||
# Simulate ack being partially sent (scheduler incremented send_count)
|
||||
pt.data['8817'].send_count = 2
|
||||
|
||||
# Second ack for the same msgNo (from a digi copy of the message)
|
||||
ack2 = core.AckPacket(
|
||||
from_call='KM6LYW',
|
||||
to_call='KM6LYW-9',
|
||||
msgNo='8817',
|
||||
)
|
||||
pt.tx(ack2)
|
||||
|
||||
# send_count must NOT be reset to 0
|
||||
self.assertEqual(pt.data['8817'].send_count, 2)
|
||||
# total_tracked should not have incremented again
|
||||
self.assertEqual(pt.total_tracked, 1)
|
||||
|
||||
def test_tx_ack_tracked_on_first_occurrence(self):
|
||||
"""Test tx() properly tracks an ack on first occurrence."""
|
||||
pt = tracker.PacketTrack()
|
||||
ack = core.AckPacket(
|
||||
from_call='KM6LYW',
|
||||
to_call='KM6LYW-9',
|
||||
msgNo='100',
|
||||
)
|
||||
pt.tx(ack)
|
||||
|
||||
self.assertIn('100', pt.data)
|
||||
self.assertEqual(pt.data['100'].send_count, 0)
|
||||
self.assertEqual(pt.total_tracked, 1)
|
||||
|
||||
def test_tx_message_packet_still_resets_on_duplicate(self):
|
||||
"""Test that non-ack packets still get reset if sent again.
|
||||
|
||||
MessagePackets may legitimately need to be re-sent with fresh
|
||||
retry state (e.g., user re-sends a message).
|
||||
"""
|
||||
pt = tracker.PacketTrack()
|
||||
pkt = fake.fake_packet(msg_number='999')
|
||||
pt.tx(pkt)
|
||||
pt.data['999'].send_count = 2
|
||||
|
||||
# Re-sending the same message should reset
|
||||
pkt2 = fake.fake_packet(msg_number='999')
|
||||
pt.tx(pkt2)
|
||||
|
||||
self.assertEqual(pt.data['999'].send_count, 0)
|
||||
|
||||
def test_heavy_traffic_multiple_digi_paths(self):
|
||||
"""Simulate heavy traffic: same message arrives via 5 digipeater paths.
|
||||
|
||||
Each arrival triggers an ack. Only the first ack should be tracked.
|
||||
Subsequent ack sends for the same msgNo must not reset the tracker,
|
||||
preventing an ack flood on RF.
|
||||
"""
|
||||
pt = tracker.PacketTrack()
|
||||
|
||||
# Simulate 5 copies of the same message arriving via different paths
|
||||
for i in range(5):
|
||||
ack = core.AckPacket(
|
||||
from_call='KM6LYW',
|
||||
to_call='KM6LYW-9',
|
||||
msgNo='8817',
|
||||
)
|
||||
pt.tx(ack)
|
||||
|
||||
# After first add, simulate partial sending
|
||||
if i == 0:
|
||||
pt.data['8817'].send_count = 1
|
||||
|
||||
# Only tracked once, send_count preserved from after first send
|
||||
self.assertEqual(pt.total_tracked, 1)
|
||||
self.assertEqual(pt.data['8817'].send_count, 1)
|
||||
|
||||
@ -822,3 +822,208 @@ class TestBeaconSendThread(unittest.TestCase):
|
||||
mock_log.error.assert_called()
|
||||
mock_client_class.return_value.reset.assert_called()
|
||||
thread.stop()
|
||||
|
||||
|
||||
class TestSchedulerTimingGuards(unittest.TestCase):
|
||||
"""Tests for scheduler timing guards that prevent threadpool race conditions.
|
||||
|
||||
These tests verify that the scheduler does NOT submit workers to the
|
||||
threadpool when a packet was recently sent, preventing the race where
|
||||
multiple workers fire before send_count is incremented.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.default_ack_send_count = 3
|
||||
tracker.PacketTrack._instance = None
|
||||
tracker.PacketTrack.data = {}
|
||||
tracker.PacketTrack.total_tracked = 0
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
tracker.PacketTrack._instance = None
|
||||
tracker.PacketTrack.data = {}
|
||||
tracker.PacketTrack.total_tracked = 0
|
||||
|
||||
def test_ack_scheduler_skips_recently_sent(self):
|
||||
"""AckSendSchedulerThread must not re-submit if sent < 31 seconds ago.
|
||||
|
||||
This prevents the threadpool race where multiple workers fire for
|
||||
the same ack before send_count is incremented, causing rapid-fire
|
||||
duplicate acks on RF.
|
||||
"""
|
||||
scheduler = tx.AckSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
ack_packet.send_count = 1
|
||||
ack_packet.last_send_time = int(round(time.time())) # Just sent
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['12']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should NOT submit — sent too recently
|
||||
mock_submit.assert_not_called()
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_ack_scheduler_submits_after_31_seconds(self):
|
||||
"""AckSendSchedulerThread submits worker after 31 second cooldown."""
|
||||
scheduler = tx.AckSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
ack_packet.send_count = 1
|
||||
ack_packet.last_send_time = int(round(time.time())) - 32 # 32 seconds ago
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['12']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should submit — enough time has passed
|
||||
mock_submit.assert_called_once()
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_ack_scheduler_submits_first_send(self):
|
||||
"""AckSendSchedulerThread submits worker on first send (no last_send_time)."""
|
||||
scheduler = tx.AckSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
ack_packet.send_count = 0
|
||||
ack_packet.last_send_time = None # Never sent
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['12']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should submit — first time, no last_send_time
|
||||
mock_submit.assert_called_once()
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_packet_scheduler_skips_recently_sent(self):
|
||||
"""PacketSendSchedulerThread must not re-submit if sent recently.
|
||||
|
||||
Similar to ack scheduler, prevents threadpool race for message
|
||||
packets where multiple workers could fire before send_count updates.
|
||||
"""
|
||||
scheduler = tx.PacketSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
packet = fake.fake_packet(msg_number='123')
|
||||
packet.send_count = 0
|
||||
packet.retry_count = 3
|
||||
packet.last_send_time = int(round(time.time())) # Just sent
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should NOT submit — sent too recently
|
||||
mock_submit.assert_not_called()
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_packet_scheduler_submits_after_backoff(self):
|
||||
"""PacketSendSchedulerThread submits after exponential backoff elapses."""
|
||||
scheduler = tx.PacketSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
packet = fake.fake_packet(msg_number='123')
|
||||
packet.send_count = 1 # Second send: backoff = (1+1)*31 = 62s
|
||||
packet.retry_count = 3
|
||||
packet.last_send_time = int(round(time.time())) - 63 # 63 seconds ago
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should submit — backoff period has elapsed
|
||||
mock_submit.assert_called_once()
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_ack_scheduler_cleans_up_max_retries(self):
|
||||
"""AckSendSchedulerThread removes packets that hit max retries."""
|
||||
scheduler = tx.AckSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
ack_packet = fake.fake_ack_packet()
|
||||
ack_packet.send_count = 3 # At max
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['12']
|
||||
mock_tracker.get.return_value = ack_packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_submit.assert_not_called()
|
||||
# Should have called remove to clean up
|
||||
mock_tracker.remove.assert_called_once_with('12')
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
def test_packet_scheduler_cleans_up_max_retries(self):
|
||||
"""PacketSendSchedulerThread removes packets that hit max retries."""
|
||||
scheduler = tx.PacketSendSchedulerThread(max_workers=2)
|
||||
try:
|
||||
packet = fake.fake_packet(msg_number='123')
|
||||
packet.send_count = 3
|
||||
packet.retry_count = 3
|
||||
|
||||
mock_tracker = mock.MagicMock()
|
||||
mock_tracker.keys.return_value = ['123']
|
||||
mock_tracker.get.return_value = packet
|
||||
with mock.patch(
|
||||
'aprsd.threads.tx.tracker.PacketTrack', return_value=mock_tracker
|
||||
):
|
||||
with mock.patch.object(scheduler.executor, 'submit') as mock_submit:
|
||||
result = scheduler.loop()
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_submit.assert_not_called()
|
||||
# Should have called remove to clean up
|
||||
mock_tracker.remove.assert_called_once_with('123')
|
||||
finally:
|
||||
scheduler.stop()
|
||||
scheduler.executor.shutdown(wait=False)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user