-
Notifications
You must be signed in to change notification settings - Fork 42
Add state machine to ReconnectingWebSocket #744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
4d266cf
93355b9
2e27a6b
69df291
6b4fe99
59d4147
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,91 @@ import type { | |
| UnidirectionalStream, | ||
| } from "./eventStreamConnection"; | ||
|
|
||
| /** | ||
| * Connection states for the ReconnectingWebSocket state machine. | ||
| */ | ||
| export enum ConnectionState { | ||
| /** Initial state, ready to connect */ | ||
| IDLE = "IDLE", | ||
| /** Actively running connect() - WS factory in progress */ | ||
| CONNECTING = "CONNECTING", | ||
| /** Socket is open and working */ | ||
| CONNECTED = "CONNECTED", | ||
| /** Waiting for backoff timer before attempting reconnection */ | ||
| AWAITING_RETRY = "AWAITING_RETRY", | ||
| /** Temporarily paused - user must call reconnect() to resume */ | ||
| DISCONNECTED = "DISCONNECTED", | ||
| /** Permanently closed - cannot be reused */ | ||
| DISPOSED = "DISPOSED", | ||
|
EhabY marked this conversation as resolved.
|
||
| } | ||
|
|
||
| /** | ||
| * Actions that trigger state transitions. | ||
| */ | ||
| type StateAction = | ||
| | { readonly type: "CONNECT" } | ||
| | { readonly type: "OPEN" } | ||
| | { readonly type: "SCHEDULE_RETRY" } | ||
| | { readonly type: "DISCONNECT" } | ||
| | { readonly type: "DISPOSE" }; | ||
|
|
||
| /** | ||
| * Pure reducer function for state transitions. | ||
| */ | ||
| function reduceState( | ||
| state: ConnectionState, | ||
| action: StateAction, | ||
| ): ConnectionState { | ||
| switch (action.type) { | ||
| case "CONNECT": | ||
| switch (state) { | ||
| case ConnectionState.IDLE: | ||
| case ConnectionState.CONNECTED: | ||
| case ConnectionState.AWAITING_RETRY: | ||
| case ConnectionState.DISCONNECTED: | ||
| return ConnectionState.CONNECTING; | ||
| default: | ||
| return state; | ||
| } | ||
|
|
||
| case "OPEN": | ||
| switch (state) { | ||
| case ConnectionState.CONNECTING: | ||
| return ConnectionState.CONNECTED; | ||
| default: | ||
| return state; | ||
| } | ||
|
|
||
| case "SCHEDULE_RETRY": | ||
| switch (state) { | ||
| case ConnectionState.CONNECTING: | ||
| case ConnectionState.CONNECTED: | ||
| return ConnectionState.AWAITING_RETRY; | ||
| default: | ||
| return state; | ||
| } | ||
|
|
||
| case "DISCONNECT": | ||
| switch (state) { | ||
| case ConnectionState.IDLE: | ||
| case ConnectionState.CONNECTING: | ||
| case ConnectionState.CONNECTED: | ||
| case ConnectionState.AWAITING_RETRY: | ||
| return ConnectionState.DISCONNECTED; | ||
| default: | ||
| return state; | ||
| } | ||
|
|
||
| case "DISPOSE": | ||
| switch (state) { | ||
| case ConnectionState.DISPOSED: | ||
| return state; | ||
| default: | ||
| return ConnectionState.DISPOSED; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| export type SocketFactory<TData> = () => Promise<UnidirectionalStream<TData>>; | ||
|
|
||
| export interface ReconnectingWebSocketOptions { | ||
|
|
@@ -46,13 +131,29 @@ export class ReconnectingWebSocket< | |
| #lastRoute = "unknown"; // Cached route for logging when socket is closed | ||
| #backoffMs: number; | ||
| #reconnectTimeoutId: NodeJS.Timeout | null = null; | ||
| #isDisconnected = false; // Temporary pause, can be resumed via reconnect() | ||
| #isDisposed = false; // Permanent disposal, cannot be resumed | ||
| #isConnecting = false; | ||
| #pendingReconnect = false; | ||
| #state: ConnectionState = ConnectionState.IDLE; | ||
| #certRefreshAttempted = false; // Tracks if cert refresh was already attempted this connection cycle | ||
| readonly #onDispose?: () => void; | ||
|
|
||
| /** | ||
| * Dispatch an action to transition state. Returns true if transition is allowed. | ||
| */ | ||
| #dispatch(action: StateAction): boolean { | ||
| const newState = reduceState(this.#state, action); | ||
| if (newState === this.#state) { | ||
| // Allow CONNECT from CONNECTING as a "restart" operation | ||
| if ( | ||
| action.type === "CONNECT" && | ||
| this.#state === ConnectionState.CONNECTING | ||
| ) { | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| this.#state = newState; | ||
| return true; | ||
| } | ||
|
|
||
| private constructor( | ||
| socketFactory: SocketFactory<TData>, | ||
| logger: Logger, | ||
|
|
@@ -94,11 +195,10 @@ export class ReconnectingWebSocket< | |
| } | ||
|
|
||
| /** | ||
| * Returns true if the socket is temporarily disconnected and not attempting to reconnect. | ||
| * Use reconnect() to resume. | ||
| * Returns the current connection state. | ||
| */ | ||
| get isDisconnected(): boolean { | ||
| return this.#isDisconnected; | ||
| get state(): ConnectionState { | ||
| return this.#state; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -133,26 +233,20 @@ export class ReconnectingWebSocket< | |
| * Resumes the socket if previously disconnected via disconnect(). | ||
| */ | ||
| reconnect(): void { | ||
| if (this.#isDisconnected) { | ||
| this.#isDisconnected = false; | ||
| this.#backoffMs = this.#options.initialBackoffMs; | ||
| this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry | ||
| if (this.#state === ConnectionState.DISPOSED) { | ||
| return; | ||
| } | ||
|
|
||
| if (this.#isDisposed) { | ||
| return; | ||
| if (this.#state === ConnectionState.DISCONNECTED) { | ||
| this.#backoffMs = this.#options.initialBackoffMs; | ||
| this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry | ||
|
EhabY marked this conversation as resolved.
|
||
| } | ||
|
|
||
| if (this.#reconnectTimeoutId !== null) { | ||
| clearTimeout(this.#reconnectTimeoutId); | ||
| this.#reconnectTimeoutId = null; | ||
| } | ||
|
|
||
| if (this.#isConnecting) { | ||
| this.#pendingReconnect = true; | ||
| return; | ||
| } | ||
|
|
||
| // connect() handles all errors internally | ||
| void this.connect(); | ||
| } | ||
|
|
@@ -161,16 +255,14 @@ export class ReconnectingWebSocket< | |
| * Temporarily disconnect the socket. Can be resumed via reconnect(). | ||
| */ | ||
| disconnect(code?: number, reason?: string): void { | ||
| if (this.#isDisposed || this.#isDisconnected) { | ||
| if (!this.#dispatch({ type: "DISCONNECT" })) { | ||
| return; | ||
| } | ||
|
|
||
| this.#isDisconnected = true; | ||
| this.clearCurrentSocket(code, reason); | ||
| } | ||
|
|
||
| close(code?: number, reason?: string): void { | ||
| if (this.#isDisposed) { | ||
| if (this.#state === ConnectionState.DISPOSED) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -187,11 +279,9 @@ export class ReconnectingWebSocket< | |
| } | ||
|
|
||
| private async connect(): Promise<void> { | ||
| if (this.#isDisposed || this.#isDisconnected || this.#isConnecting) { | ||
| if (!this.#dispatch({ type: "CONNECT" })) { | ||
| return; | ||
| } | ||
|
|
||
| this.#isConnecting = true; | ||
| try { | ||
| // Close any existing socket before creating a new one | ||
| if (this.#currentSocket) { | ||
|
|
@@ -204,8 +294,8 @@ export class ReconnectingWebSocket< | |
|
|
||
| const socket = await this.#socketFactory(); | ||
|
|
||
| // Check if disconnected/disposed while waiting for factory | ||
| if (this.#isDisposed || this.#isDisconnected) { | ||
| // Check if state changed while waiting for factory (e.g., disconnect/dispose called) | ||
| if (this.#state !== ConnectionState.CONNECTING) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we expecting something may asynchronously change this state considering we just set connecting?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, JS is single threaded but it's |
||
| socket.close(WebSocketCloseCode.NORMAL, "Cancelled during connection"); | ||
| return; | ||
| } | ||
|
|
@@ -214,16 +304,32 @@ export class ReconnectingWebSocket< | |
| this.#lastRoute = this.#route; | ||
|
|
||
| socket.addEventListener("open", (event) => { | ||
| if (this.#currentSocket !== socket) { | ||
| return; | ||
| } | ||
|
|
||
| if (!this.#dispatch({ type: "OPEN" })) { | ||
| return; | ||
| } | ||
| // Reset backoff on successful connection | ||
| this.#backoffMs = this.#options.initialBackoffMs; | ||
| this.#certRefreshAttempted = false; // Reset on successful connection | ||
| this.#certRefreshAttempted = false; | ||
| this.executeHandlers("open", event); | ||
| }); | ||
|
|
||
| socket.addEventListener("message", (event) => { | ||
| if (this.#currentSocket !== socket) { | ||
| return; | ||
| } | ||
|
|
||
| this.executeHandlers("message", event); | ||
| }); | ||
|
|
||
| socket.addEventListener("error", (event) => { | ||
| if (this.#currentSocket !== socket) { | ||
| return; | ||
| } | ||
|
|
||
| this.executeHandlers("error", event); | ||
| // Errors during initial connection are caught by the factory (waitForOpen). | ||
| // This handler is for errors AFTER successful connection. | ||
|
|
@@ -233,7 +339,14 @@ export class ReconnectingWebSocket< | |
| }); | ||
|
|
||
| socket.addEventListener("close", (event) => { | ||
| if (this.#isDisposed || this.#isDisconnected) { | ||
| if (this.#currentSocket !== socket) { | ||
| return; | ||
| } | ||
|
|
||
| if ( | ||
| this.#state === ConnectionState.DISPOSED || | ||
| this.#state === ConnectionState.DISCONNECTED | ||
| ) { | ||
|
EhabY marked this conversation as resolved.
|
||
| return; | ||
| } | ||
|
|
||
|
|
@@ -255,22 +368,11 @@ export class ReconnectingWebSocket< | |
| }); | ||
| } catch (error) { | ||
| await this.handleConnectionError(error); | ||
| } finally { | ||
| this.#isConnecting = false; | ||
|
|
||
| if (this.#pendingReconnect) { | ||
| this.#pendingReconnect = false; | ||
| this.reconnect(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private scheduleReconnect(): void { | ||
| if ( | ||
| this.#isDisposed || | ||
| this.#isDisconnected || | ||
| this.#reconnectTimeoutId !== null | ||
| ) { | ||
| if (!this.#dispatch({ type: "SCHEDULE_RETRY" })) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -354,7 +456,14 @@ export class ReconnectingWebSocket< | |
| * otherwise schedules a reconnect. | ||
| */ | ||
| private async handleConnectionError(error: unknown): Promise<void> { | ||
| if (this.#isDisposed || this.#isDisconnected) { | ||
| if ( | ||
| this.#state === ConnectionState.DISPOSED || | ||
| this.#state === ConnectionState.DISCONNECTED | ||
| ) { | ||
| this.#logger.debug( | ||
| `Ignoring connection error in ${this.#state} state for ${this.#route}`, | ||
| error, | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -396,11 +505,9 @@ export class ReconnectingWebSocket< | |
| } | ||
|
|
||
| private dispose(code?: number, reason?: string): void { | ||
| if (this.#isDisposed) { | ||
| if (!this.#dispatch({ type: "DISPOSE" })) { | ||
| return; | ||
| } | ||
|
|
||
| this.#isDisposed = true; | ||
| this.clearCurrentSocket(code, reason); | ||
|
|
||
| for (const set of Object.values(this.#eventHandlers)) { | ||
|
|
@@ -411,9 +518,6 @@ export class ReconnectingWebSocket< | |
| } | ||
|
|
||
| private clearCurrentSocket(code?: number, reason?: string): void { | ||
| // Clear pending reconnect to prevent resume | ||
| this.#pendingReconnect = false; | ||
|
|
||
| if (this.#reconnectTimeoutId !== null) { | ||
| clearTimeout(this.#reconnectTimeoutId); | ||
| this.#reconnectTimeoutId = null; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.