Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/fix-move-in-lsn-filtering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@electric-sql/pglite-sync": patch
---

Fix move-in messages from tagged_subqueries not being synced

This fixes two issues with move-in messages from Electric's `tagged_subqueries` feature:

1. **LSN filtering bypass**: Move-in messages don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0. This fix checks for the `is_move_in` header and bypasses LSN filtering for these messages.

2. **Duplicate key handling**: Move-in data can overlap with data from the initial sync (e.g., when a row "moves in" to match a subquery that it already matched during initial sync). This fix uses `ON CONFLICT DO UPDATE` for move-in inserts to handle these duplicates gracefully, updating the row with the latest data instead of erroring.
50 changes: 50 additions & 0 deletions packages/pglite-sync/src/apply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,38 @@ export async function applyMessageToTable({
}: ApplyMessageToTableOptions) {
const data = mapColumns ? doMapColumns(mapColumns, message) : message.value

// Check if this is a move-in message (from subquery-based shapes)
const isMoveIn = (message.headers as Record<string, unknown>).is_move_in === true

switch (message.headers.operation) {
case 'insert': {
if (debug) console.log('inserting', data)
const columns = Object.keys(data)

// Build ON CONFLICT clause for move-in messages
let onConflictClause = ''
if (isMoveIn && primaryKey && primaryKey.length > 0) {
const nonPkColumns = columns.filter((c) => !primaryKey.includes(c))
if (nonPkColumns.length > 0) {
onConflictClause = `
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')}
`
} else {
onConflictClause = `
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
DO NOTHING
`
}
}

return await pg.query(
`
INSERT INTO "${schema}"."${table}"
(${columns.map((s) => '"' + s + '"').join(', ')})
VALUES
(${columns.map((_v, i) => '$' + (i + 1)).join(', ')})
${onConflictClause}
`,
columns.map((column) => data[column]),
)
Expand Down Expand Up @@ -86,6 +108,7 @@ export interface BulkApplyMessagesToTableOptions {
schema?: string
messages: InsertChangeMessage[]
mapColumns?: MapColumns
primaryKey?: string[]
debug: boolean
}

Expand All @@ -95,6 +118,7 @@ export async function applyInsertsToTable({
schema = 'public',
messages,
mapColumns,
primaryKey,
debug,
}: BulkApplyMessagesToTableOptions) {
// Map the messages to the data to be inserted
Expand All @@ -104,6 +128,12 @@ export async function applyInsertsToTable({

if (debug) console.log('inserting', data)

// Check if any message is a move-in (from subquery-based shapes)
// Move-in data can overlap with existing data, so we need ON CONFLICT handling
const hasMoveIn = messages.some(
(m) => (m.headers as Record<string, unknown>).is_move_in === true,
)

// Get column names from the first message
const columns = Object.keys(data[0])

Expand Down Expand Up @@ -176,11 +206,31 @@ export async function applyInsertsToTable({

// Helper function to execute a batch insert
const executeBatch = async (batch: Record<string, any>[]) => {
// Build ON CONFLICT clause for move-in messages
// Move-in data can contain rows that already exist from initial sync
let onConflictClause = ''
if (hasMoveIn && primaryKey && primaryKey.length > 0) {
const nonPkColumns = columns.filter((c) => !primaryKey.includes(c))
if (nonPkColumns.length > 0) {
onConflictClause = `
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')}
`
} else {
// All columns are primary key, just ignore duplicates
onConflictClause = `
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
DO NOTHING
`
}
}

const sql = `
INSERT INTO "${schema}"."${table}"
(${columns.map((s) => `"${s}"`).join(', ')})
VALUES
${batch.map((_, j) => `(${columns.map((_v, k) => '$' + (j * columns.length + k + 1)).join(', ')})`).join(', ')}
${onConflictClause}
`
const values = batch.flatMap((message) =>
columns.map((column) => message[column]),
Expand Down
11 changes: 10 additions & 1 deletion packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async function createPlugin(
schema: shape.schema,
messages: initialInserts as InsertChangeMessage[],
mapColumns: shape.mapColumns,
primaryKey: shape.primaryKey,
debug,
})

Expand Down Expand Up @@ -282,6 +283,7 @@ async function createPlugin(
schema: shape.schema,
messages: bulkInserts as InsertChangeMessage[],
mapColumns: shape.mapColumns,
primaryKey: shape.primaryKey,
debug,
})
bulkInserts.length = 0
Expand Down Expand Up @@ -351,7 +353,14 @@ async function createPlugin(
typeof message.headers.lsn === 'string'
? BigInt(message.headers.lsn)
: BigInt(0) // we default to 0 if there no lsn on the message
if (lsn <= lastCommittedLsnForShape) {

// Move-in messages from subquery-based shapes don't have an LSN
// because they come from direct DB queries, not from replication.
// We should never skip these based on LSN filtering.
const isMoveIn = (message.headers as Record<string, unknown>)
.is_move_in === true

if (!isMoveIn && lsn <= lastCommittedLsnForShape) {
// We are replaying changes / have already seen this lsn
// skip and move on to the next message
return
Expand Down