diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7e2911..8fe0a83 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,19 @@ jobs: --health-timeout 5s --health-retries 5 + maria: + image: mariadb:10.6 + env: + MYSQL_DATABASE: ci_tests + MYSQL_ALLOW_EMPTY_PASSWORD: '1' + ports: + - 3307:3306 + options: >- + --health-cmd "mysqladmin ping" + --health-interval 20s + --health-timeout 5s + --health-retries 5 + steps: - uses: actions/checkout@v4 @@ -32,4 +45,6 @@ jobs: env: MYSQL_DB_PORT: 3306 MYSQL_DB_NAME: ci_tests + MARIADB_DB_PORT: 3307 + MARIADB_DB_NAME: ci_tests run: go test -v ./... diff --git a/docker-compose.yaml b/docker-compose.yaml index cc708f7..439d9b1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -7,3 +7,12 @@ services: environment: MYSQL_ALLOW_EMPTY_PASSWORD: 'yes' MYSQL_DATABASE: app + + maria106: + image: mariadb:10.6 + platform: linux/amd64 + ports: + - '3421:3306' + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: 'yes' + MYSQL_DATABASE: app diff --git a/job_test.go b/job_test.go index 692fcc0..5efdaba 100644 --- a/job_test.go +++ b/job_test.go @@ -514,3 +514,300 @@ func TestExecJob_mysql_multiple_primary_key(t *testing.T) { } } } + +func TestExecJob_mysql_json_columns(t *testing.T) { + dbName := os.Getenv("MYSQL_DB_NAME") + dbPortStr := os.Getenv("MYSQL_DB_PORT") + dbPort, _ := strconv.Atoi(dbPortStr) + + createTable := func(name string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY NOT NULL, + name TEXT NOT NULL, + age INT NOT NULL, + settings JSON NOT NULL + ) + `, name) + } + + sourceConfig := TableConfig{ + Driver: "mysql", + Table: "users_json", + User: "root", + DB: dbName, + Port: dbPort, + } + + source := table{config: sourceConfig} + err := source.connect() + require.NoError(t, err) + source.MustExec(createTable(sourceConfig.Table)) + + expectedData := [][]any{ + {1, "Alice", 30, `{"favoriteColor": "green"}`}, + {2, "Bob", 25, `{"favoriteColor": "orange"}`}, + {3, "Charlie", 35, `{"favoriteColor": "blue"}`}, + } + + insert := sq. + Insert(sourceConfig.Table). + Columns("id", "name", "age", "settings") + + for _, row := range expectedData { + insert = insert.Values(row...) + } + + sql, args, err := insert.ToSql() + require.NoError(t, err) + + // Insert some data into the source + source.MustExec(sql, args...) + + target1Config := TableConfig{ + Driver: "mysql", + Table: "users2_json", + User: "root", + DB: dbName, + Port: dbPort, + } + + target1 := table{config: target1Config} + err = target1.connect() + require.NoError(t, err) + target1.MustExec(createTable(target1Config.Table)) + + // target1 has some data that needs to be updated/deleted + target1.MustExec( + fmt.Sprintf( + `INSERT INTO %s (id, name, age, settings) VALUES (1, 'Nick', 31, "{}")`, + target1Config.Table, + ), + ) + target1.MustExec( + fmt.Sprintf( + `INSERT INTO %s (id, name, age, settings) VALUES (420, 'Azamat', 69, "{}")`, + target1Config.Table, + ), + ) + + target2Config := TableConfig{ + Driver: "mysql", + Table: "users3_json", + User: "root", + DB: dbName, + Port: dbPort, + } + + target2 := table{config: target2Config} + err = target2.connect() + require.NoError(t, err) + target2.MustExec(createTable(target2Config.Table)) + + // target2 has no data + + target3Config := TableConfig{ + Label: "already in sync", + Driver: "mysql", + Table: "users4_json", + User: "root", + DB: dbName, + Port: dbPort, + } + + target3 := table{config: target3Config} + err = target3.connect() + require.NoError(t, err) + target3.MustExec(createTable(target3Config.Table)) + + // table3 is already in sync + insert = sq.Insert(target3Config.Table).Columns("id", "name", "age", "settings") + + for _, row := range expectedData { + insert = insert.Values(row...) + } + + sql, args, err = insert.ToSql() + require.NoError(t, err) + target3.MustExec(sql, args...) + + config := Config{ + Jobs: map[string]JobConfig{ + "users": { + PrimaryKeys: []string{"id"}, + Columns: []string{"id", "name", "age", "settings"}, + Source: sourceConfig, + Targets: []TableConfig{target1Config, target2Config, target3Config}, + }, + }, + } + + results, err := config.ExecJob("users") + require.NoError(t, err) + require.Len(t, results.Results, 3) + + for _, result := range results.Results { + assert.NoError(t, result.Error) + + if result.Target.Label == "already in sync" { + assert.False(t, result.Synced) + } else { + assert.True(t, result.Synced) + } + } + + // Check that the data was copied to each target + for _, target := range []table{target1, target2, target3} { + query := fmt.Sprintf("SELECT * FROM %s", target.config.Table) + rows, err := target.Queryx(query) + require.NoError(t, err) + + defer rows.Close() + + var data [][]any + for rows.Next() { + cols, err := rows.SliceScan() + require.NoError(t, err) + data = append(data, cols) + } + + require.Equal(t, len(expectedData), len(data)) + + // Make sure the data is correct + for i := range expectedData { + require.Len(t, data[i], len(expectedData[i])) + for j := range expectedData[i] { + require.EqualValues(t, expectedData[i][j], data[i][j]) + } + } + } +} + +func TestExecJob_mysql_to_maria_json_columns(t *testing.T) { + mysqlDBName := os.Getenv("MYSQL_DB_NAME") + mysqlDBPortStr := os.Getenv("MYSQL_DB_PORT") + mysqlDBPort, _ := strconv.Atoi(mysqlDBPortStr) + mariaDBName := os.Getenv("MARIADB_DB_NAME") + mariaDBPortStr := os.Getenv("MARIADB_DB_PORT") + mariaDBPort, _ := strconv.Atoi(mariaDBPortStr) + + createTable := func(name string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY NOT NULL, + name TEXT NOT NULL, + age INT NOT NULL, + settings JSON NOT NULL + ) + `, name) + } + + sourceConfig := TableConfig{ + Driver: "mysql", + Table: "users_json_maria", + User: "root", + DB: mysqlDBName, + Port: mysqlDBPort, + } + + source := table{config: sourceConfig} + err := source.connect() + require.NoError(t, err) + source.MustExec(createTable(sourceConfig.Table)) + + expectedData := [][]any{ + {1, "Alice", 30, `{"favoriteColor": "green"}`}, + {2, "Bob", 25, `{"favoriteColor": "orange"}`}, + {3, "Charlie", 35, `{"favoriteColor": "blue"}`}, + } + + insert := sq. + Insert(sourceConfig.Table). + Columns("id", "name", "age", "settings") + + for _, row := range expectedData { + insert = insert.Values(row...) + } + + sql, args, err := insert.ToSql() + require.NoError(t, err) + + // Insert some data into the source + source.MustExec(sql, args...) + + targetConfig := TableConfig{ + Driver: "mysql", + Table: "users2_json_maria", + User: "root", + DB: mariaDBName, + Port: mariaDBPort, + } + + target := table{config: targetConfig} + err = target.connect() + require.NoError(t, err) + target.MustExec(createTable(targetConfig.Table)) + + // target1 has some data that needs to be updated/deleted + target.MustExec( + fmt.Sprintf( + `INSERT INTO %s (id, name, age, settings) VALUES (1, 'Nick', 31, "{}")`, + targetConfig.Table, + ), + ) + target.MustExec( + fmt.Sprintf( + `INSERT INTO %s (id, name, age, settings) VALUES (420, 'Azamat', 69, "{}")`, + targetConfig.Table, + ), + ) + + config := Config{ + Jobs: map[string]JobConfig{ + "users": { + PrimaryKeys: []string{"id"}, + Columns: []string{"id", "name", "age", "settings"}, + Source: sourceConfig, + Targets: []TableConfig{targetConfig}, + }, + }, + } + + results, err := config.ExecJob("users") + require.NoError(t, err) + require.Len(t, results.Results, 1) + + for _, result := range results.Results { + assert.NoError(t, result.Error) + + if result.Target.Label == "already in sync" { + assert.False(t, result.Synced) + } else { + assert.True(t, result.Synced) + } + } + + // Check that the data was copied to each target + query := fmt.Sprintf("SELECT * FROM %s", target.config.Table) + rows, err := target.Queryx(query) + require.NoError(t, err) + + defer rows.Close() + + var data [][]any + for rows.Next() { + cols, err := rows.SliceScan() + require.NoError(t, err) + data = append(data, cols) + } + + require.Equal(t, len(expectedData), len(data)) + + // Make sure the data is correct + for i := range expectedData { + require.Len(t, data[i], len(expectedData[i])) + for j := range expectedData[i] { + require.EqualValues(t, expectedData[i][j], data[i][j]) + } + } +}