forked from confluentinc/confluent-kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_buffer_timeout_manager.py
More file actions
286 lines (204 loc) · 11.4 KB
/
test_buffer_timeout_manager.py
File metadata and controls
286 lines (204 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/env python3
"""
Unit tests for the BufferTimeoutManager class (_buffer_timeout_manager.py)
This module tests the BufferTimeoutManager class to ensure proper
buffer timeout monitoring and automatic flush handling.
"""
import asyncio
import time
import unittest
from unittest.mock import AsyncMock, Mock
from confluent_kafka.aio.producer._buffer_timeout_manager import BufferTimeoutManager
class TestBufferTimeoutManager(unittest.TestCase):
"""Test cases for BufferTimeoutManager class"""
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# Create mock dependencies
self.mock_batch_processor = Mock()
self.mock_kafka_executor = Mock()
# Configure mock methods as async
self.mock_batch_processor.flush_buffer = AsyncMock()
self.mock_batch_processor.is_buffer_empty = Mock(return_value=False)
self.mock_kafka_executor.flush_librdkafka_queue = AsyncMock(return_value=0)
# Create timeout manager with 1 second timeout
self.timeout_manager = BufferTimeoutManager(self.mock_batch_processor, self.mock_kafka_executor, timeout=1.0)
def tearDown(self):
# Stop any running timeout monitoring tasks
if hasattr(self.timeout_manager, '_timeout_task') and self.timeout_manager._timeout_task:
self.timeout_manager.stop_timeout_monitoring()
self.loop.close()
def test_initialization(self):
"""Test that BufferTimeoutManager initializes correctly"""
self.assertEqual(self.timeout_manager._batch_processor, self.mock_batch_processor)
self.assertEqual(self.timeout_manager._kafka_executor, self.mock_kafka_executor)
self.assertEqual(self.timeout_manager._timeout, 1.0)
self.assertFalse(self.timeout_manager._running)
self.assertIsNone(self.timeout_manager._timeout_task)
self.assertIsInstance(self.timeout_manager._last_activity, float)
self.assertGreater(self.timeout_manager._last_activity, 0)
def test_mark_activity(self):
"""Test that mark_activity updates the last activity timestamp"""
initial_time = self.timeout_manager._last_activity
time.sleep(0.01) # Ensure time difference
self.timeout_manager.mark_activity()
self.assertGreater(self.timeout_manager._last_activity, initial_time)
def test_start_timeout_monitoring(self):
"""Test that timeout monitoring task starts correctly"""
async def async_test():
self.timeout_manager.start_timeout_monitoring()
self.assertTrue(self.timeout_manager._running)
self.assertIsNotNone(self.timeout_manager._timeout_task)
self.assertFalse(self.timeout_manager._timeout_task.done())
# Clean up
self.timeout_manager.stop_timeout_monitoring()
self.loop.run_until_complete(async_test())
def test_stop_timeout_monitoring(self):
"""Test that timeout monitoring task stops correctly"""
async def async_test():
self.timeout_manager.start_timeout_monitoring()
self.assertTrue(self.timeout_manager._running)
self.timeout_manager.stop_timeout_monitoring()
self.assertFalse(self.timeout_manager._running)
# Task should be cancelled or None
self.assertTrue(self.timeout_manager._timeout_task is None or self.timeout_manager._timeout_task.done())
self.loop.run_until_complete(async_test())
def test_start_timeout_monitoring_disabled(self):
"""Test that timeout monitoring doesn't start when timeout is 0"""
manager = BufferTimeoutManager(self.mock_batch_processor, self.mock_kafka_executor, timeout=0) # Disabled
manager.start_timeout_monitoring()
self.assertFalse(manager._running)
self.assertIsNone(manager._timeout_task)
def test_flush_buffer_due_to_timeout(self):
"""Test that _flush_buffer_due_to_timeout calls both flush methods"""
async def async_test():
# Call the flush method
await self.timeout_manager._flush_buffer_due_to_timeout()
# Verify both flush operations were called
self.mock_batch_processor.flush_buffer.assert_called_once()
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
self.loop.run_until_complete(async_test())
def test_flush_buffer_due_to_timeout_order(self):
"""Test that flush operations are called in the correct order"""
call_order = []
async def track_batch_flush():
call_order.append('batch_processor')
async def track_kafka_flush(timeout=0):
call_order.append('kafka_executor')
self.mock_batch_processor.flush_buffer.side_effect = track_batch_flush
self.mock_kafka_executor.flush_librdkafka_queue.side_effect = track_kafka_flush
async def async_test():
await self.timeout_manager._flush_buffer_due_to_timeout()
# Verify order: batch processor first, then kafka executor
self.assertEqual(call_order, ['batch_processor', 'kafka_executor'])
self.loop.run_until_complete(async_test())
def test_flush_buffer_due_to_timeout_batch_processor_error(self):
"""Test that errors from batch processor are propagated"""
self.mock_batch_processor.flush_buffer.side_effect = Exception("Batch flush failed")
async def async_test():
with self.assertRaises(Exception) as context:
await self.timeout_manager._flush_buffer_due_to_timeout()
self.assertEqual(str(context.exception), "Batch flush failed")
# Batch processor flush was attempted
self.mock_batch_processor.flush_buffer.assert_called_once()
# Kafka executor flush should not be called if batch processor fails
self.mock_kafka_executor.flush_librdkafka_queue.assert_not_called()
self.loop.run_until_complete(async_test())
def test_flush_buffer_due_to_timeout_kafka_executor_error(self):
"""Test that errors from kafka executor are propagated"""
self.mock_kafka_executor.flush_librdkafka_queue.side_effect = Exception("Kafka flush failed")
async def async_test():
with self.assertRaises(Exception) as context:
await self.timeout_manager._flush_buffer_due_to_timeout()
self.assertEqual(str(context.exception), "Kafka flush failed")
# Both flush operations were attempted
self.mock_batch_processor.flush_buffer.assert_called_once()
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
self.loop.run_until_complete(async_test())
def test_monitor_timeout_triggers_flush(self):
"""Test that timeout monitoring triggers flush when buffer is inactive"""
async def async_test():
# Set last activity to past time (simulating inactivity)
self.timeout_manager._last_activity = time.time() - 2.0 # 2 seconds ago
# Start monitoring
self.timeout_manager.start_timeout_monitoring()
# Wait for timeout check to trigger
# Check interval for 1s timeout should be 0.5s, add buffer time
await asyncio.sleep(0.7)
# Stop monitoring
self.timeout_manager.stop_timeout_monitoring()
# Verify flush was called
self.mock_batch_processor.flush_buffer.assert_called()
self.mock_kafka_executor.flush_librdkafka_queue.assert_called()
self.loop.run_until_complete(async_test())
def test_monitor_timeout_flushes_librdkafka_even_when_buffer_empty(self):
"""Test that timeout monitoring flushes librdkafka queue even when local buffer is empty
This ensures that messages sitting in librdkafka's internal queue get delivered,
even if there are no messages in our local buffer.
"""
# Configure buffer as empty
self.mock_batch_processor.is_buffer_empty.return_value = True
async def async_test():
# Set last activity to past time
self.timeout_manager._last_activity = time.time() - 2.0
# Start monitoring
self.timeout_manager.start_timeout_monitoring()
# Wait for potential timeout check
await asyncio.sleep(0.7)
# Stop monitoring
self.timeout_manager.stop_timeout_monitoring()
# flush_buffer() returns early since buffer is empty
self.mock_batch_processor.flush_buffer.assert_called_once()
# BUT librdkafka queue flush should still happen
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
self.loop.run_until_complete(async_test())
def test_monitor_timeout_does_not_flush_recent_activity(self):
"""Test that timeout monitoring doesn't flush if buffer has recent activity"""
async def async_test():
# Set last activity to recent time (within timeout)
self.timeout_manager._last_activity = time.time() - 0.3 # 0.3 seconds ago
# Start monitoring
self.timeout_manager.start_timeout_monitoring()
# Wait for potential timeout check (less than timeout duration)
# 0.3s (initial) + 0.4s (sleep) = 0.7s < 1.0s timeout
await asyncio.sleep(0.4)
# Stop monitoring
self.timeout_manager.stop_timeout_monitoring()
# Verify flush was NOT called since activity is recent
self.mock_batch_processor.flush_buffer.assert_not_called()
self.mock_kafka_executor.flush_librdkafka_queue.assert_not_called()
self.loop.run_until_complete(async_test())
def test_monitor_timeout_updates_activity_after_flush(self):
"""Test that timeout monitoring updates activity timestamp after successful flush"""
async def async_test():
# Set last activity to past time
self.timeout_manager._last_activity = time.time() - 2.0
initial_time = self.timeout_manager._last_activity
# Start monitoring
self.timeout_manager.start_timeout_monitoring()
# Wait for timeout check to trigger
await asyncio.sleep(0.7)
# Stop monitoring
self.timeout_manager.stop_timeout_monitoring()
# Verify activity was updated after flush
self.assertGreater(self.timeout_manager._last_activity, initial_time)
self.loop.run_until_complete(async_test())
def test_weak_reference_cleanup(self):
"""Test that weak references allow proper garbage collection"""
async def async_test():
# Create a manager with timeout monitoring
manager = BufferTimeoutManager(self.mock_batch_processor, self.mock_kafka_executor, timeout=1.0)
manager.start_timeout_monitoring()
# Wait a bit for task to start
await asyncio.sleep(0.1)
# Get reference to the task before deleting manager
task = manager._timeout_task
self.assertIsNotNone(task)
self.assertFalse(task.done())
# Stop the manager first to ensure clean shutdown
manager.stop_timeout_monitoring()
# Wait a bit for the cancellation to take effect
await asyncio.sleep(0.8)
# Verify the task stopped when we explicitly stopped monitoring
self.assertTrue(task.done())
self.loop.run_until_complete(async_test())