From 3a7159bd025ce484e4163b4066e36b0069fdfe97 Mon Sep 17 00:00:00 2001 From: Johannes Bingen Date: Sun, 25 Jan 2026 05:06:38 +0100 Subject: [PATCH 1/2] fix(pglite-sync): don't filter move-in messages by LSN Move-in messages from Electric's tagged_subqueries feature don't have an LSN header because they come from direct DB queries, not replication. Previously these messages were incorrectly skipped as "already seen" because the missing LSN defaulted to 0. This checks for the is_move_in header and bypasses LSN filtering for move-in messages, ensuring rows moving into a shape due to subquery condition changes are properly synced. Fixes electric-sql/electric#3769 --- .changeset/fix-move-in-lsn-filtering.md | 9 +++++++++ packages/pglite-sync/src/index.ts | 9 ++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 .changeset/fix-move-in-lsn-filtering.md diff --git a/.changeset/fix-move-in-lsn-filtering.md b/.changeset/fix-move-in-lsn-filtering.md new file mode 100644 index 000000000..17db42d48 --- /dev/null +++ b/.changeset/fix-move-in-lsn-filtering.md @@ -0,0 +1,9 @@ +--- +"@electric-sql/pglite-sync": patch +--- + +Fix move-in messages being incorrectly skipped due to LSN filtering + +Move-in messages from Electric's tagged_subqueries feature don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0, which was less than or equal to the last committed LSN. + +This fix checks for the `is_move_in` header and bypasses LSN filtering for move-in messages, ensuring that rows moving into a shape due to subquery condition changes are properly synced to the client. 
diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 641dae7e5..7d318e122 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -351,7 +351,14 @@ async function createPlugin( typeof message.headers.lsn === 'string' ? BigInt(message.headers.lsn) : BigInt(0) // we default to 0 if there no lsn on the message - if (lsn <= lastCommittedLsnForShape) { + + // Move-in messages from subquery-based shapes don't have an LSN + // because they come from direct DB queries, not from replication. + // We should never skip these based on LSN filtering. + const isMoveIn = (message.headers as Record<string, unknown>) + .is_move_in === true + + if (!isMoveIn && lsn <= lastCommittedLsnForShape) { // We are replaying changes / have already seen this lsn // skip and move on to the next message return From 44babaf20f616955be2883201be5464fa50c8f4b Mon Sep 17 00:00:00 2001 From: Johannes Bingen Date: Sun, 25 Jan 2026 06:00:33 +0100 Subject: [PATCH 2/2] fix(pglite-sync): handle move-in duplicate keys with ON CONFLICT Move-in data from tagged_subqueries can overlap with initial sync data, causing duplicate key errors. This adds ON CONFLICT DO UPDATE handling specifically for move-in inserts. 
- Add primaryKey param to applyInsertsToTable - Use ON CONFLICT DO UPDATE for move-in inserts - Update changeset description --- .changeset/fix-move-in-lsn-filtering.md | 8 ++-- packages/pglite-sync/src/apply.ts | 50 +++++++++++++++++++++++++ packages/pglite-sync/src/index.ts | 2 + 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/.changeset/fix-move-in-lsn-filtering.md b/.changeset/fix-move-in-lsn-filtering.md index 17db42d48..85bb5eaea 100644 --- a/.changeset/fix-move-in-lsn-filtering.md +++ b/.changeset/fix-move-in-lsn-filtering.md @@ -2,8 +2,10 @@ "@electric-sql/pglite-sync": patch --- -Fix move-in messages being incorrectly skipped due to LSN filtering +Fix move-in messages from tagged_subqueries not being synced -Move-in messages from Electric's tagged_subqueries feature don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0, which was less than or equal to the last committed LSN. +This fixes two issues with move-in messages from Electric's `tagged_subqueries` feature: -This fix checks for the `is_move_in` header and bypasses LSN filtering for move-in messages, ensuring that rows moving into a shape due to subquery condition changes are properly synced to the client. +1. **LSN filtering bypass**: Move-in messages don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0. This fix checks for the `is_move_in` header and bypasses LSN filtering for these messages. + +2. **Duplicate key handling**: Move-in data can overlap with data from the initial sync (e.g., when a row "moves in" to match a subquery that it already matched during initial sync). 
This fix uses `ON CONFLICT DO UPDATE` for move-in inserts to handle these duplicates gracefully, updating the row with the latest data instead of erroring. diff --git a/packages/pglite-sync/src/apply.ts b/packages/pglite-sync/src/apply.ts index 56437bd85..a11237a79 100644 --- a/packages/pglite-sync/src/apply.ts +++ b/packages/pglite-sync/src/apply.ts @@ -23,16 +23,38 @@ export async function applyMessageToTable({ }: ApplyMessageToTableOptions) { const data = mapColumns ? doMapColumns(mapColumns, message) : message.value + // Check if this is a move-in message (from subquery-based shapes) + const isMoveIn = (message.headers as Record<string, unknown>).is_move_in === true + switch (message.headers.operation) { case 'insert': { if (debug) console.log('inserting', data) const columns = Object.keys(data) + + // Build ON CONFLICT clause for move-in messages + let onConflictClause = '' + if (isMoveIn && primaryKey && primaryKey.length > 0) { + const nonPkColumns = columns.filter((c) => !primaryKey.includes(c)) + if (nonPkColumns.length > 0) { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')} + ` + } else { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO NOTHING + ` + } + } + return await pg.query( ` INSERT INTO "${schema}"."${table}" (${columns.map((s) => '"' + s + '"').join(', ')}) VALUES (${columns.map((_v, i) => '$' + (i + 1)).join(', ')}) + ${onConflictClause} `, columns.map((column) => data[column]), ) @@ -86,6 +108,7 @@ export interface BulkApplyMessagesToTableOptions { schema?: string messages: InsertChangeMessage[] mapColumns?: MapColumns + primaryKey?: string[] debug: boolean } @@ -95,6 +118,7 @@ export async function applyInsertsToTable({ schema = 'public', messages, mapColumns, + primaryKey, debug, }: BulkApplyMessagesToTableOptions) { // Map the messages to the data to be inserted @@ -104,6 +128,12 @@ export async 
function applyInsertsToTable({ if (debug) console.log('inserting', data) + // Check if any message is a move-in (from subquery-based shapes) + // Move-in data can overlap with existing data, so we need ON CONFLICT handling + const hasMoveIn = messages.some( + (m) => (m.headers as Record<string, unknown>).is_move_in === true, + ) + // Get column names from the first message const columns = Object.keys(data[0]) @@ -176,11 +206,31 @@ export async function applyInsertsToTable({ // Helper function to execute a batch insert const executeBatch = async (batch: Record[]) => { + // Build ON CONFLICT clause for move-in messages + // Move-in data can contain rows that already exist from initial sync + let onConflictClause = '' + if (hasMoveIn && primaryKey && primaryKey.length > 0) { + const nonPkColumns = columns.filter((c) => !primaryKey.includes(c)) + if (nonPkColumns.length > 0) { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')} + ` + } else { + // All columns are primary key, just ignore duplicates + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO NOTHING + ` + } + } + const sql = ` INSERT INTO "${schema}"."${table}" (${columns.map((s) => `"${s}"`).join(', ')}) VALUES ${batch.map((_, j) => `(${columns.map((_v, k) => '$' + (j * columns.length + k + 1)).join(', ')})`).join(', ')} + ${onConflictClause} ` const values = batch.flatMap((message) => columns.map((column) => message[column]), diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 7d318e122..fa619b0db 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -255,6 +255,7 @@ async function createPlugin( schema: shape.schema, messages: initialInserts as InsertChangeMessage[], mapColumns: shape.mapColumns, + primaryKey: shape.primaryKey, debug, }) @@ -282,6 +283,7 @@ async function createPlugin( schema: 
shape.schema, messages: bulkInserts as InsertChangeMessage[], mapColumns: shape.mapColumns, + primaryKey: shape.primaryKey, debug, }) bulkInserts.length = 0