40 changes: 40 additions & 0 deletions platform/frontend_connectors/schema_transformer.go
@@ -43,6 +43,44 @@ func (s *SchemaCheckPass) isFieldMapSyntaxEnabled(query *model.Query) bool {
return enabled
}

func (s *SchemaCheckPass) applyIDTransform(_ schema.Schema, query *model.Query) (*model.Query, error) {
// _id's are computed from the timestamp and the document content.
// Because the latter is only available AFTER the query has executed (it is compared when hits are returned),
// we need to adjust the SQL query as follows:
// 1) if we're filtering FOR specific _id's, we return the query as is, because non-matching timestamps are filtered out anyway;
// 2) if we're filtering OUT (e.g. _id NOT IN (...)), we still need to return these documents,
//    because multiple documents may share the same timestamp, so the final filtering
//    has to be done in the response rendering layer.
visitor := model.NewBaseVisitor()
visitor.OverrideVisitPrefixExpr = func(b *model.BaseExprVisitor, e model.PrefixExpr) interface{} {
if e.Op == "NOT" && e.Args != nil {
newArgs := make([]model.Expr, 0, len(e.Args))
for _, arg := range e.Args {
if infixExpr, ok := arg.(model.InfixExpr); ok {
if infixExpr.Metadata != model.IDQuery {
newArgs = append(newArgs, infixExpr)
}
} else {
newArgs = append(newArgs, arg) // keep non-infix args untouched instead of silently dropping them
}
}
e.Args = newArgs
}
if len(e.Args) == 0 {
// everything was an _id filter: replace with a no-op expression and disable the SQL-level size limit
query.IgnoreSize = true
return model.NewInfixExpr(model.NewLiteral(1), "=", model.NewLiteral(1))
}
return model.NewPrefixExpr(e.Op, b.VisitChildren(e.Args))
}

expr := query.SelectCommand.Accept(visitor)

if sc, ok := expr.(*model.SelectCommand); ok {
query.SelectCommand = *sc
}

return query, nil
}
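// Editor's note: a hypothetical sketch, not part of this PR, of the input shape the visitor
// above rewrites. The constructors are the model helpers already used in this file; the
// column name "@timestamp" and the literal are invented. applyIDTransform drops the tagged
// argument, leaving NOT with no args, so the whole clause becomes the no-op `1 = 1` and
// query.IgnoreSize is set; the actual exclusion then happens in the response rendering layer.
func exampleIDTransformInput() model.Expr {
// WHERE NOT ("@timestamp" IN ('2024-01-01 00:00:00.000')), tagged as an _id query
return model.NewPrefixExpr("NOT", []model.Expr{
model.NewInfixExprWithMetadata(
model.NewColumnRef("@timestamp"),
" IN ",
model.NewTupleExpr(model.NewLiteral("'2024-01-01 00:00:00.000'")),
model.IDQuery,
),
})
}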

func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query *model.Query) (*model.Query, error) {

visitor := model.NewBaseVisitor()
@@ -980,6 +1018,8 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
TransformationName string
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
}{
{TransformationName: "IDTransform", Transformation: s.applyIDTransform},

// Section 1: from logical to physical
{TransformationName: "PhysicalFromExpressionTransformation", Transformation: s.applyPhysicalFromExpression},
{TransformationName: "WildcardExpansion", Transformation: s.applyWildcardExpansion},
13 changes: 10 additions & 3 deletions platform/model/expr.go
@@ -123,9 +123,10 @@ func (e TupleExpr) Accept(v ExprVisitor) interface{} {
}

type InfixExpr struct {
Left     Expr
Op       string
Right    Expr
Metadata string // Metadata can store additional information about the expression, for further transformations
}

func (e InfixExpr) Accept(v ExprVisitor) interface{} {
@@ -239,6 +240,12 @@ func NewInfixExpr(lhs Expr, operator string, rhs Expr) InfixExpr {
return InfixExpr{Left: lhs, Op: operator, Right: rhs}
}

const IDQuery = "ID_QUERY"

func NewInfixExprWithMetadata(lhs Expr, operator string, rhs Expr, metadata string) InfixExpr {
return InfixExpr{Left: lhs, Op: operator, Right: rhs, Metadata: metadata}
}
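// Editor's note: a minimal, hypothetical usage sketch, not part of this PR. parseIds tags _id
// comparisons with IDQuery so that later transformation passes (applyIDTransform in
// schema_transformer.go) can recognize and rewrite them; plain NewInfixExpr leaves Metadata empty.
func exampleTaggedIDComparison() bool {
e := NewInfixExprWithMetadata(NewColumnRef("@timestamp"), " = ", NewLiteral("'2024-01-01 00:00:00'"), IDQuery)
return e.Metadata == IDQuery // true: the tag travels with the expression
}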

// AliasedExpr is an expression with an alias, e.g. `columnName AS alias` or `COUNT(x) AS sum_of_xs`
type AliasedExpr struct {
Expr Expr
1 change: 1 addition & 0 deletions platform/model/query.go
@@ -69,6 +69,7 @@ type (

RuntimeMappings map[string]RuntimeMapping

IgnoreSize bool // set (e.g. by the _id transform) when the result set must not be limited at the SQL level
// dictionary to add as 'meta' field in the response.
// WARNING: it's probably not passed everywhere where it's needed, just in one place.
// But it works for the test + our dashboards, so let's fix it later if necessary.
46 changes: 45 additions & 1 deletion platform/model/typical_queries/hits.go
@@ -4,6 +4,9 @@ package typical_queries

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/QuesmaOrg/quesma/platform/clickhouse"
"github.com/QuesmaOrg/quesma/platform/common_table"
@@ -33,6 +36,7 @@ type Hits struct {
addVersion bool // true <=> we add hit.Version field to the response (whose value is always 1)
indexes []string
timestampFieldName string
IgnoreSize bool // see model.Query.IgnoreSize
}

func NewHits(ctx context.Context, table *clickhouse.Table, highlighter *model.Highlighter,
@@ -241,7 +245,9 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) string {
// At database level we only compare timestamps with millisecond precision.
// In search results, however, we append `qqq`-separated parts (the default ID and a hash of the
// document source; `q` is used because it's not a hex digit) so that Kibana can iterate over
// documents in the UI and we can post-filter hits by source hash.
sourceHash := ComputeHash(doc.Source) // already hex-encoded, so no further %x re-encoding is needed
pseudoUniqueId = fmt.Sprintf("%xqqq%sqqq%s", vv, defaultID, sourceHash) // previously: fmt.Sprintf("%xq%s", vv, defaultID)
} else {
logger.WarnWithCtx(query.ctx).Msgf("failed to convert timestamp field [%v] to time.Time", v[0])
return defaultID
@@ -250,6 +256,44 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) string {
return pseudoUniqueId
}
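// Editor's note: a hypothetical illustration with invented values, not part of this PR,
// of the composed _id layout. The timestamp string shape mirrors the " +0000 UTC" suffix
// trimmed in parseIds; %x hex-encodes the bytes of its string operand.
func exampleComposedID() string {
tsStr := "2024-01-01 00:00:00 +0000 UTC" // assumed textual form of the timestamp
src := json.RawMessage(`{"message":"hello"}`)
// => "<hex(timestamp)>qqq0qqq<sha256(normalized source)>"
return fmt.Sprintf("%xqqq%sqqq%s", tsStr, "0", ComputeHash(src))
}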

// ComputeHash returns a stable SHA-256 hex digest of a JSON document: the input is
// re-marshaled (json.Marshal sorts map keys), so semantically equal documents with
// different key order hash identically. Invalid JSON is hashed as raw bytes.
func ComputeHash(data json.RawMessage) string {
var parsed interface{}
if err := json.Unmarshal(data, &parsed); err != nil {
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
}
normalized := normalizeJSON(parsed)
normalizedBytes, err := json.Marshal(normalized)
if err != nil {
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
}
hash := sha256.Sum256(normalizedBytes)
return hex.EncodeToString(hash[:])
}

// normalizeJSON recursively rebuilds the parsed JSON value; the stable ordering needed for hashing comes from json.Marshal, which sorts map keys.
func normalizeJSON(v interface{}) interface{} {
switch val := v.(type) {
case map[string]interface{}:
normalized := make(map[string]interface{})
for k, v := range val {
normalized[k] = normalizeJSON(v)
}
return normalized

case []interface{}:
normalized := make([]interface{}, len(val))
for i, v := range val {
normalized[i] = normalizeJSON(v)
}
return normalized

default:
return val
}
}
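// Editor's note: a small, hypothetical sketch, not part of this PR, of ComputeHash's
// key-order invariance: encoding/json sorts map keys during Marshal, so semantically
// equal documents hash identically regardless of key order.
func exampleHashInvariance() bool {
h1 := ComputeHash(json.RawMessage(`{"a":1,"b":[2,3]}`))
h2 := ComputeHash(json.RawMessage(`{"b":[2,3],"a":1}`))
return h1 == h2 // true
}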

func (query Hits) String() string {
return fmt.Sprintf("hits(indexes: %v)", strings.Join(query.indexes, ", "))
}
19 changes: 15 additions & 4 deletions platform/parsers/elastic_query_dsl/query_parser.go
@@ -34,8 +34,9 @@ func NewEmptyHighlighter() model.Highlighter {
}

const (
defaultQueryResultSize = 10000 // was 10; TODO: it looks like we cannot limit the returned `hits` at the SQL level, because IDs are computed from them
defaultTrackTotalHits = 10000
uuidSeparator = "qqq" // document _id fields in Quesma are three `qqq`-joined parts: hex-encoded timestamp, default ID, and source hash
)

func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.ExecutionPlan, error) {
@@ -322,6 +323,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQuery {
return model.NewSimpleQueryInvalid()
}
ids := make([]string, 0, len(idsRaw))
uniqueIds := make([]string, 0, len(idsRaw)) // keeps the original full IDs for later post-query filtering by source hash
for _, id := range idsRaw {
if idAsString, ok := id.(string); ok {
ids = append(ids, idAsString)
@@ -335,14 +337,15 @@
// therefore we need to strip the hex part (before `q`) and convert it to decimal
// then we can query at DB level
for i, id := range ids {
idInHex := strings.Split(id, uuidSeparator)[0]
if idAsStr, err := hex.DecodeString(idInHex); err != nil {
logger.Error().Msgf("error parsing document id %s: %v", id, err)
return model.NewSimpleQueryInvalid()
} else {
tsWithoutTZ := strings.TrimSuffix(string(idAsStr), " +0000 UTC")
ids[i] = fmt.Sprintf("'%s'", tsWithoutTZ)
}
uniqueIds = append(uniqueIds, id)
}

var idToSql func(string) (model.Expr, error)
@@ -386,7 +389,7 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQuery {
logger.ErrorWithCtx(cw.Ctx).Msgf("error converting id to sql: %v", err)
return model.NewSimpleQueryInvalid()
}
whereStmt = model.NewInfixExprWithMetadata(model.NewColumnRef(timestampColumnName), " = ", sql, model.IDQuery)
default:
idsAsExprs := make([]model.Expr, len(ids))
for i, id := range ids {
@@ -397,8 +400,9 @@
}
}
idsTuple := model.NewTupleExpr(idsAsExprs...)
whereStmt = model.NewInfixExprWithMetadata(model.NewColumnRef(timestampColumnName), " IN ", idsTuple, model.IDQuery)
}
cw.UniqueIDs = uniqueIds // TODO: a crucial side effect; makeHits relies on this field to post-filter hits by source hash
return model.NewSimpleQuery(whereStmt, true)
}
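// Editor's note: a hypothetical round-trip with invented values, not part of this PR,
// mirroring the hex-decode + TrimSuffix logic in parseIds above: the part of an _id before
// the first uuidSeparator is the hex-encoded timestamp string.
func exampleIDTimestampRoundTrip() string {
tsStr := "2024-01-01 00:00:00 +0000 UTC"
idPart := fmt.Sprintf("%x", tsStr) // how computeIdForDocument encodes the timestamp
decoded, _ := hex.DecodeString(idPart)
return strings.TrimSuffix(string(decoded), " +0000 UTC") // "2024-01-01 00:00:00"
}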

@@ -1201,6 +1205,13 @@ func ResolveField(ctx context.Context, fieldName string, schemaInstance schema.Schema)
}

func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap, defaultSize int) int {
// If this is an _id query, we can't limit size at the SQL level, because we need all
// matching timestamps; hits are filtered later, by comparing hashes computed on them.
//
// TODO: maybe remove this commented-out early return:
//if len(cw.UniqueIDs) > 0 {
//	return defaultSize
//}
sizeRaw, exists := queryMap["size"]
if !exists {
return defaultSize
31 changes: 29 additions & 2 deletions platform/parsers/elastic_query_dsl/query_translator.go
@@ -11,6 +11,8 @@ import (
"github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl/query_util"
"github.com/QuesmaOrg/quesma/platform/schema"
"github.com/QuesmaOrg/quesma/platform/util"
"slices"
"strings"
)

type JsonMap = map[string]interface{}
Expand All @@ -24,7 +26,9 @@ type ClickhouseQueryTranslator struct {
Indexes []string

// TODO this will be removed
Table *clickhouse.Table
UniqueIDs []string // used for hits queries, to filter out hits that are not in the list of IDs
IgnoreSize bool // set when the result set must not be limited at the SQL level (see model.Query.IgnoreSize)
}

var completionStatusOK = func() *int { value := 200; return &value }()
@@ -131,7 +135,30 @@ func (cw *ClickhouseQueryTranslator) makeHits(queries []*model.Query, results [][]model.QueryResultRow)
hitsPartOfResponse := hitsQuery.Type.TranslateSqlResponseToJson(hitsResultSet)

hitsResponse := hitsPartOfResponse["hits"].(model.SearchHits)
hits := cw.FilterOutHitsIfThisIsIdQuery(hitsResponse)
return queriesWithoutHits, resultsWithoutHits, &hits
}

// FilterOutHitsIfThisIsIdQuery - if during parsing we found that this is a query for _id,
// we filter out hits that are not in the list of UniqueIDs. The filtering compares only the
// doc.Source hash (the third ID part), ignoring the first two parts (timestamp and default ID).
func (cw *ClickhouseQueryTranslator) FilterOutHitsIfThisIsIdQuery(hits model.SearchHits) model.SearchHits {
if len(cw.UniqueIDs) == 0 {
return hits // not _id query, proceed as usual
}
hashesFromQuery := make([]string, 0, len(cw.UniqueIDs))
for _, id := range cw.UniqueIDs {
if parts := strings.Split(id, uuidSeparator); len(parts) >= 3 {
hashesFromQuery = append(hashesFromQuery, parts[2])
} // malformed IDs without a hash part are skipped instead of panicking
}
filteredHits := make([]model.SearchHit, 0, len(hits.Hits))
for _, hit := range hits.Hits {
parts := strings.Split(hit.ID, uuidSeparator)
if len(parts) >= 3 && slices.Contains(hashesFromQuery, parts[2]) {
filteredHits = append(filteredHits, hit)
}
}
hits.Hits = filteredHits
return hits
}
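// Editor's note: a hypothetical sketch with invented IDs, not part of this PR, of the
// post-filtering above. Only the source-hash part (index 2) decides whether a hit stays;
// the first two parts are ignored because several documents may share a timestamp.
func exampleFilterByIDHash(cw *ClickhouseQueryTranslator) int {
cw.UniqueIDs = []string{"aaqqq0qqqHASH1"}
hits := model.SearchHits{Hits: []model.SearchHit{
{ID: "aaqqq1qqqHASH1"}, // same source hash -> kept (default-ID part is ignored)
{ID: "aaqqq2qqqHASH2"}, // different source hash -> dropped
}}
return len(cw.FilterOutHitsIfThisIsIdQuery(hits).Hits) // 1
}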

func (cw *ClickhouseQueryTranslator) makeTotalCount(queries []*model.Query, results [][]model.QueryResultRow) (queriesWithoutCount []*model.Query, resultsWithoutCount [][]model.QueryResultRow, total *model.Total) {