From e2cf414f591cdae51e9de097cc4dce75d1d4afa6 Mon Sep 17 00:00:00 2001
From: przemyslaw
Date: Mon, 2 Jun 2025 14:38:53 +0200
Subject: [PATCH 1/4] introduce unique ids

---
 platform/model/typical_queries/hits.go        | 21 ++++++++++++-
 .../parsers/elastic_query_dsl/query_parser.go |  7 +++--
 .../elastic_query_dsl/query_translator.go     | 30 +++++++++++++++++--
 3 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/platform/model/typical_queries/hits.go b/platform/model/typical_queries/hits.go
index ae15fa237..b63d9711d 100644
--- a/platform/model/typical_queries/hits.go
+++ b/platform/model/typical_queries/hits.go
@@ -3,7 +3,11 @@
 package typical_queries

 import (
+	"bytes"
 	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
 	"fmt"
 	"github.com/QuesmaOrg/quesma/platform/clickhouse"
 	"github.com/QuesmaOrg/quesma/platform/common_table"
@@ -241,7 +245,9 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
 		// At database level we only compare timestamps with millisecond precision
 		// However in search results we append `q` plus generated digits (we use q because it's not in hex)
 		// so that kibana can iterate over documents in UI
-		pseudoUniqueId = fmt.Sprintf("%xq%s", vv, defaultID)
+		sourceHash := fmt.Sprintf("%x", HashJSON(doc.Source))
+		pseudoUniqueId = fmt.Sprintf("%xqqq%sqqq%x", vv, defaultID, sourceHash)
+		//pseudoUniqueId = fmt.Sprintf("%xq%s", vv, defaultID)
 	} else {
 		logger.WarnWithCtx(query.ctx).Msgf("failed to convert timestamp field [%v] to time.Time", v[0])
 		return defaultID
@@ -250,6 +256,19 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
 	return pseudoUniqueId
 }

+func HashJSON(data json.RawMessage) string {
+	var buf bytes.Buffer
+	err := json.Compact(&buf, data)
+	if err != nil {
+		hash := sha256.Sum256(data)
+		return hex.EncodeToString(hash[:])
+	}
+	hash := sha256.Sum256(buf.Bytes())
+	eee := hex.EncodeToString(hash[:])
+	println(eee)
+	return hex.EncodeToString(hash[:])
+}
+
 func (query Hits) String() string {
 	return fmt.Sprintf("hits(indexes: %v)", strings.Join(query.indexes, ", "))
 }
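A standalone sketch of the _id scheme this hunk introduces: hex-encoded timestamp string, row ordinal, and source hash, joined by a separator that cannot occur in hex. Names and the timestamp layout are illustrative, and the sketch hex-encodes the hash only once, whereas the patch re-encodes the already-hex HashJSON result again via %x.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"
)

// buildPseudoID mirrors the scheme above: hex(timestamp string) + "qqq" +
// row ordinal + "qqq" + hex(sha256(source)). Illustrative helper, not the
// patch's exact code.
func buildPseudoID(ts time.Time, ordinal string, source []byte) string {
	tsPart := hex.EncodeToString([]byte(ts.UTC().String())) // %x on a time.Time hex-encodes its String() form
	sum := sha256.Sum256(source)
	return fmt.Sprintf("%sqqq%sqqq%s", tsPart, ordinal, hex.EncodeToString(sum[:]))
}

func main() {
	ts := time.Date(2025, 6, 2, 12, 0, 0, 123000000, time.UTC)
	fmt.Println(buildPseudoID(ts, "1", []byte(`{"message":"hello"}`)))
}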
diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go
index fd1f5bd54..10d1d8175 100644
--- a/platform/parsers/elastic_query_dsl/query_parser.go
+++ b/platform/parsers/elastic_query_dsl/query_parser.go
@@ -34,7 +34,7 @@ func NewEmptyHighlighter() model.Highlighter {
 }

 const (
-	defaultQueryResultSize = 10
+	defaultQueryResultSize = 10000 // TODO looks like we can NOT limit the returned `hits` because we calculate IDs there
 	defaultTrackTotalHits  = 10000
 )
@@ -322,6 +322,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 		return model.NewSimpleQueryInvalid()
 	}
 	ids := make([]string, 0, len(idsRaw))
+	uniqueIds := make([]string, 0, len(idsRaw)) // full _id values, kept aside for hash-based hit filtering later
 	for _, id := range idsRaw {
 		if idAsString, ok := id.(string); ok {
 			ids = append(ids, idAsString)
@@ -335,7 +336,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 	// therefore we need to strip the hex part (before `q`) and convert it to decimal
 	// then we can query at DB level
 	for i, id := range ids {
-		idInHex := strings.Split(id, "q")[0]
+		idInHex := strings.Split(id, "qqq")[0]
 		if idAsStr, err := hex.DecodeString(idInHex); err != nil {
 			logger.Error().Msgf("error parsing document id %s: %v", id, err)
 			return model.NewSimpleQueryInvalid()
@@ -343,6 +344,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 			tsWithoutTZ := strings.TrimSuffix(string(idAsStr), " +0000 UTC")
 			ids[i] = fmt.Sprintf("'%s'", tsWithoutTZ)
 		}
+		uniqueIds = append(uniqueIds, id)
 	}

 	var idToSql func(string) (model.Expr, error)
@@ -399,6 +401,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 		idsTuple := model.NewTupleExpr(idsAsExprs...)
 		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " IN ", idsTuple)
 	}
+	cw.UniqueIDs = uniqueIds // TODO a crucial side effect here
 	return model.NewSimpleQuery(whereStmt, true)
 }

diff --git a/platform/parsers/elastic_query_dsl/query_translator.go b/platform/parsers/elastic_query_dsl/query_translator.go
index c90e5723d..b33b77fb2 100644
--- a/platform/parsers/elastic_query_dsl/query_translator.go
+++ b/platform/parsers/elastic_query_dsl/query_translator.go
@@ -11,6 +11,8 @@ import (
 	"github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl/query_util"
 	"github.com/QuesmaOrg/quesma/platform/schema"
 	"github.com/QuesmaOrg/quesma/platform/util"
+	"slices"
+	"strings"
 )

 type JsonMap = map[string]interface{}
@@ -24,7 +26,8 @@ type ClickhouseQueryTranslator struct {

 	Indexes []string // TODO this will be removed

-	Table *clickhouse.Table
+	Table     *clickhouse.Table
+	UniqueIDs []string // used for hits queries, to filter out hits that are not in the list of IDs
 }

 var completionStatusOK = func() *int { value := 200; return &value }()
@@ -130,8 +133,31 @@ func (cw *ClickhouseQueryTranslator) makeHits(queries []*model.Query, results []
 	}

 	hitsPartOfResponse := hitsQuery.Type.TranslateSqlResponseToJson(hitsResultSet)
+	// trim hits
 	hitsResponse := hitsPartOfResponse["hits"].(model.SearchHits)
-	return queriesWithoutHits, resultsWithoutHits, &hitsResponse
+	hits := cw.RemoveHitsIfDocHashesSet(hitsResponse)
+	return queriesWithoutHits, resultsWithoutHits, &hits
+}
+
+func (cw *ClickhouseQueryTranslator) RemoveHitsIfDocHashesSet(hits model.SearchHits) model.SearchHits {
+	// if we have doc hashes set, we need to remove non-matching hits from the response
+	if len(cw.UniqueIDs) == 0 {
+		return hits
+	}
+	docHashes := make([]string, 0, len(cw.UniqueIDs))
+	for _, id := range cw.UniqueIDs {
+		docHashes = append(docHashes, strings.Split(id, "qqq")[2])
+	}
+	filteredHits := make([]model.SearchHit, 0, len(hits.Hits))
+	for _, hit := range hits.Hits {
+		hashOfOfDocInHit := strings.Split(hit.ID, "qqq")[2]
+		if slices.Contains(docHashes, hashOfOfDocInHit) {
+			filteredHits = append(filteredHits, hit)
+		}
+	}
+	hits.Hits = filteredHits
+	return hits
 }

 func (cw *ClickhouseQueryTranslator) makeTotalCount(queries []*model.Query, results [][]model.QueryResultRow) (queriesWithoutCount []*model.Query, resultsWithoutCount [][]model.QueryResultRow, total *model.Total) {
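For the reverse direction, parseIds above recovers the timestamp from the first _id segment. A minimal round-trip sketch (hypothetical helper, same separator and " +0000 UTC" suffix convention as the patch):

package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// decodeIDTimestamp reverses the hex(timestamp) part of a quesma _id,
// the same way parseIds does. Sketch only; error handling trimmed.
func decodeIDTimestamp(id string) (string, error) {
	idInHex := strings.Split(id, "qqq")[0]
	raw, err := hex.DecodeString(idInHex)
	if err != nil {
		return "", err
	}
	return strings.TrimSuffix(string(raw), " +0000 UTC"), nil
}

func main() {
	ts := "2025-06-02 12:00:00.123 +0000 UTC"
	id := hex.EncodeToString([]byte(ts)) + "qqq0qqqdeadbeef"
	got, _ := decodeIDTimestamp(id)
	fmt.Printf("'%s'\n", got) // '2025-06-02 12:00:00.123' - ready for a SQL comparison
}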
From a60f6b88ee430f46c232a7cb7654af6c36c797b6 Mon Sep 17 00:00:00 2001
From: przemyslaw
Date: Mon, 2 Jun 2025 14:54:29 +0200
Subject: [PATCH 2/4] cleanup

---
 .../parsers/elastic_query_dsl/query_parser.go |  3 ++-
 .../elastic_query_dsl/query_translator.go     | 20 +++++++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go
index 10d1d8175..bc017477b 100644
--- a/platform/parsers/elastic_query_dsl/query_parser.go
+++ b/platform/parsers/elastic_query_dsl/query_parser.go
@@ -36,6 +36,7 @@ func NewEmptyHighlighter() model.Highlighter {
 const (
 	defaultQueryResultSize = 10000 // TODO looks like we can NOT limit the returned `hits` because we calculate IDs there
 	defaultTrackTotalHits  = 10000
+	uuidSeparator          = "qqq" // Document IDs (_id) fields in quesma are synthetic; this separator joins their parts (timestamp hex, row id, source hash)
 )

 func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, error) {
@@ -336,7 +337,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 	// therefore we need to strip the hex part (before `q`) and convert it to decimal
 	// then we can query at DB level
 	for i, id := range ids {
-		idInHex := strings.Split(id, "qqq")[0]
+		idInHex := strings.Split(id, uuidSeparator)[0]
 		if idAsStr, err := hex.DecodeString(idInHex); err != nil {
 			logger.Error().Msgf("error parsing document id %s: %v", id, err)
 			return model.NewSimpleQueryInvalid()
diff --git a/platform/parsers/elastic_query_dsl/query_translator.go b/platform/parsers/elastic_query_dsl/query_translator.go
index b33b77fb2..1b41905a7 100644
--- a/platform/parsers/elastic_query_dsl/query_translator.go
+++ b/platform/parsers/elastic_query_dsl/query_translator.go
@@ -133,26 +133,26 @@ func (cw *ClickhouseQueryTranslator) makeHits(queries []*model.Query, results []
 	}

 	hitsPartOfResponse := hitsQuery.Type.TranslateSqlResponseToJson(hitsResultSet)
-	// trim hits
 	hitsResponse := hitsPartOfResponse["hits"].(model.SearchHits)
-	hits := cw.RemoveHitsIfDocHashesSet(hitsResponse)
+	hits := cw.FilterOutHitsIfThisIsIdQuery(hitsResponse)
 	return queriesWithoutHits, resultsWithoutHits, &hits
 }

-func (cw *ClickhouseQueryTranslator) RemoveHitsIfDocHashesSet(hits model.SearchHits) model.SearchHits {
-	// if we have doc hashes set, we need to remove non-matching hits from the response
+// FilterOutHitsIfThisIsIdQuery - if during parsing we found that this is a query for _id,
+// we filter out hits that are not in the list of UniqueIDs.
+// We do this filtering based only on the doc.Source hash comparison, ignoring the first two UUID parts.
+func (cw *ClickhouseQueryTranslator) FilterOutHitsIfThisIsIdQuery(hits model.SearchHits) model.SearchHits {
 	if len(cw.UniqueIDs) == 0 {
-		return hits
+		return hits // not an _id query, proceed as usual
 	}
-	docHashes := make([]string, 0, len(cw.UniqueIDs))
+	hashesFromQuery := make([]string, 0, len(cw.UniqueIDs))
 	for _, id := range cw.UniqueIDs {
-		docHashes = append(docHashes, strings.Split(id, "qqq")[2])
+		hashesFromQuery = append(hashesFromQuery, strings.Split(id, uuidSeparator)[2])
 	}
 	filteredHits := make([]model.SearchHit, 0, len(hits.Hits))
 	for _, hit := range hits.Hits {
-		hashOfOfDocInHit := strings.Split(hit.ID, "qqq")[2]
-		if slices.Contains(docHashes, hashOfOfDocInHit) {
+		hash := strings.Split(hit.ID, uuidSeparator)[2]
+		if slices.Contains(hashesFromQuery, hash) {
 			filteredHits = append(filteredHits, hit)
 		}
 	}
 	hits.Hits = filteredHits
 	return hits
 }
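FilterOutHitsIfThisIsIdQuery matches hits by their third _id segment using slices.Contains, which is O(n*m). A sketch of the same filtering with a set lookup instead, plus a length check that would also guard the bare [2] index against malformed IDs (all names illustrative):

package main

import (
	"fmt"
	"strings"
)

const uuidSeparator = "qqq"

// filterByDocHash keeps only hits whose third _id segment (the source hash)
// appears among the query's IDs. Same idea as FilterOutHitsIfThisIsIdQuery,
// but O(n+m) via a map, and tolerant of IDs without three segments.
func filterByDocHash(queryIDs, hitIDs []string) []string {
	wanted := make(map[string]struct{}, len(queryIDs))
	for _, id := range queryIDs {
		if parts := strings.Split(id, uuidSeparator); len(parts) == 3 {
			wanted[parts[2]] = struct{}{}
		}
	}
	kept := make([]string, 0, len(hitIDs))
	for _, id := range hitIDs {
		if parts := strings.Split(id, uuidSeparator); len(parts) == 3 {
			if _, ok := wanted[parts[2]]; ok {
				kept = append(kept, id)
			}
		}
	}
	return kept
}

func main() {
	q := []string{"aaqqq1qqqHASH1"}
	hits := []string{"aaqqq1qqqHASH1", "aaqqq2qqqHASH2"}
	fmt.Println(filterByDocHash(q, hits)) // [aaqqq1qqqHASH1]
}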
From 4044e736b03316e1c08c4869f521b8fa6836f534 Mon Sep 17 00:00:00 2001
From: przemyslaw
Date: Tue, 3 Jun 2025 21:37:53 +0200
Subject: [PATCH 3/4] okay one more found

---
 platform/model/typical_queries/hits.go        | 40 +++++++++++++++----
 .../parsers/elastic_query_dsl/query_parser.go |  5 +++
 2 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/platform/model/typical_queries/hits.go b/platform/model/typical_queries/hits.go
index b63d9711d..c318d203d 100644
--- a/platform/model/typical_queries/hits.go
+++ b/platform/model/typical_queries/hits.go
@@ -3,7 +3,6 @@
 package typical_queries

 import (
-	"bytes"
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
@@ -245,7 +244,7 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
 		// At database level we only compare timestamps with millisecond precision
 		// However in search results we append `q` plus generated digits (we use q because it's not in hex)
 		// so that kibana can iterate over documents in UI
-		sourceHash := fmt.Sprintf("%x", HashJSON(doc.Source))
+		sourceHash := fmt.Sprintf("%x", ComputeHash(doc.Source))
 		pseudoUniqueId = fmt.Sprintf("%xqqq%sqqq%x", vv, defaultID, sourceHash)
 		//pseudoUniqueId = fmt.Sprintf("%xq%s", vv, defaultID)
 	} else {
@@ -256,19 +255,44 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
 	return pseudoUniqueId
 }

-func HashJSON(data json.RawMessage) string {
-	var buf bytes.Buffer
-	err := json.Compact(&buf, data)
+func ComputeHash(data json.RawMessage) string {
+	var parsed interface{}
+	if err := json.Unmarshal(data, &parsed); err != nil {
+		hash := sha256.Sum256(data)
+		return hex.EncodeToString(hash[:])
+	}
+	normalized := normalizeJSON(parsed)
+	normalizedBytes, err := json.Marshal(normalized)
 	if err != nil {
 		hash := sha256.Sum256(data)
 		return hex.EncodeToString(hash[:])
 	}
-	hash := sha256.Sum256(buf.Bytes())
-	eee := hex.EncodeToString(hash[:])
-	println(eee)
+	hash := sha256.Sum256(normalizedBytes)
 	return hex.EncodeToString(hash[:])
 }

+// normalizeJSON recursively normalizes the parsed JSON structure so it re-marshals
+// into a canonical form for hashing (encoding/json sorts map keys on Marshal).
+func normalizeJSON(v interface{}) interface{} {
+	switch val := v.(type) {
+	case map[string]interface{}:
+		normalized := make(map[string]interface{})
+		for k, v := range val {
+			normalized[k] = normalizeJSON(v)
+		}
+		return normalized
+
+	case []interface{}:
+		normalized := make([]interface{}, len(val))
+		for i, v := range val {
+			normalized[i] = normalizeJSON(v)
+		}
+		return normalized
+
+	default:
+		return val
+	}
+}
+
 func (query Hits) String() string {
 	return fmt.Sprintf("hits(indexes: %v)", strings.Join(query.indexes, ", "))
 }
diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go
index bc017477b..a82622ae2 100644
--- a/platform/parsers/elastic_query_dsl/query_parser.go
+++ b/platform/parsers/elastic_query_dsl/query_parser.go
@@ -1205,6 +1205,11 @@ func ResolveField(ctx context.Context, fieldName string, schemaInstance schema.S
 }

 func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap, defaultSize int) int {
+	if len(cw.UniqueIDs) > 0 {
+		// If this is a unique ID query, we can't limit size at the SQL level,
+		// because we need all matching timestamps; non-matching rows are filtered out later by looking at hashes computed on hits
+		return defaultSize
+	}
 	sizeRaw, exists := queryMap["size"]
 	if !exists {
 		return defaultSize
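ComputeHash's unmarshal-then-marshal pass works because encoding/json sorts map keys during Marshal at every nesting level. A compact demonstration of that key-order independence, with the usual caveat that json.Unmarshal decodes every number as float64 (so 1 and 1.0 hash identically, and very large integers may lose precision):

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// canonicalHash re-marshals parsed JSON before hashing, so two documents
// that differ only in key order hash identically - the property ComputeHash
// above relies on. Raw bytes are hashed as a fallback on any error.
func canonicalHash(data []byte) string {
	var parsed interface{}
	if err := json.Unmarshal(data, &parsed); err != nil {
		sum := sha256.Sum256(data)
		return hex.EncodeToString(sum[:])
	}
	normalized, err := json.Marshal(parsed)
	if err != nil {
		sum := sha256.Sum256(data)
		return hex.EncodeToString(sum[:])
	}
	sum := sha256.Sum256(normalized)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := []byte(`{"a":1,"b":{"y":2,"x":3}}`)
	b := []byte(`{"b":{"x":3,"y":2},"a":1}`)
	fmt.Println(canonicalHash(a) == canonicalHash(b)) // true
}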
From 5b90e355f3bef965afc0f9bcc1b1f8203f57055c Mon Sep 17 00:00:00 2001
From: przemyslaw
Date: Thu, 5 Jun 2025 20:17:56 +0200
Subject: [PATCH 4/4] unique ids take2

---
 .../frontend_connectors/schema_transformer.go | 40 +++++++++++++++++++
 platform/model/expr.go                        | 13 ++++--
 platform/model/query.go                       |  1 +
 platform/model/typical_queries/hits.go        |  1 +
 .../parsers/elastic_query_dsl/query_parser.go | 16 ++++----
 .../elastic_query_dsl/query_translator.go     |  5 ++-
 6 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/platform/frontend_connectors/schema_transformer.go b/platform/frontend_connectors/schema_transformer.go
index b0541c9fb..3c9be3eea 100644
--- a/platform/frontend_connectors/schema_transformer.go
+++ b/platform/frontend_connectors/schema_transformer.go
@@ -43,6 +43,44 @@ func (s *SchemaCheckPass) isFieldMapSyntaxEnabled(query *model.Query) bool {
 	return enabled
 }

+func (s *SchemaCheckPass) applyIDTransform(_ schema.Schema, query *model.Query) (*model.Query, error) {
+	// _id's are computed from the timestamp and the document content.
+	// Because the latter can only be accessed AFTER the query executes (and compared once hits are returned),
+	// we need to adjust the SQL query as follows:
+	// 1) if we're filtering for specific _id's, just return the query as is, because non-matching timestamps are already filtered out at the SQL level
+	// 2) if we're filtering OUT (e.g. _id NOT IN (...)), we still need to return those rows,
+	//    because multiple documents can share the same timestamp, and the final filtering
+	//    has to be done in the response rendering layer.
+	visitor := model.NewBaseVisitor()
+	visitor.OverrideVisitPrefixExpr = func(b *model.BaseExprVisitor, e model.PrefixExpr) interface{} {
+		if e.Op == "NOT" && e.Args != nil {
+			newArgs := make([]model.Expr, 0, len(e.Args))
+			for _, arg := range e.Args {
+				if infixExpr, ok := arg.(model.InfixExpr); ok && infixExpr.Metadata == model.IDQuery {
+					continue // drop only the tagged _id comparison, keep every other argument
+				}
+				newArgs = append(newArgs, arg)
+			}
+			e.Args = newArgs
+		}
+		if len(e.Args) == 0 {
+			query.IgnoreSize = true
+			return model.NewInfixExpr(model.NewLiteral(1), "=", model.NewLiteral(1)) // return a no-op expression
+		} else {
+			return model.NewPrefixExpr(e.Op, b.VisitChildren(e.Args))
+		}
+	}
+
+	expr := query.SelectCommand.Accept(visitor)
+
+	if selectCmd, ok := expr.(*model.SelectCommand); ok {
+		query.SelectCommand = *selectCmd
+	}
+
+	return query, nil
+}
+
 func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query *model.Query) (*model.Query, error) {

 	visitor := model.NewBaseVisitor()
@@ -980,6 +1018,8 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.Execution
 		TransformationName string
 		Transformation     func(schema.Schema, *model.Query) (*model.Query, error)
 	}{
+		{TransformationName: "IDTransform", Transformation: s.applyIDTransform},
+
 		// Section 1: from logical to physical
 		{TransformationName: "PhysicalFromExpressionTransformation", Transformation: s.applyPhysicalFromExpression},
 		{TransformationName: "WildcardExpansion", Transformation: s.applyWildcardExpansion},
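The visitor above only rewrites the expression tree; a toy model of the same NOT-pruning shows the intended before/after (toy types, not the real model package):

package main

import "fmt"

// Toy expression tree illustrating the rewrite applyIDTransform performs.
// The real code walks model.Expr with a BaseExprVisitor.
type expr interface{ String() string }

type infix struct {
	left, right string
	op, meta    string
}

func (e infix) String() string { return e.left + " " + e.op + " " + e.right }

type not struct{ args []expr }

func (n not) String() string {
	s := "NOT ("
	for i, a := range n.args {
		if i > 0 {
			s += " AND "
		}
		s += a.String()
	}
	return s + ")"
}

// dropIDComparisons removes ID_QUERY-tagged comparisons from a NOT(...);
// when nothing is left it degrades to the no-op predicate 1 = 1.
func dropIDComparisons(n not) expr {
	kept := make([]expr, 0, len(n.args))
	for _, a := range n.args {
		if inf, ok := a.(infix); ok && inf.meta == "ID_QUERY" {
			continue // the real filtering happens later, on hit hashes
		}
		kept = append(kept, a)
	}
	if len(kept) == 0 {
		return infix{left: "1", op: "=", right: "1"}
	}
	return not{args: kept}
}

func main() {
	w := not{args: []expr{infix{left: `"@timestamp"`, op: "=", right: "'2025-06-02 12:00:00.123'", meta: "ID_QUERY"}}}
	fmt.Println(w.String())                    // NOT ("@timestamp" = '2025-06-02 12:00:00.123')
	fmt.Println(dropIDComparisons(w).String()) // 1 = 1
}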
diff --git a/platform/model/expr.go b/platform/model/expr.go
index 316ff0ae5..ba3fd7c43 100644
--- a/platform/model/expr.go
+++ b/platform/model/expr.go
@@ -123,9 +123,10 @@ func (e TupleExpr) Accept(v ExprVisitor) interface{} {
 }

 type InfixExpr struct {
-	Left  Expr
-	Op    string
-	Right Expr
+	Left     Expr
+	Op       string
+	Right    Expr
+	Metadata string // Metadata can store additional information about the expression for further transformation
 }

 func (e InfixExpr) Accept(v ExprVisitor) interface{} {
@@ -239,6 +240,12 @@ func NewInfixExpr(lhs Expr, operator string, rhs Expr) InfixExpr {
 	return InfixExpr{Left: lhs, Op: operator, Right: rhs}
 }

+const IDQuery = "ID_QUERY"
+
+func NewInfixExprWithMetadata(lhs Expr, operator string, rhs Expr, metadata string) InfixExpr {
+	return InfixExpr{Left: lhs, Op: operator, Right: rhs, Metadata: metadata}
+}
+
 // AliasedExpr is an expression with an alias, e.g. `columnName AS alias` or `COUNT(x) AS sum_of_xs`
 type AliasedExpr struct {
 	Expr Expr
diff --git a/platform/model/query.go b/platform/model/query.go
index f263481f8..866b7cfbc 100644
--- a/platform/model/query.go
+++ b/platform/model/query.go
@@ -69,6 +69,7 @@ type (
 		RuntimeMappings map[string]RuntimeMapping

+		IgnoreSize bool
 		// dictionary to add as 'meta' field in the response.
 		// WARNING: it's probably not passed everywhere where it's needed, just in one place.
 		// But it works for the test + our dashboards, so let's fix it later if necessary.
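How the new Metadata field is meant to travel: parseIds tags the predicate at parse time, and the transformer recognizes the tag later. A hedged fragment against the patched model API (it assumes NewLiteral accepts an arbitrary value, as the NewLiteral(1) call above suggests):

package example

import "github.com/QuesmaOrg/quesma/platform/model"

// buildTaggedIDPredicate shows the intended use of NewInfixExprWithMetadata:
// the _id comparison carries model.IDQuery so later passes can identify it.
func buildTaggedIDPredicate(tsColumn string, tsValue any) model.Expr {
	return model.NewInfixExprWithMetadata(
		model.NewColumnRef(tsColumn), " = ", model.NewLiteral(tsValue), model.IDQuery)
}

// isIDPredicate is the detection side, mirroring what applyIDTransform checks.
func isIDPredicate(e model.Expr) bool {
	inf, ok := e.(model.InfixExpr)
	return ok && inf.Metadata == model.IDQuery
}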
diff --git a/platform/model/typical_queries/hits.go b/platform/model/typical_queries/hits.go
index c318d203d..6ec734148 100644
--- a/platform/model/typical_queries/hits.go
+++ b/platform/model/typical_queries/hits.go
@@ -36,6 +36,7 @@ type Hits struct {
 	addVersion         bool // true <=> we add hit.Version field to the response (whose value is always 1)
 	indexes            []string
 	timestampFieldName string
+	IgnoreSize         bool
 }

 func NewHits(ctx context.Context, table *clickhouse.Table, highlighter *model.Highlighter,
diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go
index a82622ae2..2ba49aadf 100644
--- a/platform/parsers/elastic_query_dsl/query_parser.go
+++ b/platform/parsers/elastic_query_dsl/query_parser.go
@@ -389,7 +389,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 			logger.ErrorWithCtx(cw.Ctx).Msgf("error converting id to sql: %v", err)
 			return model.NewSimpleQueryInvalid()
 		}
-		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", sql)
+		whereStmt = model.NewInfixExprWithMetadata(model.NewColumnRef(timestampColumnName), " = ", sql, model.IDQuery)
 	default:
 		idsAsExprs := make([]model.Expr, len(ids))
 		for i, id := range ids {
@@ -400,7 +400,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 		}
 		idsTuple := model.NewTupleExpr(idsAsExprs...)
-		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " IN ", idsTuple)
+		whereStmt = model.NewInfixExprWithMetadata(model.NewColumnRef(timestampColumnName), " IN ", idsTuple, model.IDQuery)
 	}
 	cw.UniqueIDs = uniqueIds // TODO a crucial side effect here
 	return model.NewSimpleQuery(whereStmt, true)
@@ -1205,11 +1205,13 @@ func ResolveField(ctx context.Context, fieldName string, schemaInstance schema.S
 }

 func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap, defaultSize int) int {
-	if len(cw.UniqueIDs) > 0 {
-		// If this is a unique ID query, we can't limit size at the SQL level,
-		// because we need all matching timestamps; non-matching rows are filtered out later by looking at hashes computed on hits
-		return defaultSize
-	}
+	//isIDQuery := len(cw.UniqueIDs) > 0
+	// If this is a unique ID query, we can't limit size at the SQL level,
+	// because we need all matching timestamps; non-matching rows are filtered out later by looking at hashes computed on hits
+
+	//if len(cw.UniqueIDs) > 0 { TODO: maybe remove
+	//	return defaultSize
+	//}
 	sizeRaw, exists := queryMap["size"]
 	if !exists {
 		return defaultSize
diff --git a/platform/parsers/elastic_query_dsl/query_translator.go b/platform/parsers/elastic_query_dsl/query_translator.go
index 1b41905a7..2975b3441 100644
--- a/platform/parsers/elastic_query_dsl/query_translator.go
+++ b/platform/parsers/elastic_query_dsl/query_translator.go
@@ -26,8 +26,9 @@ type ClickhouseQueryTranslator struct {

 	Indexes []string // TODO this will be removed

-	Table     *clickhouse.Table
-	UniqueIDs []string // used for hits queries, to filter out hits that are not in the list of IDs
+	Table      *clickhouse.Table
+	UniqueIDs  []string // used for hits queries, to filter out hits that are not in the list of IDs
+	IgnoreSize bool
 }

 var completionStatusOK = func() *int { value := 200; return &value }()
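The IgnoreSize flags added above have no consumer inside this diff; presumably the response rendering layer must skip size-trimming when the flag is set, so that hash-based _id filtering sees every candidate row. One possible shape for that consumer (assumed wiring, not part of the patches):

package example

// trimHits shows where IgnoreSize is meant to bite: when an _id query forced
// a full scan, the requested size must not truncate hits before the
// hash-based filtering has run. Generic over the hit type for the sketch.
func trimHits[T any](hits []T, size int, ignoreSize bool) []T {
	if ignoreSize || len(hits) <= size {
		return hits
	}
	return hits[:size]
}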