diff --git a/connectors/cosmos/connector.go b/connectors/cosmos/connector.go index f7fe64fc..91b8d68a 100644 --- a/connectors/cosmos/connector.go +++ b/connectors/cosmos/connector.go @@ -124,7 +124,7 @@ func (cc *Connector) Setup(ctx context.Context, t iface.Transport) error { // Instantiate ConnectorType cc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec} // Instantiate ConnectorCapabilities, current capabilities are source only - cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true} + cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true} // Instantiate ConnectorStatus progressMetrics := iface.ProgressMetrics{ NumDocsSynced: 0, diff --git a/connectors/mongo/connector.go b/connectors/mongo/connector.go index c4c79be1..be1a3799 100644 --- a/connectors/mongo/connector.go +++ b/connectors/mongo/connector.go @@ -113,7 +113,7 @@ func (mc *Connector) Setup(ctx context.Context, t iface.Transport) error { // Instantiate ConnectorType mc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec} // Instantiate ConnectorCapabilities - mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true} + mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true} // Instantiate ConnectorStatus mc.Status = iface.ConnectorStatus{WriteLSN: 0} @@ -545,9 +545,13 @@ func (mc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return nil } -func (mc *BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (mc *BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options 
iface.ConnectorOptions, full bool) error { // need to make this async to honor the spec - go mc.doIntegrityCheck_sync(flowId, options) + if full { + go mc.doFullIntegrityCheck_sync(flowId, options) + } else { + go mc.doIntegrityCheck_sync(flowId, options) + } return nil } diff --git a/connectors/mongo/integrity.go b/connectors/mongo/integrity.go index 66359dfc..f3b710e5 100644 --- a/connectors/mongo/integrity.go +++ b/connectors/mongo/integrity.go @@ -9,13 +9,19 @@ package mongo import ( "context" "crypto/sha256" + "encoding/hex" "errors" "fmt" + "go.mongodb.org/mongo-driver/bson" "log/slog" "sort" "strings" + "sync" "github.com/adiom-data/dsync/protocol/iface" + + // TODO: Check how to add a new package properly + "github.com/holiman/uint256" ) func NamespaceString(ns iface.Namespace) string { @@ -28,10 +34,16 @@ type nsCountResult struct { err error } +type nsHashResult struct { + ns iface.Namespace + hash *uint256.Int + err error +} + // doIntegrityCheck performs a data integrity check on the underlying data store // _sync is a synchronous version of this function func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error { - //XXX: should we use/create flowContext here in case it becomes a part of the flow and we want to have ability to interrupt? + //XXX: should we use/create flowContext here in case it becomes a part of the flow, and we want to have ability to interrupt? var res iface.ConnectorDataIntegrityCheckResult res.Success = false @@ -141,6 +153,156 @@ func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options return nil } +func (mc *BaseMongoConnector) doFullIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error { + var res iface.ConnectorDataIntegrityCheckResult + res.Success = false + + // 1. 
Get the list of fully qualified namespaces + namespaces, err := mc.getFQNamespaceList(options.Namespace) + if err != nil { + slog.Error(fmt.Sprintf("Failed to get fully qualified namespace list: %v", err)) + err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res) + if err != nil { + return err + } + } + + // Quick exit if there are no namespaces + if len(namespaces) == 0 { + res.Success = true + res.Count = 0 + res.Digest = "" + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil + } + + slog.Debug(fmt.Sprintf("Namespaces for validation: %v", namespaces)) + + // create a map to store the results + namespacesSha256Map := make(map[string]string) + + // 2. Calculate the total number of documents for each namespace - we will parallelize this operation + + // create a channel to distribute tasks + taskChannel := make(chan iface.Namespace, len(namespaces)) + // create a channel to collect the results + resultChannel := make(chan nsHashResult, len(namespaces)) + + var wg sync.WaitGroup + wg.Add(mc.Settings.NumParallelIntegrityCheckTasks) + + mask := uint256.NewInt(0) + mask.SetAllOne() + + // start hashers + for i := 0; i < mc.Settings.NumParallelIntegrityCheckTasks; i++ { + go func() { + defer wg.Done() + completeHash := uint256.NewInt(1) + for ns := range taskChannel { + slog.Debug(fmt.Sprintf("Processing namespace: %v", ns)) + collection := mc.Client.Database(ns.Db).Collection(ns.Col) + cursor, err := collection.Find(mc.FlowCtx, bson.D{}) + if err != nil { + if errors.Is(mc.FlowCtx.Err(), context.Canceled) { + slog.Debug(fmt.Sprintf("Find error: %v, but the context was cancelled", err)) + } else { + slog.Error(fmt.Sprintf("Failed to find documents in collection: %v", err)) + } + continue + } + + for cursor.Next(mc.FlowCtx) { + rawData := cursor.Current + data := []byte(rawData) + + hash := sha256.Sum256(data) + hashInt := uint256.NewInt(1) + hashInt.SetBytes(hash[:]) + if hashInt.IsZero() { + 
hashInt.SetOne() + } + + // Multiplication is order independent, so it does not matter the order we are going to get the + // values in, we will still get the same hash as the result, if all the hashes were the same. + completeHash.MulMod(completeHash, hashInt, mask) + + if cursor.RemainingBatchLength() == 0 { //no more left in the batch + resultChannel <- nsHashResult{ns: ns, hash: completeHash, err: err} + completeHash = uint256.NewInt(1) + } + } + if err := cursor.Err(); err != nil { + if errors.Is(mc.FlowCtx.Err(), context.Canceled) { + slog.Debug(fmt.Sprintf("Cursor error: %v, but the context was cancelled", err)) + } else { + slog.Error(fmt.Sprintf("Cursor error: %v", err)) + } + } else { + _ = cursor.Close(mc.FlowCtx) + slog.Debug(fmt.Sprintf("Done hashing namespace: %v", ns)) + } + } + }() + } + + // iterate over all the namespaces and distribute them to workers + for _, task := range namespaces { + taskChannel <- task + } + + // close the task channel to signal workers that there are no more tasks + close(taskChannel) + + //wait for all hashers to finish + wg.Wait() + + close(resultChannel) + + for result := range resultChannel { + if result.err != nil { + slog.Error(fmt.Sprintf("Failed to hash documents for namespace %s: %v", NamespaceString(result.ns), result.err)) + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil + } else { + namespacesSha256Map[NamespaceString(result.ns)] = result.hash.Hex() + } + } + + // 3. Arrange the namespaces in a lexicographical order + sortedNamespaces := make([]string, 0, len(namespacesSha256Map)) + for ns := range namespacesSha256Map { + sortedNamespaces = append(sortedNamespaces, ns) + } + sort.Strings(sortedNamespaces) + + // 4. 
Create a string, concatenating the namespaces and respective hash in the form of "namespace:hash" and using "," to join them + var concatenatedString string + for _, ns := range sortedNamespaces { + hash := namespacesSha256Map[ns] + concatenatedString += ns + ":" + hash + "," + } + // remove the trailing comma + concatenatedString = concatenatedString[:len(concatenatedString)-1] + + // 5. Calculate the SHA256 hash of the string + hash := sha256.Sum256([]byte(concatenatedString)) + + // post the result + res.Success = true + res.Count = 0 + res.Digest = hex.EncodeToString(hash[:]) + + if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil { + return err + } + return nil +} + // Returns a list of fully qualified namespaces func (mc *BaseMongoConnector) getFQNamespaceList(namespacesFilter []string) ([]iface.Namespace, error) { var dbsToResolve []string //database names that we need to resolve diff --git a/connectors/null/connector.go b/connectors/null/connector.go index e224d0f3..f760dc2c 100644 --- a/connectors/null/connector.go +++ b/connectors/null/connector.go @@ -143,7 +143,7 @@ func (nc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return nil } -func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { //does nothing, no data to check return fmt.Errorf("null write connector does not support data integrity check") } diff --git a/connectors/random/connector.go b/connectors/random/connector.go index f099dcd4..fff2ee8e 100644 --- a/connectors/random/connector.go +++ b/connectors/random/connector.go @@ -267,7 +267,7 @@ func (rc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if return errors.New("RandomReadConnector does not write to destination") } -func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options 
iface.ConnectorOptions) error { +func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { //no client, errors return errors.New("RandomReadConnector does not have a client to request data integrity check") } diff --git a/connectors/testconn/connector.go b/connectors/testconn/connector.go index 9273f4ed..68a07a4f 100644 --- a/connectors/testconn/connector.go +++ b/connectors/testconn/connector.go @@ -114,7 +114,7 @@ func (c *connector) RequestCreateReadPlan(flowId iface.FlowID, options iface.Con } // RequestDataIntegrityCheck implements iface.Connector. -func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error { +func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error { // TODO: implement panic("unimplemented") } diff --git a/coordinators/simple/capabilities.go b/coordinators/simple/capabilities.go index 91bde223..851346cf 100644 --- a/coordinators/simple/capabilities.go +++ b/coordinators/simple/capabilities.go @@ -17,7 +17,7 @@ func calcSharedCapabilities(c1Caps iface.ConnectorCapabilities, c2Caps iface.Con // effectively a bitmask that will be later applied to individual connectors' capabilities // XXX: is there a better way to do this? Maybe a real bitmask? 
- caps := iface.ConnectorCapabilities{true, true, true, true} + caps := iface.ConnectorCapabilities{true, true, true, true, true} // Source and Sink are always true as they don't need to be shared // we only care about resumability right now diff --git a/coordinators/simple/coordinator.go b/coordinators/simple/coordinator.go index ad80ec8d..ce9a2bc9 100644 --- a/coordinators/simple/coordinator.go +++ b/coordinators/simple/coordinator.go @@ -420,8 +420,12 @@ func (c *Simple) NotifyTaskDone(flowId iface.FlowID, conn iface.ConnectorID, tas return fmt.Errorf("connector not part of the flow") } -func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataIntegrityCheckResult, error) { - slog.Info("Initiating flow integrity check for flow with ID: " + fmt.Sprintf("%v", fid)) +func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID, full bool) (iface.FlowDataIntegrityCheckResult, error) { + checkTypeStr := "basic" + if full { + checkTypeStr = "full" + } + slog.Info(fmt.Sprintf("Initiating flow %s integrity check for flow with ID: %v", checkTypeStr, fid)) res := iface.FlowDataIntegrityCheckResult{} @@ -441,8 +445,13 @@ func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataInte return res, fmt.Errorf("destination connector %v not found", flowDet.Options.DstId) } - if !src.Details.Cap.IntegrityCheck || !dst.Details.Cap.IntegrityCheck { - return res, fmt.Errorf("one or both connectors don't support integrity checks") + checkTypeSupported := src.Details.Cap.IntegrityCheck && dst.Details.Cap.IntegrityCheck + if full { + checkTypeSupported = src.Details.Cap.FullIntegrityCheck && dst.Details.Cap.FullIntegrityCheck + } + + if !checkTypeSupported { + return res, fmt.Errorf("one or both connectors don't support %s integrity checks", checkTypeStr) } // Wait for integrity check results asynchronously @@ -450,11 +459,11 @@ func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataInte var resSource, resDestination 
iface.ConnectorDataIntegrityCheckResult // Request integrity check results from connectors - if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil { + if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil { slog.Error("Failed to request integrity check from source", err) return res, err } - if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil { //TODO (AK, 6/2024): should we have proper options here? (maybe even data validation-specific?) + if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil { //TODO (AK, 6/2024): should we have proper options here? (maybe even data validation-specific?) slog.Error("Failed to request integrity check from destination", err) return res, err } diff --git a/go.mod b/go.mod index 7d0990d1..64e8575e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/gdamore/tcell/v2 v2.7.4 + github.com/holiman/uint256 v1.3.1 github.com/lmittmann/tint v1.0.5 github.com/mitchellh/hashstructure v1.1.0 github.com/rivo/tview v0.0.0-20240807095714-a8dd8799d63b diff --git a/go.sum b/go.sum index 4d90e888..8033a537 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs= +github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod 
h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= diff --git a/internal/app/app.go b/internal/app/app.go index ea909f37..45ab089c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -106,6 +106,7 @@ func runDsync(c *cli.Context) error { StateStoreConnString: o.StateStoreConnString, NsFromString: o.NamespaceFrom, VerifyRequestedFlag: o.Verify, + FullVerifyRequestedFlag: o.VerifyFully, CleanupRequestedFlag: o.Cleanup, FlowStatusReportingInterval: 10, CosmosDeletesEmuRequestedFlag: o.CosmosDeletesEmu, @@ -236,7 +237,7 @@ func runDsync(c *cli.Context) error { err := r.Setup(runnerCtx) if err == nil { err = r.Run() - if !o.Verify { //if verification was requested, the user should be able to see the results + if !o.Verify && !o.VerifyFully { //if verification was requested, the user should be able to see the results runnerCancelFunc() } } else { diff --git a/internal/app/options/flags.go b/internal/app/options/flags.go index 004b9e9e..2c14b107 100644 --- a/internal/app/options/flags.go +++ b/internal/app/options/flags.go @@ -127,6 +127,11 @@ func GetFlagsAndBeforeFunc() ([]cli.Flag, cli.BeforeFunc) { Usage: "perform a data integrity check for an existing flow", Category: "Special Commands", }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "verify-fully", + Usage: "perform a full data integrity check for an existing flow", + Category: "Special Commands", + }), altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "cleanup", Usage: "cleanup metadata for an existing flow", diff --git a/internal/app/options/options.go b/internal/app/options/options.go index c2b190f7..d665c28f 100644 --- a/internal/app/options/options.go +++ b/internal/app/options/options.go @@ -26,10 +26,11 @@ type Options struct { NamespaceFrom []string - Verify bool - Cleanup bool - Progress bool - Reverse bool + Verify bool + VerifyFully bool + Cleanup bool + Progress bool + Reverse bool CosmosDeletesEmu bool @@ -38,19 +39,19 @@ type 
Options struct { PprofPort uint WebPort uint - LoadLevel string - InitialSyncNumParallelCopiers int - NumParallelWriters int - NumParallelIntegrityCheckTasks int - CosmosNumParallelPartitionWorkers int - CosmosReaderMaxNumNamespaces int - ServerConnectTimeout time.Duration - PingTimeout time.Duration - CdcResumeTokenUpdateInterval time.Duration - WriterMaxBatchSize int - CosmosTargetDocCountPerPartition int64 - CosmosDeletesCheckInterval time.Duration - Mode string + LoadLevel string + InitialSyncNumParallelCopiers int + NumParallelWriters int + NumParallelIntegrityCheckTasks int + CosmosNumParallelPartitionWorkers int + CosmosReaderMaxNumNamespaces int + ServerConnectTimeout time.Duration + PingTimeout time.Duration + CdcResumeTokenUpdateInterval time.Duration + WriterMaxBatchSize int + CosmosTargetDocCountPerPartition int64 + CosmosDeletesCheckInterval time.Duration + Mode string } func NewFromCLIContext(c *cli.Context) (Options, error) { @@ -65,6 +66,7 @@ func NewFromCLIContext(c *cli.Context) (Options, error) { o.Logfile = c.String("logfile") o.NamespaceFrom = c.Generic("namespace").(*ListFlag).Values o.Verify = c.Bool("verify") + o.VerifyFully = c.Bool("verify-fully") o.Cleanup = c.Bool("cleanup") o.Progress = c.Bool("progress") o.Pprof = c.Bool("pprof") diff --git a/protocol/iface/connector.go b/protocol/iface/connector.go index 58f99180..c5df1a30 100644 --- a/protocol/iface/connector.go +++ b/protocol/iface/connector.go @@ -19,10 +19,11 @@ type ConnectorType struct { // If something is announced as true, it can be later turned off through the setParameters call // But if something is announced as not supported (false), it can't be turned on type ConnectorCapabilities struct { - Source bool - Sink bool - IntegrityCheck bool - Resumability bool + Source bool + Sink bool + IntegrityCheck bool + FullIntegrityCheck bool + Resumability bool } // XXX (AK, 6/2024): not sure if it logically belongs here or to another iface file @@ -127,7 +128,7 @@ type 
ConnectorICoordinatorSignal interface { StartWriteFromChannel(flowId FlowID, dataChannel DataChannelID) error // Write data from the provided channel (async) Interrupt(flowId FlowID) error // Interrupt the flow (async) - RequestDataIntegrityCheck(flowId FlowID, options ConnectorOptions) error // Request a data integrity check based on a read plan (async) + RequestDataIntegrityCheck(flowId FlowID, options ConnectorOptions, full bool) error // Request a data integrity check based on a read plan (async) GetConnectorStatus(flowId FlowID) ConnectorStatus // Immediate and non-blocking } diff --git a/protocol/iface/coordinator.go b/protocol/iface/coordinator.go index f3355e76..c02ed4a8 100644 --- a/protocol/iface/coordinator.go +++ b/protocol/iface/coordinator.go @@ -20,13 +20,13 @@ type Coordinator interface { // User GetConnectors() []ConnectorDetails - FlowGetOrCreate(FlowOptions) (FlowID, error) // Get or create a flow if it doesn't exist - FlowStart(FlowID) error // Start the flow or resume it - FlowStop(FlowID) // Stop the flow - FlowDestroy(FlowID) // Destroy the flow and the associated metadata (also cleans up persisted state) - WaitForFlowDone(FlowID) error // Wait for the flow to be done - PerformFlowIntegrityCheck(FlowID) (FlowDataIntegrityCheckResult, error) // Perform an integrity check on the flow (synchronous) - GetFlowStatus(FlowID) (FlowStatus, error) // Get the status of the flow + FlowGetOrCreate(FlowOptions) (FlowID, error) // Get or create a flow if it doesn't exist + FlowStart(FlowID) error // Start the flow or resume it + FlowStop(FlowID) // Stop the flow + FlowDestroy(FlowID) // Destroy the flow and the associated metadata (also cleans up persisted state) + WaitForFlowDone(FlowID) error // Wait for the flow to be done + PerformFlowIntegrityCheck(FlowID, bool) (FlowDataIntegrityCheckResult, error) // Perform an integrity check on the flow (synchronous) + GetFlowStatus(FlowID) (FlowStatus, error) // Get the status of the flow 
CoordinatorIConnectorSignal } diff --git a/runners/local/runner.go b/runners/local/runner.go index 43cc8420..0fa44c8f 100644 --- a/runners/local/runner.go +++ b/runners/local/runner.go @@ -53,9 +53,10 @@ type RunnerLocalSettings struct { NsFromString []string - VerifyRequestedFlag bool - CleanupRequestedFlag bool - ReverseRequestedFlag bool + VerifyRequestedFlag bool + FullVerifyRequestedFlag bool + CleanupRequestedFlag bool + ReverseRequestedFlag bool FlowStatusReportingInterval time.Duration @@ -317,9 +318,9 @@ func (r *RunnerLocal) Run() error { r.activeFlowID = flowID //don't start the flow if the verify flag is set - if r.settings.VerifyRequestedFlag { - r.runnerProgress.SyncState = "Verify" - integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID) + if r.settings.VerifyRequestedFlag || r.settings.FullVerifyRequestedFlag { + r.runnerProgress.SyncState = iface.VerifySyncState + integrityCheckRes, err := r.coord.PerformFlowIntegrityCheck(flowID, r.settings.FullVerifyRequestedFlag) if err != nil { slog.Error("Failed to perform flow integrity check", err) } else {