diff --git a/connectors/vector/conn.go b/connectors/vector/conn.go index 44f931cc..2784bd3d 100644 --- a/connectors/vector/conn.go +++ b/connectors/vector/conn.go @@ -271,6 +271,14 @@ func (c *conn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.Writ return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil } +func NewQdrantConn(chunker adiomv1connect.ChunkingServiceClient, embedder adiomv1connect.EmbeddingServiceClient, host string, port int) (adiomv1connect.ConnectorServiceHandler, error) { + c, err := NewQdrantConnector(host, port) + if err != nil { + return nil, err + } + return NewConn(fmt.Sprintf("%v:%v", host, port), "Qdrant", 200, chunker, embedder, c), nil +} + func NewWeaviateConn(chunker adiomv1connect.ChunkingServiceClient, embedder adiomv1connect.EmbeddingServiceClient, url string, groupID string, apiKey string, useIdentityMapper bool) (adiomv1connect.ConnectorServiceHandler, error) { splitted := strings.SplitN(url, "://", 2) if len(splitted) != 2 { diff --git a/connectors/vector/fromdata.go b/connectors/vector/fromdata.go new file mode 100644 index 00000000..74fe2a26 --- /dev/null +++ b/connectors/vector/fromdata.go @@ -0,0 +1,43 @@ +package vector + +import ( + "context" + + "connectrpc.com/connect" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" + "go.mongodb.org/mongo-driver/bson" +) + +type fromData struct { + field string +} + +// GetEmbedding implements adiomv1connect.EmbeddingServiceHandler. +func (s *fromData) GetEmbedding(ctx context.Context, r *connect.Request[adiomv1.GetEmbeddingRequest]) (*connect.Response[adiomv1.GetEmbeddingResponse], error) { + var embeddings []*adiomv1.Embedding + for _, data := range r.Msg.GetData() { + var d []float64 + v := bson.Raw(data).Lookup(s.field) + if v.IsZero() { + embeddings = append(embeddings, &adiomv1.Embedding{}) + continue + } + if err := bson.UnmarshalValue(v.Type, v.Value, &d); err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + embeddings = append(embeddings, &adiomv1.Embedding{Data: d}) + } + return connect.NewResponse(&adiomv1.GetEmbeddingResponse{ + Data: embeddings, + }), nil +} + +func (s *fromData) GetSupportedDataTypes(context.Context, *connect.Request[adiomv1.GetSupportedDataTypesRequest]) (*connect.Response[adiomv1.GetSupportedDataTypesResponse], error) { + return connect.NewResponse(&adiomv1.GetSupportedDataTypesResponse{ + Types: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON}, + }), nil +} + +func NewFromData(field string) *fromData { + return &fromData{field: field} +} diff --git a/connectors/vector/ollama.go b/connectors/vector/ollama.go new file mode 100644 index 00000000..7bd66a92 --- /dev/null +++ b/connectors/vector/ollama.go @@ -0,0 +1,85 @@ +package vector + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + + "connectrpc.com/connect" + adiomv1 "github.com/adiom-data/dsync/gen/adiom/v1" + "go.mongodb.org/mongo-driver/bson" +) + +type ollamaDemo struct { + baseURL string + model string +} + +type EmbeddingResponse struct { + Embedding []float64 +} + +func (s *ollamaDemo) getEmbedding(payload interface{}) ([]float64, error) { + payloadData, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + body, err := json.Marshal(map[string]string{ + "model": s.model, + "prompt": string(payloadData), + }) + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, s.baseURL+"/embeddings", bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var embeddingResponse EmbeddingResponse + if err := json.NewDecoder(resp.Body).Decode(&embeddingResponse); err != nil { + return nil, err + } + + return embeddingResponse.Embedding, nil +} + +// GetEmbedding implements adiomv1connect.EmbeddingServiceHandler. +func (s *ollamaDemo) GetEmbedding(ctx context.Context, r *connect.Request[adiomv1.GetEmbeddingRequest]) (*connect.Response[adiomv1.GetEmbeddingResponse], error) { + var embeddings []*adiomv1.Embedding + for _, d := range r.Msg.GetData() { + var m map[string]interface{} + if err := bson.Unmarshal(d, &m); err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + e, err := s.getEmbedding(m) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + embeddings = append(embeddings, &adiomv1.Embedding{Data: e}) + } + return connect.NewResponse(&adiomv1.GetEmbeddingResponse{ + Data: embeddings, + }), nil +} + +func (s *ollamaDemo) GetSupportedDataTypes(context.Context, *connect.Request[adiomv1.GetSupportedDataTypesRequest]) (*connect.Response[adiomv1.GetSupportedDataTypesResponse], error) { + return connect.NewResponse(&adiomv1.GetSupportedDataTypesResponse{ + Types: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON}, + }), nil +} + +func NewOllama(baseURL string, model string) *ollamaDemo { + return &ollamaDemo{baseURL: baseURL, model: model} +} diff --git a/connectors/vector/qdrant.go b/connectors/vector/qdrant.go new file mode 100644 index 00000000..47b2acb6 --- /dev/null +++ b/connectors/vector/qdrant.go @@ -0,0 +1,90 @@ +package vector + +import ( + "context" + "fmt" + + "github.com/google/uuid" + "github.com/qdrant/go-client/qdrant" +) + +type qdrantConnector struct { + client *qdrant.Client +} + +func NewQdrantConnector(host string, port int) (*qdrantConnector, error) { + client, err := qdrant.NewClient(&qdrant.Config{ + Host: host, + Port: port, + }) + if err != nil { + return nil, err + } + + return &qdrantConnector{client: client}, nil +} + +var True bool = true + +func (c *qdrantConnector) UpsertDocuments(ctx context.Context, namespace string, docs []*VectorDocument) error { + if len(docs) == 0 { + return nil + } + var points []*qdrant.PointStruct + + var keywords []string + for _, doc := range docs { + keywords = append(keywords, doc.ID) + } + deleteRes, err := c.client.Delete(ctx, &qdrant.DeletePoints{ + CollectionName: namespace, + Wait: &True, + Points: &qdrant.PointsSelector{ + PointsSelectorOneOf: &qdrant.PointsSelector_Filter{ + Filter: &qdrant.Filter{ + Must: []*qdrant.Condition{ + qdrant.NewMatchKeywords("_id", keywords...), + }, + }, + }, + }, + }) + if err != nil { + return err + } + if deleteRes.GetStatus() != qdrant.UpdateStatus_Completed { + return fmt.Errorf("qdrant delete not completed successfully") + } + + for _, doc := range docs { + for _, chunk := range doc.Chunks { + if len(chunk.Vector) == 0 { + continue + } + var vectors []float32 + for _, v := range chunk.Vector { + vectors = append(vectors, float32(v)) + } + d := chunk.Data.(map[string]interface{}) + d["_id"] = doc.ID + points = append(points, &qdrant.PointStruct{ + Id: qdrant.NewID(uuid.New().String()), + Payload: qdrant.NewValueMap(d), + Vectors: qdrant.NewVectors(vectors...), + }) + } + } + + operationInfo, err := c.client.Upsert(ctx, &qdrant.UpsertPoints{ + CollectionName: namespace, + Points: points, + Wait: &True, + }) + if err != nil { + return err + } + if operationInfo.GetStatus() == qdrant.UpdateStatus_Completed { + return nil + } + return fmt.Errorf("qdrant upsert not completed successfully") +} diff --git a/internal/app/options/connectorflags.go b/internal/app/options/connectorflags.go index 159ca945..8676dad1 100644 --- a/internal/app/options/connectorflags.go +++ b/internal/app/options/connectorflags.go @@ -265,6 +265,44 @@ func GetRegisteredConnectors() []RegisteredConnector { })(args, as) }, }, + { + Name: "qdrant", + IsConnector: func(s string) bool { + return strings.EqualFold(s, "qdrant") + }, + Create: CreateHelperWithRestArgs("qdrant", "qdrant --host localhost --port 6334 --has-chunker [grpc://embedder-host:port] [grpc://chunker-host:port]", QdrantFlags(), func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, []string, error) { + host := c.String("host") + port := c.Int("port") + chunker := c.Bool("has-chunker") + restArgs := c.Args().Slice() + var chunkerClient adiomv1connect.ChunkingServiceClient = vector.NewSimple() + var embedderClient adiomv1connect.EmbeddingServiceClient = vector.NewSimple() + var err error + + if len(restArgs) == 0 { + return nil, nil, ErrMissingEmbedder + } + embedderClient, restArgs, err = ConfigureEmbedder(restArgs) + if err != nil { + return nil, nil, err + } + if chunker { + if len(restArgs) == 0 { + return nil, nil, ErrMissingChunker + } + chunkerClient, restArgs, err = ConfigureChunker(restArgs) + if err != nil { + return nil, nil, err + } + } + + conn, err := vector.NewQdrantConn(chunkerClient, embedderClient, host, port) + if err != nil { + return nil, nil, err + } + return conn, restArgs, err + }), + }, { Name: "weaviate", IsConnector: func(s string) bool { @@ -396,6 +434,29 @@ func GetRegisteredConnectors() []RegisteredConnector { } } +func QdrantFlags() []cli.Flag { + return []cli.Flag{ + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "host", + Usage: "e.g. localhost", + Required: true, + }), + altsrc.NewIntFlag(&cli.IntFlag{ + Name: "port", + Usage: "e.g. 6334", + Required: true, + }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "has-chunker", + Usage: "Specifies that there will be grpc chunker specified (grpc://chunker-host:port). If embedder also specified, first arg is chunker.", + }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "has-embedder", + Usage: "Specifies that there will be grpc embedder specified (grpc://embedder-host:port). If chunker also specified, last arg is embedder.", + }), + } +} + func WeaviateFlags() []cli.Flag { return []cli.Flag{ altsrc.NewStringFlag(&cli.StringFlag{