Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 66 additions & 28 deletions connectors/dynamodb/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"sync"

"connectrpc.com/connect"
Expand Down Expand Up @@ -71,36 +72,46 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene
eg.Go(func() error {
tableDetails, err := c.client.TableDetails(egCtx, name)
if err != nil {
if errors.Is(err, ErrNotFound) {
slog.Warn("Table not found. Ignoring.", "name", name)
return nil
}
return err
}

if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
} else {
return fmt.Errorf("incompatible stream found")
}

state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}

state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
if err != nil {
return err
if !r.Msg.GetInitialSync() {
return nil
}
Comment on lines +82 to 114
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's understand the file structure and locate the StartStream method
git ls-files connectors/dynamodb/ | head -20

Repository: adiom-data/dsync

Length of output: 330


🏁 Script executed:

# Search for StartStream method definition
rg "func.*StartStream" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 191


🏁 Script executed:

# Let's also check the complete context of lines 81-113 in conn.go
wc -l connectors/dynamodb/conn.go

Repository: adiom-data/dsync

Length of output: 91


🏁 Script executed:

# Read the relevant section with more context
sed -n '60,130p' connectors/dynamodb/conn.go

Repository: adiom-data/dsync

Length of output: 1873


🏁 Script executed:

# Search for GetStreamState method to understand what it expects
rg "func.*GetStreamState" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 184


🏁 Script executed:

# Search for where tableDetails is defined and used
rg "tableDetails" connectors/dynamodb/conn.go -B 2 -A 2

Repository: adiom-data/dsync

Length of output: 1702


🏁 Script executed:

# Find the complete StartStream implementation
rg "func \(c \*client\) StartStream" -A 30 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 994


🏁 Script executed:

# Also check GetStreamState to understand what it expects
rg "func \(c \*client\) GetStreamState" -A 15 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 580


🏁 Script executed:

# Let's see the TableDetails struct to understand its structure
rg "type.*TableDetails" connectors/dynamodb/

Repository: adiom-data/dsync

Length of output: 122


🏁 Script executed:

# Get the complete StartStream implementation including the return statement
rg "func \(c \*client\) StartStream" -A 50 connectors/dynamodb/client.go | head -60

Repository: adiom-data/dsync

Length of output: 1601


🏁 Script executed:

# Also get the TableDetailsResult struct definition
rg "type TableDetailsResult" -A 15 connectors/dynamodb/client.go

Repository: adiom-data/dsync

Length of output: 578


After StartStream is called, use the returned ARN instead of the stale tableDetails.StreamARN.

When r.Msg.GetUpdates() is true and a stream doesn't exist (or is incompatible), the code calls StartStream but discards its return value. The subsequent call to GetStreamState uses tableDetails.StreamARN, which was fetched before the stream was started and remains empty. StartStream already returns the new stream ARN—capture and use that instead of re-fetching table details.

🔧 Suggested fix: Capture the returned ARN from StartStream
 			if r.Msg.GetUpdates() {
 				if tableDetails.StreamARN == "" {
 					if c.spec == "localstack" {
 						slog.Debug("No stream found, starting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, false)
+						tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, false)
 						if err != nil {
 							return err
 						}
 					} else {
 						return fmt.Errorf("no stream found")
 					}
 				} else if tableDetails.IncompatibleStream {
 					if c.spec == "localstack" {
 						slog.Debug("Incompatible stream found, restarting stream", "table", name)
-						_, err := c.client.StartStream(egCtx, name, true)
+						tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, true)
 						if err != nil {
 							return err
 						}
 					} else {
 						return fmt.Errorf("incompatible stream found")
 					}
 				}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
_, err := c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
} else {
return fmt.Errorf("incompatible stream found")
}
state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}
state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN)
if err != nil {
return err
if !r.Msg.GetInitialSync() {
return nil
}
if r.Msg.GetUpdates() {
if tableDetails.StreamARN == "" {
if c.spec == "localstack" {
slog.Debug("No stream found, starting stream", "table", name)
tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, false)
if err != nil {
return err
}
} else {
return fmt.Errorf("no stream found")
}
} else if tableDetails.IncompatibleStream {
if c.spec == "localstack" {
slog.Debug("Incompatible stream found, restarting stream", "table", name)
tableDetails.StreamARN, err = c.client.StartStream(egCtx, name, true)
if err != nil {
return err
}
} else {
return fmt.Errorf("incompatible stream found")
}
}
state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN)
if err != nil {
return err
}
statesCh <- state
}
if !r.Msg.GetInitialSync() {
return nil
}

statesCh <- state

