Skip to content

Commit 776c659

Browse files
committed
feat: add .pipe(), toOperator(), fromOperator() for composable pipelines
DataFrame.pipe(fn) injects custom operators mid-chain without breaking the fluent API. toOperator() escapes to raw operators, fromOperator() re-enters. PipeStage type and all operators now exported from index. - pipe stages applied after filter/computed, before agg/sort in buildEdgePipeline - 6 tests covering pipe basic, chaining, filter-after, immutability, fromOperator - new docs/composability.mdx page: Unix pipes framing, three levels of control - updated operators.mdx with pipe and escape hatch sections
1 parent c04e28d commit 776c659

File tree

6 files changed

+404
-1
lines changed

6 files changed

+404
-1
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
---
2+
title: Composability
3+
description: Operators are building blocks. Your code assembles the pipeline.
4+
---
5+
6+
## Operators are Unix pipes
7+
8+
Every operator in QueryMode implements one interface:
9+
10+
```typescript
11+
interface Operator {
12+
next(): Promise<RowBatch | null>
13+
close(): Promise<void>
14+
}
15+
```
16+
17+
Each operator reads from upstream and writes downstream. Wrap one operator around another — that's composition. Same idea as `cat | grep | sort | uniq`, except the data is columnar row batches instead of text lines.
18+
19+
```
20+
ScanOperator → FilterOperator → YourCustomOperator → AggregateOperator → ProjectOperator
21+
```
22+
23+
## Three levels of control
24+
25+
### 1. DataFrame chain
26+
27+
The DataFrame API handles the common case. Filter, aggregate, sort, join — all without thinking about operators:
28+
29+
```typescript
30+
const result = await qm.table("events")
31+
.filter("status", "eq", "active")
32+
.groupBy("region")
33+
.aggregate("sum", "amount", "total")
34+
.sort("total", "desc")
35+
.limit(10)
36+
.collect()
37+
```
38+
39+
Under the hood, this builds a pipeline of operators. You don't see them, but they're there — doing page-level skip, SIMD decode, partial aggregation.
40+
41+
### 2. `.pipe()` — inject custom operators mid-chain
42+
43+
When you need something the built-in methods don't cover, `.pipe()` lets you inject any operator without leaving the chain:
44+
45+
```typescript
46+
const result = await qm.table("events")
47+
.filter("created_at", "gte", "2024-01-01")
48+
.pipe(upstream => new ComputedColumnOperator(upstream, [
49+
{ alias: "risk_score", fn: row => riskModel.predict(row) },
50+
]))
51+
.pipe(upstream => new FilterOperator(upstream, [
52+
{ column: "risk_score", op: "gt", value: 0.8 },
53+
]))
54+
.sort("risk_score", "desc")
55+
.limit(50)
56+
.collect()
57+
```
58+
59+
ML scoring inside the query pipeline. No round-trip to a separate service, no serialization. The model runs on the same row batch that the filter just produced.
60+
61+
### 3. `toOperator()` / `fromOperator()` — full escape hatch
62+
63+
For maximum control, break out of the DataFrame entirely:
64+
65+
```typescript
66+
// Escape: DataFrame → raw Operator
67+
const op = await qm.table("events")
68+
.filter("status", "eq", "active")
69+
.toOperator()
70+
71+
// Your code: arbitrary operator composition
72+
const enriched = new ComputedColumnOperator(op, [
73+
{ alias: "score", fn: row => score(row) },
74+
])
75+
const deduped = new DistinctOperator(enriched, ["user_id"])
76+
77+
// Re-enter: raw Operator → DataFrame
78+
const result = await DataFrame.fromOperator(deduped, executor)
79+
.sort("score", "desc")
80+
.limit(10)
81+
.collect()
82+
```
83+
84+
You left the DataFrame, did whatever you wanted with raw operators, and came back. The DataFrame doesn't know or care what happened in between.
85+
86+
## Write your own operators
87+
88+
Any object that implements `next()` and `close()` is an operator:
89+
90+
```typescript
91+
class SamplingOperator implements Operator {
92+
constructor(
93+
private upstream: Operator,
94+
private rate: number,
95+
) {}
96+
97+
async next(): Promise<RowBatch | null> {
98+
const batch = await this.upstream.next()
99+
if (!batch) return null
100+
return batch.filter(() => Math.random() < this.rate)
101+
}
102+
103+
async close() {
104+
await this.upstream.close()
105+
}
106+
}
107+
108+
// Use it
109+
const result = await qm.table("events")
110+
.filter("status", "eq", "active")
111+
.pipe(upstream => new SamplingOperator(upstream, 0.1))
112+
.collect()
113+
```
114+
115+
No framework, no plugin system, no registration. It's a two-method interface.
116+
117+
## Why this matters
118+
119+
Traditional query engines are black boxes. You send a SQL string, get rows back. You can't put a rate limiter between the scan and the filter. You can't run ML scoring before the aggregation. You can't swap the sort algorithm based on the data size.
120+
121+
With QueryMode, the pipeline is your code. The operators do real query engine work — page-level skip, SIMD decode, memory-bounded spill — but you control how they're assembled. That's the difference between using a query engine and building with one.

