9 changes: 9 additions & 0 deletions packages/zero-cache/src/config/zero-config.test.ts
@@ -431,6 +431,15 @@ test('zero-cache --help', () => {

If unspecified, defaults to --port + 2.

--litestream-snapshot-reservation-timeout-ms number default: 0
ZERO_LITESTREAM_SNAPSHOT_RESERVATION_TIMEOUT_MS env
How long a /snapshot reservation may be held open before it is forcefully ended.
This is a safeguard against a view-syncer that opens a /snapshot request but
never completes snapshot initialization (e.g. it hangs, crashes, or is wedged in
startup), which would otherwise pause change log cleanup indefinitely.

Set to 0 to disable.

--litestream-checkpoint-threshold-mb number default: 40
ZERO_LITESTREAM_CHECKPOINT_THRESHOLD_MB env
The size of the WAL file at which to perform an SQLite checkpoint to apply
12 changes: 12 additions & 0 deletions packages/zero-cache/src/config/zero-config.ts
@@ -681,6 +681,18 @@ export const zeroOptions = {
],
},

snapshotReservationTimeoutMs: {
type: v.number().default(0),
desc: [
`How long a /snapshot reservation may be held open before it is forcefully ended.`,
`This is a safeguard against a view-syncer that opens a /snapshot request but`,
`never completes snapshot initialization (e.g. it hangs, crashes, or is wedged in`,
`startup), which would otherwise pause change log cleanup indefinitely.`,
``,
`Set to 0 to disable.`,
],
},

checkpointThresholdMB: {
type: v.number().default(40),
desc: [
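For reference, a minimal sketch of how the new knob would be supplied at deploy time, based only on the flag/env names and default above (the surrounding config plumbing is assumed, not part of this diff): the flag and the env var set the same option, the value is interpreted as milliseconds, and 0 keeps the safeguard off.

// Sketch (assumed plumbing): either of these
//   --litestream-snapshot-reservation-timeout-ms=60000
//   ZERO_LITESTREAM_SNAPSHOT_RESERVATION_TIMEOUT_MS=60000
// yields a value like the one read here, ending stuck /snapshot
// reservations after one minute; the default of 0 disables the timeout.
const snapshotReservationTimeoutMs = Number(
  process.env.ZERO_LITESTREAM_SNAPSHOT_RESERVATION_TIMEOUT_MS ?? '0',
);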
3 changes: 2 additions & 1 deletion packages/zero-cache/src/server/change-streamer.ts
@@ -118,7 +118,7 @@ export default async function runWorker(
// impossible: upstream must have advanced in order for replication to be stuck.
assert(changeStreamer, `resetting replica did not advance replicaVersion`);

const {backupURL, port: metricsPort} = litestream;
const {backupURL, port: metricsPort, snapshotReservationTimeoutMs} = litestream;
const monitor = backupURL
? new BackupMonitor(
lc,
@@ -133,6 +133,7 @@
//
// Consider: Also account for permanent volumes?
Date.now() - parentStartMs,
snapshotReservationTimeoutMs,
)
: new ReplicaMonitor(lc, replica.file, changeStreamer);

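The wiring above just threads the configured value into the BackupMonitor. For orientation, this is how the widened constructor is exercised concretely in the tests below (values copied from the backup-monitor test diff; the trailing argument is the new timeout):

// Reference sketch only; identifiers come from the test file further down.
const monitor = new BackupMonitor(
  createSilentLogContext(),
  's3://foo/bar',                   // backupURL
  'http://localhost:4850/metrics',  // litestream metrics endpoint
  changeStreamer as unknown as ChangeStreamerService,
  100_000,                          // initialCleanupDelayMs (100 seconds)
  10_000,                           // snapshotReservationTimeoutMs; 0 disables
);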
packages/zero-cache/src/services/change-streamer/backup-monitor.test.ts
@@ -46,6 +46,7 @@ litestream_replica_validation_total{db="/tmp/zbugs-sync-replica.db",name="file",
'http://localhost:4850/metrics',
changeStreamer as unknown as ChangeStreamerService,
100_000, // 100 seconds
0,
);

nock('http://localhost:4850')
@@ -256,4 +257,40 @@ litestream_replica_validation_total{db="/tmp/zbugs-sync-replica.db",name="file",
await monitor.checkWatermarksAndScheduleCleanup();
expect(scheduled).toEqual(['618p0bw8']);
});

test('expires reservations after timeout', async () => {
monitor = new BackupMonitor(
createSilentLogContext(),
's3://foo/bar',
'http://localhost:4850/metrics',
changeStreamer as unknown as ChangeStreamerService,
100_000, // 100 seconds
10_000,
);

const time = Date.UTC(2025, 3, 24);
vi.setSystemTime(time);
const nowSeconds = (Date.now() / 1000).toPrecision(9);
setMetricsResponse('618p0bw8', nowSeconds);

await monitor.checkWatermarksAndScheduleCleanup();

const sub = await monitor.startSnapshotReservation('foo-bar');
expect(await getFirstMessage(sub)).toEqual([
'status',
{
tag: 'status',
backupURL: 's3://foo/bar',
replicaVersion: '123',
minWatermark: '1ab',
},
]);

vi.advanceTimersByTime(10_000);
expect(sub.active).toBe(false);

vi.setSystemTime(time + 100_000);
await monitor.checkWatermarksAndScheduleCleanup();
expect(scheduled).toEqual(['618p0bw8']);
});
});
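The new test drives vi.setSystemTime and vi.advanceTimersByTime, so the suite must already be running under vitest fake timers; that setup is outside this diff, but it presumably looks something like the following sketch.

// Assumed fake-timer setup (not shown in this diff) that the timeout
// assertions above depend on.
import {afterEach, beforeEach, vi} from 'vitest';

beforeEach(() => {
  vi.useFakeTimers();
});

afterEach(() => {
  vi.useRealTimers();
});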
27 changes: 23 additions & 4 deletions packages/zero-cache/src/services/change-streamer/backup-monitor.ts
@@ -13,6 +13,7 @@ const MIN_CLEANUP_DELAY_MS = 30 * 1000;
type Reservation = {
start: Date;
sub: Subscription<SnapshotMessage>;
timeout: NodeJS.Timeout | undefined;
};

/**
@@ -59,6 +60,7 @@ export class BackupMonitor implements Service {

#lastWatermark: string = '';
#cleanupDelayMs: number;
readonly #snapshotReservationTimeoutMs: number;
#checkMetricsTimer: NodeJS.Timeout | undefined;

constructor(
@@ -67,6 +69,7 @@
metricsEndpoint: string,
changeStreamer: ChangeStreamerService,
initialCleanupDelayMs: number,
snapshotReservationTimeoutMs: number,
) {
this.#lc = lc.withContext('component', this.id);
this.#backupURL = backupURL;
Expand All @@ -76,6 +79,7 @@ export class BackupMonitor implements Service {
initialCleanupDelayMs,
MIN_CLEANUP_DELAY_MS, // purely for peace of mind
);
this.#snapshotReservationTimeoutMs = snapshotReservationTimeoutMs;

this.#lc.info?.(
`backup monitor started ${initialCleanupDelayMs} ms after snapshot restore`,
@@ -107,7 +111,20 @@
// cleanup delay.
cleanup: () => this.endReservation(taskID, false),
});
this.#reservations.set(taskID, {start: new Date(), sub});
const timeout =
this.#snapshotReservationTimeoutMs > 0
? setTimeout(() => {
if (this.#reservations.has(taskID)) {
this.#lc.warn?.(
`snapshot reservation for ${taskID} exceeded timeout ` +
`(${this.#snapshotReservationTimeoutMs} ms). Ending reservation.`,
);
}
this.endReservation(taskID, false);
}, this.#snapshotReservationTimeoutMs)
: undefined;

this.#reservations.set(taskID, {start: new Date(), sub, timeout});
const changeLogState = await this.#changeStreamer.getChangeLogState();
sub.push([
'status',
Expand All @@ -122,7 +139,9 @@ export class BackupMonitor implements Service {
return;
}
this.#reservations.delete(taskID);
const {start, sub} = res;
const {start, sub, timeout} = res;

clearTimeout(timeout);
sub.cancel(); // closes the connection if still open

if (updateCleanupDelay) {
@@ -213,12 +232,12 @@

stop(): Promise<void> {
clearInterval(this.#checkMetricsTimer);
for (const {sub} of this.#reservations.values()) {
for (const taskID of [...this.#reservations.keys()]) {
// Close any pending reservations. This commonly happens when a new
// replication-manager makes a `/snapshot` reservation on the existing
// replication-manager, and then shuts it down when it takes over the
// replication slot.
sub.cancel();
this.endReservation(taskID, false);
}
this.#state.stop(this.#lc);
return promiseVoid;
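Distilled, the behavior added to BackupMonitor is: opening a /snapshot reservation arms a timer when the configured timeout is positive; the timer ends the reservation (closing its subscription) if the view-syncer never finishes; ending a reservation for any reason clears the timer; and stop() now ends each pending reservation instead of only cancelling its subscription, so pending timers are cleared on shutdown too. A stripped-down sketch of that flow, with the class machinery removed (simplified types, not the real implementation):

// Simplified sketch of the reservation-timeout flow described above.
type Reservation = {timeout: NodeJS.Timeout | undefined};
const reservations = new Map<string, Reservation>();

function startReservation(taskID: string, timeoutMs: number): void {
  const timeout =
    timeoutMs > 0
      ? setTimeout(() => endReservation(taskID), timeoutMs)
      : undefined; // 0 disables the safeguard entirely
  reservations.set(taskID, {timeout});
}

function endReservation(taskID: string): void {
  const res = reservations.get(taskID);
  if (!res) {
    return; // already ended, e.g. snapshot initialization completed normally
  }
  reservations.delete(taskID);
  clearTimeout(res.timeout); // no-op when timeout is undefined
  // ...cancel the subscription, optionally update the cleanup delay...
}

function stop(): void {
  for (const taskID of [...reservations.keys()]) {
    endReservation(taskID); // also clears any pending timeout
  }
}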