// TODO: reconsider how to map namespaces properly
ns := name
Expand Down Expand Up @@ -139,22 +150,27 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene
return nil, connect.NewError(connect.CodeInternal, err)
}

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err = enc.Encode(stateMap)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
var updates []*adiomv1.UpdatesPartition
if r.Msg.GetUpdates() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err = enc.Encode(stateMap)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: namespaces, Cursor: buf.Bytes()})
}
Comment on lines +153 to 162
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the full context around lines 148-157 and the earlier part (lines 34-35)
cd connectors/dynamodb && cat -n conn.go | sed -n '30,165p'

Repository: adiom-data/dsync

Length of output: 4683


🏁 Script executed:

# Search for UpdatesPartition usage to see how Namespaces is consumed
rg -n -B2 -A5 'UpdatesPartition' --type=go -g '!*_test.go'

Repository: adiom-data/dsync

Length of output: 20733


🏁 Script executed:

# Look for the proto definition of UpdatesPartition to understand the Namespaces field
find . -name '*.proto' -type f | head -20

Repository: adiom-data/dsync

Length of output: 150


🏁 Script executed:

# Verify the fix: check what tableNames contains and how it should be used
cat -n connectors/dynamodb/conn.go | sed -n '148,157p'

Repository: adiom-data/dsync

Length of output: 461


🏁 Script executed:

# Check the proto definition for UpdatesPartition to understand the expected semantics
cat proto/adiom/v1/messages.proto | grep -A 10 "message UpdatesPartition"

Repository: adiom-data/dsync

Length of output: 397


Use tableNames instead of namespaces for UpdatesPartition.Namespaces at line 156.

When r.Msg.GetNamespaces() is empty (lines 34-40), tableNames is populated via GetAllTableNames(), but the namespaces variable remains empty. Line 156 incorrectly uses the empty namespaces variable instead of the resolved tableNames, causing UpdatesPartition.Namespaces to be empty in auto-discovery scenarios. This is inconsistent with regular Partition objects (line 112) which correctly use actual table names, and breaks downstream code in pkg/test/connector.go and internal/app/verify.go that expects Namespaces to contain the tables being streamed.

Change line 156 to: updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: tableNames, Cursor: buf.Bytes()})

🤖 Prompt for AI Agents
In @connectors/dynamodb/conn.go around lines 148 - 157, The UpdatesPartition
being appended uses the wrong variable for Namespaces; when auto-discovery
populates tableNames (via GetAllTableNames()) namespaces can be empty. In the
block that builds updates (the append to updates in the r.Msg.GetUpdates()
branch), change the Namespaces field from using namespaces to using tableNames
so UpdatesPartition.Namespaces contains the actual resolved table names (i.e.,
set Namespaces to tableNames when creating the adiomv1.UpdatesPartition with the
encoded stateMap cursor).


return connect.NewResponse(&adiomv1.GeneratePlanResponse{
Partitions: partitions,
UpdatesPartitions: []*adiomv1.UpdatesPartition{{Namespaces: namespaces, Cursor: buf.Bytes()}},
UpdatesPartitions: updates,
}), nil
}

