Skip to content

Commit d734d65

Browse files
committed
fix: aggregation pipeline, offset pagination, and query schema
- Fix buildEdgePipeline stripping aggregate results via ProjectOperator (count/sum/avg returned [{}] instead of aggregated values) - Add aggregation support to scanPages Parquet path (with limit) - Fix offset not applied in applyJsPostProcessing - Add orderBy alias to query schema (maps to sortColumn/sortDirection) - Expand aggregate schema to all 10 functions (was only 5) - Add percentileTarget to aggregate schema - Add 33 HTTP API + schema validation tests (165 total workerd tests)
1 parent cce35d2 commit d734d65

File tree

4 files changed

+237
-7
lines changed

4 files changed

+237
-7
lines changed

src/http-api.test.ts

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/**
2+
* HTTP API + query schema tests.
3+
*
4+
* Part 1: HTTP routing tests using SELF.fetch for stateless endpoints only
5+
* (health, 404 — no DO interaction, no storage mutation).
6+
*
7+
* Part 2: Schema validation tests using parseAndValidateQuery directly
8+
* (covers orderBy alias, aggregate fns, select alias, groupBy, etc.)
9+
*
10+
* Data-dependent query tests are in operators-conformance.test.ts (at scale
11+
* against DuckDB) and convenience.test.ts (in-memory).
12+
*/
13+
import { describe, it, expect } from "vitest";
14+
import { SELF } from "cloudflare:test";
15+
import { parseAndValidateQuery } from "./query-schema.js";
16+
17+
// ── HTTP routing (stateless endpoints only) ─────────────────────────────────
18+
19+
describe("HTTP routing", () => {
20+
it("GET /health returns ok", async () => {
21+
const res = await SELF.fetch("https://fake-host/health");
22+
expect(res.status).toBe(200);
23+
expect(res.headers.get("content-type")).toBe("application/json");
24+
expect(res.headers.get("x-querymode-request-id")).toBeDefined();
25+
const body = await res.json() as Record<string, unknown>;
26+
expect(body.status).toBe("ok");
27+
expect(body.service).toBe("querymode");
28+
expect(body.timestamp).toBeDefined();
29+
});
30+
31+
it("GET /nonexistent returns 404", async () => {
32+
const res = await SELF.fetch("https://fake-host/nonexistent");
33+
expect(res.status).toBe(404);
34+
const text = await res.text();
35+
expect(text).toContain("/query");
36+
});
37+
});
38+
39+
// ── Schema validation ─────────────────────────────────────────────────────
40+
41+
describe("parseAndValidateQuery", () => {
42+
it("parses minimal query", () => {
43+
const q = parseAndValidateQuery({ table: "events" });
44+
expect(q.table).toBe("events");
45+
expect(q.filters).toEqual([]);
46+
expect(q.projections).toEqual([]);
47+
});
48+
49+
it("parses filter with all ops", () => {
50+
for (const op of ["eq", "neq", "gt", "gte", "lt", "lte", "in"]) {
51+
const value = op === "in" ? [1, 2] : 42;
52+
const q = parseAndValidateQuery({
53+
table: "t",
54+
filters: [{ column: "x", op, value }],
55+
});
56+
expect(q.filters[0].op).toBe(op);
57+
}
58+
});
59+
60+
it("parses sortColumn + sortDirection", () => {
61+
const q = parseAndValidateQuery({
62+
table: "t",
63+
sortColumn: "amount",
64+
sortDirection: "desc",
65+
});
66+
expect(q.sortColumn).toBe("amount");
67+
expect(q.sortDirection).toBe("desc");
68+
});
69+
70+
it("parses orderBy alias → sortColumn/sortDirection", () => {
71+
const q = parseAndValidateQuery({
72+
table: "t",
73+
orderBy: { column: "amount", desc: true },
74+
});
75+
expect(q.sortColumn).toBe("amount");
76+
expect(q.sortDirection).toBe("desc");
77+
});
78+
79+
it("orderBy without desc defaults to asc", () => {
80+
const q = parseAndValidateQuery({
81+
table: "t",
82+
orderBy: { column: "x" },
83+
});
84+
expect(q.sortColumn).toBe("x");
85+
expect(q.sortDirection).toBe("asc");
86+
});
87+
88+
it("sortColumn takes precedence over orderBy", () => {
89+
const q = parseAndValidateQuery({
90+
table: "t",
91+
sortColumn: "a",
92+
sortDirection: "desc",
93+
orderBy: { column: "b" },
94+
});
95+
expect(q.sortColumn).toBe("a");
96+
expect(q.sortDirection).toBe("desc");
97+
});
98+
99+
it("parses select alias → projections", () => {
100+
const q = parseAndValidateQuery({
101+
table: "t",
102+
select: ["id", "name"],
103+
});
104+
expect(q.projections).toEqual(["id", "name"]);
105+
});
106+
107+
it("projections takes precedence over select", () => {
108+
const q = parseAndValidateQuery({
109+
table: "t",
110+
projections: ["a"],
111+
select: ["b"],
112+
});
113+
expect(q.projections).toEqual(["a"]);
114+
});
115+
116+
it("parses limit + offset", () => {
117+
const q = parseAndValidateQuery({ table: "t", limit: 10, offset: 5 });
118+
expect(q.limit).toBe(10);
119+
expect(q.offset).toBe(5);
120+
});
121+
122+
it("parses groupBy", () => {
123+
const q = parseAndValidateQuery({
124+
table: "t",
125+
groupBy: ["region", "category"],
126+
});
127+
expect(q.groupBy).toEqual(["region", "category"]);
128+
});
129+
130+
it("parses cacheTTL", () => {
131+
const q = parseAndValidateQuery({ table: "t", cacheTTL: 60 });
132+
expect(q.cacheTTL).toBe(60);
133+
});
134+
135+
// Aggregate functions
136+
for (const fn of ["sum", "avg", "min", "max", "count", "count_distinct", "stddev", "variance", "median", "percentile"]) {
137+
it(`accepts aggregate fn="${fn}"`, () => {
138+
const q = parseAndValidateQuery({
139+
table: "t",
140+
aggregates: [{ fn, column: "x" }],
141+
});
142+
expect(q.aggregates).toHaveLength(1);
143+
expect(q.aggregates![0].fn).toBe(fn);
144+
});
145+
}
146+
147+
it("accepts percentileTarget", () => {
148+
const q = parseAndValidateQuery({
149+
table: "t",
150+
aggregates: [{ fn: "percentile", column: "x", percentileTarget: 0.95 }],
151+
});
152+
expect(q.aggregates![0].percentileTarget).toBe(0.95);
153+
});
154+
155+
it("accepts aggregate alias", () => {
156+
const q = parseAndValidateQuery({
157+
table: "t",
158+
aggregates: [{ fn: "sum", column: "amount", alias: "total" }],
159+
});
160+
expect(q.aggregates![0].alias).toBe("total");
161+
});
162+
163+
// Rejection cases
164+
it("rejects empty table name", () => {
165+
expect(() => parseAndValidateQuery({ table: "" })).toThrow(/table/i);
166+
});
167+
168+
it("rejects missing table", () => {
169+
expect(() => parseAndValidateQuery({})).toThrow();
170+
});
171+
172+
it("rejects invalid filter op", () => {
173+
expect(() => parseAndValidateQuery({
174+
table: "t",
175+
filters: [{ column: "x", op: "INVALID", value: 1 }],
176+
})).toThrow();
177+
});
178+
179+
it("rejects invalid aggregate fn", () => {
180+
expect(() => parseAndValidateQuery({
181+
table: "t",
182+
aggregates: [{ fn: "BOGUS", column: "x" }],
183+
})).toThrow();
184+
});
185+
186+
it("rejects empty filter column", () => {
187+
expect(() => parseAndValidateQuery({
188+
table: "t",
189+
filters: [{ column: "", op: "eq", value: 1 }],
190+
})).toThrow();
191+
});
192+
193+
it("rejects negative limit", () => {
194+
expect(() => parseAndValidateQuery({ table: "t", limit: -1 })).toThrow();
195+
});
196+
197+
it("rejects negative offset", () => {
198+
expect(() => parseAndValidateQuery({ table: "t", offset: -5 })).toThrow();
199+
});
200+
201+
it("rejects percentileTarget out of range", () => {
202+
expect(() => parseAndValidateQuery({
203+
table: "t",
204+
aggregates: [{ fn: "percentile", column: "x", percentileTarget: 1.5 }],
205+
})).toThrow();
206+
});
207+
});

