diff --git a/common/utils/utils.go b/common/utils/utils.go index 4081b2d347..8254521330 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -170,6 +170,43 @@ func PreloadGCSFiles(tables []ManifestTable) ([]ManifestTable, error) { return tables, nil } +func WriteToGCS(filePath, fileName, data string) error { + ctx := context.Background() + + client, err := storage.NewClient(ctx) + if err != nil { + fmt.Printf("Failed to create GCS client") + return err + } + defer client.Close() + if filePath[len(filePath)-1] != '/' { + filePath = filePath + "/" + } + u, err := url.Parse(filePath) + if err != nil { + fmt.Printf("parseFilePath: unable to parse file path %s", filePath) + return err + } + if u.Scheme != "gs" { + fmt.Printf("not a valid GCS path: %s, should start with 'gs'", filePath) + return fmt.Errorf("not a valid GCS path: %s, should start with 'gs'", filePath) + } + bucketName := u.Host + bucket := client.Bucket(bucketName) + obj := bucket.Object(u.Path[1:] + fileName) + + w := obj.NewWriter(ctx) + if _, err := fmt.Fprint(w, data); err != nil { + fmt.Printf("Failed to write to Cloud Storage: %s", filePath) + return err + } + if err := w.Close(); err != nil { + fmt.Printf("Failed to close GCS file: %s", filePath) + return err + } + return nil +} + // GetProject returns the cloud project we should use for accessing Spanner. // Use environment variable GCLOUD_PROJECT if it is set. // Otherwise, use the default project returned from gcloud. 
diff --git a/conversion/conversion.go b/conversion/conversion.go index 1dbd0ae417..1453cc3f26 100644 --- a/conversion/conversion.go +++ b/conversion/conversion.go @@ -184,6 +184,18 @@ func performSnapshotMigration(config writer.BatchWriterConfig, conv *internal.Co return batchWriter, nil } +func snapshotMigrationHandler(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) (*writer.BatchWriter, error) { + switch sourceProfile.Driver { + // Skip snapshot migration via harbourbridge for mysql and oracle since dataflow job will handle this from backfilled data. + case constants.MYSQL, constants.ORACLE: + return &writer.BatchWriter{}, nil + case constants.DYNAMODB: + return performSnapshotMigration(config, conv, client, infoSchema) + default: + return &writer.BatchWriter{}, fmt.Errorf("streaming migration not supported for driver %s", sourceProfile.Driver) + } +} + func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client) (*writer.BatchWriter, error) { infoSchema, err := GetInfoSchema(sourceProfile, targetProfile) if err != nil { @@ -195,16 +207,19 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, if err != nil { return nil, err } - } - bw, err := performSnapshotMigration(config, conv, client, infoSchema) - if err != nil { - return nil, err - } - if sourceProfile.Conn.Streaming { + bw, err := snapshotMigrationHandler(sourceProfile, config, conv, client, infoSchema) + if err != nil { + return nil, err + } err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo) if err != nil { return nil, err } + return bw, nil + } + bw, err := performSnapshotMigration(config, conv, client, infoSchema) + if err != nil { + return nil, err } return bw, nil } @@ -458,6 +473,7 @@ func CreateOrUpdateDatabase(ctx 
context.Context, adminClient *database.DatabaseA } // Adding migration metadata to the outgoing context. migrationData := metrics.GetMigrationData(conv, driver, targetDb, constants.SchemaConv) + fmt.Println(migrationData.String()) serializedMigrationData, _ := proto.Marshal(migrationData) migrationMetadataValue := base64.StdEncoding.EncodeToString(serializedMigrationData) ctx = metadata.AppendToOutgoingContext(ctx, migrationMetadataKey, migrationMetadataValue) diff --git a/internal/convert.go b/internal/convert.go index 8f66667049..120be1bb54 100644 --- a/internal/convert.go +++ b/internal/convert.go @@ -309,7 +309,7 @@ func (conv *Conv) AddPrimaryKeys() { if !primaryKeyPopulated { k := conv.buildPrimaryKey(t) ct.ColNames = append(ct.ColNames, k) - ct.ColDefs[k] = ddl.ColumnDef{Name: k, T: ddl.Type{Name: ddl.Int64}} + ct.ColDefs[k] = ddl.ColumnDef{Name: k, T: ddl.Type{Name: ddl.String, Len: 50}} ct.Pks = []ddl.IndexKey{{Col: k}} conv.SyntheticPKeys[t] = SyntheticPKey{k, 0} } diff --git a/internal/convert_test.go b/internal/convert_test.go index 82832abd83..5ec5f77fa1 100644 --- a/internal/convert_test.go +++ b/internal/convert_test.go @@ -218,7 +218,7 @@ func TestAddPrimaryKeys(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "a": {Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": {Name: "b", T: ddl.Type{Name: ddl.Float64}}, - "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{{Name: "", Table: "", Unique: false, Keys: []ddl.IndexKey{{Col: "b"}}}}, @@ -246,7 +246,7 @@ func TestAddPrimaryKeys(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "a": {Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": {Name: "b", T: ddl.Type{Name: ddl.Float64}}, - "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{{Col: 
"synth_id"}}}, }, diff --git a/sources/mysql/data.go b/sources/mysql/data.go index 590e811900..33de4ee39f 100644 --- a/sources/mysql/data.go +++ b/sources/mysql/data.go @@ -84,7 +84,7 @@ func ConvertData(conv *internal.Conv, srcTable string, srcCols []string, srcSche } if aux, ok := conv.SyntheticPKeys[spTable]; ok { c = append(c, aux.Col) - v = append(v, int64(bits.Reverse64(uint64(aux.Sequence)))) + v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[spTable] = aux } diff --git a/sources/mysql/data_test.go b/sources/mysql/data_test.go index c48baae0ca..5e02712a2e 100644 --- a/sources/mysql/data_test.go +++ b/sources/mysql/data_test.go @@ -279,14 +279,14 @@ func TestConvertsyntheticPKey(t *testing.T) { cols: []string{"a", "b", "c"}, vals: []string{"6", "6.6", "true"}, ecols: []string{"a", "b", "c", "synth_id"}, - evals: []interface{}{int64(6), float64(6.6), true, int64(0)}, + evals: []interface{}{int64(6), float64(6.6), true, fmt.Sprintf("%d", 0)}, }, { name: "Sequence 1", cols: []string{"a"}, vals: []string{"7"}, ecols: []string{"a", "synth_id"}, - evals: []interface{}{int64(7), int64(bits.Reverse64(1))}, + evals: []interface{}{int64(7), fmt.Sprintf("%d", int64(bits.Reverse64(1)))}, }, } tableName := "testtable" diff --git a/sources/mysql/infoschema.go b/sources/mysql/infoschema.go index 2542877ea9..97c5043558 100644 --- a/sources/mysql/infoschema.go +++ b/sources/mysql/infoschema.go @@ -17,6 +17,7 @@ package mysql import ( "context" "database/sql" + "encoding/json" "fmt" "sort" "strings" @@ -25,6 +26,7 @@ import ( _ "github.com/go-sql-driver/mysql" // The driver should be used via the database/sql package. 
_ "github.com/lib/pq" + "github.com/cloudspannerecosystem/harbourbridge/common/utils" "github.com/cloudspannerecosystem/harbourbridge/internal" "github.com/cloudspannerecosystem/harbourbridge/profiles" "github.com/cloudspannerecosystem/harbourbridge/schema" @@ -350,8 +352,22 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // performing a streaming migration. func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) - err := streaming.StartDataflow(ctx, isi.SourceProfile, isi.TargetProfile, streamingCfg) + convJSON, err := json.MarshalIndent(conv, "", " ") if err != nil { + err = fmt.Errorf("can't encode session state to JSON: %v", err) + return err + } + fmt.Printf("Writing session file to GCS...") + err = utils.WriteToGCS(streamingCfg.TmpDir, "session.json", string(convJSON)) + if err != nil { + err = fmt.Errorf("error writing session file to GCS: %v", err) + return err + } + fmt.Println("Done") + + err = streaming.StartDataflow(ctx, isi.SourceProfile, isi.TargetProfile, streamingCfg) + if err != nil { + err = fmt.Errorf("error starting dataflow: %v", err) return err } return nil diff --git a/sources/mysql/infoschema_test.go b/sources/mysql/infoschema_test.go index dc1e101140..9e21ce24eb 100644 --- a/sources/mysql/infoschema_test.go +++ b/sources/mysql/infoschema_test.go @@ -388,7 +388,7 @@ func TestProcessData_MultiCol(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}, NotNull: true}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.Float64}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: 
"synth_id"}}}, } @@ -404,8 +404,8 @@ func TestProcessData_MultiCol(t *testing.T) { }) common.ProcessData(conv, isi) assert.Equal(t, []spannerData{ - {table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), int64(0)}}, - {table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), int64(-9223372036854775808)}}}, + {table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), "0"}}, + {table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), "-9223372036854775808"}}}, rows) assert.Equal(t, int64(0), conv.Unexpecteds()) } diff --git a/sources/mysql/mysqldump_test.go b/sources/mysql/mysqldump_test.go index 1111e81685..69378506c9 100644 --- a/sources/mysql/mysqldump_test.go +++ b/sources/mysql/mysqldump_test.go @@ -125,7 +125,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "productid": {Name: "productid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "userid": {Name: "userid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}, NotNull: true}, "quantity": {Name: "quantity", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{{Col: "synth_id"}}}}, }, @@ -175,7 +175,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": {Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": {Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{{Col: "synth_id"}}, Fks: []ddl.Foreignkey{{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}}}, @@ -209,7 +209,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: 
ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "A_fk_test_2", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}, @@ -235,7 +235,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}}}, @@ -260,7 +260,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}}}, @@ -294,7 +294,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "e": ddl.ColumnDef{Name: "e", T: ddl.Type{Name: ddl.Int64}}, "f": ddl.ColumnDef{Name: "f", T: ddl.Type{Name: ddl.Int64}}, "g": ddl.ColumnDef{Name: "g", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: 
ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"e"}, ReferTable: "test", ReferColumns: []string{"a"}}, @@ -322,7 +322,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "e": ddl.ColumnDef{Name: "e", T: ddl.Type{Name: ddl.Int64}}, "f": ddl.ColumnDef{Name: "f", T: ddl.Type{Name: ddl.Int64}}, "g": ddl.ColumnDef{Name: "g", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"e", "f"}, ReferTable: "test", ReferColumns: []string{"a", "b"}}}}}, @@ -343,7 +343,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: false}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -405,7 +405,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, 
Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: false}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -468,7 +468,7 @@ func TestProcessMySQLDump_MultiCol(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: false}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -625,10 +625,10 @@ CREATE TABLE test (a text PRIMARY KEY, b text);`, "('a33','b',9),\n" + "('a3','b',7);", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a33", "b", int64(9), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", 
"synth_id"}, vals: []interface{}{"a33", "b", int64(9), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "INSERT INTO with empty cols", @@ -639,10 +639,10 @@ CREATE TABLE test (a text PRIMARY KEY, b text);`, "('a33','b',NULL),\n" + "('a3','b',7);\n", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "INSERT", @@ -657,7 +657,7 @@ CREATE TABLE test (a text PRIMARY KEY, b text);`, input: "CREATE TABLE test (a text NOT NULL, b text NOT NULL, n bigint);\n" + "INSERT INTO test (a, b, n) VALUES ('a42', 'b6', 2);", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", "b6", int64(2), bitReverse(0)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", "b6", int64(2), fmt.Sprintf("%d", 
bitReverse(0))}}}, }, { name: "INSERT with spaces", @@ -809,14 +809,14 @@ func TestProcessMySQLDump_DataError(t *testing.T) { vals: []interface{}{int64(7), float64(42.1), true, getDate("2019-10-29"), []byte{0x89, 0x50}, []spanner.NullString{{StringVal: "42", Valid: true}, {StringVal: "6", Valid: true}}, - bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "synth_id"}, vals: []interface{}{int64(7), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"b", "synth_id"}, vals: []interface{}{float64(42.1), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"c", "synth_id"}, vals: []interface{}{true, bitReverse(3)}}, - spannerData{table: "test", cols: []string{"d", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), bitReverse(4)}}, - spannerData{table: "test", cols: []string{"e", "synth_id"}, vals: []interface{}{[]byte{0x89, 0x50}, bitReverse(5)}}, + fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "synth_id"}, vals: []interface{}{int64(7), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"b", "synth_id"}, vals: []interface{}{float64(42.1), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"c", "synth_id"}, vals: []interface{}{true, fmt.Sprintf("%d", bitReverse(3))}}, + spannerData{table: "test", cols: []string{"d", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), fmt.Sprintf("%d", bitReverse(4))}}, + spannerData{table: "test", cols: []string{"e", "synth_id"}, vals: []interface{}{[]byte{0x89, 0x50}, fmt.Sprintf("%d", bitReverse(5))}}, spannerData{table: "test", cols: []string{"f", "synth_id"}, - vals: []interface{}{[]spanner.NullString{{StringVal: "42", Valid: true}, {StringVal: "6", Valid: true}}, bitReverse(6)}}, + vals: []interface{}{[]spanner.NullString{{StringVal: "42", Valid: true}, {StringVal: "6", Valid: true}}, fmt.Sprintf("%d", bitReverse(6))}}, }, }, } @@ -876,7 +876,7 @@ func TestProcessMySQLDump_AddPrimaryKeys(t 
*testing.T) { "productid": ddl.ColumnDef{Name: "productid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "userid": ddl.ColumnDef{Name: "userid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "quantity": ddl.ColumnDef{Name: "quantity", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}}}, }, @@ -891,7 +891,7 @@ func TestProcessMySQLDump_AddPrimaryKeys(t *testing.T) { "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "synth_id0": ddl.ColumnDef{Name: "synth_id0", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "synth_id1": ddl.ColumnDef{Name: "synth_id1", T: ddl.Type{Name: ddl.Int64}}, - "synth_id2": ddl.ColumnDef{Name: "synth_id2", T: ddl.Type{Name: ddl.Int64}}, + "synth_id2": ddl.ColumnDef{Name: "synth_id2", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id2"}}}}, }, diff --git a/sources/oracle/data.go b/sources/oracle/data.go index ee251ba230..5c1dea44f7 100644 --- a/sources/oracle/data.go +++ b/sources/oracle/data.go @@ -80,7 +80,7 @@ func convertData(conv *internal.Conv, srcTable string, srcCols []string, srcSche } if aux, ok := conv.SyntheticPKeys[spTable]; ok { c = append(c, aux.Col) - v = append(v, int64(bits.Reverse64(uint64(aux.Sequence)))) + v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[spTable] = aux } diff --git a/sources/oracle/data_test.go b/sources/oracle/data_test.go index 67390137a6..0e37ed04a6 100644 --- a/sources/oracle/data_test.go +++ b/sources/oracle/data_test.go @@ -132,14 +132,14 @@ func TestConvertsyntheticPKey(t *testing.T) { cols: []string{"a", "b", "c"}, vals: []string{"6", "6.6", "t"}, ecols: []string{"a", "b", "c", "synth_id"}, - evals: []interface{}{int64(6), float64(6.6), 
"t", int64(0)}, + evals: []interface{}{int64(6), float64(6.6), "t", fmt.Sprintf("%d", 0)}, }, { name: "Sequence 1", cols: []string{"a"}, vals: []string{"7"}, ecols: []string{"a", "synth_id"}, - evals: []interface{}{int64(7), int64(bits.Reverse64(1))}, + evals: []interface{}{int64(7), fmt.Sprintf("%d", int64(bits.Reverse64(1)))}, }, } tableName := "testtable" diff --git a/sources/postgres/data.go b/sources/postgres/data.go index a8392816f7..a64a3f5bbf 100644 --- a/sources/postgres/data.go +++ b/sources/postgres/data.go @@ -102,7 +102,7 @@ func ConvertData(conv *internal.Conv, srcTable string, srcCols []string, vals [] } if aux, ok := conv.SyntheticPKeys[spTable]; ok { c = append(c, aux.Col) - v = append(v, int64(bits.Reverse64(uint64(aux.Sequence)))) + v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[spTable] = aux } diff --git a/sources/postgres/data_test.go b/sources/postgres/data_test.go index 89d567373b..9b9718d4ef 100644 --- a/sources/postgres/data_test.go +++ b/sources/postgres/data_test.go @@ -258,14 +258,14 @@ func TestConvertData(t *testing.T) { cols: []string{"a", "b", "c"}, vals: []string{"6", "6.6", "true"}, ecols: []string{"a", "b", "c", "synth_id"}, - evals: []interface{}{int64(6), float64(6.6), true, int64(0)}, + evals: []interface{}{int64(6), float64(6.6), true, "0"}, }, { name: "Sequence 1", cols: []string{"a"}, vals: []string{"7"}, ecols: []string{"a", "synth_id"}, - evals: []interface{}{int64(7), int64(bits.Reverse64(1))}, + evals: []interface{}{int64(7), fmt.Sprintf("%d", int64(bits.Reverse64(1)))}, }, } conv := buildConv(spTable, srcTable) diff --git a/sources/postgres/infoschema.go b/sources/postgres/infoschema.go index bfbd6d6ebf..b1b267c611 100644 --- a/sources/postgres/infoschema.go +++ b/sources/postgres/infoschema.go @@ -156,7 +156,7 @@ func convertSQLRow(conv *internal.Conv, srcTable string, srcCols []string, srcSc } if aux, ok := conv.SyntheticPKeys[spTable]; ok { cs = 
append(cs, aux.Col) - vs = append(vs, int64(bits.Reverse64(uint64(aux.Sequence)))) + vs = append(vs, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[spTable] = aux } diff --git a/sources/postgres/infoschema_test.go b/sources/postgres/infoschema_test.go index fe61a5c7ea..c1424d2caf 100644 --- a/sources/postgres/infoschema_test.go +++ b/sources/postgres/infoschema_test.go @@ -461,8 +461,8 @@ func TestConvertSqlRow_MultiCol(t *testing.T) { }) common.ProcessData(conv, InfoSchemaImpl{db}) assert.Equal(t, []spannerData{ - {table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), int64(0)}}, - {table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), int64(-9223372036854775808)}}}, + {table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), "0"}}, + {table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), "-9223372036854775808"}}}, rows) assert.Equal(t, int64(0), conv.Unexpecteds()) } diff --git a/sources/postgres/pgdump_test.go b/sources/postgres/pgdump_test.go index 38ec38b382..f0aceb49e8 100644 --- a/sources/postgres/pgdump_test.go +++ b/sources/postgres/pgdump_test.go @@ -131,7 +131,7 @@ func TestProcessPgDump(t *testing.T) { "productid": ddl.ColumnDef{Name: "productid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "userid": ddl.ColumnDef{Name: "userid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}, NotNull: true}, "quantity": ddl.ColumnDef{Name: "quantity", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}}}, }, @@ -181,7 +181,7 @@ func TestProcessPgDump(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, 
"d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}, @@ -207,7 +207,7 @@ func TestProcessPgDump(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}, @@ -234,7 +234,7 @@ func TestProcessPgDump(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"c", "d"}, ReferTable: "test", ReferColumns: []string{"a", "b"}}}, @@ -271,7 +271,7 @@ func TestProcessPgDump(t *testing.T) { "e": ddl.ColumnDef{Name: "e", T: ddl.Type{Name: ddl.Int64}}, "f": ddl.ColumnDef{Name: "f", T: ddl.Type{Name: ddl.Int64}}, "g": ddl.ColumnDef{Name: "g", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: 
[]ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"e"}, ReferTable: "test", ReferColumns: []string{"a"}}, @@ -293,7 +293,7 @@ func TestProcessPgDump(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: false}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -314,7 +314,7 @@ func TestProcessPgDump(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: true}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -487,10 +487,10 @@ func TestProcessPgDump(t *testing.T) { "a3 b 7\n" + "\\.\n", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", 
"b", "n", "synth_id"}, vals: []interface{}{"a33", "b", int64(9), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a33", "b", int64(9), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "COPY FROM with empty cols", @@ -502,10 +502,10 @@ func TestProcessPgDump(t *testing.T) { "a3 b 7\n" + "\\.\n", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), 
fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "INSERT", @@ -538,7 +538,7 @@ func TestProcessPgDump(t *testing.T) { input: "CREATE TABLE test (a text NOT NULL, b text NOT NULL, n bigint);\n" + "INSERT INTO test (a, b, n) VALUES ('a42', 'b6', 2);", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", "b6", int64(2), bitReverse(0)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", "b6", int64(2), fmt.Sprintf("%d", bitReverse(0))}}}, }, { name: "INSERT with spaces", @@ -690,15 +690,15 @@ COPY test (id, a, b, c, d, e, f, g) FROM stdin; vals: []interface{}{int64(7), float64(42.1), true, getTime(t, "2019-10-29T05:30:00Z"), getDate("2019-10-29"), []byte{0x0, 0x1, 0xbe, 0xef}, []spanner.NullInt64{{Int64: 42, Valid: true}, {Int64: 6, Valid: true}}, - bitReverse(0)}}, - spannerData{table: "test", cols: []string{"int8", "synth_id"}, vals: []interface{}{int64(7), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"float8", "synth_id"}, vals: []interface{}{float64(42.1), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"bool", "synth_id"}, vals: []interface{}{true, bitReverse(3)}}, - spannerData{table: "test", cols: []string{"timestamp", "synth_id"}, vals: []interface{}{getTime(t, "2019-10-29T05:30:00Z"), bitReverse(4)}}, - spannerData{table: "test", cols: []string{"date", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), bitReverse(5)}}, - spannerData{table: "test", cols: []string{"bytea", "synth_id"}, vals: []interface{}{[]byte{0x0, 0x1, 0xbe, 0xef}, bitReverse(6)}}, + fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"int8", "synth_id"}, vals: []interface{}{int64(7), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"float8", "synth_id"}, vals: []interface{}{float64(42.1), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"bool", 
"synth_id"}, vals: []interface{}{true, fmt.Sprintf("%d", bitReverse(3))}}, + spannerData{table: "test", cols: []string{"timestamp", "synth_id"}, vals: []interface{}{getTime(t, "2019-10-29T05:30:00Z"), fmt.Sprintf("%d", bitReverse(4))}}, + spannerData{table: "test", cols: []string{"date", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), fmt.Sprintf("%d", bitReverse(5))}}, + spannerData{table: "test", cols: []string{"bytea", "synth_id"}, vals: []interface{}{[]byte{0x0, 0x1, 0xbe, 0xef}, fmt.Sprintf("%d", bitReverse(6))}}, spannerData{table: "test", cols: []string{"arr", "synth_id"}, - vals: []interface{}{[]spanner.NullInt64{{Int64: 42, Valid: true}, {Int64: 6, Valid: true}}, bitReverse(7)}}, + vals: []interface{}{[]spanner.NullInt64{{Int64: 42, Valid: true}, {Int64: 6, Valid: true}}, fmt.Sprintf("%d", bitReverse(7))}}, }, }, } @@ -799,7 +799,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "productid": ddl.ColumnDef{Name: "productid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "userid": ddl.ColumnDef{Name: "userid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}, NotNull: true}, "quantity": ddl.ColumnDef{Name: "quantity", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}}}, }, @@ -849,7 +849,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}, @@ -875,7 
+875,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"d"}, ReferTable: "test", ReferColumns: []string{"a"}}}, @@ -902,7 +902,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { ColDefs: map[string]ddl.ColumnDef{ "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}}, "d": ddl.ColumnDef{Name: "d", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"c", "d"}, ReferTable: "test", ReferColumns: []string{"a", "b"}}}, @@ -939,7 +939,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "e": ddl.ColumnDef{Name: "e", T: ddl.Type{Name: ddl.Int64}}, "f": ddl.ColumnDef{Name: "f", T: ddl.Type{Name: ddl.Int64}}, "g": ddl.ColumnDef{Name: "g", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Fks: []ddl.Foreignkey{ddl.Foreignkey{Name: "fk_test", Columns: []string{"e"}, ReferTable: "test", ReferColumns: []string{"a"}}, @@ -961,7 +961,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: 
"c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: false}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -982,7 +982,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.Int64}}, "b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}, Indexes: []ddl.CreateIndex{ddl.CreateIndex{Name: "custom_index", Table: "test", Unique: false, Keys: []ddl.IndexKey{ddl.IndexKey{Col: "b", Desc: true}, ddl.IndexKey{Col: "c", Desc: false}}}}}}, @@ -1155,10 +1155,10 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "a3 b 7\n" + "\\.\n", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a33", "b", int64(9), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a1", "b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: 
"test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a22", "b99", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a33", "b", int64(9), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "COPY FROM with empty cols", @@ -1170,10 +1170,10 @@ func TestProcessPgDumpPGTarget(t *testing.T) { "a3 b 7\n" + "\\.\n", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), bitReverse(0)}}, - spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", bitReverse(2)}}, - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), bitReverse(3)}}}, + spannerData{table: "test", cols: []string{"b", "n", "synth_id"}, vals: []interface{}{"b1", int64(42), fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"a", "n", "synth_id"}, vals: []interface{}{"a22", int64(6), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"a33", "b", fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a3", "b", int64(7), fmt.Sprintf("%d", bitReverse(3))}}}, }, { name: "INSERT", @@ -1206,7 +1206,7 @@ func TestProcessPgDumpPGTarget(t *testing.T) { input: "CREATE TABLE test (a text NOT NULL, b text NOT NULL, n bigint);\n" + "INSERT INTO test (a, b, n) VALUES ('a42', 'b6', 2);", expectedData: []spannerData{ - spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", 
"b6", int64(2), bitReverse(0)}}}, + spannerData{table: "test", cols: []string{"a", "b", "n", "synth_id"}, vals: []interface{}{"a42", "b6", int64(2), fmt.Sprintf("%d", bitReverse(0))}}}, }, { name: "INSERT with spaces", @@ -1358,15 +1358,15 @@ COPY test (id, a, b, c, d, e) FROM stdin; vals: []interface{}{int64(7), float64(42.1), true, getTime(t, "2019-10-29T05:30:00Z"), getDate("2019-10-29"), []byte{0x0, 0x1, 0xbe, 0xef}, "{42,6}", - bitReverse(0)}}, - spannerData{table: "test", cols: []string{"int8", "synth_id"}, vals: []interface{}{int64(7), bitReverse(1)}}, - spannerData{table: "test", cols: []string{"float8", "synth_id"}, vals: []interface{}{float64(42.1), bitReverse(2)}}, - spannerData{table: "test", cols: []string{"bool", "synth_id"}, vals: []interface{}{true, bitReverse(3)}}, - spannerData{table: "test", cols: []string{"timestamp", "synth_id"}, vals: []interface{}{getTime(t, "2019-10-29T05:30:00Z"), bitReverse(4)}}, - spannerData{table: "test", cols: []string{"date", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), bitReverse(5)}}, - spannerData{table: "test", cols: []string{"bytea", "synth_id"}, vals: []interface{}{[]byte{0x0, 0x1, 0xbe, 0xef}, bitReverse(6)}}, - spannerData{table: "test", cols: []string{"arr", "synth_id"}, vals: []interface{}{"{42,6}", bitReverse(7)}}, - spannerData{table: "test", cols: []string{"arr", "synth_id"}, vals: []interface{}{"{42, 6}", bitReverse(8)}}, + fmt.Sprintf("%d", bitReverse(0))}}, + spannerData{table: "test", cols: []string{"int8", "synth_id"}, vals: []interface{}{int64(7), fmt.Sprintf("%d", bitReverse(1))}}, + spannerData{table: "test", cols: []string{"float8", "synth_id"}, vals: []interface{}{float64(42.1), fmt.Sprintf("%d", bitReverse(2))}}, + spannerData{table: "test", cols: []string{"bool", "synth_id"}, vals: []interface{}{true, fmt.Sprintf("%d", bitReverse(3))}}, + spannerData{table: "test", cols: []string{"timestamp", "synth_id"}, vals: []interface{}{getTime(t, "2019-10-29T05:30:00Z"), fmt.Sprintf("%d", 
bitReverse(4))}}, + spannerData{table: "test", cols: []string{"date", "synth_id"}, vals: []interface{}{getDate("2019-10-29"), fmt.Sprintf("%d", bitReverse(5))}}, + spannerData{table: "test", cols: []string{"bytea", "synth_id"}, vals: []interface{}{[]byte{0x0, 0x1, 0xbe, 0xef}, fmt.Sprintf("%d", bitReverse(6))}}, + spannerData{table: "test", cols: []string{"arr", "synth_id"}, vals: []interface{}{"{42,6}", fmt.Sprintf("%d", bitReverse(7))}}, + spannerData{table: "test", cols: []string{"arr", "synth_id"}, vals: []interface{}{"{42, 6}", fmt.Sprintf("%d", bitReverse(8))}}, }, }, } @@ -1441,7 +1441,7 @@ func TestProcessPgDump_AddPrimaryKeys(t *testing.T) { "productid": ddl.ColumnDef{Name: "productid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "userid": ddl.ColumnDef{Name: "userid", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "quantity": ddl.ColumnDef{Name: "quantity", T: ddl.Type{Name: ddl.Int64}}, - "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}}, + "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}}}, }, @@ -1456,7 +1456,7 @@ func TestProcessPgDump_AddPrimaryKeys(t *testing.T) { "synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "synth_id0": ddl.ColumnDef{Name: "synth_id0", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}}, "synth_id1": ddl.ColumnDef{Name: "synth_id1", T: ddl.Type{Name: ddl.Int64}}, - "synth_id2": ddl.ColumnDef{Name: "synth_id2", T: ddl.Type{Name: ddl.Int64}}, + "synth_id2": ddl.ColumnDef{Name: "synth_id2", T: ddl.Type{Name: ddl.String, Len: 50}}, }, Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id2"}}}}, }, diff --git a/sources/sqlserver/data.go b/sources/sqlserver/data.go index 2a4fe3b97e..fd4963d58f 100644 --- a/sources/sqlserver/data.go +++ b/sources/sqlserver/data.go @@ -77,7 +77,7 @@ func ConvertData(conv *internal.Conv, srcTable string, srcCols []string, srcSche } if aux, 
ok := conv.SyntheticPKeys[spTable]; ok { c = append(c, aux.Col) - v = append(v, int64(bits.Reverse64(uint64(aux.Sequence)))) + v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence))))) aux.Sequence++ conv.SyntheticPKeys[spTable] = aux } diff --git a/sources/sqlserver/data_test.go b/sources/sqlserver/data_test.go index 4e43c9be4f..9cfb351b43 100644 --- a/sources/sqlserver/data_test.go +++ b/sources/sqlserver/data_test.go @@ -237,14 +237,14 @@ func TestConvertsyntheticPKey(t *testing.T) { cols: []string{"a", "b", "c"}, vals: []string{"6", "6.6", "true"}, ecols: []string{"a", "b", "c", "synth_id"}, - evals: []interface{}{int64(6), float64(6.6), true, int64(0)}, + evals: []interface{}{int64(6), float64(6.6), true, fmt.Sprintf("%d", 0)}, }, { name: "Sequence 1", cols: []string{"a"}, vals: []string{"7"}, ecols: []string{"a", "synth_id"}, - evals: []interface{}{int64(7), int64(bits.Reverse64(1))}, + evals: []interface{}{int64(7), fmt.Sprintf("%d", int64(bits.Reverse64(1)))}, }, } tableName := "testtable" diff --git a/streaming/streaming.go b/streaming/streaming.go index ae8d7dc85f..d36551f263 100644 --- a/streaming/streaming.go +++ b/streaming/streaming.go @@ -18,10 +18,13 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/url" + "strings" "time" dataflow "cloud.google.com/go/dataflow/apiv1beta3" datastream "cloud.google.com/go/datastream/apiv1alpha1" + "cloud.google.com/go/storage" datastreampb "google.golang.org/genproto/googleapis/cloud/datastream/v1alpha1" dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3" fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -58,6 +61,7 @@ type DataflowCfg struct { type StreamingCfg struct { DatastreamCfg DatastreamCfg DataflowCfg DataflowCfg + TmpDir string } // VerifyAndUpdateCfg checks the fields and errors out if certain fields are empty. 
@@ -100,11 +104,37 @@ func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string) error { if dfCfg.JobName == "" { // Update names to have more info like dbname. jobName, err := utils.GenerateName("hb-dataflow-" + dbName) + jobName = strings.Replace(jobName, "_", "-", -1) if err != nil { return fmt.Errorf("error generating stream name: %v", err) } streamingCfg.DataflowCfg.JobName = jobName } + + filePath := streamingCfg.TmpDir + if filePath[len(filePath)-1] != '/' { + filePath = filePath + "/" + streamingCfg.TmpDir = filePath + } + u, err := url.Parse(filePath) + if err != nil { + return fmt.Errorf("parseFilePath: unable to parse file path %s", filePath) + } + if u.Scheme != "gs" { + return fmt.Errorf("not a valid GCS path: %s, should start with 'gs'", filePath) + } + bucketName := u.Host + ctx := context.Background() + client, err := storage.NewClient(ctx) + if err != nil { + return fmt.Errorf("failed to create GCS client") + } + defer client.Close() + bucket := client.Bucket(bucketName) + _, err = bucket.Attrs(ctx) + if err != nil { + return fmt.Errorf("bucket %s does not exist", bucketName) + } return nil } @@ -187,7 +217,7 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, pro SourceConfig: srcCfg, DestinationConfig: dstCfg, State: datastreampb.Stream_RUNNING, - BackfillStrategy: &datastreampb.Stream_BackfillNone{BackfillNone: &datastreampb.Stream_BackfillNoneStrategy{}}, + BackfillStrategy: &datastreampb.Stream_BackfillAll{BackfillAll: &datastreampb.Stream_BackfillAllStrategy{}}, } createStreamRequest := &datastreampb.CreateStreamRequest{ Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, datastreamCfg.StreamLocation), @@ -199,13 +229,13 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, pro dsOp, err := dsClient.CreateStream(ctx, createStreamRequest) if err != nil { - fmt.Printf("createStreamRequest: %+v\n", createStreamRequest) + fmt.Printf("cannot create stream: createStreamRequest: 
%+v\n", createStreamRequest) return fmt.Errorf("cannot create stream: %v ", err) } _, err = dsOp.Wait(ctx) if err != nil { - fmt.Printf("createStreamRequest: %+v\n", createStreamRequest) + fmt.Printf("datastream create operation failed: createStreamRequest: %+v\n", createStreamRequest) return fmt.Errorf("datastream create operation failed: %v", err) } fmt.Println("Successfully created stream ", datastreamCfg.StreamId) @@ -229,8 +259,10 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, pro } // LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job. -func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, datastreamCfg DatastreamCfg, dataflowCfg DataflowCfg) error { +func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg) error { project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil) + dataflowCfg := streamingCfg.DataflowCfg + datastreamCfg := streamingCfg.DatastreamCfg fmt.Println("Launching dataflow job ", dataflowCfg.JobName, " in ", project, "-", dataflowCfg.Location) c, err := dataflow.NewFlexTemplatesClient(ctx) @@ -265,6 +297,11 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile "streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId), "instanceId": instance, "databaseId": dbName, + "sessionFilePath": streamingCfg.TmpDir + "session.json", + }, + Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{ + NumWorkers: 60, + MaxWorkers: 60, }, } @@ -313,7 +350,7 @@ func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile, } func StartDataflow(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg) error { - err := LaunchDataflowJob(ctx, targetProfile, streamingCfg.DatastreamCfg, 
streamingCfg.DataflowCfg) + err := LaunchDataflowJob(ctx, targetProfile, streamingCfg) if err != nil { return fmt.Errorf("error launching dataflow: %v", err) } diff --git a/test_data/mysqldump.test.out b/test_data/mysqldump.test.out index 3a44f3296e..ac7b676b60 100644 --- a/test_data/mysqldump.test.out +++ b/test_data/mysqldump.test.out @@ -124,4 +124,4 @@ UNLOCK TABLES; /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2020-08-10 17:40:01 +-- Dump completed on 2020-08-10 17:40:01 \ No newline at end of file diff --git a/ui/src/app/app-routing.module.ts b/ui/src/app/app-routing.module.ts index 06f7481b5a..4ef22270b1 100644 --- a/ui/src/app/app-routing.module.ts +++ b/ui/src/app/app-routing.module.ts @@ -8,6 +8,7 @@ import { LoadSessionComponent } from './components/load-session/load-session.com import { SourceSelectionComponent } from './components/source-selection/source-selection.component' import { SummaryComponent } from './components/summary/summary.component' import { WorkspaceComponent } from './components/workspace/workspace.component' +import { PrepareMigrationComponent } from './components/prepare-migration/prepare-migration.component' const routes: Routes = [ @@ -42,6 +43,10 @@ const routes: Routes = [ path: 'workspace', component: WorkspaceComponent, }, + { + path: 'prepare-migration', + component: PrepareMigrationComponent, + }, { path: 'instruction', component: InstructionComponent, diff --git a/ui/src/app/app.module.ts b/ui/src/app/app.module.ts index 46f9e58ad2..fefa62c055 100644 --- a/ui/src/app/app.module.ts +++ b/ui/src/app/app.module.ts @@ -30,7 +30,9 @@ import { AddIndexFormComponent } from './components/add-index-form/add-index-for import { EditGlobalDatatypeFormComponent } from './components/edit-global-datatype-form/edit-global-datatype-form.component' import { SidenavViewAssessmentComponent } from 
'./components/sidenav-view-assessment/sidenav-view-assessment.component' import { SidenavSaveSessionComponent } from './components/sidenav-save-session/sidenav-save-session.component' -import { DropIndexDialogComponent } from './components/drop-index-dialog/drop-index-dialog.component' +import { DropIndexDialogComponent } from './components/drop-index-dialog/drop-index-dialog.component'; +import { PrepareMigrationComponent } from './components/prepare-migration/prepare-migration.component'; +import { TargetDetailsFormComponent } from './components/target-details-form/target-details-form.component' @NgModule({ declarations: [ @@ -58,6 +60,8 @@ import { DropIndexDialogComponent } from './components/drop-index-dialog/drop-in SidenavViewAssessmentComponent, SidenavSaveSessionComponent, DropIndexDialogComponent, + PrepareMigrationComponent, + TargetDetailsFormComponent, ], imports: [ BrowserModule, diff --git a/ui/src/app/components/home/home.component.html b/ui/src/app/components/home/home.component.html index ed484fa76f..e5786b7ddd 100644 --- a/ui/src/app/components/home/home.component.html +++ b/ui/src/app/components/home/home.component.html @@ -1,9 +1,9 @@
| Title | ++ {{ element.title }} + | +Source | +{{ element.source }} | +Destination | +{{ element.target }} | +
|---|
+ 1 + Set datastream connection profiles +
++ 2 + Setup Target details +
+