11import { TableQuery } from "./client.js" ;
22import type { QueryDescriptor , QueryExecutor } from "./client.js" ;
3- import type { AppendResult , ExplainResult , QueryResult , Row } from "./types.js" ;
4- import { bigIntReplacer } from "./decode.js" ;
3+ import type { AppendResult , ExplainResult , QueryResult , Row , QueryDORpc , MasterDORpc } from "./types.js" ;
54import { LocalExecutor } from "./local-executor.js" ;
65
76export { MasterDO } from "./master-do.js" ;
@@ -34,6 +33,8 @@ export type {
3433 AppendResult ,
3534 ExplainResult ,
3635 VectorIndexInfo ,
36+ QueryDORpc ,
37+ MasterDORpc ,
3738} from "./types.js" ;
3839
3940/**
@@ -95,8 +96,9 @@ export class QueryMode {
9596}
9697
9798/**
98- * Executor that sends queries to a regional Query DO.
99- * The DO has cached footers — no metadata round-trip needed.
99+ * Executor that calls regional Query DO via Worker RPC.
100+ * Zero-serialization: structured clone instead of JSON, no HTTP overhead.
101+ * RPC sessions survive DO hibernation — each session is one billable request.
100102 */
101103class RemoteExecutor implements QueryExecutor {
102104 private namespace : DurableObjectNamespace ;
@@ -111,76 +113,45 @@ class RemoteExecutor implements QueryExecutor {
111113 this . masterNamespace = masterNamespace ;
112114 }
113115
114- private getQueryDo ( ) {
116+ /** Get a typed RPC handle for the regional Query DO. */
117+ private getQueryHandle ( ) : QueryDORpc {
115118 const doName = `query-${ this . region } ` ;
116119 const id = this . namespace . idFromName ( doName ) ;
117- return this . locationHint
120+ const doRef = this . locationHint
118121 ? this . namespace . get ( id , { locationHint : this . locationHint as DurableObjectLocationHint } )
119122 : this . namespace . get ( id ) ;
123+ return doRef as unknown as QueryDORpc ;
120124 }
121125
126+ /** Execute query via RPC — zero JSON serialization overhead. */
122127 async execute ( query : QueryDescriptor ) : Promise < QueryResult > {
123- const queryDo = this . getQueryDo ( ) ;
124-
125- const response = await queryDo . fetch ( new Request ( "http://internal/query" , {
126- method : "POST" ,
127- body : JSON . stringify ( query , bigIntReplacer ) ,
128- headers : { "content-type" : "application/json" } ,
129- } ) ) ;
130-
131- if ( ! response . ok ) {
132- const error = await response . text ( ) ;
133- throw new Error ( `QueryMode query failed: ${ error } ` ) ;
134- }
135-
136- return response . json ( ) as Promise < QueryResult > ;
128+ const rpc = this . getQueryHandle ( ) ;
129+ return rpc . queryRpc ( query ) ;
137130 }
138131
132+ /** Append rows via RPC to Master DO. */
139133 async append ( table : string , rows : Record < string , unknown > [ ] ) : Promise < AppendResult > {
140134 if ( ! this . masterNamespace ) {
141135 throw new Error ( "append() requires masterDoNamespace — pass it via QueryMode.remote(queryDO, { masterDO })" ) ;
142136 }
143- // Append goes to Master DO (single writer)
144137 const id = this . masterNamespace . idFromName ( "master" ) ;
145- const masterDo = this . masterNamespace . get ( id ) ;
146-
147- const response = await masterDo . fetch ( new Request ( "http://internal/append" , {
148- method : "POST" ,
149- body : JSON . stringify ( { table, rows } , bigIntReplacer ) ,
150- headers : { "content-type" : "application/json" } ,
151- } ) ) ;
152-
153- if ( ! response . ok ) {
154- const error = await response . text ( ) ;
155- throw new Error ( `QueryMode append failed: ${ error } ` ) ;
156- }
157-
158- return response . json ( ) as Promise < AppendResult > ;
138+ const masterRpc = this . masterNamespace . get ( id ) as unknown as MasterDORpc ;
139+ return masterRpc . appendRpc ( table , rows ) ;
159140 }
160141
142+ /** Stream query results via RPC — native ReadableStream, no HTTP wrapper. */
161143 async executeStream ( query : QueryDescriptor ) : Promise < ReadableStream < Row > > {
162- const queryDo = this . getQueryDo ( ) ;
163-
164- const response = await queryDo . fetch ( new Request ( "http://internal/query/stream" , {
165- method : "POST" ,
166- body : JSON . stringify ( query , bigIntReplacer ) ,
167- headers : { "content-type" : "application/json" } ,
168- } ) ) ;
169-
170- if ( ! response . ok ) {
171- const error = await response . text ( ) ;
172- throw new Error ( `QueryMode stream failed: ${ error } ` ) ;
173- }
174-
175- if ( ! response . body ) throw new Error ( "No response body for stream" ) ;
144+ const rpc = this . getQueryHandle ( ) ;
145+ const byteStream = await rpc . streamRpc ( query ) ;
176146
147+ // Parse NDJSON byte stream into Row objects
177148 const decoder = new TextDecoder ( ) ;
178- const MAX_STREAM_BUFFER = 10 * 1024 * 1024 ; // 10MB
149+ const MAX_STREAM_BUFFER = 10 * 1024 * 1024 ;
179150 let buffer = "" ;
180151
181152 return new ReadableStream < Row > ( {
182153 async start ( controller ) {
183- const reader = response . body ! . getReader ( ) ;
154+ const reader = byteStream . getReader ( ) ;
184155 try {
185156 while ( true ) {
186157 const { done, value } = await reader . read ( ) ;
@@ -206,36 +177,27 @@ class RemoteExecutor implements QueryExecutor {
206177 } ) ;
207178 }
208179
209- private async postQuery < T > ( path : string , query : QueryDescriptor ) : Promise < T > {
210- const queryDo = this . getQueryDo ( ) ;
211- const response = await queryDo . fetch ( new Request ( `http://internal${ path } ` , {
212- method : "POST" ,
213- body : JSON . stringify ( query , bigIntReplacer ) ,
214- headers : { "content-type" : "application/json" } ,
215- } ) ) ;
216- if ( ! response . ok ) {
217- const error = await response . text ( ) ;
218- throw new Error ( `QueryMode ${ path } failed: ${ error } ` ) ;
219- }
220- return response . json ( ) as Promise < T > ;
221- }
222-
180+ /** Count via RPC — returns number directly, no JSON wrapper. */
223181 async count ( query : QueryDescriptor ) : Promise < number > {
224- const body = await this . postQuery < { count : number } > ( "/query/count" , query ) ;
225- return body . count ;
182+ const rpc = this . getQueryHandle ( ) ;
183+ return rpc . countRpc ( query ) ;
226184 }
227185
186+ /** Exists via RPC — returns boolean directly. */
228187 async exists ( query : QueryDescriptor ) : Promise < boolean > {
229- const body = await this . postQuery < { exists : boolean } > ( "/query/exists" , query ) ;
230- return body . exists ;
188+ const rpc = this . getQueryHandle ( ) ;
189+ return rpc . existsRpc ( query ) ;
231190 }
232191
192+ /** First via RPC — returns Row directly. */
233193 async first ( query : QueryDescriptor ) : Promise < Row | null > {
234- const body = await this . postQuery < { row : Row | null } > ( "/query/first" , query ) ;
235- return body . row ;
194+ const rpc = this . getQueryHandle ( ) ;
195+ return rpc . firstRpc ( query ) ;
236196 }
237197
198+ /** Explain via RPC — returns ExplainResult directly. */
238199 async explain ( query : QueryDescriptor ) : Promise < ExplainResult > {
239- return this . postQuery < ExplainResult > ( "/query/explain" , query ) ;
200+ const rpc = this . getQueryHandle ( ) ;
201+ return rpc . explainRpc ( query ) ;
240202 }
241203}
0 commit comments