Skip to content

meshV2: 大量イベント発火時のキュー肥大化を防ぐ重複排除機能 #500

@takaokouji

Description

@takaokouji

問題の概要

meshV2拡張機能で、「ずっと」ループ内でメッセージを連続発火すると、イベント送信キュー(eventQueue)が無限に肥大化し、システムが不安定になる問題があります。

再現手順

  1. meshV2拡張機能でグループを作成(ホスト)
  2. 以下のようなスクリプトを実行:
ずっと
  [メッセージ1] を送る
end
  1. 30fps(33msごと)でイベントが発火される
  2. 処理頻度は250ms → 1回で約7-8イベント処理
  3. キューに無限にイベントが追加され続ける

現状の問題点

1. eventQueue(送信キュー)に上限がない

コード: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js:820

fireEvent (eventName, payload = '') {
    // ...
    this.eventQueue.push({
        eventName: eventName,
        payload: payload,
        firedAt: new Date().toISOString()
    });
}

問題:

  • 上限チェックがないため、無制限に追加される
  • 同じイベントが重複してキューに追加される
  • 大量発火(例:ループ内で複数イベント)すると、キューが肥大化
  • メモリ消費が増大し、システムが不安定になる

2. processBatchEvents()の処理

コード: mesh-service.js:597

async processBatchEvents () {
    if (this.eventQueue.length === 0) return;

    // キューから全イベントを取り出す
    const events = this.eventQueue.splice(0);
    log.info(`Mesh V2: Processing ${events.length} queued events for sending`);
    // ...
}

問題:

  • キュー全体を一度に取り出して処理
  • 10万イベント溜まると、すべて送信完了まで時間がかかる
  • その間も新しいイベントが追加され続ける

3. processNextBroadcast()(受信側)の処理効率

コード: mesh-service.js:535

// フレームごとに1つのブロードキャストのみ実行
const {event, offsetMs} = eventsToProcess[0];
this.broadcastEvent(event);

問題:

  • 1フレームに1イベントしか処理しない
  • 30fps = 1秒で30イベント処理
  • 大量受信(例:1秒で1000イベント)すると、処理が追いつかない

採用する解決方法

基本方針

送信側: 重複排除 + サイズ制限(ハイブリッド)

  • イベント送信キューには、1種類のメッセージ(eventName + payload)につき1つまで
  • 異なるイベントが最大100個まで格納可能
  • 上限を超えたら古いイベントを破棄(FIFO)

受信側: ループで複数イベントを一括実行

  • 1フレームに1イベントではなく、受信した複数イベントをループで一括実行
  • 理由: イベントの種類が異なれば、イベントハンドラも異なるため、RESTARTが起きない

メリット

  • ✅ キューが無限に肥大化しない
  • ✅ メモリ使用量が制限される(送信側: 最大100イベント)
  • ✅ システムがクラッシュしない
  • ✅ 受信側の処理速度が向上(複数イベント一括処理)
  • ✅ シンプルで確実

デメリット

  • ⚠️ 同じイベントを連続発火しても、1回しか送信されない
  • ⚠️ 異なるイベントが100個を超えると、古いイベントが破棄される

実装案

実装1: 送信側(重複排除 + サイズ制限)

ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js

// constructorに定数と統計変数を追加
constructor (blocks, meshId, domain) {
    // ... 既存のコード ...

    this.eventQueue = [];
    this.eventBatchInterval = 250;
    this.eventBatchTimer = null;

    // Event queue limits
    this.MAX_EVENT_QUEUE_SIZE = 100;  // 最大100イベント
    this.eventQueueStats = {
        duplicatesSkipped: 0,
        dropped: 0,
        lastReportTime: Date.now()
    };

    // ... 残りのコード ...
}

// fireEvent()を修正
fireEvent (eventName, payload = '') {
    if (!this.groupId || !this.client) {
        log.warn(`Mesh V2: Cannot fire event ${eventName} - groupId: ${this.groupId}, client: ${!!this.client}`);
        return;
    }

    // ステップ1: 重複チェック
    const isDuplicate = this.eventQueue.some(item =>
        item.eventName === eventName && item.payload === payload
    );

    if (isDuplicate) {
        this.eventQueueStats.duplicatesSkipped++;
        this.reportEventStatsIfNeeded();

        log.debug(`Mesh V2: Event already in queue, skipping: ${eventName}`);
        return;
    }

    // ステップ2: サイズ制限チェック(保険)
    if (this.eventQueue.length >= this.MAX_EVENT_QUEUE_SIZE) {
        const dropped = this.eventQueue.shift();  // 古いイベントを破棄(FIFO)
        this.eventQueueStats.dropped++;

        if (this.eventQueueStats.dropped % 10 === 1) {  // 10イベントごとに警告
            log.warn(`Mesh V2: Event queue full (${this.MAX_EVENT_QUEUE_SIZE}). ` +
                `Dropped ${this.eventQueueStats.dropped} events. ` +
                `Latest: ${dropped.eventName}`);
        }
    }

    log.debug(`Mesh V2: Queuing event for sending: ${eventName} ` +
        `(queue size: ${this.eventQueue.length})`);

    this.eventQueue.push({
        eventName: eventName,
        payload: payload,
        firedAt: new Date().toISOString()
    });
}