// GetInfo implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) GetInfo(context.Context, *connect.Request[adiomv1.GetInfoRequest]) (*connect.Response[adiomv1.GetInfoResponse], error) {
return connect.NewResponse(&adiomv1.GetInfoResponse{
Id: c.options.ID,
DbType: "dynamodb",
Version: "",
Spec: c.spec,
Expand Down Expand Up @@ -355,7 +371,10 @@ func (c *conn) WriteUpdates(context.Context, *connect.Request[adiomv1.WriteUpdat
func AWSClientHelper(connStr string) (*dynamodb.Client, *dynamodbstreams.Client) {
var endpoint string
if connStr == "localstack" {
endpoint = "http://localhost:4566"
endpoint = os.Getenv("AWS_ENDPOINT_URL")
if endpoint == "" {
endpoint = "http://localhost:4566"
}
}
awsConfig, err := config.LoadDefaultConfig(context.Background())
if err != nil {
Expand All @@ -375,10 +394,29 @@ func AWSClientHelper(connStr string) (*dynamodb.Client, *dynamodbstreams.Client)
}

// Options holds the tunable settings of the DynamoDB connector. NewConn
// starts from its defaults and then applies the functional options it is
// given (WithID, WithDocsPerSegment, WithPlanParallelism).
type Options struct {
// ID is the connector identifier; GetInfo reports it as the response Id.
ID string
// DocsPerSegment is the target number of documents per partition
// (CLI flag "doc-partition"). NewConn defaults it to 50000.
DocsPerSegment int
// PlanParallelism is the parallelism used during planning
// (CLI flag "plan-parallelism").
// NOTE(review): presumably bounds concurrent table lookups in GeneratePlan — confirm.
PlanParallelism int
}

// WithID returns a functional option that stores s in Options.ID, giving
// the connector a fixed identifier.
func WithID(s string) func(*Options) {
	setID := func(opts *Options) {
		opts.ID = s
	}
	return setID
}

// WithPlanParallelism returns a functional option that stores n in
// Options.PlanParallelism.
func WithPlanParallelism(n int) func(*Options) {
	setParallelism := func(opts *Options) {
		opts.PlanParallelism = n
	}
	return setParallelism
}

// WithDocsPerSegment returns a functional option that stores n in
// Options.DocsPerSegment.
func WithDocsPerSegment(n int) func(*Options) {
	setDocsPerSegment := func(opts *Options) {
		opts.DocsPerSegment = n
	}
	return setDocsPerSegment
}

func NewConn(connStr string, optFns ...func(*Options)) adiomv1connect.ConnectorServiceHandler {
opts := Options{
DocsPerSegment: 50000,
Expand Down
17 changes: 12 additions & 5 deletions connectors/dynamodb/conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func fromBson(bs interface{}) (types.AttributeValue, error) {
return &types.AttributeValueMemberB{Value: b.Data}, nil
case primitive.Decimal128:
return &types.AttributeValueMemberN{Value: b.String()}, nil
case nil:
return &types.AttributeValueMemberNULL{Value: true}, nil
default:
return &types.AttributeValueMemberS{Value: "XUnsupportedX"}, nil
}
Expand Down Expand Up @@ -167,6 +169,9 @@ func toBson(av types.AttributeValue) (interface{}, error) {
}
return arr, nil

case *types.AttributeValueMemberNULL:
return nil, nil

default:
return nil, fmt.Errorf("unknown attribute %T", av)
}
Expand Down Expand Up @@ -203,11 +208,11 @@ func itemsToBson(items []map[string]types.AttributeValue, keySchema []string) ([
for i, m := range items {
id, err := dynamoKeyToIdBson(m, keySchema)
if err != nil {
return nil, err
return nil, fmt.Errorf("err in key to bson: %w", err)
}
b, err := toBson(&types.AttributeValueMemberM{Value: m})
if err != nil {
return nil, err
return nil, fmt.Errorf("err in to bson: %w", err)
}

// TODO: We currently clobber any existing _id
Expand Down Expand Up @@ -262,6 +267,8 @@ func streamTypeToDynamoType(st streamtypes.AttributeValue) (types.AttributeValue
return &types.AttributeValueMemberS{Value: tv.Value}, nil
case *streamtypes.AttributeValueMemberSS:
return &types.AttributeValueMemberSS{Value: tv.Value}, nil
case *streamtypes.AttributeValueMemberNULL:
return &types.AttributeValueMemberNULL{Value: tv.Value}, nil
default:
return nil, fmt.Errorf("unknown attribute %T", st)
}
Expand All @@ -270,21 +277,21 @@ func streamTypeToDynamoType(st streamtypes.AttributeValue) (types.AttributeValue
func dynamoWriteKeyValue(w io.Writer, av types.AttributeValue) error {
switch tv := av.(type) {
case *types.AttributeValueMemberB:
if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil {
if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil {
return err
}
if _, err := w.Write(tv.Value); err != nil {
return err
}
case *types.AttributeValueMemberN:
if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil {
if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil {
return err
}
if _, err := w.Write([]byte(tv.Value)); err != nil {
return err
}
case *types.AttributeValueMemberS:
if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil {
if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil {
return err
}
if _, err := w.Write([]byte(tv.Value)); err != nil {
Expand Down
29 changes: 26 additions & 3 deletions internal/app/options/connectorflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,38 @@ func GetRegisteredConnectors() []RegisteredConnector {
IsConnector: func(s string) bool {
return strings.EqualFold(s, "dynamodb") || strings.EqualFold(s, "dynamodb://localstack")
},
Create: CreateHelper("DynamoDB", "dynamodb OR dynamodb://localstack", nil, func(_ *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
Create: CreateHelper("DynamoDB", "dynamodb OR dynamodb://localstack", []cli.Flag{
&cli.IntFlag{
Name: "doc-partition",
Usage: "Target number of documents per partition",
Value: 50000,
},
&cli.IntFlag{
Name: "plan-parallelism",
Usage: "Parallelism during planning",
Value: 4,
},
&cli.StringFlag{
Name: "id",
Usage: "A fixed id for the connector",
},
}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
if strings.EqualFold(args[0], "dynamodb://localstack") {
_, connString, ok := strings.Cut(args[0], "://")
if !ok {
return nil, fmt.Errorf("invalid connection string %v", args[0])
}
return dynamodb.NewConn(connString), nil
return dynamodb.NewConn(connString,
dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
dynamodb.WithID(c.String("id")),
), nil
} else {
return dynamodb.NewConn(""), nil
return dynamodb.NewConn("",
dynamodb.WithDocsPerSegment(c.Int("doc-partition")),
dynamodb.WithPlanParallelism(c.Int("plan-parallelism")),
dynamodb.WithID(c.String("id")),
), nil
}
}),
},
Expand Down