Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
)

var (
Expand All @@ -37,13 +39,20 @@ var (
// 2. Create database (if schemaOnly is set to false)
// 3. Run data conversion (if schemaOnly is set to false)
// 4. Generate report
func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly, schemaOnly, skipForeignKeys bool, schemaSampleSize int64, sessionJSON string, ioHelper *conversion.IOStreams, outputFilePrefix string, now time.Time) error {
func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly, schemaOnly, skipForeignKeys bool, schemaSampleSize int64, sessionJSON string, ioHelper *utils.IOStreams, outputFilePrefix string, now time.Time) error {
var conv *internal.Conv
var err error
// Creating profiles from legacy flags. We only pass schema-sample-size here because thats the
// only flag passed through the arguments. Dumpfile params are contained within ioHelper
// and direct connect params will be fetched from the env variables.
sourceProfile, _ := profiles.NewSourceProfile(fmt.Sprintf("schema-sample-size=%d", schemaSampleSize), driver)
sourceProfile.Driver = driver
targetProfile, _ := profiles.NewTargetProfile("")
targetProfile.TargetDb = targetDb
if !dataOnly {
// We pass an empty string to the sqlConnectionStr parameter as this is the legacy codepath,
// which reads the environment variables and constructs the string later on.
conv, err = conversion.SchemaConv(driver, "", targetDb, ioHelper, schemaSampleSize)
conv, err = conversion.SchemaConv(sourceProfile, targetProfile, ioHelper)
if err != nil {
return err
}
Expand All @@ -64,24 +73,24 @@ func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly,
return err
}
}
adminClient, err := conversion.NewDatabaseAdminClient(ctx)
adminClient, err := utils.NewDatabaseAdminClient(ctx)
if err != nil {
return fmt.Errorf("can't create admin client: %w", conversion.AnalyzeError(err, dbURI))
return fmt.Errorf("can't create admin client: %w", utils.AnalyzeError(err, dbURI))
}
defer adminClient.Close()
err = conversion.CreateOrUpdateDatabase(ctx, adminClient, dbURI, conv, ioHelper.Out)
if err != nil {
return fmt.Errorf("can't create/update database: %v", err)
}

client, err := conversion.GetClient(ctx, dbURI)
client, err := utils.GetClient(ctx, dbURI)
if err != nil {
return fmt.Errorf("can't create client for db %s: %v", dbURI, err)
}

// We pass an empty string to the sqlConnectionStr parameter as this is the legacy codepath,
// which reads the environment variables and constructs the string later on.
bw, err := conversion.DataConv(driver, "", ioHelper, client, conv, dataOnly, schemaSampleSize)
bw, err := conversion.DataConv(sourceProfile, targetProfile, ioHelper, client, conv, dataOnly)
if err != nil {
return fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
}
Expand All @@ -90,7 +99,7 @@ func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly,
return fmt.Errorf("can't perform update schema on db %s with foreign keys: %v", dbURI, err)
}
}
banner := conversion.GetBanner(now, dbURI)
banner := utils.GetBanner(now, dbURI)
conversion.Report(driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, outputFilePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, outputFilePrefix+badDataFile, ioHelper.Out)
return nil
Expand Down
111 changes: 0 additions & 111 deletions cmd/common.go

This file was deleted.

34 changes: 18 additions & 16 deletions cmd/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"path"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/google/subcommands"
)

Expand Down Expand Up @@ -64,32 +66,32 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
}
}()

sourceProfile, err := NewSourceProfile(cmd.sourceProfile, cmd.source)
sourceProfile, err := profiles.NewSourceProfile(cmd.sourceProfile, cmd.source)
if err != nil {
return subcommands.ExitUsageError
}
driverName, err := sourceProfile.ToLegacyDriver(cmd.source)
sourceProfile.Driver, err = sourceProfile.ToLegacyDriver(cmd.source)
if err != nil {
return subcommands.ExitUsageError
}

targetProfile, err := NewTargetProfile(cmd.targetProfile)
targetProfile, err := profiles.NewTargetProfile(cmd.targetProfile)
if err != nil {
return subcommands.ExitUsageError
}
targetDb := targetProfile.ToLegacyTargetDb()
targetProfile.TargetDb = targetProfile.ToLegacyTargetDb()

