2 changes: 1 addition & 1 deletion connectors/cosmos/connector.go
@@ -124,7 +124,7 @@ func (cc *Connector) Setup(ctx context.Context, t iface.Transport) error {
// Instantiate ConnectorType
cc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec}
// Instantiate ConnectorCapabilities, current capabilities are source only
cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true}
cc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true}
// Instantiate ConnectorStatus
progressMetrics := iface.ProgressMetrics{
NumDocsSynced: 0,
10 changes: 7 additions & 3 deletions connectors/mongo/connector.go
@@ -113,7 +113,7 @@ func (mc *Connector) Setup(ctx context.Context, t iface.Transport) error {
// Instantiate ConnectorType
mc.ConnectorType = iface.ConnectorType{DbType: connectorDBType, Version: version.(string), Spec: connectorSpec}
// Instantiate ConnectorCapabilities
mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, Resumability: true}
mc.ConnectorCapabilities = iface.ConnectorCapabilities{Source: true, Sink: true, IntegrityCheck: true, FullIntegrityCheck: true, Resumability: true}
// Instantiate ConnectorStatus
mc.Status = iface.ConnectorStatus{WriteLSN: 0}

@@ -545,9 +545,13 @@ func (mc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if
return nil
}

func (mc *BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error {
func (mc *BaseMongoConnector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error {
// need to make this async to honor the spec
go mc.doIntegrityCheck_sync(flowId, options)
if full {
go mc.doFullIntegrityCheck_sync(flowId, options)
} else {
go mc.doIntegrityCheck_sync(flowId, options)
}

return nil
}
164 changes: 163 additions & 1 deletion connectors/mongo/integrity.go
@@ -9,13 +9,19 @@ package mongo
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"log/slog"
"sort"
"strings"
"sync"

"github.com/adiom-data/dsync/protocol/iface"

// TODO: Check how to add a new package properly
"github.com/holiman/uint256"
)

func NamespaceString(ns iface.Namespace) string {
@@ -28,10 +34,16 @@ type nsCountResult struct {
err error
}

type nsHashResult struct {
ns iface.Namespace
hash *uint256.Int
err error
}

// doIntegrityCheck performs a data integrity check on the underlying data store
// _sync is a synchronous version of this function
func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error {
//XXX: should we use/create flowContext here in case it becomes a part of the flow and we want to have ability to interrupt?
//XXX: should we use/create flowContext here in case it becomes a part of the flow, and we want to have ability to interrupt?

var res iface.ConnectorDataIntegrityCheckResult
res.Success = false
@@ -141,6 +153,156 @@ func (mc *BaseMongoConnector) doIntegrityCheck_sync(flowId iface.FlowID, options
return nil
}

func (mc *BaseMongoConnector) doFullIntegrityCheck_sync(flowId iface.FlowID, options iface.ConnectorOptions) error {
var res iface.ConnectorDataIntegrityCheckResult
res.Success = false

// 1. Get the list of fully qualified namespaces
namespaces, err := mc.getFQNamespaceList(options.Namespace)
if err != nil {
slog.Error(fmt.Sprintf("Failed to get fully qualified namespace list: %v", err))
err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res)
if err != nil {
return err
}
}

// Quick exit if there are no namespaces
if len(namespaces) == 0 {
res.Success = true
res.Count = 0
res.Digest = ""
if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil {
return err
}
return nil
}

slog.Debug(fmt.Sprintf("Namespaces for validation: %v", namespaces))

// create a map to store the results
namespacesSha256Map := make(map[string]string)

// 2. Calculate the total number of documents for each namespace - we will parallelize this operation

// create a channel to distribute tasks
taskChannel := make(chan iface.Namespace, len(namespaces))
// create a channel to collect the results
resultChannel := make(chan nsHashResult, len(namespaces))

var wg sync.WaitGroup
wg.Add(mc.Settings.NumParallelIntegrityCheckTasks)

mask := uint256.NewInt(0)
mask.SetAllOne()

// start hashers
for i := 0; i < mc.Settings.NumParallelIntegrityCheckTasks; i++ {
go func() {
defer wg.Done()
completeHash := uint256.NewInt(1)
for ns := range taskChannel {
slog.Debug(fmt.Sprintf("Processing namespace: %v", ns))
collection := mc.Client.Database(ns.Db).Collection(ns.Col)
cursor, err := collection.Find(mc.FlowCtx, bson.D{})
if err != nil {
if errors.Is(context.Canceled, mc.FlowCtx.Err()) {
slog.Debug(fmt.Sprintf("Find error: %v, but the context was cancelled", err))
} else {
slog.Error(fmt.Sprintf("Failed to find documents in collection: %v", err))
}
continue
}

for cursor.Next(mc.FlowCtx) {
rawData := cursor.Current
data := []byte(rawData)

hash := sha256.Sum256(data)
hashInt := uint256.NewInt(1)
hashInt.SetBytes(hash[:])
if hashInt.IsZero() {
hashInt.SetOne()
}

// Multiplication is order independent, so it does not matter the order we are going to get the
// values in, we will still get the same hash as the result, if all the hashes were the same.
completeHash.MulMod(completeHash, hashInt, mask)

if cursor.RemainingBatchLength() == 0 { //no more left in the batch
resultChannel <- nsHashResult{ns: ns, hash: completeHash, err: err}
completeHash = uint256.NewInt(1)
}
}
if err := cursor.Err(); err != nil {
if errors.Is(context.Canceled, mc.FlowCtx.Err()) {
slog.Debug(fmt.Sprintf("Cursor error: %v, but the context was cancelled", err))
} else {
slog.Error(fmt.Sprintf("Cursor error: %v", err))
}
} else {
_ = cursor.Close(mc.FlowCtx)
slog.Debug(fmt.Sprintf("Done hashing namespace: %v", ns))
}
}
}()
}

// iterate over all the namespaces and distribute them to workers
for _, task := range namespaces {
taskChannel <- task
}

// close the task channel to signal workers that there are no more tasks
close(taskChannel)

//wait for all hashers to finish
wg.Wait()

close(resultChannel)

for result := range resultChannel {
if result.err != nil {
slog.Error(fmt.Sprintf("Failed to hash documents for namespace %s: %v", NamespaceString(result.ns), result.err))
if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil {
return err
}
return nil
} else {
namespacesSha256Map[NamespaceString(result.ns)] = result.hash.Hex()
}
}

// 3. Arrange the namespaces in a lexicographical order
sortedNamespaces := make([]string, 0, len(namespacesSha256Map))
for ns := range namespacesSha256Map {
sortedNamespaces = append(sortedNamespaces, ns)
}
sort.Strings(sortedNamespaces)

// 4. Create a string, concatenating the namespaces and respective hash in the form of "namespace:hash" and using "," to join them
var concatenatedString string
for _, ns := range sortedNamespaces {
hash := namespacesSha256Map[ns]
concatenatedString += ns + ":" + hash + ","
}
// remove the trailing comma
concatenatedString = concatenatedString[:len(concatenatedString)-1]

// 5. Calculate the SHA256 hash of the string
hash := sha256.Sum256([]byte(concatenatedString))

// post the result
res.Success = true
res.Count = 0
res.Digest = hex.EncodeToString(hash[:])

if err := mc.Coord.PostDataIntegrityCheckResult(flowId, mc.ID, res); err != nil {
return err
}
return nil
}

// Returns a list of fully qualified namespaces
func (mc *BaseMongoConnector) getFQNamespaceList(namespacesFilter []string) ([]iface.Namespace, error) {
var dbsToResolve []string //database names that we need to resolve
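Note on the full-check digest computed above: each document's raw BSON is hashed with SHA-256, mapped to a non-zero 256-bit integer, and folded into an accumulator with modular multiplication, so the result does not depend on the order in which the cursor returns documents. The standalone sketch below illustrates only that combination step; the connector's per-batch bookkeeping, channels, and error handling are omitted, and the helper function and sample documents are illustrative, not part of this PR.

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/holiman/uint256"
)

// combineHashes folds per-document SHA-256 hashes into one 256-bit value using
// modular multiplication, so the result is independent of document order.
func combineHashes(docs [][]byte) *uint256.Int {
	mask := uint256.NewInt(0)
	mask.SetAllOne() // same modulus mask as the connector uses

	acc := uint256.NewInt(1)
	for _, doc := range docs {
		h := sha256.Sum256(doc)
		hi := new(uint256.Int).SetBytes(h[:])
		if hi.IsZero() {
			hi.SetOne() // never multiply by zero, mirroring the connector
		}
		acc.MulMod(acc, hi, mask)
	}
	return acc
}

func main() {
	a := [][]byte{[]byte(`{"_id": 1}`), []byte(`{"_id": 2}`)}
	b := [][]byte{[]byte(`{"_id": 2}`), []byte(`{"_id": 1}`)} // same docs, swapped order
	fmt.Println(combineHashes(a).Hex() == combineHashes(b).Hex()) // prints: true
}

Because reordering the inputs leaves the digest unchanged, source and destination can be compared even if their collections return documents in different orders.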
2 changes: 1 addition & 1 deletion connectors/null/connector.go
@@ -143,7 +143,7 @@ func (nc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if
return nil
}

func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error {
func (nc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error {
//does nothing, no data to check
return fmt.Errorf("null write connector does not support data integrity check")
}
2 changes: 1 addition & 1 deletion connectors/random/connector.go
@@ -267,7 +267,7 @@ func (rc *Connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelId if
return errors.New("RandomReadConnector does not write to destination")
}

func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error {
func (rc *Connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error {
//no client, errors
return errors.New("RandomReadConnector does not have a client to request data integrity check")
}
2 changes: 1 addition & 1 deletion connectors/testconn/connector.go
@@ -114,7 +114,7 @@ func (c *connector) RequestCreateReadPlan(flowId iface.FlowID, options iface.Con
}

// RequestDataIntegrityCheck implements iface.Connector.
func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions) error {
func (c *connector) RequestDataIntegrityCheck(flowId iface.FlowID, options iface.ConnectorOptions, full bool) error {
// TODO: implement
panic("unimplemented")
}
2 changes: 1 addition & 1 deletion coordinators/simple/capabilities.go
@@ -17,7 +17,7 @@ func calcSharedCapabilities(c1Caps iface.ConnectorCapabilities, c2Caps iface.Con
// effectively a bitmask that will be later applied to individual connectors' capabilities

// XXX: is there a better way to do this? Maybe a real bitmask?
caps := iface.ConnectorCapabilities{true, true, true, true}
caps := iface.ConnectorCapabilities{true, true, true, true, true}

// Source and Sink are always true as they don't need to be shared
// we only care about resumability right now
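For readability, the widened positional literal is assumed to cover the capability fields used elsewhere in this PR (Source, Sink, IntegrityCheck, FullIntegrityCheck, Resumability). A named-field rendering of the same value under that assumption is sketched below; the actual field order in iface.ConnectorCapabilities is not visible in this diff.

package simple

import "github.com/adiom-data/dsync/protocol/iface"

// allCapabilities is a named-field rendering of the positional literal above.
// The mapping of positions to field names is an assumption, since the struct
// definition is not part of the shown hunks.
var allCapabilities = iface.ConnectorCapabilities{
	Source:             true,
	Sink:               true,
	IntegrityCheck:     true,
	FullIntegrityCheck: true,
	Resumability:       true,
}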
21 changes: 15 additions & 6 deletions coordinators/simple/coordinator.go
@@ -420,8 +420,12 @@ func (c *Simple) NotifyTaskDone(flowId iface.FlowID, conn iface.ConnectorID, tas
return fmt.Errorf("connector not part of the flow")
}

func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataIntegrityCheckResult, error) {
slog.Info("Initiating flow integrity check for flow with ID: " + fmt.Sprintf("%v", fid))
func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID, full bool) (iface.FlowDataIntegrityCheckResult, error) {
checkTypeStr := "basic"
if full {
checkTypeStr = "full"
}
slog.Info(fmt.Sprintf("Initiating flow %s integrity check for flow with ID: %v", checkTypeStr, fid))

res := iface.FlowDataIntegrityCheckResult{}

@@ -441,20 +445,25 @@ func (c *Simple) PerformFlowIntegrityCheck(fid iface.FlowID) (iface.FlowDataInte
return res, fmt.Errorf("destination connector %v not found", flowDet.Options.DstId)
}

if !src.Details.Cap.IntegrityCheck || !dst.Details.Cap.IntegrityCheck {
return res, fmt.Errorf("one or both connectors don't support integrity checks")
checkTypeSupported := src.Details.Cap.IntegrityCheck && dst.Details.Cap.IntegrityCheck
if full {
checkTypeSupported = src.Details.Cap.FullIntegrityCheck && dst.Details.Cap.FullIntegrityCheck
}

if !checkTypeSupported {
return res, fmt.Errorf(fmt.Sprintf("one or both connectors don't support %s integrity checks", checkTypeStr))
}

// Wait for integrity check results asynchronously
slog.Debug("Waiting for integrity check results")
var resSource, resDestination iface.ConnectorDataIntegrityCheckResult

// Request integrity check results from connectors
if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil {
if err := src.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil {
slog.Error("Failed to request integrity check from source", err)
return res, err
}
if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions); err != nil { //TODO (AK, 6/2024): should we have proper options here? (maybe even data validation-specific?)
if err := dst.Endpoint.RequestDataIntegrityCheck(fid, flowDet.Options.SrcConnectorOptions, full); err != nil { //TODO (AK, 6/2024): should we have proper options here? (maybe even data validation-specific?)
slog.Error("Failed to request integrity check from destination", err)
return res, err
}
1 change: 1 addition & 0 deletions go.mod
@@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/gdamore/tcell/v2 v2.7.4
github.com/holiman/uint256 v1.3.1
github.com/lmittmann/tint v1.0.5
github.com/mitchellh/hashstructure v1.1.0
github.com/rivo/tview v0.0.0-20240807095714-a8dd8799d63b
2 changes: 2 additions & 0 deletions go.sum
@@ -21,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs=
github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
3 changes: 2 additions & 1 deletion internal/app/app.go
@@ -106,6 +106,7 @@ func runDsync(c *cli.Context) error {
StateStoreConnString: o.StateStoreConnString,
NsFromString: o.NamespaceFrom,
VerifyRequestedFlag: o.Verify,
FullVerifyRequestedFlag: o.VerifyFully,
CleanupRequestedFlag: o.Cleanup,
FlowStatusReportingInterval: 10,
CosmosDeletesEmuRequestedFlag: o.CosmosDeletesEmu,
@@ -236,7 +237,7 @@ func runDsync(c *cli.Context) error {
err := r.Setup(runnerCtx)
if err == nil {
err = r.Run()
if !o.Verify { //if verification was requested, the user should be able to see the results
if !o.Verify && !o.VerifyFully { //if verification was requested, the user should be able to see the results
runnerCancelFunc()
}
} else {
5 changes: 5 additions & 0 deletions internal/app/options/flags.go
@@ -127,6 +127,11 @@ func GetFlagsAndBeforeFunc() ([]cli.Flag, cli.BeforeFunc) {
Usage: "perform a data integrity check for an existing flow",
Category: "Special Commands",
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "verify-fully",
Usage: "perform a full data integrity check for an existing flow",
Category: "Special Commands",
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "cleanup",
Usage: "cleanup metadata for an existing flow",
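The new verify-fully flag pairs with o.VerifyFully and FullVerifyRequestedFlag in app.go above, but the options-struct wiring itself is outside the shown hunks. The sketch below is hypothetical: it assumes urfave/cli v2 (consistent with the altsrc/cli usage in this file) and infers the struct and field names from app.go; the real constructor and its name are not part of this diff.

package options

import "github.com/urfave/cli/v2"

// Options is a hypothetical sketch; only the verification-related fields are
// shown, and the names are inferred from app.go rather than taken from this diff.
type Options struct {
	Verify      bool // set by --verify
	VerifyFully bool // set by --verify-fully
}

// fromContext sketches how the new flag is presumably read, mirroring the
// existing --verify handling.
func fromContext(c *cli.Context) Options {
	return Options{
		Verify:      c.Bool("verify"),
		VerifyFully: c.Bool("verify-fully"),
	}
}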