diff --git a/.changeset/fix-move-in-lsn-filtering.md b/.changeset/fix-move-in-lsn-filtering.md new file mode 100644 index 000000000..85bb5eaea --- /dev/null +++ b/.changeset/fix-move-in-lsn-filtering.md @@ -0,0 +1,11 @@ +--- +"@electric-sql/pglite-sync": patch +--- + +Fix move-in messages from tagged_subqueries not being synced + +This fixes two issues with move-in messages from Electric's `tagged_subqueries` feature: + +1. **LSN filtering bypass**: Move-in messages don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0. This fix checks for the `is_move_in` header and bypasses LSN filtering for these messages. + +2. **Duplicate key handling**: Move-in data can overlap with data from the initial sync (e.g., when a row "moves in" to match a subquery that it already matched during initial sync). When a `primaryKey` is available for the shape, this fix adds an `ON CONFLICT (<primary key>) DO UPDATE` clause to move-in inserts to handle these duplicates gracefully, updating the row with the latest data instead of erroring (falling back to `DO NOTHING` when every inserted column is part of the primary key). diff --git a/packages/pglite-sync/src/apply.ts b/packages/pglite-sync/src/apply.ts index 56437bd85..a11237a79 100644 --- a/packages/pglite-sync/src/apply.ts +++ b/packages/pglite-sync/src/apply.ts @@ -23,16 +23,38 @@ export async function applyMessageToTable({ }: ApplyMessageToTableOptions) { const data = mapColumns ? 
doMapColumns(mapColumns, message) : message.value + // Check if this is a move-in message (from subquery-based shapes) + const isMoveIn = (message.headers as Record).is_move_in === true + switch (message.headers.operation) { case 'insert': { if (debug) console.log('inserting', data) const columns = Object.keys(data) + + // Build ON CONFLICT clause for move-in messages + let onConflictClause = '' + if (isMoveIn && primaryKey && primaryKey.length > 0) { + const nonPkColumns = columns.filter((c) => !primaryKey.includes(c)) + if (nonPkColumns.length > 0) { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')} + ` + } else { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO NOTHING + ` + } + } + return await pg.query( ` INSERT INTO "${schema}"."${table}" (${columns.map((s) => '"' + s + '"').join(', ')}) VALUES (${columns.map((_v, i) => '$' + (i + 1)).join(', ')}) + ${onConflictClause} `, columns.map((column) => data[column]), ) @@ -86,6 +108,7 @@ export interface BulkApplyMessagesToTableOptions { schema?: string messages: InsertChangeMessage[] mapColumns?: MapColumns + primaryKey?: string[] debug: boolean } @@ -95,6 +118,7 @@ export async function applyInsertsToTable({ schema = 'public', messages, mapColumns, + primaryKey, debug, }: BulkApplyMessagesToTableOptions) { // Map the messages to the data to be inserted @@ -104,6 +128,12 @@ export async function applyInsertsToTable({ if (debug) console.log('inserting', data) + // Check if any message is a move-in (from subquery-based shapes) + // Move-in data can overlap with existing data, so we need ON CONFLICT handling + const hasMoveIn = messages.some( + (m) => (m.headers as Record).is_move_in === true, + ) + // Get column names from the first message const columns = Object.keys(data[0]) @@ -176,11 +206,31 @@ export async function applyInsertsToTable({ // Helper function 
to execute a batch insert const executeBatch = async (batch: Record[]) => { + // Build ON CONFLICT clause for move-in messages + // Move-in data can contain rows that already exist from initial sync + let onConflictClause = '' + if (hasMoveIn && primaryKey && primaryKey.length > 0) { + const nonPkColumns = columns.filter((c) => !primaryKey.includes(c)) + if (nonPkColumns.length > 0) { + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')} + ` + } else { + // All columns are primary key, just ignore duplicates + onConflictClause = ` + ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')}) + DO NOTHING + ` + } + } + const sql = ` INSERT INTO "${schema}"."${table}" (${columns.map((s) => `"${s}"`).join(', ')}) VALUES ${batch.map((_, j) => `(${columns.map((_v, k) => '$' + (j * columns.length + k + 1)).join(', ')})`).join(', ')} + ${onConflictClause} ` const values = batch.flatMap((message) => columns.map((column) => message[column]), diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 641dae7e5..fa619b0db 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -255,6 +255,7 @@ async function createPlugin( schema: shape.schema, messages: initialInserts as InsertChangeMessage[], mapColumns: shape.mapColumns, + primaryKey: shape.primaryKey, debug, }) @@ -282,6 +283,7 @@ async function createPlugin( schema: shape.schema, messages: bulkInserts as InsertChangeMessage[], mapColumns: shape.mapColumns, + primaryKey: shape.primaryKey, debug, }) bulkInserts.length = 0 @@ -351,7 +353,14 @@ async function createPlugin( typeof message.headers.lsn === 'string' ? 
BigInt(message.headers.lsn) : BigInt(0) // we default to 0 if there no lsn on the message - if (lsn <= lastCommittedLsnForShape) { + + // Move-in messages from subquery-based shapes don't have an LSN + // because they come from direct DB queries, not from replication. + // We should never skip these based on LSN filtering. + const isMoveIn = (message.headers as Record) + .is_move_in === true + + if (!isMoveIn && lsn <= lastCommittedLsnForShape) { // We are replaying changes / have already seen this lsn // skip and move on to the next message return