Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,43 @@ func PreloadGCSFiles(tables []ManifestTable) ([]ManifestTable, error) {
return tables, nil
}

// WriteToGCS writes data as the contents of object fileName under the GCS
// location filePath, which must be of the form "gs://bucket/prefix".
// A trailing slash is appended to filePath if missing, so fileName is always
// created inside the prefix rather than concatenated onto its last segment.
// All failures are returned as errors carrying the offending path.
func WriteToGCS(filePath, fileName, data string) error {
	ctx := context.Background()

	// Validate the destination before creating a client: the original code
	// indexed filePath without an emptiness check (panic on "") and, on the
	// scheme check, returned a nil err — silently skipping the write for
	// non-GCS paths.
	if filePath == "" {
		return fmt.Errorf("empty GCS file path")
	}
	if filePath[len(filePath)-1] != '/' {
		filePath = filePath + "/"
	}
	u, err := url.Parse(filePath)
	if err != nil {
		return fmt.Errorf("parseFilePath: unable to parse file path %s: %w", filePath, err)
	}
	if u.Scheme != "gs" {
		return fmt.Errorf("not a valid GCS path: %s, should start with 'gs'", filePath)
	}

	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("creating GCS client: %w", err)
	}
	defer client.Close()

	// u.Path begins with '/'; strip it so the object name is relative to the bucket.
	obj := client.Bucket(u.Host).Object(u.Path[1:] + fileName)

	w := obj.NewWriter(ctx)
	if _, err := fmt.Fprint(w, data); err != nil {
		w.Close() // best effort; the write error is the one worth reporting
		return fmt.Errorf("writing to Cloud Storage %s: %w", filePath, err)
	}
	// Close flushes the upload; errors here mean the object was not committed.
	if err := w.Close(); err != nil {
		return fmt.Errorf("closing GCS file %s: %w", filePath, err)
	}
	return nil
}

// GetProject returns the cloud project we should use for accessing Spanner.
// Use environment variable GCLOUD_PROJECT if it is set.
// Otherwise, use the default project returned from gcloud.
Expand Down
28 changes: 22 additions & 6 deletions conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ func performSnapshotMigration(config writer.BatchWriterConfig, conv *internal.Co
return batchWriter, nil
}

// snapshotMigrationHandler decides how the snapshot (bulk) phase is performed
// when a streaming migration has been requested, based on the source driver.
func snapshotMigrationHandler(sourceProfile profiles.SourceProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, infoSchema common.InfoSchema) (*writer.BatchWriter, error) {
switch sourceProfile.Driver {
// Skip snapshot migration via harbourbridge for mysql and oracle, since the
// Dataflow job will handle this from backfilled data; return an empty
// BatchWriter so callers still receive a non-nil result.
case constants.MYSQL, constants.ORACLE:
return &writer.BatchWriter{}, nil
case constants.DYNAMODB:
return performSnapshotMigration(config, conv, client, infoSchema)
default:
return &writer.BatchWriter{}, fmt.Errorf("streaming migration not supported for driver %s", sourceProfile.Driver)
}
}

