Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package cmd
import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
Expand All @@ -41,7 +43,8 @@ var (
// 3. Run data conversion (if schemaOnly is set to false)
// 4. Generate report
func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly, schemaOnly, skipForeignKeys bool, schemaSampleSize int64, sessionJSON string, ioHelper *utils.IOStreams, outputFilePrefix string, now time.Time) error {

// Cleanup hb tmp data directory in case residuals remain from prev runs.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
// Legacy mode is only supported for MySQL, PostgreSQL and DynamoDB
if driver != "" && utils.IsValidDriver(driver) && !utils.IsLegacyModeSupportedDriver(driver) {
return fmt.Errorf("legacy mode is not supported for drivers other than %s", strings.Join(utils.GetLegacyModeSupportedDrivers(), ", "))
Expand Down Expand Up @@ -97,7 +100,7 @@ func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly,

// We pass an empty string to the sqlConnectionStr parameter as this is the legacy codepath,
// which reads the environment variables and constructs the string later on.
bw, err := conversion.DataConv(sourceProfile, targetProfile, ioHelper, client, conv, dataOnly)
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, dataOnly)
if err != nil {
return fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
}
Expand All @@ -109,5 +112,7 @@ func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly,
banner := utils.GetBanner(now, dbURI)
conversion.Report(driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, outputFilePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, outputFilePrefix+badDataFile, ioHelper.Out)
// Cleanup hb tmp data directory.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
return nil
}
9 changes: 7 additions & 2 deletions cmd/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
Expand Down Expand Up @@ -54,11 +55,13 @@ func (cmd *DataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.sessionJSON, "session", "", "Specifies the file we restore session state from")
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
flag.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
}

func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
// Cleanup hb tmp data directory in case residuals remain from prev runs.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
var err error
defer func() {
if err != nil {
Expand Down Expand Up @@ -136,7 +139,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
}
}

bw, err := conversion.DataConv(sourceProfile, targetProfile, &ioHelper, client, conv, true)
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true)
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return subcommands.ExitFailure
Expand All @@ -150,5 +153,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
banner := utils.GetBanner(now, dbURI)
conversion.Report(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)
// Cleanup hb tmp data directory.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
return subcommands.ExitSuccess
}
5 changes: 5 additions & 0 deletions cmd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
Expand Down Expand Up @@ -55,6 +56,8 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) {
}

func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
// Cleanup hb tmp data directory in case residuals remain from prev runs.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
var err error
defer func() {
if err != nil {
Expand Down Expand Up @@ -106,5 +109,7 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
conversion.WriteSchemaFile(conv, now, cmd.filePrefix+schemaFile, ioHelper.Out)
conversion.WriteSessionFile(conv, cmd.filePrefix+sessionFile, ioHelper.Out)
conversion.Report(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)
// Cleanup hb tmp data directory.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
return subcommands.ExitSuccess
}
9 changes: 7 additions & 2 deletions cmd/schema_and_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"time"

"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/common/utils"
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
Expand Down Expand Up @@ -52,11 +53,13 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.sourceProfile, "source-profile", "", "Flag for specifying connection profile for source database e.g., \"file=<path>,format=dump\"")
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
flag.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
}

func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
// Cleanup hb tmp data directory in case residuals remain from prev runs.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
var err error
defer func() {
if err != nil {
Expand Down Expand Up @@ -134,7 +137,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
return subcommands.ExitFailure
}

bw, err := conversion.DataConv(sourceProfile, targetProfile, &ioHelper, client, conv, true)
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true)
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return subcommands.ExitFailure
Expand All @@ -148,5 +151,7 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
banner := utils.GetBanner(now, dbURI)
conversion.Report(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)
// Cleanup hb tmp data directory.
os.RemoveAll(os.TempDir() + constants.HB_TMP_DIR)
return subcommands.ExitSuccess
}
3 changes: 3 additions & 0 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ const (
// Supported dialects for Cloud Spanner database.
DIALECT_POSTGRESQL string = "postgresql"
DIALECT_GOOGLESQL string = "google_standard_sql"

// Temp directory name to write data which we cleanup at the end.
HB_TMP_DIR string = "harbourbridge_tmp_data"
)
84 changes: 77 additions & 7 deletions common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"net/url"
"os"
"os/exec"
"reflect"
"sort"
"strings"
"syscall"
"time"
Expand All @@ -22,6 +24,9 @@ import (
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"cloud.google.com/go/storage"
"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/sources/common"
"github.com/cloudspannerecosystem/harbourbridge/sources/spanner"
"golang.org/x/crypto/ssh/terminal"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
Expand All @@ -34,6 +39,12 @@ type IOStreams struct {
BytesRead int64
}

// ManifestTable is one entry of the JSON manifest file HarbourBridge
// accepts; the manifest unmarshals into a []ManifestTable.
// NOTE(review): the underscore field names (Table_name, File_patterns) are
// non-idiomatic Go but are exported API — renaming would break callers, so
// they are left as-is. The JSON keys are fixed by the manifest format.
type ManifestTable struct {
	// Table_name is the destination table the listed files load into.
	Table_name string `json:"table_name"`
	// File_patterns lists the data file paths/patterns for this table;
	// entries may be local paths or "gs://" URLs (see PreloadGCSFiles).
	File_patterns []string `json:"file_patterns"`
}

// NewIOStreams returns a new IOStreams struct such that input stream is set
// to open file descriptor for dumpFile if driver is PGDUMP or MYSQLDUMP.
// Input stream defaults to stdin. Output stream is always set to stdout.
Expand All @@ -51,7 +62,7 @@ func NewIOStreams(driver string, dumpFile string) IOStreams {
if u.Scheme == "gs" {
bucketName := u.Host
filePath := u.Path[1:] // removes "/" from beginning of path
f, err = downloadFromGCS(bucketName, filePath)
f, err = DownloadFromGCS(bucketName, filePath, "harbourbridge.gcs.data")
} else {
f, err = os.Open(dumpFile)
}
Expand All @@ -64,8 +75,8 @@ func NewIOStreams(driver string, dumpFile string) IOStreams {
return io
}

// downloadFromGCS returns the dump file that is downloaded from GCS
func downloadFromGCS(bucketName string, filePath string) (*os.File, error) {
// DownloadFromGCS returns the dump file that is downloaded from GCS.
func DownloadFromGCS(bucketName, filePath, tmpFile string) (*os.File, error) {
ctx := context.Background()

client, err := storage.NewClient(ctx)
Expand All @@ -85,22 +96,23 @@ func downloadFromGCS(bucketName string, filePath string) (*os.File, error) {
defer rc.Close()
r := bufio.NewReader(rc)

tmpfile, err := ioutil.TempFile("", "harbourbridge.gcs.data")
tmpDir := os.TempDir() + constants.HB_TMP_DIR
os.MkdirAll(tmpDir, os.ModePerm)
tmpfile, err := os.Create(tmpDir + "/" + tmpFile)
if err != nil {
fmt.Printf("saveFile: unable to open temporary file to save dump file from GCS bucket %v", err)
log.Fatal(err)
return nil, err
}
syscall.Unlink(tmpfile.Name()) // File will be deleted when this process exits.

fmt.Printf("\nDownloading dump file from GCS bucket %s, path %s\n", bucketName, filePath)
fmt.Printf("\nDownloading file from GCS bucket %s, path %s\n", bucketName, filePath)
buffer := make([]byte, 1024)
for {
// read a chunk
n, err := r.Read(buffer[:cap(buffer)])

if err != nil && err != io.EOF {
fmt.Printf("readFile: unable to read entire dump file from bucket %s, file %s: %v", bucketName, filePath, err)
fmt.Printf("readFile: unable to read entire file from bucket %s, file %s: %v", bucketName, filePath, err)
log.Fatal(err)
return nil, err
}
Expand All @@ -118,6 +130,32 @@ func downloadFromGCS(bucketName string, filePath string) (*os.File, error) {
return tmpfile, nil
}

// PreloadGCSFiles downloads every "gs://" file referenced in the manifest
// tables to the HarbourBridge tmp directory and rewrites the corresponding
// manifest entries with the local file locations. Non-GCS paths are left
// untouched. The tables slice is mutated in place and also returned.
// Returns an error for the first path that cannot be parsed or downloaded.
func PreloadGCSFiles(tables []ManifestTable) ([]ManifestTable, error) {
	for i, table := range tables {
		for j, filePath := range table.File_patterns {
			u, err := url.Parse(filePath)
			if err != nil {
				return nil, fmt.Errorf("unable to parse file path %s for table %s: %v", filePath, table.Table_name, err)
			}
			if u.Scheme != "gs" {
				continue
			}
			bucketName := u.Host
			objectPath := u.Path[1:] // removes "/" from beginning of path
			// Flatten the object path into a single tmp file name.
			tmpFile := strings.ReplaceAll(objectPath, "/", ".")
			// Files get downloaded to the HB tmp dir. This concatenation must
			// match the path DownloadFromGCS writes to.
			// NOTE(review): os.TempDir() has no trailing separator on most
			// platforms, so os.TempDir()+constants.HB_TMP_DIR only forms the
			// intended "<tmp>/harbourbridge_tmp_data" when TMPDIR ends in a
			// slash — confirm against DownloadFromGCS and the cleanup calls.
			fileLoc := os.TempDir() + constants.HB_TMP_DIR + "/" + tmpFile
			if _, err = DownloadFromGCS(bucketName, objectPath, tmpFile); err != nil {
				return nil, fmt.Errorf("cannot download gcs file %s for table %s: %v", objectPath, table.Table_name, err)
			}
			tables[i].File_patterns[j] = fileLoc
			fmt.Printf("Downloaded file: %s\n", fileLoc)
		}
	}
	return tables, nil
}

// GetProject returns the cloud project we should use for accessing Spanner.
// Use environment variable GCLOUD_PROJECT if it is set.
// Otherwise, use the default project returned from gcloud.
Expand Down Expand Up @@ -289,6 +327,15 @@ func ContainsAny(s string, l []string) bool {
return false
}

// CheckEqualSets reports whether a and b contain the same values with the
// same multiplicities, ignoring order (i.e. multiset equality; duplicates
// must match in count on both sides). Neither input slice is modified.
func CheckEqualSets(a, b []string) bool {
	// Fast path: different cardinalities can never be equal.
	if len(a) != len(b) {
		return false
	}
	// Sort private copies so the callers' slices are left untouched.
	// (The original used make([]string, len(a)) — missing the 0 length —
	// which silently prepended len(a) empty strings to each copy.)
	sortedA := append(make([]string, 0, len(a)), a...)
	sortedB := append(make([]string, 0, len(b)), b...)
	sort.Strings(sortedA)
	sort.Strings(sortedB)
	return reflect.DeepEqual(sortedA, sortedB)
}

func GetFileSize(f *os.File) (int64, error) {
info, err := f.Stat()
if err != nil {
Expand Down Expand Up @@ -415,3 +462,26 @@ func IsLegacyModeSupportedDriver(driver string) bool {
// GetLegacyModeSupportedDrivers returns the subset of valid drivers for
// which the legacy command-line mode is supported.
// NOTE(review): this depends on the first 5 entries of GetValidDrivers()
// being exactly the legacy-mode drivers — a fragile positional coupling.
// Confirm the ordering contract if GetValidDrivers ever changes.
func GetLegacyModeSupportedDrivers() []string {
	return GetValidDrivers()[:5]
}

// ReadSpannerSchema fills conv by querying the Spanner information schema,
// treating Spanner as both the source and the destination database.
func ReadSpannerSchema(ctx context.Context, conv *internal.Conv, client *sp.Client) error {
	is := spanner.InfoSchemaImpl{Client: client, Ctx: ctx, TargetDb: conv.TargetDb}
	if err := common.ProcessSchema(conv, is); err != nil {
		return fmt.Errorf("error trying to read and convert spanner schema: %v", err)
	}
	interleaved, err := is.GetInterleaveTables()
	if err != nil {
		// We should ideally throw an error here as it could potentially cause a lot of failed writes.
		// We raise an unexpected error for now to make it compatible with the integration tests.
		// In the emulator, the interleave_type column is not supported hence the query fails.
		conv.Unexpected(fmt.Sprintf("error trying to fetch interleave table info from schema: %v", err))
	}
	// Record the parent of each interleaved table in the Spanner schema.
	for child, parent := range interleaved {
		entry := conv.SpSchema[child]
		entry.Parent = parent
		conv.SpSchema[child] = entry
	}
	return nil
}
Loading