docs/src/content/docs/operators.mdx

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,65 @@ const scored = new ComputedColumnOperator(filtered, [
119119
])
120120
const top10 = new TopKOperator(scored, "ml_score", true, 10)
121121
```
122+
123+
## Pipe: inject custom stages into the DataFrame
124+
125+
The DataFrame API's `.pipe()` method lets you inject any custom operator into the pipeline without leaving the fluent chain:
126+
127+
```typescript
128+
import { QueryMode, ComputedColumnOperator } from "querymode"
129+
130+
const qm = QueryMode.local()
131+
const result = await qm.table("events")
132+
.filter("created_at", "gte", "2024-01-01")
133+
.pipe(upstream => new ComputedColumnOperator(upstream, [
134+
{ alias: "score", fn: row => scoreModel.predict(row) },
135+
]))
136+
.sort("score", "desc")
137+
.limit(10)
138+
.collect()
139+
```
140+
141+
Chain multiple `.pipe()` calls — each one wraps the previous stage:
142+
143+
```typescript
144+
const result = await qm.table("events")
145+
.filter("status", "eq", "active")
146+
.pipe(upstream => new ComputedColumnOperator(upstream, [
147+
{ alias: "risk", fn: row => computeRisk(row) },
148+
]))
149+
.pipe(upstream => new FilterOperator(upstream, [
150+
{ column: "risk", op: "gt", value: 0.8 },
151+
]))
152+
.collect()
153+
```
154+
155+
## Escape hatch: toOperator / fromOperator
156+
157+
For full control, break out of the DataFrame into raw operators, then re-enter:
158+
159+
```typescript
160+
// Escape: get the raw Operator pipeline
161+
const op = await qm.table("events")
162+
.filter("status", "eq", "active")
163+
.toOperator()
164+
165+
// Your code: wrap with custom operators
166+
const scored = new ComputedColumnOperator(op, [
167+
{ alias: "score", fn: row => scoreModel.predict(row) },
168+
])
169+
const filtered = new FilterOperator(scored, [
170+
{ column: "score", op: "gt", value: 0.9 },
171+
])
172+
173+
// Re-enter: wrap back into a DataFrame
174+
const result = await DataFrame.fromOperator(filtered, executor)
175+
.sort("score", "desc")
176+
.limit(10)
177+
.collect()
178+
```
179+
180+
Three levels of control:
181+
1. **DataFrame chain** — convenience methods for common operations
182+
2. **`.pipe()`** — inject custom operators without leaving the chain
183+
3. **`toOperator()` / `fromOperator()`** — full escape hatch for arbitrary composition

src/client.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type {
1414
VectorSearchParams,
1515
WindowSpec,
1616
} from "./types.js";
17+
import type { Operator, RowBatch } from "./operators.js";
1718

1819
// ---------------------------------------------------------------------------
1920
// Progress callback for collect()
@@ -99,6 +100,7 @@ export class DataFrame<T extends Row = Row> {
99100
private _version?: number;
100101
private _vectorEncoder?: (text: string) => Promise<Float32Array>;
101102
private _vectorQueryText?: string;
103+
private _pipeStages: PipeStage[];
102104
private _executor: QueryExecutor;
103105

104106
constructor(table: string, executor: QueryExecutor, init?: Partial<DataFrameInit>) {
@@ -124,6 +126,7 @@ export class DataFrame<T extends Row = Row> {
124126
this._version = init?.version;
125127
this._vectorEncoder = init?.vectorEncoder;
126128
this._vectorQueryText = init?.vectorQueryText;
129+
this._pipeStages = init?.pipeStages ?? [];
127130
}
128131

129132
/** Create a shallow clone with overrides — returns a new immutable DataFrame. */
@@ -149,6 +152,7 @@ export class DataFrame<T extends Row = Row> {
149152
version: this._version,
150153
vectorEncoder: this._vectorEncoder,
151154
vectorQueryText: this._vectorQueryText,
155+
pipeStages: this._pipeStages,
152156
...overrides,
153157
});
154158
}
@@ -332,6 +336,25 @@ export class DataFrame<T extends Row = Row> {
332336
return this.derive({ windows: [...this._windows, spec] }) as DataFrame;
333337
}
334338

339+
/**
340+
* Inject a custom operator into the pipeline. The function receives the
341+
* upstream Operator and must return a new Operator. Runs at collect() time.
342+
*
343+
* const result = await qm.table("events")
344+
* .filter("created_at", "gte", startDate)
345+
* .pipe(upstream => new ComputedColumnOperator(upstream, [
346+
* { alias: "score", fn: row => model.predict(row) },
347+
* ]))
348+
* .sort("score", "desc")
349+
* .limit(10)
350+
* .collect()
351+
*/
352+
pipe(fn: (upstream: Operator) => Operator): DataFrame {
353+
return this.derive({
354+
pipeStages: [...(this._pipeStages ?? []), fn],
355+
}) as DataFrame;
356+
}
357+
335358
/** Deduplicate rows. If no columns specified, dedup on all columns. */
336359
distinct(...columns: string[]): DataFrame<T> {
337360
return this.derive({ distinct: columns.length > 0 ? columns : [] });
@@ -584,6 +607,47 @@ export class DataFrame<T extends Row = Row> {
584607
return this._executor.executeStream(this.toDescriptor());
585608
}
586609

610+
/**
611+
* Escape hatch: get the raw Operator for this query. Resolves deferred
612+
* subqueries and vector encoders, then returns an Operator you can wrap
613+
* with your own operators before re-entering via DataFrame.fromOperator().
614+
*
615+
* Requires an executor that supports toOperator (local-executor, query-do).
616+
*/
617+
async toOperator(): Promise<Operator> {
618+
if (!this._executor.toOperator) {
619+
throw new Error("toOperator() requires an executor with operator pipeline support");
620+
}
621+
const desc = await this.resolveDeferred();
622+
return this._executor.toOperator(desc);
623+
}
624+
625+
/**
626+
* Re-entry: wrap a raw Operator back into a DataFrame for further chaining.
627+
* The operator is drained at collect() time.
628+
*/
629+
static fromOperator<R extends Row = Row>(op: Operator, executor: QueryExecutor): DataFrame<R> {
630+
const drainExecutor: QueryExecutor = {
631+
async execute(_query: QueryDescriptor): Promise<QueryResult> {
632+
const rows: Row[] = [];
633+
let batch: RowBatch | null;
634+
while ((batch = await op.next()) !== null) {
635+
rows.push(...batch);
636+
}
637+
await op.close();
638+
return {
639+
rows,
640+
rowCount: rows.length,
641+
columns: rows.length > 0 ? Object.keys(rows[0]) : [],
642+
bytesRead: 0,
643+
pagesSkipped: 0,
644+
durationMs: 0,
645+
};
646+
},
647+
};
648+
return new DataFrame<R>("__from_operator__", drainExecutor);
649+
}
650+
587651
toDescriptor(): QueryDescriptor {
588652
return {
589653
table: this._table,
@@ -606,6 +670,7 @@ export class DataFrame<T extends Row = Row> {
606670
? this._subqueryIn.map(sq => ({ column: sq.column, valueSet: sq.valueSet }))
607671
: undefined,
608672
version: this._version,
673+
pipeStages: this._pipeStages.length > 0 ? this._pipeStages : undefined,
609674
};
610675
}
611676
}
@@ -736,6 +801,9 @@ export class MaterializedExecutor implements QueryExecutor {
736801
// DataFrameInit — internal state for derive()
737802
// ---------------------------------------------------------------------------
738803

804+
/** A function that wraps an upstream Operator, returning a new Operator. */
805+
export type PipeStage = (upstream: Operator) => Operator;
806+
739807
interface DataFrameInit {
740808
filters: FilterOp[];
741809
projections: string[];
@@ -757,6 +825,7 @@ interface DataFrameInit {
757825
version?: number;
758826
vectorEncoder?: (text: string) => Promise<Float32Array>;
759827
vectorQueryText?: string;
828+
pipeStages: PipeStage[];
760829
}
761830

762831
// ---------------------------------------------------------------------------
@@ -784,6 +853,7 @@ export interface QueryDescriptor {
784853
setOperation?: { mode: "union" | "union_all" | "intersect" | "except"; right: QueryDescriptor };
785854
subqueryIn?: { column: string; valueSet: Set<string> }[];
786855
version?: number;
856+
pipeStages?: PipeStage[];
787857
}
788858

789859
/** Interface for query execution backends (local, DO, browser) */
@@ -805,4 +875,6 @@ export interface QueryExecutor {
805875
explain?(query: QueryDescriptor): Promise<ExplainResult>;
806876
/** Optional: lazy batch iteration */
807877
cursor?(query: QueryDescriptor, batchSize: number): AsyncIterable<Row[]>;
878+
/** Optional: return the raw Operator pipeline for escape-hatch composition */
879+
toOperator?(query: QueryDescriptor): Promise<Operator>;
808880
}

src/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ export { ReaderRegistry, FileDataSource, UrlDataSource } from "./reader.js";
1515
export type { FormatReader, DataSource } from "./reader.js";
1616
export { DataFrame, TableQuery, MaterializedExecutor } from "./client.js";
1717
export { LazyResultHandle } from "./client.js";
18+
export {
19+
FilterOperator, AggregateOperator, TopKOperator, ProjectOperator,
20+
ComputedColumnOperator, HashJoinOperator, ExternalSortOperator,
21+
InMemorySortOperator, WindowOperator, DistinctOperator, SetOperator,
22+
LimitOperator, SubqueryInOperator,
23+
drainPipeline, buildEdgePipeline,
24+
} from "./operators.js";
25+
export type { Operator, RowBatch } from "./operators.js";
1826
export { QueryModeError } from "./errors.js";
1927
export type { ErrorCode } from "./errors.js";
2028
export { LocalExecutor } from "./local-executor.js";
@@ -29,7 +37,7 @@ export { HnswIndex, cosineDistance, l2DistanceSq, dotDistance } from "./hnsw.js"
2937
export type { HnswOptions } from "./hnsw.js";
3038
export { MaterializationCache, queryHashKey } from "./lazy.js";
3139
export type { MaterializationCacheOptions } from "./lazy.js";
32-
export type { QueryExecutor, QueryDescriptor, ProgressInfo, CollectOptions } from "./client.js";
40+
export type { QueryExecutor, QueryDescriptor, ProgressInfo, CollectOptions, PipeStage } from "./client.js";
3341
export type {
3442
Env,
3543
Footer,

src/operators.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,6 +1977,13 @@ export function buildEdgePipeline(
19771977
pipeline = new ComputedColumnOperator(pipeline, query.computedColumns);
19781978
}
19791979

1980+
// User-injected pipe stages (inserted after filter/computed, before agg/sort)
1981+
if (query.pipeStages) {
1982+
for (const stage of query.pipeStages) {
1983+
pipeline = stage(pipeline);
1984+
}
1985+
}
1986+
19801987
// Window functions
19811988
if (query.windows && query.windows.length > 0) {
19821989
pipeline = new WindowOperator(pipeline, query.windows);

0 commit comments

Comments
 (0)