Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/extensions/scratch3_mesh_v2/mesh-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ class MeshV2Service {
if (!this.groupId || !this.client) return;

const unchanged = this.isDataUnchanged(dataArray);
log.info(`Mesh V2: sendData called with ${dataArray.length} items: ` +
log.debug(`Mesh V2: sendData called with ${dataArray.length} items: ` +
`${JSON.stringify(dataArray)} (unchanged: ${unchanged})`);

// Change detection
Expand Down
68 changes: 53 additions & 15 deletions src/extensions/scratch3_mesh_v2/rate-limiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ class RateLimiter {
this.enableMerge = options.enableMerge || false;
this.mergeKeyField = options.mergeKeyField || 'key';
this.requestCount = 0;

// Statistics
this.stats = {
totalSent: 0,
totalMerged: 0,
lastReportTime: Date.now()
};
}

/**
Expand All @@ -38,7 +45,7 @@ class RateLimiter {
this.queue.push({data, resolve, reject, sendFunction});
}

log.info(`RateLimiter: ${this.enableMerge ? 'Processed' : 'Added'} to queue ` +
log.debug(`RateLimiter: ${this.enableMerge ? 'Processed' : 'Added'} to queue ` +
`(size: ${this.queue.length}, enableMerge: ${this.enableMerge})`);

this.processQueue();
Expand All @@ -63,35 +70,64 @@ class RateLimiter {
const existingData = queueItem.data;
const mergedData = this.mergeData(existingData, dataArray);

log.info(`RateLimiter: Merging data - ` +
log.debug(`RateLimiter: Merging data - ` +
`before: ${JSON.stringify(existingData)}, ` +
`after: ${JSON.stringify(mergedData)}`);

queueItem.data = mergedData;

// Chain resolve and reject
const originalResolve = queueItem.resolve;
queueItem.resolve = result => {
originalResolve(result);
resolve(result);
};
// Use arrays to manage resolve/reject callbacks to avoid stack overflow
if (!queueItem.resolvers) {
// Convert existing resolve/reject to arrays
queueItem.resolvers = [queueItem.resolve];
queueItem.rejecters = [queueItem.reject];

// Set new resolve/reject handlers that call all functions in the arrays
queueItem.resolve = result => {
queueItem.resolvers.forEach(r => r(result));
};
queueItem.reject = error => {
queueItem.rejecters.forEach(r => r(error));
};
}

const originalReject = queueItem.reject;
queueItem.reject = error => {
originalReject(error);
reject(error);
};
// Add new callbacks to the arrays
queueItem.resolvers.push(resolve);
queueItem.rejecters.push(reject);

merged = true;
break;
}
}

if (!merged) {
if (merged) {
this.stats.totalMerged++;
this.reportStatsIfNeeded();
} else {
this.queue.push({data: dataArray, resolve, reject, sendFunction});
}
}

/**
* Report statistics periodically.
* @private
*/
reportStatsIfNeeded () {
const now = Date.now();
const elapsed = now - this.stats.lastReportTime;

// Output statistics every 10 seconds
if (elapsed >= 10000) {
log.info(`RateLimiter Stats (last ${(elapsed / 1000).toFixed(1)}s): ` +
`sent=${this.stats.totalSent}, merged=${this.stats.totalMerged}, ` +
`queue=${this.queue.length}`);

this.stats.totalSent = 0;
this.stats.totalMerged = 0;
this.stats.lastReportTime = now;
}
}

/**
* Merge two arrays of data using mergeKeyField.
* @param {Array} existingData - Existing data items.
Expand Down Expand Up @@ -161,12 +197,14 @@ class RateLimiter {

const item = this.queue.shift();
this.requestCount++;
log.info(`RateLimiter: Sending request #${this.requestCount} ` +
log.debug(`RateLimiter: Sending request #${this.requestCount} ` +
`(queue remaining: ${this.queue.length})`);

try {
const result = await item.sendFunction(item.data);
this.lastSendTime = Date.now();
this.stats.totalSent++;
this.reportStatsIfNeeded();
item.resolve(result);
} catch (error) {
item.reject(error);
Expand Down
41 changes: 41 additions & 0 deletions test/unit/scratch3_mesh_v2_rate_limiter_repro.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const test = require('tap').test;
const RateLimiter = require('../../src/extensions/scratch3_mesh_v2/rate-limiter');
const log = require('../../src/util/log');

// Disable logging for test to avoid timeout
log.debug = () => {};
log.info = () => {};

test('RateLimiter stack overflow reproduction', {timeout: 60000}, async t => {
// maxPerSecond: 1, intervalMs: 250ms, enableMerge: true
const limiter = new RateLimiter(1, 250, {enableMerge: true});

// Immediate sendFunction
const sendFunction = d => Promise.resolve(d);

const promises = [];

// 15000 merges is usually enough to trigger stack overflow in Node.js
const MERGE_COUNT = 15000;

console.log(`Starting ${MERGE_COUNT} merges...`);

for (let i = 0; i < MERGE_COUNT; i++) {
promises.push(limiter.send([{key: 'var1', value: i}], sendFunction));
}

console.log('Finished pushing to queue. Waiting for completion...');

try {
await Promise.all(promises);
t.pass('Completed without stack overflow');
} catch (e) {
if (e.message === 'Maximum call stack size exceeded') {
t.fail(`Stack overflow occurred: ${e.message}`);
} else {
t.fail(`Failed with unexpected error: ${e.message}`);
}
}

t.end();
});
Loading