Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 90 additions & 34 deletions src/extensions/scratch3_mesh_v2/mesh-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ class MeshV2Service {
this.eventBatchInterval = 250;
this.eventBatchTimer = null;

// Event queue limits
this.MAX_EVENT_QUEUE_SIZE = 100; // 最大100イベント
this.eventQueueStats = {
duplicatesSkipped: 0,
dropped: 0,
lastReportTime: Date.now()
};

// Last sent data to detect changes
this.lastSentData = {};

Expand Down Expand Up @@ -357,6 +365,14 @@ class MeshV2Service {
log.info(` Average cost per second: $${(totalCost / connectionDurationSeconds).toFixed(10)}`);
}

// 統計情報を出力
if (this.eventQueueStats &&
(this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
log.info(`Mesh V2: Final Event Queue Stats: ` +
`duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
`dropped=${this.eventQueueStats.dropped}`);
}

// キューをクリア
this.pendingBroadcasts = [];
this.batchStartTime = null;
Expand Down Expand Up @@ -480,8 +496,10 @@ class MeshV2Service {
* Called once per frame via BEFORE_STEP event.
*
* Strategy:
* - Events with offsetMs close to each other (< 1ms) are processed in the same frame (at most 1)
* - Events separated by frame intervals (>= 16.67ms) wait for real time to elapse
* - Process events whose timing has arrived (offsetMs <= elapsedMs)
* - Limit processing to a 33ms window of event time per frame to avoid spikes
* - Execute them in order (maintains event sequence)
* - Different event types don't cause RESTART (different handlers)
*/
processNextBroadcast () {
if (!this.groupId) {
Expand All @@ -501,54 +519,45 @@ class MeshV2Service {

// 処理すべきイベントを収集(タイミングが来ているもの)
const eventsToProcess = [];
let windowBase = null;

while (this.pendingBroadcasts.length > 0) {
const {event, offsetMs} = this.pendingBroadcasts[0];

// まだタイミングが来ていない場合は待機
if (offsetMs > elapsedMs) {
log.info(`Mesh V2: Waiting for event ${event.name} ` +
log.debug(`Mesh V2: Waiting for event ${event.name} ` +
`(needs ${offsetMs}ms, elapsed ${elapsedMs}ms)`);
break;
}

// 1フレーム(33ms)のウィンドウ制限を適用
// (バックログがある場合でも1フレームで大量のブロードキャストを避ける)
if (windowBase === null) {
windowBase = offsetMs;
} else if (offsetMs >= windowBase + 33) {
log.debug(`Mesh V2: Window limit reached (33ms). ` +
`Remaining events will be processed in next frames.`);
break;
}

// タイミングが来たイベントをキューから取り出し
const item = this.pendingBroadcasts.shift();
eventsToProcess.push(item);

// 次のイベントとの間隔をチェック
if (this.pendingBroadcasts.length > 0) {
const nextOffset = this.pendingBroadcasts[0].offsetMs;
const gap = nextOffset - offsetMs;

// 次のイベントが1ms以内なら同じフレームで処理対象とする(が実際には1個しか処理しない)
// それ以外は次のフレームまで待機
if (gap >= 1) {
break;
}
}
}

// 収集したイベントを処理
if (eventsToProcess.length > 0) {
// フレームごとに1つのブロードキャストのみ実行(スレッド再起動回避)
const {event, offsetMs} = eventsToProcess[0];

log.info(`Mesh V2: Broadcasting event: ${event.name} ` +
`(offset: ${offsetMs}ms, elapsed: ${elapsedMs}ms, ` +
`${eventsToProcess.length - 1} similar events batched, ` +
`${this.pendingBroadcasts.length} remaining in queue)`);

this.broadcastEvent(event);
this.lastBroadcastOffset = offsetMs;

// 1ms以内の追加イベントは次のフレームで処理
// (同じフレームで複数ブロードキャストしない制約)
eventsToProcess.slice(1)
.reverse()
.forEach(item => {
this.pendingBroadcasts.unshift(item);
});
log.info(`Mesh V2: Broadcasting ${eventsToProcess.length} events ` +
`(${this.pendingBroadcasts.length} remaining in queue)`);

eventsToProcess.forEach(({event, offsetMs}) => {
log.info(`Mesh V2: Broadcasting event: ${event.name} ` +
`(offset: ${offsetMs}ms, elapsed: ${elapsedMs}ms)`);

this.broadcastEvent(event);
this.lastBroadcastOffset = offsetMs;
});
}
}

Expand Down Expand Up @@ -815,7 +824,34 @@ class MeshV2Service {
return;
}

log.info(`Mesh V2: Queuing event for sending: ${eventName}`);
// ステップ1: 重複チェック
const isDuplicate = this.eventQueue.some(item =>
item.eventName === eventName && item.payload === payload
);

if (isDuplicate) {
this.eventQueueStats.duplicatesSkipped++;
this.reportEventStatsIfNeeded();

log.debug(`Mesh V2: Event already in queue, skipping: ${eventName}`);
return;
}

// ステップ2: サイズ制限チェック(保険)
if (this.eventQueue.length >= this.MAX_EVENT_QUEUE_SIZE) {
const dropped = this.eventQueue.shift(); // 古いイベントを破棄(FIFO)
this.eventQueueStats.dropped++;

if (this.eventQueueStats.dropped % 10 === 1) { // 10イベントごとに警告
log.warn(`Mesh V2: Event queue full (${this.MAX_EVENT_QUEUE_SIZE}). ` +
`Dropped ${this.eventQueueStats.dropped} events. ` +
`Latest: ${dropped.eventName}`);
}
}

log.debug(`Mesh V2: Queuing event for sending: ${eventName} ` +
`(queue size: ${this.eventQueue.length})`);

// キューに追加(発火日時を記録)
this.eventQueue.push({
eventName: eventName,
Expand All @@ -824,6 +860,26 @@ class MeshV2Service {
});
}

/**
* Report event queue statistics if needed (every 10 seconds).
*/
reportEventStatsIfNeeded () {
const now = Date.now();
const elapsed = now - this.eventQueueStats.lastReportTime;

if (elapsed >= 10000 &&
(this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
log.info(`Mesh V2: Event Queue Stats (last ${(elapsed / 1000).toFixed(1)}s): ` +
`duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
`dropped=${this.eventQueueStats.dropped}, ` +
`current queue size=${this.eventQueue.length}`);

this.eventQueueStats.duplicatesSkipped = 0;
this.eventQueueStats.dropped = 0;
this.eventQueueStats.lastReportTime = now;
}
}

/**
* Fetch data from all nodes in the group.
* @returns {Promise<void>} A promise that resolves when data is fetched and updated.
Expand Down
179 changes: 171 additions & 8 deletions test/unit/mesh_service_v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,45 @@ test('MeshV2Service Batch Events', t => {
st.end();
});

t.test('fireEvent deduplicates events', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.client = {mutate: () => Promise.resolve({})};
service.groupId = 'group1';

service.fireEvent('event1', 'payload1');
service.fireEvent('event1', 'payload1'); // Duplicate
service.fireEvent('event1', 'payload2'); // Different payload

st.equal(service.eventQueue.length, 2);
st.equal(service.eventQueue[0].eventName, 'event1');
st.equal(service.eventQueue[0].payload, 'payload1');
st.equal(service.eventQueue[1].eventName, 'event1');
st.equal(service.eventQueue[1].payload, 'payload2');
st.equal(service.eventQueueStats.duplicatesSkipped, 1);

st.end();
});

t.test('fireEvent respects MAX_EVENT_QUEUE_SIZE (FIFO)', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.client = {mutate: () => Promise.resolve({})};
service.groupId = 'group1';
service.MAX_EVENT_QUEUE_SIZE = 5;

for (let i = 0; i < 7; i++) {
service.fireEvent(`event${i}`, `payload${i}`);
}

st.equal(service.eventQueue.length, 5);
st.equal(service.eventQueue[0].eventName, 'event2');
st.equal(service.eventQueue[4].eventName, 'event6');
st.equal(service.eventQueueStats.dropped, 2);

st.end();
});

t.test('processBatchEvents sends events and clears queue', async st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
Expand Down Expand Up @@ -121,7 +160,7 @@ test('MeshV2Service Batch Events', t => {
st.end();
});

t.test('processNextBroadcast processes one event per frame even for short gaps', st => {
t.test('processNextBroadcast processes events in one frame if timing arrived and within 33ms', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.groupId = 'group1';
Expand All @@ -147,22 +186,101 @@ test('MeshV2Service Batch Events', t => {
service.handleBatchEvent(batchEvent);
st.equal(service.pendingBroadcasts.length, 3);

// Frame 1: Should broadcast e1
// Frame 1: Should broadcast all events because offsetMs (0) <= elapsedMs (0)
// and they are within 33ms window.
service.processNextBroadcast();
st.equal(broadcasted.length, 1);
st.equal(broadcasted.length, 3);
st.equal(broadcasted[0], 'e1');
st.equal(service.pendingBroadcasts.length, 2);
st.equal(broadcasted[1], 'e2');
st.equal(broadcasted[2], 'e3');
st.equal(service.pendingBroadcasts.length, 0);
} finally {
Date.now = realDateNow;
}

st.end();
});

t.test('processNextBroadcast respects 33ms window when handling backlog', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.groupId = 'group1';
const broadcasted = [];
service.broadcastEvent = event => broadcasted.push(event.name);

// Events spaced 20ms apart: 0ms, 20ms, 40ms, 60ms
const batchEvent = {
firedByNodeId: 'node2',
events: [
{name: 'e1', timestamp: '2025-12-30T00:00:00.000Z'}, // offset 0
{name: 'e2', timestamp: '2025-12-30T00:00:00.020Z'}, // offset 20
{name: 'e3', timestamp: '2025-12-30T00:00:00.040Z'}, // offset 40
{name: 'e4', timestamp: '2025-12-30T00:00:00.060Z'} // offset 60
]
};

const realDateNow = Date.now;
const startTime = 1000000;
let currentTime = startTime;
Date.now = () => currentTime;

try {
service.handleBatchEvent(batchEvent);
st.equal(service.pendingBroadcasts.length, 4);

// Simulation: Backlog exists. Current time is 100ms after start.
// elapsedMs = 100. All events are technically "due".
currentTime = startTime + 100;

// Frame 2: Should broadcast e2
// Frame 1: Should process e1, e2 (within 33ms of e1). e3 is at 40ms, so it's split.
service.processNextBroadcast();
st.equal(broadcasted.length, 2);
st.equal(broadcasted[0], 'e1');
st.equal(broadcasted[1], 'e2');
st.equal(service.pendingBroadcasts.length, 1);
st.equal(service.pendingBroadcasts.length, 2);

// Frame 3: Should broadcast e3
// Frame 2: Should process e3, e4 (within 33ms of e3: 40 + 33 = 73).
service.processNextBroadcast();
st.equal(broadcasted.length, 3);
st.equal(broadcasted.length, 4);
st.equal(broadcasted[2], 'e3');
st.equal(broadcasted[3], 'e4');
st.equal(service.pendingBroadcasts.length, 0);
} finally {
Date.now = realDateNow;
}

st.end();
});

t.test('processNextBroadcast processes many simultaneous events in one frame', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.groupId = 'group1';
const broadcasted = [];
service.broadcastEvent = event => broadcasted.push(event.name);

// 50 events all with the same timestamp
const events = [];
for (let i = 0; i < 50; i++) {
events.push({name: `e${i}`, timestamp: '2025-12-30T00:00:00.000Z'});
}

const batchEvent = {
firedByNodeId: 'node2',
events: events
};

const realDateNow = Date.now;
const startTime = 1000000;
Date.now = () => startTime;

try {
service.handleBatchEvent(batchEvent);
st.equal(service.pendingBroadcasts.length, 50);

// All 50 should be processed in one frame because they all have offset 0
service.processNextBroadcast();
st.equal(broadcasted.length, 50);
st.equal(service.pendingBroadcasts.length, 0);
} finally {
Date.now = realDateNow;
Expand Down Expand Up @@ -232,6 +350,51 @@ test('MeshV2Service Batch Events', t => {
st.end();
});

t.test('reportEventStatsIfNeeded logs stats every 10s', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.groupId = 'group1';

const realDateNow = Date.now;
let currentTime = 1000000;
Date.now = () => currentTime;

try {
service.eventQueueStats.duplicatesSkipped = 5;
service.eventQueueStats.dropped = 2;
service.eventQueueStats.lastReportTime = currentTime;

// Less than 10s
currentTime += 5000;
service.reportEventStatsIfNeeded();
st.equal(service.eventQueueStats.duplicatesSkipped, 5);

// 10s or more
currentTime += 5001;
service.reportEventStatsIfNeeded();
st.equal(service.eventQueueStats.duplicatesSkipped, 0);
st.equal(service.eventQueueStats.dropped, 0);
st.equal(service.eventQueueStats.lastReportTime, currentTime);
} finally {
Date.now = realDateNow;
}

st.end();
});

t.test('cleanup reports final stats', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
service.eventQueueStats.duplicatesSkipped = 10;
service.eventQueueStats.dropped = 5;

// Note: We are just ensuring it doesn't crash and the coverage is met
// Capturing log.info would be better but requires more setup
service.cleanup();

st.end();
});

t.test('reconnect flow: events processed after reconnect', st => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');
Expand Down
Loading
Loading