Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/extensions/scratch3_mesh_v2/mesh-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ class MeshV2Service {
this._reportDataBound = this._reportData.bind(this);

this.disconnectCallback = null;

// Cost tracking
this.costTracking = {
connectionStartTime: null,
queryCount: 0, // LIST_GROUPS_BY_DOMAIN, LIST_GROUP_STATUSES
mutationCount: 0, // CREATE_DOMAIN, CREATE_GROUP, JOIN_GROUP, etc.
heartbeatCount: 0, // RENEW_HEARTBEAT, SEND_MEMBER_HEARTBEAT
reportDataCount: 0, // REPORT_DATA
fireEventsCount: 0, // FIRE_EVENTS
dataUpdateReceived: 0, // ON_DATA_UPDATE
batchEventReceived: 0, // ON_BATCH_EVENT
dissolveReceived: 0 // ON_GROUP_DISSOLVE
};
}

/**
Expand Down Expand Up @@ -137,6 +150,7 @@ class MeshV2Service {
if (!this.client) throw new Error('Client not initialized');

try {
this.costTracking.mutationCount++;
const result = await this.client.mutate({
mutation: CREATE_DOMAIN
});
Expand All @@ -158,6 +172,7 @@ class MeshV2Service {
await this.createDomain();
}

this.costTracking.mutationCount++;
const result = await this.client.mutate({
mutation: CREATE_GROUP,
variables: {
Expand All @@ -174,6 +189,7 @@ class MeshV2Service {
this.expiresAt = group.expiresAt;
this.isHost = true;

this.costTracking.connectionStartTime = Date.now();
this.startSubscriptions();
this.startHeartbeat();
this.startEventBatchTimer();
Expand All @@ -198,6 +214,7 @@ class MeshV2Service {
await this.createDomain();
}

this.costTracking.queryCount++;
const result = await this.client.query({
query: LIST_GROUPS_BY_DOMAIN,
variables: {
Expand All @@ -218,6 +235,7 @@ class MeshV2Service {
if (!this.client) throw new Error('Client not initialized');

try {
this.costTracking.mutationCount++;
const result = await this.client.mutate({
mutation: JOIN_GROUP,
variables: {
Expand All @@ -237,6 +255,7 @@ class MeshV2Service {
this.memberHeartbeatInterval = node.heartbeatIntervalSeconds;
}

this.costTracking.connectionStartTime = Date.now();
this.startSubscriptions();
this.startHeartbeat(); // Start heartbeat for member too
this.startEventBatchTimer();
Expand Down Expand Up @@ -269,6 +288,7 @@ class MeshV2Service {

try {
if (isHost) {
this.costTracking.mutationCount++;
await this.client.mutate({
mutation: DISSOLVE_GROUP,
variables: {
Expand All @@ -279,6 +299,7 @@ class MeshV2Service {
});
log.info(`Mesh V2: Dissolved group ${groupId}`);
} else {
this.costTracking.mutationCount++;
await this.client.mutate({
mutation: LEAVE_GROUP,
variables: {
Expand All @@ -295,6 +316,47 @@ class MeshV2Service {
}

cleanup () {
// コスト計算とログ出力
if (this.costTracking.connectionStartTime) {
const connectionDurationSeconds = (Date.now() - this.costTracking.connectionStartTime) / 1000;
const connectionDurationMinutes = connectionDurationSeconds / 60;

// Query/Mutation costs
const queryCost = this.costTracking.queryCount * 0.000004;
const mutationCost = this.costTracking.mutationCount * 0.000004;

// Subscription message costs
const dataUpdateCost = this.costTracking.dataUpdateReceived * 0.000002;
const batchEventCost = this.costTracking.batchEventReceived * 0.000002;
const dissolveCost = this.costTracking.dissolveReceived * 0.000002;

// Subscription connection cost (3 subscriptions)
const connectionCost = (connectionDurationMinutes / 1000000) * 3 * 0.08;

const totalCost = queryCost + mutationCost + dataUpdateCost + batchEventCost +
dissolveCost + connectionCost;

log.info(`Mesh V2: Cost Summary for ${connectionDurationMinutes.toFixed(2)} minutes connection`);
log.info(` Role: ${this.isHost ? 'Host' : 'Member'}`);
log.info(` Queries: ${this.costTracking.queryCount} ops = $${queryCost.toFixed(8)}`);
log.info(` Mutations: ${this.costTracking.mutationCount} ops = $${mutationCost.toFixed(8)}`);
log.info(` - Heartbeats: ${this.costTracking.heartbeatCount}`);
log.info(` - REPORT_DATA: ${this.costTracking.reportDataCount}`);
log.info(` - FIRE_EVENTS: ${this.costTracking.fireEventsCount}`);
log.info(` Subscription Messages:`);
log.info(` - Data Updates: ${this.costTracking.dataUpdateReceived} msgs = ` +
`$${dataUpdateCost.toFixed(8)}`);
log.info(` - Batch Events: ${this.costTracking.batchEventReceived} msgs = ` +
`$${batchEventCost.toFixed(8)}`);
log.info(` - Dissolve: ${this.costTracking.dissolveReceived} msgs = ` +
`$${dissolveCost.toFixed(8)}`);
log.info(` Subscription Connection: ${connectionDurationMinutes.toFixed(2)} min × 3 = ` +
`$${connectionCost.toFixed(8)}`);
log.info(` TOTAL ESTIMATED COST: $${totalCost.toFixed(8)} ` +
`(${(totalCost * 1000000).toFixed(2)} per million operations equivalent)`);
log.info(` Average cost per second: $${(totalCost / connectionDurationSeconds).toFixed(10)}`);
}

// キューをクリア
this.pendingBroadcasts = [];
this.batchStartTime = null;
Expand Down Expand Up @@ -342,6 +404,7 @@ class MeshV2Service {
variables
}).subscribe({
next: () => {
this.costTracking.dissolveReceived++;
log.info('Mesh V2: Group dissolved by host');
this.cleanupAndDisconnect();
},
Expand All @@ -359,6 +422,7 @@ class MeshV2Service {
handleDataUpdate (nodeStatus) {
if (!nodeStatus || nodeStatus.nodeId === this.meshId) return;

this.costTracking.dataUpdateReceived++;
const nodeId = nodeStatus.nodeId;
if (!this.remoteData[nodeId]) {
this.remoteData[nodeId] = {};
Expand All @@ -375,6 +439,7 @@ class MeshV2Service {
handleBatchEvent (batchEvent) {
if (!batchEvent || batchEvent.firedByNodeId === this.meshId) return;

this.costTracking.batchEventReceived++;
const events = batchEvent.events ?
batchEvent.events.filter(event => event.firedByNodeId !== this.meshId) :
[];
Expand Down Expand Up @@ -551,6 +616,8 @@ class MeshV2Service {
// データ送信完了を待つ
await this.dataRateLimiter.waitForCompletion();

this.costTracking.mutationCount++;
this.costTracking.fireEventsCount++;
log.info(`Mesh V2: Sending batch of ${events.length} events to group ${this.groupId}`);
await this.client.mutate({
mutation: FIRE_EVENTS,
Expand Down Expand Up @@ -599,6 +666,8 @@ class MeshV2Service {
if (!this.groupId || !this.client || !this.isHost) return;

try {
this.costTracking.mutationCount++;
this.costTracking.heartbeatCount++;
const result = await this.client.mutate({
mutation: RENEW_HEARTBEAT,
variables: {
Expand All @@ -624,6 +693,8 @@ class MeshV2Service {
if (!this.groupId || !this.client || this.isHost) return;

try {
this.costTracking.mutationCount++;
this.costTracking.heartbeatCount++;
const result = await this.client.mutate({
mutation: SEND_MEMBER_HEARTBEAT,
variables: {
Expand Down Expand Up @@ -720,6 +791,8 @@ class MeshV2Service {
* @private
*/
async _reportData (payload) {
this.costTracking.mutationCount++;
this.costTracking.reportDataCount++;
await this.client.mutate({
mutation: REPORT_DATA,
variables: {
Expand Down Expand Up @@ -759,6 +832,7 @@ class MeshV2Service {
if (!this.groupId || !this.client) return;

try {
this.costTracking.queryCount++;
const result = await this.client.query({
query: LIST_GROUP_STATUSES,
variables: {
Expand Down
147 changes: 147 additions & 0 deletions test/unit/extension_mesh_v2_service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* eslint-disable require-atomic-updates */
const test = require('tap').test;
const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service');
const log = require('../../src/util/log');

const createMockBlocks = () => ({
runtime: {
on: () => {},
getTargetForStage: () => ({
variables: {}
}),
sequencer: {}
},
opcodeFunctions: {
event_broadcast: () => {}
}
});

test('MeshV2Service Cost Tracking', t => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');

// Mock client
const mockClient = {
query: () => Promise.resolve({
data: {
listGroupsByDomain: [],
listGroupStatuses: []
}
}),
mutate: () => Promise.resolve({
data: {
createDomain: 'd1',
createGroup: {
id: 'g1',
name: 'G1',
domain: 'd1',
expiresAt: '2026-01-01T00:00:00Z'
},
joinGroup: {
id: 'n1',
domain: 'd1',
expiresAt: '2026-01-01T00:00:00Z'
},
renewHeartbeat: {
expiresAt: '2026-01-01T00:00:00Z'
},
sendMemberHeartbeat: {
expiresAt: '2026-01-01T00:00:00Z'
}
}
}),
subscribe: () => ({
subscribe: () => ({
unsubscribe: () => {}
})
})
};
service.client = mockClient;

t.test('initial state', st => {
st.equal(service.costTracking.queryCount, 0);
st.equal(service.costTracking.mutationCount, 0);
st.equal(service.costTracking.connectionStartTime, null);
st.end();
});

t.test('tracking mutations and queries', async st => {
await service.createDomain();
st.equal(service.costTracking.mutationCount, 1);

await service.createGroup('G1');
// createGroup uses service.domain if it exists. service.domain is 'domain1' from constructor.
// So createGroup calls mutate once.
st.equal(service.costTracking.mutationCount, 2);
st.ok(service.costTracking.connectionStartTime);

await service.listGroups();
st.equal(service.costTracking.queryCount, 1);

await service.joinGroup('g1', 'd1', 'G1');
st.equal(service.costTracking.mutationCount, 3);

await service.renewHeartbeat(); // only if host

// Set isHost directly
service.isHost = true;

await service.renewHeartbeat();
st.equal(service.costTracking.mutationCount, 4);
st.equal(service.costTracking.heartbeatCount, 1);

service.isHost = false;
await service.sendMemberHeartbeat();
st.equal(service.costTracking.mutationCount, 5);
st.equal(service.costTracking.heartbeatCount, 2);

await service._reportData([{key: 'k1', value: 'v1'}]);
st.equal(service.costTracking.mutationCount, 6);
st.equal(service.costTracking.reportDataCount, 1);

await service.fireEventsBatch([{eventName: 'e1'}]);
st.equal(service.costTracking.mutationCount, 7);
st.equal(service.costTracking.fireEventsCount, 1);

await service.fetchAllNodesData();
st.equal(service.costTracking.queryCount, 3);

st.end();
});

t.test('tracking received messages', st => {
service.handleDataUpdate({
nodeId: 'other',
data: [{key: 'k', value: 'v'}]
});
st.equal(service.costTracking.dataUpdateReceived, 1);

service.handleBatchEvent({
firedByNodeId: 'other',
events: [{
name: 'e',
timestamp: new Date().toISOString()
}]
});
st.equal(service.costTracking.batchEventReceived, 1);

st.end();
});

t.test('logging summary in cleanup', st => {
// Mock log.info to verify it's called
const originalLogInfo = log.info;
const messages = [];
log.info = msg => messages.push(msg);

service.cleanup();

st.ok(messages.some(m => m.includes('Mesh V2: Cost Summary')));
st.ok(messages.some(m => m.includes('TOTAL ESTIMATED COST')));

log.info = originalLogInfo;
st.end();
});

t.end();
});
Loading