diff --git a/cli/src/commands/operations/commands/push.ts b/cli/src/commands/operations/commands/push.ts
index c535d88980..013aacff30 100644
--- a/cli/src/commands/operations/commands/push.ts
+++ b/cli/src/commands/operations/commands/push.ts
@@ -3,9 +3,14 @@ import { readFile } from 'node:fs/promises';
 
 import { Command } from 'commander';
 import pc from 'picocolors';
+import cliProgress from 'cli-progress';
 
 import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
-import { PublishedOperationStatus, PersistedOperation } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
+import {
+  PublishedOperation,
+  PublishedOperationStatus,
+  PersistedOperation,
+} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
 
 import { BaseCommandOptions } from '../../../core/types/types.js';
 import { getBaseHeaders } from '../../../core/config.js';
@@ -19,6 +24,10 @@ interface OperationOutput {
   operationNames: string[];
 }
 
+// Number of operations sent per publishPersistedOperations request. Must not exceed the control
+// plane's per-request limit (MAX_PERSISTED_OPERATIONS = 100), or the batch is rejected.
+const OPERATION_BATCH_SIZE = 100;
+
 const collect = (value: string, previous: string[]): string[] => {
   return [...previous, value];
 };
@@ -157,69 +166,91 @@ export default (opts: BaseCommandOptions) => {
       }
     }
 
-    const result = await opts.client.platform.publishPersistedOperations(
-      {
-        fedGraphName: name,
-        namespace: options.namespace,
-        clientName: options.client,
-        operations,
-      },
-      { headers: getBaseHeaders() },
-    );
-    if (result.response?.code === EnumStatusCode.OK) {
-      if (options.quiet) {
-        return;
+    const publishedOperations: PublishedOperation[] = [];
+    const showProgress = !options.quiet && options.format === 'text' && operations.length > 0;
+    const bar = showProgress ? new cliProgress.SingleBar({}) : null;
+    try {
+      let processed = 0;
+      if (bar) {
+        bar.start(operations.length, 0);
       }
-      switch (options.format) {
-        case 'text': {
-          for (const op of result.operations) {
-            const message: string[] = [`pushed operation ${op.id}`];
-            if (op.hash !== op.id) {
-              message.push(`(${op.hash})`);
-            }
-            message.push(`(${humanReadableOperationStatus(op.status)})`);
-            if (op.operationNames.length > 0) {
-              message.push(`: ${op.operationNames.join(', ')}`);
-            }
-            console.log(message.join(' '));
-          }
-          const upToDate = (result.operations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? [])
-            .length;
-          const created = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? [])
-            .length;
-          const conflict = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? [])
-            .length;
-          const color = conflict === 0 ? pc.green : pc.yellow;
-          console.log(
-            color(
-              `pushed ${
-                result.operations?.length ?? 0
-              } operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`,
-            ),
-          );
-          if (conflict > 0 && !options.allowConflicts) {
-            command.error(pc.red('conflicts detected'));
+      // Publish in batches so that no single request exceeds the server-side operation limit.
+      for (let start = 0; start < operations.length; start += OPERATION_BATCH_SIZE) {
+        const chunk = operations.slice(start, start + OPERATION_BATCH_SIZE);
+        const result = await opts.client.platform.publishPersistedOperations(
+          {
+            fedGraphName: name,
+            namespace: options.namespace,
+            clientName: options.client,
+            operations: chunk,
+          },
+          { headers: getBaseHeaders() },
+        );
+        if (result.response?.code !== EnumStatusCode.OK) {
+          if (bar) {
+            bar.stop();
           }
-          break;
+          command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`));
+        }
+        publishedOperations.push(...result.operations);
+        processed += chunk.length;
+        if (bar) {
+          bar.update(processed);
         }
-        case 'json': {
-          const returnedOperations: Record<string, OperationOutput> = {};
-          for (let ii = 0; ii < result.operations.length; ii++) {
-            const op = result.operations[ii];
-
-            returnedOperations[op.id] = {
-              hash: op.hash,
-              contents: operations[ii].contents,
-              status: jsonOperationStatus(op.status),
-              operationNames: op.operationNames ?? [],
-            };
+      }
+    } finally {
+      if (bar) {
+        bar.stop();
+      }
+    }
+    if (options.quiet) {
+      return;
+    }
+    switch (options.format) {
+      case 'text': {
+        for (const op of publishedOperations) {
+          const message: string[] = [`pushed operation ${op.id}`];
+          if (op.hash !== op.id) {
+            message.push(`(${op.hash})`);
+          }
+          message.push(`(${humanReadableOperationStatus(op.status)})`);
+          const opNames = op.operationNames ?? [];
+          if (opNames.length > 0) {
+            message.push(`: ${opNames.join(', ')}`);
           }
-          console.log(JSON.stringify(returnedOperations, null, 2));
-          break;
+          console.log(message.join(' '));
+        }
+        const upToDate = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? [])
+          .length;
+        const created = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? [])
+          .length;
+        const conflict = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? [])
+          .length;
+        const color = conflict === 0 ? pc.green : pc.yellow;
+        console.log(
+          color(
+            `pushed ${publishedOperations?.length ?? 0} operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`,
+          ),
+        );
+        if (conflict > 0 && !options.allowConflicts) {
+          command.error(pc.red('conflicts detected'));
+        }
+        break;
+      }
+      case 'json': {
+        const returnedOperations: Record<string, OperationOutput> = {};
+        // Results are appended batch by batch in input order, so index ii maps to operations[ii].
+        for (const [ii, op] of publishedOperations.entries()) {
+          returnedOperations[op.id] = {
+            hash: op.hash,
+            contents: operations[ii].contents,
+            status: jsonOperationStatus(op.status),
+            operationNames: op.operationNames ?? [],
+          };
         }
+        console.log(JSON.stringify(returnedOperations, null, 2));
+        break;
       }
-    } else {
-      command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`));
     }
   });
   return command;
diff --git a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts
index 598c4e8e81..b417e52898 100644
--- a/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts
+++ b/controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts
@@ -1,12 +1,14 @@
 import crypto from 'node:crypto';
 import { PlainMessage } from '@bufbuild/protobuf';
-import { HandlerContext } from '@connectrpc/connect';
+import pLimit from 'p-limit';
+import { Code, ConnectError, HandlerContext } from '@connectrpc/connect';
 import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
 import {
   PublishedOperation,
   PublishedOperationStatus,
   PublishPersistedOperationsRequest,
   PublishPersistedOperationsResponse,
+  PersistedOperation,
 } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
 import { buildASTSchema as graphQLBuildASTSchema, DocumentNode, parse, validate } from 'graphql';
 import { PublishedOperationData, UpdatedPersistedOperation } from '../../../types/index.js';
@@ -17,6 +19,11 @@ import type { RouterOptions } from '../../routes.js';
 import { enrichLogger, extractOperationNames, getLogger, handleError } from '../../util.js';
 import { UnauthorizedError } from '../../errors/errors.js';
 
+/** Maximum number of operations accepted in a single publish request. */
+const MAX_PERSISTED_OPERATIONS = 100;
+/** Maximum number of operations processed concurrently within one request. */
+const PARALLEL_PERSISTED_OPERATIONS_LIMIT = 25;
+
 export function publishPersistedOperations(
   opts: RouterOptions,
   req: PublishPersistedOperationsRequest,
@@ -41,6 +48,13 @@ export function publishPersistedOperations(
       throw new UnauthorizedError();
     }
 
+    if (req.operations.length > MAX_PERSISTED_OPERATIONS) {
+      throw new ConnectError(
+        `Payload Too Large: max ${MAX_PERSISTED_OPERATIONS} operations per request`,
+        Code.ResourceExhausted,
+      );
+    }
+
     const userId = authContext.userId;
     if (!userId) {
       return {
@@ -141,36 +155,49 @@ export function publishPersistedOperations(
     const operationsByOperationId = new Map(
       operationsResult.map((op) => [op.operationId, { hash: op.hash, operationNames: op.operationNames }]),
     );
-    for (const operation of req.operations) {
+
+    /**
+     * Processes one operation: reports a conflict when the id already exists with a different
+     * hash, stores the contents of ids not seen before, and otherwise reports it as up to date.
+     * Storage failures are returned in `error` instead of being thrown.
+     */
+    const processOperation = async (
+      operation: PersistedOperation,
+    ): Promise<{
+      publishedOperation: PublishedOperation | null;
+      updatedOp: UpdatedPersistedOperation | null;
+      error: { operationId: string; path: string } | null;
+    }> => {
       const operationId = operation.id;
       const operationHash = crypto.createHash('sha256').update(operation.contents).digest('hex');
       const prev = operationsByOperationId.get(operationId);
       if (prev !== undefined && prev.hash !== operationHash) {
         // We're trying to update an operation with the same ID but different hash
-        operations.push(
-          new PublishedOperation({
+        return {
+          publishedOperation: new PublishedOperation({
             id: operationId,
             hash: prev.hash,
             status: PublishedOperationStatus.CONFLICT,
             operationNames: prev.operationNames,
           }),
-        );
-        continue;
+          updatedOp: null,
+          error: null,
+        };
       }
       const operationNames = extractOperationNames(operation.contents);
+      // Register the id before the first await so that a later operation in the same request
+      // with the same id is compared against this hash instead of being stored concurrently.
       operationsByOperationId.set(operationId, { hash: operationHash, operationNames });
       const clientName = encodeURIComponent(req.clientName);
       const path = `${organizationId}/${federatedGraph.id}/operations/${clientName}/${operationId}.json`;
-      updatedOperations.push({
+      const updatedOp: UpdatedPersistedOperation = {
         operationId,
         hash: operationHash,
         filePath: path,
         contents: operation.contents,
         operationNames,
-      });
+      };
 
-      // New operation
-      let status: PublishedOperationStatus;
       if (prev === undefined) {
         const data: PublishedOperationData = {
           version: 1,
@@ -185,26 +212,57 @@ export function publishPersistedOperations(
         } catch (e) {
           logger.error(e, `Could not store operation contents for ${operationId} at ${path}`);
           return {
-            response: {
-              code: EnumStatusCode.ERR,
-              details: `Could not store operation contents for ${operationId} at ${path}`,
-            },
-            operations: [],
+            publishedOperation: null,
+            updatedOp: null,
+            error: { operationId, path },
           };
         }
-
-        status = PublishedOperationStatus.CREATED;
-      } else {
-        status = PublishedOperationStatus.UP_TO_DATE;
+        return {
+          publishedOperation: new PublishedOperation({
+            id: operationId,
+            hash: operationHash,
+            status: PublishedOperationStatus.CREATED,
+            operationNames,
+          }),
+          updatedOp,
+          error: null,
+        };
       }
-      operations.push(
-        new PublishedOperation({
+
+      return {
+        publishedOperation: new PublishedOperation({
           id: operationId,
           hash: operationHash,
-          status,
+          status: PublishedOperationStatus.UP_TO_DATE,
           operationNames,
         }),
-      );
+        updatedOp,
+        error: null,
+      };
+    };
+
+    // Process operations concurrently, bounded by PARALLEL_PERSISTED_OPERATIONS_LIMIT.
+    const limit = pLimit(PARALLEL_PERSISTED_OPERATIONS_LIMIT);
+    const results = await Promise.all(req.operations.map((op) => limit(() => processOperation(op))));
+
+    const firstError = results.find((r) => r.error !== null);
+    if (firstError?.error) {
+      return {
+        response: {
+          code: EnumStatusCode.ERR,
+          details: `Could not store operation contents for ${firstError.error.operationId} at ${firstError.error.path}`,
+        },
+        operations: [],
+      };
+    }
+
+    for (const r of results) {
+      if (r.publishedOperation !== null) {
+        operations.push(r.publishedOperation);
+      }
+      if (r.updatedOp !== null) {
+        updatedOperations.push(r.updatedOp);
+      }
     }
 
     await operationsRepo.updatePersistedOperations(clientId, userId, updatedOperations);
diff --git a/controlplane/test/persisted-operations.test.ts b/controlplane/test/persisted-operations.test.ts
index 72af55507f..3206adeca9 100644
--- a/controlplane/test/persisted-operations.test.ts
+++ b/controlplane/test/persisted-operations.test.ts
@@ -1,5 +1,6 @@
 import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
 import { joinLabel } from '@wundergraph/cosmo-shared';
+import { Code, ConnectError } from '@connectrpc/connect';
 import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest';
 import { ClickHouseClient } from '../src/core/clickhouse/index.js';
 import { afterAllSetup, beforeAllSetup, genID, genUniqueLabel } from '../src/core/test-util.js';
@@ -146,6 +147,35 @@ describe('Persisted operations', (ctx) => {
     expect(publishOperationsResp.response?.code).not.toBe(EnumStatusCode.OK);
   });
 
+  test('Should reject persisted operations when payload is too large', async (testContext) => {
+    const { client, server } = await SetupTest({ dbname, chClient });
+    testContext.onTestFinished(() => server.close());
+
+    const fedGraphName = genID('fedGraph');
+    await setupFederatedGraph(fedGraphName, client);
+
+    // One more than the server-side limit of 100 operations per request.
+    const operations = Array.from({ length: 101 }, (_, index) => ({
+      id: genID(`hello-${index}`),
+      contents: `query { hello }`,
+    }));
+
+    let error: unknown;
+    try {
+      await client.publishPersistedOperations({
+        fedGraphName,
+        namespace: 'default',
+        clientName: 'test-client',
+        operations,
+      });
+    } catch (err) {
+      error = err;
+    }
+
+    expect(error).toBeInstanceOf(ConnectError);
+    expect(error).toMatchObject({ code: Code.ResourceExhausted });
+  });
+
   test('Should not publish persisted operations with an invalid federated graph name', async (testContext) => {
     const { client, server } = await SetupTest({ dbname, chClient });
     testContext.onTestFinished(() => server.close());