diff --git a/packages/pglite-socket/src/index.ts b/packages/pglite-socket/src/index.ts index 6a376437f..4f111facb 100644 --- a/packages/pglite-socket/src/index.ts +++ b/packages/pglite-socket/src/index.ts @@ -1,79 +1,177 @@ import type { PGlite } from '@electric-sql/pglite' -import { createServer, Server, Socket } from 'net' +import { type Server, type Socket, createServer } from 'net' // Connection queue timeout in milliseconds export const CONNECTION_QUEUE_TIMEOUT = 60000 // 60 seconds +/** + * Represents a queued query waiting for PGlite access + */ +interface QueuedQuery { + handlerId: number + message: Uint8Array + resolve: (result: Uint8Array) => void + reject: (error: Error) => void + timestamp: number +} + +/** + * Global query queue manager + * Ensures only one query executes at a time in PGlite + */ +class QueryQueueManager { + private queue: QueuedQuery[] = [] + private processing = false + private db: PGlite + private debug: boolean + + constructor(db: PGlite, debug = false) { + this.db = db + this.debug = debug + } + + private log(message: string, ...args: any[]): void { + if (this.debug) { + console.log(`[QueryQueueManager] ${message}`, ...args) + } + } + + async enqueue(handlerId: number, message: Uint8Array): Promise { + return new Promise((resolve, reject) => { + const query: QueuedQuery = { + handlerId, + message, + resolve, + reject, + timestamp: Date.now(), + } + + this.queue.push(query) + this.log( + `enqueued query from handler #${handlerId}, queue size: ${this.queue.length}`, + ) + + // Process queue if not already processing + if (!this.processing) { + this.processQueue() + } + }) + } + + private async processQueue(): Promise { + if (this.processing || this.queue.length === 0) { + return + } + + this.processing = true + + while (this.queue.length > 0) { + const query = this.queue.shift() + if (!query) break + + const waitTime = Date.now() - query.timestamp + this.log( + `processing query from handler #${query.handlerId} (waited ${waitTime}ms)`, + ) + + try { + // Execute the query with exclusive access to PGlite + const result = await this.db.runExclusive(async () => { + return await this.db.execProtocolRaw(query.message) + }) + + this.log( + `query from handler #${query.handlerId} completed, ${result.length} bytes`, + ) + query.resolve(result) + } catch (error) { + this.log(`query from handler #${query.handlerId} failed:`, error) + query.reject(error as Error) + } + } + + this.processing = false + this.log(`queue processing complete, queue is empty`) + } + + getQueueLength(): number { + return this.queue.length + } + + clearQueueForHandler(handlerId: number): void { + const before = this.queue.length + this.queue = this.queue.filter((q) => { + if (q.handlerId === handlerId) { + q.reject(new Error('Handler disconnected')) + return false + } + return true + }) + const removed = before - this.queue.length + if (removed > 0) { + this.log(`cleared ${removed} queries for handler #${handlerId}`) + } + } +} + /** * Options for creating a PGLiteSocketHandler */ export interface PGLiteSocketHandlerOptions { - /** The PGlite database instance */ - db: PGlite + /** The query queue manager */ + queryQueue: QueryQueueManager /** Whether to close the socket when detached (default: false) */ closeOnDetach?: boolean /** Print the incoming and outgoing data to the console in hex and ascii */ inspect?: boolean /** Enable debug logging of method calls */ debug?: boolean + /** Idle timeout in ms (0 to disable, default: 0) */ + idleTimeout?: number } /** - * Low-level handler for a single socket connection to PGLite - * Handles the raw protocol communication between a socket and PGLite + * Handler for a single socket connection to PGlite + * Each connection can remain open and send multiple queries */ export class PGLiteSocketHandler extends EventTarget { - readonly db: PGlite + private queryQueue: QueryQueueManager private socket: Socket | null = null private active = false private closeOnDetach: boolean - private resolveLock?: () => void - private rejectLock?: (err: Error) => void private inspect: boolean private debug: boolean private readonly id: number + private messageBuffer: Buffer = Buffer.alloc(0) + private idleTimer?: NodeJS.Timeout + private idleTimeout: number + private lastActivityTime: number = Date.now() // Static counter for generating unique handler IDs private static nextHandlerId = 1 - /** - * Create a new PGLiteSocketHandler - * @param options Options for the handler - */ constructor(options: PGLiteSocketHandlerOptions) { super() - this.db = options.db + this.queryQueue = options.queryQueue this.closeOnDetach = options.closeOnDetach ?? false this.inspect = options.inspect ?? false this.debug = options.debug ?? false + this.idleTimeout = options.idleTimeout ?? 0 this.id = PGLiteSocketHandler.nextHandlerId++ this.log('constructor: created new handler') } - /** - * Get the unique ID of this handler - */ public get handlerId(): number { return this.id } - /** - * Log a message if debug is enabled - * @private - */ private log(message: string, ...args: any[]): void { if (this.debug) { console.log(`[PGLiteSocketHandler#${this.id}] ${message}`, ...args) } } - /** - * Attach a socket to this handler - * @param socket The socket to attach - * @returns this handler instance - * @throws Error if a socket is already attached - */ public async attach(socket: Socket): Promise { this.log( `attach: attaching socket from ${socket.remoteAddress}:${socket.remotePort}`, @@ -85,51 +183,72 @@ export class PGLiteSocketHandler extends EventTarget { this.socket = socket this.active = true + this.lastActivityTime = Date.now() - // Ensure the PGlite instance is ready - this.log(`attach: waiting for PGlite to be ready`) - await this.db.waitReady + // Set up socket options + socket.setKeepAlive(true, 30000) + socket.setNoDelay(true) - // Hold the lock on the PGlite instance - this.log(`attach: acquiring exclusive lock on PGlite instance`) - await new Promise((resolve) => { - this.db.runExclusive(() => { - // Ensure we have the lock on the PGlite instance - resolve() + // Set up idle timeout if configured + if (this.idleTimeout > 0) { + this.resetIdleTimer() + } - // Use a promise to hold the lock on the PGlite instance - // this can be resolved or rejected by the handler to release the lock - return new Promise((resolveLock, rejectLock) => { - this.resolveLock = resolveLock - this.rejectLock = rejectLock - }) + // Setup event handlers + this.log(`attach: setting up socket event handlers`) + + socket.on('data', (data) => { + this.lastActivityTime = Date.now() + this.resetIdleTimer() + + setImmediate(async () => { + try { + const result = await this.handleData(data) + this.log(`socket on data sent: ${result} bytes`) + } catch (err) { + this.log('socket on data error: ', err) + this.handleError(err as Error) + } }) }) - // Setup event handlers - this.log(`attach: setting up socket event handlers`) - socket.on('data', async (data) => { - try { - const result = await this.handleData(data) - this.log(`socket on data sent: ${result} bytes`) - } catch (err) { - this.log('socket on data error: ', err) - } + socket.on('error', (err) => { + setImmediate(() => this.handleError(err)) + }) + + socket.on('close', () => { + setImmediate(() => this.handleClose()) }) - socket.on('error', (err) => this.handleError(err)) - socket.on('close', () => this.handleClose()) + this.log(`attach: socket handler ready`) return this } - /** - * Detach the current socket from this handler - * @param close Whether to close the socket when detaching (overrides constructor option) - * @returns this handler instance - */ + private resetIdleTimer(): void { + if (this.idleTimeout <= 0) return + + if (this.idleTimer) { + clearTimeout(this.idleTimer) + } + + this.idleTimer = setTimeout(() => { + const idleTime = Date.now() - this.lastActivityTime + this.log(`idle timeout after ${idleTime}ms`) + this.handleError(new Error('Idle timeout')) + }, this.idleTimeout) + } + public detach(close?: boolean): PGLiteSocketHandler { this.log(`detach: detaching socket, close=${close ?? this.closeOnDetach}`) + if (this.idleTimer) { + clearTimeout(this.idleTimer) + this.idleTimer = undefined + } + + // Clear any pending queries for this handler + this.queryQueue.clearQueueForHandler(this.id) + if (!this.socket) { this.log(`detach: no socket attached, nothing to do`) return this @@ -144,129 +263,179 @@ export class PGLiteSocketHandler extends EventTarget { if (close ?? this.closeOnDetach) { if (this.socket.writable) { this.log(`detach: closing socket`) - this.socket.end() - this.socket.destroy() + try { + this.socket.end() + this.socket.destroy() + } catch (err) { + this.log(`detach: error closing socket:`, err) + } } } - // Release the lock on the PGlite instance - this.log(`detach: releasing exclusive lock on PGlite instance`) - this.resolveLock?.() - this.socket = null this.active = false + this.messageBuffer = Buffer.alloc(0) + + this.log(`detach: handler cleaned up`) return this } - /** - * Check if a socket is currently attached - */ public get isAttached(): boolean { return this.socket !== null } - /** - * Handle incoming data from the socket - */ private async handleData(data: Buffer): Promise { if (!this.socket || !this.active) { this.log(`handleData: no active socket, ignoring data`) - return new Promise((_, reject) => reject(`no active socket`)) + return 0 } this.log(`handleData: received ${data.length} bytes`) + // Append to buffer for message reassembly + this.messageBuffer = Buffer.concat([this.messageBuffer, data]) + // Print the incoming data to the console this.inspectData('incoming', data) try { - // Process the raw protocol data - this.log(`handleData: sending data to PGlite for processing`) - const result = await this.db.execProtocolRaw(new Uint8Array(data)) - - this.log(`handleData: received ${result.length} bytes from PGlite`) + let totalProcessed = 0 + + while (this.messageBuffer.length > 0) { + // Determine message length + let messageLength = 0 + let isComplete = false + + // Handle startup message (no type byte, just length) + if (this.messageBuffer.length >= 4) { + const firstInt = this.messageBuffer.readInt32BE(0) + + if (this.messageBuffer.length >= 8) { + const secondInt = this.messageBuffer.readInt32BE(4) + // PostgreSQL 3.0 protocol version + if (secondInt === 196608 || secondInt === 0x00030000) { + messageLength = firstInt + isComplete = this.messageBuffer.length >= messageLength + } + } - // Print the outgoing data to the console - this.inspectData('outgoing', result) + // Regular message (type byte + length) + if (!isComplete && this.messageBuffer.length >= 5) { + const msgLength = this.messageBuffer.readInt32BE(1) + messageLength = 1 + msgLength + isComplete = this.messageBuffer.length >= messageLength + } + } - // Send the result back if the socket is still connected - if (this.socket && this.socket.writable && this.active) { - if (result.length <= 0) { - this.log(`handleData: cowardly refusing to send empty packet`) - return new Promise((_, reject) => reject('no data')) + if (!isComplete || messageLength === 0) { + this.log( + `handleData: incomplete message, buffering ${this.messageBuffer.length} bytes`, + ) + break } - const promise = new Promise((resolve, reject) => { - this.log(`handleData: writing response to socket`) - if (this.socket) { - this.socket.write(Buffer.from(result), (err?: Error) => { - if (err) { - reject(`Error while writing to the socket ${err.toString()}`) - } else { - resolve(result.length) - } - }) - } else { - reject(`No socket`) - } - }) + // Extract and process complete message + const message = this.messageBuffer.slice(0, messageLength) + this.messageBuffer = this.messageBuffer.slice(messageLength) - // Emit data event with byte sizes - this.dispatchEvent( - new CustomEvent('data', { - detail: { incoming: data.length, outgoing: result.length }, - }), - ) - return promise - } else { - this.log( - `handleData: socket no longer writable or active, discarding response`, - ) - return new Promise((_, reject) => - reject(`No socket, not active or not writeable`), + this.log(`handleData: processing message of ${message.length} bytes`) + + // Check if socket is still active before processing + if (!this.active || !this.socket) { + this.log(`handleData: socket no longer active, stopping processing`) + break + } + + // Queue the query for execution + // This allows multiple connections to queue queries simultaneously + const result = await this.queryQueue.enqueue( + this.id, + new Uint8Array(message), ) + + this.log(`handleData: received ${result.length} bytes from PGlite`) + + // Print the outgoing data to the console + this.inspectData('outgoing', result) + + // Send response if available + if ( + result.length > 0 && + this.socket && + this.socket.writable && + this.active + ) { + await new Promise((resolve, reject) => { + this.log(`handleData: writing response to socket`) + if (this.socket?.writable) { + this.socket.write(Buffer.from(result), (err?: any) => { + if (err) { + this.log(`handleData: error writing to socket:`, err) + reject(err) + } else { + resolve(result.length) + } + }) + } else { + this.log(`handleData: socket no longer writable`) + resolve(0) + } + }).catch((writeErr) => { + this.log(`handleData: failed to write to socket:`, writeErr) + throw writeErr + }) + } + + totalProcessed += message.length } + + // Emit data event with byte sizes + this.dispatchEvent( + new CustomEvent('data', { + detail: { incoming: data.length, outgoing: totalProcessed }, + }), + ) + + return totalProcessed } catch (err) { this.log(`handleData: error processing data:`, err) - this.handleError(err as Error) - return new Promise((_, reject) => - reject(`Error while processing data ${(err as Error).toString()}`), - ) + throw err } } - /** - * Handle errors from the socket - */ private handleError(err: Error): void { - this.log(`handleError:`, err) + if (!this.active) { + this.log(`handleError: handler not active, ignoring error`) + return + } + + // ECONNRESET is expected behavior when clients disconnect + if (err.message?.includes('ECONNRESET')) { + this.log( + `handleError: client disconnected (ECONNRESET) - normal behavior`, + ) + } else if (err.message?.includes('Idle timeout')) { + this.log(`handleError: connection idle timeout`) + } else { + this.log(`handleError:`, err) + } + + this.active = false // Emit error event this.dispatchEvent(new CustomEvent('error', { detail: err })) - // Reject the lock on the PGlite instance - this.log(`handleError: rejecting exclusive lock on PGlite instance`) - this.rejectLock?.(err) - this.resolveLock = undefined - this.rejectLock = undefined - - // Close the connection on error + // Clean up this.detach(true) } - /** - * Handle socket close event - */ private handleClose(): void { this.log(`handleClose: socket closed`) - + this.active = false this.dispatchEvent(new CustomEvent('close')) - this.detach(false) // Already closed, just clean up + this.detach(false) } - /** - * Print data in hex and ascii to the console - */ private inspectData( direction: 'incoming' | 'outgoing', data: Buffer | Uint8Array, @@ -279,31 +448,25 @@ export class PGLiteSocketHandler extends EventTarget { console.log('<- outgoing', data.length, 'bytes') } - // Process 16 bytes per line for (let offset = 0; offset < data.length; offset += 16) { - // Calculate current chunk size (may be less than 16 for the last chunk) const chunkSize = Math.min(16, data.length - offset) - // Build the hex representation let hexPart = '' for (let i = 0; i < 16; i++) { if (i < chunkSize) { const byte = data[offset + i] hexPart += byte.toString(16).padStart(2, '0') + ' ' } else { - hexPart += ' ' // 3 spaces for missing bytes + hexPart += ' ' } } - // Build the ASCII representation let asciiPart = '' for (let i = 0; i < chunkSize; i++) { const byte = data[offset + i] - // Use printable characters (32-126), replace others with a dot asciiPart += byte >= 32 && byte <= 126 ? String.fromCharCode(byte) : '.' } - // Print the line with offset in hex, hex values, and ASCII representation console.log( `${offset.toString(16).padStart(8, '0')} ${hexPart} ${asciiPart}`, ) @@ -311,18 +474,6 @@ export class PGLiteSocketHandler extends EventTarget { } } -/** - * Represents a queued connection with timeout - */ -interface QueuedConnection { - socket: Socket - clientInfo: { - clientAddress: string - clientPort: number - } - timeoutId: NodeJS.Timeout -} - /** * Options for creating a PGLiteSocketServer */ @@ -333,19 +484,21 @@ export interface PGLiteSocketServerOptions { port?: number /** The host to bind to (default: 127.0.0.1) */ host?: string - /** Unix socket path to bind to (default: undefined). If specified, takes precedence over host:port */ + /** Unix socket path to bind to (default: undefined) */ path?: string /** Print the incoming and outgoing data to the console in hex and ascii */ inspect?: boolean - /** Connection queue timeout in milliseconds (default: 10000) */ - connectionQueueTimeout?: number /** Enable debug logging of method calls */ debug?: boolean + /** Idle timeout in ms (0 to disable, default: 0) */ + idleTimeout?: number + /** Maximum concurrent connections (default: 100) */ + maxConnections?: number } /** - * High-level server that manages socket connections to PGLite - * Creates and manages a TCP server and handles client connections + * PGLite Socket Server with support for multiple concurrent connections + * Connections remain open and queries are queued at the query level */ export class PGLiteSocketServer extends EventTarget { readonly db: PGlite @@ -356,54 +509,41 @@ export class PGLiteSocketServer extends EventTarget { private active = false private inspect: boolean private debug: boolean - private connectionQueueTimeout: number - private activeHandler: PGLiteSocketHandler | null = null - private connectionQueue: QueuedConnection[] = [] - private handlerCount: number = 0 - - /** - * Create a new PGLiteSocketServer - * @param options Options for the server - */ + private idleTimeout: number + private maxConnections: number + private handlers: Set = new Set() + private queryQueue: QueryQueueManager + constructor(options: PGLiteSocketServerOptions) { super() this.db = options.db if (options.path) { this.path = options.path } else { - if (typeof options.port === 'number') { - // Keep port undefined on port 0, will be set by the OS when we start the server. - this.port = options.port ?? options.port - } else { - this.port = 5432 - } + this.port = options.port ?? 5432 this.host = options.host || '127.0.0.1' } this.inspect = options.inspect ?? false this.debug = options.debug ?? false - this.connectionQueueTimeout = - options.connectionQueueTimeout ?? CONNECTION_QUEUE_TIMEOUT + this.idleTimeout = options.idleTimeout ?? 0 + this.maxConnections = options.maxConnections ?? 100 - this.log(`constructor: created server on ${this.host}:${this.port}`) - this.log( - `constructor: connection queue timeout: ${this.connectionQueueTimeout}ms`, - ) + // Create the shared query queue + this.queryQueue = new QueryQueueManager(this.db, this.debug) + + this.log(`constructor: created server on ${this.getServerConn()}`) + this.log(`constructor: max connections: ${this.maxConnections}`) + if (this.idleTimeout > 0) { + this.log(`constructor: idle timeout: ${this.idleTimeout}ms`) + } } - /** - * Log a message if debug is enabled - * @private - */ private log(message: string, ...args: any[]): void { if (this.debug) { console.log(`[PGLiteSocketServer] ${message}`, ...args) } } - /** - * Start the socket server - * @returns Promise that resolves when the server is listening - */ public async start(): Promise { this.log(`start: starting server on ${this.getServerConn()}`) @@ -411,8 +551,15 @@ export class PGLiteSocketServer extends EventTarget { throw new Error('Socket server already started') } + // Ensure PGlite is ready before accepting connections + await this.db.waitReady + this.active = true - this.server = createServer((socket) => this.handleConnection(socket)) + this.server = createServer((socket) => { + setImmediate(() => this.handleConnection(socket)) + }) + + this.server.maxConnections = this.maxConnections return new Promise((resolve, reject) => { if (!this.server) return reject(new Error('Server not initialized')) @@ -420,7 +567,9 @@ export class PGLiteSocketServer extends EventTarget { this.server.on('error', (err) => { this.log(`start: server error:`, err) this.dispatchEvent(new CustomEvent('error', { detail: err })) - reject(err) + if (!this.active) { + reject(err) + } }) if (this.path) { @@ -434,15 +583,7 @@ export class PGLiteSocketServer extends EventTarget { resolve() }) } else { - const server = this.server - server.listen(this.port, this.host, () => { - const address = server.address() - // We are not using pipes, so return type should be AddressInfo - if (address === null || typeof address !== 'object') { - throw Error('Expected address info') - } - // Assign the new port number - this.port = address.port + this.server.listen(this.port, this.host, () => { this.log(`start: server listening on ${this.getServerConn()}`) this.dispatchEvent( new CustomEvent('listening', { @@ -460,37 +601,17 @@ export class PGLiteSocketServer extends EventTarget { return `${this.host}:${this.port}` } - /** - * Stop the socket server - * @returns Promise that resolves when the server is closed - */ public async stop(): Promise { this.log(`stop: stopping server`) this.active = false - // Clear connection queue - this.log( - `stop: clearing connection queue (${this.connectionQueue.length} connections)`, - ) - - this.connectionQueue.forEach((queuedConn) => { - clearTimeout(queuedConn.timeoutId) - if (queuedConn.socket.writable) { - this.log( - `stop: closing queued connection from ${queuedConn.clientInfo.clientAddress}:${queuedConn.clientInfo.clientPort}`, - ) - queuedConn.socket.end() - } - }) - this.connectionQueue = [] - - // Detach active handler if exists - if (this.activeHandler) { - this.log(`stop: detaching active handler #${this.activeHandlerId}`) - this.activeHandler.detach(true) - this.activeHandler = null + // Detach all handlers + this.log(`stop: detaching ${this.handlers.size} handlers`) + for (const handler of this.handlers) { + handler.detach(true) } + this.handlers.clear() if (!this.server) { this.log(`stop: server not running, nothing to do`) @@ -509,16 +630,6 @@ export class PGLiteSocketServer extends EventTarget { }) } - /** - * Get the active handler ID, or null if no active handler - */ - private get activeHandlerId(): number | null { - return this.activeHandler?.handlerId ?? null - } - - /** - * Handle a new client connection - */ private async handleConnection(socket: Socket): Promise { const clientInfo = { clientAddress: socket.remoteAddress || 'unknown', @@ -528,193 +639,82 @@ export class PGLiteSocketServer extends EventTarget { this.log( `handleConnection: new connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}`, ) - - // If server is not active, close the connection immediately - if (!this.active) { - this.log(`handleConnection: server not active, closing connection`) - socket.end() - return - } - - // If we don't have an active handler or it's not attached, we can use this connection immediately - if (!this.activeHandler || !this.activeHandler.isAttached) { - this.log(`handleConnection: no active handler, attaching socket directly`) - this.dispatchEvent(new CustomEvent('connection', { detail: clientInfo })) - await this.attachSocketToNewHandler(socket, clientInfo) - return - } - - // Otherwise, queue the connection - this.log( - `handleConnection: active handler #${this.activeHandlerId} exists, queueing connection`, - ) - this.enqueueConnection(socket, clientInfo) - } - - /** - * Add a connection to the queue - */ - private enqueueConnection( - socket: Socket, - clientInfo: { clientAddress: string; clientPort: number }, - ): void { this.log( - `enqueueConnection: queueing connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}, timeout: ${this.connectionQueueTimeout}ms`, + `handleConnection: active connections: ${this.handlers.size}, queued queries: ${this.queryQueue.getQueueLength()}`, ) - // Set a timeout for this queued connection - const timeoutId = setTimeout(() => { - this.log( - `enqueueConnection: timeout for connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}`, - ) - - // Remove from queue - this.connectionQueue = this.connectionQueue.filter( - (queuedConn) => queuedConn.socket !== socket, - ) - - // End the connection if it's still open - if (socket.writable) { - this.log(`enqueueConnection: closing timed out connection`) + if (!this.active) { + this.log(`handleConnection: server not active, closing connection`) + try { socket.end() + } catch (err) { + this.log(`handleConnection: error closing socket:`, err) } - - this.dispatchEvent( - new CustomEvent('queueTimeout', { - detail: { ...clientInfo, queueSize: this.connectionQueue.length }, - }), - ) - }, this.connectionQueueTimeout) - - // Add to queue - this.connectionQueue.push({ socket, clientInfo, timeoutId }) - - this.log( - `enqueueConnection: connection queued, queue size: ${this.connectionQueue.length}`, - ) - - this.dispatchEvent( - new CustomEvent('queuedConnection', { - detail: { ...clientInfo, queueSize: this.connectionQueue.length }, - }), - ) - } - - /** - * Process the next connection in the queue - */ - private processNextInQueue(): void { - this.log( - `processNextInQueue: processing next connection, queue size: ${this.connectionQueue.length}`, - ) - - // No connections in queue or server not active - if (this.connectionQueue.length === 0 || !this.active) { - this.log( - `processNextInQueue: no connections in queue or server not active, nothing to do`, - ) return } - // Get the next connection - const nextConn = this.connectionQueue.shift() - if (!nextConn) return - - this.log( - `processNextInQueue: processing connection from ${nextConn.clientInfo.clientAddress}:${nextConn.clientInfo.clientPort}`, - ) - - // Clear the timeout - clearTimeout(nextConn.timeoutId) - - // Check if the socket is still valid - if (!nextConn.socket.writable) { - this.log( - `processNextInQueue: socket no longer writable, skipping to next connection`, - ) - // Socket closed while waiting, process next in queue - this.processNextInQueue() + // Check connection limit + if (this.handlers.size >= this.maxConnections) { + this.log(`handleConnection: max connections reached, rejecting`) + socket.write(Buffer.from('Too many connections\n')) + socket.end() return } - // Attach this socket to a new handler - this.attachSocketToNewHandler(nextConn.socket, nextConn.clientInfo).catch( - (err) => { - this.log(`processNextInQueue: error attaching socket:`, err) - this.dispatchEvent(new CustomEvent('error', { detail: err })) - // Try the next connection - this.processNextInQueue() - }, - ) - } - - /** - * Attach a socket to a new handler - */ - private async attachSocketToNewHandler( - socket: Socket, - clientInfo: { clientAddress: string; clientPort: number }, - ): Promise { - this.handlerCount++ - - this.log( - `attachSocketToNewHandler: creating new handler for ${clientInfo.clientAddress}:${clientInfo.clientPort} (handler #${this.handlerCount})`, - ) - // Create a new handler for this connection const handler = new PGLiteSocketHandler({ - db: this.db, + queryQueue: this.queryQueue, closeOnDetach: true, inspect: this.inspect, debug: this.debug, + idleTimeout: this.idleTimeout, }) - // Forward error events from the handler - handler.addEventListener('error', (event) => { - this.log( - `handler #${handler.handlerId}: error from handler:`, - (event as CustomEvent).detail, - ) - this.dispatchEvent( - new CustomEvent('error', { - detail: (event as CustomEvent).detail, - }), - ) - }) + // Track this handler + this.handlers.add(handler) - // Handle close event to process next queued connection - handler.addEventListener('close', () => { - this.log(`handler #${handler.handlerId}: closed`) + // Handle errors + handler.addEventListener('error', (event) => { + const error = (event as CustomEvent).detail - // If this is our active handler, clear it - if (this.activeHandler === handler) { + if (error?.message?.includes('ECONNRESET')) { this.log( - `handler #${handler.handlerId}: was active handler, processing next connection in queue`, + `handler #${handler.handlerId}: client disconnected (ECONNRESET)`, ) - this.activeHandler = null - // Process next connection in queue - this.processNextInQueue() + } else if (error?.message?.includes('Idle timeout')) { + this.log(`handler #${handler.handlerId}: idle timeout`) + } else { + this.log(`handler #${handler.handlerId}: error:`, error) } }) - try { - // Set as active handler - this.activeHandler = handler - - this.log(`handler #${handler.handlerId}: attaching socket`) + // Handle close event + handler.addEventListener('close', () => { + this.log(`handler #${handler.handlerId}: closed`) + this.handlers.delete(handler) + this.log(`handleConnection: active connections: ${this.handlers.size}`) + }) - // Attach the socket to the handler + try { await handler.attach(socket) - this.dispatchEvent(new CustomEvent('connection', { detail: clientInfo })) } catch (err) { - // If there was an error attaching, clean up - this.log(`handler #${handler.handlerId}: error attaching socket:`, err) - this.activeHandler = null - if (socket.writable) { + this.log(`handleConnection: error attaching socket:`, err) + this.handlers.delete(handler) + this.dispatchEvent(new CustomEvent('error', { detail: err })) + try { socket.end() + } catch (closeErr) { + this.log(`handleConnection: error closing socket:`, closeErr) } - throw err + } + } + + public getStats() { + return { + activeConnections: this.handlers.size, + queuedQueries: this.queryQueue.getQueueLength(), + maxConnections: this.maxConnections, } } }