dumpFilePath := ""
if sourceProfile.ty == SourceProfileTypeFile && (sourceProfile.file.format == "" || sourceProfile.file.format == "dump") {
dumpFilePath = sourceProfile.file.path
if sourceProfile.Ty == profiles.SourceProfileTypeFile && (sourceProfile.File.Format == "" || sourceProfile.File.Format == "dump") {
dumpFilePath = sourceProfile.File.Path
}
ioHelper := conversion.NewIOStreams(driverName, dumpFilePath)
ioHelper := utils.NewIOStreams(sourceProfile.Driver, dumpFilePath)
if ioHelper.SeekableIn != nil {
defer ioHelper.In.Close()
}

now := time.Now()
project, instance, dbName, err := getResourceIds(ctx, targetProfile, now, driverName, ioHelper.Out)
project, instance, dbName, err := profiles.GetResourceIds(ctx, targetProfile, now, sourceProfile.Driver, ioHelper.Out)
if err != nil {
return subcommands.ExitUsageError
}
Expand All @@ -105,18 +107,18 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
if err != nil {
return subcommands.ExitUsageError
}
if targetDb != "" && conv.TargetDb != targetDb {
err = fmt.Errorf("running data migration for Spanner dialect: %v, whereas schema mapping was done for dialect: %v", targetDb, conv.TargetDb)
if targetProfile.TargetDb != "" && conv.TargetDb != targetProfile.TargetDb {
err = fmt.Errorf("running data migration for Spanner dialect: %v, whereas schema mapping was done for dialect: %v", targetProfile.TargetDb, conv.TargetDb)
return subcommands.ExitUsageError
}

adminClient, err := conversion.NewDatabaseAdminClient(ctx)
adminClient, err := utils.NewDatabaseAdminClient(ctx)
if err != nil {
err = fmt.Errorf("can't create admin client: %w", conversion.AnalyzeError(err, dbURI))
err = fmt.Errorf("can't create admin client: %w", utils.AnalyzeError(err, dbURI))
return subcommands.ExitFailure
}
defer adminClient.Close()
client, err := conversion.GetClient(ctx, dbURI)
client, err := utils.GetClient(ctx, dbURI)
if err != nil {
err = fmt.Errorf("can't create client for db %s: %v", dbURI, err)
return subcommands.ExitFailure
Expand All @@ -129,7 +131,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
return subcommands.ExitFailure
}

bw, err := conversion.DataConv(driverName, getSQLConnectionStr(sourceProfile), &ioHelper, client, conv, true, getSchemaSampleSize(sourceProfile))
bw, err := conversion.DataConv(sourceProfile, targetProfile, &ioHelper, client, conv, true)
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return subcommands.ExitFailure
Expand All @@ -140,8 +142,8 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
return subcommands.ExitFailure
}
}
banner := conversion.GetBanner(now, dbURI)
conversion.Report(driverName, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
banner := utils.GetBanner(now, dbURI)
conversion.Report(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)
return subcommands.ExitSuccess
}
22 changes: 12 additions & 10 deletions cmd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"path"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/google/subcommands"
)

Expand Down Expand Up @@ -60,33 +62,33 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
}
}()

sourceProfile, err := NewSourceProfile(cmd.sourceProfile, cmd.source)
sourceProfile, err := profiles.NewSourceProfile(cmd.sourceProfile, cmd.source)
if err != nil {
return subcommands.ExitUsageError
}
driverName, err := sourceProfile.ToLegacyDriver(cmd.source)
sourceProfile.Driver, err = sourceProfile.ToLegacyDriver(cmd.source)
if err != nil {
return subcommands.ExitUsageError
}

targetProfile, err := NewTargetProfile(cmd.targetProfile)
targetProfile, err := profiles.NewTargetProfile(cmd.targetProfile)
if err != nil {
return subcommands.ExitUsageError
}
targetDb := targetProfile.ToLegacyTargetDb()
targetProfile.TargetDb = targetProfile.ToLegacyTargetDb()

dumpFilePath := ""
if sourceProfile.ty == SourceProfileTypeFile && (sourceProfile.file.format == "" || sourceProfile.file.format == "dump") {
dumpFilePath = sourceProfile.file.path
if sourceProfile.Ty == profiles.SourceProfileTypeFile && (sourceProfile.File.Format == "" || sourceProfile.File.Format == "dump") {
dumpFilePath = sourceProfile.File.Path
}
ioHelper := conversion.NewIOStreams(driverName, dumpFilePath)
ioHelper := utils.NewIOStreams(sourceProfile.Driver, dumpFilePath)
if ioHelper.SeekableIn != nil {
defer ioHelper.In.Close()
}

// If filePrefix not explicitly set, use generated dbName.
if cmd.filePrefix == "" {
dbName, err := conversion.GetDatabaseName(driverName, time.Now())
dbName, err := utils.GetDatabaseName(sourceProfile.Driver, time.Now())
if err != nil {
err = fmt.Errorf("can't generate database name for prefix: %v", err)
return subcommands.ExitFailure
Expand All @@ -95,14 +97,14 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
}

var conv *internal.Conv
conv, err = conversion.SchemaConv(driverName, getSQLConnectionStr(sourceProfile), targetDb, &ioHelper, getSchemaSampleSize(sourceProfile))
conv, err = conversion.SchemaConv(sourceProfile, targetProfile, &ioHelper)
if err != nil {
return subcommands.ExitFailure
}

now := time.Now()
conversion.WriteSchemaFile(conv, now, cmd.filePrefix+schemaFile, ioHelper.Out)
conversion.WriteSessionFile(conv, cmd.filePrefix+sessionFile, ioHelper.Out)
conversion.Report(driverName, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.Report(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)
return subcommands.ExitSuccess
}
Loading