|
36 | 36 | #include "icy/util.h" |
37 | 37 |
|
38 | 38 | #include <cstring> |
| 39 | +#include <mutex> |
39 | 40 | #include <sstream> |
| 41 | +#include <thread> |
| 42 | +#include <vector> |
40 | 43 |
|
41 | 44 |
|
42 | 45 | using icy::test::Test; |
@@ -476,6 +479,89 @@ struct MockBurstPacketSource : public PacketSource |
476 | 479 | } |
477 | 480 | }; |
478 | 481 |
|
| 482 | + |
| 483 | +struct BranchLifecyclePacketSource : public PacketSource |
| 484 | + , public basic::Startable |
| 485 | +{ |
| 486 | + PacketSignal emitter; |
| 487 | + Thread runner; |
| 488 | + std::atomic<bool> running{false}; |
| 489 | + std::atomic<bool> started{false}; |
| 490 | + std::atomic<int> sent{0}; |
| 491 | + std::vector<std::string>* lifecycle = nullptr; |
| 492 | + std::mutex* lifecycleMutex = nullptr; |
| 493 | + |
| 494 | + BranchLifecyclePacketSource(std::vector<std::string>& lifecycleLog, |
| 495 | + std::mutex& lifecycleLogMutex) |
| 496 | + : PacketSource(emitter) |
| 497 | + , lifecycle(&lifecycleLog) |
| 498 | + , lifecycleMutex(&lifecycleLogMutex) |
| 499 | + { |
| 500 | + } |
| 501 | + |
| 502 | + ~BranchLifecyclePacketSource() override |
| 503 | + { |
| 504 | + stop(); |
| 505 | + } |
| 506 | + |
| 507 | + void start() override |
| 508 | + { |
| 509 | + if (running.exchange(true)) |
| 510 | + return; |
| 511 | + |
| 512 | + started = true; |
| 513 | + record("source:start"); |
| 514 | + |
| 515 | + runner.start([this]() { |
| 516 | + while (running.load()) { |
| 517 | + const std::string payload = "pkt" + std::to_string(sent.fetch_add(1)); |
| 518 | + RawPacket packet(payload.data(), payload.size()); |
| 519 | + emitter.emit(packet); |
| 520 | + icy::sleep(2); |
| 521 | + } |
| 522 | + }); |
| 523 | + } |
| 524 | + |
| 525 | + void stop() override |
| 526 | + { |
| 527 | + const bool wasRunning = running.exchange(false); |
| 528 | + if (wasRunning) |
| 529 | + record("source:stop"); |
| 530 | + |
| 531 | + if (started.exchange(false) && Thread::currentID() != runner.tid()) |
| 532 | + runner.join(); |
| 533 | + } |
| 534 | + |
| 535 | + void record(const std::string& entry) |
| 536 | + { |
| 537 | + std::lock_guard<std::mutex> guard(*lifecycleMutex); |
| 538 | + lifecycle->push_back(entry); |
| 539 | + } |
| 540 | +}; |
| 541 | + |
| 542 | + |
| 543 | +class GatedAsyncPacketQueue : public AsyncPacketQueue<> |
| 544 | +{ |
| 545 | +public: |
| 546 | + explicit GatedAsyncPacketQueue(int maxSize = 16) |
| 547 | + : AsyncPacketQueue<>(maxSize) |
| 548 | + { |
| 549 | + } |
| 550 | + |
| 551 | + std::atomic<bool> enteredDispatch{false}; |
| 552 | + std::atomic<bool> releaseDispatch{false}; |
| 553 | + |
| 554 | +protected: |
| 555 | + void dispatch(IPacket& packet) override |
| 556 | + { |
| 557 | + enteredDispatch = true; |
| 558 | + while (!releaseDispatch.load()) |
| 559 | + icy::sleep(1); |
| 560 | + |
| 561 | + AsyncPacketQueue<>::dispatch(packet); |
| 562 | + } |
| 563 | +}; |
| 564 | + |
479 | 565 | class PacketStreamTest : public Test |
480 | 566 | { |
481 | 567 | int numPackets; |
@@ -1191,6 +1277,199 @@ class PacketStreamSharedSourceBranchCloneBoundaryTest : public Test |
1191 | 1277 | }; |
1192 | 1278 |
|
1193 | 1279 |
|
| 1280 | +class PacketStreamBranchFanoutSequenceTest : public Test |
| 1281 | +{ |
| 1282 | + std::mutex receivedMutex; |
| 1283 | + std::vector<std::string> deliveryReceived; |
| 1284 | + std::vector<std::string> detectReceived; |
| 1285 | + |
| 1286 | + static std::string packetString(IPacket& packet) |
| 1287 | + { |
| 1288 | + return std::string(packet.data(), packet.size()); |
| 1289 | + } |
| 1290 | + |
| 1291 | + void run() |
| 1292 | + { |
| 1293 | + PacketSignal ingress; |
| 1294 | + PacketStream deliveryBranch("delivery-branch"); |
| 1295 | + PacketStream detectBranch("detect-branch"); |
| 1296 | + auto detectQueue = std::make_shared<AsyncPacketQueue<>>(16); |
| 1297 | + const std::vector<std::string> expected = {"frame-0", "frame-1", "frame-2", "frame-3"}; |
| 1298 | + |
| 1299 | + deliveryBranch.attachSource(ingress); |
| 1300 | + detectBranch.attachSource(ingress); |
| 1301 | + detectBranch.attach(detectQueue, 0); |
| 1302 | + |
| 1303 | + deliveryBranch.emitter += [&](IPacket& packet) { |
| 1304 | + std::lock_guard<std::mutex> guard(receivedMutex); |
| 1305 | + deliveryReceived.push_back(packetString(packet)); |
| 1306 | + }; |
| 1307 | + detectBranch.emitter += [&](IPacket& packet) { |
| 1308 | + std::lock_guard<std::mutex> guard(receivedMutex); |
| 1309 | + detectReceived.push_back(packetString(packet)); |
| 1310 | + }; |
| 1311 | + |
| 1312 | + deliveryBranch.start(); |
| 1313 | + detectBranch.start(); |
| 1314 | + |
| 1315 | + for (const auto& payload : expected) { |
| 1316 | + RawPacket packet(payload.data(), payload.size()); |
| 1317 | + ingress.emit(packet); |
| 1318 | + } |
| 1319 | + |
| 1320 | + expect(test::waitFor([&]() { |
| 1321 | + std::lock_guard<std::mutex> guard(receivedMutex); |
| 1322 | + return detectReceived.size() == expected.size(); |
| 1323 | + }, 2000)); |
| 1324 | + |
| 1325 | + deliveryBranch.close(); |
| 1326 | + detectBranch.close(); |
| 1327 | + |
| 1328 | + std::lock_guard<std::mutex> guard(receivedMutex); |
| 1329 | + expect(deliveryReceived == expected); |
| 1330 | + expect(detectReceived == expected); |
| 1331 | + expect(detectQueue->retention() == PacketRetention::Cloned); |
| 1332 | + } |
| 1333 | +}; |
| 1334 | + |
| 1335 | + |
| 1336 | +class PacketStreamBranchTeardownOrderTest : public Test |
| 1337 | +{ |
| 1338 | + std::mutex lifecycleMutex; |
| 1339 | + std::vector<std::string> lifecycle; |
| 1340 | + std::atomic<int> deliveryReceived{0}; |
| 1341 | + std::atomic<int> detectReceived{0}; |
| 1342 | + |
| 1343 | + void record(const std::string& entry) |
| 1344 | + { |
| 1345 | + std::lock_guard<std::mutex> guard(lifecycleMutex); |
| 1346 | + lifecycle.push_back(entry); |
| 1347 | + } |
| 1348 | + |
| 1349 | + int indexOf(const std::string& entry) |
| 1350 | + { |
| 1351 | + std::lock_guard<std::mutex> guard(lifecycleMutex); |
| 1352 | + for (size_t index = 0; index < lifecycle.size(); ++index) { |
| 1353 | + if (lifecycle[index] == entry) |
| 1354 | + return static_cast<int>(index); |
| 1355 | + } |
| 1356 | + |
| 1357 | + return -1; |
| 1358 | + } |
| 1359 | + |
| 1360 | + void run() |
| 1361 | + { |
| 1362 | + PacketStream sourceStream("ingress-source"); |
| 1363 | + auto source = std::make_shared<BranchLifecyclePacketSource>(lifecycle, lifecycleMutex); |
| 1364 | + PacketStream deliveryBranch("delivery-branch"); |
| 1365 | + PacketStream detectBranch("detect-branch"); |
| 1366 | + auto detectQueue = std::make_shared<AsyncPacketQueue<>>(16); |
| 1367 | + |
| 1368 | + sourceStream.attachSource(source, true); |
| 1369 | + deliveryBranch.attachSource(sourceStream.emitter); |
| 1370 | + detectBranch.attachSource(sourceStream.emitter); |
| 1371 | + detectBranch.attach(detectQueue, 0); |
| 1372 | + |
| 1373 | + deliveryBranch.emitter += [&](IPacket&) { |
| 1374 | + ++deliveryReceived; |
| 1375 | + }; |
| 1376 | + detectBranch.emitter += [&](IPacket&) { |
| 1377 | + ++detectReceived; |
| 1378 | + }; |
| 1379 | + |
| 1380 | + deliveryBranch.StateChange += [&](void*, PacketStreamState& state, const PacketStreamState&) { |
| 1381 | + if (state.id() == PacketStreamState::Closed) |
| 1382 | + record("delivery:closed"); |
| 1383 | + }; |
| 1384 | + detectBranch.StateChange += [&](void*, PacketStreamState& state, const PacketStreamState&) { |
| 1385 | + if (state.id() == PacketStreamState::Closed) |
| 1386 | + record("detect:closed"); |
| 1387 | + }; |
| 1388 | + |
| 1389 | + deliveryBranch.start(); |
| 1390 | + detectBranch.start(); |
| 1391 | + sourceStream.start(); |
| 1392 | + |
| 1393 | + expect(test::waitFor([&]() { |
| 1394 | + return deliveryReceived.load() > 0 && detectReceived.load() > 0; |
| 1395 | + }, 2000)); |
| 1396 | + |
| 1397 | + detectBranch.close(); |
| 1398 | + expect(test::waitFor([&]() { |
| 1399 | + return detectBranch.closed(); |
| 1400 | + }, 2000)); |
| 1401 | + |
| 1402 | + const int deliveryBefore = deliveryReceived.load(); |
| 1403 | + expect(test::waitFor([&]() { |
| 1404 | + return deliveryReceived.load() > deliveryBefore; |
| 1405 | + }, 2000)); |
| 1406 | + expect(source->running.load()); |
| 1407 | + expect(indexOf("source:stop") == -1); |
| 1408 | + |
| 1409 | + deliveryBranch.close(); |
| 1410 | + expect(test::waitFor([&]() { |
| 1411 | + return deliveryBranch.closed(); |
| 1412 | + }, 2000)); |
| 1413 | + expect(source->running.load()); |
| 1414 | + expect(indexOf("source:stop") == -1); |
| 1415 | + |
| 1416 | + sourceStream.close(); |
| 1417 | + expect(!source->running.load()); |
| 1418 | + |
| 1419 | + expect(indexOf("detect:closed") != -1); |
| 1420 | + expect(indexOf("delivery:closed") != -1); |
| 1421 | + expect(indexOf("source:stop") != -1); |
| 1422 | + expect(indexOf("detect:closed") < indexOf("source:stop")); |
| 1423 | + expect(indexOf("delivery:closed") < indexOf("source:stop")); |
| 1424 | + } |
| 1425 | +}; |
| 1426 | + |
| 1427 | + |
| 1428 | +class PacketStreamAsyncLateDropAfterCloseTest : public Test |
| 1429 | +{ |
| 1430 | + std::atomic<int> received{0}; |
| 1431 | + |
| 1432 | + void onPacket(IPacket&) |
| 1433 | + { |
| 1434 | + ++received; |
| 1435 | + } |
| 1436 | + |
| 1437 | + void run() |
| 1438 | + { |
| 1439 | + PacketStream stream("late-drop"); |
| 1440 | + auto queue = std::make_shared<GatedAsyncPacketQueue>(16); |
| 1441 | + |
| 1442 | + stream.attach(queue, 0); |
| 1443 | + stream.emitter += slot(this, &PacketStreamAsyncLateDropAfterCloseTest::onPacket); |
| 1444 | + stream.start(); |
| 1445 | + |
| 1446 | + stream.write(RawPacket("late", 4)); |
| 1447 | + |
| 1448 | + expect(test::waitFor([&]() { |
| 1449 | + return queue->enteredDispatch.load(); |
| 1450 | + }, 2000)); |
| 1451 | + |
| 1452 | + std::atomic<bool> closeReturned{false}; |
| 1453 | + std::thread closer([&]() { |
| 1454 | + stream.close(); |
| 1455 | + closeReturned = true; |
| 1456 | + }); |
| 1457 | + |
| 1458 | + icy::sleep(25); |
| 1459 | + queue->releaseDispatch = true; |
| 1460 | + |
| 1461 | + expect(test::waitFor([&]() { |
| 1462 | + return closeReturned.load(); |
| 1463 | + }, 2000)); |
| 1464 | + |
| 1465 | + closer.join(); |
| 1466 | + |
| 1467 | + expect(received.load() == 0); |
| 1468 | + expect(stream.closed()); |
| 1469 | + } |
| 1470 | +}; |
| 1471 | + |
| 1472 | + |
1194 | 1473 | } // namespace icy |
1195 | 1474 |
|
1196 | 1475 |
|
|
0 commit comments