// 統計情報レポート(10秒ごと)
reportEventStatsIfNeeded () {
    const now = Date.now();
    const elapsed = now - this.eventQueueStats.lastReportTime;

    if (elapsed >= 10000 &&
        (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
        log.info(`Mesh V2: Event Queue Stats (last ${(elapsed / 1000).toFixed(1)}s): ` +
            `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
            `dropped=${this.eventQueueStats.dropped}, ` +
            `current queue size=${this.eventQueue.length}`);

        this.eventQueueStats.duplicatesSkipped = 0;
        this.eventQueueStats.dropped = 0;
        this.eventQueueStats.lastReportTime = now;
    }
}

// cleanup()に統計情報出力を追加
cleanup () {
    // 統計情報を出力
    if (this.eventQueueStats &&
        (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
        log.info(`Mesh V2: Final Event Queue Stats: ` +
            `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
            `dropped=${this.eventQueueStats.dropped}`);
    }

    // キューをクリア
    this.pendingBroadcasts = [];
    this.batchStartTime = null;
    this.lastBroadcastOffset = 0;

    // ... 既存のクリーンアップコード ...
}

計算量:

  • 重複チェック: O(n) where n = eventQueue.length
  • 最悪ケース: キューに100イベント → 100回の比較
  • 実用上問題なし(250msに1回のみ実行)

実装2: 受信側(ループで複数イベント一括実行)

ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js

/**
 * Process pending broadcast events that should fire based on elapsed real time.
 * Called once per frame via BEFORE_STEP event.
 *
 * Strategy:
 * - Process all events whose timing has arrived (offsetMs <= elapsedMs)
 * - Execute them in order (maintains event sequence)
 * - Different event types don't cause RESTART (different handlers)
 */
processNextBroadcast () {
    if (!this.groupId) {
        // 切断されている場合はなにもしない
        return;
    }

    if (this.pendingBroadcasts.length === 0) {
        // キューが空になったらリセット
        this.batchStartTime = null;
        this.lastBroadcastOffset = 0;
        return;
    }

    const now = Date.now();
    const elapsedMs = this.batchStartTime ? now - this.batchStartTime : 0;

    // 処理すべきイベントを収集(タイミングが来ているもの)
    const eventsToProcess = [];

    while (this.pendingBroadcasts.length > 0) {
        const {event, offsetMs} = this.pendingBroadcasts[0];

        // まだタイミングが来ていない場合は待機
        if (offsetMs > elapsedMs) {
            log.debug(`Mesh V2: Waiting for event ${event.name} ` +
                `(needs ${offsetMs}ms, elapsed ${elapsedMs}ms)`);
            break;
        }

        // タイミングが来たイベントをキューから取り出し
        const item = this.pendingBroadcasts.shift();
        eventsToProcess.push(item);
    }

    // 収集したイベントをすべて処理
    if (eventsToProcess.length > 0) {
        log.info(`Mesh V2: Broadcasting ${eventsToProcess.length} events ` +
            `(${this.pendingBroadcasts.length} remaining in queue)`);

        eventsToProcess.forEach(({event, offsetMs}) => {
            log.info(`Mesh V2: Broadcasting event: ${event.name} ` +
                `(offset: ${offsetMs}ms, elapsed: ${elapsedMs}ms)`);

            this.broadcastEvent(event);
            this.lastBroadcastOffset = offsetMs;
        });
    }
}

変更点:

  • ❌ 削除: 1フレームに1イベントのみ処理する制約
  • ✅ 追加: forEachループですべてのイベントを一括処理
  • ✅ 理由: イベント種類が異なればハンドラも異なるため、RESTARTが起きない

パフォーマンス:

  • 従来: 1フレームに1イベント = 30イベント/秒(30fps)
  • 改善後: 1フレームに複数イベント = 処理速度が大幅向上
  • 例: 100イベント受信 → 従来3.3秒、改善後0.1秒以内

テスト方法

テスト1: 同じイベントを連続発火(重複排除)

ずっと
  [メッセージ1] を送る
end

期待結果:

  • eventQueueには常に1つのイベントのみ
  • 250msごとに1回送信
  • duplicatesSkipped が増加
  • システムが安定動作

テスト2: 異なるイベントを発火

ずっと
  [メッセージ1] を送る
  [メッセージ2] を送る
  [メッセージ3] を送る
end

期待結果:

  • eventQueueには最大3つのイベント
  • 250msごとに3つ送信
  • duplicatesSkipped は増加しない

テスト3: 大量の異なるイベント(サイズ制限)

// 150種類の異なるイベントを発火
for (let i = 0; i < 150; i++) {
    this.meshService.fireEvent(`message${i}`);
}

期待結果:

  • eventQueueが100で上限に達する
  • 古い50イベントが破棄される
  • 警告ログが出力される(10イベントごと)
  • システムがクラッシュしない

テスト4: 受信側の一括処理

準備:

  • ホスト: グループ作成
  • メンバー: グループ参加

ホスト側:

[メッセージ1] を送る
[メッセージ2] を送る
[メッセージ3] を送る

メンバー側:

[メッセージ1] を受け取ったとき
  (変数1) を (1) にする

[メッセージ2] を受け取ったとき
  (変数2) を (1) にする

[メッセージ3] を受け取ったとき
  (変数3) を (1) にする

期待結果:

  • メンバー側で3つのイベントが同一フレーム内で処理される
  • 変数1, 変数2, 変数3がすべて1になる
  • ログに「Broadcasting 3 events」と表示される

パラメータ設定値

パラメータ 理由
MAX_EVENT_QUEUE_SIZE 100 250ms × 400回 = 100秒分のバッファ(実用上十分)
eventBatchInterval 250ms 既存の設定を維持
MAX_BATCH_SIZE 1000 GraphQLペイロードサイズ制限(既存)

優先度

High - 大量イベント発火時にシステムが不安定になる実用上の問題

影響範囲

  • meshV2拡張機能を使用するすべてのプロジェクト
  • 特に「ずっと」ループ内でメッセージを発火するケース
  • センサーデータやゲームイベントをリアルタイム共有するケース

関連Issue


🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 noreply@anthropic.com

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions