Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 85 additions & 58 deletions cli/src/commands/operations/commands/push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { readFile } from 'node:fs/promises';

import { Command } from 'commander';
import pc from 'picocolors';
import cliProgress from 'cli-progress';

import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
import { PublishedOperationStatus, PersistedOperation } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
import {
PublishedOperation,
PublishedOperationStatus,
PersistedOperation,
} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';

import { BaseCommandOptions } from '../../../core/types/types.js';
import { getBaseHeaders } from '../../../core/config.js';
Expand All @@ -19,6 +24,8 @@ interface OperationOutput {
operationNames: string[];
}

const OPERATION_BATCH_SIZE = 100;

const collect = (value: string, previous: string[]): string[] => {
return [...previous, value];
};
Expand Down Expand Up @@ -157,69 +164,89 @@ export default (opts: BaseCommandOptions) => {
}
}

const result = await opts.client.platform.publishPersistedOperations(
{
fedGraphName: name,
namespace: options.namespace,
clientName: options.client,
operations,
},
{ headers: getBaseHeaders() },
);
if (result.response?.code === EnumStatusCode.OK) {
if (options.quiet) {
return;
const publishedOperations: PublishedOperation[] = [];
const showProgress = !options.quiet && options.format === 'text' && operations.length > 0;
const bar = showProgress ? new cliProgress.SingleBar({}) : null;
try {
let processed = 0;
if (bar) {
bar.start(operations.length, 0);
}
switch (options.format) {
case 'text': {
for (const op of result.operations) {
const message: string[] = [`pushed operation ${op.id}`];
if (op.hash !== op.id) {
message.push(`(${op.hash})`);
}
message.push(`(${humanReadableOperationStatus(op.status)})`);
if (op.operationNames.length > 0) {
message.push(`: ${op.operationNames.join(', ')}`);
}
console.log(message.join(' '));
}
const upToDate = (result.operations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? [])
.length;
const created = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? [])
.length;
const conflict = (result.operations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? [])
.length;
const color = conflict === 0 ? pc.green : pc.yellow;
console.log(
color(
`pushed ${
result.operations?.length ?? 0
} operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`,
),
);
if (conflict > 0 && !options.allowConflicts) {
command.error(pc.red('conflicts detected'));
for (let start = 0; start < operations.length; start += OPERATION_BATCH_SIZE) {
const chunk = operations.slice(start, start + OPERATION_BATCH_SIZE);
const result = await opts.client.platform.publishPersistedOperations(
{
fedGraphName: name,
namespace: options.namespace,
clientName: options.client,
operations: chunk,
},
{ headers: getBaseHeaders() },
);
if (result.response?.code !== EnumStatusCode.OK) {
if (bar) {
bar.stop();
}
break;
command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`));
}
publishedOperations.push(...result.operations);
processed += chunk.length;
if (bar) {
bar.update(processed);
}
case 'json': {
const returnedOperations: Record<string, OperationOutput> = {};
for (let ii = 0; ii < result.operations.length; ii++) {
const op = result.operations[ii];

returnedOperations[op.id] = {
hash: op.hash,
contents: operations[ii].contents,
status: jsonOperationStatus(op.status),
operationNames: op.operationNames ?? [],
};
}
} finally {
if (bar) {
bar.stop();
}
}
if (options.quiet) {
return;
}
switch (options.format) {
case 'text': {
for (const op of publishedOperations) {
const message: string[] = [`pushed operation ${op.id}`];
if (op.hash !== op.id) {
message.push(`(${op.hash})`);
}
message.push(`(${humanReadableOperationStatus(op.status)})`);
const opNames = op.operationNames ?? [];
if (opNames.length > 0) {
message.push(`: ${opNames.join(', ')}`);
}
console.log(JSON.stringify(returnedOperations, null, 2));
break;
console.log(message.join(' '));
}
const upToDate = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.UP_TO_DATE) ?? [])
.length;
const created = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CREATED) ?? [])
.length;
const conflict = (publishedOperations?.filter((op) => op.status === PublishedOperationStatus.CONFLICT) ?? [])
.length;
const color = conflict === 0 ? pc.green : pc.yellow;
console.log(
color(
`pushed ${publishedOperations?.length ?? 0} operations: ${created} created, ${upToDate} up to date, ${conflict} conflicts`,
),
);
if (conflict > 0 && !options.allowConflicts) {
command.error(pc.red('conflicts detected'));
}
break;
}
case 'json': {
const returnedOperations: Record<string, OperationOutput> = {};
for (const [ii, op] of publishedOperations.entries()) {
returnedOperations[op.id] = {
hash: op.hash,
contents: operations[ii].contents,
status: jsonOperationStatus(op.status),
operationNames: op.operationNames ?? [],
};
}
console.log(JSON.stringify(returnedOperations, null, 2));
break;
}
} else {
command.error(pc.red(`could not push operations: ${result.response?.details ?? 'unknown error'}`));
}
});
return command;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import crypto from 'node:crypto';
import { PlainMessage } from '@bufbuild/protobuf';
import { HandlerContext } from '@connectrpc/connect';
import pLimit from 'p-limit';
import { Code, ConnectError, HandlerContext } from '@connectrpc/connect';
import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
import {
PublishedOperation,
PublishedOperationStatus,
PublishPersistedOperationsRequest,
PublishPersistedOperationsResponse,
PersistedOperation,
} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
import { buildASTSchema as graphQLBuildASTSchema, DocumentNode, parse, validate } from 'graphql';
import { PublishedOperationData, UpdatedPersistedOperation } from '../../../types/index.js';
Expand All @@ -17,6 +19,9 @@ import type { RouterOptions } from '../../routes.js';
import { enrichLogger, extractOperationNames, getLogger, handleError } from '../../util.js';
import { UnauthorizedError } from '../../errors/errors.js';

const MAX_PERSISTED_OPERATIONS = 100;
const PARALLEL_PERSISTED_OPERATIONS_LIMIT = 25;

export function publishPersistedOperations(
opts: RouterOptions,
req: PublishPersistedOperationsRequest,
Expand All @@ -41,6 +46,13 @@ export function publishPersistedOperations(
throw new UnauthorizedError();
}

if (req.operations.length > MAX_PERSISTED_OPERATIONS) {
throw new ConnectError(
`Payload Too Large: max ${MAX_PERSISTED_OPERATIONS} operations per request`,
Code.ResourceExhausted,
);
}

const userId = authContext.userId;
if (!userId) {
return {
Expand Down Expand Up @@ -141,36 +153,41 @@ export function publishPersistedOperations(
const operationsByOperationId = new Map(
operationsResult.map((op) => [op.operationId, { hash: op.hash, operationNames: op.operationNames }]),
);
for (const operation of req.operations) {

const processOperation = async (
operation: PersistedOperation,
): Promise<{
publishedOperation: PublishedOperation | null;
updatedOp: UpdatedPersistedOperation | null;
error: { operationId: string; path: string } | null;
}> => {
const operationId = operation.id;
const operationHash = crypto.createHash('sha256').update(operation.contents).digest('hex');
const prev = operationsByOperationId.get(operationId);
if (prev !== undefined && prev.hash !== operationHash) {
// We're trying to update an operation with the same ID but different hash
operations.push(
new PublishedOperation({
return {
publishedOperation: new PublishedOperation({
id: operationId,
hash: prev.hash,
status: PublishedOperationStatus.CONFLICT,
operationNames: prev.operationNames,
}),
);
continue;
updatedOp: null,
error: null,
};
}
const operationNames = extractOperationNames(operation.contents);
operationsByOperationId.set(operationId, { hash: operationHash, operationNames });
const clientName = encodeURIComponent(req.clientName);
const path = `${organizationId}/${federatedGraph.id}/operations/${clientName}/${operationId}.json`;
updatedOperations.push({
const updatedOp: UpdatedPersistedOperation = {
operationId,
hash: operationHash,
filePath: path,
contents: operation.contents,
operationNames,
});
};

// New operation
let status: PublishedOperationStatus;
if (prev === undefined) {
const data: PublishedOperationData = {
version: 1,
Expand All @@ -185,26 +202,54 @@ export function publishPersistedOperations(
} catch (e) {
logger.error(e, `Could not store operation contents for ${operationId} at ${path}`);
return {
response: {
code: EnumStatusCode.ERR,
details: `Could not store operation contents for ${operationId} at ${path}`,
},
operations: [],
publishedOperation: null,
updatedOp: null,
error: { operationId, path },
};
}

status = PublishedOperationStatus.CREATED;
} else {
status = PublishedOperationStatus.UP_TO_DATE;
return {
publishedOperation: new PublishedOperation({
id: operationId,
hash: operationHash,
status: PublishedOperationStatus.CREATED,
operationNames,
}),
updatedOp,
error: null,
};
}
operations.push(
new PublishedOperation({

return {
publishedOperation: new PublishedOperation({
id: operationId,
hash: operationHash,
status,
status: PublishedOperationStatus.UP_TO_DATE,
operationNames,
}),
);
updatedOp,
error: null,
};
};

const limit = pLimit(PARALLEL_PERSISTED_OPERATIONS_LIMIT);
const results = await Promise.all(req.operations.map((op) => limit(() => processOperation(op))));

const firstError = results.find((r) => r.error !== null);
if (firstError?.error) {
return {
response: {
code: EnumStatusCode.ERR,
details: `Could not store operation contents for ${firstError.error.operationId} at ${firstError.error.path}`,
},
operations: [],
};
}

for (const r of results) {
operations.push(r.publishedOperation!);
if (r.updatedOp !== null) {
updatedOperations.push(r.updatedOp);
}
}

await operationsRepo.updatePersistedOperations(clientId, userId, updatedOperations);
Expand Down
29 changes: 29 additions & 0 deletions controlplane/test/persisted-operations.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
import { joinLabel } from '@wundergraph/cosmo-shared';
import { Code, ConnectError } from '@connectrpc/connect';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest';
import { ClickHouseClient } from '../src/core/clickhouse/index.js';
import { afterAllSetup, beforeAllSetup, genID, genUniqueLabel } from '../src/core/test-util.js';
Expand Down Expand Up @@ -146,6 +147,34 @@ describe('Persisted operations', (ctx) => {
expect(publishOperationsResp.response?.code).not.toBe(EnumStatusCode.OK);
});

test('Should reject persisted operations when payload is too large', async (testContext) => {
const { client, server } = await SetupTest({ dbname, chClient });
testContext.onTestFinished(() => server.close());

const fedGraphName = genID('fedGraph');
await setupFederatedGraph(fedGraphName, client);

const operations = Array.from({ length: 101 }, (_, index) => ({
id: genID(`hello-${index}`),
contents: `query { hello }`,
}));

let error: unknown;
try {
await client.publishPersistedOperations({
fedGraphName,
namespace: 'default',
clientName: 'test-client',
operations,
});
} catch (err) {
error = err;
}

expect(error).toBeInstanceOf(ConnectError);
expect(error).toMatchObject({ code: Code.ResourceExhausted });
});

test('Should not publish persisted operations with an invalid federated graph name', async (testContext) => {
const { client, server } = await SetupTest({ dbname, chClient });
testContext.onTestFinished(() => server.close());
Expand Down
Loading