Skip to content

Conversation

@adiom-mark
Copy link
Collaborator

@adiom-mark adiom-mark commented Dec 16, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced write rate limiting to control the throughput of write operations to the destination. Configure the per-second rate limit using the new write-rate-limit CLI option. The limit must exceed configured batch sizes.
  • Improvements

    • Enhanced error handling for writer closure and context cancellation scenarios.
    • Improved graceful shutdown to properly interrupt destination write flows.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 16, 2025

Walkthrough

This PR adds write-rate-limiting to the connector system by introducing a WriteRateLimit configuration option, threading it from CLI flags through runner settings into connector configurations, and implementing rate.Limiter in base.go to throttle batched write operations. It also introduces WriterClosedErr sentinel, adds a HandlerError method to the ParallelWriterConnector interface, and enhances error handling for context cancellation.

Changes

Cohort / File(s) Summary
Configuration & CLI Setup
internal/app/options/flags.go, internal/app/options/options.go
Added new write-rate-limit CLI flag (int) specifying per-second rate limit on destination writes, and wired it into Options struct as WriteRateLimit field
Runner Configuration Wiring
internal/app/app.go, runners/local/runner.go
Extended RunnerLocalSettings with WriteRateLimit field, threaded it into ConnectorSettings for both source and destination connectors, and updated GracefulShutdown to interrupt destination connector flow after integrity context cancellation
Rate Limiting Implementation
connectors/common/base.go
Implemented rate limiting via golang.org/x/time/rate with configurable rate and burst; added WaitN calls before batched writes in StartWriteFromChannel and ProcessDataMessages paths; creates limiter when WriteRateLimit > 0
Error Handling & Interface Changes
connectors/common/parallel_writer.go
Introduced WriterClosedErr sentinel error replacing plain "writer closed" errors; added HandlerError method to ParallelWriterConnector interface; integrated context.Canceled propagation in worker and barrier execution paths to short-circuit on cancellation

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Rate limiter placement in base.go: Verify WaitN calls are positioned correctly before write operations to avoid deadlocks or performance bottlenecks, and confirm the limiter initialization logic (creation when WriteRateLimit > 0) is sound
  • context.Canceled handling in parallel_writer.go: Ensure context cancellation is consistently propagated across all worker/barrier processing paths and that error routing logic (logging vs. HandlerError vs. early exit) is correct
  • HandlerError interface contract: Verify all implementations of ParallelWriterConnector properly implement the new HandlerError method and that callers invoke it correctly

Possibly related PRs

  • #346: Modifies connectors/common/parallel_writer.go for writer shutdown/error handling with sentinel errors and error-handling path adjustments
  • #332: Modifies ParallelWriter interface and connector write paths (StartWriteFromChannel) with HandlerError integration and error propagation changes

Suggested reviewers

  • alex-thc

Poem

🐰 A limiter hops through the write-path stream,
Rate-limiting writes like a data-flow dream!
Context cancels fast, and errors shine bright,
HandlerError handles both wrong and right—
The connector now throttles with graceful delight! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Add write-rate-limit' directly describes the main feature being introduced across multiple files: a write rate limiting capability with new WriteRateLimit configuration options, rate limiter integration, and related error handling improvements.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch writeratelimit

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
connectors/common/parallel_writer.go (1)

28-28: Note the interface change to ParallelWriterConnector.

Adding the HandlerError(error) error method to the ParallelWriterConnector interface is a breaking change. However, this appears to be an internal interface within the connectors package, so the impact should be limited.

If this interface has external implementations, ensure all implementations are updated to include the new method. Run this script to verify:

#!/bin/bash
# Find all implementations of ParallelWriterConnector
ast-grep --pattern $'type $_ struct {
  $$$
}

func ($_ *$_) HandleBarrierMessage($$$) error {
  $$$
}'

# Look for missing HandlerError implementations
rg -nP 'func \(\w+ \*\w+\) HandleBarrierMessage' -C5 | rg -v 'HandlerError'
🧹 Nitpick comments (1)
connectors/common/base.go (1)

1061-1064: Note the conservative burst size choice.

The limiter is initialized with burst size equal to the rate limit (rate.NewLimiter(rate.Limit(settings.WriteRateLimit), settings.WriteRateLimit)). This means the token bucket can hold exactly 1 second worth of operations.

While valid, this conservative choice means large batches will always wait proportionally, even at startup. Consider whether a larger burst (e.g., 2 * settings.WriteRateLimit) would better accommodate initial bursts while still maintaining the average rate.

If you want to allow a larger initial burst, apply this change:

 	var limiter *rate.Limiter
 	if settings.WriteRateLimit > 0 {
-		limiter = rate.NewLimiter(rate.Limit(settings.WriteRateLimit), settings.WriteRateLimit)
+		limiter = rate.NewLimiter(rate.Limit(settings.WriteRateLimit), settings.WriteRateLimit*2)
 	}

