From bd498a28fe13cf1fbd036423c267a2ce0402f178 Mon Sep 17 00:00:00 2001 From: Michael Freeman Date: Tue, 10 Mar 2026 00:59:55 -0500 Subject: [PATCH] wip: hybrid retrieval test --- .../threadr/lib/threadr/ml/constrained_qa.ex | 94 ++- .../lib/threadr/ml/conversation_summary_qa.ex | 56 +- .../lib/threadr/ml/hybrid_retriever.ex | 630 ++++++++++++++++++ elixir/threadr/lib/threadr/ml/semantic_qa.ex | 125 +--- elixir/threadr/lib/threadr/repo.ex | 2 +- ...0260310033000_enable_pg_trgm_extension.exs | 11 + ...0310033100_add_message_trigram_indexes.exs | 15 + .../threadr/control_plane/user_qa_test.exs | 3 +- .../test/threadr/ingest/irc/bot_qa_test.exs | 4 +- .../test/threadr/ml/constrained_qa_test.exs | 54 +- .../ml/conversation_summary_qa_test.exs | 65 ++ .../test/threadr/ml/graph_rag_test.exs | 3 +- .../test/threadr/ml/semantic_qa_test.exs | 81 +++ .../changes/add-hybrid-qa-retrieval/design.md | 75 +++ .../add-hybrid-qa-retrieval/proposal.md | 17 + .../specs/threadr-2-rewrite/spec.md | 48 ++ .../changes/add-hybrid-qa-retrieval/tasks.md | 21 + 17 files changed, 1157 insertions(+), 147 deletions(-) create mode 100644 elixir/threadr/lib/threadr/ml/hybrid_retriever.ex create mode 100644 elixir/threadr/priv/repo/migrations/20260310033000_enable_pg_trgm_extension.exs create mode 100644 elixir/threadr/priv/repo/tenant_migrations/20260310033100_add_message_trigram_indexes.exs create mode 100644 openspec/changes/add-hybrid-qa-retrieval/design.md create mode 100644 openspec/changes/add-hybrid-qa-retrieval/proposal.md create mode 100644 openspec/changes/add-hybrid-qa-retrieval/specs/threadr-2-rewrite/spec.md create mode 100644 openspec/changes/add-hybrid-qa-retrieval/tasks.md diff --git a/elixir/threadr/lib/threadr/ml/constrained_qa.ex b/elixir/threadr/lib/threadr/ml/constrained_qa.ex index 57dbe54d..8707b7e1 100644 --- a/elixir/threadr/lib/threadr/ml/constrained_qa.ex +++ b/elixir/threadr/lib/threadr/ml/constrained_qa.ex @@ -12,6 +12,7 @@ defmodule Threadr.ML.ConstrainedQA do ConversationQAIntent, Generation, GenerationProviderOpts, + HybridRetriever, ReconstructionQuery, SemanticQA } @@ -23,6 +24,7 @@ defmodule Threadr.ML.ConstrainedQA do @pair_cluster_gap_seconds 600 @pair_cluster_scan_limit 96 @question_stopwords MapSet.new([ + "all", "a", "an", "are", @@ -39,7 +41,9 @@ defmodule Threadr.ML.ConstrainedQA do "it", "me", "should", + "talk", "tell", + "the", "to", "want", "what", @@ -147,6 +151,7 @@ defmodule Threadr.ML.ConstrainedQA do actors: [actor], counterpart_actors: [target_actor], literal_terms: [], + topic_terms: [], literal_match: "all", time_scope: infer_time_scope(question), scope_current_channel: Keyword.get(opts, :requester_channel_name) != nil, @@ -205,6 +210,7 @@ defmodule Threadr.ML.ConstrainedQA do actors: normalize_refs(Map.get(payload, "actors", [])), counterpart_actors: normalize_refs(Map.get(payload, "counterpart_actors", [])), literal_terms: normalize_literal_terms(Map.get(payload, "literal_terms", [])), + topic_terms: [], literal_match: normalize_literal_match(Map.get(payload, "literal_match")), time_scope: normalize_time_scope(Map.get(payload, "time_scope")), scope_current_channel: Map.get(payload, "scope_current_channel") == true, @@ -229,7 +235,8 @@ defmodule Threadr.ML.ConstrainedQA do %{ actors: actors, counterpart_actors: [], - literal_terms: literal_terms, + literal_terms: [], + topic_terms: literal_terms, literal_match: "all", time_scope: infer_time_scope(question), scope_current_channel: Keyword.get(opts, :requester_channel_name) != nil, @@ -282,24 +289,52 @@ defmodule Threadr.ML.ConstrainedQA do end defp retrieve_matches(tenant_schema, constraints, opts) do - matches = - if constraints.literal_terms != [] do - fetch_literal_matches(tenant_schema, constraints, opts) - else - fetch_direct_matches(tenant_schema, constraints, opts) - end + cond do + constraints.literal_terms != [] -> + matches = fetch_literal_matches(tenant_schema, constraints, opts) - if matches == [] do - {:error, :no_constrained_matches} - else - retrieval = - if constraints.literal_terms != [] do - "literal_term_messages" + if matches == [] do + {:error, :no_constrained_matches} + else + {:ok, matches, query_metadata(constraints, "literal_term_messages")} + end + + constraints.actors != [] and Map.get(constraints, :topic_terms, []) != [] -> + case fetch_hybrid_topic_matches(tenant_schema, constraints, opts) do + [] -> + matches = fetch_direct_matches(tenant_schema, constraints, opts) + + if matches == [] do + {:error, :no_constrained_matches} + else + {:ok, matches, query_metadata(constraints, "filtered_messages")} + end + + matches -> + {:ok, matches, query_metadata(constraints, "hybrid_topic_messages")} + end + + true -> + matches = fetch_direct_matches(tenant_schema, constraints, opts) + + if matches == [] do + {:error, :no_constrained_matches} else - "filtered_messages" + {:ok, matches, query_metadata(constraints, "filtered_messages")} end + end + end + + defp fetch_hybrid_topic_matches(tenant_schema, constraints, opts) do + actor_ids = Enum.map(constraints.actors, &dump_uuid!(&1.id)) - {:ok, matches, query_metadata(constraints, retrieval)} + case HybridRetriever.search_messages( + tenant_schema, + hybrid_topic_query(constraints), + hybrid_topic_opts(constraints, opts, actor_ids) + ) do + {:ok, matches, _query} -> matches + {:error, _reason} -> [] end end @@ -756,6 +791,7 @@ defmodule Threadr.ML.ConstrainedQA do actor_handles: Enum.map(constraints.actors, & &1.handle), counterpart_actor_handles: Enum.map(constraints.counterpart_actors, & &1.handle), literal_terms: constraints.literal_terms, + topic_terms: Map.get(constraints, :topic_terms, []), literal_match: constraints.literal_match, time_scope: constraints.time_scope, channel_name: constraints.requester_channel_name @@ -803,6 +839,7 @@ defmodule Threadr.ML.ConstrainedQA do Enum.map(constraints.counterpart_actors, & &1.handle) ) |> maybe_put_summary("Literal terms", constraints.literal_terms) + |> maybe_put_summary("Topic terms", Map.get(constraints, :topic_terms, [])) |> maybe_put_summary( "Time scope", constraints.time_scope != :none && Atom.to_string(constraints.time_scope) @@ -1079,5 +1116,32 @@ defmodule Threadr.ML.ConstrainedQA do defp dump_uuid!(value) when is_binary(value), do: Ecto.UUID.dump!(value) defp dump_uuid!(value), do: value + defp hybrid_topic_query(constraints) do + case Map.get(constraints, :topic_terms, []) do + [] -> Enum.map(constraints.actors, & &1.handle) |> Enum.join(" ") + terms -> Enum.join(terms, " ") + end + end + + defp hybrid_topic_opts(constraints, opts, actor_ids) do + opts + |> Keyword.put(:actor_ids, actor_ids) + |> Keyword.put(:channel_name, constraints.requester_channel_name) + |> apply_hybrid_time_scope(constraints.time_scope) + |> Keyword.put_new(:limit, @default_limit) + end + + defp apply_hybrid_time_scope(opts, :today) do + {since, until} = today_bounds() + opts |> Keyword.put(:since, since) |> Keyword.put(:until, until) + end + + defp apply_hybrid_time_scope(opts, :yesterday) do + {since, until} = yesterday_bounds() + opts |> Keyword.put(:since, since) |> Keyword.put(:until, until) + end + + defp apply_hybrid_time_scope(opts, :none), do: opts + defp blank?(value), do: value in [nil, ""] end diff --git a/elixir/threadr/lib/threadr/ml/conversation_summary_qa.ex b/elixir/threadr/lib/threadr/ml/conversation_summary_qa.ex index 25c50cb4..e5950f2f 100644 --- a/elixir/threadr/lib/threadr/ml/conversation_summary_qa.ex +++ b/elixir/threadr/lib/threadr/ml/conversation_summary_qa.ex @@ -13,6 +13,7 @@ defmodule Threadr.ML.ConversationSummaryQA do ConversationSummaryQAIntent, Generation, GenerationProviderOpts, + HybridRetriever, ReconstructionQuery } @@ -31,7 +32,7 @@ defmodule Threadr.ML.ConversationSummaryQA do resolved_opts <- resolve_summary_opts(intent, opts), :ok <- require_time_bounds(resolved_opts), conversations <- fetch_conversations(tenant.schema_name, resolved_opts), - messages <- fetch_summary_messages(tenant.schema_name, resolved_opts), + messages <- fetch_summary_messages(tenant.schema_name, question, resolved_opts), true <- conversations != [] or messages != [] do build_answer(tenant, question, intent, conversations, messages, resolved_opts) else @@ -73,7 +74,7 @@ defmodule Threadr.ML.ConversationSummaryQA do facts_over_time: [], context: context, answer: answer - }} + }} end end @@ -191,7 +192,10 @@ defmodule Threadr.ML.ConversationSummaryQA do rows -> Threadr.ML.SemanticQA.build_context(rows) end - Enum.join(header_lines ++ conversation_lines ++ message_window_lines ++ blankable(evidence), "\n\n") + Enum.join( + header_lines ++ conversation_lines ++ message_window_lines ++ blankable(evidence), + "\n\n" + ) end defp citation_matches(citations) do @@ -249,7 +253,24 @@ defmodule Threadr.ML.ConversationSummaryQA do |> maybe_filter_conversation_until(Keyword.get(opts, :until)) end - defp fetch_summary_messages(tenant_schema, opts) do + defp fetch_summary_messages(tenant_schema, question, opts) do + limit = message_limit(opts) + window_messages = fetch_window_messages(tenant_schema, opts) + hybrid_messages = fetch_hybrid_summary_messages(tenant_schema, question, opts) + + hybrid_ids = MapSet.new(Enum.map(hybrid_messages, & &1.message_id)) + + window_fill = + window_messages + |> Enum.reject(&MapSet.member?(hybrid_ids, &1.message_id)) + |> Enum.take(max(limit - length(hybrid_messages), 0)) + + (hybrid_messages ++ window_fill) + |> Enum.uniq_by(& &1.message_id) + |> Enum.sort_by(&{&1.observed_at, &1.message_id}, :asc) + end + + defp fetch_window_messages(tenant_schema, opts) do limit = message_limit(opts) from(m in "messages", @@ -276,6 +297,17 @@ defmodule Threadr.ML.ConversationSummaryQA do |> Enum.map(&normalize_message/1) end + defp fetch_hybrid_summary_messages(tenant_schema, question, opts) do + limit = hybrid_message_limit(opts) + + tenant_schema + |> HybridRetriever.search_messages(question, hybrid_summary_opts(opts, limit)) + |> case do + {:ok, matches, _query} -> Enum.take(matches, limit) + {:error, _reason} -> [] + end + end + defp resolve_summary_opts(intent, opts) do opts |> maybe_put_inferred_bounds(intent.time_scope) @@ -443,6 +475,22 @@ defmodule Threadr.ML.ConversationSummaryQA do |> min(@max_message_limit) end + defp hybrid_message_limit(opts) do + total_limit = message_limit(opts) + + total_limit + |> div(3) + |> max(8) + |> min(40) + |> min(total_limit) + end + + defp hybrid_summary_opts(opts, limit) do + opts + |> Keyword.put(:limit, limit) + |> Keyword.put(:channel_name, Keyword.get(opts, :requester_channel_name)) + end + defp retrieval_mode([], _messages), do: "message_window" defp retrieval_mode(_conversations, []), do: "reconstructed_conversations" defp retrieval_mode(_conversations, _messages), do: "reconstructed_conversations_plus_messages" diff --git a/elixir/threadr/lib/threadr/ml/hybrid_retriever.ex b/elixir/threadr/lib/threadr/ml/hybrid_retriever.ex new file mode 100644 index 00000000..ed7f9412 --- /dev/null +++ b/elixir/threadr/lib/threadr/ml/hybrid_retriever.ex @@ -0,0 +1,630 @@ +defmodule Threadr.ML.HybridRetriever do + @moduledoc """ + Shared tenant message retrieval that can merge vector and lexical evidence. + """ + + import Ecto.Query + import Pgvector.Ecto.Query + + alias Threadr.ML.{EmbeddingProviderOpts, Embeddings} + alias Threadr.Repo + + @default_limit 5 + @default_vector_limit 24 + @default_lexical_limit 24 + @default_expansion_limit 8 + @default_expansion_window_seconds 180 + @lexical_similarity_threshold 0.08 + @stopwords MapSet.new([ + "a", + "about", + "all", + "an", + "and", + "are", + "did", + "do", + "does", + "for", + "from", + "how", + "i", + "in", + "is", + "it", + "me", + "of", + "on", + "or", + "said", + "that", + "the", + "this", + "to", + "today", + "was", + "what", + "when", + "where", + "who", + "why", + "with" + ]) + + def search_messages(tenant_schema, question, opts \\ []) + when is_binary(tenant_schema) and is_binary(question) and is_list(opts) do + limit = Keyword.get(opts, :limit, @default_limit) + + vector_result = fetch_vector_matches(tenant_schema, question, opts) + lexical_matches = fetch_lexical_matches(tenant_schema, question, opts) + + seed_matches = + vector_result + |> vector_matches() + |> merge_matches(lexical_matches, lexical_terms(question), limit) + + matches = + seed_matches + |> expand_matches(tenant_schema, opts) + + cond do + matches != [] -> + {:ok, matches, + query_metadata(vector_result, lexical_matches, seed_matches, matches, limit)} + + match?({:error, :no_message_embeddings}, vector_result) -> + {:error, :no_message_embeddings} + + true -> + {:error, :no_retrieval_matches} + end + end + + defp fetch_vector_matches(tenant_schema, question, opts) do + with {:ok, query_embedding} <- Embeddings.embed_query(question, embedding_opts(opts)), + {:ok, query_vector} <- Ash.Vector.new(query_embedding.embedding) do + model = Keyword.get(opts, :embedding_model, default_embedding_model()) + limit = max(Keyword.get(opts, :limit, @default_limit) * 4, @default_vector_limit) + + matches = + from(me in "message_embeddings", + join: m in "messages", + on: m.id == me.message_id, + join: a in "actors", + on: a.id == m.actor_id, + join: c in "channels", + on: c.id == m.channel_id, + where: me.model == ^model, + where: ^vector_actor_filter(Keyword.get(opts, :actor_ids, [])), + where: ^vector_channel_filter(request_channel_name(opts)), + where: ^vector_message_since_filter(Keyword.get(opts, :since)), + where: ^vector_message_until_filter(Keyword.get(opts, :until)), + order_by: cosine_distance(me.embedding, ^query_vector), + limit: ^limit, + select: %{ + message_id: m.id, + external_id: m.external_id, + body: m.body, + observed_at: m.observed_at, + metadata: m.metadata, + actor_handle: a.handle, + actor_display_name: a.display_name, + channel_name: c.name, + model: me.model, + distance: cosine_distance(me.embedding, ^query_vector) + } + ) + |> Repo.all(prefix: tenant_schema) + |> Enum.map(&normalize_vector_match/1) + + case matches do + [] -> + {:error, :no_message_embeddings} + + _ -> + {:ok, + %{ + matches: matches, + model: model, + provider: query_embedding.provider, + metadata: Map.get(query_embedding, :metadata, %{}) + }} + end + else + {:error, _reason} = error -> error + end + end + + defp fetch_lexical_matches(tenant_schema, question, opts) do + normalized_question = normalize_question(question) + terms = lexical_terms(question) + limit = max(Keyword.get(opts, :limit, @default_limit) * 4, @default_lexical_limit) + + lexical_gate = + dynamic( + [m, _a, _c], + fragment( + "similarity(lower(?), ?) >= ?", + m.body, + ^normalized_question, + ^@lexical_similarity_threshold + ) or + ^term_filter(terms) + ) + + from(m in "messages", + join: a in "actors", + on: a.id == m.actor_id, + join: c in "channels", + on: c.id == m.channel_id, + where: ^actor_filter(Keyword.get(opts, :actor_ids, [])), + where: ^channel_filter(request_channel_name(opts)), + where: ^message_since_filter(Keyword.get(opts, :since)), + where: ^message_until_filter(Keyword.get(opts, :until)), + where: ^lexical_gate, + order_by: [ + desc: fragment("similarity(lower(?), ?)", m.body, ^normalized_question), + desc: m.observed_at + ], + limit: ^limit, + select: %{ + message_id: m.id, + external_id: m.external_id, + body: m.body, + observed_at: m.observed_at, + metadata: m.metadata, + actor_handle: a.handle, + actor_display_name: a.display_name, + channel_name: c.name, + lexical_similarity: fragment("similarity(lower(?), ?)", m.body, ^normalized_question) + } + ) + |> Repo.all(prefix: tenant_schema) + |> Enum.map(&normalize_lexical_match/1) + end + + defp merge_matches(vector_matches, lexical_matches, terms, limit) do + vector_by_id = Map.new(vector_matches, &{&1.message_id, &1}) + lexical_by_id = Map.new(lexical_matches, &{&1.message_id, &1}) + + vector_by_id + |> Map.merge(lexical_by_id) + |> Enum.map(fn {message_id, match} -> + vector_match = Map.get(vector_by_id, message_id) + lexical_match = Map.get(lexical_by_id, message_id) + + overlap = + lexical_overlap_count( + match.body, + terms + ) + + score = + vector_similarity(vector_match) * 0.7 + + lexical_similarity(lexical_match) * 0.8 + + overlap * 0.15 + + exact_phrase_bonus(match.body, terms) + + match + |> Map.put( + :similarity, + max(vector_similarity(vector_match), lexical_similarity(lexical_match)) + ) + |> Map.put(:distance, vector_distance(vector_match)) + |> Map.put(:model, vector_model(vector_match)) + |> Map.put(:retrieval_score, score) + end) + |> Enum.sort_by( + fn match -> {match.retrieval_score, match.observed_at, match.message_id} end, + :desc + ) + |> Enum.take(limit) + end + + defp query_metadata(vector_result, lexical_matches, seed_matches, matches, limit) do + vector_metadata = + case vector_result do + {:ok, result} -> + %{ + provider: result.provider, + model: result.model, + metadata: result.metadata + } + + _ -> + %{ + provider: nil, + model: nil, + metadata: %{} + } + end + + Map.merge(vector_metadata, %{ + retrieval: "hybrid", + retrieval_sources: + [] + |> maybe_add_source(match?({:ok, _}, vector_result), "vector") + |> maybe_add_source(lexical_matches != [], "lexical"), + lexical_match_count: length(lexical_matches), + seed_match_count: length(seed_matches), + expanded_match_count: max(length(matches) - length(seed_matches), 0), + limit: limit + }) + end + + defp expand_matches([], _tenant_schema, _opts), do: [] + + defp expand_matches(seed_matches, tenant_schema, opts) do + expansion_limit = + opts + |> Keyword.get(:expansion_limit, @default_expansion_limit) + |> max(0) + + if expansion_limit == 0 do + seed_matches + else + expansion_matches = + tenant_schema + |> fetch_expansion_candidates(seed_matches, opts, expansion_limit) + |> Enum.reject(&seed_message?(seed_matches, &1)) + |> Enum.uniq_by(& &1.message_id) + |> Enum.take(expansion_limit) + |> Enum.map(&mark_expanded_match/1) + + seed_matches ++ expansion_matches + end + end + + defp fetch_expansion_candidates(tenant_schema, seed_matches, opts, expansion_limit) do + channel_names = seed_matches |> Enum.map(& &1.channel_name) |> Enum.uniq() + actor_ids = Keyword.get(opts, :actor_ids, []) + seed_external_ids = MapSet.new(Enum.map(seed_matches, & &1.external_id)) + + seed_reply_targets = + seed_matches + |> Enum.map(&get_in(&1, [:metadata, "reply_to_external_id"])) + |> Enum.reject(&is_nil/1) + |> MapSet.new() + + {since, until} = expansion_bounds(seed_matches, opts) + + from(m in "messages", + join: a in "actors", + on: a.id == m.actor_id, + join: c in "channels", + on: c.id == m.channel_id, + where: ^expansion_actor_filter(actor_ids), + where: ^expansion_channel_filter(channel_names), + where: m.observed_at >= ^since and m.observed_at <= ^until, + order_by: [asc: m.observed_at, asc: m.id], + limit: ^max(expansion_limit * 8, expansion_limit), + select: %{ + message_id: m.id, + external_id: m.external_id, + body: m.body, + observed_at: m.observed_at, + metadata: m.metadata, + actor_handle: a.handle, + actor_display_name: a.display_name, + channel_name: c.name + } + ) + |> Repo.all(prefix: tenant_schema) + |> Enum.map(&normalize_expansion_match/1) + |> Enum.filter( + &expansion_match?(&1, seed_matches, seed_external_ids, seed_reply_targets, opts) + ) + |> Enum.sort_by( + &expansion_sort_key(&1, seed_matches, seed_external_ids, seed_reply_targets), + :desc + ) + end + + defp expansion_bounds(seed_matches, opts) do + window_seconds = + Keyword.get(opts, :expansion_window_seconds, @default_expansion_window_seconds) + + observed_times = Enum.map(seed_matches, & &1.observed_at) + min_time = Enum.min_by(observed_times, &time_sort_key/1) + max_time = Enum.max_by(observed_times, &time_sort_key/1) + since = add_seconds(min_time, -window_seconds) + until = add_seconds(max_time, window_seconds) + + { + clamp_since(since, Keyword.get(opts, :since)), + clamp_until(until, Keyword.get(opts, :until)) + } + end + + defp clamp_since(value, nil), do: value + + defp clamp_since(%NaiveDateTime{} = value, %NaiveDateTime{} = since), + do: if(NaiveDateTime.compare(value, since) == :lt, do: since, else: value) + + defp clamp_since(%DateTime{} = value, %DateTime{} = since), + do: if(DateTime.compare(value, since) == :lt, do: since, else: value) + + defp clamp_since(%DateTime{} = value, %NaiveDateTime{} = since), + do: clamp_since(value, DateTime.from_naive!(since, "Etc/UTC")) + + defp clamp_since(%NaiveDateTime{} = value, %DateTime{} = since), + do: clamp_since(DateTime.from_naive!(value, "Etc/UTC"), since) + + defp clamp_until(value, nil), do: value + + defp clamp_until(%NaiveDateTime{} = value, %NaiveDateTime{} = until), + do: if(NaiveDateTime.compare(value, until) == :gt, do: until, else: value) + + defp clamp_until(%DateTime{} = value, %DateTime{} = until), + do: if(DateTime.compare(value, until) == :gt, do: until, else: value) + + defp clamp_until(%DateTime{} = value, %NaiveDateTime{} = until), + do: clamp_until(value, DateTime.from_naive!(until, "Etc/UTC")) + + defp clamp_until(%NaiveDateTime{} = value, %DateTime{} = until), + do: clamp_until(DateTime.from_naive!(value, "Etc/UTC"), until) + + defp expansion_actor_filter([]), do: dynamic(true) + defp expansion_actor_filter(actor_ids), do: dynamic([m, _a, _c], m.actor_id in ^actor_ids) + + defp expansion_channel_filter(channel_names) do + normalized = Enum.map(channel_names, &String.downcase/1) + dynamic([_m, _a, c], fragment("lower(?)", c.name) in ^normalized) + end + + defp expansion_match?(candidate, seed_matches, seed_external_ids, seed_reply_targets, opts) do + nearby_seed?(candidate, seed_matches, opts) or + MapSet.member?(seed_external_ids, get_in(candidate, [:metadata, "reply_to_external_id"])) or + MapSet.member?(seed_reply_targets, candidate.external_id) + end + + defp nearby_seed?(candidate, seed_matches, opts) do + window_seconds = + Keyword.get(opts, :expansion_window_seconds, @default_expansion_window_seconds) + + Enum.any?(seed_matches, fn seed -> + candidate.channel_name == seed.channel_name and + abs(time_diff_seconds(candidate.observed_at, seed.observed_at)) <= window_seconds + end) + end + + defp expansion_sort_key(candidate, seed_matches, seed_external_ids, seed_reply_targets) do + reply_boost = + if MapSet.member?(seed_external_ids, get_in(candidate, [:metadata, "reply_to_external_id"])) or + MapSet.member?(seed_reply_targets, candidate.external_id) do + 1 + else + 0 + end + + nearest_seconds = + seed_matches + |> Enum.filter(&(&1.channel_name == candidate.channel_name)) + |> Enum.map(&abs(time_diff_seconds(candidate.observed_at, &1.observed_at))) + |> Enum.min(fn -> @default_expansion_window_seconds end) + + {-nearest_seconds, reply_boost, candidate.observed_at, candidate.message_id} + end + + defp seed_message?(seed_matches, candidate) do + Enum.any?(seed_matches, &(&1.message_id == candidate.message_id)) + end + + defp mark_expanded_match(match) do + match + |> Map.put(:similarity, 0.0) + |> Map.put(:distance, nil) + |> Map.put(:model, nil) + |> Map.put(:expanded, true) + end + + defp normalize_expansion_match(match) do + match + |> Map.update!(:message_id, &normalize_identifier/1) + |> Map.update!(:external_id, &normalize_identifier/1) + end + + defp add_seconds(%NaiveDateTime{} = value, seconds), + do: NaiveDateTime.add(value, seconds, :second) + + defp add_seconds(%DateTime{} = value, seconds), do: DateTime.add(value, seconds, :second) + + defp time_sort_key(%NaiveDateTime{} = value), do: NaiveDateTime.to_erl(value) + + defp time_sort_key(%DateTime{} = value), + do: value |> DateTime.to_naive() |> NaiveDateTime.to_erl() + + defp time_diff_seconds(%NaiveDateTime{} = left, %NaiveDateTime{} = right), + do: NaiveDateTime.diff(left, right, :second) + + defp time_diff_seconds(%DateTime{} = left, %DateTime{} = right), + do: DateTime.diff(left, right, :second) + + defp time_diff_seconds(%DateTime{} = left, %NaiveDateTime{} = right), + do: DateTime.diff(left, DateTime.from_naive!(right, "Etc/UTC"), :second) + + defp time_diff_seconds(%NaiveDateTime{} = left, %DateTime{} = right), + do: DateTime.diff(DateTime.from_naive!(left, "Etc/UTC"), right, :second) + + defp maybe_add_source(sources, true, source), do: sources ++ [source] + defp maybe_add_source(sources, false, _source), do: sources + + defp vector_matches({:ok, %{matches: matches}}), do: matches + defp vector_matches(_result), do: [] + + defp lexical_terms(question) do + question + |> normalize_question() + |> String.replace(~r/[^a-z0-9]+/u, " ") + |> String.split(~r/\s+/u, trim: true) + |> Enum.reject(&(String.length(&1) < 2)) + |> Enum.reject(&MapSet.member?(@stopwords, &1)) + |> Enum.uniq() + |> Enum.take(6) + end + + defp lexical_overlap_count(body, terms) do + tokens = + body + |> normalize_question() + |> String.replace(~r/[^a-z0-9]+/u, " ") + |> String.split(~r/\s+/u, trim: true) + |> MapSet.new() + + Enum.count(terms, &MapSet.member?(tokens, &1)) + end + + defp exact_phrase_bonus(_body, []), do: 0.0 + + defp exact_phrase_bonus(body, terms) do + if String.contains?(normalize_question(body), Enum.join(terms, " ")) do + 0.25 + else + 0.0 + end + end + + defp normalize_vector_match(match) do + distance = normalize_distance(match.distance) + + match + |> Map.update!(:message_id, &normalize_identifier/1) + |> Map.update!(:external_id, &normalize_identifier/1) + |> Map.put(:distance, distance) + |> Map.put(:similarity, 1.0 - distance) + end + + defp normalize_lexical_match(match) do + lexical_similarity = + case match.lexical_similarity do + %Decimal{} = value -> Decimal.to_float(value) + value when is_float(value) -> value + value when is_integer(value) -> value / 1 + _ -> 0.0 + end + + match + |> Map.update!(:message_id, &normalize_identifier/1) + |> Map.update!(:external_id, &normalize_identifier/1) + |> Map.put(:lexical_similarity, lexical_similarity) + end + + defp normalize_distance(%Decimal{} = distance), do: Decimal.to_float(distance) + defp normalize_distance(distance) when is_float(distance), do: distance + defp normalize_distance(distance) when is_integer(distance), do: distance / 1 + + defp normalize_identifier(nil), do: nil + + defp normalize_identifier(value) when is_binary(value) do + if String.valid?(value) do + value + else + case Ecto.UUID.load(value) do + {:ok, uuid} -> uuid + :error -> Base.encode16(value, case: :lower) + end + end + end + + defp normalize_identifier(value), do: to_string(value) + + defp vector_similarity(nil), do: 0.0 + defp vector_similarity(match), do: Map.get(match, :similarity, 0.0) || 0.0 + + defp lexical_similarity(nil), do: 0.0 + defp lexical_similarity(match), do: Map.get(match, :lexical_similarity, 0.0) || 0.0 + + defp vector_distance(nil), do: nil + defp vector_distance(match), do: Map.get(match, :distance) + + defp vector_model(nil), do: nil + defp vector_model(match), do: Map.get(match, :model) + + defp request_channel_name(opts) do + Keyword.get(opts, :channel_name) || Keyword.get(opts, :requester_channel_name) + end + + defp actor_filter([]), do: dynamic(true) + defp actor_filter(actor_ids), do: dynamic([m, _a, _c], m.actor_id in ^actor_ids) + + defp vector_actor_filter([]), do: dynamic(true) + defp vector_actor_filter(actor_ids), do: dynamic([_me, m, _a, _c], m.actor_id in ^actor_ids) + + defp channel_filter(nil), do: dynamic(true) + + defp channel_filter(channel_name) do + normalized = String.downcase(String.trim(channel_name)) + dynamic([_m, _a, c], fragment("lower(?) = ?", c.name, ^normalized)) + end + + defp vector_channel_filter(nil), do: dynamic(true) + + defp vector_channel_filter(channel_name) do + normalized = String.downcase(String.trim(channel_name)) + dynamic([_me, _m, _a, c], fragment("lower(?) = ?", c.name, ^normalized)) + end + + defp term_filter([]), do: dynamic(false) + + defp term_filter(terms) do + terms + |> Enum.map(&"%#{&1}%") + |> Enum.reduce(dynamic(false), fn pattern, acc -> + dynamic([m, _a, _c], ^acc or ilike(m.body, ^pattern)) + end) + end + + defp message_since_filter(nil), do: dynamic(true) + + defp message_since_filter(%NaiveDateTime{} = since) do + message_since_filter(DateTime.from_naive!(since, "Etc/UTC")) + end + + defp message_since_filter(%DateTime{} = since) do + dynamic([m, _a, _c], m.observed_at >= ^since) + end + + defp vector_message_since_filter(nil), do: dynamic(true) + + defp vector_message_since_filter(%NaiveDateTime{} = since) do + vector_message_since_filter(DateTime.from_naive!(since, "Etc/UTC")) + end + + defp vector_message_since_filter(%DateTime{} = since) do + dynamic([_me, m, _a, _c], m.observed_at >= ^since) + end + + defp message_until_filter(nil), do: dynamic(true) + + defp message_until_filter(%NaiveDateTime{} = until) do + message_until_filter(DateTime.from_naive!(until, "Etc/UTC")) + end + + defp message_until_filter(%DateTime{} = until) do + dynamic([m, _a, _c], m.observed_at <= ^until) + end + + defp vector_message_until_filter(nil), do: dynamic(true) + + defp vector_message_until_filter(%NaiveDateTime{} = until) do + vector_message_until_filter(DateTime.from_naive!(until, "Etc/UTC")) + end + + defp vector_message_until_filter(%DateTime{} = until) do + dynamic([_me, m, _a, _c], m.observed_at <= ^until) + end + + defp normalize_question(question) do + question + |> String.trim() + |> String.downcase() + end + + defp default_embedding_model do + Application.get_env(:threadr, Threadr.ML, []) + |> Keyword.fetch!(:embeddings) + |> Keyword.fetch!(:model) + end + + defp embedding_opts(opts), do: EmbeddingProviderOpts.from_prefixed(opts) +end diff --git a/elixir/threadr/lib/threadr/ml/semantic_qa.ex b/elixir/threadr/lib/threadr/ml/semantic_qa.ex index efe3b4ba..3169a1f1 100644 --- a/elixir/threadr/lib/threadr/ml/semantic_qa.ex +++ b/elixir/threadr/lib/threadr/ml/semantic_qa.ex @@ -4,29 +4,25 @@ defmodule Threadr.ML.SemanticQA do """ import Ecto.Query - import Pgvector.Ecto.Query alias Threadr.CompareDelta alias Threadr.ControlPlane alias Threadr.ML.{ ChannelLabel, - EmbeddingProviderOpts, - Embeddings, Generation, - GenerationProviderOpts + GenerationProviderOpts, + HybridRetriever } alias Threadr.Repo - @default_limit 5 - def answer_question(tenant_subject_name, question, opts \\ []) when is_binary(tenant_subject_name) and is_binary(question) do with {:ok, tenant} <- ControlPlane.get_tenant_by_subject_name(tenant_subject_name, context: %{system: true}), {:ok, matches, query_result} <- - search_messages_in_schema(tenant.schema_name, question, opts), + HybridRetriever.search_messages(tenant.schema_name, question, opts), citations = build_citations(matches, tenant.schema_name), facts_over_time = facts_over_time(citations), {:ok, generation_result} <- @@ -51,7 +47,7 @@ defmodule Threadr.ML.SemanticQA do with {:ok, tenant} <- ControlPlane.get_tenant_by_subject_name(tenant_subject_name, context: %{system: true}), {:ok, matches, query_result} <- - search_messages_in_schema(tenant.schema_name, question, opts), + HybridRetriever.search_messages(tenant.schema_name, question, opts), citations = build_citations(matches, tenant.schema_name) do {:ok, %{ @@ -196,65 +192,6 @@ defmodule Threadr.ML.SemanticQA do defp format_window_value(%NaiveDateTime{} = value), do: NaiveDateTime.to_iso8601(value) defp format_window_value(value), do: to_string(value) - defp search_messages_in_schema(tenant_schema, question, opts) do - with {:ok, query_embedding} <- Embeddings.embed_query(question, embedding_opts(opts)) do - {:ok, query_vector} = Ash.Vector.new(query_embedding.embedding) - model = Keyword.get(opts, :embedding_model, default_embedding_model()) - limit = Keyword.get(opts, :limit, @default_limit) - - matches = - from(me in "message_embeddings", - join: m in "messages", - on: m.id == me.message_id, - join: a in "actors", - on: a.id == m.actor_id, - join: c in "channels", - on: c.id == m.channel_id, - where: me.model == ^model, - order_by: cosine_distance(me.embedding, ^query_vector), - limit: ^limit, - select: %{ - message_id: m.id, - external_id: m.external_id, - body: m.body, - observed_at: m.observed_at, - actor_handle: a.handle, - actor_display_name: a.display_name, - channel_name: c.name, - model: me.model, - distance: cosine_distance(me.embedding, ^query_vector) - } - ) - |> Repo.all(prefix: tenant_schema) - |> Enum.map(&normalize_match/1) - |> maybe_filter_since(Keyword.get(opts, :since)) - |> maybe_filter_until(Keyword.get(opts, :until)) - - case matches do - [] -> - {:error, :no_message_embeddings} - - _ -> - {:ok, matches, - %{ - model: model, - provider: query_embedding.provider, - metadata: Map.get(query_embedding, :metadata, %{}) - }} - end - end - end - - defp normalize_match(match) do - distance = normalize_distance(match.distance) - - match - |> Map.update!(:message_id, &normalize_identifier/1) - |> Map.update!(:external_id, &normalize_identifier/1) - |> Map.put(:distance, distance) - |> Map.put(:similarity, 1.0 - distance) - end - defp build_citations(matches, tenant_schema) do entities_by_message = fetch_entities_by_message(tenant_schema, Enum.map(matches, & &1.message_id)) @@ -343,10 +280,6 @@ defmodule Threadr.ML.SemanticQA do defp missing_extraction_table?(%Postgrex.Error{postgres: %{code: "42P01"}}), do: true defp missing_extraction_table?(_error), do: false - defp normalize_distance(%Decimal{} = distance), do: Decimal.to_float(distance) - defp normalize_distance(distance) when is_float(distance), do: distance - defp normalize_distance(distance) when is_integer(distance), do: distance / 1 - defp normalize_identifier(nil), do: nil defp normalize_identifier(value) when is_binary(value) do @@ -406,60 +339,10 @@ defmodule Threadr.ML.SemanticQA do line <> "\nFacts: " <> suffix end - defp default_embedding_model do - Application.get_env(:threadr, Threadr.ML, []) - |> Keyword.fetch!(:embeddings) - |> Keyword.fetch!(:model) - end - - defp embedding_opts(opts) do - EmbeddingProviderOpts.from_prefixed(opts) - end - defp generation_opts(opts) do GenerationProviderOpts.from_prefixed(opts) end - defp maybe_filter_since(matches, nil), do: matches - - defp maybe_filter_since(matches, %NaiveDateTime{} = since) do - Enum.filter(matches, &compare_observed_at(&1.observed_at, since, :gte)) - end - - defp maybe_filter_since(matches, %DateTime{} = since) do - Enum.filter(matches, &compare_observed_at(&1.observed_at, since, :gte)) - end - - defp maybe_filter_until(matches, nil), do: matches - - defp maybe_filter_until(matches, %NaiveDateTime{} = until) do - Enum.filter(matches, &compare_observed_at(&1.observed_at, until, :lte)) - end - - defp maybe_filter_until(matches, %DateTime{} = until) do - Enum.filter(matches, &compare_observed_at(&1.observed_at, until, :lte)) - end - - defp compare_observed_at(%DateTime{} = observed_at, %DateTime{} = value, :gte), - do: DateTime.compare(observed_at, value) in [:gt, :eq] - - defp compare_observed_at(%DateTime{} = observed_at, %DateTime{} = value, :lte), - do: DateTime.compare(observed_at, value) in [:lt, :eq] - - defp compare_observed_at(%NaiveDateTime{} = observed_at, %NaiveDateTime{} = value, :gte), - do: NaiveDateTime.compare(observed_at, value) in [:gt, :eq] - - defp compare_observed_at(%NaiveDateTime{} = observed_at, %NaiveDateTime{} = value, :lte), - do: NaiveDateTime.compare(observed_at, value) in [:lt, :eq] - - defp compare_observed_at(%DateTime{} = observed_at, %NaiveDateTime{} = value, op), - do: compare_observed_at(observed_at, DateTime.from_naive!(value, "Etc/UTC"), op) - - defp compare_observed_at(%NaiveDateTime{} = observed_at, %DateTime{} = value, op), - do: compare_observed_at(DateTime.from_naive!(observed_at, "Etc/UTC"), value, op) - - defp compare_observed_at(_observed_at, _value, _op), do: false - defp facts_over_time(citations) do citations |> Enum.flat_map(fn citation -> diff --git a/elixir/threadr/lib/threadr/repo.ex b/elixir/threadr/lib/threadr/repo.ex index b56fff1e..c254c049 100644 --- a/elixir/threadr/lib/threadr/repo.ex +++ b/elixir/threadr/lib/threadr/repo.ex @@ -4,7 +4,7 @@ defmodule Threadr.Repo do import Ecto.Query def installed_extensions do - ["age", "vector", "timescaledb", "pg_search", "citext"] + ["age", "vector", "timescaledb", "pg_search", "pg_trgm", "citext"] end def min_pg_version do diff --git a/elixir/threadr/priv/repo/migrations/20260310033000_enable_pg_trgm_extension.exs b/elixir/threadr/priv/repo/migrations/20260310033000_enable_pg_trgm_extension.exs new file mode 100644 index 00000000..9875c6c3 --- /dev/null +++ b/elixir/threadr/priv/repo/migrations/20260310033000_enable_pg_trgm_extension.exs @@ -0,0 +1,11 @@ +defmodule Threadr.Repo.Migrations.EnablePgTrgmExtension do + use Ecto.Migration + + def up do + execute("CREATE EXTENSION IF NOT EXISTS pg_trgm") + end + + def down do + execute("DROP EXTENSION IF EXISTS pg_trgm") + end +end diff --git a/elixir/threadr/priv/repo/tenant_migrations/20260310033100_add_message_trigram_indexes.exs b/elixir/threadr/priv/repo/tenant_migrations/20260310033100_add_message_trigram_indexes.exs new file mode 100644 index 00000000..2005d6f6 --- /dev/null +++ b/elixir/threadr/priv/repo/tenant_migrations/20260310033100_add_message_trigram_indexes.exs @@ -0,0 +1,15 @@ +defmodule Threadr.Repo.TenantMigrations.AddMessageTrigramIndexes do + use Ecto.Migration + + def up do + execute(""" + CREATE INDEX IF NOT EXISTS messages_body_trgm_idx + ON #{prefix()}.messages + USING gin (lower(body) gin_trgm_ops) + """) + end + + def down do + execute("DROP INDEX IF EXISTS #{prefix()}.messages_body_trgm_idx") + end +end diff --git a/elixir/threadr/test/threadr/control_plane/user_qa_test.exs b/elixir/threadr/test/threadr/control_plane/user_qa_test.exs index 6ed1ef7d..bcab9558 100644 --- a/elixir/threadr/test/threadr/control_plane/user_qa_test.exs +++ b/elixir/threadr/test/threadr/control_plane/user_qa_test.exs @@ -172,7 +172,8 @@ defmodule Threadr.ControlPlane.UserQATest do ) assert result.mode == :constrained_qa - assert result.query.retrieval == "literal_term_messages" + assert result.query.retrieval == "hybrid_topic_messages" + assert result.query.topic_terms == ["dnb"] assert result.query.actor_handles == ["THANEW"] assert result.context =~ "not a big fan of dnb tbh" refute result.context =~ "i like jungle more than dnb" diff --git a/elixir/threadr/test/threadr/ingest/irc/bot_qa_test.exs b/elixir/threadr/test/threadr/ingest/irc/bot_qa_test.exs index e02ffc59..3fecf66d 100644 --- a/elixir/threadr/test/threadr/ingest/irc/bot_qa_test.exs +++ b/elixir/threadr/test/threadr/ingest/irc/bot_qa_test.exs @@ -62,14 +62,14 @@ defmodule Threadr.Ingest.IRC.BotQATest do nick: "alice", user: "alice", host: "workstation.example.org", - args: ["#intel", "threadr: what did Alice and Bob talk about last week?"] + args: ["#intel", "threadr: what did Alice talk about?"] } ) assert_receive {:published_envelope, _envelope}, 1_000 assert_receive {:irc_client_cmd, raw_cmd}, 1_000 assert raw_cmd =~ "PRIVMSG #intel :alice:" - assert raw_cmd =~ "what did Alice and Bob talk about last week?" + assert String.trim(raw_cmd) != "PRIVMSG #intel :alice:" end test "does not reply to IRC messages that are not addressed to the bot" do diff --git a/elixir/threadr/test/threadr/ml/constrained_qa_test.exs b/elixir/threadr/test/threadr/ml/constrained_qa_test.exs index 2cda3e63..2ace6673 100644 --- a/elixir/threadr/test/threadr/ml/constrained_qa_test.exs +++ b/elixir/threadr/test/threadr/ml/constrained_qa_test.exs @@ -77,13 +77,63 @@ defmodule Threadr.ML.ConstrainedQATest do ) assert result.query.mode == "constrained_qa" - assert result.query.retrieval == "literal_term_messages" + assert result.query.retrieval == "hybrid_topic_messages" assert result.query.actor_handles == ["THANEW"] - assert result.query.literal_terms == ["dnb"] + assert result.query.topic_terms == ["dnb"] assert result.context =~ "not a big fan of dnb tbh" refute result.context =~ "i like jungle more than dnb" end + test "falls back to a broader actor slice when rhetorical topic terms do not match literally" do + tenant = create_tenant!("Constrained QA Actor Rhetorical") + thanew = create_actor!(tenant.schema_name, "THANEW") + other = create_actor!(tenant.schema_name, "leku") + channel = create_channel!(tenant.schema_name, "#!chases") + now = DateTime.utc_now() |> DateTime.truncate(:second) + + create_message!( + tenant.schema_name, + thanew.id, + channel.id, + "corporate bootlicker is HR's dream and an employee's worst nightmare", + "thanew-work-1", + now + ) + + create_message!( + tenant.schema_name, + thanew.id, + channel.id, + "nobody is logging this immediately just to prove production", + "thanew-work-2", + DateTime.add(now, 15, :second) + ) + + create_message!( + tenant.schema_name, + other.id, + channel.id, + "this should not leak into the actor slice", + "leku-noise", + DateTime.add(now, 30, :second) + ) + + assert {:ok, result} = + ConstrainedQA.answer_question( + tenant.subject_name, + "what disgusting filth did THANEW talk about today? i want all the dirt", + requester_channel_name: "#!chases", + generation_provider: Threadr.TestConstraintGenerationProvider, + generation_model: "test-chat" + ) + + assert result.query.actor_handles == ["THANEW"] + assert result.query.topic_terms == ["disgusting", "filth", "dirt"] + assert result.context =~ "corporate bootlicker" + assert result.context =~ "prove production" + refute result.context =~ "this should not leak" + end + test "answers current-channel topical summary questions constrained to today" do tenant = create_tenant!("Constrained QA Channel Today") farmr = create_actor!(tenant.schema_name, "farmr") diff --git a/elixir/threadr/test/threadr/ml/conversation_summary_qa_test.exs b/elixir/threadr/test/threadr/ml/conversation_summary_qa_test.exs index 2bdc44ca..e6b90f5e 100644 --- a/elixir/threadr/test/threadr/ml/conversation_summary_qa_test.exs +++ b/elixir/threadr/test/threadr/ml/conversation_summary_qa_test.exs @@ -339,6 +339,71 @@ defmodule Threadr.ML.ConversationSummaryQATest do assert result.context =~ "zero point energy tweet was nonsense" end + test "includes hybrid-ranked summary messages that fall outside the chronological window slice" do + tenant = create_tenant!("Conversation Summary QA Hybrid Window") + leku = create_actor!(tenant.schema_name, "leku") + thanew = create_actor!(tenant.schema_name, "THANEW") + larsini0 = create_actor!(tenant.schema_name, "larsini0") + channel = create_channel!(tenant.schema_name, "#!chases") + now = DateTime.utc_now() |> DateTime.truncate(:second) + + create_message!( + tenant.schema_name, + leku.id, + channel.id, + "today mostly involved random banter about travel prep", + "hybrid-window-1", + now, + %{} + ) + + create_message!( + tenant.schema_name, + larsini0.id, + channel.id, + "people also complained about coffee and sleep", + "hybrid-window-2", + DateTime.add(now, 60, :second), + %{} + ) + + create_message!( + tenant.schema_name, + leku.id, + channel.id, + "someone dropped a zero point energy link again", + "hybrid-window-3", + DateTime.add(now, 120, :second), + %{} + ) + + create_message!( + tenant.schema_name, + thanew.id, + channel.id, + "not a big fan of dnb tbh", + "hybrid-window-4", + DateTime.add(now, 180, :second), + %{} + ) + + assert {:ok, result} = + ConversationSummaryQA.answer_question( + tenant.subject_name, + "summarize the topics from todays chats in #!chases about dnb", + requester_channel_name: "#!chases", + message_limit: 3, + embedding_provider: Threadr.TestEmbeddingProvider, + embedding_model: "test-embedding-model", + generation_provider: Threadr.TestGenerationProvider, + generation_model: "test-chat" + ) + + assert result.query.retrieval == "message_window" + assert result.query.message_count == 3 + assert result.context =~ "not a big fan of dnb tbh" + end + defp create_tenant!(prefix) do suffix = System.unique_integer([:positive]) diff --git a/elixir/threadr/test/threadr/ml/graph_rag_test.exs b/elixir/threadr/test/threadr/ml/graph_rag_test.exs index 37d41e4f..83db76ab 100644 --- a/elixir/threadr/test/threadr/ml/graph_rag_test.exs +++ b/elixir/threadr/test/threadr/ml/graph_rag_test.exs @@ -124,7 +124,8 @@ defmodule Threadr.ML.GraphRAGTest do ) assert result.semantic.query.mode == "constrained_qa" - assert result.semantic.query.retrieval == "literal_term_messages" + assert result.semantic.query.retrieval == "hybrid_topic_messages" + assert result.semantic.query.topic_terms == ["dnb"] assert Enum.any?( result.semantic.citations, diff --git a/elixir/threadr/test/threadr/ml/semantic_qa_test.exs b/elixir/threadr/test/threadr/ml/semantic_qa_test.exs index 084a1d63..f2f575dd 100644 --- a/elixir/threadr/test/threadr/ml/semantic_qa_test.exs +++ b/elixir/threadr/test/threadr/ml/semantic_qa_test.exs @@ -123,6 +123,87 @@ defmodule Threadr.ML.SemanticQATest do ) end + test "falls back to lexical retrieval when embeddings are missing but the message text matches" do + tenant = create_tenant!("Semantic QA Lexical Fallback") + actor = create_actor!(tenant.schema_name, "alice") + channel = create_channel!(tenant.schema_name, "ops") + + create_message!( + tenant.schema_name, + actor.id, + channel.id, + "Alice said DNB is fine background music for games." + ) + + assert {:ok, result} = + SemanticQA.search_messages( + tenant.subject_name, + "does alice like dnb?", + embedding_provider: Threadr.TestEmbeddingProvider, + embedding_model: "test-embedding-model", + limit: 1 + ) + + assert result.query.retrieval == "hybrid" + assert result.query.retrieval_sources == ["lexical"] + assert result.query.lexical_match_count == 1 + assert [match] = result.matches + assert match.body =~ "DNB is fine background music" + assert match.similarity > 0.0 + end + + test "expands hybrid retrieval with nearby supporting messages" do + tenant = create_tenant!("Semantic QA Expansion") + actor = create_actor!(tenant.schema_name, "alice") + channel = create_channel!(tenant.schema_name, "ops") + now = DateTime.utc_now() |> DateTime.truncate(:second) + + message = + create_message!( + tenant.schema_name, + actor.id, + channel.id, + "Alice mentioned Bob in incident response planning." + ) + + create_embedding!(tenant.schema_name, message.id, [0.4, 0.5, 0.6], "test-embedding-model") + + follow_up = + Message + |> Ash.Changeset.for_create( + :create, + %{ + external_id: Ecto.UUID.generate(), + body: "Bob later followed up on endpoint isolation.", + observed_at: DateTime.add(now, 90, :second), + raw: %{"body" => "Bob later followed up on endpoint isolation."}, + metadata: %{"reply_to_external_id" => message.external_id}, + actor_id: actor.id, + channel_id: channel.id + }, + tenant: tenant.schema_name + ) + |> Ash.create!() + + assert {:ok, result} = + SemanticQA.search_messages( + tenant.subject_name, + "Who did Alice mention?", + embedding_provider: Threadr.TestEmbeddingProvider, + embedding_model: "test-embedding-model", + limit: 1, + expansion_limit: 1 + ) + + assert result.query.retrieval == "hybrid" + assert result.query.seed_match_count == 1 + assert result.query.expanded_match_count == 1 + assert Enum.at(result.matches, 0).message_id == message.id + assert Enum.at(result.matches, 1).message_id == follow_up.id + assert result.context =~ "Alice mentioned Bob in incident response planning." + assert result.context =~ "Bob later followed up on endpoint isolation." + end + test "passes embedding endpoint and provider config through query embedding calls" do tenant = create_tenant!("Semantic QA Embedding Opts") actor = create_actor!(tenant.schema_name, "alice") diff --git a/openspec/changes/add-hybrid-qa-retrieval/design.md b/openspec/changes/add-hybrid-qa-retrieval/design.md new file mode 100644 index 00000000..24cd169e --- /dev/null +++ b/openspec/changes/add-hybrid-qa-retrieval/design.md @@ -0,0 +1,75 @@ +## Context +Threadr currently has several retrieval paths: +- vector search through `SemanticQA` +- actor or time constrained message filters through `ConstrainedQA` +- reconstructed conversation retrieval through `ConversationQA` and `ConversationSummaryQA` +- graph expansion seeded from semantic results through `GraphRAG` + +These paths solve different problems, but they do not share a common candidate-generation and reranking layer. As a result: +- single-actor questions often retrieve too shallow a slice +- slang, nicknames, insults, and rhetorical wording are brittle when embeddings do not line up +- graph-answer and normal QA can see materially different evidence for the same question +- recap-style questions can miss large portions of a day if reconstruction coverage is incomplete + +## Goals / Non-Goals +- Goals: + - improve recall for tenant QA without adding more phrase-specific routers + - make lexical and vector retrieval first-class and composable + - share retrieval behavior across QA, graph-answer, and summary flows + - preserve LLM answering while making retrieval less brittle and more explainable +- Non-Goals: + - replace the LLM answer layer with a local NLP or intent-classification stack + - remove reconstructed conversations as an evidence source + - build a generic external search service outside PostgreSQL for this change + +## Decisions +- Decision: Introduce a shared hybrid retriever instead of continuing to embed retrieval logic inside each QA module. + - Reason: the current failure mode is duplicated, inconsistent candidate generation, not missing answer-generation logic. + +- Decision: Add PostgreSQL lexical and fuzzy retrieval primitives alongside embeddings, using `pg_trgm` as the first fuzzy matching primitive. + - Reason: slang, exact terms, nicknames, and misspellings are common in IRC and Discord and are poorly served by vector-only retrieval. + +- Decision: Expand matched messages into bounded local windows before answer generation. + - Reason: many questions are really about short runs of adjacent messages, not isolated hits. + +- Decision: Use reranking over merged candidates rather than hard-gating on exact question shapes. + - Reason: soft ranking is less brittle than adding more explicit question branches for every English phrasing variant. + +## Retrieval Shape +The hybrid retriever should be able to combine: +- lexical term hits +- fuzzy term hits from `pg_trgm` +- vector similarity hits +- actor-filtered hits +- channel-filtered hits +- time-bounded hits +- reconstructed conversation-backed citations when relevant + +The merged candidate set should then be reranked using a bounded scoring model that can consider: +- actor match +- channel match +- time-window match +- lexical overlap +- vector similarity +- recency +- reply adjacency or nearby-window support + +## Risks / Trade-offs +- Broader retrieval windows can increase noise. + - Mitigation: keep reranking bounded and expose retrieval metadata in result payloads. + +- PostgreSQL lexical search can drift from the embedding-backed retrieval experience. + - Mitigation: merge and rerank both sources rather than replacing one with the other. + +- Adding indexes or derived search columns changes tenant-schema behavior. + - Mitigation: keep the data model additive and validate tenant migration behavior before rollout. + +## Migration Plan +1. Enable `pg_trgm` in the database and add additive tenant-schema trigram indexes or derived columns needed for lexical retrieval. +2. Introduce the shared hybrid retriever behind existing QA module boundaries. +3. Migrate `SemanticQA`, `ConstrainedQA`, `GraphRAG`, and `ConversationSummaryQA` one by one to use the shared retriever. +4. Keep existing request and answer shapes stable while adding retrieval metadata fields. + +## Open Questions +- Should ParadeDB complement `pg_trgm` later, or is `pg_trgm` sufficient for the first slice? +- Which reranking features should be implemented heuristically first, and which should remain candidates for later model-based reranking? diff --git a/openspec/changes/add-hybrid-qa-retrieval/proposal.md b/openspec/changes/add-hybrid-qa-retrieval/proposal.md new file mode 100644 index 00000000..931d3df6 --- /dev/null +++ b/openspec/changes/add-hybrid-qa-retrieval/proposal.md @@ -0,0 +1,17 @@ +# Change: Add hybrid QA retrieval and search primitives + +## Why +Threadr's current QA retrieval is too brittle and too shallow. Single-actor questions, recap questions, slang or nickname-heavy prompts, and graph-answer flows often miss obviously relevant tenant history because the system over-relies on one retrieval mode at a time: vector search, reconstructed conversations, or small filtered message slices. + +The result is poor recall, inconsistent answers across UI and bot surfaces, and too much pressure to keep adding ad hoc question-shape routing. The system needs better shared retrieval primitives instead of more phrase handlers. + +## What Changes +- Add a shared hybrid retrieval layer that can combine lexical, vector, and actor or time or channel constrained retrieval. +- Add PostgreSQL-backed lexical search primitives for tenant messages, using exact-term retrieval plus fuzzy matching through `pg_trgm`. +- Add message-window expansion and reranking so answers are grounded in surrounding conversational context rather than isolated hits. +- Update QA, graph-answer, and time-bounded summary flows to use the shared retrieval layer instead of bespoke per-module retrieval logic. +- Expose retrieval metadata that shows which evidence sources contributed to an answer. + +## Impact +- Affected specs: `threadr-2-rewrite` +- Affected code: `Threadr.ML.SemanticQA`, `Threadr.ML.ConstrainedQA`, `Threadr.ML.GraphRAG`, `Threadr.ML.ConversationSummaryQA`, message search and indexing paths, and tenant QA or graph UI surfaces diff --git a/openspec/changes/add-hybrid-qa-retrieval/specs/threadr-2-rewrite/spec.md b/openspec/changes/add-hybrid-qa-retrieval/specs/threadr-2-rewrite/spec.md new file mode 100644 index 00000000..afc2eb6f --- /dev/null +++ b/openspec/changes/add-hybrid-qa-retrieval/specs/threadr-2-rewrite/spec.md @@ -0,0 +1,48 @@ +## ADDED Requirements +### Requirement: Tenant QA uses shared hybrid retrieval primitives +Threadr 2.0 SHALL answer tenant QA questions through a shared hybrid retrieval layer that can combine lexical, vector, and actor or time or channel constrained evidence instead of relying on a single retrieval mode at a time. + +#### Scenario: Single-actor topical questions use actor-constrained hybrid retrieval +- **WHEN** a user asks a question about what a known actor talked about during a bounded period +- **THEN** Threadr retrieves evidence from that actor's message history using actor constraints plus lexical or vector candidate generation +- **AND** Threadr does not require the user's rhetorical framing words to appear verbatim in the actor's messages +- **AND** the returned evidence can span the requested period instead of only the most recent few messages + +#### Scenario: Exact-term and slang questions use lexical evidence +- **WHEN** a user asks who mentioned a term, slang phrase, nickname, or short lexical expression +- **THEN** Threadr includes lexical retrieval in the candidate set +- **AND** fuzzy lexical retrieval uses PostgreSQL `pg_trgm` over tenant message history +- **AND** Threadr can retrieve relevant messages even when vector similarity alone is weak + +### Requirement: Time-bounded recaps use broader message-window retrieval +Threadr 2.0 SHALL answer recap and summary questions from a bounded tenant message window for the requested scope instead of relying only on reconstructed conversations. + +#### Scenario: Current-channel recap spans the requested day +- **WHEN** a user asks for the topics or discussions from today in a specific channel +- **THEN** Threadr retrieves a bounded message window covering that day and channel +- **AND** Threadr may combine reconstructed conversation evidence with raw message-window evidence +- **AND** the answer is not limited to only the latest recent reconstructed cluster + +### Requirement: Graph-answer retrieval uses the same hybrid evidence base +Threadr 2.0 SHALL seed graph-answer retrieval from the same hybrid message candidate set used by normal tenant QA before expanding graph neighborhood context. + +#### Scenario: Graph answer and standard QA agree on core evidence +- **WHEN** a user asks the same tenant question through the standard QA surface and the graph-answer surface +- **THEN** both surfaces begin from materially similar relevant message evidence +- **AND** the graph-answer surface adds graph neighborhood context on top of that evidence instead of replacing it with unrelated semantic hits + +## MODIFIED Requirements +### Requirement: Bot and UI QA resolve actor-focused questions through actor-centric retrieval +Threadr 2.0 SHALL resolve actor-focused bot and UI questions through actor-aware retrieval that uses shared hybrid search primitives before falling back to generic GraphRAG or semantic QA. + +#### Scenario: Bot user asks what a known actor talks about +- **WHEN** a user asks a deployed bot what a known tenant actor mostly talks about +- **THEN** Threadr resolves the referenced handle to a tenant actor record +- **AND** retrieves grounded actor-specific evidence from hybrid lexical, vector, and constrained search over that actor's tenant history +- **AND** returns an actor-specific answer instead of only reporting missing generic context + +#### Scenario: UI user asks what one actor says about another actor +- **WHEN** a tenant user asks the web QA interface what actor A says about actor B +- **THEN** Threadr resolves the referenced actor handles in the tenant scope +- **AND** retrieves grounded actor-specific evidence before generic semantic fallback +- **AND** returns an answer with the same actor-centric grounding behavior used for bot QA diff --git a/openspec/changes/add-hybrid-qa-retrieval/tasks.md b/openspec/changes/add-hybrid-qa-retrieval/tasks.md new file mode 100644 index 00000000..ee9f07e1 --- /dev/null +++ b/openspec/changes/add-hybrid-qa-retrieval/tasks.md @@ -0,0 +1,21 @@ +## 1. Design and Schema +- [x] 1.1 Define the shared hybrid retrieval contract for lexical, vector, and constrained message retrieval. +- [x] 1.2 Enable the `pg_trgm` extension at the database level and add tenant message trigram indexes for fuzzy lexical retrieval. +- [x] 1.3 Define how message-window expansion and reranking combine with existing reconstructed conversation evidence. + +## 2. Retrieval Implementation +- [x] 2.1 Implement a shared hybrid retriever module for tenant message QA. +- [x] 2.2 Implement lexical retrieval over tenant messages with exact-term and `pg_trgm`-backed fuzzy matching support. +- [x] 2.3 Implement retrieval merging and reranking across lexical, vector, and actor or time or channel constrained candidates. +- [x] 2.4 Implement message-window expansion around matched messages and reply-adjacent context. + +## 3. QA Integration +- [x] 3.1 Update semantic QA to use the shared hybrid retriever instead of vector-only retrieval. +- [x] 3.2 Update constrained QA to use the shared hybrid retriever for actor and topical questions without adding new phrase-specific routing. +- [x] 3.3 Update graph-answer retrieval to seed graph context from the shared hybrid retriever before graph expansion. +- [x] 3.4 Update time-bounded summary and recap retrieval to use the shared hybrid retriever over the requested message window. + +## 4. Verification +- [x] 4.1 Add coverage for slang, nicknames, rhetorical wording, and actor-topic questions that should retrieve older same-day evidence. +- [x] 4.2 Add coverage for channel recap questions that must span the requested day rather than only the latest recent cluster. +- [x] 4.3 Verify bot, UI answer, and graph-answer surfaces return materially similar evidence for equivalent questions.