func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client) (*writer.BatchWriter, error) {
infoSchema, err := GetInfoSchema(sourceProfile, targetProfile)
if err != nil {
Expand All @@ -195,16 +207,19 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile,
if err != nil {
return nil, err
}
}
bw, err := performSnapshotMigration(config, conv, client, infoSchema)
if err != nil {
return nil, err
}
if sourceProfile.Conn.Streaming {
bw, err := snapshotMigrationHandler(sourceProfile, config, conv, client, infoSchema)
if err != nil {
return nil, err
}
err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
if err != nil {
return nil, err
}
return bw, nil
}
bw, err := performSnapshotMigration(config, conv, client, infoSchema)
if err != nil {
return nil, err
}
return bw, nil
}
Expand Down Expand Up @@ -458,6 +473,7 @@ func CreateOrUpdateDatabase(ctx context.Context, adminClient *database.DatabaseA
}
// Adding migration metadata to the outgoing context.
migrationData := metrics.GetMigrationData(conv, driver, targetDb, constants.SchemaConv)
fmt.Println(migrationData.String())
serializedMigrationData, _ := proto.Marshal(migrationData)
migrationMetadataValue := base64.StdEncoding.EncodeToString(serializedMigrationData)
ctx = metadata.AppendToOutgoingContext(ctx, migrationMetadataKey, migrationMetadataValue)
Expand Down
2 changes: 1 addition & 1 deletion internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (conv *Conv) AddPrimaryKeys() {
if !primaryKeyPopulated {
k := conv.buildPrimaryKey(t)
ct.ColNames = append(ct.ColNames, k)
ct.ColDefs[k] = ddl.ColumnDef{Name: k, T: ddl.Type{Name: ddl.Int64}}
ct.ColDefs[k] = ddl.ColumnDef{Name: k, T: ddl.Type{Name: ddl.String, Len: 50}}
ct.Pks = []ddl.IndexKey{{Col: k}}
conv.SyntheticPKeys[t] = SyntheticPKey{k, 0}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestAddPrimaryKeys(t *testing.T) {
ColDefs: map[string]ddl.ColumnDef{
"a": {Name: "a", T: ddl.Type{Name: ddl.Int64}},
"b": {Name: "b", T: ddl.Type{Name: ddl.Float64}},
"synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}},
"synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}},
},
Pks: []ddl.IndexKey{{Col: "synth_id"}},
Indexes: []ddl.CreateIndex{{Name: "", Table: "", Unique: false, Keys: []ddl.IndexKey{{Col: "b"}}}},
Expand Down Expand Up @@ -246,7 +246,7 @@ func TestAddPrimaryKeys(t *testing.T) {
ColDefs: map[string]ddl.ColumnDef{
"a": {Name: "a", T: ddl.Type{Name: ddl.Int64}},
"b": {Name: "b", T: ddl.Type{Name: ddl.Float64}},
"synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.Int64}},
"synth_id": {Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}},
},
Pks: []ddl.IndexKey{{Col: "synth_id"}}},
},
Expand Down
2 changes: 1 addition & 1 deletion sources/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func ConvertData(conv *internal.Conv, srcTable string, srcCols []string, srcSche
}
if aux, ok := conv.SyntheticPKeys[spTable]; ok {
c = append(c, aux.Col)
v = append(v, int64(bits.Reverse64(uint64(aux.Sequence))))
v = append(v, fmt.Sprintf("%d", int64(bits.Reverse64(uint64(aux.Sequence)))))
aux.Sequence++
conv.SyntheticPKeys[spTable] = aux
}
Expand Down
4 changes: 2 additions & 2 deletions sources/mysql/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,14 @@ func TestConvertsyntheticPKey(t *testing.T) {
cols: []string{"a", "b", "c"},
vals: []string{"6", "6.6", "true"},
ecols: []string{"a", "b", "c", "synth_id"},
evals: []interface{}{int64(6), float64(6.6), true, int64(0)},
evals: []interface{}{int64(6), float64(6.6), true, fmt.Sprintf("%d", 0)},
},
{
name: "Sequence 1",
cols: []string{"a"},
vals: []string{"7"},
ecols: []string{"a", "synth_id"},
evals: []interface{}{int64(7), int64(bits.Reverse64(1))},
evals: []interface{}{int64(7), fmt.Sprintf("%d", int64(bits.Reverse64(1)))},
},
}
tableName := "testtable"
Expand Down
18 changes: 17 additions & 1 deletion sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package mysql
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sort"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
_ "github.com/go-sql-driver/mysql" // The driver should be used via the database/sql package.
_ "github.com/lib/pq"

"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/schema"
Expand Down Expand Up @@ -350,8 +352,22 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
// performing a streaming migration.
// StartStreamingMigration serializes the conversion state to a session file in
// GCS (under the streaming config's TmpDir) and then launches the Dataflow job
// that performs the streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
// NOTE(review): the type assertion's ok result is discarded — if the key is
// missing or of the wrong type, streamingCfg is the zero value; confirm
// callers always populate "streamingCfg" before reaching here.
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
// Serialize the full conversion state so the Dataflow job can consume it.
convJSON, err := json.MarshalIndent(conv, "", " ")
if err != nil {
err = fmt.Errorf("can't encode session state to JSON: %v", err)
return err
}
fmt.Printf("Writing session file to GCS...")
err = utils.WriteToGCS(streamingCfg.TmpDir, "session.json", string(convJSON))
if err != nil {
err = fmt.Errorf("error writing session file to GCS: %v", err)
return err
}
fmt.Println("Done")

// Kick off the Dataflow job only after the session file is in place.
err = streaming.StartDataflow(ctx, isi.SourceProfile, isi.TargetProfile, streamingCfg)
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions sources/mysql/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func TestProcessData_MultiCol(t *testing.T) {
"a": ddl.ColumnDef{Name: "a", T: ddl.Type{Name: ddl.String, Len: ddl.MaxLength}, NotNull: true},
"b": ddl.ColumnDef{Name: "b", T: ddl.Type{Name: ddl.Float64}},
"c": ddl.ColumnDef{Name: "c", T: ddl.Type{Name: ddl.Int64}},
"synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.Int64}},
"synth_id": ddl.ColumnDef{Name: "synth_id", T: ddl.Type{Name: ddl.String, Len: 50}},
},
Pks: []ddl.IndexKey{ddl.IndexKey{Col: "synth_id"}}},
}
Expand All @@ -404,8 +404,8 @@ func TestProcessData_MultiCol(t *testing.T) {
})
common.ProcessData(conv, isi)
assert.Equal(t, []spannerData{
{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), int64(0)}},
{table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), int64(-9223372036854775808)}}},
{table: "test", cols: []string{"a", "b", "synth_id"}, vals: []interface{}{"cat", float64(42.3), "0"}},
{table: "test", cols: []string{"a", "c", "synth_id"}, vals: []interface{}{"dog", int64(22), "-9223372036854775808"}}},
rows)
assert.Equal(t, int64(0), conv.Unexpecteds())
}
Expand Down
Loading