Also applies to: 1071-1071

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 559d9d4 and 1e7414c.

📒 Files selected for processing (6)
  • connectors/common/base.go (8 hunks)
  • connectors/common/parallel_writer.go (10 hunks)
  • internal/app/app.go (1 hunks)
  • internal/app/options/flags.go (1 hunks)
  • internal/app/options/options.go (2 hunks)
  • runners/local/runner.go (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
connectors/common/base.go (2)
connectors/dynamodb/stream/helper.go (1)
  • Limiter (23-25)
connectors/common/parallel_writer.go (1)
  • WriterClosedErr (23-23)
connectors/common/parallel_writer.go (1)
protocol/iface/transport.go (3)
  • MutationType_InsertBatch (39-39)
  • DataMessage (16-34)
  • MutationType_Ignore (42-42)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (11)
internal/app/options/options.go (1)

46-46: LGTM!

The WriteRateLimit field addition and wiring from CLI context is straightforward and consistent with other options.

Also applies to: 83-83

internal/app/app.go (1)

246-246: LGTM!

The WriteRateLimit wiring into RunnerLocalSettings is correct and consistent with other configuration fields.

runners/local/runner.go (2)

313-313: LGTM!

Adding r.dst.Interrupt(r.activeFlowID) to GracefulShutdown ensures the destination flow is properly interrupted during shutdown, which is especially important when rate limiting is active to prevent hanging write operations.


83-83: The review comment is incorrect. While WriteRateLimit is passed to both source and destination connectors via connectorSettings, the rate limiter is only checked within ProcessDataMessages(), which is exclusively a write operation. Source connectors only invoke read operations (StartReadToChannel, RequestCreateReadPlan, etc.) and never call ProcessDataMessages(). Therefore, the limiter does not throttle reads from source connectors.

Likely an incorrect or invalid review comment.

connectors/common/base.go (4)

55-55: LGTM!

Adding WriteRateLimit to ConnectorSettings properly threads the configuration through to connectors.


1082-1086: LGTM!

The rate limiter is correctly applied before all write operations:

  • Uses limiter.WaitN to reserve tokens for batch operations
  • Counts individual messages/documents via len(msgs) or len(dataMsg.DataBatch)
  • Uses c.flowCtx for proper cancellation support
  • Returns errors immediately for graceful shutdown

This implementation correctly enforces the per-second rate limit on the destination connector.

Also applies to: 1099-1103, 1135-1139


951-953: LGTM!

Properly checks for WriterClosedErr sentinel to detect when the writer has been closed, allowing the write loop to break cleanly without logging spurious errors.

Also applies to: 961-963


37-37: No action required. The project is already using golang.org/x/time v0.12.0 (a recent, stable version) and the codebase properly implements rate limiting with WaitN and context cancellation support across multiple connectors.

connectors/common/parallel_writer.go (3)

23-23: LGTM!

Introducing WriterClosedErr as a sentinel error is good practice. It allows callers to distinguish between writer shutdown and other errors using errors.Is(), enabling cleaner error handling during graceful shutdown.


250-252: LGTM!

The enhanced context cancellation handling is excellent:

  • Detects context.Canceled errors using errors.Is()
  • Returns immediately without logging spurious errors
  • Allows graceful shutdown without error spam
  • Consistent pattern applied across all write paths

This properly integrates with the rate limiter, which may return context.Canceled when WaitN is interrupted during shutdown.

Also applies to: 262-264, 300-302, 318-320, 336-338, 352-354


167-167: LGTM!

Returning WriterClosedErr when the context is done while attempting to send messages is appropriate. This allows callers to distinguish between clean shutdown scenarios and actual errors, reducing spurious error logging.

Also applies to: 379-379, 386-386

Comment on lines +187 to +190
altsrc.NewIntFlag(&cli.IntFlag{
Name: "write-rate-limit",
Usage: "If set, overall per second rate limit on the destination. If set, must be larger than any batch sizes.",
}),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the flags.go file and examine the relevant lines
find . -name "flags.go" -type f | head -5

# Also look for batch size references
rg -l "batch" --type go | head -10

Repository: adiom-data/dsync

Length of output: 391


🏁 Script executed:

#!/bin/bash
# Search for batch size flags and write-rate-limit references
echo "=== Searching for batch size and rate limit flags ==="
rg -n "batch|rate-limit|WriterMaxBatchSize" internal/app/options/flags.go | head -30

echo -e "\n=== Look for write-rate-limit flag specifically ==="
rg -n "write-rate-limit" internal/app/options/flags.go -A 3 -B 1

echo -e "\n=== Search for validation logic ==="
rg -n "validate|Validate" internal/app/options/ -A 2 | head -40

echo -e "\n=== Check if there's flag validation in the app ==="
rg -n "func.*Before|flag.*validation" internal/app/ --type go | head -20

Repository: adiom-data/dsync

Length of output: 788


🏁 Script executed:

#!/bin/bash
# Look at the GetFlagsAndBeforeFunc function to see what validation happens
echo "=== GetFlagsAndBeforeFunc function ==="
sed -n '42,200p' internal/app/options/flags.go | head -100

echo -e "\n=== Look for writer-batch-size flag ==="
rg -n "writer-batch-size" internal/app/options/flags.go -A 3 -B 1

echo -e "\n=== Search for validation that uses write-rate-limit and batch sizes together ==="
rg -n "write.*rate.*limit|batch.*size" internal/app/ --type go -i

echo -e "\n=== Check options.go for any validation ==="
rg -n "validate|Validate|rate.*limit" internal/app/options/options.go -A 2

Repository: adiom-data/dsync

Length of output: 6189


🏁 Script executed:

#!/bin/bash
# Check how WriterMaxBatchSize and WriteRateLimit are used in app.go
echo "=== How WriterMaxBatchSize and WriteRateLimit are used in app.go ==="
rg -n "WriterMaxBatchSize|WriteRateLimit" internal/app/app.go -B 2 -A 2

echo -e "\n=== Look at the full context around line 242-246 in app.go ==="
sed -n '230,260p' internal/app/app.go

echo -e "\n=== Search for any validation or constraint checks anywhere ==="
rg -n "must be larger|rate.*limit.*batch|batch.*rate" . --type go -i

echo -e "\n=== Check if there's validation in the Before function ==="
rg -n "BeforeFunc" internal/app/options/flags.go -A 50 | head -60

Repository: adiom-data/dsync

Length of output: 4885


🏁 Script executed:

#!/bin/bash
# Find where the BeforeFunc is returned and what it does
echo "=== Find the end of GetFlagsAndBeforeFunc to see BeforeFunc ==="
rg -n "return.*flags|BeforeFunc" internal/app/options/flags.go | tail -20

echo -e "\n=== Get the last part of the file to see BeforeFunc implementation ==="
tail -100 internal/app/options/flags.go

echo -e "\n=== Check if options.go has validation after parsing flags ==="
cat internal/app/options/options.go

Repository: adiom-data/dsync

Length of output: 5680


🏁 Script executed:

#!/bin/bash
# Let's verify the exact state of the flags and see if there are any runtime checks
echo "=== Check how WriterMaxBatchSize is used at runtime ==="
rg -n "WriterMaxBatchSize" . --type go -B 1 -A 1 | head -40

echo -e "\n=== Check how WriteRateLimit is used at runtime ==="
rg -n "WriteRateLimit" . --type go -B 1 -A 1 | head -40

echo -e "\n=== Check for any validation in the runner or flow logic ==="
rg -n "batch.*rate|rate.*batch|WriteRateLimit.*Batch|WriterMaxBatchSize.*Rate" . --type go -i

Repository: adiom-data/dsync

Length of output: 4840


🏁 Script executed:

#!/bin/bash
# Let's see the complete usage of WriteRateLimit when the limiter is created
echo "=== Full context of rate limiter creation ==="
sed -n '1055,1075p' ./connectors/common/base.go

echo -e "\n=== Check if there's any validation between flags and limiter creation ==="
rg -n "WriteRateLimit\|WriterMaxBatchSize" ./runners/local/runner.go -B 3 -A 3

Repository: adiom-data/dsync

Length of output: 794


🏁 Script executed:

#!/bin/bash
# Let's check the Before function more carefully for any validation
echo "=== Complete Before function ==="
sed -n '191,205p' ./internal/app/options/flags.go

echo -e "\n=== Check if there's validation in app.go before passing to runner ==="
sed -n '200,250p' ./internal/app/app.go | head -50

echo -e "\n=== Check for any Action handlers on write-rate-limit flag ==="
rg -n "write-rate-limit" ./internal/app/options/flags.go -B 5 -A 10

Repository: adiom-data/dsync

Length of output: 3144


Add validation to enforce write-rate-limit constraint against batch sizes.

The flag usage text states "must be larger than any batch sizes," but there is no validation enforcing this constraint. The Before function (lines 193-198) only validates flag combinations for progress/logfile. Add validation to check that write-rate-limit exceeds writer-batch-size when both are set, preventing misconfiguration that could cause confusing runtime behavior.

🤖 Prompt for AI Agents
In internal/app/options/flags.go around lines 187-190 (and add logic into the
Before function near lines 193-198), the flag usage claims write-rate-limit
"must be larger than any batch sizes" but no validation enforces that; add a
check in the Before hook that if both write-rate-limit and writer-batch-size are
set (non-zero) then write-rate-limit > writer-batch-size, and return a
descriptive error (e.g. "write-rate-limit must be larger than
writer-batch-size") when the constraint is violated so misconfiguration is
rejected at startup.

@adiom-mark adiom-mark merged commit 960486d into main Dec 16, 2025
2 checks passed
@adiom-mark adiom-mark deleted the writeratelimit branch December 16, 2025 21:06
adiom-mark added a commit that referenced this pull request Jan 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants