diff --git a/connectors/dynamodb/conn.go b/connectors/dynamodb/conn.go index 912041ec..19e90cfe 100644 --- a/connectors/dynamodb/conn.go +++ b/connectors/dynamodb/conn.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log/slog" + "os" "sync" "connectrpc.com/connect" @@ -71,36 +72,46 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene eg.Go(func() error { tableDetails, err := c.client.TableDetails(egCtx, name) if err != nil { + if errors.Is(err, ErrNotFound) { + slog.Warn("Table not found. Ignoring.", "name", name) + return nil + } return err } - if tableDetails.StreamARN == "" { - if c.spec == "localstack" { - slog.Debug("No stream found, starting stream", "table", name) - _, err := c.client.StartStream(egCtx, name, false) - if err != nil { - return err + if r.Msg.GetUpdates() { + if tableDetails.StreamARN == "" { + if c.spec == "localstack" { + slog.Debug("No stream found, starting stream", "table", name) + _, err := c.client.StartStream(egCtx, name, false) + if err != nil { + return err + } + } else { + return fmt.Errorf("no stream found") } - } else { - return fmt.Errorf("no stream found") - } - } else if tableDetails.IncompatibleStream { - if c.spec == "localstack" { - slog.Debug("Incompatible stream found, restarting stream", "table", name) - _, err := c.client.StartStream(egCtx, name, true) - if err != nil { - return err + } else if tableDetails.IncompatibleStream { + if c.spec == "localstack" { + slog.Debug("Incompatible stream found, restarting stream", "table", name) + _, err := c.client.StartStream(egCtx, name, true) + if err != nil { + return err + } + } else { + return fmt.Errorf("incompatible stream found") } - } else { - return fmt.Errorf("incompatible stream found") } + + state, err := c.client.GetStreamState(egCtx, tableDetails.StreamARN) + if err != nil { + return err + } + statesCh <- state } - state, err := c.client.GetStreamState(ctx, tableDetails.StreamARN) - if err != nil { - return err + if !r.Msg.GetInitialSync() { + return nil } - statesCh <- state // TODO: reconsider how to map namespaces properly ns := name @@ -139,22 +150,27 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene return nil, connect.NewError(connect.CodeInternal, err) } - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err = enc.Encode(stateMap) - if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + var updates []*adiomv1.UpdatesPartition + if r.Msg.GetUpdates() { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err = enc.Encode(stateMap) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + updates = append(updates, &adiomv1.UpdatesPartition{Namespaces: namespaces, Cursor: buf.Bytes()}) } return connect.NewResponse(&adiomv1.GeneratePlanResponse{ Partitions: partitions, - UpdatesPartitions: []*adiomv1.UpdatesPartition{{Namespaces: namespaces, Cursor: buf.Bytes()}}, + UpdatesPartitions: updates, }), nil } // GetInfo implements adiomv1connect.ConnectorServiceHandler. func (c *conn) GetInfo(context.Context, *connect.Request[adiomv1.GetInfoRequest]) (*connect.Response[adiomv1.GetInfoResponse], error) { return connect.NewResponse(&adiomv1.GetInfoResponse{ + Id: c.options.ID, DbType: "dynamodb", Version: "", Spec: c.spec, @@ -355,7 +371,10 @@ func (c *conn) WriteUpdates(context.Context, *connect.Request[adiomv1.WriteUpdat func AWSClientHelper(connStr string) (*dynamodb.Client, *dynamodbstreams.Client) { var endpoint string if connStr == "localstack" { - endpoint = "http://localhost:4566" + endpoint = os.Getenv("AWS_ENDPOINT_URL") + if endpoint == "" { + endpoint = "http://localhost:4566" + } } awsConfig, err := config.LoadDefaultConfig(context.Background()) if err != nil { @@ -375,10 +394,29 @@ func AWSClientHelper(connStr string) (*dynamodb.Client, *dynamodbstreams.Client) } type Options struct { + ID string DocsPerSegment int PlanParallelism int } +func WithID(s string) func(*Options) { + return func(o *Options) { + o.ID = s + } +} + +func WithPlanParallelism(n int) func(*Options) { + return func(o *Options) { + o.PlanParallelism = n + } +} + +func WithDocsPerSegment(n int) func(*Options) { + return func(o *Options) { + o.DocsPerSegment = n + } +} + func NewConn(connStr string, optFns ...func(*Options)) adiomv1connect.ConnectorServiceHandler { opts := Options{ DocsPerSegment: 50000, diff --git a/connectors/dynamodb/conv.go b/connectors/dynamodb/conv.go index ecc8c213..8013ce8c 100644 --- a/connectors/dynamodb/conv.go +++ b/connectors/dynamodb/conv.go @@ -71,6 +71,8 @@ func fromBson(bs interface{}) (types.AttributeValue, error) { return &types.AttributeValueMemberB{Value: b.Data}, nil case primitive.Decimal128: return &types.AttributeValueMemberN{Value: b.String()}, nil + case nil: + return &types.AttributeValueMemberNULL{Value: true}, nil default: return &types.AttributeValueMemberS{Value: "XUnsupportedX"}, nil } @@ -167,6 +169,9 @@ func toBson(av types.AttributeValue) (interface{}, error) { } return arr, nil + case *types.AttributeValueMemberNULL: + return nil, nil + default: return nil, fmt.Errorf("unknown attribute %T", av) } @@ -203,11 +208,11 @@ func itemsToBson(items []map[string]types.AttributeValue, keySchema []string) ([ for i, m := range items { id, err := dynamoKeyToIdBson(m, keySchema) if err != nil { - return nil, err + return nil, fmt.Errorf("err in key to bson: %w", err) } b, err := toBson(&types.AttributeValueMemberM{Value: m}) if err != nil { - return nil, err + return nil, fmt.Errorf("err in to bson: %w", err) } // TODO: We currently clobber any existing _id @@ -262,6 +267,8 @@ func streamTypeToDynamoType(st streamtypes.AttributeValue) (types.AttributeValue return &types.AttributeValueMemberS{Value: tv.Value}, nil case *streamtypes.AttributeValueMemberSS: return &types.AttributeValueMemberSS{Value: tv.Value}, nil + case *streamtypes.AttributeValueMemberNULL: + return &types.AttributeValueMemberNULL{Value: tv.Value}, nil default: return nil, fmt.Errorf("unknown attribute %T", st) } @@ -270,21 +277,21 @@ func streamTypeToDynamoType(st streamtypes.AttributeValue) (types.AttributeValue func dynamoWriteKeyValue(w io.Writer, av types.AttributeValue) error { switch tv := av.(type) { case *types.AttributeValueMemberB: - if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil { + if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil { return err } if _, err := w.Write(tv.Value); err != nil { return err } case *types.AttributeValueMemberN: - if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil { + if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil { return err } if _, err := w.Write([]byte(tv.Value)); err != nil { return err } case *types.AttributeValueMemberS: - if err := binary.Write(w, binary.BigEndian, len(tv.Value)); err != nil { + if err := binary.Write(w, binary.BigEndian, int32(len(tv.Value))); err != nil { return err } if _, err := w.Write([]byte(tv.Value)); err != nil { diff --git a/internal/app/options/connectorflags.go b/internal/app/options/connectorflags.go index c917c19e..47853a2c 100644 --- a/internal/app/options/connectorflags.go +++ b/internal/app/options/connectorflags.go @@ -344,15 +344,38 @@ func GetRegisteredConnectors() []RegisteredConnector { IsConnector: func(s string) bool { return strings.EqualFold(s, "dynamodb") || strings.EqualFold(s, "dynamodb://localstack") }, - Create: CreateHelper("DynamoDB", "dynamodb OR dynamodb://localstack", nil, func(_ *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) { + Create: CreateHelper("DynamoDB", "dynamodb OR dynamodb://localstack", []cli.Flag{ + &cli.IntFlag{ + Name: "doc-partition", + Usage: "Target number of documents per partition", + Value: 50000, + }, + &cli.IntFlag{ + Name: "plan-parallelism", + Usage: "Parallelism during planning", + Value: 4, + }, + &cli.StringFlag{ + Name: "id", + Usage: "A fixed id for the connector", + }, + }, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) { if strings.EqualFold(args[0], "dynamodb://localstack") { _, connString, ok := strings.Cut(args[0], "://") if !ok { return nil, fmt.Errorf("invalid connection string %v", args[0]) } - return dynamodb.NewConn(connString), nil + return dynamodb.NewConn(connString, + dynamodb.WithDocsPerSegment(c.Int("doc-partition")), + dynamodb.WithPlanParallelism(c.Int("plan-parallelism")), + dynamodb.WithID(c.String("id")), + ), nil } else { - return dynamodb.NewConn(""), nil + return dynamodb.NewConn("", + dynamodb.WithDocsPerSegment(c.Int("doc-partition")), + dynamodb.WithPlanParallelism(c.Int("plan-parallelism")), + dynamodb.WithID(c.String("id")), + ), nil } }), },