diff --git a/packages/zero-cache/src/config/zero-config.test.ts b/packages/zero-cache/src/config/zero-config.test.ts index 2e727c3c6e..a3493a36d7 100644 --- a/packages/zero-cache/src/config/zero-config.test.ts +++ b/packages/zero-cache/src/config/zero-config.test.ts @@ -431,6 +431,15 @@ test('zero-cache --help', () => { If unspecified, defaults to --port + 2. + --litestream-snapshot-reservation-timeout-ms number default: 0 + ZERO_LITESTREAM_SNAPSHOT_RESERVATION_TIMEOUT_MS env + How long a /snapshot reservation may be held open before it is forcefully ended. + This is a safeguard against a view-syncer that opens a /snapshot request but + never completes snapshot initialization (e.g. it hangs, crashes, or is wedged in + startup) which would otherwise pause change log cleanup indefinitely. + + Set to 0 to disable. + --litestream-checkpoint-threshold-mb number default: 40 ZERO_LITESTREAM_CHECKPOINT_THRESHOLD_MB env The size of the WAL file at which to perform an SQlite checkpoint to apply diff --git a/packages/zero-cache/src/config/zero-config.ts b/packages/zero-cache/src/config/zero-config.ts index 5a4ee58b68..84776f4a3d 100644 --- a/packages/zero-cache/src/config/zero-config.ts +++ b/packages/zero-cache/src/config/zero-config.ts @@ -681,6 +681,18 @@ export const zeroOptions = { ], }, + snapshotReservationTimeoutMs: { + type: v.number().default(0), + desc: [ + `How long a /snapshot reservation may be held open before it is forcefully ended.`, + `This is a safeguard against a view-syncer that opens a /snapshot request but`, + `never completes snapshot initialization (e.g. it hangs, crashes, or is wedged in`, + `startup) which would otherwise pause change log cleanup indefinitely.`, + ``, + `Set to 0 to disable.`, + ], + }, + checkpointThresholdMB: { type: v.number().default(40), desc: [ diff --git a/packages/zero-cache/src/server/change-streamer.ts b/packages/zero-cache/src/server/change-streamer.ts index e5496d87bf..6c34046ee9 100644 --- a/packages/zero-cache/src/server/change-streamer.ts +++ b/packages/zero-cache/src/server/change-streamer.ts @@ -118,7 +118,7 @@ export default async function runWorker( // impossible: upstream must have advanced in order for replication to be stuck. assert(changeStreamer, `resetting replica did not advance replicaVersion`); - const {backupURL, port: metricsPort} = litestream; + const {backupURL, port: metricsPort, snapshotReservationTimeoutMs} = litestream; const monitor = backupURL ? new BackupMonitor( lc, @@ -133,6 +133,7 @@ export default async function runWorker( // // Consider: Also account for permanent volumes? Date.now() - parentStartMs, + snapshotReservationTimeoutMs, ) : new ReplicaMonitor(lc, replica.file, changeStreamer); diff --git a/packages/zero-cache/src/services/change-streamer/backup-monitor.test.ts b/packages/zero-cache/src/services/change-streamer/backup-monitor.test.ts index da165bec9f..5472996a91 100644 --- a/packages/zero-cache/src/services/change-streamer/backup-monitor.test.ts +++ b/packages/zero-cache/src/services/change-streamer/backup-monitor.test.ts @@ -46,6 +46,7 @@ litestream_replica_validation_total{db="/tmp/zbugs-sync-replica.db",name="file", 'http://localhost:4850/metrics', changeStreamer as unknown as ChangeStreamerService, 100_000, // 100 seconds + 0, ); nock('http://localhost:4850') @@ -256,4 +257,40 @@ litestream_replica_validation_total{db="/tmp/zbugs-sync-replica.db",name="file", await monitor.checkWatermarksAndScheduleCleanup(); expect(scheduled).toEqual(['618p0bw8']); }); + + test('expires reservations after timeout', async () => { + monitor = new BackupMonitor( + createSilentLogContext(), + 's3://foo/bar', + 'http://localhost:4850/metrics', + changeStreamer as unknown as ChangeStreamerService, + 100_000, // 100 seconds + 10_000, + ); + + const time = Date.UTC(2025, 3, 24); + vi.setSystemTime(time); + const nowSeconds = (Date.now() / 1000).toPrecision(9); + setMetricsResponse('618p0bw8', nowSeconds); + + await monitor.checkWatermarksAndScheduleCleanup(); + + const sub = await monitor.startSnapshotReservation('foo-bar'); + expect(await getFirstMessage(sub)).toEqual([ + 'status', + { + tag: 'status', + backupURL: 's3://foo/bar', + replicaVersion: '123', + minWatermark: '1ab', + }, + ]); + + vi.advanceTimersByTime(10_000); + expect(sub.active).toBe(false); + + vi.setSystemTime(time + 100_000); + await monitor.checkWatermarksAndScheduleCleanup(); + expect(scheduled).toEqual(['618p0bw8']); + }); }); diff --git a/packages/zero-cache/src/services/change-streamer/backup-monitor.ts b/packages/zero-cache/src/services/change-streamer/backup-monitor.ts index fb9ef09d40..b0ae61f4ff 100644 --- a/packages/zero-cache/src/services/change-streamer/backup-monitor.ts +++ b/packages/zero-cache/src/services/change-streamer/backup-monitor.ts @@ -13,6 +13,7 @@ const MIN_CLEANUP_DELAY_MS = 30 * 1000; type Reservation = { start: Date; sub: Subscription; + timeout: NodeJS.Timeout | undefined; }; /** @@ -59,6 +60,7 @@ export class BackupMonitor implements Service { #lastWatermark: string = ''; #cleanupDelayMs: number; + readonly #snapshotReservationTimeoutMs: number; #checkMetricsTimer: NodeJS.Timeout | undefined; constructor( @@ -67,6 +69,7 @@ export class BackupMonitor implements Service { metricsEndpoint: string, changeStreamer: ChangeStreamerService, initialCleanupDelayMs: number, + snapshotReservationTimeoutMs: number, ) { this.#lc = lc.withContext('component', this.id); this.#backupURL = backupURL; @@ -76,6 +79,7 @@ export class BackupMonitor implements Service { initialCleanupDelayMs, MIN_CLEANUP_DELAY_MS, // purely for peace of mind ); + this.#snapshotReservationTimeoutMs = snapshotReservationTimeoutMs; this.#lc.info?.( `backup monitor started ${initialCleanupDelayMs} ms after snapshot restore`, @@ -107,7 +111,20 @@ export class BackupMonitor implements Service { // cleanup delay. cleanup: () => this.endReservation(taskID, false), }); - this.#reservations.set(taskID, {start: new Date(), sub}); + const timeout = + this.#snapshotReservationTimeoutMs > 0 + ? setTimeout(() => { + if (this.#reservations.has(taskID)) { + this.#lc.warn?.( + `snapshot reservation for ${taskID} exceeded timeout ` + + `(${this.#snapshotReservationTimeoutMs} ms). Ending reservation.`, + ); + } + this.endReservation(taskID, false); + }, this.#snapshotReservationTimeoutMs) + : undefined; + + this.#reservations.set(taskID, {start: new Date(), sub, timeout}); const changeLogState = await this.#changeStreamer.getChangeLogState(); sub.push([ 'status', @@ -122,7 +139,9 @@ export class BackupMonitor implements Service { return; } this.#reservations.delete(taskID); - const {start, sub} = res; + const {start, sub, timeout} = res; + + clearTimeout(timeout); sub.cancel(); // closes the connection if still open if (updateCleanupDelay) { @@ -213,12 +232,12 @@ export class BackupMonitor implements Service { stop(): Promise { clearInterval(this.#checkMetricsTimer); - for (const {sub} of this.#reservations.values()) { + for (const taskID of [...this.#reservations.keys()]) { // Close any pending reservations. This commonly happens when a new // replication-manager makes a `/snapshot` reservation on the existing // replication-manager, and then shuts it down when it takes over the // replication slot. - sub.cancel(); + this.endReservation(taskID, false); } this.#state.stop(this.#lc); return promiseVoid;