src/operators.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2020,7 +2020,10 @@ export function buildEdgePipeline(
20202020
pipeline = new LimitOperator(pipeline, query.limit ?? Infinity, query.offset ?? 0);
20212021
}
20222022

2023-
if (outputColumns.length > 0) {
2023+
// Skip projection when aggregation is active — aggregate output columns
2024+
// (aliases like "count_*", "sum_value") don't match the original table columns,
2025+
// so projecting to table columns would strip all aggregate results.
2026+
if (!hasAgg && outputColumns.length > 0) {
20242027
pipeline = new ProjectOperator(pipeline, outputColumns);
20252028
}
20262029

src/query-do.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
FilterOperator, HashJoinOperator, ProjectOperator,
1818
DEFAULT_MEMORY_BUDGET,
1919
} from "./operators.js";
20+
import { computePartialAgg, finalizePartialAgg } from "./partial-agg.js";
2021
import { VipCache } from "./vip-cache.js";
2122
import { parseLanceV2Columns } from "./lance-v2.js";
2223
import { parseAndValidateQuery } from "./query-schema.js";
@@ -435,7 +436,10 @@ export class QueryDO extends DurableObject<Env> {
435436
return va < vb ? -dir : va > vb ? dir : 0;
436437
});
437438
}
438-
if (query.limit && query.limit > 0) result = result.slice(0, query.limit);
439+
const offset = query.offset ?? 0;
440+
if (offset > 0 || (query.limit && query.limit > 0)) {
441+
result = result.slice(offset, query.limit ? offset + query.limit : undefined);
442+
}
439443
return result;
440444
}
441445

