diff --git a/packages/benchmark/package.json b/packages/benchmark/package.json index bd825ccd..eaafb869 100644 --- a/packages/benchmark/package.json +++ b/packages/benchmark/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@eventstore/db-client": "^6.2.1", - "@kurrent/bridge": "^0.1.3", + "@kurrent/bridge": "^0.1.5", "tinybench": "^3.1.1" }, "devDependencies": { diff --git a/packages/db-client/package.json b/packages/db-client/package.json index 9238ba38..cd0f5518 100644 --- a/packages/db-client/package.json +++ b/packages/db-client/package.json @@ -46,7 +46,7 @@ }, "dependencies": { "@grpc/grpc-js": "^1.14.3", - "@kurrent/bridge": "^0.1.3", + "@kurrent/bridge": "^0.1.5", "@types/debug": "^4.1.12", "@types/google-protobuf": "^3.15.12", "@types/node": "^22.10.2", diff --git a/packages/db-client/src/utils/CommandError.ts b/packages/db-client/src/utils/CommandError.ts index b8c6a44a..8f1f1a88 100644 --- a/packages/db-client/src/utils/CommandError.ts +++ b/packages/db-client/src/utils/CommandError.ts @@ -640,6 +640,8 @@ export const convertToCommandError = (error: Error): CommandError | Error => { case StatusCode.DEADLINE_EXCEEDED: return new DeadlineExceededError(error); case StatusCode.UNAVAILABLE: + case StatusCode.INTERNAL: + case StatusCode.DATA_LOSS: return new UnavailableError(error); case StatusCode.UNAUTHENTICATED: return new AccessDeniedError(error); diff --git a/packages/db-client/src/utils/convertBridgeError.ts b/packages/db-client/src/utils/convertBridgeError.ts index 14ac7523..296ab8d5 100644 --- a/packages/db-client/src/utils/convertBridgeError.ts +++ b/packages/db-client/src/utils/convertBridgeError.ts @@ -3,43 +3,32 @@ import { StreamNotFoundError, StreamDeletedError, AccessDeniedError, + UnavailableError, + DeadlineExceededError, + UnknownError, } from "./CommandError"; import { ServiceError } from "@grpc/grpc-js"; -// export const convertBridgeError = ( -// error: ServiceError, -// streamName?: string -// ) => { -// const stream = streamName ?? "unknown stream"; -// -// switch (error.name) { -// case StreamNotFoundError.name: -// throw new StreamNotFoundError(error, stream); -// case StreamDeletedError.name: -// throw StreamDeletedError.fromStreamName(stream); -// case NotLeaderError.name: -// throw new NotLeaderError(error); -// case AccessDeniedError.name: -// throw new AccessDeniedError(error); -// default: -// throw error; -// } -// }; - export const convertBridgeError = (error: Error, streamName?: string) => { const stream = streamName ?? "unknown stream"; const serviceError = error as ServiceError; switch (error.name) { - case StreamNotFoundError.name: + case "StreamNotFoundError": return new StreamNotFoundError(serviceError, stream); - case StreamDeletedError.name: + case "StreamDeletedError": return StreamDeletedError.fromStreamName(stream); - case NotLeaderError.name: + case "NotLeaderError": return new NotLeaderError(serviceError); - case AccessDeniedError.name: + case "AccessDeniedError": return new AccessDeniedError(serviceError); + case "UnavailableError": + return new UnavailableError(serviceError); + case "DeadlineExceededError": + return new DeadlineExceededError(serviceError); + case "UnknownError": + return new UnknownError(serviceError); default: - return error; + return new UnknownError(serviceError); } }; diff --git a/packages/test/src/connection/reconnect/leader-failover.test.ts b/packages/test/src/connection/reconnect/leader-failover.test.ts new file mode 100644 index 00000000..64a58af2 --- /dev/null +++ b/packages/test/src/connection/reconnect/leader-failover.test.ts @@ -0,0 +1,441 @@ +import { + collect, + createTestCluster, + delay, + getCurrentConnection, +} from "@test-utils"; +import { + jsonEvent, + KurrentDBClient, + LEADER, + FOLLOWER, + START, +} from "@kurrent/kurrentdb-client"; + +jest.setTimeout(120_000); + +describe("reconnect", () => { + describe("leader failover - write operations (gRPC path)", () => { + const cluster = createTestCluster(); + + beforeAll(async () => { + await cluster.up(); + }); + + afterAll(async () => { + await cluster.down(); + }); + + test("should reconnect after leader is killed and resurrected as follower", async () => { + // 1. Client connects to leader + // 2. Leader goes down + // 3. New leader elected + // 4. Old leader comes back as follower + // 5. Client should NOT reconnect to the old (now follower) node + + const client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: LEADER, + defaultDeadline: 100_000_000, + }) + ); + + // Establish connection to the current leader + const firstAppend = await client.appendToStream( + "resurrect-stream", + jsonEvent({ type: "before-kill", data: { message: "test" } }), + { credentials: { username: "admin", password: "changeit" } } + ); + expect(firstAppend).toBeDefined(); + + const leaderConnection = await getCurrentConnection(client); + + // Kill the leader + await cluster.killNode(leaderConnection); + await delay(5_000); + + // First operation should fail + try { + await client.appendToStream( + "resurrect-stream", + jsonEvent({ type: "should-fail", data: { message: "test" } }), + { credentials: { username: "admin", password: "changeit" } } + ); + } catch (error) { + // Expected failure + } + + // Resurrect the killed leader (it comes back as follower) + await cluster.resurrect(); + await delay(5_000); + + // Subsequent operations should succeed on the new leader + const afterResurrect = await client.appendToStream( + "resurrect-stream", + jsonEvent({ + type: "after-resurrect", + data: { message: "test" }, + }), + { credentials: { username: "admin", password: "changeit" } } + ); + expect(afterResurrect).toBeDefined(); + }); + }); + + describe("leader failover - read operations (Rust bridge path)", () => { + const cluster = createTestCluster(); + + beforeAll(async () => { + await cluster.up(); + }); + + afterAll(async () => { + await cluster.down(); + }); + + test("readStream with requiresLeader should reconnect after NotLeader", async () => { + // This tests the Rust bridge code path, which does NOT go through + // handleError() and therefore may not trigger reconnection. + + // Connect to a follower to reliably trigger NotLeader + const client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: FOLLOWER, + defaultDeadline: 100_000_000, + }) + ); + + // Seed some data first (append doesn't require leader by default) + await client.appendToStream( + "read-reconnect-stream", + [ + jsonEvent({ type: "seed-1", data: { i: 1 } }), + jsonEvent({ type: "seed-2", data: { i: 2 } }), + jsonEvent({ type: "seed-3", data: { i: 3 } }), + ], + { credentials: { username: "admin", password: "changeit" } } + ); + + // Read with requiresLeader: true should fail on follower + try { + const events = await collect( + client.readStream("read-reconnect-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + expect(events).toBe("unreachable"); + } catch (error) { + // Expected: NotLeaderError from the Rust bridge + } + + // The critical test: can we read successfully after the NotLeader error? + // If the Rust bridge doesn't handle reconnection, this will fail + // with the same NotLeader error + const events = await collect( + client.readStream("read-reconnect-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + + expect(events).toHaveLength(3); + }); + + test("readAll with requiresLeader should reconnect after NotLeader", async () => { + // Same test but for readAll, which also goes through the Rust bridge + + const client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: FOLLOWER, + defaultDeadline: 100_000_000, + }) + ); + + // Read $all with requiresLeader: true should fail on follower + try { + const events = await collect( + client.readAll({ + maxCount: 1, + fromPosition: START, + requiresLeader: true, + }) + ); + expect(events).toBe("unreachable"); + } catch (error) { + // Expected: NotLeaderError from the Rust bridge + } + + // After the error, subsequent reads should work + const events = await collect( + client.readAll({ + maxCount: 1, + fromPosition: START, + requiresLeader: true, + }) + ); + + expect(events.length).toBeGreaterThanOrEqual(1); + }); + + test("readStream should keep working after leader node is killed", async () => { + // Connect to leader, kill it, verify reads still work after reconnection. + // The Rust bridge has its own connection, so after the first error triggers + // internal reconnection, subsequent reads may need a couple of attempts + // while the cluster stabilizes. + + const client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: LEADER, + defaultDeadline: 100_000_000, + }) + ); + + // Seed data + await client.appendToStream( + "read-failover-stream", + [ + jsonEvent({ type: "seed-1", data: { i: 1 } }), + jsonEvent({ type: "seed-2", data: { i: 2 } }), + ], + { credentials: { username: "admin", password: "changeit" } } + ); + + // Verify read works + const beforeEvents = await collect( + client.readStream("read-failover-stream", { + maxCount: 10, + fromRevision: START, + }) + ); + expect(beforeEvents).toHaveLength(2); + + // Kill the leader + const leaderConnection = await getCurrentConnection(client); + await cluster.killNode(leaderConnection); + await delay(5_000); + + // Read should fail (node is dead), triggering Rust client reconnection + try { + await collect( + client.readStream("read-failover-stream", { + maxCount: 10, + fromRevision: START, + }) + ); + } catch (error) { + // Expected failure + } + + // Resurrect and wait + await cluster.resurrect(); + await delay(5_000); + + // The Rust client needs to re-discover after the error. + // Allow a few attempts for the cluster to stabilize. + let afterEvents; + for (let attempt = 0; attempt < 5; attempt++) { + try { + afterEvents = await collect( + client.readStream("read-failover-stream", { + maxCount: 10, + fromRevision: START, + }) + ); + break; + } catch { + await delay(2_000); + } + } + + expect(afterEvents).toHaveLength(2); + }); + + test("repeated reads with requiresLeader should not get permanently stuck", async () => { + // Multiple sequential reads all failing with NotLeader + // without recovery, even though the first error should trigger reconnection + + const client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: FOLLOWER, + defaultDeadline: 100_000_000, + }) + ); + + // Seed data + await client.appendToStream( + "stuck-read-stream", + jsonEvent({ type: "seed", data: { message: "test" } }), + { credentials: { username: "admin", password: "changeit" } } + ); + + // First read with requiresLeader triggers NotLeader + try { + await collect( + client.readStream("stuck-read-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + } catch (error) { + // Expected + } + + // Now make 10 sequential reads, all requiring leader + // If the Rust bridge is stuck, ALL of these will fail + for (let i = 0; i < 10; i++) { + const events = await collect( + client.readStream("stuck-read-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + expect(events).toHaveLength(1); + } + }); + }); + + describe("concurrent operations during NotLeader", () => { + const cluster = createTestCluster(); + let client!: KurrentDBClient; + + beforeAll(async () => { + await cluster.up(); + }); + + afterAll(async () => { + await cluster.down(); + }); + + test("multiple rapid write operations after NotLeader should all eventually succeed", async () => { + // Connect to a follower to reliably trigger NotLeader errors + client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: FOLLOWER, + defaultDeadline: 100_000_000, + }) + ); + + // First append succeeds (goes to follower, no requiresLeader) + const firstAppend = await client.appendToStream( + "concurrent-stream", + jsonEvent({ type: "setup", data: { message: "test" } }), + { credentials: { username: "admin", password: "changeit" } } + ); + expect(firstAppend).toBeDefined(); + + // Trigger NotLeader error with requiresLeader + try { + await client.appendToStream( + "concurrent-stream", + jsonEvent({ type: "trigger-not-leader", data: {} }), + { + requiresLeader: true, + credentials: { username: "admin", password: "changeit" }, + } + ); + expect("should not reach").toBe("here"); + } catch (error) { + // Expected: NotLeaderError triggers reconnection + } + + // Now fire multiple operations rapidly in parallel + // If handleError has a race condition, some of these could fail + const results = await Promise.allSettled([ + client.appendToStream( + "concurrent-stream", + jsonEvent({ type: "concurrent-1", data: {} }), + { + requiresLeader: true, + credentials: { username: "admin", password: "changeit" }, + } + ), + client.appendToStream( + "concurrent-stream", + jsonEvent({ type: "concurrent-2", data: {} }), + { + requiresLeader: true, + credentials: { username: "admin", password: "changeit" }, + } + ), + client.appendToStream( + "concurrent-stream", + jsonEvent({ type: "concurrent-3", data: {} }), + { + requiresLeader: true, + credentials: { username: "admin", password: "changeit" }, + } + ), + ]); + + // All operations should succeed since the client should have + // reconnected to the leader after the first NotLeader error + for (const result of results) { + expect(result.status).toBe("fulfilled"); + } + }); + + test("mixed read and write operations after NotLeader", async () => { + // Tests that both code paths (gRPC writes and Rust bridge reads) + // recover after a NotLeader error. + // The gRPC channel and Rust bridge have independent connections, + // so each must encounter and recover from NotLeader separately. + + client = KurrentDBClient.connectionString( + cluster.connectionStringWithOverrides({ + nodePreference: FOLLOWER, + defaultDeadline: 100_000_000, + }) + ); + + // Seed data + await client.appendToStream( + "mixed-ops-stream", + jsonEvent({ type: "seed", data: { i: 1 } }), + { credentials: { username: "admin", password: "changeit" } } + ); + + // Trigger NotLeader via write (reconnects gRPC channel) + try { + await client.appendToStream( + "mixed-ops-stream", + jsonEvent({ type: "trigger", data: {} }), + { + requiresLeader: true, + credentials: { username: "admin", password: "changeit" }, + } + ); + } catch (error) { + // Expected + } + + // Trigger NotLeader on the Rust bridge too (it has its own connection) + try { + await collect( + client.readStream("mixed-ops-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + } catch (error) { + // Expected: Rust bridge is still on follower + } + + // Now both code paths have reconnected — reads should work + const events = await collect( + client.readStream("mixed-ops-stream", { + maxCount: 10, + fromRevision: START, + requiresLeader: true, + }) + ); + + expect(events).toHaveLength(1); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index 0d374a44..862973f8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1065,66 +1065,66 @@ __metadata: languageName: node linkType: hard -"@kurrent/bridge-darwin-arm64@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-darwin-arm64@npm:0.1.3" +"@kurrent/bridge-darwin-arm64@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-darwin-arm64@npm:0.1.5" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-darwin-x64@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-darwin-x64@npm:0.1.3" +"@kurrent/bridge-darwin-x64@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-darwin-x64@npm:0.1.5" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-linux-arm64-gnu@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.1.3" +"@kurrent/bridge-linux-arm64-gnu@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-linux-arm64-gnu@npm:0.1.5" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-linux-arm64-musl@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-linux-arm64-musl@npm:0.1.3" +"@kurrent/bridge-linux-arm64-musl@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-linux-arm64-musl@npm:0.1.5" conditions: os=linux & cpu=arm64 languageName: node linkType: hard -"@kurrent/bridge-linux-x64-gnu@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.1.3" +"@kurrent/bridge-linux-x64-gnu@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-linux-x64-gnu@npm:0.1.5" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-linux-x64-musl@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-linux-x64-musl@npm:0.1.3" +"@kurrent/bridge-linux-x64-musl@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-linux-x64-musl@npm:0.1.5" conditions: os=linux & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge-win32-x64-msvc@npm:0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.1.3" +"@kurrent/bridge-win32-x64-msvc@npm:0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge-win32-x64-msvc@npm:0.1.5" conditions: os=win32 & cpu=x64 languageName: node linkType: hard -"@kurrent/bridge@npm:^0.1.3": - version: 0.1.3 - resolution: "@kurrent/bridge@npm:0.1.3" - dependencies: - "@kurrent/bridge-darwin-arm64": "npm:0.1.3" - "@kurrent/bridge-darwin-x64": "npm:0.1.3" - "@kurrent/bridge-linux-arm64-gnu": "npm:0.1.3" - "@kurrent/bridge-linux-arm64-musl": "npm:0.1.3" - "@kurrent/bridge-linux-x64-gnu": "npm:0.1.3" - "@kurrent/bridge-linux-x64-musl": "npm:0.1.3" - "@kurrent/bridge-win32-x64-msvc": "npm:0.1.3" +"@kurrent/bridge@npm:^0.1.5": + version: 0.1.5 + resolution: "@kurrent/bridge@npm:0.1.5" + dependencies: + "@kurrent/bridge-darwin-arm64": "npm:0.1.5" + "@kurrent/bridge-darwin-x64": "npm:0.1.5" + "@kurrent/bridge-linux-arm64-gnu": "npm:0.1.5" + "@kurrent/bridge-linux-arm64-musl": "npm:0.1.5" + "@kurrent/bridge-linux-x64-gnu": "npm:0.1.5" + "@kurrent/bridge-linux-x64-musl": "npm:0.1.5" + "@kurrent/bridge-win32-x64-msvc": "npm:0.1.5" "@neon-rs/load": "npm:^0.1.82" dependenciesMeta: "@kurrent/bridge-darwin-arm64": @@ -1141,7 +1141,7 @@ __metadata: optional: true "@kurrent/bridge-win32-x64-msvc": optional: true - checksum: 10c0/76c89f2fd392577a815c23d81332f052e1c3d30984c34e64ca6b7271ca2e1d713348dd44454e5d33d5b7935b33d7d143382c82c396bfa25c168678e05b3e387d + checksum: 10c0/8f1941011c17cd81b29e78b4743097fa8c6452abe2c80acee23c24d4b665ae027172bbb31d805d21a15ab42caa5e8ac83699a040434a53a0df14bcf7cf4a69b9 languageName: node linkType: hard @@ -1150,7 +1150,7 @@ __metadata: resolution: "@kurrent/kurrentdb-client@workspace:packages/db-client" dependencies: "@grpc/grpc-js": "npm:^1.14.3" - "@kurrent/bridge": "npm:^0.1.3" + "@kurrent/bridge": "npm:^0.1.5" "@types/debug": "npm:^4.1.12" "@types/google-protobuf": "npm:^3.15.12" "@types/node": "npm:^22.10.2" @@ -2740,7 +2740,7 @@ __metadata: resolution: "benchmark@workspace:packages/benchmark" dependencies: "@eventstore/db-client": "npm:^6.2.1" - "@kurrent/bridge": "npm:^0.1.3" + "@kurrent/bridge": "npm:^0.1.5" "@kurrent/kurrentdb-client": "workspace:^" clinic: "npm:^13.0.0" tinybench: "npm:^3.1.1"