From 22db0f55466076d16884d17798f76a9ff90c873c Mon Sep 17 00:00:00 2001 From: Nick Fujita Date: Thu, 28 Aug 2025 23:57:38 +0900 Subject: [PATCH 01/14] 'add bugger to incoming queries to wait for full message across multiple tcp packages' --- packages/pglite-socket/src/index.ts | 124 ++++++++++++++++++---------- 1 file changed, 79 insertions(+), 45 deletions(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 6a376437f..19bf18a7f 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -32,6 +32,7 @@ export class PGLiteSocketHandler extends EventTarget { private inspect: boolean private debug: boolean private readonly id: number + private messageBuffer: Buffer = Buffer.alloc(0) // Static counter for generating unique handler IDs private static nextHandlerId = 1 @@ -155,6 +156,7 @@ export class PGLiteSocketHandler extends EventTarget { this.socket = null this.active = false + this.messageBuffer = Buffer.alloc(0) return this } @@ -176,56 +178,88 @@ export class PGLiteSocketHandler extends EventTarget { this.log(`handleData: received ${data.length} bytes`) + // Append to buffer for message reassembly + this.messageBuffer = Buffer.concat([this.messageBuffer, data]) + // Print the incoming data to the console this.inspectData('incoming', data) try { - // Process the raw protocol data - this.log(`handleData: sending data to PGlite for processing`) - const result = await this.db.execProtocolRaw(new Uint8Array(data)) - - this.log(`handleData: received ${result.length} bytes from PGlite`) - - // Print the outgoing data to the console - this.inspectData('outgoing', result) - - // Send the result back if the socket is still connected - if (this.socket && this.socket.writable && this.active) { - if (result.length <= 0) { - this.log(`handleData: cowardly refusing to send empty packet`) - return new Promise((_, reject) => reject('no data')) - } - - const promise = new Promise((resolve, reject) => { - this.log(`handleData: writing response to socket`) - if (this.socket) { - this.socket.write(Buffer.from(result), (err?: Error) => { - if (err) { - reject(`Error while writing to the socket ${err.toString()}`) - } else { - resolve(result.length) - } - }) - } else { - reject(`No socket`) + let totalProcessed = 0 + + while (this.messageBuffer.length > 0) { + // Determine message length + let messageLength = 0 + let isComplete = false + + // Handle startup message (no type byte, just length) + if (this.messageBuffer.length >= 4) { + const firstInt = this.messageBuffer.readInt32BE(0) + + if (this.messageBuffer.length >= 8) { + const secondInt = this.messageBuffer.readInt32BE(4) + // PostgreSQL 3.0 protocol version + if (secondInt === 196608 || secondInt === 0x00030000) { + messageLength = firstInt + isComplete = this.messageBuffer.length >= messageLength + } } - }) - - // Emit data event with byte sizes - this.dispatchEvent( - new CustomEvent('data', { - detail: { incoming: data.length, outgoing: result.length }, - }), - ) - return promise - } else { - this.log( - `handleData: socket no longer writable or active, discarding response`, - ) - return new Promise((_, reject) => - reject(`No socket, not active or not writeable`), - ) + + // Regular message (type byte + length) + if (!isComplete && this.messageBuffer.length >= 5) { + const msgLength = this.messageBuffer.readInt32BE(1) + messageLength = 1 + msgLength + isComplete = this.messageBuffer.length >= messageLength + } + } + + if (!isComplete || messageLength === 0) { + this.log(`handleData: incomplete message, buffering ${this.messageBuffer.length} bytes`) + break + } + + // Extract and process complete message + const message = this.messageBuffer.slice(0, messageLength) + this.messageBuffer = this.messageBuffer.slice(messageLength) + + this.log(`handleData: processing message of ${message.length} bytes`) + const result = await this.db.execProtocolRaw(new Uint8Array(message)) + + this.log(`handleData: received ${result.length} bytes from PGlite`) + + // Print the outgoing data to the console + this.inspectData('outgoing', result) + + // Send response if available + if (result.length > 0 && this.socket && this.socket.writable && this.active) { + await new Promise((resolve, reject) => { + this.log(`handleData: writing response to socket`) + if (this.socket) { + this.socket.write(Buffer.from(result), (err?: any) => { + if (err) { + reject(`Error while writing to the socket ${err.toString()}`) + } else { + resolve(result.length) + } + }) + } else { + reject(`No socket`) + } + }) + } + + totalProcessed += message.length } + + // Emit data event with byte sizes + this.dispatchEvent( + new CustomEvent('data', { + detail: { incoming: data.length, outgoing: totalProcessed }, + }), + ) + + return totalProcessed + } catch (err) { this.log(`handleData: error processing data:`, err) this.handleError(err as Error) @@ -717,4 +751,4 @@ export class PGLiteSocketServer extends EventTarget { throw err } } -} +} \ No newline at end of file From 2403cc0b1b39395e32a15bd001230be3e13aa117 Mon Sep 17 00:00:00 2001 From: Nick Fujita Date: Fri, 29 Aug 2025 12:42:15 +0900 Subject: [PATCH 02/14] 'Add connection multiplexing and query level queue' --- packages/pglite-socket/src/index.ts | 704 +++++++++++++--------------- 1 file changed, 335 insertions(+), 369 deletions(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 19bf18a7f..4f111facb 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -1,80 +1,177 @@ import type { PGlite } from '@electric-sql/pglite' -import { createServer, Server, Socket } from 'net' +import { type Server, type Socket, createServer } from 'net' // Connection queue timeout in milliseconds export const CONNECTION_QUEUE_TIMEOUT = 60000 // 60 seconds +/** + * Represents a queued query waiting for PGlite access + */ +interface QueuedQuery { + handlerId: number + message: Uint8Array + resolve: (result: Uint8Array) => void + reject: (error: Error) => void + timestamp: number +} + +/** + * Global query queue manager + * Ensures only one query executes at a time in PGlite + */ +class QueryQueueManager { + private queue: QueuedQuery[] = [] + private processing = false + private db: PGlite + private debug: boolean + + constructor(db: PGlite, debug = false) { + this.db = db + this.debug = debug + } + + private log(message: string, ...args: any[]): void { + if (this.debug) { + console.log(`[QueryQueueManager] ${message}`, ...args) + } + } + + async enqueue(handlerId: number, message: Uint8Array): Promise { + return new Promise((resolve, reject) => { + const query: QueuedQuery = { + handlerId, + message, + resolve, + reject, + timestamp: Date.now(), + } + + this.queue.push(query) + this.log( + `enqueued query from handler #${handlerId}, queue size: ${this.queue.length}`, + ) + + // Process queue if not already processing + if (!this.processing) { + this.processQueue() + } + }) + } + + private async processQueue(): Promise { + if (this.processing || this.queue.length === 0) { + return + } + + this.processing = true + + while (this.queue.length > 0) { + const query = this.queue.shift() + if (!query) break + + const waitTime = Date.now() - query.timestamp + this.log( + `processing query from handler #${query.handlerId} (waited ${waitTime}ms)`, + ) + + try { + // Execute the query with exclusive access to PGlite + const result = await this.db.runExclusive(async () => { + return await this.db.execProtocolRaw(query.message) + }) + + this.log( + `query from handler #${query.handlerId} completed, ${result.length} bytes`, + ) + query.resolve(result) + } catch (error) { + this.log(`query from handler #${query.handlerId} failed:`, error) + query.reject(error as Error) + } + } + + this.processing = false + this.log(`queue processing complete, queue is empty`) + } + + getQueueLength(): number { + return this.queue.length + } + + clearQueueForHandler(handlerId: number): void { + const before = this.queue.length + this.queue = this.queue.filter((q) => { + if (q.handlerId === handlerId) { + q.reject(new Error('Handler disconnected')) + return false + } + return true + }) + const removed = before - this.queue.length + if (removed > 0) { + this.log(`cleared ${removed} queries for handler #${handlerId}`) + } + } +} + /** * Options for creating a PGLiteSocketHandler */ export interface PGLiteSocketHandlerOptions { - /** The PGlite database instance */ - db: PGlite + /** The query queue manager */ + queryQueue: QueryQueueManager /** Whether to close the socket when detached (default: false) */ closeOnDetach?: boolean /** Print the incoming and outgoing data to the console in hex and ascii */ inspect?: boolean /** Enable debug logging of method calls */ debug?: boolean + /** Idle timeout in ms (0 to disable, default: 0) */ + idleTimeout?: number } /** - * Low-level handler for a single socket connection to PGLite - * Handles the raw protocol communication between a socket and PGLite + * Handler for a single socket connection to PGlite + * Each connection can remain open and send multiple queries */ export class PGLiteSocketHandler extends EventTarget { - readonly db: PGlite + private queryQueue: QueryQueueManager private socket: Socket | null = null private active = false private closeOnDetach: boolean - private resolveLock?: () => void - private rejectLock?: (err: Error) => void private inspect: boolean private debug: boolean private readonly id: number private messageBuffer: Buffer = Buffer.alloc(0) + private idleTimer?: NodeJS.Timeout + private idleTimeout: number + private lastActivityTime: number = Date.now() // Static counter for generating unique handler IDs private static nextHandlerId = 1 - /** - * Create a new PGLiteSocketHandler - * @param options Options for the handler - */ constructor(options: PGLiteSocketHandlerOptions) { super() - this.db = options.db + this.queryQueue = options.queryQueue this.closeOnDetach = options.closeOnDetach ?? false this.inspect = options.inspect ?? false this.debug = options.debug ?? false + this.idleTimeout = options.idleTimeout ?? 0 this.id = PGLiteSocketHandler.nextHandlerId++ this.log('constructor: created new handler') } - /** - * Get the unique ID of this handler - */ public get handlerId(): number { return this.id } - /** - * Log a message if debug is enabled - * @private - */ private log(message: string, ...args: any[]): void { if (this.debug) { console.log(`[PGLiteSocketHandler#${this.id}] ${message}`, ...args) } } - /** - * Attach a socket to this handler - * @param socket The socket to attach - * @returns this handler instance - * @throws Error if a socket is already attached - */ public async attach(socket: Socket): Promise { this.log( `attach: attaching socket from ${socket.remoteAddress}:${socket.remotePort}`, @@ -86,51 +183,72 @@ export class PGLiteSocketHandler extends EventTarget { this.socket = socket this.active = true + this.lastActivityTime = Date.now() - // Ensure the PGlite instance is ready - this.log(`attach: waiting for PGlite to be ready`) - await this.db.waitReady + // Set up socket options + socket.setKeepAlive(true, 30000) + socket.setNoDelay(true) - // Hold the lock on the PGlite instance - this.log(`attach: acquiring exclusive lock on PGlite instance`) - await new Promise((resolve) => { - this.db.runExclusive(() => { - // Ensure we have the lock on the PGlite instance - resolve() + // Set up idle timeout if configured + if (this.idleTimeout > 0) { + this.resetIdleTimer() + } - // Use a promise to hold the lock on the PGlite instance - // this can be resolved or rejected by the handler to release the lock - return new Promise((resolveLock, rejectLock) => { - this.resolveLock = resolveLock - this.rejectLock = rejectLock - }) + // Setup event handlers + this.log(`attach: setting up socket event handlers`) + + socket.on('data', (data) => { + this.lastActivityTime = Date.now() + this.resetIdleTimer() + + setImmediate(async () => { + try { + const result = await this.handleData(data) + this.log(`socket on data sent: ${result} bytes`) + } catch (err) { + this.log('socket on data error: ', err) + this.handleError(err as Error) + } }) }) - // Setup event handlers - this.log(`attach: setting up socket event handlers`) - socket.on('data', async (data) => { - try { - const result = await this.handleData(data) - this.log(`socket on data sent: ${result} bytes`) - } catch (err) { - this.log('socket on data error: ', err) - } + socket.on('error', (err) => { + setImmediate(() => this.handleError(err)) }) - socket.on('error', (err) => this.handleError(err)) - socket.on('close', () => this.handleClose()) + socket.on('close', () => { + setImmediate(() => this.handleClose()) + }) + + this.log(`attach: socket handler ready`) return this } - /** - * Detach the current socket from this handler - * @param close Whether to close the socket when detaching (overrides constructor option) - * @returns this handler instance - */ + private resetIdleTimer(): void { + if (this.idleTimeout <= 0) return + + if (this.idleTimer) { + clearTimeout(this.idleTimer) + } + + this.idleTimer = setTimeout(() => { + const idleTime = Date.now() - this.lastActivityTime + this.log(`idle timeout after ${idleTime}ms`) + this.handleError(new Error('Idle timeout')) + }, this.idleTimeout) + } + public detach(close?: boolean): PGLiteSocketHandler { this.log(`detach: detaching socket, close=${close ?? this.closeOnDetach}`) + if (this.idleTimer) { + clearTimeout(this.idleTimer) + this.idleTimer = undefined + } + + // Clear any pending queries for this handler + this.queryQueue.clearQueueForHandler(this.id) + if (!this.socket) { this.log(`detach: no socket attached, nothing to do`) return this @@ -145,35 +263,31 @@ export class PGLiteSocketHandler extends EventTarget { if (close ?? this.closeOnDetach) { if (this.socket.writable) { this.log(`detach: closing socket`) - this.socket.end() - this.socket.destroy() + try { + this.socket.end() + this.socket.destroy() + } catch (err) { + this.log(`detach: error closing socket:`, err) + } } } - // Release the lock on the PGlite instance - this.log(`detach: releasing exclusive lock on PGlite instance`) - this.resolveLock?.() - this.socket = null this.active = false this.messageBuffer = Buffer.alloc(0) + + this.log(`detach: handler cleaned up`) return this } - /** - * Check if a socket is currently attached - */ public get isAttached(): boolean { return this.socket !== null } - /** - * Handle incoming data from the socket - */ private async handleData(data: Buffer): Promise { if (!this.socket || !this.active) { this.log(`handleData: no active socket, ignoring data`) - return new Promise((_, reject) => reject(`no active socket`)) + return 0 } this.log(`handleData: received ${data.length} bytes`) @@ -186,16 +300,16 @@ export class PGLiteSocketHandler extends EventTarget { try { let totalProcessed = 0 - + while (this.messageBuffer.length > 0) { // Determine message length let messageLength = 0 let isComplete = false - + // Handle startup message (no type byte, just length) if (this.messageBuffer.length >= 4) { const firstInt = this.messageBuffer.readInt32BE(0) - + if (this.messageBuffer.length >= 8) { const secondInt = this.messageBuffer.readInt32BE(4) // PostgreSQL 3.0 protocol version @@ -204,7 +318,7 @@ export class PGLiteSocketHandler extends EventTarget { isComplete = this.messageBuffer.length >= messageLength } } - + // Regular message (type byte + length) if (!isComplete && this.messageBuffer.length >= 5) { const msgLength = this.messageBuffer.readInt32BE(1) @@ -212,95 +326,116 @@ export class PGLiteSocketHandler extends EventTarget { isComplete = this.messageBuffer.length >= messageLength } } - + if (!isComplete || messageLength === 0) { - this.log(`handleData: incomplete message, buffering ${this.messageBuffer.length} bytes`) + this.log( + `handleData: incomplete message, buffering ${this.messageBuffer.length} bytes`, + ) break } - + // Extract and process complete message const message = this.messageBuffer.slice(0, messageLength) this.messageBuffer = this.messageBuffer.slice(messageLength) - + this.log(`handleData: processing message of ${message.length} bytes`) - const result = await this.db.execProtocolRaw(new Uint8Array(message)) - + + // Check if socket is still active before processing + if (!this.active || !this.socket) { + this.log(`handleData: socket no longer active, stopping processing`) + break + } + + // Queue the query for execution + // This allows multiple connections to queue queries simultaneously + const result = await this.queryQueue.enqueue( + this.id, + new Uint8Array(message), + ) + this.log(`handleData: received ${result.length} bytes from PGlite`) - + // Print the outgoing data to the console this.inspectData('outgoing', result) - + // Send response if available - if (result.length > 0 && this.socket && this.socket.writable && this.active) { + if ( + result.length > 0 && + this.socket && + this.socket.writable && + this.active + ) { await new Promise((resolve, reject) => { this.log(`handleData: writing response to socket`) - if (this.socket) { + if (this.socket?.writable) { this.socket.write(Buffer.from(result), (err?: any) => { if (err) { - reject(`Error while writing to the socket ${err.toString()}`) + this.log(`handleData: error writing to socket:`, err) + reject(err) } else { resolve(result.length) } }) } else { - reject(`No socket`) + this.log(`handleData: socket no longer writable`) + resolve(0) } + }).catch((writeErr) => { + this.log(`handleData: failed to write to socket:`, writeErr) + throw writeErr }) } - + totalProcessed += message.length } - + // Emit data event with byte sizes this.dispatchEvent( new CustomEvent('data', { detail: { incoming: data.length, outgoing: totalProcessed }, }), ) - + return totalProcessed - } catch (err) { this.log(`handleData: error processing data:`, err) - this.handleError(err as Error) - return new Promise((_, reject) => - reject(`Error while processing data ${(err as Error).toString()}`), - ) + throw err } } - /** - * Handle errors from the socket - */ private handleError(err: Error): void { - this.log(`handleError:`, err) + if (!this.active) { + this.log(`handleError: handler not active, ignoring error`) + return + } + + // ECONNRESET is expected behavior when clients disconnect + if (err.message?.includes('ECONNRESET')) { + this.log( + `handleError: client disconnected (ECONNRESET) - normal behavior`, + ) + } else if (err.message?.includes('Idle timeout')) { + this.log(`handleError: connection idle timeout`) + } else { + this.log(`handleError:`, err) + } + + this.active = false // Emit error event this.dispatchEvent(new CustomEvent('error', { detail: err })) - // Reject the lock on the PGlite instance - this.log(`handleError: rejecting exclusive lock on PGlite instance`) - this.rejectLock?.(err) - this.resolveLock = undefined - this.rejectLock = undefined - - // Close the connection on error + // Clean up this.detach(true) } - /** - * Handle socket close event - */ private handleClose(): void { this.log(`handleClose: socket closed`) - + this.active = false this.dispatchEvent(new CustomEvent('close')) - this.detach(false) // Already closed, just clean up + this.detach(false) } - /** - * Print data in hex and ascii to the console - */ private inspectData( direction: 'incoming' | 'outgoing', data: Buffer | Uint8Array, @@ -313,31 +448,25 @@ export class PGLiteSocketHandler extends EventTarget { console.log('<- outgoing', data.length, 'bytes') } - // Process 16 bytes per line for (let offset = 0; offset < data.length; offset += 16) { - // Calculate current chunk size (may be less than 16 for the last chunk) const chunkSize = Math.min(16, data.length - offset) - // Build the hex representation let hexPart = '' for (let i = 0; i < 16; i++) { if (i < chunkSize) { const byte = data[offset + i] hexPart += byte.toString(16).padStart(2, '0') + ' ' } else { - hexPart += ' ' // 3 spaces for missing bytes + hexPart += ' ' } } - // Build the ASCII representation let asciiPart = '' for (let i = 0; i < chunkSize; i++) { const byte = data[offset + i] - // Use printable characters (32-126), replace others with a dot asciiPart += byte >= 32 && byte <= 126 ? String.fromCharCode(byte) : '.' } - // Print the line with offset in hex, hex values, and ASCII representation console.log( `${offset.toString(16).padStart(8, '0')} ${hexPart} ${asciiPart}`, ) @@ -345,18 +474,6 @@ export class PGLiteSocketHandler extends EventTarget { } } -/** - * Represents a queued connection with timeout - */ -interface QueuedConnection { - socket: Socket - clientInfo: { - clientAddress: string - clientPort: number - } - timeoutId: NodeJS.Timeout -} - /** * Options for creating a PGLiteSocketServer */ @@ -367,19 +484,21 @@ export interface PGLiteSocketServerOptions { port?: number /** The host to bind to (default: 127.0.0.1) */ host?: string - /** Unix socket path to bind to (default: undefined). If specified, takes precedence over host:port */ + /** Unix socket path to bind to (default: undefined) */ path?: string /** Print the incoming and outgoing data to the console in hex and ascii */ inspect?: boolean - /** Connection queue timeout in milliseconds (default: 10000) */ - connectionQueueTimeout?: number /** Enable debug logging of method calls */ debug?: boolean + /** Idle timeout in ms (0 to disable, default: 0) */ + idleTimeout?: number + /** Maximum concurrent connections (default: 100) */ + maxConnections?: number } /** - * High-level server that manages socket connections to PGLite - * Creates and manages a TCP server and handles client connections + * PGLite Socket Server with support for multiple concurrent connections + * Connections remain open and queries are queued at the query level */ export class PGLiteSocketServer extends EventTarget { readonly db: PGlite @@ -390,54 +509,41 @@ export class PGLiteSocketServer extends EventTarget { private active = false private inspect: boolean private debug: boolean - private connectionQueueTimeout: number - private activeHandler: PGLiteSocketHandler | null = null - private connectionQueue: QueuedConnection[] = [] - private handlerCount: number = 0 - - /** - * Create a new PGLiteSocketServer - * @param options Options for the server - */ + private idleTimeout: number + private maxConnections: number + private handlers: Set = new Set() + private queryQueue: QueryQueueManager + constructor(options: PGLiteSocketServerOptions) { super() this.db = options.db if (options.path) { this.path = options.path } else { - if (typeof options.port === 'number') { - // Keep port undefined on port 0, will be set by the OS when we start the server. - this.port = options.port ?? options.port - } else { - this.port = 5432 - } + this.port = options.port ?? 5432 this.host = options.host || '127.0.0.1' } this.inspect = options.inspect ?? false this.debug = options.debug ?? false - this.connectionQueueTimeout = - options.connectionQueueTimeout ?? CONNECTION_QUEUE_TIMEOUT + this.idleTimeout = options.idleTimeout ?? 0 + this.maxConnections = options.maxConnections ?? 100 - this.log(`constructor: created server on ${this.host}:${this.port}`) - this.log( - `constructor: connection queue timeout: ${this.connectionQueueTimeout}ms`, - ) + // Create the shared query queue + this.queryQueue = new QueryQueueManager(this.db, this.debug) + + this.log(`constructor: created server on ${this.getServerConn()}`) + this.log(`constructor: max connections: ${this.maxConnections}`) + if (this.idleTimeout > 0) { + this.log(`constructor: idle timeout: ${this.idleTimeout}ms`) + } } - /** - * Log a message if debug is enabled - * @private - */ private log(message: string, ...args: any[]): void { if (this.debug) { console.log(`[PGLiteSocketServer] ${message}`, ...args) } } - /** - * Start the socket server - * @returns Promise that resolves when the server is listening - */ public async start(): Promise { this.log(`start: starting server on ${this.getServerConn()}`) @@ -445,8 +551,15 @@ export class PGLiteSocketServer extends EventTarget { throw new Error('Socket server already started') } + // Ensure PGlite is ready before accepting connections + await this.db.waitReady + this.active = true - this.server = createServer((socket) => this.handleConnection(socket)) + this.server = createServer((socket) => { + setImmediate(() => this.handleConnection(socket)) + }) + + this.server.maxConnections = this.maxConnections return new Promise((resolve, reject) => { if (!this.server) return reject(new Error('Server not initialized')) @@ -454,7 +567,9 @@ export class PGLiteSocketServer extends EventTarget { this.server.on('error', (err) => { this.log(`start: server error:`, err) this.dispatchEvent(new CustomEvent('error', { detail: err })) - reject(err) + if (!this.active) { + reject(err) + } }) if (this.path) { @@ -468,15 +583,7 @@ export class PGLiteSocketServer extends EventTarget { resolve() }) } else { - const server = this.server - server.listen(this.port, this.host, () => { - const address = server.address() - // We are not using pipes, so return type should be AddressInfo - if (address === null || typeof address !== 'object') { - throw Error('Expected address info') - } - // Assign the new port number - this.port = address.port + this.server.listen(this.port, this.host, () => { this.log(`start: server listening on ${this.getServerConn()}`) this.dispatchEvent( new CustomEvent('listening', { @@ -494,37 +601,17 @@ export class PGLiteSocketServer extends EventTarget { return `${this.host}:${this.port}` } - /** - * Stop the socket server - * @returns Promise that resolves when the server is closed - */ public async stop(): Promise { this.log(`stop: stopping server`) this.active = false - // Clear connection queue - this.log( - `stop: clearing connection queue (${this.connectionQueue.length} connections)`, - ) - - this.connectionQueue.forEach((queuedConn) => { - clearTimeout(queuedConn.timeoutId) - if (queuedConn.socket.writable) { - this.log( - `stop: closing queued connection from ${queuedConn.clientInfo.clientAddress}:${queuedConn.clientInfo.clientPort}`, - ) - queuedConn.socket.end() - } - }) - this.connectionQueue = [] - - // Detach active handler if exists - if (this.activeHandler) { - this.log(`stop: detaching active handler #${this.activeHandlerId}`) - this.activeHandler.detach(true) - this.activeHandler = null + // Detach all handlers + this.log(`stop: detaching ${this.handlers.size} handlers`) + for (const handler of this.handlers) { + handler.detach(true) } + this.handlers.clear() if (!this.server) { this.log(`stop: server not running, nothing to do`) @@ -543,16 +630,6 @@ export class PGLiteSocketServer extends EventTarget { }) } - /** - * Get the active handler ID, or null if no active handler - */ - private get activeHandlerId(): number | null { - return this.activeHandler?.handlerId ?? null - } - - /** - * Handle a new client connection - */ private async handleConnection(socket: Socket): Promise { const clientInfo = { clientAddress: socket.remoteAddress || 'unknown', @@ -562,193 +639,82 @@ export class PGLiteSocketServer extends EventTarget { this.log( `handleConnection: new connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}`, ) - - // If server is not active, close the connection immediately - if (!this.active) { - this.log(`handleConnection: server not active, closing connection`) - socket.end() - return - } - - // If we don't have an active handler or it's not attached, we can use this connection immediately - if (!this.activeHandler || !this.activeHandler.isAttached) { - this.log(`handleConnection: no active handler, attaching socket directly`) - this.dispatchEvent(new CustomEvent('connection', { detail: clientInfo })) - await this.attachSocketToNewHandler(socket, clientInfo) - return - } - - // Otherwise, queue the connection - this.log( - `handleConnection: active handler #${this.activeHandlerId} exists, queueing connection`, - ) - this.enqueueConnection(socket, clientInfo) - } - - /** - * Add a connection to the queue - */ - private enqueueConnection( - socket: Socket, - clientInfo: { clientAddress: string; clientPort: number }, - ): void { this.log( - `enqueueConnection: queueing connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}, timeout: ${this.connectionQueueTimeout}ms`, + `handleConnection: active connections: ${this.handlers.size}, queued queries: ${this.queryQueue.getQueueLength()}`, ) - // Set a timeout for this queued connection - const timeoutId = setTimeout(() => { - this.log( - `enqueueConnection: timeout for connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}`, - ) - - // Remove from queue - this.connectionQueue = this.connectionQueue.filter( - (queuedConn) => queuedConn.socket !== socket, - ) - - // End the connection if it's still open - if (socket.writable) { - this.log(`enqueueConnection: closing timed out connection`) + if (!this.active) { + this.log(`handleConnection: server not active, closing connection`) + try { socket.end() + } catch (err) { + this.log(`handleConnection: error closing socket:`, err) } - - this.dispatchEvent( - new CustomEvent('queueTimeout', { - detail: { ...clientInfo, queueSize: this.connectionQueue.length }, - }), - ) - }, this.connectionQueueTimeout) - - // Add to queue - this.connectionQueue.push({ socket, clientInfo, timeoutId }) - - this.log( - `enqueueConnection: connection queued, queue size: ${this.connectionQueue.length}`, - ) - - this.dispatchEvent( - new CustomEvent('queuedConnection', { - detail: { ...clientInfo, queueSize: this.connectionQueue.length }, - }), - ) - } - - /** - * Process the next connection in the queue - */ - private processNextInQueue(): void { - this.log( - `processNextInQueue: processing next connection, queue size: ${this.connectionQueue.length}`, - ) - - // No connections in queue or server not active - if (this.connectionQueue.length === 0 || !this.active) { - this.log( - `processNextInQueue: no connections in queue or server not active, nothing to do`, - ) return } - // Get the next connection - const nextConn = this.connectionQueue.shift() - if (!nextConn) return - - this.log( - `processNextInQueue: processing connection from ${nextConn.clientInfo.clientAddress}:${nextConn.clientInfo.clientPort}`, - ) - - // Clear the timeout - clearTimeout(nextConn.timeoutId) - - // Check if the socket is still valid - if (!nextConn.socket.writable) { - this.log( - `processNextInQueue: socket no longer writable, skipping to next connection`, - ) - // Socket closed while waiting, process next in queue - this.processNextInQueue() + // Check connection limit + if (this.handlers.size >= this.maxConnections) { + this.log(`handleConnection: max connections reached, rejecting`) + socket.write(Buffer.from('Too many connections\n')) + socket.end() return } - // Attach this socket to a new handler - this.attachSocketToNewHandler(nextConn.socket, nextConn.clientInfo).catch( - (err) => { - this.log(`processNextInQueue: error attaching socket:`, err) - this.dispatchEvent(new CustomEvent('error', { detail: err })) - // Try the next connection - this.processNextInQueue() - }, - ) - } - - /** - * Attach a socket to a new handler - */ - private async attachSocketToNewHandler( - socket: Socket, - clientInfo: { clientAddress: string; clientPort: number }, - ): Promise { - this.handlerCount++ - - this.log( - `attachSocketToNewHandler: creating new handler for ${clientInfo.clientAddress}:${clientInfo.clientPort} (handler #${this.handlerCount})`, - ) - // Create a new handler for this connection const handler = new PGLiteSocketHandler({ - db: this.db, + queryQueue: this.queryQueue, closeOnDetach: true, inspect: this.inspect, debug: this.debug, + idleTimeout: this.idleTimeout, }) - // Forward error events from the handler - handler.addEventListener('error', (event) => { - this.log( - `handler #${handler.handlerId}: error from handler:`, - (event as CustomEvent).detail, - ) - this.dispatchEvent( - new CustomEvent('error', { - detail: (event as CustomEvent).detail, - }), - ) - }) + // Track this handler + this.handlers.add(handler) - // Handle close event to process next queued connection - handler.addEventListener('close', () => { - this.log(`handler #${handler.handlerId}: closed`) + // Handle errors + handler.addEventListener('error', (event) => { + const error = (event as CustomEvent).detail - // If this is our active handler, clear it - if (this.activeHandler === handler) { + if (error?.message?.includes('ECONNRESET')) { this.log( - `handler #${handler.handlerId}: was active handler, processing next connection in queue`, + `handler #${handler.handlerId}: client disconnected (ECONNRESET)`, ) - this.activeHandler = null - // Process next connection in queue - this.processNextInQueue() + } else if (error?.message?.includes('Idle timeout')) { + this.log(`handler #${handler.handlerId}: idle timeout`) + } else { + this.log(`handler #${handler.handlerId}: error:`, error) } }) - try { - // Set as active handler - this.activeHandler = handler - - this.log(`handler #${handler.handlerId}: attaching socket`) + // Handle close event + handler.addEventListener('close', () => { + this.log(`handler #${handler.handlerId}: closed`) + this.handlers.delete(handler) + this.log(`handleConnection: active connections: ${this.handlers.size}`) + }) - // Attach the socket to the handler + try { await handler.attach(socket) - this.dispatchEvent(new CustomEvent('connection', { detail: clientInfo })) } catch (err) { - // If there was an error attaching, clean up - this.log(`handler #${handler.handlerId}: error attaching socket:`, err) - this.activeHandler = null - if (socket.writable) { + this.log(`handleConnection: error attaching socket:`, err) + this.handlers.delete(handler) + this.dispatchEvent(new CustomEvent('error', { detail: err })) + try { socket.end() + } catch (closeErr) { + this.log(`handleConnection: error closing socket:`, closeErr) } - throw err } } -} \ No newline at end of file + + public getStats() { + return { + activeConnections: this.handlers.size, + queuedQueries: this.queryQueue.getQueueLength(), + maxConnections: this.maxConnections, + } + } +} From 5fe3b08bc7c12da084a2d7a96e8c50f461fdf715 Mon Sep 17 00:00:00 2001 From: tudor Date: Mon, 12 Jan 2026 09:40:07 +0100 Subject: [PATCH 03/14] changeset --- .changeset/ninety-buckets-sing.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/ninety-buckets-sing.md diff --git a/.changeset/ninety-buckets-sing.md b/.changeset/ninety-buckets-sing.md new file mode 100644 index 000000000..bdb3cba81 --- /dev/null +++ b/.changeset/ninety-buckets-sing.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite-socket': patch +--- + +Fix: Message buffering, connection handling, and concurrent connection support; From 981bbc3ceee831e0b369f74fe84a5968b8a89159 Mon Sep 17 00:00:00 2001 From: tudor Date: Mon, 12 Jan 2026 10:42:56 +0100 Subject: [PATCH 04/14] fixes --- packages/pglite-socket/src/index.ts | 18 +- packages/pglite-socket/tests/index.test.ts | 202 ++++++++++----------- 2 files changed, 108 insertions(+), 112 deletions(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 4f111facb..49cb451b6 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -186,7 +186,6 @@ export class PGLiteSocketHandler extends EventTarget { this.lastActivityTime = Date.now() // Set up socket options - socket.setKeepAlive(true, 30000) socket.setNoDelay(true) // Set up idle timeout if configured @@ -520,7 +519,12 @@ export class PGLiteSocketServer extends EventTarget { if (options.path) { this.path = options.path } else { - this.port = options.port ?? 5432 + if (typeof options.port === 'number') { + // Keep port undefined on port 0, will be set by the OS when we start the server. + this.port = options.port ?? options.port + } else { + this.port = 5432 + } this.host = options.host || '127.0.0.1' } this.inspect = options.inspect ?? false @@ -583,7 +587,15 @@ export class PGLiteSocketServer extends EventTarget { resolve() }) } else { - this.server.listen(this.port, this.host, () => { + const server = this.server + server.listen(this.port, this.host, () => { + const address = server.address() + // We are not using pipes, so return type should be AddressInfo + if (address === null || typeof address !== 'object') { + throw Error('Expected address info') + } + // Assign the new port number + this.port = address.port this.log(`start: server listening on ${this.getServerConn()}`) this.dispatchEvent( new CustomEvent('listening', { diff --git a/packages/pglite-socket/tests/index.test.ts b/packages/pglite-socket/tests/index.test.ts index 438d2a8b9..96c3deb22 100644 --- a/packages/pglite-socket/tests/index.test.ts +++ b/packages/pglite-socket/tests/index.test.ts @@ -9,11 +9,7 @@ import { afterAll, } from 'vitest' import { PGlite } from '@electric-sql/pglite' -import { - PGLiteSocketHandler, - PGLiteSocketServer, - CONNECTION_QUEUE_TIMEOUT, -} from '../src' +import { PGLiteSocketHandler, PGLiteSocketServer } from '../src' import { Socket, createConnection } from 'net' import { existsSync } from 'fs' import { unlink } from 'fs/promises' @@ -55,6 +51,7 @@ const createMockSocket = () => { writable: true, remoteAddress: '127.0.0.1', remotePort: 12345, + setNoDelay: vi.fn(), // Mock on method with tracking of handlers on: vi @@ -81,28 +78,34 @@ const createMockSocket = () => { return mockSocket as unknown as Socket } +// Create a mock QueryQueueManager for testing +const createMockQueryQueue = () => { + return { + enqueue: vi.fn().mockResolvedValue(new Uint8Array(0)), + clearQueueForHandler: vi.fn(), + getQueueLength: vi.fn().mockReturnValue(0), + } +} + describe('PGLiteSocketHandler', () => { - let db: PGlite let handler: PGLiteSocketHandler let mockSocket: ReturnType & { eventHandlers: Record void>> } + let mockQueryQueue: ReturnType beforeEach(async () => { - // Create a PGlite instance for testing - db = await PGlite.create() - handler = new PGLiteSocketHandler({ db }) + // Create a mock query queue for testing + mockQueryQueue = createMockQueryQueue() + handler = new PGLiteSocketHandler({ queryQueue: mockQueryQueue as any }) mockSocket = createMockSocket() as any }) afterEach(async () => { - // Ensure handler is detached before closing the database + // Ensure handler is detached if (handler?.isAttached) { handler.detach(true) } - - // Clean up - await db.close() }) it('should attach to a socket', async () => { @@ -299,30 +302,16 @@ testSocket(async (connOptions) => { ).resolves.not.toThrow() }) - describe('Connection queuing', () => { - // Mock implementation details - // eslint-disable-next-line @typescript-eslint/no-unused-vars - let handleConnectionSpy: any - let processNextInQueueSpy: any - let attachSocketToNewHandlerSpy: any - + describe('Connection multiplexing', () => { beforeEach(() => { - // Create a server with a short timeout for testing + // Create a server for testing server = new PGLiteSocketServer({ db, host: connOptions.host, port: connOptions.port, path: connOptions.path, - connectionQueueTimeout: 100, // Very short timeout for testing + maxConnections: 100, }) - - // Spy on internal methods - handleConnectionSpy = vi.spyOn(server as any, 'handleConnection') - processNextInQueueSpy = vi.spyOn(server as any, 'processNextInQueue') - attachSocketToNewHandlerSpy = vi.spyOn( - server as any, - 'attachSocketToNewHandler', - ) }) it('should create a handler for a new connection', async () => { @@ -338,133 +327,127 @@ testSocket(async (connOptions) => { // Handle connection await (server as any).handleConnection(socket1) - // Verify handler was created - expect(attachSocketToNewHandlerSpy).toHaveBeenCalledWith( - socket1, - expect.anything(), - ) + // Verify handler was created and tracked + expect((server as any).handlers.size).toBe(1) expect(connectionHandler).toHaveBeenCalled() }) - it('should queue a second connection when first is active', async () => { + it('should handle multiple simultaneous connections', async () => { await server.start() // Setup event listeners - const queuedConnectionHandler = vi.fn() - server.addEventListener('queuedConnection', queuedConnectionHandler) + const connectionHandler = vi.fn() + server.addEventListener('connection', connectionHandler) // Create mock sockets const socket1 = createMockSocket() const socket2 = createMockSocket() + const socket3 = createMockSocket() - // Handle first connection + // Handle connections - all should be accepted simultaneously await (server as any).handleConnection(socket1) - - // The first socket should be attached directly - expect(attachSocketToNewHandlerSpy).toHaveBeenCalledWith( - socket1, - expect.anything(), - ) - - // Handle second connection - should be queued await (server as any).handleConnection(socket2) + await (server as any).handleConnection(socket3) + + // All three sockets should have handlers (multiplexed) + expect((server as any).handlers.size).toBe(3) + expect(connectionHandler).toHaveBeenCalledTimes(3) - // The second connection should be queued - expect(queuedConnectionHandler).toHaveBeenCalledTimes(1) - expect(queuedConnectionHandler).toHaveBeenCalledWith( - expect.objectContaining({ - detail: expect.objectContaining({ - queueSize: 1, - }), - }), - ) + // None should be closed - they're all active + expect(socket1.end).not.toHaveBeenCalled() + expect(socket2.end).not.toHaveBeenCalled() + expect(socket3.end).not.toHaveBeenCalled() }) - it('should process next connection when current connection closes', async () => { + it('should remove handler when connection closes', async () => { await server.start() // Create mock sockets const socket1 = createMockSocket() const socket2 = createMockSocket() - // Setup event listener - const connectionHandler = vi.fn() - server.addEventListener('connection', connectionHandler) - - // Handle first connection + // Handle connections await (server as any).handleConnection(socket1) - - // Handle second connection (will be queued) await (server as any).handleConnection(socket2) - // First connection should be active, but clear the handler for next assertions - expect(connectionHandler).toHaveBeenCalled() - connectionHandler.mockClear() - - // Simulate closing the first connection - const activeHandler = (server as any).activeHandler - activeHandler.dispatchEvent(new CustomEvent('close')) - - // The next connection should be processed - expect(processNextInQueueSpy).toHaveBeenCalled() - expect(attachSocketToNewHandlerSpy).toHaveBeenCalledWith( - socket2, - expect.anything(), - ) + // Both should be tracked + expect((server as any).handlers.size).toBe(2) + + // Get the first handler and simulate close + const handlers = Array.from((server as any).handlers) + const handler1 = handlers[0] as PGLiteSocketHandler + handler1.dispatchEvent(new CustomEvent('close')) + + // First handler should be removed, second still active + expect((server as any).handlers.size).toBe(1) }) - it('should timeout queued connections after specified time', async () => { - await server.start() + it('should reject connections when max connections reached', async () => { + // Create server with low max connections + server = new PGLiteSocketServer({ + db, + host: connOptions.host, + port: connOptions.port, + path: connOptions.path, + maxConnections: 2, + }) - // Setup event listeners - const queueTimeoutHandler = vi.fn() - server.addEventListener('queueTimeout', queueTimeoutHandler) + await server.start() // Create mock sockets const socket1 = createMockSocket() const socket2 = createMockSocket() + const socket3 = createMockSocket() - // Handle first connection + // Handle first two connections - should succeed await (server as any).handleConnection(socket1) - - // Handle second connection (will be queued) await (server as any).handleConnection(socket2) - // Fast-forward time to trigger timeout - vi.advanceTimersByTime(1001) + expect((server as any).handlers.size).toBe(2) - // The queued connection should timeout - expect(queueTimeoutHandler).toHaveBeenCalledTimes(1) - expect(socket2.end).toHaveBeenCalled() + // Third connection should be rejected + await (server as any).handleConnection(socket3) + + // Third socket should be closed + expect(socket3.end).toHaveBeenCalled() + expect((server as any).handlers.size).toBe(2) }) - it('should use default timeout value from CONNECTION_QUEUE_TIMEOUT', async () => { - // Create server without specifying timeout - const defaultServer = new PGLiteSocketServer({ - db, - host: connOptions.host, - port: connOptions.port, - path: connOptions.path, - }) + it('should provide stats about active connections', async () => { + await server.start() + + // Create mock sockets + const socket1 = createMockSocket() + const socket2 = createMockSocket() - // Check that it's using the default timeout - expect((defaultServer as any).connectionQueueTimeout).toBe( - CONNECTION_QUEUE_TIMEOUT, - ) + // Check initial stats + let stats = server.getStats() + expect(stats.activeConnections).toBe(0) + expect(stats.maxConnections).toBe(100) + + // Handle connections + await (server as any).handleConnection(socket1) + await (server as any).handleConnection(socket2) + + // Check updated stats + stats = server.getStats() + expect(stats.activeConnections).toBe(2) }) - it('should clean up queue when stopping the server', async () => { + it('should clean up all handlers when stopping the server', async () => { await server.start() // Create mock sockets const socket1 = createMockSocket() const socket2 = createMockSocket() + const socket3 = createMockSocket() - // Handle first connection + // Handle connections await (server as any).handleConnection(socket1) - - // Handle second connection (will be queued) await (server as any).handleConnection(socket2) + await (server as any).handleConnection(socket3) + + expect((server as any).handlers.size).toBe(3) // Stop the server await server.stop() @@ -472,9 +455,10 @@ testSocket(async (connOptions) => { // All connections should be closed expect(socket1.end).toHaveBeenCalled() expect(socket2.end).toHaveBeenCalled() + expect(socket3.end).toHaveBeenCalled() - // Queue should be emptied - expect((server as any).connectionQueue).toHaveLength(0) + // Handlers should be cleared + expect((server as any).handlers.size).toBe(0) }) it('should start server with OS-assigned port when port is 0', async () => { From f4b76531b3ea4afa680d079a7558bc8a0eaa5504 Mon Sep 17 00:00:00 2001 From: tudor Date: Wed, 14 Jan 2026 12:07:03 +0100 Subject: [PATCH 05/14] added tests over 64kb --- .../tests/query-with-node-pg.test.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index ad5354c9a..20946acb0 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -532,5 +532,20 @@ describe(`PGLite Socket Server`, () => { // Verify the notification was received with the correct payload expect(receivedPayload).toBe('Hello from PGlite!') }) + + it('should handle large queries that split across TCP packets', async () => { + // Create a table + await client.query(`CREATE TABLE test_users (id SERIAL, data TEXT)`) + + // Generate >64KB payload to force TCP fragmentation + const largeData = 'x'.repeat(100_000) // 100KB string + + // Insert large data + const result = await client.query(` + INSERT INTO test_users (data) VALUES ('${largeData}') RETURNING * + `) + + expect(result.rows[0].data).toBe(largeData) + }) }) }) From d1b8e10ae642d651a096eafd82362251f91547e8 Mon Sep 17 00:00:00 2001 From: tudor Date: Wed, 14 Jan 2026 12:28:57 +0100 Subject: [PATCH 06/14] style --- packages/pglite-socket/tests/query-with-node-pg.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index 20946acb0..675b3bd65 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -536,16 +536,16 @@ describe(`PGLite Socket Server`, () => { it('should handle large queries that split across TCP packets', async () => { // Create a table await client.query(`CREATE TABLE test_users (id SERIAL, data TEXT)`) - + // Generate >64KB payload to force TCP fragmentation const largeData = 'x'.repeat(100_000) // 100KB string - + // Insert large data const result = await client.query(` INSERT INTO test_users (data) VALUES ('${largeData}') RETURNING * `) - + expect(result.rows[0].data).toBe(largeData) - }) + }) }) }) From 137b9ffc5c5efb463642d0f3937d68375f1fb86e Mon Sep 17 00:00:00 2001 From: tudor Date: Sun, 18 Jan 2026 10:11:03 +0100 Subject: [PATCH 07/14] handle transactions in multi-connections --- packages/pglite-socket/src/index.ts | 42 ++++++++++++++++--- packages/pglite-socket/tests/index.test.ts | 1 + .../tests/query-with-node-pg.test.ts | 41 +++++++++++++++++- packages/pglite/src/pglite.ts | 7 +++- 4 files changed, 82 insertions(+), 9 deletions(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 49cb451b6..5ebebce3d 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -24,6 +24,7 @@ class QueryQueueManager { private processing = false private db: PGlite private debug: boolean + private lastHandlerId: null | number = null constructor(db: PGlite, debug = false) { this.db = db @@ -66,7 +67,25 @@ class QueryQueueManager { this.processing = true while (this.queue.length > 0) { - const query = this.queue.shift() + let query + + if (this.db.isInTransaction() && this.lastHandlerId) { + const i = this.queue.findIndex( + (q) => q.handlerId === this.lastHandlerId, + ) + if (i === -1) { + // we didn't find any other query from the same client! + this.log( + `transaction started, but no query from the same handler id found in queue`, + this.lastHandlerId, + ) + query = null + } else { + query = this.queue.splice(i, 1)[0] + } + } else { + query = this.queue.shift() + } if (!query) break const waitTime = Date.now() - query.timestamp @@ -83,6 +102,7 @@ class QueryQueueManager { this.log( `query from handler #${query.handlerId} completed, ${result.length} bytes`, ) + this.lastHandlerId = query.handlerId query.resolve(result) } catch (error) { this.log(`query from handler #${query.handlerId} failed:`, error) @@ -91,7 +111,7 @@ class QueryQueueManager { } this.processing = false - this.log(`queue processing complete, queue is empty`) + this.log(`queue processing complete, queue length is`, this.queue.length) } getQueueLength(): number { @@ -112,6 +132,14 @@ class QueryQueueManager { this.log(`cleared ${removed} queries for handler #${handlerId}`) } } + + async clearTransactionIfNeeded(handlerId: number): Promise { + if (this.db.isInTransaction() && this.lastHandlerId === handlerId) { + this.db.exec('ROLLBACK') + this.lastHandlerId = null + await this.processQueue() + } + } } /** @@ -202,8 +230,7 @@ export class PGLiteSocketHandler extends EventTarget { setImmediate(async () => { try { - const result = await this.handleData(data) - this.log(`socket on data sent: ${result} bytes`) + await this.handleData(data) } catch (err) { this.log('socket on data error: ', err) this.handleError(err as Error) @@ -237,7 +264,7 @@ export class PGLiteSocketHandler extends EventTarget { }, this.idleTimeout) } - public detach(close?: boolean): PGLiteSocketHandler { + public async detach(close?: boolean): Promise { this.log(`detach: detaching socket, close=${close ?? this.closeOnDetach}`) if (this.idleTimer) { @@ -248,6 +275,8 @@ export class PGLiteSocketHandler extends EventTarget { // Clear any pending queries for this handler this.queryQueue.clearQueueForHandler(this.id) + await this.queryQueue.clearTransactionIfNeeded(this.id) + if (!this.socket) { this.log(`detach: no socket attached, nothing to do`) return this @@ -372,6 +401,7 @@ export class PGLiteSocketHandler extends EventTarget { this.log(`handleData: error writing to socket:`, err) reject(err) } else { + this.log(`handleData: socket sent: ${result.length} bytes`) resolve(result.length) } }) @@ -385,7 +415,7 @@ export class PGLiteSocketHandler extends EventTarget { }) } - totalProcessed += message.length + totalProcessed += result.length } // Emit data event with byte sizes diff --git a/packages/pglite-socket/tests/index.test.ts b/packages/pglite-socket/tests/index.test.ts index 96c3deb22..e100eceee 100644 --- a/packages/pglite-socket/tests/index.test.ts +++ b/packages/pglite-socket/tests/index.test.ts @@ -83,6 +83,7 @@ const createMockQueryQueue = () => { return { enqueue: vi.fn().mockResolvedValue(new Uint8Array(0)), clearQueueForHandler: vi.fn(), + clearTransactionIfNeeded: vi.fn(), getQueueLength: vi.fn().mockReturnValue(0), } } diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index 2cdebdd0f..02c247db1 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -61,6 +61,7 @@ describe(`PGLite Socket Server`, () => { db, port: TEST_PORT, host: '127.0.0.1', + debug: true, }) // Add event listeners for debugging @@ -554,10 +555,48 @@ describe(`PGLite Socket Server`, () => { expect(result.rows[0].data).toBe(largeData) }) + + it('should handle concurrent clients with interleaved transaction and query', async () => { + // Create a second client connecting to the same server + let client2: typeof Client.prototype + if (DEBUG_TESTS) { + client2 = new Client({ + connectionString: DEBUG_TESTS_REAL_SERVER, + connectionTimeoutMillis: 10000, + statement_timeout: 5000, + }) + } else { + client2 = new Client(connectionConfig) + } + await client2.connect() + + try { + // Client 1 starts a transaction (don't await yet) + const beginResult = await client.query('BEGIN') + + // Client 2 makes a simple SELECT 1 query (don't await yet) + const selectPromise = client2.query('SELECT 999999 as one') + + // Small delay to ensure SELECT is sent before ROLLBACK + await new Promise((r) => setTimeout(r, 10)) + + // Client 1 rolls back the transaction (don't await yet) + const rollbackResult = await client.query('ROLLBACK') + + const selectResult = await selectPromise + + // Verify results + expect(beginResult.command).toBe('BEGIN') + expect(selectResult.rows[0].one).toBe(999999) + expect(rollbackResult.command).toBe('ROLLBACK') + } finally { + await client2.end() + } + }, 30000) }) describe('with extensions via CLI', () => { - const UNIX_SOCKET_DIR_PATH = `/tmp/${Date.now().toString()}` + const UNIX_SOCKET_DIR_PATH = `/tmp/${Date.now()}-${Math.random().toString(36).slice(2, 8)}` fs.mkdirSync(UNIX_SOCKET_DIR_PATH) const UNIX_SOCKET_PATH = `${UNIX_SOCKET_DIR_PATH}/.s.PGSQL.5432` let serverProcess: ChildProcess | null = null diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index 2407bb199..de5fedefa 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -675,8 +675,11 @@ export class PGlite this.#outputData = [] - if (this.#keepRawResponse && this.#writeOffset) - return this.#inputData.subarray(0, this.#writeOffset) + if (this.#keepRawResponse && this.#writeOffset) { + // reusing the buffer might lead to unexpected behavior if a previous query has a view into the buffer + // therefore, better return a copy of the response + return new Uint8Array(this.#inputData.subarray(0, this.#writeOffset)) + } return new Uint8Array(0) } From 98ea0969d20cc0f31a22a6a5bdd4c309d7b95c42 Mon Sep 17 00:00:00 2001 From: tudor Date: Sun, 18 Jan 2026 10:21:33 +0100 Subject: [PATCH 08/14] Add test for clearTransactionIfNeeded when client disconnects mid-transaction --- .../tests/query-with-node-pg.test.ts | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index 02c247db1..e725a7b07 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -593,6 +593,61 @@ describe(`PGLite Socket Server`, () => { await client2.end() } }, 30000) + + it('should process pending queries when transaction owner disconnects', async () => { + // Create a second client connecting to the same server + let client2: typeof Client.prototype + if (DEBUG_TESTS) { + client2 = new Client({ + connectionString: DEBUG_TESTS_REAL_SERVER, + connectionTimeoutMillis: 10000, + statement_timeout: 5000, + }) + } else { + client2 = new Client(connectionConfig) + } + await client2.connect() + + // Suppress the expected "Connection terminated unexpectedly" error + client.on('error', () => { + // Expected when we destroy the connection + }) + + try { + // Client starts a transaction + const beginResult = await client.query('BEGIN') + expect(beginResult.command).toBe('BEGIN') + + // Client 2 sends a query (will be blocked because client is in transaction) + const selectPromise = client2.query('SELECT 123456 as val') + + // Small delay to ensure SELECT is enqueued + await new Promise((r) => setTimeout(r, 10)) + + // Client abruptly disconnects (simulating connection abort) + // This should trigger clearTransactionIfNeeded which rolls back + // the transaction and processes pending queries + ;(client as any).connection.stream.destroy() + + // Client 2's query should complete successfully after transaction is cleared + const selectResult = await selectPromise + + expect(selectResult.rows[0].val).toBe(123456) + } finally { + await client2.end() + // Reconnect client for afterEach cleanup + if (DEBUG_TESTS) { + client = new Client({ + connectionString: DEBUG_TESTS_REAL_SERVER, + connectionTimeoutMillis: 10000, + statement_timeout: 5000, + }) + } else { + client = new Client(connectionConfig) + } + await client.connect() + } + }, 30000) }) describe('with extensions via CLI', () => { From a17a1258eb963fab7108eaac3ac01f0d20771d3e Mon Sep 17 00:00:00 2001 From: tudor Date: Sun, 18 Jan 2026 10:22:25 +0100 Subject: [PATCH 09/14] remove debug --- packages/pglite-socket/tests/query-with-node-pg.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index e725a7b07..a46a34a29 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -61,7 +61,6 @@ describe(`PGLite Socket Server`, () => { db, port: TEST_PORT, host: '127.0.0.1', - debug: true, }) // Add event listeners for debugging From e8fa6a0714c1e3a86d771d418c719642a1611f6e Mon Sep 17 00:00:00 2001 From: tudor Date: Sun, 18 Jan 2026 10:51:47 +0100 Subject: [PATCH 10/14] fix tests --- packages/pglite-socket/tests/index.test.ts | 6 ++--- .../tests/query-with-node-pg.test.ts | 23 +++++-------------- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/packages/pglite-socket/tests/index.test.ts b/packages/pglite-socket/tests/index.test.ts index e100eceee..4c61f53b6 100644 --- a/packages/pglite-socket/tests/index.test.ts +++ b/packages/pglite-socket/tests/index.test.ts @@ -105,7 +105,7 @@ describe('PGLiteSocketHandler', () => { afterEach(async () => { // Ensure handler is detached if (handler?.isAttached) { - handler.detach(true) + await handler.detach(true) } }) @@ -126,7 +126,7 @@ describe('PGLiteSocketHandler', () => { expect(handler.isAttached).toBe(true) // Then detach - handler.detach(false) + await handler.detach(false) expect(handler.isAttached).toBe(false) expect(mockSocket.removeAllListeners).toHaveBeenCalled() }) @@ -136,7 +136,7 @@ describe('PGLiteSocketHandler', () => { await handler.attach(mockSocket) // Detach with close option - handler.detach(true) + await handler.detach(true) expect(handler.isAttached).toBe(false) expect(mockSocket.end).toHaveBeenCalled() }) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index a46a34a29..238631fa2 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -608,17 +608,17 @@ describe(`PGLite Socket Server`, () => { await client2.connect() // Suppress the expected "Connection terminated unexpectedly" error - client.on('error', () => { + client2.on('error', () => { // Expected when we destroy the connection }) try { // Client starts a transaction - const beginResult = await client.query('BEGIN') + const beginResult = await client2.query('BEGIN') expect(beginResult.command).toBe('BEGIN') // Client 2 sends a query (will be blocked because client is in transaction) - const selectPromise = client2.query('SELECT 123456 as val') + const selectPromise = client.query('SELECT 123456 as val') // Small delay to ensure SELECT is enqueued await new Promise((r) => setTimeout(r, 10)) @@ -626,25 +626,14 @@ describe(`PGLite Socket Server`, () => { // Client abruptly disconnects (simulating connection abort) // This should trigger clearTransactionIfNeeded which rolls back // the transaction and processes pending queries - ;(client as any).connection.stream.destroy() + ;(client2 as any).connection.stream.destroy() // Client 2's query should complete successfully after transaction is cleared const selectResult = await selectPromise expect(selectResult.rows[0].val).toBe(123456) - } finally { - await client2.end() - // Reconnect client for afterEach cleanup - if (DEBUG_TESTS) { - client = new Client({ - connectionString: DEBUG_TESTS_REAL_SERVER, - connectionTimeoutMillis: 10000, - statement_timeout: 5000, - }) - } else { - client = new Client(connectionConfig) - } - await client.connect() + } catch { + } }, 30000) }) From 644226b58b6fadfd7cadf7eeaa7b116e93e5b98f Mon Sep 17 00:00:00 2001 From: tudor Date: Sun, 18 Jan 2026 11:01:36 +0100 Subject: [PATCH 11/14] style --- packages/pglite-socket/tests/query-with-node-pg.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index 238631fa2..f351eda43 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -633,7 +633,7 @@ describe(`PGLite Socket Server`, () => { expect(selectResult.rows[0].val).toBe(123456) } catch { - + // swallow } }, 30000) }) From 6634736c71c16aa75f2e724ceb8f09d81eb7823d Mon Sep 17 00:00:00 2001 From: tudor Date: Mon, 19 Jan 2026 10:00:33 +0100 Subject: [PATCH 12/14] more tests for pglite-socket --- .../tests/query-with-node-pg.test.ts | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index f351eda43..d62fda12c 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -632,6 +632,161 @@ describe(`PGLite Socket Server`, () => { const selectResult = await selectPromise expect(selectResult.rows[0].val).toBe(123456) + } catch { + expect(false, 'Should not happen') + } + }, 30000) + + it('interleaved transactions should work', async () => { + const bob = client + // table that will be accessed by both clients + await client.query(` + CREATE TABLE test_users ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + `) + + // Create a second bob connecting to the same server + let alice: typeof Client.prototype + if (DEBUG_TESTS) { + alice = new Client({ + connectionString: DEBUG_TESTS_REAL_SERVER, + connectionTimeoutMillis: 10000, + statement_timeout: 5000, + }) + } else { + alice = new Client(connectionConfig) + } + await alice.connect() + + alice.on('error', () => { + // Suppress the expected "Connection terminated unexpectedly" error + }) + + // Client starts a transaction + const aliceBegin = await alice.query('BEGIN') + expect(aliceBegin.command).toBe('BEGIN') + + // Client 2 begins its own transaction + const bobBegin = bob.query('BEGIN') + + // Small delay to ensure client2.BEGIN is enqueued + await new Promise((r) => setTimeout(r, 10)) + + const aliceInsertPromise = alice.query(` + INSERT INTO test_users (name, email) + VALUES + ('Alice', 'alice@example.com') + RETURNING * + `) + + const bobInsertPromise = bob.query(` + INSERT INTO test_users (name, email) + VALUES + ('Bob', 'bob@example.com') + RETURNING * + `) + + // Small delay to ensure both inserts are enqueued + await new Promise((r) => setTimeout(r, 10)) + + // bob commits + const bobCommit = bob.query('COMMIT') + + // alice rolls back + const aliceRollback = alice.query('ROLLBACK') + + await Promise.all([ + bobBegin, + aliceInsertPromise, + bobInsertPromise, + aliceRollback, + bobCommit, + ]) + + // Verify only Bob was commited + const testUsers = await bob.query('SELECT * FROM test_users') + expect(testUsers.rows.length).toBe(1) + expect(testUsers.rows[0].name).toBe('Bob') + }, 30000) + + it('interleaved transactions should work when one client crashes', async () => { + const bob = client + // table that will be accessed by both clients + await bob.query(` + CREATE TABLE test_users ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + `) + + // Create a second client connecting to the same server + let alice: typeof Client.prototype + if (DEBUG_TESTS) { + alice = new Client({ + connectionString: DEBUG_TESTS_REAL_SERVER, + connectionTimeoutMillis: 10000, + statement_timeout: 5000, + }) + } else { + alice = new Client(connectionConfig) + } + await alice.connect() + + // Suppress the expected "Connection terminated unexpectedly" error + alice.on('error', () => { + // Expected when we destroy the connection + }) + + try { + // alice starts a transaction + const aliceBegin = await alice.query('BEGIN') + expect(aliceBegin.command).toBe('BEGIN') + + // bob begins its own transaction + const bobBegin = bob.query('BEGIN') + + // Small delay to ensure client2.BEGIN is enqueued + await new Promise((r) => setTimeout(r, 10)) + + // alice inserts data + alice.query(` + INSERT INTO test_users (name, email) + VALUES + ('Alice', 'alice@example.com') + RETURNING * + `) + + // client inserts data + const bobInsert = bob.query(` + INSERT INTO test_users (name, email) + VALUES + ('Bob', 'bob@example.com') + RETURNING * + `) + + // Small delay to ensure both inserts are enqueued + await new Promise((r) => setTimeout(r, 10)) + + // Client2 abruptly disconnects (simulating connection abort) + // This should trigger clearTransactionIfNeeded which rolls back + // the transaction and processes pending queries + ;(alice as any).connection.stream.destroy() + + // bob commits + const bobCommit = bob.query('COMMIT') + + await Promise.all([bobBegin, bobInsert, bobCommit]) + + // Verify only Bob was commited + const selectResult = await bob.query('SELECT * FROM test_users') + expect(selectResult.rows.length).toBe(1) + expect(selectResult.rows[0].name).toBe('Bob') } catch { // swallow } From c6e0988d4d85d3ff08b45c35937af9166468ac05 Mon Sep 17 00:00:00 2001 From: tudor Date: Wed, 21 Jan 2026 15:25:36 +0100 Subject: [PATCH 13/14] default: do not allow multiple connections --- packages/pglite-socket/src/index.ts | 2 +- packages/pglite-socket/src/scripts/server.ts | 10 ++++++++++ packages/pglite-socket/tests/index.test.ts | 2 +- .../pglite-socket/tests/query-with-node-pg.test.ts | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 5ebebce3d..4bdc55125 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -560,7 +560,7 @@ export class PGLiteSocketServer extends EventTarget { this.inspect = options.inspect ?? false this.debug = options.debug ?? false this.idleTimeout = options.idleTimeout ?? 0 - this.maxConnections = options.maxConnections ?? 100 + this.maxConnections = options.maxConnections ?? 1 // Create the shared query queue this.queryQueue = new QueryQueueManager(this.db, this.debug) diff --git a/packages/pglite-socket/src/scripts/server.ts b/packages/pglite-socket/src/scripts/server.ts index ba765bb38..606a6bd37 100644 --- a/packages/pglite-socket/src/scripts/server.ts +++ b/packages/pglite-socket/src/scripts/server.ts @@ -61,6 +61,12 @@ const args = parseArgs({ default: '5000', help: 'Timeout in milliseconds for graceful subprocess shutdown (default: 5000)', }, + 'max-connections': { + type: 'string', + short: 'm', + default: '1', + help: 'Maximum concurrent connections (default: 1)', + }, help: { type: 'boolean', short: '?', @@ -85,6 +91,7 @@ Options: -r, --run=COMMAND Command to run after server starts --include-database-url Include DATABASE_URL in subprocess environment --shutdown-timeout=MS Timeout for graceful subprocess shutdown in ms (default: 5000) + -m, --max-connections=N Maximum concurrent connections (default is no concurrency: 1) ` interface ServerConfig { @@ -97,6 +104,7 @@ interface ServerConfig { runCommand?: string includeDatabaseUrl: boolean shutdownTimeout: number + maxConnections: number } class PGLiteServerRunner { @@ -123,6 +131,7 @@ class PGLiteServerRunner { runCommand: args.values.run as string, includeDatabaseUrl: args.values['include-database-url'] as boolean, shutdownTimeout: parseInt(args.values['shutdown-timeout'] as string, 10), + maxConnections: parseInt(args.values['max-connections'] as string, 10), } } @@ -281,6 +290,7 @@ class PGLiteServerRunner { host: this.config.host, path: this.config.path, inspect: this.config.debugLevel > 0, + maxConnections: this.config.maxConnections, }) // Create subprocess manager diff --git a/packages/pglite-socket/tests/index.test.ts b/packages/pglite-socket/tests/index.test.ts index 4c61f53b6..a79052b8a 100644 --- a/packages/pglite-socket/tests/index.test.ts +++ b/packages/pglite-socket/tests/index.test.ts @@ -421,7 +421,7 @@ testSocket(async (connOptions) => { const socket1 = createMockSocket() const socket2 = createMockSocket() - // Check initial stats + // Check initial stats (maxConnections is set to 100 in beforeEach) let stats = server.getStats() expect(stats.activeConnections).toBe(0) expect(stats.maxConnections).toBe(100) diff --git a/packages/pglite-socket/tests/query-with-node-pg.test.ts b/packages/pglite-socket/tests/query-with-node-pg.test.ts index d62fda12c..aa54db7f4 100644 --- a/packages/pglite-socket/tests/query-with-node-pg.test.ts +++ b/packages/pglite-socket/tests/query-with-node-pg.test.ts @@ -61,6 +61,7 @@ describe(`PGLite Socket Server`, () => { db, port: TEST_PORT, host: '127.0.0.1', + maxConnections: 100, }) // Add event listeners for debugging From d38c95c6ce9c5ff0e1e2139c55a9dcc281551138 Mon Sep 17 00:00:00 2001 From: tudor Date: Fri, 30 Jan 2026 20:31:58 +0100 Subject: [PATCH 14/14] await ROLLBACK on clearing transaction --- packages/pglite-socket/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 4bdc55125..6bf0d3369 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -135,7 +135,7 @@ class QueryQueueManager { async clearTransactionIfNeeded(handlerId: number): Promise { if (this.db.isInTransaction() && this.lastHandlerId === handlerId) { - this.db.exec('ROLLBACK') + await this.db.exec('ROLLBACK') this.lastHandlerId = null await this.processQueue() }