From 0b1cbed86d263f206e9c2d25833211b7a32a135c Mon Sep 17 00:00:00 2001 From: Nick Dubelman Date: Tue, 27 May 2025 16:07:08 -0700 Subject: [PATCH] fix bug when primary keys are not the first columns --- job_test.go | 79 ++++++++++++++++++++++++++++++++++++++--------------- sync.go | 8 +++--- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/job_test.go b/job_test.go index 5efdaba..029aa58 100644 --- a/job_test.go +++ b/job_test.go @@ -204,37 +204,72 @@ func TestExecJob_multiple_primary_key(t *testing.T) { } results, err := config.ExecJob("users") - require.NoError(t, err) - require.Len(t, results.Results, 1) - for _, result := range results.Results { - assert.NoError(t, result.Error) - assert.True(t, result.Synced) + validateSync := func() { + require.NoError(t, err) + require.Len(t, results.Results, 1) + + for _, result := range results.Results { + assert.NoError(t, result.Error) + assert.True(t, result.Synced) + } + + // Check that the data was copied to the target + order := strings.Join(primaryKeys, ", ") + rows, err := target1.Queryx("SELECT * FROM users ORDER BY " + order) + require.NoError(t, err) + + defer rows.Close() + + var data [][]any + for rows.Next() { + cols, err := rows.SliceScan() + require.NoError(t, err) + data = append(data, cols) + } + + require.Equal(t, len(expectedData), len(data)) + + // Make sure the data is correct + for i := range expectedData { + require.Len(t, data[i], len(expectedData[i])) + for j := range expectedData[i] { + require.EqualValues(t, expectedData[i][j], data[i][j]) + } + } } - // Check that the data was copied to the target - order := strings.Join(primaryKeys, ", ") - rows, err := target1.Queryx("SELECT * FROM users ORDER BY " + order) + validateSync() + + // Make sure the multiple PK works when the PK columns aren't the first columns + // Set the target1 data so that it requires a sync with inserts, updates. and deletes + delete := sq.Delete(target1Config.Table) + sql, args, err = delete.ToSql() require.NoError(t, err) + target1.MustExec(sql, args...) - defer rows.Close() + insert = sq.Insert(target1Config.Table).Columns("name", "age", "favoriteColor") + insert = insert.Values("Alice", 30, "purpz") // needs to be updated + insert = insert.Values("Nick", 420, "purpz") // needs to be deleted + sql, args, err = insert.ToSql() + require.NoError(t, err) + target1.MustExec(sql, args...) - var data [][]any - for rows.Next() { - cols, err := rows.SliceScan() - require.NoError(t, err) - data = append(data, cols) + config = Config{ + Jobs: map[string]JobConfig{ + "users": { + PrimaryKeys: primaryKeys, + Columns: []string{"name", "favoriteColor", "age"}, // Age is at the end + Source: sourceConfig, + Targets: []TableConfig{target1Config}, + }, + }, } - require.Equal(t, len(expectedData), len(data)) + results, err = config.ExecJob("users") + require.NoError(t, err) - // Make sure the data is correct - for i := range expectedData { - require.Len(t, data[i], len(expectedData[i])) - for j := range expectedData[i] { - require.EqualValues(t, expectedData[i][j], data[i][j]) - } - } + validateSync() } func TestExecJob_mysql(t *testing.T) { diff --git a/sync.go b/sync.go index cd8d3a6..0811f9e 100644 --- a/sync.go +++ b/sync.go @@ -142,7 +142,7 @@ func (t table) syncTarget( // There is a diff, perform an UPDATE update := sq. Update(tableName). - Where(key.whereClause(t.primaryKeys, t.primaryKeyIndices)) + Where(key.whereClause(t.columns, t.primaryKeyIndices)) pkSet := map[string]struct{}{} for _, pk := range t.primaryKeys { @@ -169,7 +169,7 @@ func (t table) syncTarget( for key := range targetMap { delete := sq. Delete(tableName). - Where(key.whereClause(t.primaryKeys, t.primaryKeyIndices)) + Where(key.whereClause(t.columns, t.primaryKeyIndices)) deletes = append(deletes, delete) } @@ -294,11 +294,11 @@ func (job JobConfig) getPrimaryKeyIndices() []int { // For now, we limit to a maximum of 3 primary key columns type primaryKeyTuple struct{ First, Second, Third any } -func (key primaryKeyTuple) whereClause(primaryKeys []string, primaryKeyIndices []int) sq.Eq { +func (key primaryKeyTuple) whereClause(columns []string, primaryKeyIndices []int) sq.Eq { where := sq.Eq{} for i, idx := range primaryKeyIndices { - columnName := primaryKeys[idx] + columnName := columns[idx] switch i { case 0: