Skip to content

Conversation

@adiom-mark
Copy link
Collaborator

@adiom-mark adiom-mark commented Jan 10, 2026

Summary by CodeRabbit

  • Refactor

    • Streamlined updates-only sync with gated update flow, early exit for non-initial syncs, improved state emission, and dynamic updates partitioning.
  • New Features

    • Runtime options and CLI flags to set connector ID, plan parallelism, and documents-per-segment; connector info reports configured ID.
    • Localstack endpoint fallback when not provided via environment.
  • Bug Fixes

    • Graceful handling of missing table details, clearer error contexts, consistent NULL handling, and fixed-width binary length encoding.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 10, 2026

📝 Walkthrough

Walkthrough

GeneratePlan now skips state-driven processing when InitialSync is false; stream setup, restart, state retrieval, encoding, and emission occur only when Updates is true. Table-not-found is logged and ignored. Added Options.ID and configurators, CLI flags, and localstack endpoint fallback.

Changes

Cohort / File(s) Summary
DynamoDB connector core
connectors/dynamodb/conn.go
Gate stream init/restart, state retrieval, encoding, and emission on r.Msg.GetUpdates(); early return when r.Msg.GetInitialSync() is false; ignore ErrNotFound when retrieving table details; derive UpdatesPartitions from a dynamic slice; emit stream state into updates channel.
Connector options & API surface
connectors/dynamodb/conn.go
Added ID string to Options; added exported configurators WithID(string), WithPlanParallelism(int), WithDocsPerSegment(int); GetInfo now sets Id from options.
Localstack endpoint handling
connectors/dynamodb/conn.go
When targeting localstack, use AWS_ENDPOINT_URL env var if set, otherwise fall back to http://localhost:4566.
CLI flags & connector registration
internal/app/options/connectorflags.go
Register CLI flags doc-partition (int), plan-parallelism (int), and id (string) for the DynamoDB connector and pass them to dynamodb.NewConn via the new configurators; action signature now accepts *cli.Context to read flags.
Conversion & wire encoding
connectors/dynamodb/conv.go
Handle nil in fromBson/toBson; map stream NULL attribute type; wrap key/value conversion errors with context; encode B/N/S lengths as int32(len(...)) in dynamoWriteKeyValue.

Sequence Diagram(s)

sequenceDiagram
  participant Requestor
  participant Conn as DynamoDB Connector
  participant Streams as StreamManager
  participant StateCh as Updates Channel

  Requestor->>Conn: GeneratePlan(r.Msg)
  alt InitialSync disabled
    Conn-->>Requestor: return (early exit)
  else InitialSync enabled
    alt Updates enabled
      Conn->>Streams: Ensure / start / restart stream for table
      Streams-->>Conn: StreamARN / status
      alt missing/incompatible & localstack
        Conn->>Streams: create/restart stream (use AWS_ENDPOINT_URL or http://localhost:4566)
        Streams-->>Conn: StreamARN
      else missing/incompatible & non-localstack
        Streams-->>Conn: error ("no stream found" / "incompatible stream found")
      end
      Conn->>Streams: Fetch stream state
      Streams-->>Conn: StreamState
      Conn->>StateCh: Encode & emit UpdatesPartition(s)
    else Updates disabled
      Conn-->>Requestor: skip stream setup/state emission
    end
    Conn-->>Requestor: return plan (includes UpdatesPartitions if any)
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I hopped through streams with a twitching nose,

Flags and IDs in tidy rows.
Localstack hummed, the state took flight,
Partitions counted through the night.
A carrot cheer for every byte.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The PR title 'Allow dynamodb to run initial sync only' is partially related to the changeset. While the code does add logic to skip state-driven processing when not in InitialSync mode, the majority of changes involve adding configuration options (WithID, WithPlanParallelism, WithDocsPerSegment), CLI flag support, NULL type handling in conversion functions, and stream state management—features beyond just enabling initial-sync-only mode. Consider a more comprehensive title that reflects the scope of changes, such as 'Add DynamoDB configuration options and initial sync mode support' or similar, to better represent the multi-faceted nature of this pull request.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
connectors/dynamodb/conn.go (1)

70-105: Fix loop-variable capture + use the post-start StreamARN when fetching state (localstack path).

Right now:

  • Line 71 captures name from the loop (can make all goroutines use the last table name).
  • Lines 79-85 can start a stream but Lines 100-105 still call GetStreamState(..., tableDetails.StreamARN) which is still "" unless you refresh it.
Proposed fix
 for _, name := range tableNames {
+    name := name // avoid loop-variable capture
     eg.Go(func() error {
         tableDetails, err := c.client.TableDetails(egCtx, name)
         if err != nil {
             return err
         }

         if r.Msg.GetUpdates() {
-            if tableDetails.StreamARN == "" {
+            streamARN := tableDetails.StreamARN
+            if streamARN == "" {
                 if c.spec == "localstack" {
                     slog.Debug("No stream found, starting stream", "table", name)
                     _, err := c.client.StartStream(egCtx, name, false)
                     if err != nil {
                         return err
                     }
+                    // Refresh StreamARN after enabling streams.
+                    tableDetails, err = c.client.TableDetails(egCtx, name)
+                    if err != nil {
+                        return err
+                    }
+                    streamARN = tableDetails.StreamARN
+                    if streamARN == "" {
+                        return fmt.Errorf("no stream found after starting stream for table %s", name)
+                    }
                 } else {
-                    return fmt.Errorf("no stream found")
+                    return fmt.Errorf("no stream found for table %s", name)
                 }
             } else if tableDetails.IncompatibleStream {
                 if c.spec == "localstack" {
                     slog.Debug("Incompatible stream found, restarting stream", "table", name)
                     _, err := c.client.StartStream(egCtx, name, true)
                     if err != nil {
                         return err
                     }
+                    tableDetails, err = c.client.TableDetails(egCtx, name)
+                    if err != nil {
+                        return err
+                    }
+                    streamARN = tableDetails.StreamARN
                 } else {
-                    return fmt.Errorf("incompatible stream found")
+                    return fmt.Errorf("incompatible stream found for table %s", name)
                 }
             }

-            state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
+            state, err := c.client.GetStreamState(egCtx, streamARN)
             if err != nil {
                 return err
             }
             statesCh <- state
         }
🧹 Nitpick comments (1)
connectors/dynamodb/conn.go (1)

107-109: Optional: short-circuit GeneratePlan earlier when neither InitialSync nor Updates are requested.

This early return works, but it still does TableDetails(...) per table before returning when Updates == false and InitialSync == false.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4858a8e and 74afcb6.

📒 Files selected for processing (1)
  • connectors/dynamodb/conn.go
🧰 Additional context used
🧬 Code graph analysis (1)
connectors/dynamodb/conn.go (1)
gen/adiom/v1/messages.pb.go (6)
  • UpdatesPartition (188-198)
  • UpdatesPartition (211-211)
  • UpdatesPartition (226-228)
  • GeneratePlanResponse (649-655)
  • GeneratePlanResponse (668-668)
  • GeneratePlanResponse (683-685)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
connectors/dynamodb/conn.go (1)

148-162: Confirm UpdatesPartition.Namespaces semantics when the request omits namespaces.

When r.Msg.GetNamespaces() is empty, you currently emit an updates partition with Namespaces: namespaces (empty). That’s fine if downstream interprets empty as “all namespaces” (similar to StreamUpdates behavior), but it’s worth confirming this contract since the plan response is what drives the next call.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/app/options/connectorflags.go (1)

358-371: CLI options not applied for regular DynamoDB connections.

The doc-partition and plan-parallelism flags are only passed to NewConn in the localstack branch. For regular AWS DynamoDB (the else branch at line 369), these user-specified options are silently ignored.

🐛 Proposed fix to apply options consistently
 		}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
+			opts := []func(*dynamodb.Options){
+				dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
+				dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
+			}
 			if strings.EqualFold(args[0], "dynamodb://localstack") {
 				_, connString, ok := strings.Cut(args[0], "://")
 				if !ok {
 					return nil, fmt.Errorf("invalid connection string %v", args[0])
 				}
-				return dynamodb.NewConn(connString,
-					dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
-					dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
-				), nil
+				return dynamodb.NewConn(connString, opts...), nil
 			} else {
-				return dynamodb.NewConn(""), nil
+				return dynamodb.NewConn("", opts...), nil
 			}
 		}),
🤖 Fix all issues with AI agents
In @connectors/dynamodb/conn.go:
- Around line 77-105: When starting or restarting a stream in the goroutine,
capture and update tableDetails.StreamARN with the ARN returned by
c.client.StartStream(...) instead of discarding it so subsequent calls use the
new ARN; then call c.client.GetStreamState using egCtx (not ctx) to respect the
errgroup cancellation; specifically, in the branches that call
c.client.StartStream(egCtx, name, ...) assign the returned ARN to
tableDetails.StreamARN (or a local var that you then pass to GetStreamState) and
replace the GetStreamState(ctx, tableDetails.StreamARN) call with
GetStreamState(egCtx, tableDetails.StreamARN) so statesCh receives the correct
stream state.
🧹 Nitpick comments (1)
connectors/dynamodb/conn.go (1)

115-118: Redundant check for GetInitialSync().

Since the code returns early on line 108 when !r.Msg.GetInitialSync(), this check is always true when reached. Consider simplifying.

Suggested simplification
-			if r.Msg.GetInitialSync() && c.options.DocsPerSegment > 0 {
+			if c.options.DocsPerSegment > 0 {
 				totalSegments = int(tableDetails.Count / uint64(c.options.DocsPerSegment))
 				totalSegments = max(1, min(1000000, totalSegments))
 			}
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 74afcb6 and fc19035.

📒 Files selected for processing (2)
  • connectors/dynamodb/conn.go
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (2)
internal/app/options/connectorflags.go (1)
connectors/dynamodb/conn.go (3)
  • NewConn (404-426)
  • WithDocsPerSegment (398-402)
  • WithPlanParallelism (392-396)
connectors/dynamodb/conn.go (2)
gen/adiom/v1/messages.pb.go (6)
  • UpdatesPartition (188-198)
  • UpdatesPartition (211-211)
  • UpdatesPartition (226-228)
  • GeneratePlanResponse (649-655)
  • GeneratePlanResponse (668-668)
  • GeneratePlanResponse (683-685)
java/src/main/java/adiom/v1/Messages.java (2)
  • UpdatesPartition (1041-1957)
  • GeneratePlanResponse (8653-9710)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (4)
internal/app/options/connectorflags.go (1)

347-357: LGTM on CLI flag definitions.

The new doc-partition and plan-parallelism flags are properly defined with sensible defaults that match the connector's internal defaults.

connectors/dynamodb/conn.go (3)

107-109: LGTM!

The early return when InitialSync is disabled correctly allows the connector to run in updates-only mode.


148-157: LGTM!

The conditional building of the UpdatesPartition slice correctly handles the updates-only flow, encoding the stream state map into the cursor.


392-402: LGTM!

The option configurators follow idiomatic Go functional options pattern and integrate cleanly with NewConn.

Comment on lines 77 to 110
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
} else {
return fmt.Errorf("incompatible stream found")
}

state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

StreamARN not updated after starting/restarting stream.

When tableDetails.StreamARN is empty (line 78) and a new stream is started (line 81), the returned ARN is discarded. Line 100 then calls GetStreamState with the original empty tableDetails.StreamARN, which will fail. The same issue applies after restarting an incompatible stream (line 91).

Additionally, line 100 uses ctx instead of egCtx, which is inconsistent with the rest of the goroutine and won't respect errgroup cancellation.

Proposed fix
 			if r.Msg.GetUpdates() {
 				if tableDetails.StreamARN == "" {
 					if c.spec == "localstack" {
 						slog.Debug("No stream found, starting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, false)
+						streamARN, err := c.client.StartStream(egCtx, name, false)
 						if err != nil {
 							return err
 						}
+						tableDetails.StreamARN = streamARN
 					} else {
 						return fmt.Errorf("no stream found")
 					}
 				} else if tableDetails.IncompatibleStream {
 					if c.spec == "localstack" {
 						slog.Debug("Incompatible stream found, restarting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, true)
+						streamARN, err := c.client.StartStream(egCtx, name, true)
 						if err != nil {
 							return err
 						}
+						tableDetails.StreamARN = streamARN
 					} else {
 						return fmt.Errorf("incompatible stream found")
 					}
 				}

-				state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
+				state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
 				if err != nil {
 					return err
 				}
 				statesCh <- state
 			}
🤖 Prompt for AI Agents
In @connectors/dynamodb/conn.go around lines 77 - 105, When starting or
restarting a stream in the goroutine, capture and update tableDetails.StreamARN
with the ARN returned by c.client.StartStream(...) instead of discarding it so
subsequent calls use the new ARN; then call c.client.GetStreamState using egCtx
(not ctx) to respect the errgroup cancellation; specifically, in the branches
that call c.client.StartStream(egCtx, name, ...) assign the returned ARN to
tableDetails.StreamARN (or a local var that you then pass to GetStreamState) and
replace the GetStreamState(ctx, tableDetails.StreamARN) call with
GetStreamState(egCtx, tableDetails.StreamARN) so statesCh receives the correct
stream state.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/app/options/connectorflags.go (1)

358-371: CLI flags are ignored for non-localstack DynamoDB connections.

When using dynamodb (not dynamodb://localstack), line 369 calls dynamodb.NewConn("") without passing the WithDocsPerSegment and WithPlanParallelism options. Users would expect --doc-partition and --plan-parallelism flags to work regardless of the connection string.

Proposed fix
 			}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
+				opts := []func(*dynamodb.Options){
+					dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
+					dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
+				}
 				if strings.EqualFold(args[0], "dynamodb://localstack") {
 					_, connString, ok := strings.Cut(args[0], "://")
 					if !ok {
 						return nil, fmt.Errorf("invalid connection string %v", args[0])
 					}
-					return dynamodb.NewConn(connString,
-						dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
-						dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
-					), nil
+					return dynamodb.NewConn(connString, opts...), nil
 				} else {
-					return dynamodb.NewConn(""), nil
+					return dynamodb.NewConn("", opts...), nil
 				}
 			}),
🤖 Fix all issues with AI agents
In @connectors/dynamodb/conn.go:
- Around line 100-104: The call to c.client.GetStreamState uses the parent ctx
variable instead of the errgroup context egCtx, so if another goroutine errors
out GetStreamState won't be cancelled; update the invocation to use egCtx (i.e.,
replace ctx with egCtx in the GetStreamState call) ensuring the call runs with
the errgroup's context and preserves existing error handling that returns err
and sends state to statesCh.
- Around line 148-157: The UpdatesPartition being appended uses the wrong
variable for Namespaces; when auto-discovery populates tableNames (via
GetAllTableNames()) namespaces can be empty. In the block that builds updates
(the append to updates in the r.Msg.GetUpdates() branch), change the Namespaces
field from using namespaces to using tableNames so UpdatesPartition.Namespaces
contains the actual resolved table names (i.e., set Namespaces to tableNames
when creating the adiomv1.UpdatesPartition with the encoded stateMap cursor).
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fc19035 and f754f61.

📒 Files selected for processing (3)
  • connectors/dynamodb/conn.go
  • connectors/dynamodb/conv.go
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (2)
connectors/dynamodb/conn.go (1)
gen/adiom/v1/messages.pb.go (6)
  • UpdatesPartition (188-198)
  • UpdatesPartition (211-211)
  • UpdatesPartition (226-228)
  • GeneratePlanResponse (649-655)
  • GeneratePlanResponse (668-668)
  • GeneratePlanResponse (683-685)
internal/app/options/connectorflags.go (1)
connectors/dynamodb/conn.go (3)
  • NewConn (404-426)
  • WithDocsPerSegment (398-402)
  • WithPlanParallelism (392-396)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (4)
connectors/dynamodb/conv.go (2)

204-211: LGTM!

Good improvement wrapping errors with contextual messages. This will help with debugging when conversion failures occur in itemsToBson.


270-297: LGTM!

The int32 cast is correct for binary.Write which requires fixed-size types. This ensures consistent 4-byte length prefixes across platforms. Given DynamoDB's 400KB attribute value limit, overflow is not a concern.

connectors/dynamodb/conn.go (2)

392-402: LGTM!

Clean functional options pattern implementation. The configurators correctly modify the Options struct fields.


107-109: LGTM!

The early return correctly allows "updates only" mode by skipping partition generation when GetInitialSync() is false.

Comment on lines +148 to 162
var updates []*adiomv1.UpdatesPartition
if r.Msg.GetUpdates() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err = enc.Encode(stateMap)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: namespaces, Cursor: buf.Bytes()})
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the full context around lines 148-157 and the earlier part (lines 34-35)
cd connectors/dynamodb && cat -n conn.go | sed -n '30,165p'

Repository: adiom-data/dsync

Length of output: 4683


🏁 Script executed:

# Search for UpdatesPartition usage to see how Namespaces is consumed
rg -n -B2 -A5 'UpdatesPartition' --type=go -g '!*_test.go'

Repository: adiom-data/dsync

Length of output: 20733


🏁 Script executed:

# Look for the proto definition of UpdatesPartition to understand the Namespaces field
find . -name '*.proto' -type f | head -20

Repository: adiom-data/dsync

Length of output: 150


🏁 Script executed:

# Verify the fix: check what tableNames contains and how it should be used
cat -n connectors/dynamodb/conn.go | sed -n '148,157p'

Repository: adiom-data/dsync

Length of output: 461


🏁 Script executed:

# Check the proto definition for UpdatesPartition to understand the expected semantics
cat proto/adiom/v1/messages.proto | grep -A 10 "message UpdatesPartition"

Repository: adiom-data/dsync

Length of output: 397


Use tableNames instead of namespaces for UpdatesPartition.Namespaces at line 156.

When r.Msg.GetNamespaces() is empty (lines 34-40), tableNames is populated via GetAllTableNames(), but the namespaces variable remains empty. Line 156 incorrectly uses the empty namespaces variable instead of the resolved tableNames, causing UpdatesPartition.Namespaces to be empty in auto-discovery scenarios. This is inconsistent with regular Partition objects (line 112) which correctly use actual table names, and breaks downstream code in pkg/test/connector.go and internal/app/verify.go that expects Namespaces to contain the tables being streamed.

Change line 156 to: updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: tableNames, Cursor: buf.Bytes()})

🤖 Prompt for AI Agents
In @connectors/dynamodb/conn.go around lines 148 - 157, The UpdatesPartition
being appended uses the wrong variable for Namespaces; when auto-discovery
populates tableNames (via GetAllTableNames()) namespaces can be empty. In the
block that builds updates (the append to updates in the r.Msg.GetUpdates()
branch), change the Namespaces field from using namespaces to using tableNames
so UpdatesPartition.Namespaces contains the actual resolved table names (i.e.,
set Namespaces to tableNames when creating the adiomv1.UpdatesPartition with the
encoded stateMap cursor).

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
connectors/dynamodb/conn.go (2)

111-113: Inconsistent condition: GetInitialSync() is checked twice.

Line 111 checks !r.Msg.GetInitialSync() to return early, but line 119 also checks r.Msg.GetInitialSync() as a condition. After line 111, the code can only continue if GetInitialSync() is true, making the condition on line 119 always true and redundant.

♻️ Suggested simplification
 			if !r.Msg.GetInitialSync() {
 				return nil
 			}

 			// TODO: reconsider how to map namespaces properly
 			ns := name
 			totalSegments := 1

-			if r.Msg.GetInitialSync() && c.options.DocsPerSegment > 0 {
+			if c.options.DocsPerSegment > 0 {
 				totalSegments = int(tableDetails.Count / uint64(c.options.DocsPerSegment))
 				totalSegments = max(1, min(1000000, totalSegments))
 			}

144-150: Dead code path: ErrNotFound is already handled in the loop.

Since ErrNotFound is handled at line 74-77 by logging and returning nil, the errors.Is(err, ErrNotFound) check at line 146 will never be true. The error group will not receive an ErrNotFound from the table processing loop.

This check could be removed or kept for defensive purposes if ErrNotFound handling may change in the future.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f754f61 and 95b9f9c.

📒 Files selected for processing (3)
  • connectors/dynamodb/conn.go
  • connectors/dynamodb/conv.go
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (2)
connectors/dynamodb/conn.go (2)
connectors/dynamodb/client.go (1)
  • ErrNotFound (95-95)
gen/adiom/v1/messages.pb.go (6)
  • UpdatesPartition (188-198)
  • UpdatesPartition (211-211)
  • UpdatesPartition (226-228)
  • GeneratePlanResponse (649-655)
  • GeneratePlanResponse (668-668)
  • GeneratePlanResponse (683-685)
internal/app/options/connectorflags.go (1)
connectors/dynamodb/conn.go (3)
  • NewConn (408-430)
  • WithDocsPerSegment (402-406)
  • WithPlanParallelism (396-400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (6)
connectors/dynamodb/conv.go (2)

206-210: LGTM! Good error context addition.

Wrapping errors with context ("err in key to bson" and "err in to bson") improves debuggability when BSON encoding fails during item conversion.


273-292: LGTM! Explicit int32 ensures consistent cross-platform encoding.

Switching to explicit int32 for length encoding guarantees 4-byte lengths regardless of platform architecture, which is important for binary protocol consistency with state encoding used in conn.go.

connectors/dynamodb/conn.go (3)

74-78: LGTM! Graceful handling of missing tables.

When a table is not found during plan generation, logging a warning and continuing allows partial plans to be generated, which is a reasonable approach for dynamic environments.


152-161: LGTM! Updates partition construction is correct.

The state map is properly gob-encoded and used to construct an UpdatesPartition only when updates are requested.


396-406: LGTM! Option configurators follow idiomatic functional options pattern.

The WithPlanParallelism and WithDocsPerSegment configurators are well-structured and align with Go's functional options pattern.

internal/app/options/connectorflags.go (1)

347-371: CLI flags are only applied for localstack, not for standard DynamoDB.

The doc-partition and plan-parallelism flags are defined but only passed to dynamodb.NewConn when using dynamodb://localstack. For the standard dynamodb case (line 369), the flags are ignored and the connector is created with default options only.

If this is intentional (e.g., production DynamoDB should always use defaults), this is fine. Otherwise, consider applying the flags consistently.

♻️ Suggested fix if flags should apply to both cases
 			}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
+				opts := []func(*dynamodb.Options){
+					dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
+					dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
+				}
 				if strings.EqualFold(args[0], "dynamodb://localstack") {
 					_, connString, ok := strings.Cut(args[0], "://")
 					if !ok {
 						return nil, fmt.Errorf("invalid connection string %v", args[0])
 					}
-					return dynamodb.NewConn(connString,
-						dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
-						dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
-					), nil
+					return dynamodb.NewConn(connString, opts...), nil
 				} else {
-					return dynamodb.NewConn(""), nil
+					return dynamodb.NewConn("", opts...), nil
 				}
 			}),

Comment on lines +81 to 114
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
} else {
return fmt.Errorf("incompatible stream found")
}

state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}

state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
if err != nil {
return err
if !r.Msg.GetInitialSync() {
return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's understand the file structure and locate the StartStream method
git ls-files connectors/dynamodb/ | head -20

Repository: adiom-data/dsync

Length of output: 330


🏁 Script executed:

# Search for StartStream method definition
rg "func.*StartStream" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 191


🏁 Script executed:

# Let's also check the complete context of lines 81-113 in conn.go
wc -l connectors/dynamodb/conn.go

Repository: adiom-data/dsync

Length of output: 91


🏁 Script executed:

# Read the relevant section with more context
sed -n '60,130p' connectors/dynamodb/conn.go

Repository: adiom-data/dsync

Length of output: 1873


🏁 Script executed:

# Search for GetStreamState method to understand what it expects
rg "func.*GetStreamState" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 184


🏁 Script executed:

# Search for where tableDetails is defined and used
rg "tableDetails" connectors/dynamodb/conn.go -B 2 -A 2

Repository: adiom-data/dsync

Length of output: 1702


🏁 Script executed:

# Find the complete StartStream implementation
rg "func \(c \*client\) StartStream" -A 30 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 994


🏁 Script executed:

# Also check GetStreamState to understand what it expects
rg "func \(c \*client\) GetStreamState" -A 15 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 580


🏁 Script executed:

# Let's see the TableDetails struct to understand its structure
rg "type.*TableDetails" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 122


🏁 Script executed:

# Get the complete StartStream implementation including the return statement
rg "func \(c \*client\) StartStream" -A 50 connectors/dynamodb/client.go | head -60

Repository: adiom-data/dsync

Length of output: 1601


🏁 Script executed:

# Also get the TableDetailsResult struct definition
rg "type TableDetailsResult" -A 15 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 578


After StartStream is called, use the returned ARN instead of the stale tableDetails.StreamARN.

When r.Msg.GetUpdates() is true and a stream doesn't exist (or is incompatible), the code calls StartStream but discards its return value. The subsequent call to GetStreamState uses tableDetails.StreamARN, which was fetched before the stream was started and remains empty. StartStream already returns the new stream ARN—capture and use that instead of re-fetching table details.

🔧 Suggested fix: Capture the returned ARN from StartStream
 			if r.Msg.GetUpdates() {
 				if tableDetails.StreamARN == "" {
 					if c.spec == "localstack" {
 						slog.Debug("No stream found, starting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, false)
+						tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, false)
 						if err != nil {
 							return err
 						}
 					} else {
 						return fmt.Errorf("no stream found")
 					}
 				} else if tableDetails.IncompatibleStream {
 					if c.spec == "localstack" {
 						slog.Debug("Incompatible stream found, restarting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, true)
+						tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, true)
 						if err != nil {
 							return err
 						}
 					} else {
 						return fmt.Errorf("incompatible stream found")
 					}
 				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
} else {
return fmt.Errorf("incompatible stream found")
}
state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}
state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
if err != nil {
return err
if !r.Msg.GetInitialSync() {
return nil
}
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
}
state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}
if !r.Msg.GetInitialSync() {
return nil
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @connectors/dynamodb/conn.go:
- Around line 370-377: The localstack endpoint logic in AWSClientHelper is
shadowing the outer variable: change the inner declaration so you assign to the
existing outer variable (use = instead of :=) when setting endpoint based on
connStr == "localstack", ensure AWSClientHelper uses that endpoint variable
later when creating the dynamodb.Client and dynamodbstreams.Client so the
localstack URL is actually passed through.
🧹 Nitpick comments (1)
internal/app/options/connectorflags.go (1)

358-371: CLI flags are ignored for non-localstack DynamoDB connections.

The doc-partition and plan-parallelism flags are only applied when using dynamodb://localstack. Users specifying dynamodb will have their flag values silently ignored (line 369 passes no options).

Consider applying the flags consistently:

♻️ Proposed fix
 		}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
+			opts := []func(*dynamodb.Options){
+				dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
+				dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
+			}
 			if strings.EqualFold(args[0], "dynamodb://localstack") {
 				_, connString, ok := strings.Cut(args[0], "://")
 				if !ok {
 					return nil, fmt.Errorf("invalid connection string %v", args[0])
 				}
-				return dynamodb.NewConn(connString,
-					dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
-					dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
-				), nil
+				return dynamodb.NewConn(connString, opts...), nil
 			} else {
-				return dynamodb.NewConn(""), nil
+				return dynamodb.NewConn("", opts...), nil
 			}
 		}),
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 95b9f9c and 3ed6f01.

📒 Files selected for processing (3)
  • connectors/dynamodb/conn.go
  • connectors/dynamodb/conv.go
  • internal/app/options/connectorflags.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • connectors/dynamodb/conv.go
🧰 Additional context used
🧬 Code graph analysis (2)
internal/app/options/connectorflags.go (1)
connectors/dynamodb/conn.go (3)
  • NewConn (412-434)
  • WithDocsPerSegment (406-410)
  • WithPlanParallelism (400-404)
connectors/dynamodb/conn.go (2)
connectors/dynamodb/client.go (1)
  • ErrNotFound (95-95)
gen/adiom/v1/messages.pb.go (6)
  • UpdatesPartition (188-198)
  • UpdatesPartition (211-211)
  • UpdatesPartition (226-228)
  • GeneratePlanResponse (649-655)
  • GeneratePlanResponse (668-668)
  • GeneratePlanResponse (683-685)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (4)
connectors/dynamodb/conn.go (4)

75-80: LGTM! Good defensive handling.

Gracefully skipping tables that are not found instead of failing the entire plan is a reasonable approach for robustness.


82-114: Logic flow concern: early exit placement.

The early exit at lines 112-114 (if !r.Msg.GetInitialSync()) is placed after the updates handling block (lines 82-110). This means if GetUpdates() is true but GetInitialSync() is false, the code will still perform stream setup and state retrieval before returning early without generating partitions.

Verify this is the intended behavior—when only streaming updates without initial sync, you still want to prepare the stream state but skip partition generation.


400-410: LGTM! Clean functional options pattern.

The option configurators follow Go's idiomatic functional options pattern and integrate well with NewConn.


153-162: LGTM!

The dynamic building of UpdatesPartition entries based on the GetUpdates() flag is clean. The gob encoding of the state map is consistent with how it's decoded in StreamUpdates.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/app/options/connectorflags.go (1)

363-376: CLI flags are ignored for non-localstack DynamoDB connections.

The flags --doc-partition, --plan-parallelism, and --id are only passed to NewConn when using dynamodb://localstack. For plain dynamodb connections (line 374), these flags are silently ignored because NewConn("") is called without the option functions.

Consider applying the same options to both paths:

🔧 Proposed fix
-			}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
-				if strings.EqualFold(args[0], "dynamodb://localstack") {
-					_, connString, ok := strings.Cut(args[0], "://")
-					if !ok {
-						return nil, fmt.Errorf("invalid connection string %v", args[0])
-					}
-					return dynamodb.NewConn(connString,
-						dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
-						dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
-						dynamodb.WithID(c.String("id")),
-					), nil
-				} else {
-					return dynamodb.NewConn(""), nil
-				}
+			}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
+				connString := ""
+				if strings.EqualFold(args[0], "dynamodb://localstack") {
+					_, connString, _ = strings.Cut(args[0], "://")
+				}
+				return dynamodb.NewConn(connString,
+					dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
+					dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
+					dynamodb.WithID(c.String("id")),
+				), nil
 			}),
🤖 Fix all issues with AI agents
In @connectors/dynamodb/conn.go:
- Around line 371-378: In AWSClientHelper the inner if uses := which shadows the
outer endpoint, leaving the outer endpoint empty and preventing BaseEndpoint
from being set; change the inner declaration to assign to the existing variable
(use = instead of :=) so the outer endpoint gets the localstack URL (from
AWS_ENDPOINT_URL or default "http://localhost:4566"), then ensure any subsequent
use (e.g., setting BaseEndpoint) reads that populated endpoint variable.
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3ed6f01 and 9d40246.

📒 Files selected for processing (3)
  • connectors/dynamodb/conn.go
  • connectors/dynamodb/conv.go
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (2)
internal/app/options/connectorflags.go (1)
connectors/dynamodb/conn.go (4)
  • NewConn (420-442)
  • WithDocsPerSegment (414-418)
  • WithPlanParallelism (408-412)
  • WithID (402-406)
connectors/dynamodb/conn.go (2)
connectors/dynamodb/client.go (1)
  • ErrNotFound (95-95)
gen/adiom/v1/messages.pb.go (3)
  • GetInfoResponse (424-433)
  • GetInfoResponse (446-446)
  • GetInfoResponse (461-463)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (8)
connectors/dynamodb/conv.go (2)

206-210: LGTM! Good addition of error context.

Wrapping errors with context helps debugging by indicating which step failed during BSON conversion.


270-297: Good fix for cross-platform consistency, but verify backward compatibility.

Using int32 ensures fixed-width (4-byte) length encoding regardless of platform architecture. This is important for binary protocol consistency.

However, if there's existing encoded data from a 64-bit system where int was 8 bytes, this change could cause decoding issues. Verify that no persistent state relies on the previous encoding format.

internal/app/options/connectorflags.go (1)

347-362: LGTM! CLI flags are well-defined.

The flag names and default values align with the option configurators in conn.go.

connectors/dynamodb/conn.go (5)

75-78: LGTM! Good error handling for missing tables.

Logging a warning and continuing instead of failing allows the sync to proceed when tables are removed during planning.


82-114: LGTM! Flow logic correctly supports initial-sync-only mode.

The conditional structure properly gates stream setup behind GetUpdates() and allows early return when only updates are requested without initial sync.


153-162: LGTM! State encoding is properly gated.

The UpdatesPartition is only created when GetUpdates() is true, correctly aligning with when stream state is collected.


396-418: LGTM! Clean functional options implementation.

The option configurators follow a standard pattern and integrate well with NewConn.


171-188: LGTM! GetInfo now returns the configured ID.

This allows users to set a custom identifier for the connector via the --id flag.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@connectors/dynamodb/conv.go`:
- Around line 74-75: The nil branch currently returns an
AttributeValueMemberNULL with the zero-value (false); update the nil case to
return &types.AttributeValueMemberNULL{Value: true} so DynamoDB NULL attributes
serialize correctly—locate the nil case in connectors/dynamodb/conv.go (the
switch/case that returns types.AttributeValueMemberNULL) and set Value: true on
the returned struct.

In `@internal/app/options/connectorflags.go`:
- Around line 347-375: The non-localstack branch currently returns
dynamodb.NewConn("") and ignores CLI flags; change it to parse the connection
string (reusing strings.Cut if a scheme is present) and call dynamodb.NewConn
with the same option builders used for localstack:
dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
dynamodb.WithPlanParallelism(c.Int("plan-parallelism")), and
dynamodb.WithID(c.String("id")). Update the else branch in the Create helper
(the CreateHelper block that calls dynamodb.NewConn) so both the localstack and
non-localstack paths construct the connection string and pass the options to
dynamodb.NewConn instead of returning an empty connection.
♻️ Duplicate comments (2)
connectors/dynamodb/conn.go (2)

82-109: Stream ARN is still discarded after StartStream.

The ARN returned by StartStream is ignored, so GetStreamState can use a stale/empty ARN. This was flagged earlier. Please capture and use the returned ARN.

🔧 Proposed fix
 if r.Msg.GetUpdates() {
 	if tableDetails.StreamARN == "" {
 		if c.spec == "localstack" {
 			slog.Debug("No stream found, starting stream", "table", name)
-			_, err := c.client.StartStream(egCtx, name, false)
+			tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, false)
 			if err != nil {
 				return err
 			}
 		} else {
 			return fmt.Errorf("no stream found")
 		}
 	} else if tableDetails.IncompatibleStream {
 		if c.spec == "localstack" {
 			slog.Debug("Incompatible stream found, restarting stream", "table", name)
-			_, err := c.client.StartStream(egCtx, name, true)
+			tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, true)
 			if err != nil {
 				return err
 			}
 		} else {
 			return fmt.Errorf("incompatible stream found")
 		}
 	}

 	state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)

153-162: UpdatesPartition should use resolved table names.

When namespaces are auto-discovered, namespaces is empty but tableNames is populated. Using namespaces produces empty UpdatesPartition.Namespaces. This was previously flagged.

🔧 Proposed fix
-		updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: namespaces, Cursor: buf.Bytes()})
+		updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: tableNames, Cursor: buf.Bytes()})

@adiom-mark adiom-mark force-pushed the dynamodb_separate branch 2 times, most recently from 650a6ca to e37ef50 Compare January 19, 2026 17:09
Expose doc-partition and plan-parallelism params.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants