From 6502199db31d71987555f31cf88b89de84e57a41 Mon Sep 17 00:00:00 2001 From: isapego Date: Thu, 26 Sep 2024 20:30:50 +0200 Subject: [PATCH 1/3] Add option verify-fully --- internal/app/app.go | 3 ++- internal/app/options/flags.go | 5 +++++ internal/app/options/options.go | 36 +++++++++++++++++---------------- internal/app/progress.go | 9 +++++++++ protocol/iface/connector.go | 1 + runners/local/progress.go | 2 +- runners/local/runner.go | 27 +++++++++++++++++++++---- 7 files changed, 60 insertions(+), 23 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index ea909f37..45ab089c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -106,6 +106,7 @@ func runDsync(c *cli.Context) error { StateStoreConnString: o.StateStoreConnString, NsFromString: o.NamespaceFrom, VerifyRequestedFlag: o.Verify, + FullVerifyRequestedFlag: o.VerifyFully, CleanupRequestedFlag: o.Cleanup, FlowStatusReportingInterval: 10, CosmosDeletesEmuRequestedFlag: o.CosmosDeletesEmu, @@ -236,7 +237,7 @@ func runDsync(c *cli.Context) error { err := r.Setup(runnerCtx) if err == nil { err = r.Run() - if !o.Verify { //if verification was requested, the user should be able to see the results + if !o.Verify && !o.VerifyFully { //if verification was requested, the user should be able to see the results runnerCancelFunc() } } else { diff --git a/internal/app/options/flags.go b/internal/app/options/flags.go index 004b9e9e..2c14b107 100644 --- a/internal/app/options/flags.go +++ b/internal/app/options/flags.go @@ -127,6 +127,11 @@ func GetFlagsAndBeforeFunc() ([]cli.Flag, cli.BeforeFunc) { Usage: "perform a data integrity check for an existing flow", Category: "Special Commands", }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "verify-fully", + Usage: "perform a full data integrity check for an existing flow", + Category: "Special Commands", + }), altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "cleanup", Usage: "cleanup metadata for an existing flow", diff --git 
a/internal/app/options/options.go b/internal/app/options/options.go index c2b190f7..d665c28f 100644 --- a/internal/app/options/options.go +++ b/internal/app/options/options.go @@ -26,10 +26,11 @@ type Options struct { NamespaceFrom []string - Verify bool - Cleanup bool - Progress bool - Reverse bool + Verify bool + VerifyFully bool + Cleanup bool + Progress bool + Reverse bool CosmosDeletesEmu bool @@ -38,19 +39,19 @@ type Options struct { PprofPort uint WebPort uint - LoadLevel string - InitialSyncNumParallelCopiers int - NumParallelWriters int - NumParallelIntegrityCheckTasks int - CosmosNumParallelPartitionWorkers int - CosmosReaderMaxNumNamespaces int - ServerConnectTimeout time.Duration - PingTimeout time.Duration - CdcResumeTokenUpdateInterval time.Duration - WriterMaxBatchSize int - CosmosTargetDocCountPerPartition int64 - CosmosDeletesCheckInterval time.Duration - Mode string + LoadLevel string + InitialSyncNumParallelCopiers int + NumParallelWriters int + NumParallelIntegrityCheckTasks int + CosmosNumParallelPartitionWorkers int + CosmosReaderMaxNumNamespaces int + ServerConnectTimeout time.Duration + PingTimeout time.Duration + CdcResumeTokenUpdateInterval time.Duration + WriterMaxBatchSize int + CosmosTargetDocCountPerPartition int64 + CosmosDeletesCheckInterval time.Duration + Mode string } func NewFromCLIContext(c *cli.Context) (Options, error) { @@ -65,6 +66,7 @@ func NewFromCLIContext(c *cli.Context) (Options, error) { o.Logfile = c.String("logfile") o.NamespaceFrom = c.Generic("namespace").(*ListFlag).Values o.Verify = c.Bool("verify") + o.VerifyFully = c.Bool("verify-fully") o.Cleanup = c.Bool("cleanup") o.Progress = c.Bool("progress") o.Pprof = c.Bool("pprof") diff --git a/internal/app/progress.go b/internal/app/progress.go index 7443e33a..55b103da 100644 --- a/internal/app/progress.go +++ b/internal/app/progress.go @@ -145,6 +145,15 @@ func (tv *TViewDetails) GetStatusReport(runnerProgress runnerLocal.RunnerSyncPro headerString := 
fmt.Sprintf("Dsync Progress Report : %v\nTime Elapsed: %02d:%02d:%02d\n%s", runnerProgress.SyncState, hours, minutes, seconds, stateString) header.SetText(headerString) + case iface.VerifyFullySyncState: + //set the header text + stateString := "Performing Full Data Integrity Check" + if runnerProgress.VerificationResult != "" { + stateString = fmt.Sprintf("Data Integrity Check: %s Press Ctrl+C to exit", runnerProgress.VerificationResult) + } + headerString := fmt.Sprintf("Dsync Progress Report : %v\nTime Elapsed: %02d:%02d:%02d\n%s", runnerProgress.SyncState, hours, minutes, seconds, stateString) + header.SetText(headerString) + default: headerString := "This connector does not support progress reporting yet\n" header.SetText(headerString) diff --git a/protocol/iface/connector.go b/protocol/iface/connector.go index 58f99180..17c78b23 100644 --- a/protocol/iface/connector.go +++ b/protocol/iface/connector.go @@ -59,6 +59,7 @@ type ConnectorStatus struct { const ( SetupSyncState = "Setup" VerifySyncState = "Verify" + VerifyFullySyncState = "VerifyFully" CleanupSyncState = "Cleanup" ReadPlanningSyncState = "ReadPlanning" diff --git a/runners/local/progress.go b/runners/local/progress.go index cdb91571..0592fedd 100644 --- a/runners/local/progress.go +++ b/runners/local/progress.go @@ -59,7 +59,7 @@ func (r *RunnerLocal) UpdateRunnerProgress() { stateInfo := "" switch { - case r.runnerProgress.SyncState == iface.VerifySyncState || r.runnerProgress.SyncState == iface.CleanupSyncState: + case r.runnerProgress.SyncState == iface.VerifySyncState || r.runnerProgress.SyncState == iface.VerifyFullySyncState && r.runnerProgress.SyncState != iface.CleanupSyncState: // Do nothing, keep the current state case srcStatus.SyncState == iface.ChangeStreamSyncState && !flowStatus.AllTasksCompleted: // Source is already in the change stream mode but not all tasks were fully completed diff --git a/runners/local/runner.go b/runners/local/runner.go index 43cc8420..841b7453 100644 --- 
a/runners/local/runner.go +++ b/runners/local/runner.go @@ -53,9 +53,10 @@ type RunnerLocalSettings struct { NsFromString []string - VerifyRequestedFlag bool - CleanupRequestedFlag bool - ReverseRequestedFlag bool + VerifyRequestedFlag bool + FullVerifyRequestedFlag bool + CleanupRequestedFlag bool + ReverseRequestedFlag bool FlowStatusReportingInterval time.Duration @@ -316,9 +317,27 @@ func (r *RunnerLocal) Run() error { } r.activeFlowID = flowID + //don't start the flow if the verify-fully flag is set + if r.settings.FullVerifyRequestedFlag { + r.runnerProgress.SyncState = iface.VerifyFullySyncState + integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID) + if err != nil { + slog.Error("Failed to perform flow full integrity check", err) + } else { + if integrityCheckRes.Passed { + r.runnerProgress.VerificationResult = "OK" + slog.Info("Full data integrity check: OK") + } else { + r.runnerProgress.VerificationResult = "FAIL" + slog.Error("Full data integrity check: FAIL") + } + } + return err + } + //don't start the flow if the verify flag is set if r.settings.VerifyRequestedFlag { - r.runnerProgress.SyncState = "Verify" + r.runnerProgress.SyncState = iface.VerifySyncState integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID) if err != nil { slog.Error("Failed to perform flow integrity check", err) From eedcdfce5116d27519aef627f0b9fb13162fb2bb Mon Sep 17 00:00:00 2001 From: isapego Date: Sat, 28 Sep 2024 01:49:50 +0200 Subject: [PATCH 2/3] Verification draft --- connectors/cosmos/connector.go | 2 +- connectors/mongo/connector.go | 10 +- connectors/mongo/integrity.go | 160 +++++++++++++++++++++++++++- connectors/null/connector.go | 2 +- connectors/random/connector.go | 2 +- coordinators/simple/capabilities.go | 2 +- coordinators/simple/coordinator.go | 21 ++-- go.mod | 1 + go.sum | 2 + internal/app/progress.go | 9 -- protocol/iface/connector.go | 12 +-- protocol/iface/coordinator.go | 14 +-- runners/local/progress.go | 2 +- 
runners/local/runner.go | 22 +--- 14 files changed, 204 insertions(+), 57 deletions(-) diff --git a/connectors/cosmos/connector.go b/connectors/cosmos/connector.go index f7fe64fc..91b8d68a 100644 --- a/connectors/cosmos/connector.go +++ b/connectors/cosmos/connector.go @@ -124,7 +124,7 @@ func (cc *Connector) Setup(ctx context.Context, t iface.Transport) error { // Instantiate ConnectorType cc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec} // Instantiate ConnectorCapabilities, current capabilities are source only - cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true} + cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true} // Instantiate ConnectorStatus progressMetrics := iface.ProgressMetrics{ NumDocsSynced: 0, diff --git a/connectors/mongo/connector.go b/connectors/mongo/connector.go index c4c79be1..be1a3799 100644 --- a/connectors/mongo/connector.go +++ b/connectors/mongo/connector.go @@ -113,7 +113,7 @@ func (mc *Connector) Setup(ctx context.Context, t iface.Transport) error { // Instantiate ConnectorType mc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec} // Instantiate ConnectorCapabilities - mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true} + mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true} // Instantiate ConnectorStatus mc.Status = iface.ConnectorStatus{WriteLSN: 0} @@ -545,9 +545,13 @@ func (mc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return nil } -func (mc *BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (mc 
*BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { // need to make this async to honor the spec - go mc.doIntegrityCheck_sync(flowId, options) + if full { + go mc.doFullIntegrityCheck_sync(flowId, options) + } else { + go mc.doIntegrityCheck_sync(flowId, options) + } return nil } diff --git a/connectors/mongo/integrity.go b/connectors/mongo/integrity.go index 66359dfc..6e5f8508 100644 --- a/connectors/mongo/integrity.go +++ b/connectors/mongo/integrity.go @@ -9,13 +9,19 @@ package mongo import ( "context" "crypto/sha256" + "encoding/hex" "errors" "fmt" + "go.mongodb.org/mongo-driver/bson" "log/slog" "sort" "strings" + "sync" "github.com/adiom-data/dsync/protocol/iface" + + // TODO: Check how to add a new package properly + "github.com/holiman/uint256" ) func NamespaceString(ns iface.Namespace) string { @@ -28,10 +34,16 @@ type nsCountResult struct { err error } +type nsHashResult struct { + ns iface.Namespace + hash *uint256.Int + err error +} + // doIntegrityCheck performs a data integrity check on the underlying data store // _sync is a synchronous version of this function func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error { - //XXX: should we use/create flowContext here in case it becomes a part of the flow and we want to have ability to interrupt? + //XXX: should we use/create flowContext here in case it becomes a part of the flow, and we want to have ability to interrupt? var res iface.ConnectorDataIntegrityCheckResult res.Success = false @@ -141,6 +153,152 @@ func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options return nil } +func (mc *BaseMongoConnector) doFullIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error { + var res iface.ConnectorDataIntegrityCheckResult + res.Success = false + + // 1. 
Get the list of fully qualified namespaces + namespaces, err := mc.getFQNamespaceList(options.Namespace) + if err != nil { + slog.Error(fmt.Sprintf("Failed to get fully qualified namespace list: %v", err)) + err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res) + if err != nil { + return err + } + } + + // Quick exit if there are no namespaces + if len(namespaces) == 0 { + res.Success = true + res.Count = 0 + res.Digest = "" + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil + } + + slog.Debug(fmt.Sprintf("Namespaces for validation: %v", namespaces)) + + // create a map to store the results + namespacesSha256Map := make(map[string]string) + + // 2. Calculate the total number of documents for each namespace - we will parallelize this operation + + // create a channel to distribute tasks + taskChannel := make(chan iface.Namespace, len(namespaces)) + // create a channel to collect the results + resultChannel := make(chan nsHashResult, len(namespaces)) + + var wg sync.WaitGroup + wg.Add(mc.Settings.NumParallelIntegrityCheckTasks) + + // start hashers + for i := 0; i < mc.Settings.NumParallelIntegrityCheckTasks; i++ { + go func() { + defer wg.Done() + completeHash := uint256.NewInt(1) + for ns := range taskChannel { + slog.Debug(fmt.Sprintf("Processing namespace: %v", ns)) + collection := mc.Client.Database(ns.Db).Collection(ns.Col) + cursor, err := collection.Find(mc.FlowCtx, bson.D{}) + if err != nil { + if errors.Is(mc.FlowCtx.Err(), context.Canceled) { + slog.Debug(fmt.Sprintf("Find error: %v, but the context was cancelled", err)) + } else { + slog.Error(fmt.Sprintf("Failed to find documents in collection: %v", err)) + } + continue + } + + for cursor.Next(mc.FlowCtx) { + rawData := cursor.Current + data := []byte(rawData) + + hash := sha256.Sum256(data) + + // TODO: Conversation here is slow and ugly. Find more fitting package for this. 
+ hashStr := strings.TrimLeft(hex.EncodeToString(hash[:]), "0") + hashInt := uint256.MustFromHex("0x" + hashStr) + + // Multiplication is order independent, so it does not matter the order we are going to get the + // values in, we will still get the same hash as the result, if all the hashes were the same. + completeHash.Mul(completeHash, hashInt) + + if cursor.RemainingBatchLength() == 0 { //no more left in the batch + resultChannel <- nsHashResult{ns: ns, hash: completeHash, err: err} + completeHash = uint256.NewInt(1) + } + } + if err := cursor.Err(); err != nil { + if errors.Is(mc.FlowCtx.Err(), context.Canceled) { + slog.Debug(fmt.Sprintf("Cursor error: %v, but the context was cancelled", err)) + } else { + slog.Error(fmt.Sprintf("Cursor error: %v", err)) + } + } else { + _ = cursor.Close(mc.FlowCtx) + slog.Debug(fmt.Sprintf("Done hashing namespace: %v", ns)) + } + } + }() + } + + // iterate over all the namespaces and distribute them to workers + for _, task := range namespaces { + taskChannel <- task + } + + // close the task channel to signal workers that there are no more tasks + close(taskChannel) + + //wait for all hashers to finish + wg.Wait() + + close(resultChannel) + + for result := range resultChannel { + if result.err != nil { + slog.Error(fmt.Sprintf("Failed to hash documents for namespace %s: %v", NamespaceString(result.ns), result.err)) + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil + } else { + namespacesSha256Map[NamespaceString(result.ns)] = result.hash.Hex() + } + } + + // 3. Arrange the namespaces in a lexicographical order + sortedNamespaces := make([]string, 0, len(namespacesSha256Map)) + for ns := range namespacesSha256Map { + sortedNamespaces = append(sortedNamespaces, ns) + } + sort.Strings(sortedNamespaces) + + // 4. 
Create a string, concatenating the namespaces and respective hash in the form of "namespace:hash" and using "," to join them + var concatenatedString string + for _, ns := range sortedNamespaces { + hash := namespacesSha256Map[ns] + concatenatedString += ns + ":" + hash + "," + } + // remove the trailing comma + concatenatedString = concatenatedString[:len(concatenatedString)-1] + + // 5. Calculate the SHA256 hash of the string + hash := sha256.Sum256([]byte(concatenatedString)) + + // post the result + res.Success = true + res.Count = 0 + res.Digest = hex.EncodeToString(hash[:]) + + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil +} + // Returns a list of fully qualified namespaces func (mc *BaseMongoConnector) getFQNamespaceList(namespacesFilter []string) ([]iface.Namespace, error) { var dbsToResolve []string //database names that we need to resolve diff --git a/connectors/null/connector.go b/connectors/null/connector.go index e224d0f3..f760dc2c 100644 --- a/connectors/null/connector.go +++ b/connectors/null/connector.go @@ -143,7 +143,7 @@ func (nc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return nil } -func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { //does nothing, no data to check return fmt.Errorf("null write connector does not support data integrity check") } diff --git a/connectors/random/connector.go b/connectors/random/connector.go index f099dcd4..fff2ee8e 100644 --- a/connectors/random/connector.go +++ b/connectors/random/connector.go @@ -267,7 +267,7 @@ func (rc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return errors.New("RandomReadConnector does not write to destination") } -func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options 
iface.ConnectorOptions) error { +func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { //no client, errors return errors.New("RandomReadConnector does not have a client to request data integrity check") } diff --git a/coordinators/simple/capabilities.go b/coordinators/simple/capabilities.go index 91bde223..851346cf 100644 --- a/coordinators/simple/capabilities.go +++ b/coordinators/simple/capabilities.go @@ -17,7 +17,7 @@ func calcSharedCapabilities(c1Caps iface.ConnectorCapabilities, c2Caps iface.Con // effectively a bitmask that will be later applied to individual connectors' capabilities // XXX: is there a better way to do this? Maybe a real bitmask? - caps := iface.ConnectorCapabilities{true, true, true, true} + caps := iface.ConnectorCapabilities{true, true, true, true, true} // Source and Sink are always true as they don't need to be shared // we only care about resumability right now diff --git a/coordinators/simple/coordinator.go b/coordinators/simple/coordinator.go index ad80ec8d..ce9a2bc9 100644 --- a/coordinators/simple/coordinator.go +++ b/coordinators/simple/coordinator.go @@ -420,8 +420,12 @@ func (c *Simple) NotifyTaskDone(flowId iface.FlowID, conn iface.ConnectorID, tas return fmt.Errorf("connector not part of the flow") } -func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataIntegrityCheckResult, error) { - slog.Info("Initiating flow integrity check for flow with ID: " + fmt.Sprintf("%v", fid)) +func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID, full bool) (iface.FlowDataIntegrityCheckResult, error) { + checkTypeStr := "basic" + if full { + checkTypeStr = "full" + } + slog.Info(fmt.Sprintf("Initiating flow %s integrity check for flow with ID: %v", checkTypeStr, fid)) res := iface.FlowDataIntegrityCheckResult{} @@ -441,8 +445,13 @@ func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataInte return res, fmt.Errorf("destination 
connector %v not found", flowDet.Options.DstId) } - if !src.Details.Cap.IntegrityCheck || !dst.Details.Cap.IntegrityCheck { - return res, fmt.Errorf("one or both connectors don't support integrity checks") + checkTypeSupported := src.Details.Cap.IntegrityCheck && dst.Details.Cap.IntegrityCheck + if full { + checkTypeSupported = src.Details.Cap.FullIntegrityCheck && dst.Details.Cap.FullIntegrityCheck + } + + if !checkTypeSupported { + return res, fmt.Errorf("one or both connectors don't support %s integrity checks", checkTypeStr) + } // Wait for integrity check results asynchronously @@ -450,11 +459,11 @@ func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataInte var resSource, resDestination iface.ConnectorDataIntegrityCheckResult // Request integrity check results from connectors - if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil { + if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil { slog.Error("Failed to request integrity check from source", err) return res, err } - if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil { + if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil { //TODO (AK, 6/2024): should we have proper options here? (maybe even data validation-specific?) 
slog.Error("Failed to request integrity check from destination", err) return res, err } diff --git a/go.mod b/go.mod index 7d0990d1..64e8575e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/gdamore/tcell/v2 v2.7.4 + github.com/holiman/uint256 v1.3.1 github.com/lmittmann/tint v1.0.5 github.com/mitchellh/hashstructure v1.1.0 github.com/rivo/tview v0.0.0-20240807095714-a8dd8799d63b diff --git a/go.sum b/go.sum index 4d90e888..8033a537 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs= +github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= diff --git a/internal/app/progress.go b/internal/app/progress.go index 55b103da..7443e33a 100644 --- a/internal/app/progress.go +++ b/internal/app/progress.go @@ -145,15 +145,6 @@ func (tv *TViewDetails) GetStatusReport(runnerProgress runnerLocal.RunnerSyncPro headerString := fmt.Sprintf("Dsync Progress Report : %v\nTime Elapsed: %02d:%02d:%02d\n%s", runnerProgress.SyncState, hours, minutes, seconds, stateString) header.SetText(headerString) - case iface.VerifyFullySyncState: - //set the header text - stateString := "Performing Full Data Integrity Check" - if runnerProgress.VerificationResult != "" { - stateString = fmt.Sprintf("Data Integrity Check: %s Press Ctrl+C to exit", runnerProgress.VerificationResult) 
- } - headerString := fmt.Sprintf("Dsync Progress Report : %v\nTime Elapsed: %02d:%02d:%02d\n%s", runnerProgress.SyncState, hours, minutes, seconds, stateString) - header.SetText(headerString) - default: headerString := "This connector does not support progress reporting yet\n" header.SetText(headerString) diff --git a/protocol/iface/connector.go b/protocol/iface/connector.go index 17c78b23..c5df1a30 100644 --- a/protocol/iface/connector.go +++ b/protocol/iface/connector.go @@ -19,10 +19,11 @@ type ConnectorType struct { // If something is announced as true, it can be later turned off through the setParameters call // But if something is announced as not supported (false), it can't be turned on type ConnectorCapabilities struct { - Source bool - Sink bool - IntegrityCheck bool - Resumability bool + Source bool + Sink bool + IntegrityCheck bool + FullIntegrityCheck bool + Resumability bool } // XXX (AK, 6/2024): not sure if it logically belongs here or to another iface file @@ -59,7 +60,6 @@ type ConnectorStatus struct { const ( SetupSyncState = "Setup" VerifySyncState = "Verify" - VerifyFullySyncState = "VerifyFully" CleanupSyncState = "Cleanup" ReadPlanningSyncState = "ReadPlanning" @@ -128,7 +128,7 @@ type ConnectorICoordinatorSignal interface { StartWriteFromChannel(flowId FlowID, dataChannel DataChannelID) error // Write data from the provided channel (async) Interrupt(flowId FlowID) error // Interrupt the flow (async) - RequestDataIntegrityCheck(flowId FlowID, options ConnectorOptions) error // Request a data integrity check based on a read plan (async) + RequestDataIntegrityCheck(flowId FlowID, options ConnectorOptions, full bool) error // Request a data integrity check based on a read plan (async) GetConnectorStatus(flowId FlowID) ConnectorStatus // Immediate and non-blocking } diff --git a/protocol/iface/coordinator.go b/protocol/iface/coordinator.go index f3355e76..c02ed4a8 100644 --- a/protocol/iface/coordinator.go +++ b/protocol/iface/coordinator.go @@ 
-20,13 +20,13 @@ type Coordinator interface { // User GetConnectors() []ConnectorDetails - FlowGetOrCreate(FlowOptions) (FlowID, error) // Get or create a flow if it doesn't exist - FlowStart(FlowID) error // Start the flow or resume it - FlowStop(FlowID) // Stop the flow - FlowDestroy(FlowID) // Destroy the flow and the associated metadata (also cleans up persisted state) - WaitForFlowDone(FlowID) error // Wait for the flow to be done - PerformFlowIntegrityCheck(FlowID) (FlowDataIntegrityCheckResult, error) // Perform an integrity check on the flow (synchronous) - GetFlowStatus(FlowID) (FlowStatus, error) // Get the status of the flow + FlowGetOrCreate(FlowOptions) (FlowID, error) // Get or create a flow if it doesn't exist + FlowStart(FlowID) error // Start the flow or resume it + FlowStop(FlowID) // Stop the flow + FlowDestroy(FlowID) // Destroy the flow and the associated metadata (also cleans up persisted state) + WaitForFlowDone(FlowID) error // Wait for the flow to be done + PerformFlowIntegrityCheck(FlowID, bool) (FlowDataIntegrityCheckResult, error) // Perform an integrity check on the flow (synchronous) + GetFlowStatus(FlowID) (FlowStatus, error) // Get the status of the flow CoordinatorIConnectorSignal } diff --git a/runners/local/progress.go b/runners/local/progress.go index 0592fedd..cdb91571 100644 --- a/runners/local/progress.go +++ b/runners/local/progress.go @@ -59,7 +59,7 @@ func (r *RunnerLocal) UpdateRunnerProgress() { stateInfo := "" switch { - case r.runnerProgress.SyncState == iface.VerifySyncState || r.runnerProgress.SyncState == iface.VerifyFullySyncState && r.runnerProgress.SyncState != iface.CleanupSyncState: + case r.runnerProgress.SyncState == iface.VerifySyncState || r.runnerProgress.SyncState == iface.CleanupSyncState: // Do nothing, keep the current state case srcStatus.SyncState == iface.ChangeStreamSyncState && !flowStatus.AllTasksCompleted: // Source is already in the change stream mode but not all tasks were fully completed diff 
--git a/runners/local/runner.go b/runners/local/runner.go index 841b7453..0fa44c8f 100644 --- a/runners/local/runner.go +++ b/runners/local/runner.go @@ -317,28 +317,10 @@ func (r *RunnerLocal) Run() error { } r.activeFlowID = flowID - //don't start the flow if the verify-fully flag is set - if r.settings.FullVerifyRequestedFlag { - r.runnerProgress.SyncState = iface.VerifyFullySyncState - integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID) - if err != nil { - slog.Error("Failed to perform flow full integrity check", err) - } else { - if integrityCheckRes.Passed { - r.runnerProgress.VerificationResult = "OK" - slog.Info("Full data integrity check: OK") - } else { - r.runnerProgress.VerificationResult = "FAIL" - slog.Error("Full data integrity check: FAIL") - } - } - return err - } - //don't start the flow if the verify flag is set - if r.settings.VerifyRequestedFlag { + if r.settings.VerifyRequestedFlag || r.settings.FullVerifyRequestedFlag { r.runnerProgress.SyncState = iface.VerifySyncState - integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID) + integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID, r.settings.FullVerifyRequestedFlag) if err != nil { slog.Error("Failed to perform flow integrity check", err) } else { From 762403019197daffcf43bb8d596191f4024088ba Mon Sep 17 00:00:00 2001 From: isapego Date: Sat, 28 Sep 2024 03:22:20 +0200 Subject: [PATCH 3/3] Fixes --- connectors/mongo/integrity.go | 14 +++++++++----- connectors/testconn/connector.go | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/connectors/mongo/integrity.go b/connectors/mongo/integrity.go index 6e5f8508..f3b710e5 100644 --- a/connectors/mongo/integrity.go +++ b/connectors/mongo/integrity.go @@ -193,6 +193,9 @@ func (mc *BaseMongoConnector) doFullIntegrityCheck_sync(flowId iface.FlowID, opt var wg sync.WaitGroup wg.Add(mc.Settings.NumParallelIntegrityCheckTasks) + mask := uint256.NewInt(0) + mask.SetAllOne() + // start hashers for 
i := 0; i < mc.Settings.NumParallelIntegrityCheckTasks; i++ { go func() { @@ -216,14 +219,15 @@ func (mc *BaseMongoConnector) doFullIntegrityCheck_sync(flowId iface.FlowID, opt data := []byte(rawData) hash := sha256.Sum256(data) - - // TODO: Conversation here is slow and ugly. Find more fitting package for this. - hashStr := strings.TrimLeft(hex.EncodeToString(hash[:]), "0") - hashInt := uint256.MustFromHex("0x" + hashStr) + hashInt := uint256.NewInt(1) + hashInt.SetBytes(hash[:]) + if hashInt.IsZero() { + hashInt.SetOne() + } // Multiplication is order independent, so it does not matter the order we are going to get the // values in, we will still get the same hash as the result, if all the hashes were the same. - completeHash.Mul(completeHash, hashInt) + completeHash.MulMod(completeHash, hashInt, mask) if cursor.RemainingBatchLength() == 0 { //no more left in the batch resultChannel <- nsHashResult{ns: ns, hash: completeHash, err: err} diff --git a/connectors/testconn/connector.go b/connectors/testconn/connector.go index 9273f4ed..68a07a4f 100644 --- a/connectors/testconn/connector.go +++ b/connectors/testconn/connector.go @@ -114,7 +114,7 @@ func (c *connector) RequestCreateReadPlan(flowId iface.FlowID, options iface.Con } // RequestDataIntegrityCheck implements iface.Connector. -func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { // TODO: implement panic("unimplemented") }