Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 49 additions & 39 deletions src/cloudflare/internal/d1-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type D1UpstreamFailure = {
results?: never;
error: string;
success: false;
meta: D1Meta & Record<string, unknown>;
meta?: never;
};

type D1RowsColumns<T = unknown> = D1Response & {
Expand Down Expand Up @@ -386,40 +386,42 @@ class D1DatabaseSessionAlwaysPrimary extends D1DatabaseSession {
span.setAttribute('db.query.text', query);
span.setAttribute('cloudflare.binding.type', 'D1');

// TODO: splitting by lines is overly simplification because a single line
// can contain multiple statements (ex: `select 1; select 2;`).
// Also, a statement can span multiple lines...
// Either, we should do a more reasonable job to split the query into multiple statements
// like we do in the D1 codebase or we report a simpler error without the line number.
const lines = query.trim().split('\n');
const _exec = await this._send('/execute', lines, [], 'NONE', span);
const exec = Array.isArray(_exec) ? _exec : [_exec];

addAggregatedD1MetaToSpan(
span,
exec.map((e) => e.meta)
);
let duration = 0;
const metas: D1Meta[] = [];
for (let i = 0; i < exec.length; i++) {
const res = exec[i];
if (!res?.success) {
span.setAttribute('error.type', `Error in line ${i + 1}`);
throw new Error(
`D1_EXEC_ERROR: Error in line ${i + 1}: ${lines[i]} ${res?.error ? `: ${res.error}` : ''}`,
{
cause: new Error(
`Error in line ${i + 1}: ${lines[i]} ${res?.error ? `: ${res.error}` : ''}`
),
}
);
}

const error = exec
.map((r) => {
return r.error ? 1 : 0;
})
.indexOf(1);
if (error !== -1) {
span.setAttribute('error.type', `Error in line ${error + 1}`);
throw new Error(
`D1_EXEC_ERROR: Error in line ${error + 1}: ${lines[error]}: ${
exec[error]?.error
}`,
{
cause: new Error(
`Error in line ${error + 1}: ${lines[error]}: ${exec[error]?.error}`
),
}
);
} else {
return {
count: exec.length,
duration: exec.reduce((p, c) => {
return p + c.meta['duration'];
}, 0),
};
duration += res.meta.duration;
metas.push(res.meta);
}

if (metas.length) {
addAggregatedD1MetaToSpan(span, metas);
}
return {
count: exec.length,
duration,
};
});
}

Expand Down Expand Up @@ -712,12 +714,11 @@ function mapD1Result<T>(result: D1UpstreamResponse<T>): D1UpstreamResponse<T> {
return result.error
? {
success: false,
meta: result.meta,
error: result.error,
}
: {
success: true,
meta: result.meta,
meta: (result as D1UpstreamSuccess).meta,
...('results' in result ? { results: result.results } : {}),
};
}
Expand All @@ -731,7 +732,12 @@ async function toJson<T = unknown>(response: Response): Promise<T> {
}
}

