From 6cdc77320b1d97ba6e305b74ad75f6129dd65459 Mon Sep 17 00:00:00 2001 From: Mark C Date: Tue, 10 Jun 2025 21:17:44 -0700 Subject: [PATCH] Add json2bson transformer --- cmd/json2bson/main.go | 23 +++++++++++++ transform/json2bson.go | 73 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 cmd/json2bson/main.go create mode 100644 transform/json2bson.go diff --git a/cmd/json2bson/main.go b/cmd/json2bson/main.go new file mode 100644 index 00000000..7124166c --- /dev/null +++ b/cmd/json2bson/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "flag" + "net/http" + + "github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect" + "github.com/adiom-data/dsync/transform" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" +) + +func main() { + hostPort := flag.String("hostport", "localhost:8085", "host:port") + flag.Parse() + mux := http.NewServeMux() + tpath, thandler := adiomv1connect.NewTransformServiceHandler(transform.NewJson2BsonTransform()) + mux.Handle(tpath, thandler) + http.ListenAndServe( + *hostPort, + h2c.NewHandler(mux, &http2.Server{}), + ) +} diff --git a/transform/json2bson.go b/transform/json2bson.go new file mode 100644 index 00000000..ae043b0e --- /dev/null +++ b/transform/json2bson.go @@ -0,0 +1,73 @@ +package transform + +import ( + "context" + "encoding/json" + "fmt" + + "connectrpc.com/connect" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" + "github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect" + "go.mongodb.org/mongo-driver/bson" +) + +type json2BsonTransform struct { +} + +func (i *json2BsonTransform) mapData(d []byte) []byte { + var m map[string]any + err := json.Unmarshal(d, &m) + if err != nil { + panic(err) + } + m["_id"] = m["id"] + delete(m, "id") + out, err := bson.Marshal(m) + if err != nil { + panic(err) + } + + return out +} + +func (i *json2BsonTransform) GetTransform(_ context.Context, r *connect.Request[adiomv1.GetTransformRequest]) (*connect.Response[adiomv1.GetTransformResponse], error) { + if r.Msg.GetRequestType() != adiomv1.DataType_DATA_TYPE_JSON_ID && r.Msg.GetResponseType() != adiomv1.DataType_DATA_TYPE_MONGO_BSON { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unsupported request and response type")) + } + + var newUpdates []*adiomv1.Update + var newData [][]byte + + for _, update := range r.Msg.GetUpdates() { + newUpdates = append(newUpdates, &adiomv1.Update{ + Id: update.GetId(), + Type: update.GetType(), + Data: i.mapData(update.GetData()), + }) + } + + for _, data := range r.Msg.GetData() { + newData = append(newData, i.mapData(data)) + } + + return connect.NewResponse(&adiomv1.GetTransformResponse{ + Namespace: r.Msg.Namespace, + Updates: newUpdates, + Data: newData, + }), nil +} + +func (i *json2BsonTransform) GetTransformInfo(context.Context, *connect.Request[adiomv1.GetTransformInfoRequest]) (*connect.Response[adiomv1.GetTransformInfoResponse], error) { + return connect.NewResponse(&adiomv1.GetTransformInfoResponse{ + Transforms: []*adiomv1.GetTransformInfoResponse_TransformInfo{ + { + RequestType: adiomv1.DataType_DATA_TYPE_JSON_ID, + ResponseTypes: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON}, + }, + }, + }), nil +} + +func NewJson2BsonTransform() adiomv1connect.TransformServiceHandler { + return &json2BsonTransform{} +}