Skip to content

Commit ca216ae

Browse files
committed
fix: mergeQueryResults now propagates all telemetry from partials
Previously only bytesRead, pagesSkipped, and durationMs were merged. Now propagates r2ReadMs, wasmExecMs (max — parallel execution), cacheHits, cacheMisses, edgeCacheHits, edgeCacheMisses (summed), and spillBytesWritten, spillBytesRead (summed). Fields are omitted when no partials report them.
1 parent e920ebb commit ca216ae

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

src/merge.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,46 @@ describe("mergeQueryResults", () => {
124124
expect(merged.rows[0].sum_val).toBe(1275);
125125
});
126126

127+
it("propagates telemetry fields from partials", () => {
128+
const p1: QueryResult = {
129+
rows: [{ v: 1 }], rowCount: 1, columns: ["v"],
130+
bytesRead: 100, pagesSkipped: 2, durationMs: 10,
131+
r2ReadMs: 5, wasmExecMs: 3, cacheHits: 4, cacheMisses: 1,
132+
edgeCacheHits: 2, edgeCacheMisses: 0, spillBytesWritten: 1024, spillBytesRead: 512,
133+
};
134+
const p2: QueryResult = {
135+
rows: [{ v: 2 }], rowCount: 1, columns: ["v"],
136+
bytesRead: 200, pagesSkipped: 3, durationMs: 15,
137+
r2ReadMs: 8, wasmExecMs: 6, cacheHits: 3, cacheMisses: 2,
138+
edgeCacheHits: 1, edgeCacheMisses: 1, spillBytesWritten: 2048, spillBytesRead: 1024,
139+
};
140+
const query: QueryDescriptor = { table: "t", filters: [], projections: ["v"] };
141+
const merged = mergeQueryResults([p1, p2], query);
142+
expect(merged.bytesRead).toBe(300);
143+
expect(merged.pagesSkipped).toBe(5);
144+
expect(merged.durationMs).toBe(15);
145+
// Timing: max across partials (parallel execution)
146+
expect(merged.r2ReadMs).toBe(8);
147+
expect(merged.wasmExecMs).toBe(6);
148+
// Counters: summed
149+
expect(merged.cacheHits).toBe(7);
150+
expect(merged.cacheMisses).toBe(3);
151+
expect(merged.edgeCacheHits).toBe(3);
152+
expect(merged.edgeCacheMisses).toBe(1);
153+
expect(merged.spillBytesWritten).toBe(3072);
154+
expect(merged.spillBytesRead).toBe(1536);
155+
});
156+
157+
it("omits telemetry fields when no partials have them", () => {
158+
const p1 = makeResult([{ v: 1 }]);
159+
const query: QueryDescriptor = { table: "t", filters: [], projections: ["v"] };
160+
const merged = mergeQueryResults([p1], query);
161+
expect(merged.r2ReadMs).toBeUndefined();
162+
expect(merged.wasmExecMs).toBeUndefined();
163+
expect(merged.cacheHits).toBeUndefined();
164+
expect(merged.spillBytesWritten).toBeUndefined();
165+
});
166+
127167
it("offset works with sorted merge across partials", () => {
128168
const partials = Array.from({ length: 10 }, (_, i) =>
129169
makeResult([{ v: i }]),

src/merge.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,35 @@ export function mergeQueryResults(
122122
const columns =
123123
partials.length > 0 ? partials[0].columns : query.projections;
124124

125-
const baseResult = {
125+
// Accumulate telemetry: sums for counters/bytes, max for parallel timings
126+
let r2ReadMs = 0, wasmExecMs = 0;
127+
let cacheHits = 0, cacheMisses = 0;
128+
let edgeCacheHits = 0, edgeCacheMisses = 0;
129+
let spillBytesWritten = 0, spillBytesRead = 0;
130+
let hasR2 = false, hasWasm = false;
131+
for (const p of partials) {
132+
if (p.r2ReadMs !== undefined) { r2ReadMs = r2ReadMs > p.r2ReadMs ? r2ReadMs : p.r2ReadMs; hasR2 = true; }
133+
if (p.wasmExecMs !== undefined) { wasmExecMs = wasmExecMs > p.wasmExecMs ? wasmExecMs : p.wasmExecMs; hasWasm = true; }
134+
if (p.cacheHits) cacheHits += p.cacheHits;
135+
if (p.cacheMisses) cacheMisses += p.cacheMisses;
136+
if (p.edgeCacheHits) edgeCacheHits += p.edgeCacheHits;
137+
if (p.edgeCacheMisses) edgeCacheMisses += p.edgeCacheMisses;
138+
if (p.spillBytesWritten) spillBytesWritten += p.spillBytesWritten;
139+
if (p.spillBytesRead) spillBytesRead += p.spillBytesRead;
140+
}
141+
142+
const baseResult: Omit<QueryResult, "rows" | "rowCount" | "columns"> = {
126143
bytesRead: totalBytesRead,
127144
pagesSkipped: totalPagesSkipped,
128145
durationMs: maxDuration,
146+
...(hasR2 && { r2ReadMs }),
147+
...(hasWasm && { wasmExecMs }),
148+
...(cacheHits > 0 && { cacheHits }),
149+
...(cacheMisses > 0 && { cacheMisses }),
150+
...(edgeCacheHits > 0 && { edgeCacheHits }),
151+
...(edgeCacheMisses > 0 && { edgeCacheMisses }),
152+
...(spillBytesWritten > 0 && { spillBytesWritten }),
153+
...(spillBytesRead > 0 && { spillBytesRead }),
129154
};
130155

131156
// Aggregation path — columnar when possible, Row[] fallback

0 commit comments

Comments
 (0)