function addAggregatedD1MetaToSpan(span: Span, metas: D1Meta[]): void {
type PartialD1Meta = Partial<D1Meta> | undefined;

function addAggregatedD1MetaToSpan(span: Span, metas: PartialD1Meta[]): void {
if (!metas.length) {
return;
}
const aggregatedMeta = aggregateD1Meta(metas);
addD1MetaToSpan(span, aggregatedMeta);
}
Expand Down Expand Up @@ -764,7 +770,7 @@ function addD1MetaToSpan(span: Span, meta: D1Meta): void {
// When a query is executing multiple statements, and we receive a D1Meta
// for each statement, we need to aggregate the meta data before we annotate
// the telemetry, with different rules for each field.
function aggregateD1Meta(metas: D1Meta[]): D1Meta {
function aggregateD1Meta(metas: PartialD1Meta[]): D1Meta {
const aggregatedMeta: D1Meta = {
duration: 0,
size_after: 0,
Expand All @@ -776,12 +782,16 @@ function aggregateD1Meta(metas: D1Meta[]): D1Meta {
};

for (const meta of metas) {
aggregatedMeta.duration += meta.duration;
if (!meta) {
continue;
}

aggregatedMeta.duration += meta.duration ?? 0;
// for size_after, we only want the last value
aggregatedMeta.size_after = meta.size_after;
aggregatedMeta.rows_read += meta.rows_read;
aggregatedMeta.rows_written += meta.rows_written;
aggregatedMeta.last_row_id = meta.last_row_id;
aggregatedMeta.size_after = meta.size_after ?? 0;
aggregatedMeta.rows_read += meta.rows_read ?? 0;
aggregatedMeta.rows_written += meta.rows_written ?? 0;
aggregatedMeta.last_row_id = meta.last_row_id ?? 0;
if (meta.served_by_region) {
aggregatedMeta.served_by_region = meta.served_by_region;
}
Expand All @@ -799,7 +809,7 @@ function aggregateD1Meta(metas: D1Meta[]): D1Meta {
aggregatedMeta.total_attempts =
(aggregatedMeta.total_attempts ?? 0) + meta.total_attempts;
}
aggregatedMeta.changes += meta.changes;
aggregatedMeta.changes += meta.changes ?? 0;
if (meta.changed_db) {
aggregatedMeta.changed_db = true;
}
Expand Down
48 changes: 48 additions & 0 deletions src/cloudflare/internal/test/d1/d1-api-instrumentation-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,54 @@ export const test = {
};

const expectedSpans = [
// testExec: exec() happy path and error handling (regression test for #5218).
{
name: 'fetch',
'network.protocol.name': 'http',
'network.protocol.version': 'HTTP/1.1',
'http.request.method': 'POST',
'url.full': 'http://d1/execute?resultsFormat=NONE',
'http.request.header.content-type': 'application/json',
'http.request.body.size': 20n,
'http.response.status_code': 200n,
'http.response.body.size': 0n,
closed: true,
},
{
name: 'd1_exec',
'db.system.name': 'cloudflare-d1',
'db.operation.name': 'exec',
'db.query.text': 'select 1',
'cloudflare.binding.type': 'D1',
'cloudflare.d1.response.size_after': 4096,
'cloudflare.d1.response.rows_read': 0,
'cloudflare.d1.response.rows_written': 0,
'cloudflare.d1.response.last_row_id': 0,
'cloudflare.d1.response.changed_db': false,
'cloudflare.d1.response.changes': 0,
closed: true,
},
{
name: 'fetch',
'network.protocol.name': 'http',
'network.protocol.version': 'HTTP/1.1',
'http.request.method': 'POST',
'url.full': 'http://d1/execute?resultsFormat=NONE',
'http.request.header.content-type': 'application/json',
'http.request.body.size': 23n,
'http.response.status_code': 200n,
'http.response.body.size': 0n,
closed: true,
},
{
name: 'd1_exec',
'db.system.name': 'cloudflare-d1',
'db.operation.name': 'exec',
'db.query.text': 'INVALID SQL',
'cloudflare.binding.type': 'D1',
'error.type': 'Error in line 1',
closed: true,
},
{
name: 'fetch',
'network.protocol.name': 'http',
Expand Down
22 changes: 22 additions & 0 deletions src/cloudflare/internal/test/d1/d1-api-test-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,25 @@ export async function testD1ApiQueriesHappyPath(DB) {
]
);
}

// Regression test for https://github.com/cloudflare/workerd/pull/5218.
// exec() with invalid SQL should throw a proper D1 error, not a TypeError
// from accessing properties on undefined meta during span aggregation.
export async function testD1Exec(DB) {
await itShould('run a simple exec', () => DB.exec('select 1'), {
count: 1,
duration: anything,
});

await assert.rejects(
() => DB.exec('INVALID SQL'),
(e) => {
assert.notEqual(e.constructor, TypeError);
assert.ok(
e.message.includes('D1_EXEC_ERROR'),
`Expected D1 error, got: ${e.message}`
);
return true;
}
);
}
8 changes: 7 additions & 1 deletion src/cloudflare/internal/test/d1/d1-api-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { testD1ApiQueriesHappyPath } from './d1-api-test-common';
import { testD1ApiQueriesHappyPath, testD1Exec } from './d1-api-test-common';

export const testWithoutSessions = {
async test(_ctr, env) {
await testD1ApiQueriesHappyPath(env.d1);
},
};

export const testExec = {
async test(_ctr, env) {
await testD1Exec(env.d1);
},
};
12 changes: 10 additions & 2 deletions src/cloudflare/internal/test/d1/d1-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@ export class D1MockDO {
: resultsFormatParam === 'NONE'
? 'NONE'
: 'ARRAY_OF_OBJECTS';
const safeRunQuery = (query) => {
try {
return this.runQuery(query, resultsFormat);
} catch (e) {
// Reproduce the production behavior by catching any error and returning a V4Failure
return { success: false, error: String(e.message) };
}
};
return Response.json(
Array.isArray(body)
? body.map((query) => this.runQuery(query, resultsFormat))
: this.runQuery(body, resultsFormat)
? body.map((query) => safeRunQuery(query))
: safeRunQuery(body)
);
} else {
return Response.json({ error: 'Not found' }, { status: 404 });
Expand Down
Loading