@@ -1041,9 +1045,19 @@ export class QueryDO extends DurableObject<Env> {
10411045
let rows = this.applyJsPostProcessing(
10421046
this.assembleRowsFromColumns(decodedColumns, colNames), query,
10431047
);
1048+
1049+
// Handle aggregation in the scanPages Parquet path
1050+
if (query.aggregates && query.aggregates.length > 0) {
1051+
const partial = computePartialAgg(rows, query);
1052+
rows = finalizePartialAgg(partial, query);
1053+
}
1054+
10441055
const wasmExecMs = Date.now() - wasmStart;
1056+
const outputCols = (query.aggregates && query.aggregates.length > 0)
1057+
? Object.keys(rows[0] ?? {})
1058+
: colNames;
10451059
return {
1046-
rows, rowCount: rows.length, columns: colNames,
1060+
rows, rowCount: rows.length, columns: outputCols,
10471061
bytesRead, pagesSkipped, durationMs: Date.now() - t0,
10481062
r2ReadMs, wasmExecMs, cacheHits, cacheMisses,
10491063
edgeCacheHits, edgeCacheMisses,

src/query-schema.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ const filterOpSchema = z.object({
1818
});
1919

2020
const aggregateOpSchema = z.object({
21-
fn: z.enum(["sum", "avg", "min", "max", "count"]),
21+
fn: z.enum(["sum", "avg", "min", "max", "count", "count_distinct", "stddev", "variance", "median", "percentile"]),
2222
column: z.string().min(1, "Aggregate column name cannot be empty"),
2323
alias: z.string().optional(),
24+
percentileTarget: z.number().min(0).max(1).optional(),
2425
});
2526

2627
const vectorSearchSchema = z.object({
@@ -39,6 +40,7 @@ export const queryDescriptorSchema = z.object({
3940
select: z.array(z.string()).optional(), // alias for projections
4041
sortColumn: z.string().optional(),
4142
sortDirection: z.enum(["asc", "desc"]).optional(),
43+
orderBy: z.object({ column: z.string(), desc: z.boolean().optional() }).optional(), // alias for sortColumn/sortDirection
4244
limit: z.number().int().positive().optional(),
4345
offset: z.number().int().nonnegative().optional(),
4446
vectorSearch: vectorSearchSchema.optional(),
@@ -62,7 +64,7 @@ export function parseAndValidateQuery(body: unknown): {
6264
limit?: number;
6365
offset?: number;
6466
vectorSearch?: { column: string; queryVector: number[] | Float32Array; topK: number };
65-
aggregates?: { fn: "sum" | "avg" | "min" | "max" | "count"; column: string; alias?: string }[];
67+
aggregates?: { fn: "sum" | "avg" | "min" | "max" | "count" | "count_distinct" | "stddev" | "variance" | "median" | "percentile"; column: string; alias?: string; percentileTarget?: number }[];
6668
groupBy?: string[];
6769
cacheTTL?: number;
6870
} {
@@ -80,12 +82,16 @@ export function parseAndValidateQuery(body: unknown): {
8082
? data.projections
8183
: (data.select ?? []);
8284

85+
// Merge `orderBy` alias into `sortColumn` / `sortDirection`
86+
const sortColumn = data.sortColumn ?? data.orderBy?.column;
87+
const sortDirection = data.sortDirection ?? (data.orderBy?.desc ? "desc" : data.orderBy ? "asc" : undefined);
88+
8389
return {
8490
table: data.table,
8591
filters: data.filters,
8692
projections,
87-
sortColumn: data.sortColumn,
88-
sortDirection: data.sortDirection,
93+
sortColumn,
94+
sortDirection,
8995
limit: data.limit,
9096
offset: data.offset,
9197
vectorSearch: data.vectorSearch,

0 commit comments

Comments
 (0)