
Conversation

@adiom-mark
Collaborator

@adiom-mark adiom-mark commented Dec 17, 2025

Summary by CodeRabbit

  • New Features

    • Per-namespace rate limiting: independent throughput control for different vector namespaces.
  • Refactor

    • S3 vector connector defaults updated: rate limit increased to 2500 and batch size to 500; rate-limit burst option removed.
    • Connector API simplified to accept a numeric rate limit (per-namespace limiter managed internally) rather than an external limiter object.


@coderabbitai

coderabbitai bot commented Dec 17, 2025

Walkthrough

Replaces a global rate limiter with a concurrency-safe per-namespace limiter utility and updates the S3Vector connector API and call sites to construct and use namespace-scoped limiters via an integer rateLimit parameter instead of a shared *rate.Limiter.

Changes

  • Rate Limiter Infrastructure (connectors/util/limiter.go): New file adding the NamespaceLimiter interface and a namespaceLimiter implementation with an RWMutex-protected map of per-namespace *rate.Limiter, lazy initialization via a default factory, and NewNamespaceLimiter(namespaceToLimit map[string]int, defaultLimit int).
  • S3Vector Connector (connectors/s3vector/conn.go): Replaces limiter *rate.Limiter with limiter util.NamespaceLimiter in conn. The NewConn signature now accepts rateLimit int and constructs util.NewNamespaceLimiter(nil, rateLimit). All rate-limit waits call c.limiter.Get(namespace).WaitN(...) instead of a global limiter. The direct import of golang.org/x/time/rate is removed where no longer needed.
  • Connector Configuration / Flags (internal/app/options/connectorflags.go): Updates flags and the NewConn invocation to pass a raw rateLimit int (rate-limit default raised to 2500, batch-size default to 500) and removes burst-related limiter wiring and imports.
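The utility described above can be sketched roughly as follows. This is an illustrative reconstruction rather than the file's actual code: the `limiter` struct stands in for `*rate.Limiter` from `golang.org/x/time/rate` so the sketch stays dependency-free, and the `perSecond` field and lowercase names are invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a stand-in for *rate.Limiter, used so this sketch
// compiles without the golang.org/x/time/rate dependency.
type limiter struct{ perSecond int }

// namespaceLimiter lazily creates one limiter per namespace, guarded
// by an RWMutex, following the pattern the walkthrough describes for
// connectors/util/limiter.go.
type namespaceLimiter struct {
	mu       sync.RWMutex
	limiters map[string]*limiter
	factory  func() *limiter
}

func newNamespaceLimiter(namespaceToLimit map[string]int, defaultLimit int) *namespaceLimiter {
	limiters := make(map[string]*limiter, len(namespaceToLimit))
	for ns, n := range namespaceToLimit {
		limiters[ns] = &limiter{perSecond: n}
	}
	return &namespaceLimiter{
		limiters: limiters,
		factory:  func() *limiter { return &limiter{perSecond: defaultLimit} },
	}
}

// Get returns the limiter for ns, creating it on first use.
func (n *namespaceLimiter) Get(ns string) *limiter {
	n.mu.RLock()
	l, ok := n.limiters[ns]
	n.mu.RUnlock()
	if ok {
		return l
	}
	n.mu.Lock()
	defer n.mu.Unlock()
	// Re-check under the write lock: another goroutine may have
	// created the limiter between RUnlock and Lock above.
	if l, ok := n.limiters[ns]; ok {
		return l
	}
	l = n.factory()
	n.limiters[ns] = l
	return l
}

func main() {
	nl := newNamespaceLimiter(map[string]int{"hot": 100}, 2500)
	fmt.Println(nl.Get("hot").perSecond)          // 100: per-namespace override
	fmt.Println(nl.Get("cold").perSecond)         // 2500: lazily created default
	fmt.Println(nl.Get("cold") == nl.Get("cold")) // true: same limiter reused
}
```

Calling NewConn with a nil map, as the connector does, means every namespace falls through to the default factory, which matches the uniform-limit behavior the summary describes.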

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Review concurrency and double-checked locking in connectors/util/limiter.go (RWMutex use, race conditions).
  • Verify all S3Vector limiter call sites use the correct namespace extraction and that WaitN semantics remain correct.
  • Confirm constructor NewConn and options wiring pass the intended default/namespace limits and that removal of burst parameter is acceptable.

Possibly related PRs

  • s3vectors support as sink #357 — Modifies the s3vector connector’s limiter API and replaces global limiter usage with per-namespace limiting (directly related change set).

Suggested reviewers

  • alex-thc

Poem

🐰 I hopped from global to lanes so fine,
Each namespace now gets its own little line,
With mutex and map I guard each pace,
No more crowding in one shared space,
Hooray — fair limits for every face! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 33.33%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title accurately describes the main change: introducing per-namespace rate limiting for s3vectors, the core focus of the changeset across all modified files.

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b844cbb and 4f278ee.

📒 Files selected for processing (3)
  • connectors/s3vector/conn.go (6 hunks)
  • connectors/util/limiter.go (1 hunks)
  • internal/app/options/connectorflags.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/app/options/connectorflags.go
  • connectors/util/limiter.go
🧰 Additional context used
🧬 Code graph analysis (1)
connectors/s3vector/conn.go (1)
connectors/util/limiter.go (2)
  • NamespaceLimiter (9-11)
  • NewNamespaceLimiter (38-57)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (5)
connectors/s3vector/conn.go (5)

30-30: LGTM! Field type updated to support per-namespace limiting.

The change from *rate.Limiter to util.NamespaceLimiter correctly enables namespace-scoped rate limiting.


223-243: LGTM! Consistent per-namespace rate limiting pattern.

The implementation correctly retrieves the per-namespace limiter once and uses it across both delete and put batch operations. This ensures all operations for the same namespace share the same rate limit.


275-275: LGTM! Proper initialization of per-namespace limiter.

The initialization correctly creates a namespace limiter with a default rate limit applied to all namespaces. The nil first parameter indicates no custom per-namespace overrides, which is appropriate for this use case.


263-263: Document the edge-case behavior for rateLimit parameter.

The implementation handles edge cases but lacks documentation:

  • rateLimit = 0: Creates a zero limiter (rate.NewLimiter(0, 0)) that blocks all operations
  • rateLimit < 0: Treats as unlimited (rate.Inf), allowing all events

Clarify whether blocking all operations when rateLimit = 0 is intentional. If not blocking is desired for zero values (e.g., to mean "unlimited"), update the implementation to handle this case explicitly, similar to the negative value handling. Otherwise, document this behavior in the function or add a comment explaining the zero case.


89-92: The per-namespace limiter retrieval pattern is correct and thread-safe. The Get method properly implements double-checked locking using sync.RWMutex to prevent race conditions when creating limiters on-demand for new namespaces. The limiter is retrieved once per request and shared across all batch goroutines, with each batch waiting for the appropriate number of tokens via limiter.WaitN.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
connectors/util/limiter.go (1)

38-57: Burst value may be negative when rate limit is infinite.

When v < 0 or defaultLimit < 0, the code sets limit = rate.Inf but still uses the negative value as the burst parameter in rate.NewLimiter(limit, v) (line 45) and rate.NewLimiter(limit, defaultLimit) (line 54). While rate.Limiter with rate.Inf allows unlimited tokens regardless of burst, a negative burst is semantically odd and may cause unexpected behavior if the limiter implementation changes.

Consider using a sensible burst value (e.g., 1 or 0) when the limit is infinite:

 	for k, v := range namespaceToLimit {
 		limit := rate.Limit(v)
+		burst := v
 		if v < 0 {
 			limit = rate.Inf
+			burst = 0
 		}
-		limiters[k] = rate.NewLimiter(limit, v)
+		limiters[k] = rate.NewLimiter(limit, burst)
 	}
 	limit := rate.Limit(defaultLimit)
+	burst := defaultLimit
 	if defaultLimit < 0 {
 		limit = rate.Inf
+		burst = 0
 	}
 	return &namespaceLimiter{
 		limiters: limiters,
 		defaultLimiterFactory: func() *rate.Limiter {
-			return rate.NewLimiter(limit, defaultLimit)
+			return rate.NewLimiter(limit, burst)
 		},
 	}
connectors/s3vector/conn.go (1)

221-256: Consider caching the namespace limiter to avoid repeated lookups.

Within WriteUpdates, limiter.Get(r.Msg.GetNamespace()) is called in both the delete and put batch loops. While the Get method is thread-safe and efficient (read-lock for existing entries), caching the limiter before the loops would be slightly cleaner:

 	if len(updates) > 0 {
+		nsLimiter := c.limiter.Get(r.Msg.GetNamespace())
 		eg, ctx := errgroup.WithContext(ctx)
 		eg.SetLimit(c.maxParallelism)
 		for batch := range slices.Chunk(toDelete, c.batchSize) {
 			eg.Go(func() error {
-				if err := c.limiter.Get(r.Msg.GetNamespace()).WaitN(ctx, len(batch)); err != nil {
+				if err := nsLimiter.WaitN(ctx, len(batch)); err != nil {
 					return fmt.Errorf("err in limiter: %w", err)
 				}
 				// ...
 			})
 		}
 		for batch := range slices.Chunk(vectors, c.batchSize) {
 			eg.Go(func() error {
-				if err := c.limiter.Get(r.Msg.GetNamespace()).WaitN(ctx, len(batch)); err != nil {
+				if err := nsLimiter.WaitN(ctx, len(batch)); err != nil {
 					return fmt.Errorf("err in limiter: %w", err)
 				}
 				// ...
 			})
 		}
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 19f1fbe and b844cbb.

📒 Files selected for processing (3)
  • connectors/s3vector/conn.go (6 hunks)
  • connectors/util/limiter.go (1 hunks)
  • internal/app/options/connectorflags.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
connectors/s3vector/conn.go (1)
connectors/util/limiter.go (2)
  • NamespaceLimiter (9-11)
  • NewNamespaceLimiter (38-57)
connectors/util/limiter.go (1)
connectors/dynamodb/stream/helper.go (1)
  • Limiter (23-25)
internal/app/options/connectorflags.go (1)
connectors/s3vector/conn.go (1)
  • NewConn (261-275)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (7)
connectors/util/limiter.go (2)

9-11: LGTM!

Clean interface definition that provides a clear contract for obtaining namespace-specific rate limiters.


19-36: LGTM!

The double-checked locking pattern is correctly implemented for thread-safe lazy initialization of per-namespace limiters.

internal/app/options/connectorflags.go (2)

396-396: LGTM!

The call to NewConn correctly passes rateLimit as an integer, aligning with the updated signature that now handles limiter construction internally.


380-388: Rate-limit and batch-size defaults align with AWS S3 Vectors service limits.

The new rate-limit default of 2,500 vectors per second and batch-size default of 500 correspond to AWS S3 Vectors' documented maximum throughput (2,500 vectors per second per vector index) and maximum batch size per API request. These changes set the connector to recommended service maximums rather than conservative defaults, enabling optimal throughput for users without additional configuration. The previous values (500 rate-limit, 200 batch-size) were below the service's stated limits, so users hitting expected production loads would need manual tuning anyway. No action required.

connectors/s3vector/conn.go (3)

30-30: LGTM!

The limiter field type correctly changed to util.NamespaceLimiter to support per-namespace rate limiting.


89-103: LGTM!

The rate limiter correctly retrieves a namespace-specific limiter before each batch operation. The closure safely captures r.Msg.GetNamespace() since it's called at goroutine execution time (not captured by value at loop iteration).


261-275: LGTM!

The constructor cleanly initializes the NamespaceLimiter with nil for the namespace-specific map, meaning all namespaces will use the default rate limit. This provides uniform per-namespace limiting as intended by the PR.

@adiom-mark adiom-mark merged commit 79d0ed3 into main Dec 17, 2025
2 checks passed
@adiom-mark adiom-mark deleted the s3vectorsratelimit branch December 17, 2025 00:37