30 changes: 30 additions & 0 deletions connectors/common/base.go
@@ -34,6 +34,7 @@ import (
"go.akshayshah.org/memhttp"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"golang.org/x/time/rate"
)

const progressReportingIntervalSec = 10
@@ -51,6 +52,7 @@ type ConnectorSettings struct {
TransformClient adiomv1connect.TransformServiceClient
SourceDataType adiomv1.DataType
DestinationDataType adiomv1.DataType
WriteRateLimit int
}

type maybeOptimizedConnectorService interface {
@@ -81,6 +83,8 @@ type connector struct {
progressTracker *ProgressTracker

namespaceMappings map[string]string

limiter *rate.Limiter
}

func isRetryable(err error) bool {
@@ -944,13 +948,19 @@ func (c *connector) StartWriteFromChannel(flowId iface.FlowID, dataChannelID ifa
if dataMsg.MutationType == iface.MutationType_Barrier {
err := flowParallelWriter.ScheduleBarrier(dataMsg)
if err != nil {
if errors.Is(err, WriterClosedErr) {
break
}
slog.Error(fmt.Sprintf("Failed to schedule barrier message: %v", err))
}
} else {
// Process the data message
writerProgress.dataMessages.Add(1)
err := flowParallelWriter.ScheduleDataMessage(dataMsg)
if err != nil {
if errors.Is(err, WriterClosedErr) {
break
}
slog.Error(fmt.Sprintf("Failed to schedule data message: %v", err))
}
}
@@ -1048,12 +1058,17 @@ func NewConnector(desc string, impl adiomv1connect.ConnectorServiceClient, under
if maybeOptimizedConnectorService == nil {
underlying = impl
}
var limiter *rate.Limiter
if settings.WriteRateLimit > 0 {
limiter = rate.NewLimiter(rate.Limit(settings.WriteRateLimit), settings.WriteRateLimit)
}
return &connector{
desc: desc,
impl: impl,
maybeOptimizedImpl: maybeOptimizedConnectorService,
settings: settings,
namespaceMappings: map[string]string{},
limiter: limiter,
}
}

@@ -1064,6 +1079,11 @@ func (c *connector) ProcessDataMessages(dataMsgs []iface.DataMessage) error {
switch dataMsg.MutationType {
case iface.MutationType_InsertBatch:
if len(msgs) > 0 {
if c.limiter != nil {
if err := c.limiter.WaitN(c.flowCtx, len(msgs)); err != nil {
return err
}
}
ns := dataMsg.Loc
_, err := c.maybeOptimizedImpl.WriteUpdates(c.flowCtx, connect.NewRequest(&adiomv1.WriteUpdatesRequest{
Namespace: ns,
@@ -1076,6 +1096,11 @@
c.progressTracker.UpdateWriteLSN(dataMsgs[i-1].SeqNum)
msgs = nil
}
if c.limiter != nil {
if err := c.limiter.WaitN(c.flowCtx, len(dataMsg.DataBatch)); err != nil {
return err
}
}
_, err := c.maybeOptimizedImpl.WriteData(c.flowCtx, connect.NewRequest(&adiomv1.WriteDataRequest{
Namespace: dataMsg.Loc,
Data: dataMsg.DataBatch,
@@ -1107,6 +1132,11 @@ func (c *connector) ProcessDataMessages(dataMsgs []iface.DataMessage) error {
}
}
if len(msgs) > 0 {
if c.limiter != nil {
if err := c.limiter.WaitN(c.flowCtx, len(msgs)); err != nil {
return err
}
}
ns := dataMsgs[0].Loc
_, err := c.impl.WriteUpdates(c.flowCtx, connect.NewRequest(&adiomv1.WriteUpdatesRequest{
Namespace: ns,
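For context on the limiter wiring above: rate.NewLimiter(rate.Limit(n), n) builds a token bucket that refills at n tokens per second with a burst capacity of n, and WaitN blocks until the requested tokens are available but fails outright when a single request exceeds the burst. That is why the new flag's help text requires the limit to be larger than any batch size. A minimal standalone sketch of that behavior (illustrative only, not part of this diff; the numbers are made up):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Mirror the connector's configuration: rate == burst == 100 writes/sec.
	limiter := rate.NewLimiter(rate.Limit(100), 100)

	ctx := context.Background()
	start := time.Now()

	// Three batches of 80 documents: the first fits in the initial burst,
	// the others wait for tokens to refill at 100/sec.
	for i := 0; i < 3; i++ {
		if err := limiter.WaitN(ctx, 80); err != nil {
			fmt.Println("wait failed:", err)
			return
		}
		fmt.Printf("batch %d released after %v\n", i, time.Since(start).Round(time.Millisecond))
	}

	// A batch larger than the burst can never be satisfied and errors immediately.
	if err := limiter.WaitN(ctx, 150); err != nil {
		fmt.Println("oversized batch rejected:", err)
	}
}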
27 changes: 24 additions & 3 deletions connectors/common/parallel_writer.go
@@ -8,6 +8,7 @@ package common

import (
"context"
"errors"
"fmt"
"hash/fnv"
"log/slog"
@@ -19,6 +20,8 @@ import (
"golang.org/x/sync/errgroup"
)

var WriterClosedErr = errors.New("writer closed")

type ParallelWriterConnector interface {
HandleBarrierMessage(iface.DataMessage) error
ProcessDataMessages([]iface.DataMessage) error
@@ -161,7 +164,7 @@ func (bwa *ParallelWriter) ScheduleBarrier(barrierMsg iface.DataMessage) error {
select {
case <-bwa.blockBarrier:
case <-bwa.ctx.Done():
return fmt.Errorf("writer closed")
return WriterClosedErr
}
}
slog.Debug("Blocking barrier unblocked.")
@@ -244,6 +247,9 @@ func (ww *writerWorker) run() error {
if len(batch) > 0 {
err := ww.parallelWriter.connector.ProcessDataMessages(batch)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
slog.Error(fmt.Sprintf("Worker %v failed to process data messages: %v", ww.id, err))
if err2 := ww.parallelWriter.connector.HandlerError(err); err2 != nil {
return err2
@@ -253,6 +259,9 @@
}
if len(multiBatch) > 0 {
if err := ww.sendMultiBatch(multiBatch); err != nil {
if errors.Is(err, context.Canceled) {
return err
}
slog.Error(fmt.Sprintf("Worker %v failed to process data messages: %v", ww.id, err))
if err2 := ww.parallelWriter.connector.HandlerError(err); err2 != nil {
return err2
@@ -288,6 +297,9 @@ func (ww *writerWorker) run() error {
if isLastWorker {
err := ww.parallelWriter.connector.HandleBarrierMessage(msg)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
slog.Error(fmt.Sprintf("Worker %v failed to handle barrier message: %v", ww.id, err))
if err2 := ww.parallelWriter.connector.HandlerError(err); err2 != nil {
return err2
@@ -303,6 +315,9 @@
multiBatchCount += 1
if (ww.parallelWriter.maxBatchSize > 0 && multiBatchCount >= ww.parallelWriter.maxBatchSize) || msg.MutationType == iface.MutationType_InsertBatch || len(ww.queue) == 0 {
if err := ww.sendMultiBatch(multiBatch); err != nil {
if errors.Is(err, context.Canceled) {
return err
}
slog.Error(fmt.Sprintf("Worker %v failed to process data messages: %v", ww.id, err))
if err2 := ww.parallelWriter.connector.HandlerError(err); err2 != nil {
return err2
@@ -318,6 +333,9 @@
if len(batch) > 0 && msg.Loc != batch[0].Loc {
err := ww.parallelWriter.connector.ProcessDataMessages(batch)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
slog.Error(fmt.Sprintf("Worker %v failed to process data messages: %v", ww.id, err))
if err2 := ww.parallelWriter.connector.HandlerError(err); err2 != nil {
return err2
@@ -331,6 +349,9 @@
if (ww.parallelWriter.maxBatchSize > 0 && len(batch) >= ww.parallelWriter.maxBatchSize) || msg.MutationType == iface.MutationType_InsertBatch || len(ww.queue) == 0 {
err := ww.parallelWriter.connector.ProcessDataMessages(batch)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
if msg.MutationType == iface.MutationType_InsertBatch {
d := 0
if batch[0].Data != nil {
@@ -355,14 +376,14 @@ func (ww *writerWorker) addMessage(msg iface.DataMessage) error {
select {
case ww.queue <- msg:
case <-ww.parallelWriter.ctx.Done():
return fmt.Errorf("writer closed")
return WriterClosedErr
}
if ww.clogSize > 0 && msg.MutationType == iface.MutationType_InsertBatch {
for range ww.clogSize {
select {
case ww.queue <- iface.DataMessage{MutationType: iface.MutationType_Ignore}:
case <-ww.parallelWriter.ctx.Done():
return fmt.Errorf("writer closed")
return WriterClosedErr
}
}
}
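The substitution of the exported sentinel WriterClosedErr for the old fmt.Errorf("writer closed") is what lets the scheduling loop in base.go treat shutdown as an expected condition: it breaks out of the channel read via errors.Is instead of logging a spurious error. A minimal sketch of the pattern (illustrative only; schedule and the closed flag are invented stand-ins for ScheduleDataMessage and the writer's context):

package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// Sentinel error exported so callers can match it with errors.Is.
var WriterClosedErr = errors.New("writer closed")

// schedule stands in for ParallelWriter.ScheduleDataMessage: it may wrap
// the sentinel, e.g. when the writer's context is already done.
func schedule(msg string, closed bool) error {
	if closed {
		return fmt.Errorf("scheduling %q: %w", msg, WriterClosedErr)
	}
	return nil
}

func main() {
	for i, msg := range []string{"a", "b", "c"} {
		err := schedule(msg, i == 1) // simulate the writer closing mid-stream
		if err != nil {
			if errors.Is(err, WriterClosedErr) {
				// Shutdown is expected: stop consuming without logging an error.
				break
			}
			slog.Error(fmt.Sprintf("Failed to schedule data message: %v", err))
		}
	}
	fmt.Println("done")
}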
1 change: 1 addition & 0 deletions internal/app/app.go
@@ -243,6 +243,7 @@ func runDsync(c *cli.Context) error {
MultinamespaceBatcher: o.MultinamespaceBatcher,
SyncMode: o.Mode,
ReverseRequestedFlag: o.Reverse,
WriteRateLimit: o.WriteRateLimit,
})

var wg sync.WaitGroup
4 changes: 4 additions & 0 deletions internal/app/options/flags.go
@@ -184,6 +184,10 @@ func GetFlagsAndBeforeFunc() ([]cli.Flag, cli.BeforeFunc) {
Required: false,
Hidden: true,
}),
altsrc.NewIntFlag(&cli.IntFlag{
Name: "write-rate-limit",
Usage: "If set, overall per second rate limit on the destination. If set, must be larger than any batch sizes.",
}),
Comment on lines +187 to +190
⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the flags.go file and examine the relevant lines
find . -name "flags.go" -type f | head -5

# Also look for batch size references
rg -l "batch" --type go | head -10

Repository: adiom-data/dsync

Length of output: 391


🏁 Script executed:

#!/bin/bash
# Search for batch size flags and write-rate-limit references
echo "=== Searching for batch size and rate limit flags ==="
rg -n "batch|rate-limit|WriterMaxBatchSize" internal/app/options/flags.go | head -30

echo -e "\n=== Look for write-rate-limit flag specifically ==="
rg -n "write-rate-limit" internal/app/options/flags.go -A 3 -B 1

echo -e "\n=== Search for validation logic ==="
rg -n "validate|Validate" internal/app/options/ -A 2 | head -40

echo -e "\n=== Check if there's flag validation in the app ==="
rg -n "func.*Before|flag.*validation" internal/app/ --type go | head -20

Repository: adiom-data/dsync

Length of output: 788


🏁 Script executed:

#!/bin/bash
# Look at the GetFlagsAndBeforeFunc function to see what validation happens
echo "=== GetFlagsAndBeforeFunc function ==="
sed -n '42,200p' internal/app/options/flags.go | head -100

echo -e "\n=== Look for writer-batch-size flag ==="
rg -n "writer-batch-size" internal/app/options/flags.go -A 3 -B 1

echo -e "\n=== Search for validation that uses write-rate-limit and batch sizes together ==="
rg -n "write.*rate.*limit|batch.*size" internal/app/ --type go -i

echo -e "\n=== Check options.go for any validation ==="
rg -n "validate|Validate|rate.*limit" internal/app/options/options.go -A 2

Repository: adiom-data/dsync

Length of output: 6189


🏁 Script executed:

#!/bin/bash
# Check how WriterMaxBatchSize and WriteRateLimit are used in app.go
echo "=== How WriterMaxBatchSize and WriteRateLimit are used in app.go ==="
rg -n "WriterMaxBatchSize|WriteRateLimit" internal/app/app.go -B 2 -A 2

echo -e "\n=== Look at the full context around line 242-246 in app.go ==="
sed -n '230,260p' internal/app/app.go

echo -e "\n=== Search for any validation or constraint checks anywhere ==="
rg -n "must be larger|rate.*limit.*batch|batch.*rate" . --type go -i

echo -e "\n=== Check if there's validation in the Before function ==="
rg -n "BeforeFunc" internal/app/options/flags.go -A 50 | head -60

Repository: adiom-data/dsync

Length of output: 4885


🏁 Script executed:

#!/bin/bash
# Find where the BeforeFunc is returned and what it does
echo "=== Find the end of GetFlagsAndBeforeFunc to see BeforeFunc ==="
rg -n "return.*flags|BeforeFunc" internal/app/options/flags.go | tail -20

echo -e "\n=== Get the last part of the file to see BeforeFunc implementation ==="
tail -100 internal/app/options/flags.go

echo -e "\n=== Check if options.go has validation after parsing flags ==="
cat internal/app/options/options.go

Repository: adiom-data/dsync

Length of output: 5680


🏁 Script executed:

#!/bin/bash
# Let's verify the exact state of the flags and see if there are any runtime checks
echo "=== Check how WriterMaxBatchSize is used at runtime ==="
rg -n "WriterMaxBatchSize" . --type go -B 1 -A 1 | head -40

echo -e "\n=== Check how WriteRateLimit is used at runtime ==="
rg -n "WriteRateLimit" . --type go -B 1 -A 1 | head -40

echo -e "\n=== Check for any validation in the runner or flow logic ==="
rg -n "batch.*rate|rate.*batch|WriteRateLimit.*Batch|WriterMaxBatchSize.*Rate" . --type go -i

Repository: adiom-data/dsync

Length of output: 4840


🏁 Script executed:

#!/bin/bash
# Let's see the complete usage of WriteRateLimit when the limiter is created
echo "=== Full context of rate limiter creation ==="
sed -n '1055,1075p' ./connectors/common/base.go

echo -e "\n=== Check if there's any validation between flags and limiter creation ==="
rg -n "WriteRateLimit\|WriterMaxBatchSize" ./runners/local/runner.go -B 3 -A 3

Repository: adiom-data/dsync

Length of output: 794


🏁 Script executed:

#!/bin/bash
# Let's check the Before function more carefully for any validation
echo "=== Complete Before function ==="
sed -n '191,205p' ./internal/app/options/flags.go

echo -e "\n=== Check if there's validation in app.go before passing to runner ==="
sed -n '200,250p' ./internal/app/app.go | head -50

echo -e "\n=== Check for any Action handlers on write-rate-limit flag ==="
rg -n "write-rate-limit" ./internal/app/options/flags.go -B 5 -A 10

Repository: adiom-data/dsync

Length of output: 3144


Add validation to enforce write-rate-limit constraint against batch sizes.

The flag usage text states "must be larger than any batch sizes," but there is no validation enforcing this constraint. The Before function (lines 193-198) only validates flag combinations for progress/logfile. Add validation to check that write-rate-limit exceeds writer-batch-size when both are set, preventing misconfiguration that could cause confusing runtime behavior.

🤖 Prompt for AI Agents
In internal/app/options/flags.go around lines 187-190 (and add logic into the
Before function near lines 193-198), the flag usage claims write-rate-limit
"must be larger than any batch sizes" but no validation enforces that; add a
check in the Before hook that if both write-rate-limit and writer-batch-size are
set (non-zero) then write-rate-limit > writer-batch-size, and return a
descriptive error (e.g. "write-rate-limit must be larger than
writer-batch-size") when the constraint is violated so misconfiguration is
rejected at startup.
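As a rough illustration of the suggested validation (not part of this PR; the writer-batch-size flag name and the helper are assumptions based on the review context), the Before hook could call something like:

package options

import (
	"fmt"

	"github.com/urfave/cli/v2"
)

// validateWriteRateLimit is a hypothetical helper the Before hook could call.
// It rejects a write-rate-limit that cannot accommodate the configured batch
// size, since rate.Limiter.WaitN fails outright when one batch exceeds the burst.
func validateWriteRateLimit(c *cli.Context) error {
	limit := c.Int("write-rate-limit")
	batch := c.Int("writer-batch-size") // assumed flag name backing WriterMaxBatchSize
	if limit > 0 && batch > 0 && limit <= batch {
		return fmt.Errorf("write-rate-limit (%d) must be larger than writer-batch-size (%d)", limit, batch)
	}
	return nil
}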

}

before := func(c *cli.Context) error {
4 changes: 4 additions & 0 deletions internal/app/options/options.go
@@ -42,6 +42,8 @@ type Options struct {
WriterMaxBatchSize int
MultinamespaceBatcher bool
Mode string

WriteRateLimit int
}

// works with a copy of the struct to avoid modifying the original
@@ -78,5 +80,7 @@ func NewFromCLIContext(c *cli.Context) (Options, error) {
o.Mode = c.String("mode")
o.Reverse = c.Bool("reverse")

o.WriteRateLimit = c.Int("write-rate-limit")

return o, nil
}
4 changes: 4 additions & 0 deletions runners/local/runner.go
@@ -79,6 +79,8 @@ type RunnerLocalSettings struct {
WriterMaxBatchSize int
SyncMode string
MultinamespaceBatcher bool

WriteRateLimit int
}

const (
@@ -109,6 +111,7 @@ func NewRunnerLocal(settings RunnerLocalSettings) *RunnerLocal {
TransformClient: settings.TransformClient,
SourceDataType: settings.SrcDataType,
DestinationDataType: settings.DstDataType,
WriteRateLimit: settings.WriteRateLimit,
}
if settings.LoadLevel != "" {
btc := GetBaseThreadCount(settings.LoadLevel)
@@ -307,6 +310,7 @@ func (r *RunnerLocal) GracefulShutdown() {
if r.cancelIntegrityCtx != nil {
r.cancelIntegrityCtx()
}
_ = r.dst.Interrupt(r.activeFlowID)
}

func (r *RunnerLocal) Teardown() {