-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_engine.py
More file actions
204 lines (168 loc) · 5.57 KB
/
query_engine.py
File metadata and controls
204 lines (168 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import os
from typing import List, Dict, Optional
import google.genai as genai
from google.genai.types import GenerateContentConfig, EmbedContentConfig
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
# Name of the Qdrant collection that is queried for document chunks.
COLLECTION_NAME = "vector_insight_chunks"
# Gemini model used to embed the user question into a query vector.
EMBEDDING_MODEL = "text-embedding-004"
# Gemini model used to generate the final answer from the retrieved context.
GEMINI_MODEL = "gemini-2.0-flash"
def get_gemini_client() -> genai.Client:
    """
    Build a Gemini client authenticated with the GEMINI_API_KEY env variable.

    Raises:
        RuntimeError: if GEMINI_API_KEY is unset or empty.
    """
    key = os.getenv("GEMINI_API_KEY")
    if key:
        return genai.Client(api_key=key)
    raise RuntimeError("GEMINI_API_KEY is not set in the environment.")
def get_qdrant_client() -> QdrantClient:
    """
    Open a Qdrant client backed by on-disk storage at ./qdrant_data.

    The path is relative to the current working directory and no remote
    URL or API key is passed, so no Qdrant Cloud credentials are involved.
    A log line is printed every time a client is created.
    """
    print("[query] Using local Qdrant at ./qdrant_data")
    local_client = QdrantClient(path="./qdrant_data", prefer_grpc=False)
    return local_client
def embed_query(text: str) -> List[float]:
    """
    Turn a user question into an embedding vector for retrieval.

    The text is sent to EMBEDDING_MODEL with the RETRIEVAL_QUERY task type,
    and the values of the first returned embedding are handed back.
    """
    gemini = get_gemini_client()
    query_config = EmbedContentConfig(task_type="RETRIEVAL_QUERY")
    response = gemini.models.embed_content(
        model=EMBEDDING_MODEL,
        contents=text,
        config=query_config,
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def search_similar_chunks(
    question: str,
    top_k: int = 3,
    project: Optional[str] = None,
) -> List[Dict]:
    """
    Search Qdrant for the chunks most relevant to the given question.

    Args:
        question: Natural-language question to embed and search with.
        top_k: Maximum number of chunks to return.
        project: Optional project label; when given, only points whose
            payload field "project" equals it are considered.

    Returns:
        A list of dictionaries, in the order Qdrant returned the points, with:
        - index (1 based, for citations)
        - score
        - text
        - project
        - document_name
        Missing payload fields default to an empty string.
    """
    # Embed before opening the store so a failing embedding call (network,
    # missing API key) never leaves a Qdrant client open.
    query_vector = embed_query(question)

    query_filter = None
    if project:
        # Filter by project label so each project is its own semantic space
        query_filter = rest.Filter(
            must=[
                rest.FieldCondition(
                    key="project",
                    match=rest.MatchValue(value=project),
                )
            ]
        )

    qdrant = get_qdrant_client()
    try:
        print("[query] searching similar chunks, top_k =", top_k)
        # Very important: use the argument name "query_filter", not "filter"
        resp = qdrant.query_points(
            collection_name=COLLECTION_NAME,
            query=query_vector,
            query_filter=query_filter,
            limit=top_k,
            with_payload=True,
            with_vectors=False,
        )
    finally:
        # A fresh client is created on every call; close it so the on-disk
        # storage is released instead of lingering until garbage collection.
        qdrant.close()

    hits: List[Dict] = []
    for i, point in enumerate(resp.points, start=1):
        payload = point.payload or {}
        hits.append(
            {
                "index": i,
                "score": point.score,
                "text": payload.get("text", ""),
                "project": payload.get("project", ""),
                "document_name": payload.get("document_name", ""),
            }
        )
    return hits
def build_context_block(hits: List[Dict]) -> str:
    """
    Build a numbered context block for Gemini from Qdrant hits.

    Each hit with non-empty text becomes one line of the form
    "[<index>] <text>", with newlines inside the text flattened to spaces
    and surrounding whitespace removed. Hits whose text is missing, None,
    or blank are skipped, so the result is an empty string when nothing
    usable was retrieved.

    Example:
    [1] Some text snippet here
    [2] Another relevant paragraph here
    """
    lines: List[str] = []
    for hit in hits:
        # A stored payload can carry an explicit null (or no) "text" field;
        # treat that as an empty snippet instead of crashing on None.replace.
        raw_text = hit.get("text") or ""
        text = str(raw_text).replace("\n", " ").strip()
        if not text:
            continue
        lines.append(f"[{hit['index']}] {text}")
    return "\n".join(lines)
def call_gemini_with_context(question: str, context_block: str) -> str:
    """
    Ask Gemini to answer the question using only the numbered snippets.

    The question and the context block are embedded in a fixed instruction
    prompt and sent to GEMINI_MODEL. The reply text is returned stripped of
    surrounding whitespace; an empty string is returned when the response
    carries no text.
    """
    prompt = f"""
You are an analytical assistant working in a retrieval augmented system.
You receive:
1. A user question.
2. A set of numbered context snippets from a vector database.
3. Each snippet may include different parts of one or more documents.
Your job:
- Answer the question only using the context snippets.
- Cite snippets in square brackets like [1], [2] wherever you use them.
- If the context is not sufficient, clearly say you are not sure instead of guessing.
- Keep the answer concise and focused on practical insight.
User question:
{question}
Context snippets:
{context_block}
"""
    gemini = get_gemini_client()
    generation_config = GenerateContentConfig(
        temperature=0.35,
        max_output_tokens=512,
    )
    reply = gemini.models.generate_content(
        model=GEMINI_MODEL,
        contents=prompt,
        config=generation_config,
    )
    reply_text = reply.text
    if not reply_text:
        return ""
    return reply_text.strip()
def answer_question(
    question: str,
    top_k: int = 3,
    project: Optional[str] = None,
) -> Dict:
    """
    Retrieve context for a question and have Gemini answer it.

    Returns a dictionary with:
    - answer: str
    - hits: list of {index, score, text, project, document_name}
    - raw_context: str

    When no usable context is retrieved, Gemini is not called: a fixed
    fallback answer is returned with an empty hit list and empty context.
    """
    retrieved = search_similar_chunks(question, top_k=top_k, project=project)
    context = build_context_block(retrieved)

    if context:
        return {
            "answer": call_gemini_with_context(question, context),
            "hits": retrieved,
            "raw_context": context,
        }

    fallback = (
        "I could not find any relevant context in the current project. "
        "Try ingesting more documents first."
    )
    return {"answer": fallback, "hits": [], "raw_context": ""}
if __name__ == "__main__":
    # Manual smoke test: ask one demo question against the "demo" project
    # and print the answer followed by the context that was sent to Gemini.
    demo_question = "What is the main goal of the Vector Insight Engine project?"
    result = answer_question(demo_question, top_k=3, project="demo")
    print("Answer:", result["answer"], sep="\n")
    print("\nContext:", result["raw_context"], sep="\n")