From ef5ecb335089afaccbc7d477b2fcf54f8bc0441c Mon Sep 17 00:00:00 2001 From: "keyang.lk" Date: Wed, 31 Dec 2025 20:30:21 +0800 Subject: [PATCH 1/3] feat(ob_conn): perform a fair hybrid query of full-text and vector --- rag/utils/ob_conn.py | 296 +++++++++++++++++++++++++++++++++---------- 1 file changed, 226 insertions(+), 70 deletions(-) diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py index 83c22fb1f..99b6600e3 100644 --- a/rag/utils/ob_conn.py +++ b/rag/utils/ob_conn.py @@ -481,6 +481,8 @@ def is_true(var: str, default: str) -> bool: self.use_fulltext_hint = is_true('USE_FULLTEXT_HINT', 'true') self.search_original_content = is_true("SEARCH_ORIGINAL_CONTENT", 'true') self.enable_hybrid_search = is_true('ENABLE_HYBRID_SEARCH', 'false') + self.use_fulltext_first_fusion_search = is_true('USE_FULLTEXT_FIRST_FUSION_SEARCH', 'true') + logger.info(f"USE_FULLTEXT_FIRST_FUSION_SEARCH={self.use_fulltext_first_fusion_search}") """ Database operations @@ -963,39 +965,113 @@ def search( if search_type == "fusion": # fusion search, usually for chat num_candidates = vector_topn + fulltext_topn - if group_results: - count_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")," - f" scored_results AS (" - f" SELECT *" - f" FROM fulltext_results" - f" WHERE {vector_search_filter}" - f")," - f" group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" - f" FROM scored_results" - f")" - f" SELECT COUNT(*)" - f" FROM group_results" - f" WHERE rn = 1" - ) + if self.use_fulltext_first_fusion_search: + # fulltext-first fusion: take top-N fulltext results, then apply vector similarity threshold filter + if group_results: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")," + f" scored_results AS (" + f" SELECT *" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f")," + f" group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id) as rn" + f" FROM scored_results" + f")" + f" SELECT COUNT(*)" + f" FROM group_results" + f" WHERE rn = 1" + ) + else: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")" + f" SELECT COUNT(*) FROM fulltext_results WHERE {vector_search_filter}" + ) else: - count_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")" - f" SELECT COUNT(*) FROM fulltext_results WHERE {vector_search_filter}" - ) + # join-based fusion: fetch top-K fulltext and top-K vector results independently, then FULL OUTER JOIN by id + # NOTE: We use COALESCE for pagerank so vector-only results can also get pagerank. 
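+                    # Rough scoring sketch (illustration only; w = vector_similarity_weight):
+                    #   score = (relevance / max_relevance) * (1 - w) + similarity * w + pagerank / 100
+                    # with relevance treated as 0 for vector-only rows and similarity as 0 for fulltext-only rows.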
+ if group_results: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")," + f"combined_results AS (" + f" SELECT" + f" COALESCE(f.id, v.id) AS id," + f" (" + # Normalize fulltext relevance within candidate set to make fusion weights meaningful, + # and protect vector-only/fulltext-only rows from NULL-propagation. + f" IFNULL(f.relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0) * {1 - vector_similarity_weight}" + f" + IFNULL(v.similarity, 0) * {vector_similarity_weight}" + f" + (CAST(IFNULL(COALESCE(f.{PAGERANK_FLD}, v.{PAGERANK_FLD}), 0) AS DECIMAL(10, 2)) / 100)" + f" ) AS score" + f" FROM fulltext_results f" + f" FULL OUTER JOIN vector_results v" + f" ON f.id = v.id" + f")," + f"joined_results AS (" + f" SELECT t.*, c.score AS _score" + f" FROM combined_results c" + f" JOIN {index_name} t" + f" ON c.id = t.id" + f")," + f"group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" + f" FROM joined_results" + f")" + f" SELECT COUNT(*)" + f" FROM group_results" + f" WHERE rn = 1" + ) + else: + count_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")," + f"combined_results AS (" + f" SELECT" + f" COALESCE(f.id, v.id) AS id" + f" FROM fulltext_results f" + f" FULL OUTER JOIN vector_results v" + f" ON f.id = v.id" + f")" + f" SELECT COUNT(*) FROM combined_results" + ) logger.debug("OBConnection.search with count sql: %s", count_sql) start_time = time.time() @@ -1017,46 +1093,126 @@ def search( if total_count == 0: continue - score_expr = f"(relevance * {1 - vector_similarity_weight} + {vector_search_score_expr} * {vector_similarity_weight} + {pagerank_score_expr})" - if group_results: - fusion_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")," - f" scored_results AS (" - f" SELECT *, {score_expr} AS _score" - f" FROM fulltext_results" - f" WHERE {vector_search_filter}" - f")," - f" group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" - f" FROM scored_results" - f")" - f" SELECT {fields_expr}, _score" - f" FROM group_results" - f" WHERE rn = 1" - f" ORDER BY _score DESC" - f" LIMIT {offset}, {limit}" - ) + if self.use_fulltext_first_fusion_search: + # NOTE: + # MATCH ... AGAINST relevance is not bounded and can dwarf vector similarity (typically in [-1, 1]). 
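+                # (For example, an unnormalized relevance of 30 beside a similarity of 0.8 would swamp any weighting.)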
+ # Normalize relevance within the candidate set (fulltext_results) so weights behave as intended. + relevance_norm_expr = "IFNULL(relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0)" + score_expr = f"({relevance_norm_expr} * {1 - vector_similarity_weight} + {vector_search_score_expr} * {vector_similarity_weight} + {pagerank_score_expr})" + if group_results: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")," + f" scored_results AS (" + f" SELECT *, {score_expr} AS _score" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f")," + f" group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" + f" FROM scored_results" + f")" + f" SELECT {fields_expr}, _score" + f" FROM group_results" + f" WHERE rn = 1" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit}" + ) + else: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {num_candidates}" + f")" + f" SELECT {fields_expr}, {score_expr} AS _score" + f" FROM fulltext_results" + f" WHERE {vector_search_filter}" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit}" + ) else: - fusion_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} *, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {num_candidates}" - f")" - f" SELECT {fields_expr}, {score_expr} AS _score" - f" FROM fulltext_results" - f" WHERE {vector_search_filter}" - f" ORDER BY _score DESC" - f" LIMIT {offset}, {limit}" + join_fields_expr = ", ".join([f"t.{f} as {f}" for f in output_fields if f != "_score"]) + relevance_norm_expr = "IFNULL(f.relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0)" + score_expr = ( + f"({relevance_norm_expr} * {1 - vector_similarity_weight}" + f" + IFNULL(v.similarity, 0) * {vector_similarity_weight}" + f" + (CAST(IFNULL(COALESCE(f.{PAGERANK_FLD}, v.{PAGERANK_FLD}), 0) AS DECIMAL(10, 2)) / 100))" ) + if group_results: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")," + f"combined_results AS (" + f" SELECT COALESCE(f.id, v.id) AS id, {score_expr} AS score" + f" FROM fulltext_results f" + f" FULL OUTER JOIN vector_results v" + f" ON f.id = v.id" + f")," + f"joined_results AS (" + f" SELECT {join_fields_expr}, c.score AS _score, t.group_id as group_id" + f" FROM combined_results c" + f" JOIN {index_name} t" + f" ON c.id = t.id" + f")," + f"group_results AS (" + f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" + f" FROM joined_results" + f")" + f" SELECT {fields_expr}, _score" + f" 
FROM group_results" + f" WHERE rn = 1" + f" ORDER BY _score DESC" + f" LIMIT {offset}, {limit}" + ) + else: + fusion_sql = ( + f"WITH fulltext_results AS (" + f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + f")," + f"vector_results AS (" + f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {vector_search_filter}" + f" ORDER BY {vector_search_expr}" + f" APPROXIMATE LIMIT {vector_topn}" + f")," + f"combined_results AS (" + f" SELECT COALESCE(f.id, v.id) AS id, {score_expr} AS score" + f" FROM fulltext_results f" + f" FULL OUTER JOIN vector_results v" + f" ON f.id = v.id" + f")" + f" SELECT {join_fields_expr}, c.score as _score" + f" FROM combined_results c" + f" JOIN {index_name} t" + f" ON c.id = t.id" + f" ORDER BY score DESC" + f" LIMIT {offset}, {limit}" + ) logger.debug("OBConnection.search with fusion sql: %s", fusion_sql) start_time = time.time() From 185c5525c3793ff52bf0360a7fc30c9d0897e0f2 Mon Sep 17 00:00:00 2001 From: "keyang.lk" Date: Fri, 9 Jan 2026 14:25:49 +0800 Subject: [PATCH 2/3] feat: add fusion_search_parallel in ob conn search --- rag/utils/ob_conn.py | 509 +++++++++++++++++++++++++++---------------- 1 file changed, 323 insertions(+), 186 deletions(-) diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py index 99b6600e3..acc7ef5c9 100644 --- a/rag/utils/ob_conn.py +++ b/rag/utils/ob_conn.py @@ -13,13 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import contextvars import json import logging import os import re import threading import time -from typing import Any, Optional +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Callable, Optional, Sequence import numpy as np from elasticsearch_dsl import Q, Search @@ -47,6 +49,37 @@ logger = logging.getLogger('ragflow.ob_conn') + +def run_functions_tuples_in_parallel( + functions_with_args: Sequence[tuple[Callable[..., Any], tuple[Any, ...]]], + allow_failures: bool = False, + max_workers: int | None = None, +) -> list[Any]: + """ + Execute multiple callables in parallel and return results in the same order. + Preserves `contextvars` across threads. 
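+
+    Example (illustrative; the query callables are placeholders):
+        ft_rows, vec_rows = run_functions_tuples_in_parallel(
+            [(query_fulltext, ()), (query_vector, ())], max_workers=2
+        )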
+ """ + workers = min(max_workers, len(functions_with_args)) if max_workers is not None else len(functions_with_args) + if workers <= 0: + return [] + + results: list[Any] = [None] * len(functions_with_args) + with ThreadPoolExecutor(max_workers=workers) as executor: + future_to_index = { + executor.submit(contextvars.copy_context().run, func, *args): i + for i, (func, args) in enumerate(functions_with_args) + } + for future in as_completed(future_to_index): + index = future_to_index[future] + try: + results[index] = future.result() + except Exception as e: + logger.exception("Function at index %d failed due to %s", index, e) + results[index] = None + if not allow_failures: + raise + return results + column_order_id = Column("_order_id", Integer, nullable=True, comment="chunk order id for maintaining sequence") column_group_id = Column("group_id", String(256), nullable=True, comment="group id for external retrieval") @@ -710,6 +743,239 @@ def _add_column(self, table_name: str, column: Column): CRUD operations """ + def _fusion_search_parallel_in_memory( + self, + *, + index_name: str, + fulltext_search_hint: str, + fulltext_search_score_expr: str, + fulltext_search_filter: str, + fulltext_topn: int, + vector_search_expr: str, + vector_search_score_expr: str, + vector_search_filter: str, + vector_similarity_threshold: float, + vector_topn: int, + filters_expr: str, + output_fields: list[str], + fields_expr: str, + vector_similarity_weight: float, + group_results: bool, + offset: int, + limit: int, + ) -> tuple[list[dict], int]: + """ + For use_fulltext_first_fusion_search == False only. + Runs fulltext and vector top-K queries in parallel, fuses scores in memory, then fetches full fields for candidates. + Returns (paged_rows, total_count_for_pagination). + """ + # Keep behavior consistent with SQL `LIMIT offset, 0`: + # return no rows but still compute `total_count` for pagination. + if limit < 0: + limit = 0 + + def _query_fulltext(): + sql = ( + f"SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" + f" FROM {index_name}" + f" WHERE {filters_expr} AND {fulltext_search_filter}" + f" ORDER BY relevance DESC" + f" LIMIT {fulltext_topn}" + ) + res = self.client.perform_raw_text_sql(sql) + return res.fetchall() if res else [] + + def _query_vector(): + sql = ( + # Select distance once and ORDER BY its alias to avoid computing cosine_distance twice. + f"SELECT /*+index({index_name},q_1024_vec_idx)*/ id, {PAGERANK_FLD}, {vector_search_expr} AS distance" + f" FROM {index_name}" + # NOTE: + # Do NOT put cosine_distance-based threshold in WHERE. It can force computing distance for a large + # portion of rows and may prevent the vector index/ANN from being used efficiently. + # We'll apply the similarity threshold in Python after retrieving ANN top-K. 
+ f" WHERE {filters_expr}" + f" ORDER BY distance" + f" APPROXIMATE LIMIT {vector_topn}" + ) + res = self.client.perform_raw_text_sql(sql) + return res.fetchall() if res else [] + + def _timed_query(fn): + start = time.perf_counter() + rows = fn() + return rows, (time.perf_counter() - start) + + query_start = time.perf_counter() + (fulltext_rows, fulltext_elapsed), (vector_rows, vector_elapsed) = run_functions_tuples_in_parallel( + [(lambda: _timed_query(_query_fulltext), ()), (lambda: _timed_query(_query_vector), ())], + allow_failures=False, + max_workers=2, + ) + query_time = time.perf_counter() - query_start + logger.info( + "OBConnection.search fusion parallel queries table %s elapsed %.3fs (fulltext %.3fs, vector %.3fs), fulltext=%d, vector=%d", + index_name, + query_time, + fulltext_elapsed, + vector_elapsed, + len(fulltext_rows), + len(vector_rows), + ) + + fulltext_dict: dict[str, dict[str, float]] = {} + max_relevance = 0.0 + for row in fulltext_rows: + doc_id = str(row[0]) + pagerank = float(row[1]) if row[1] is not None else 0.0 + relevance = float(row[2]) if row[2] is not None else 0.0 + fulltext_dict[doc_id] = {"pagerank": pagerank, "relevance": relevance} + if relevance > max_relevance: + max_relevance = relevance + + vector_dict: dict[str, dict[str, float]] = {} + for row in vector_rows: + doc_id = str(row[0]) + pagerank = float(row[1]) if row[1] is not None else 0.0 + distance = float(row[2]) if row[2] is not None else 1.0 + similarity = 1.0 - distance + if similarity < vector_similarity_threshold: + continue + vector_dict[doc_id] = {"pagerank": pagerank, "similarity": similarity} + + fused_results: list[dict[str, Any]] = [] + all_ids = set(fulltext_dict.keys()) | set(vector_dict.keys()) + for doc_id in all_ids: + ft = fulltext_dict.get(doc_id) + vec = vector_dict.get(doc_id) + + rel = ft["relevance"] if ft else 0.0 + rel_norm = (rel / max_relevance) if (max_relevance > 0 and rel > 0) else 0.0 + sim = vec["similarity"] if vec else 0.0 + pagerank = (ft["pagerank"] if ft else (vec["pagerank"] if vec else 0.0)) + + score = rel_norm * (1 - vector_similarity_weight) + sim * vector_similarity_weight + (pagerank / 100.0) + fused_results.append({"id": doc_id, "score": score}) + + fused_results.sort(key=lambda x: x["score"], reverse=True) + if not fused_results: + return [], 0 + + score_by_id: dict[str, float] = {r["id"]: float(r["score"]) for r in fused_results} + + # NOTE: output_fields contains "_score" for API contract, but it is NOT a table column. 
+ base_fetch_fields = [f for f in output_fields if f != "_score"] + if "id" not in base_fetch_fields: + base_fetch_fields = ["id"] + base_fetch_fields + + def _fetch_rows_by_ids(select_fields: list[str], ids: list[str], batch_size: int = 1000) -> list[Row]: + if not ids: + return [] + rows: list[Row] = [] + for i in range(0, len(ids), batch_size): + batch = ids[i: i + batch_size] + escaped = [escape_string(str(doc_id)) for doc_id in batch] + ids_str = "', '".join(escaped) + sql = ( + f"SELECT {', '.join(select_fields)}" + f" FROM {index_name}" + f" WHERE id IN ('{ids_str}')" + ) + res = self.client.perform_raw_text_sql(sql) + if res: + rows.extend(res.fetchall()) + return rows + + if group_results: + group_fetch_start = time.time() + gid_rows = _fetch_rows_by_ids(["id", "group_id"], [r["id"] for r in fused_results], batch_size=1000) + group_fetch_time = time.time() - group_fetch_start + + best_by_group: dict[Any, tuple[float, str]] = {} + for row in gid_rows: + doc_id = str(row[0]) + gid = row[1] + sc = score_by_id.get(doc_id) + if sc is None: + continue + key = gid if gid is not None else doc_id + prev = best_by_group.get(key) + if prev is None or sc > prev[0]: + best_by_group[key] = (sc, doc_id) + + group_sorted = sorted(best_by_group.values(), key=lambda t: t[0], reverse=True) + total_count = len(group_sorted) + page_ids = [doc_id for _, doc_id in group_sorted[offset: offset + limit]] + + want_group_id = "group_id" in output_fields + fetch_fields = base_fetch_fields.copy() + if want_group_id and "group_id" not in fetch_fields: + fetch_fields.append("group_id") + + fetch_start = time.time() + full_rows = _fetch_rows_by_ids(fetch_fields, page_ids, batch_size=1000) + fetch_time = time.time() - fetch_start + + full_data: dict[str, dict] = {} + for row in full_rows: + ent = self._row_to_entity(row, fetch_fields) + if "id" in ent: + full_data[str(ent["id"])] = ent + + paged: list[dict] = [] + for doc_id in page_ids: + ent = full_data.get(doc_id) + if not ent: + continue + out = ent.copy() + out["_score"] = score_by_id.get(doc_id, 0.0) + if not want_group_id: + out.pop("group_id", None) + paged.append(out) + + logger.info( + "OBConnection.search fusion in-memory table %s: query=%.3fs group_fetch=%.3fs fetch=%.3fs total=%d returned=%d", + index_name, + query_time, + group_fetch_time, + fetch_time, + total_count, + len(paged), + ) + return paged, total_count + + total_count = len(fused_results) + page_ids = [r["id"] for r in fused_results[offset: offset + limit]] + + fetch_start = time.time() + full_rows = _fetch_rows_by_ids(base_fetch_fields, page_ids, batch_size=1000) + fetch_time = time.time() - fetch_start + + full_data: dict[str, dict] = {} + for row in full_rows: + ent = self._row_to_entity(row, base_fetch_fields) + if "id" in ent: + full_data[str(ent["id"])] = ent + + paged: list[dict] = [] + for doc_id in page_ids: + ent = full_data.get(doc_id) + if not ent: + continue + out = ent.copy() + out["_score"] = score_by_id.get(doc_id, 0.0) + paged.append(out) + + logger.info( + "OBConnection.search fusion in-memory table %s: query=%.3fs fetch=%.3fs total=%d returned=%d", + index_name, + query_time, + fetch_time, + total_count, + len(paged), + ) + return paged, total_count + def search( self, selectFields: list[str], @@ -1000,100 +1266,23 @@ def search( f")" f" SELECT COUNT(*) FROM fulltext_results WHERE {vector_search_filter}" ) - else: - # join-based fusion: fetch top-K fulltext and top-K vector results independently, then FULL OUTER JOIN by id - # NOTE: We use COALESCE for pagerank so 
vector-only results can also get pagerank. - if group_results: - count_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {fulltext_topn}" - f")," - f"vector_results AS (" - f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {vector_search_filter}" - f" ORDER BY {vector_search_expr}" - f" APPROXIMATE LIMIT {vector_topn}" - f")," - f"combined_results AS (" - f" SELECT" - f" COALESCE(f.id, v.id) AS id," - f" (" - # Normalize fulltext relevance within candidate set to make fusion weights meaningful, - # and protect vector-only/fulltext-only rows from NULL-propagation. - f" IFNULL(f.relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0) * {1 - vector_similarity_weight}" - f" + IFNULL(v.similarity, 0) * {vector_similarity_weight}" - f" + (CAST(IFNULL(COALESCE(f.{PAGERANK_FLD}, v.{PAGERANK_FLD}), 0) AS DECIMAL(10, 2)) / 100)" - f" ) AS score" - f" FROM fulltext_results f" - f" FULL OUTER JOIN vector_results v" - f" ON f.id = v.id" - f")," - f"joined_results AS (" - f" SELECT t.*, c.score AS _score" - f" FROM combined_results c" - f" JOIN {index_name} t" - f" ON c.id = t.id" - f")," - f"group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" - f" FROM joined_results" - f")" - f" SELECT COUNT(*)" - f" FROM group_results" - f" WHERE rn = 1" - ) - else: - count_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {fulltext_topn}" - f")," - f"vector_results AS (" - f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {vector_search_filter}" - f" ORDER BY {vector_search_expr}" - f" APPROXIMATE LIMIT {vector_topn}" - f")," - f"combined_results AS (" - f" SELECT" - f" COALESCE(f.id, v.id) AS id" - f" FROM fulltext_results f" - f" FULL OUTER JOIN vector_results v" - f" ON f.id = v.id" - f")" - f" SELECT COUNT(*) FROM combined_results" - ) - logger.debug("OBConnection.search with count sql: %s", count_sql) - start_time = time.time() - - res = self.client.perform_raw_text_sql(count_sql) - total_count = res.fetchone()[0] if res else 0 - result.total += total_count - - elapsed_time = time.time() - start_time - logger.info( - f"OBConnection.search table {index_name}, search type: fusion, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," - f" vector column: '{vector_column_name}'," - f" query text: '{fulltext_query}'," - f" condition: '{condition}'," - f" vector_similarity_threshold: {vector_similarity_threshold}," - f" got count: {total_count}" - ) - - if total_count == 0: - continue + logger.debug("OBConnection.search with count sql: %s", count_sql) + start_time = time.time() + res = self.client.perform_raw_text_sql(count_sql) + total_count = res.fetchone()[0] if res else 0 + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fusion, step: 1-count, elapsed time: {elapsed_time:.3f} seconds," + f" vector column: '{vector_column_name}'," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" 
vector_similarity_threshold: {vector_similarity_threshold}," + f" got count: {total_count}" + ) + if total_count == 0: + continue - if self.use_fulltext_first_fusion_search: # NOTE: # MATCH ... AGAINST relevance is not bounded and can dwarf vector similarity (typically in [-1, 1]). # Normalize relevance within the candidate set (fulltext_results) so weights behave as intended. @@ -1138,102 +1327,50 @@ def search( f" ORDER BY _score DESC" f" LIMIT {offset}, {limit}" ) - else: - join_fields_expr = ", ".join([f"t.{f} as {f}" for f in output_fields if f != "_score"]) - relevance_norm_expr = "IFNULL(f.relevance / NULLIF((SELECT MAX(relevance) FROM fulltext_results), 0), 0)" - score_expr = ( - f"({relevance_norm_expr} * {1 - vector_similarity_weight}" - f" + IFNULL(v.similarity, 0) * {vector_similarity_weight}" - f" + (CAST(IFNULL(COALESCE(f.{PAGERANK_FLD}, v.{PAGERANK_FLD}), 0) AS DECIMAL(10, 2)) / 100))" + logger.debug("OBConnection.search with fusion sql: %s", fusion_sql) + + start_time = time.time() + res = self.client.perform_raw_text_sql(fusion_sql) + rows = res.fetchall() + elapsed_time = time.time() - start_time + logger.info( + f"OBConnection.search table {index_name}, search type: fusion, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," + f" select fields: '{output_fields}'," + f" vector column: '{vector_column_name}'," + f" query text: '{fulltext_query}'," + f" condition: '{condition}'," + f" vector_similarity_threshold: {vector_similarity_threshold}," + f" vector_similarity_weight: {vector_similarity_weight}," + f" return rows count: {len(rows)}" ) - if group_results: - fusion_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {fulltext_topn}" - f")," - f"vector_results AS (" - f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {vector_search_filter}" - f" ORDER BY {vector_search_expr}" - f" APPROXIMATE LIMIT {vector_topn}" - f")," - f"combined_results AS (" - f" SELECT COALESCE(f.id, v.id) AS id, {score_expr} AS score" - f" FROM fulltext_results f" - f" FULL OUTER JOIN vector_results v" - f" ON f.id = v.id" - f")," - f"joined_results AS (" - f" SELECT {join_fields_expr}, c.score AS _score, t.group_id as group_id" - f" FROM combined_results c" - f" JOIN {index_name} t" - f" ON c.id = t.id" - f")," - f"group_results AS (" - f" SELECT *, ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY _score DESC) as rn" - f" FROM joined_results" - f")" - f" SELECT {fields_expr}, _score" - f" FROM group_results" - f" WHERE rn = 1" - f" ORDER BY _score DESC" - f" LIMIT {offset}, {limit}" - ) - else: - fusion_sql = ( - f"WITH fulltext_results AS (" - f" SELECT {fulltext_search_hint} id, {PAGERANK_FLD}, {fulltext_search_score_expr} AS relevance" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {fulltext_search_filter}" - f" ORDER BY relevance DESC" - f" LIMIT {fulltext_topn}" - f")," - f"vector_results AS (" - f" SELECT id, {PAGERANK_FLD}, {vector_search_score_expr} AS similarity" - f" FROM {index_name}" - f" WHERE {filters_expr} AND {vector_search_filter}" - f" ORDER BY {vector_search_expr}" - f" APPROXIMATE LIMIT {vector_topn}" - f")," - f"combined_results AS (" - f" SELECT COALESCE(f.id, v.id) AS id, {score_expr} AS score" - f" FROM fulltext_results f" - f" FULL OUTER JOIN vector_results v" - f" ON f.id = 
v.id" - f")" - f" SELECT {join_fields_expr}, c.score as _score" - f" FROM combined_results c" - f" JOIN {index_name} t" - f" ON c.id = t.id" - f" ORDER BY score DESC" - f" LIMIT {offset}, {limit}" - ) - logger.debug("OBConnection.search with fusion sql: %s", fusion_sql) - - start_time = time.time() - res = self.client.perform_raw_text_sql(fusion_sql) - rows = res.fetchall() - - elapsed_time = time.time() - start_time - logger.info( - f"OBConnection.search table {index_name}, search type: fusion, step: 2-query, elapsed time: {elapsed_time:.3f} seconds," - f" select fields: '{output_fields}'," - f" vector column: '{vector_column_name}'," - f" query text: '{fulltext_query}'," - f" condition: '{condition}'," - f" vector_similarity_threshold: {vector_similarity_threshold}," - f" vector_similarity_weight: {vector_similarity_weight}," - f" return rows count: {len(rows)}" - ) + paged_rows = [self._row_to_entity(row, output_fields) for row in rows] + else: + # join-based fusion is replaced with parallel in-memory fusion + paged_rows, total_count = self._fusion_search_parallel_in_memory( + index_name=index_name, + fulltext_search_hint=fulltext_search_hint, + fulltext_search_score_expr=fulltext_search_score_expr, + fulltext_search_filter=fulltext_search_filter, + fulltext_topn=fulltext_topn, + vector_search_expr=vector_search_expr, + vector_search_score_expr=vector_search_score_expr, + vector_search_filter=vector_search_filter, + vector_similarity_threshold=vector_similarity_threshold, + vector_topn=vector_topn, + filters_expr=filters_expr, + output_fields=output_fields + ["_score"] if "_score" not in output_fields else output_fields, + fields_expr=fields_expr, + vector_similarity_weight=vector_similarity_weight, + group_results=group_results, + offset=offset, + limit=limit, + ) + if total_count <= 0: + continue - for row in rows: - result.chunks.append(self._row_to_entity(row, output_fields)) + result.total += total_count + result.chunks.extend(paged_rows) elif search_type == "vector": # vector search, usually used for graph search count_sql = f"SELECT COUNT(id) FROM {index_name} WHERE {filters_expr} AND {vector_search_filter}" From d7c78619dbdba97b663ff20aa9d10bafd54b319d Mon Sep 17 00:00:00 2001 From: "keyang.lk" Date: Mon, 29 Dec 2025 14:27:44 +0800 Subject: [PATCH 3/3] bugfix:UNION_MERGE hint is removed in 4.4.1, use INDEX_MERGE hint instead --- rag/utils/ob_conn.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py index acc7ef5c9..6c445ffef 100644 --- a/rag/utils/ob_conn.py +++ b/rag/utils/ob_conn.py @@ -484,6 +484,8 @@ def _check_ob_version(self): raise Exception( f"The version of OceanBase needs to be higher than or equal to 4.3.5.1, current version is {version_str}" ) + + self.is_less_than_441 = ob_version < ObVersion.from_db_version_nums(4, 4, 1, 0) def _try_to_update_ob_query_timeout(self): try: @@ -1225,8 +1227,12 @@ def search( if not self._check_table_exists_cached(index_name): continue - - fulltext_search_hint = f"/*+ UNION_MERGE({index_name} {' '.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" + # https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000004480377#13-title-INDEX_MERGE%20Hint + # UNION_MERGE hint is removed in 4.4.1, use INDEX_MERGE hint instead + if self.is_less_than_441: + fulltext_search_hint = f"/*+ UNION_MERGE({index_name} {' '.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" + else: + fulltext_search_hint = f"/*+ INDEX_MERGE({index_name} {' 
'.join(fulltext_search_idx_list)}) */" if self.use_fulltext_hint else "" if search_type == "fusion": # fusion search, usually for chat