Skip to content

Commit bbfd94e

Browse files
committed
feat: Postgres wire protocol — connect with psql, Tableau, any BI tool
Implements pg wire v3 Simple Query protocol: - Startup handshake (SSL reject, auth ok, parameter status) - Query execution (SQL → compile → pipeline → DataRow responses) - SET/SHOW/RESET compatibility stubs - Error handling with proper ErrorResponse messages One adapter = entire BI ecosystem. 17 tests prove the full round-trip: startup, SSL negotiation, SELECT *, filtered queries, aggregates, CTEs, sequential queries, error recovery.
1 parent 1f7bf34 commit bbfd94e

File tree

5 files changed

+819
-0
lines changed

5 files changed

+819
-0
lines changed

src/pg-wire/handler.ts

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/**
2+
* Postgres wire protocol handler — translates pg queries to QueryMode execution.
3+
*
4+
* Handles the connection lifecycle:
5+
* 1. Startup handshake (SSL reject, auth ok, parameter status)
6+
* 2. Simple Query protocol (parse SQL → compile → execute → format as DataRow)
7+
* 3. Terminate
8+
*/
9+
10+
import type { QueryExecutor } from "../client.js";
11+
import type { Row } from "../types.js";
12+
import { buildSqlDataFrame } from "../sql/index.js";
13+
import {
14+
parseStartupMessage,
15+
parseFrontendMessage,
16+
sslRefused,
17+
authenticationOk,
18+
parameterStatus,
19+
backendKeyData,
20+
readyForQuery,
21+
rowDescription,
22+
dataRow,
23+
commandComplete,
24+
errorResponse,
25+
dtypeToOid,
26+
type FrontendMessage,
27+
} from "./protocol.js";
28+
29+
export interface PgConnectionOptions {
30+
/** QueryMode executor to run queries against */
31+
executor: QueryExecutor;
32+
/** Called when data should be sent to the client */
33+
send: (data: Uint8Array) => void;
34+
}
35+
36+
export class PgConnectionHandler {
37+
private executor: QueryExecutor;
38+
private send: (data: Uint8Array) => void;
39+
private startupDone = false;
40+
private buffer = new Uint8Array(0);
41+
42+
constructor(opts: PgConnectionOptions) {
43+
this.executor = opts.executor;
44+
this.send = opts.send;
45+
}
46+
47+
/** Feed incoming bytes from the client. May trigger responses via send(). */
48+
async onData(chunk: Uint8Array): Promise<void> {
49+
// Append to buffer
50+
const combined = new Uint8Array(this.buffer.length + chunk.length);
51+
combined.set(this.buffer);
52+
combined.set(chunk, this.buffer.length);
53+
this.buffer = combined;
54+
55+
if (!this.startupDone) {
56+
await this.handleStartup();
57+
return;
58+
}
59+
60+
// Parse regular messages
61+
while (this.buffer.length >= 5) {
62+
const result = parseFrontendMessage(this.buffer);
63+
if (!result) break;
64+
const [msg, consumed] = result;
65+
this.buffer = this.buffer.subarray(consumed);
66+
await this.handleMessage(msg);
67+
}
68+
}
69+
70+
private async handleStartup(): Promise<void> {
71+
const msg = parseStartupMessage(this.buffer);
72+
if (!msg) return;
73+
74+
if (msg.type === "ssl_request") {
75+
// Reject SSL, client will retry without
76+
this.send(sslRefused());
77+
this.buffer = this.buffer.subarray(8);
78+
return;
79+
}
80+
81+
if (msg.type === "startup") {
82+
// Consume startup message
83+
const dv = new DataView(this.buffer.buffer, this.buffer.byteOffset);
84+
const len = dv.getInt32(0);
85+
this.buffer = this.buffer.subarray(len);
86+
this.startupDone = true;
87+
88+
// Send auth ok + server params + ready
89+
this.send(authenticationOk());
90+
this.send(parameterStatus("server_version", "15.0 (QueryMode)"));
91+
this.send(parameterStatus("server_encoding", "UTF8"));
92+
this.send(parameterStatus("client_encoding", "UTF8"));
93+
this.send(parameterStatus("DateStyle", "ISO, MDY"));
94+
this.send(backendKeyData(1, 0));
95+
this.send(readyForQuery());
96+
}
97+
}
98+
99+
private async handleMessage(msg: FrontendMessage): Promise<void> {
100+
if (msg.type === "terminate") return;
101+
102+
if (msg.type === "query") {
103+
await this.handleQuery(msg.sql);
104+
}
105+
}
106+
107+
private async handleQuery(sql: string): Promise<void> {
108+
const trimmed = sql.trim().replace(/;$/, "").trim();
109+
110+
// Handle empty query
111+
if (!trimmed) {
112+
this.send(commandComplete("SELECT 0"));
113+
this.send(readyForQuery());
114+
return;
115+
}
116+
117+
// Handle SET/RESET/DISCARD — just acknowledge
118+
const upper = trimmed.toUpperCase();
119+
if (upper.startsWith("SET ") || upper.startsWith("RESET ") || upper.startsWith("DISCARD ")) {
120+
this.send(commandComplete("SET"));
121+
this.send(readyForQuery());
122+
return;
123+
}
124+
125+
// Handle SHOW — return a fake value for compatibility
126+
if (upper.startsWith("SHOW ")) {
127+
const param = trimmed.slice(5).trim().toLowerCase();
128+
const cols = [{ name: param, oid: 25 }];
129+
this.send(rowDescription(cols));
130+
this.send(dataRow(["on"]));
131+
this.send(commandComplete("SHOW"));
132+
this.send(readyForQuery());
133+
return;
134+
}
135+
136+
try {
137+
const df = buildSqlDataFrame(trimmed, this.executor);
138+
const result = await df.collect();
139+
140+
// Build column descriptors
141+
const colNames = result.columns.length > 0
142+
? result.columns
143+
: result.rows.length > 0
144+
? Object.keys(result.rows[0])
145+
: [];
146+
147+
const cols = colNames.map(name => ({
148+
name,
149+
oid: this.inferOid(name, result.rows),
150+
}));
151+
152+
// Send RowDescription
153+
this.send(rowDescription(cols));
154+
155+
// Send DataRows
156+
for (const row of result.rows) {
157+
const values = colNames.map(col => formatValue(row[col]));
158+
this.send(dataRow(values));
159+
}
160+
161+
// Send CommandComplete
162+
this.send(commandComplete(`SELECT ${result.rowCount}`));
163+
this.send(readyForQuery());
164+
} catch (err) {
165+
const msg = err instanceof Error ? err.message : String(err);
166+
this.send(errorResponse(msg));
167+
this.send(readyForQuery());
168+
}
169+
}
170+
171+
private inferOid(colName: string, rows: Row[]): number {
172+
// Find first non-null value to infer type
173+
for (const row of rows) {
174+
const val = row[colName];
175+
if (val === null || val === undefined) continue;
176+
if (typeof val === "number") return Number.isInteger(val) ? 23 : 701;
177+
if (typeof val === "bigint") return 20;
178+
if (typeof val === "boolean") return 16;
179+
if (val instanceof Float32Array) return 25; // vectors as text
180+
return 25; // string
181+
}
182+
return 25; // default to text
183+
}
184+
}
185+
186+
function formatValue(val: unknown): string | null {
187+
if (val === null || val === undefined) return null;
188+
if (typeof val === "bigint") return val.toString();
189+
if (val instanceof Float32Array) return `[${Array.from(val).join(",")}]`;
190+
return String(val);
191+
}

0 commit comments

Comments
 (0)