From 2d2365211e170e4cb85b23c365cb18a72ebdbb27 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Thu, 29 Jan 2026 11:54:02 +0100 Subject: [PATCH 1/7] feat: max number of persistent operations per request --- cli/src/commands/operations/commands/push.ts | 141 +++++++++++------- .../publishPersistedOperations.ts | 11 +- .../test/persisted-operations.test.ts | 29 ++++ 3 files changed, 122 insertions(+), 59 deletions(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index c535d88980..65cb8e88e6 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -3,9 +3,14 @@ import { readFile } from 'node:fs/promises'; import { Command } from 'commander'; import pc from 'picocolors'; +import cliProgress from 'cli-progress'; import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb'; -import { PublishedOperationStatus, PersistedOperation } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb'; +import { + PublishedOperation, + PublishedOperationStatus, + PersistedOperation, +} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb'; import { BaseCommandOptions } from '../../../core/types/types.js'; import { getBaseHeaders } from '../../../core/config.js'; @@ -19,6 +24,8 @@ interface OperationOutput { operationNames: string[]; } +const OPERATION_BATCH_SIZE = 50; + const collect = (value: string, previous: string[]): string[] => { return [...previous, value]; }; @@ -157,69 +164,87 @@ export default (opts: BaseCommandOptions) => { } } - const result = await opts.client.platform.publishPersistedOperations( - { - fedGraphName: name, - namespace: options.namespace, - clientName: options.client, - operations, - }, - { headers: getBaseHeaders() }, - ); - if (result.response?.code === EnumStatusCode.OK) { - if (options.quiet) { - return; + const publishedOperations: PublishedOperation[] = []; + const showProgress = !options.quiet && options.format === 'text' && operations.length > 0; + const bar = showProgress ? new cliProgress.SingleBar({}) : null; + let processed = 0; + if (bar) { + bar.start(operations.length, 0); + } + for (let start = 0; start < operations.length; start += OPERATION_BATCH_SIZE) { + const chunk = operations.slice(start, start + OPERATION_BATCH_SIZE); + const result = await opts.client.platform.publishPersistedOperations( + { + fedGraphName: name, + namespace: options.namespace, + clientName: options.client, + operations: chunk, + }, + { headers: getBaseHeaders() }, + ); + if (result.response?.code !== EnumStatusCode.OK) { + if (bar) { + bar.stop(); + } + command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`)); } - switch (options.format) { - case 'text': { - for (const op of result.operations) { - const message: string[] = [`pushed operation ${op.id}`]; - if (op.hash !== op.id) { - message.push(`(${op.hash})`); - } - message.push(`(${humanReadableOperationStatus(op.status)})`); - if (op.operationNames.length > 0) { - message.push(`: ${op.operationNames.join(', ')}`); - } - console.log(message.join(' ')); + publishedOperations.push(...result.operations); + processed += result.operations.length; + if (bar) { + bar.update(processed); + } + } + if (bar) { + bar.stop(); + } + if (options.quiet) { + return; + } + switch (options.format) { + case 'text': { + for (const op of publishedOperations) { + const message: string[] = [`pushed operation ${op.id}`]; + if (op.hash !== op.id) { + message.push(`(${op.hash})`); } - const upToDate = (result.operations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? []) - .length; - const created = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? []) - .length; - const conflict = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? []) - .length; - const color = conflict === 0 ? pc.green : pc.yellow; - console.log( - color( - `pushed ${ - result.operations?.length ?? 0 - } operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`, - ), - ); - if (conflict > 0 && !options.allowConflicts) { - command.error(pc.red('conflicts detected')); + message.push(`(${humanReadableOperationStatus(op.status)})`); + if (op.operationNames.length > 0) { + message.push(`: ${op.operationNames.join(', ')}`); } - break; + console.log(message.join(' ')); } - case 'json': { - const returnedOperations: Record = {}; - for (let ii = 0; ii < result.operations.length; ii++) { - const op = result.operations[ii]; - - returnedOperations[op.id] = { - hash: op.hash, - contents: operations[ii].contents, - status: jsonOperationStatus(op.status), - operationNames: op.operationNames ?? [], - }; - } - console.log(JSON.stringify(returnedOperations, null, 2)); - break; + const upToDate = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? []) + .length; + const created = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? []) + .length; + const conflict = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? []) + .length; + const color = conflict === 0 ? pc.green : pc.yellow; + console.log( + color( + `pushed ${publishedOperations?.length ?? 0} operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`, + ), + ); + if (conflict > 0 && !options.allowConflicts) { + command.error(pc.red('conflicts detected')); + } + break; + } + case 'json': { + const returnedOperations: Record = {}; + for (let ii = 0; ii < publishedOperations.length; ii++) { + const op = publishedOperations[ii]; + + returnedOperations[op.id] = { + hash: op.hash, + contents: operations[ii].contents, + status: jsonOperationStatus(op.status), + operationNames: op.operationNames ?? [], + }; } + console.log(JSON.stringify(returnedOperations, null, 2)); + break; } - } else { - command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`)); } }); return command; diff --git a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts index 598c4e8e81..9c6bb8d523 100644 --- a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts +++ b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto'; import { PlainMessage } from '@bufbuild/protobuf'; -import { HandlerContext } from '@connectrpc/connect'; +import { Code, ConnectError, HandlerContext } from '@connectrpc/connect'; import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb'; import { PublishedOperation, @@ -17,6 +17,8 @@ import type { RouterOptions } from '../../routes.js'; import { enrichLogger, extractOperationNames, getLogger, handleError } from '../../util.js'; import { UnauthorizedError } from '../../errors/errors.js'; +const MAX_PERSISTED_OPERATIONS = 50; + export function publishPersistedOperations( opts: RouterOptions, req: PublishPersistedOperationsRequest, @@ -41,6 +43,13 @@ export function publishPersistedOperations( throw new UnauthorizedError(); } + if (req.operations.length > MAX_PERSISTED_OPERATIONS) { + throw new ConnectError( + `Payload Too Large: max ${MAX_PERSISTED_OPERATIONS} operations per request`, + Code.ResourceExhausted, + ); + } + const userId = authContext.userId; if (!userId) { return { diff --git a/controlplane/test/persisted-operations.test.ts b/controlplane/test/persisted-operations.test.ts index 72af55507f..3298ebe723 100644 --- a/controlplane/test/persisted-operations.test.ts +++ b/controlplane/test/persisted-operations.test.ts @@ -1,5 +1,6 @@ import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb'; import { joinLabel } from '@wundergraph/cosmo-shared'; +import { Code, ConnectError } from '@connectrpc/connect'; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; import { ClickHouseClient } from '../src/core/clickhouse/index.js'; import { afterAllSetup, beforeAllSetup, genID, genUniqueLabel } from '../src/core/test-util.js'; @@ -146,6 +147,34 @@ describe('Persisted operations', (ctx) => { expect(publishOperationsResp.response?.code).not.toBe(EnumStatusCode.OK); }); + test('Should reject persisted operations when payload is too large', async (testContext) => { + const { client, server } = await SetupTest({ dbname, chClient }); + testContext.onTestFinished(() => server.close()); + + const fedGraphName = genID('fedGraph'); + await setupFederatedGraph(fedGraphName, client); + + const operations = Array.from({ length: 51 }, (_, index) => ({ + id: genID(`hello-${index}`), + contents: `query { hello }`, + })); + + let error: unknown; + try { + await client.publishPersistedOperations({ + fedGraphName, + namespace: 'default', + clientName: 'test-client', + operations, + }); + } catch (err) { + error = err; + } + + expect(error).toBeInstanceOf(ConnectError); + expect(error).toMatchObject({ code: Code.ResourceExhausted }); + }); + test('Should not publish persisted operations with an invalid federated graph name', async (testContext) => { const { client, server } = await SetupTest({ dbname, chClient }); testContext.onTestFinished(() => server.close()); From b177d1ceac52528ccc161ab7cbd2793771c686c0 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Thu, 29 Jan 2026 12:13:51 +0100 Subject: [PATCH 2/7] fix: use for..of Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- cli/src/commands/operations/commands/push.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index 65cb8e88e6..df7b934986 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -232,9 +232,7 @@ export default (opts: BaseCommandOptions) => { } case 'json': { const returnedOperations: Record = {}; - for (let ii = 0; ii < publishedOperations.length; ii++) { - const op = publishedOperations[ii]; - + for (const [ii, op] of publishedOperations.entries()) { returnedOperations[op.id] = { hash: op.hash, contents: operations[ii].contents, @@ -242,6 +240,7 @@ export default (opts: BaseCommandOptions) => { operationNames: op.operationNames ?? [], }; } + } console.log(JSON.stringify(returnedOperations, null, 2)); break; } From 26aa6036ccab957620b683a51433335c4506e49f Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Thu, 29 Jan 2026 12:21:09 +0100 Subject: [PATCH 3/7] fix: always stop bar progress --- cli/src/commands/operations/commands/push.ts | 52 ++++++++++---------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index df7b934986..e41d7d6835 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -167,36 +167,39 @@ export default (opts: BaseCommandOptions) => { const publishedOperations: PublishedOperation[] = []; const showProgress = !options.quiet && options.format === 'text' && operations.length > 0; const bar = showProgress ? new cliProgress.SingleBar({}) : null; - let processed = 0; - if (bar) { - bar.start(operations.length, 0); - } - for (let start = 0; start < operations.length; start += OPERATION_BATCH_SIZE) { - const chunk = operations.slice(start, start + OPERATION_BATCH_SIZE); - const result = await opts.client.platform.publishPersistedOperations( - { - fedGraphName: name, - namespace: options.namespace, - clientName: options.client, - operations: chunk, - }, - { headers: getBaseHeaders() }, - ); - if (result.response?.code !== EnumStatusCode.OK) { + try { + let processed = 0; + if (bar) { + bar.start(operations.length, 0); + } + for (let start = 0; start < operations.length; start += OPERATION_BATCH_SIZE) { + const chunk = operations.slice(start, start + OPERATION_BATCH_SIZE); + const result = await opts.client.platform.publishPersistedOperations( + { + fedGraphName: name, + namespace: options.namespace, + clientName: options.client, + operations: chunk, + }, + { headers: getBaseHeaders() }, + ); + if (result.response?.code !== EnumStatusCode.OK) { + if (bar) { + bar.stop(); + } + command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`)); + } + publishedOperations.push(...result.operations); + processed += result.operations.length; if (bar) { - bar.stop(); + bar.update(processed); } - command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`)); } - publishedOperations.push(...result.operations); - processed += result.operations.length; + } finally { if (bar) { - bar.update(processed); + bar.stop(); } } - if (bar) { - bar.stop(); - } if (options.quiet) { return; } @@ -240,7 +243,6 @@ export default (opts: BaseCommandOptions) => { operationNames: op.operationNames ?? [], }; } - } console.log(JSON.stringify(returnedOperations, null, 2)); break; } From 06f592bfce8416cce61afff5f30e57f81872f943 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Thu, 29 Jan 2026 17:25:23 +0100 Subject: [PATCH 4/7] fix: improve bar update --- cli/src/commands/operations/commands/push.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index e41d7d6835..2d9e95e45e 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -190,7 +190,7 @@ export default (opts: BaseCommandOptions) => { command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`)); } publishedOperations.push(...result.operations); - processed += result.operations.length; + processed += chunk.length; if (bar) { bar.update(processed); } From 3ce74b1b52f2825e32ad5bf83f753aca0f1ad185 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Thu, 29 Jan 2026 17:40:54 +0100 Subject: [PATCH 5/7] fix: improve behaviour if no operations are returned --- cli/src/commands/operations/commands/push.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index 2d9e95e45e..f0390457eb 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -211,8 +211,9 @@ export default (opts: BaseCommandOptions) => { message.push(`(${op.hash})`); } message.push(`(${humanReadableOperationStatus(op.status)})`); - if (op.operationNames.length > 0) { - message.push(`: ${op.operationNames.join(', ')}`); + const opNames = op.operationNames ?? []; + if (opNames.length > 0) { + message.push(`: ${opNames.join(', ')}`); } console.log(message.join(' ')); } From c27ba8f2d4f5269b4e9243fe4b779bee1beafa59 Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Fri, 30 Jan 2026 12:57:00 +0100 Subject: [PATCH 6/7] feat: add parallelization on the control plane --- cli/src/commands/operations/commands/push.ts | 2 +- .../publishPersistedOperations.ts | 84 +++++++++++++------ 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts index f0390457eb..013aacff30 100644 --- a/cli/src/commands/operations/commands/push.ts +++ b/cli/src/commands/operations/commands/push.ts @@ -24,7 +24,7 @@ interface OperationOutput { operationNames: string[]; } -const OPERATION_BATCH_SIZE = 50; +const OPERATION_BATCH_SIZE = 100; const collect = (value: string, previous: string[]): string[] => { return [...previous, value]; diff --git a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts index 9c6bb8d523..b417e52898 100644 --- a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts +++ b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts @@ -1,5 +1,6 @@ import crypto from 'node:crypto'; import { PlainMessage } from '@bufbuild/protobuf'; +import pLimit from 'p-limit'; import { Code, ConnectError, HandlerContext } from '@connectrpc/connect'; import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb'; import { @@ -7,6 +8,7 @@ import { PublishedOperationStatus, PublishPersistedOperationsRequest, PublishPersistedOperationsResponse, + PersistedOperation, } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb'; import { buildASTSchema as graphQLBuildASTSchema, DocumentNode, parse, validate } from 'graphql'; import { PublishedOperationData, UpdatedPersistedOperation } from '../../../types/index.js'; @@ -17,7 +19,8 @@ import type { RouterOptions } from '../../routes.js'; import { enrichLogger, extractOperationNames, getLogger, handleError } from '../../util.js'; import { UnauthorizedError } from '../../errors/errors.js'; -const MAX_PERSISTED_OPERATIONS = 50; +const MAX_PERSISTED_OPERATIONS = 100; +const PARALLEL_PERSISTED_OPERATIONS_LIMIT = 25; export function publishPersistedOperations( opts: RouterOptions, @@ -150,36 +153,41 @@ export function publishPersistedOperations( const operationsByOperationId = new Map( operationsResult.map((op) => [op.operationId, { hash: op.hash, operationNames: op.operationNames }]), ); - for (const operation of req.operations) { + + const processOperation = async ( + operation: PersistedOperation, + ): Promise<{ + publishedOperation: PublishedOperation | null; + updatedOp: UpdatedPersistedOperation | null; + error: { operationId: string; path: string } | null; + }> => { const operationId = operation.id; const operationHash = crypto.createHash('sha256').update(operation.contents).digest('hex'); const prev = operationsByOperationId.get(operationId); if (prev !== undefined && prev.hash !== operationHash) { // We're trying to update an operation with the same ID but different hash - operations.push( - new PublishedOperation({ + return { + publishedOperation: new PublishedOperation({ id: operationId, hash: prev.hash, status: PublishedOperationStatus.CONFLICT, operationNames: prev.operationNames, }), - ); - continue; + updatedOp: null, + error: null, + }; } const operationNames = extractOperationNames(operation.contents); - operationsByOperationId.set(operationId, { hash: operationHash, operationNames }); const clientName = encodeURIComponent(req.clientName); const path = `${organizationId}/${federatedGraph.id}/operations/${clientName}/${operationId}.json`; - updatedOperations.push({ + const updatedOp: UpdatedPersistedOperation = { operationId, hash: operationHash, filePath: path, contents: operation.contents, operationNames, - }); + }; - // New operation - let status: PublishedOperationStatus; if (prev === undefined) { const data: PublishedOperationData = { version: 1, @@ -194,26 +202,54 @@ export function publishPersistedOperations( } catch (e) { logger.error(e, `Could not store operation contents for ${operationId} at ${path}`); return { - response: { - code: EnumStatusCode.ERR, - details: `Could not store operation contents for ${operationId} at ${path}`, - }, - operations: [], + publishedOperation: null, + updatedOp: null, + error: { operationId, path }, }; } - - status = PublishedOperationStatus.CREATED; - } else { - status = PublishedOperationStatus.UP_TO_DATE; + return { + publishedOperation: new PublishedOperation({ + id: operationId, + hash: operationHash, + status: PublishedOperationStatus.CREATED, + operationNames, + }), + updatedOp, + error: null, + }; } - operations.push( - new PublishedOperation({ + + return { + publishedOperation: new PublishedOperation({ id: operationId, hash: operationHash, - status, + status: PublishedOperationStatus.UP_TO_DATE, operationNames, }), - ); + updatedOp, + error: null, + }; + }; + + const limit = pLimit(PARALLEL_PERSISTED_OPERATIONS_LIMIT); + const results = await Promise.all(req.operations.map((op) => limit(() => processOperation(op)))); + + const firstError = results.find((r) => r.error !== null); + if (firstError?.error) { + return { + response: { + code: EnumStatusCode.ERR, + details: `Could not store operation contents for ${firstError.error.operationId} at ${firstError.error.path}`, + }, + operations: [], + }; + } + + for (const r of results) { + operations.push(r.publishedOperation!); + if (r.updatedOp !== null) { + updatedOperations.push(r.updatedOp); + } } await operationsRepo.updatePersistedOperations(clientId, userId, updatedOperations); From a661a8e4920a7c91a9f2406cf2b5190d068e20aa Mon Sep 17 00:00:00 2001 From: Alessandro Pagnin Date: Sat, 31 Jan 2026 14:12:57 +0100 Subject: [PATCH 7/7] fix: wrong test --- controlplane/test/persisted-operations.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/test/persisted-operations.test.ts b/controlplane/test/persisted-operations.test.ts index 3298ebe723..3206adeca9 100644 --- a/controlplane/test/persisted-operations.test.ts +++ b/controlplane/test/persisted-operations.test.ts @@ -154,7 +154,7 @@ describe('Persisted operations', (ctx) => { const fedGraphName = genID('fedGraph'); await setupFederatedGraph(fedGraphName, client); - const operations = Array.from({ length: 51 }, (_, index) => ({ + const operations = Array.from({ length: 101 }, (_, index) => ({ id: genID(`hello-${index}`), contents: `query { hello }`, }));