-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
332 lines (274 loc) · 10.6 KB
/
app.py
File metadata and controls
332 lines (274 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import io
import os
from typing import List
import streamlit as st
import google.genai as genai
from google.genai.types import GenerateContentConfig
from pypdf import PdfReader
from dotenv import load_dotenv
from ingest import ingest_text
from query_engine import answer_question
GEMINI_MODEL = "gemini-2.0-flash"
# Load environment variables from .env so Qdrant and Gemini work
load_dotenv()
def get_gemini_client() -> genai.Client:
"""Create a Gemini client using the API key from the environment."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise RuntimeError("GEMINI_API_KEY is not set in the environment.")
return genai.Client(api_key=api_key)
def summarize_document(text: str) -> str:
"""
Use Gemini to generate a short bullet point summary of the ingested text.
To avoid sending extremely large payloads, we trim the document
to a safe length before summarization.
"""
trimmed_text = text[:8000]
prompt = f"""
You are a data analyst.
You receive a raw text document.
Write a concise summary in bullet points with:
- 3 to 5 key insights
- Focus on practical findings and risks
- Use simple language
Document:
{trimmed_text}
"""
client = get_gemini_client()
response = client.models.generate_content(
model=GEMINI_MODEL,
contents=prompt,
config=GenerateContentConfig(
temperature=0.4,
max_output_tokens=256,
),
)
return response.text.strip()
def extract_text_from_uploaded_file(uploaded_file) -> str:
"""
Read text from an uploaded txt or pdf file.
For txt files we decode bytes as utf-8.
For pdf files we use pypdf and limit the number of pages we read
so that very large documents do not consume too many resources.
"""
if uploaded_file is None:
return ""
# Try to reset pointer
try:
uploaded_file.seek(0)
except Exception:
pass
file_name = uploaded_file.name.lower()
# Plain text file
if file_name.endswith(".txt") or uploaded_file.type == "text/plain":
raw_bytes = uploaded_file.read()
try:
return raw_bytes.decode("utf-8")
except UnicodeDecodeError:
return raw_bytes.decode("latin-1", errors="ignore")
# PDF file
if file_name.endswith(".pdf") or uploaded_file.type == "application/pdf":
try:
# Let PdfReader stream from the uploaded file instead of
# reading the entire pdf into memory first.
reader = PdfReader(uploaded_file)
except Exception:
# Fallback: read bytes into memory if the above fails
raw_bytes = uploaded_file.read()
pdf_bytes = io.BytesIO(raw_bytes)
reader = PdfReader(pdf_bytes)
pages_text: List[str] = []
max_pages = 5 # safety limit
for page_index, page in enumerate(reader.pages):
if page_index >= max_pages:
break
try:
page_text = page.extract_text() or ""
except Exception:
page_text = ""
pages_text.append(page_text)
text = "\n\n".join(pages_text)
# Final safety limit on text size
return text[:20000]
# Unsupported type
return ""
def init_session_state() -> None:
"""Prepare Streamlit session state variables."""
if "history" not in st.session_state:
st.session_state["history"] = [] # list of {question, answer, project}
if "last_summary" not in st.session_state:
st.session_state["last_summary"] = ""
if "current_project" not in st.session_state:
st.session_state["current_project"] = "demo"
if "documents" not in st.session_state:
# list of {project, document_name}
st.session_state["documents"] = []
def main():
st.set_page_config(
page_title="Vector Insight Engine",
layout="wide",
)
init_session_state()
# Project selector at the top
st.title("Vector Insight Engine")
st.write(
"Turn unstructured notes into searchable insights for data analysis and research."
)
project_label = st.text_input(
"Project label",
value=st.session_state["current_project"],
help="Use the same label to group documents and questions by project or client.",
)
st.session_state["current_project"] = project_label.strip() or "default"
col_ingest, col_query = st.columns(2)
# Sidebar
with st.sidebar:
st.markdown("### About")
st.write(
"Vector Insight Engine turns messy text into searchable insights "
"using Google Gemini for reasoning and Qdrant for vector search."
)
st.markdown("### Tech stack")
st.markdown(
"""
- Language model: Gemini `gemini-2.0-flash`
- Embeddings: `text-embedding-004`
- Vector database: Qdrant Cloud
- Web app: Streamlit
- Language: Python 3.11
"""
)
st.markdown("### Ingested documents (this session)")
if not st.session_state["documents"]:
st.write("No documents ingested yet.")
else:
for doc in reversed(st.session_state["documents"]):
st.write(f"{doc['project']} · {doc['document_name']}")
st.markdown("### How to use")
st.write(
"1. Choose a project label at the top.\n"
"2. Paste text or upload a txt/pdf file.\n"
"3. Click **Ingest into Qdrant**.\n"
"4. Ask project specific questions in Step 2.\n"
"5. Inspect retrieved context and citations."
)
# Step 1 - ingest
with col_ingest:
st.subheader("Step 1. Ingest a document")
default_text = (
"Machine learning models depend on the quality and coverage of their training data.\n"
"Poor inputs lead to unstable predictions and biased outcomes.\n"
"Robust data pipelines improve accuracy and interpretability for high impact decisions.\n"
"Vector Insight Engine is built to help analysts turn messy documents into searchable insights."
)
input_text = st.text_area(
"Paste a document or notes (optional if you upload a file):",
value=default_text,
height=220,
)
uploaded_file = st.file_uploader(
"Or upload a plain text or PDF file",
type=["txt", "pdf"],
)
ingest_button = st.button("Ingest into Qdrant")
if ingest_button:
raw_text = ""
document_name = "pasted_text"
if uploaded_file is not None:
document_name = uploaded_file.name
with st.spinner("Reading file and embedding chunks..."):
raw_text = extract_text_from_uploaded_file(uploaded_file)
else:
raw_text = input_text
if not raw_text or not raw_text.strip():
st.error("There is no text to ingest. Please paste some text or upload a file.")
else:
with st.spinner("Embedding text and writing chunks to Qdrant..."):
written = ingest_text(
raw_text,
project=st.session_state["current_project"],
document_name=document_name,
)
try:
summary = summarize_document(raw_text)
except Exception as e:
summary = ""
st.warning(
f"Document ingested successfully, "
f"but summary generation failed: {e}"
)
st.session_state["last_summary"] = summary
st.session_state["documents"].append(
{
"project": st.session_state["current_project"],
"document_name": document_name,
}
)
st.success(
f"Ingested {written} chunks into project "
f"`{st.session_state['current_project']}`."
)
if st.session_state.get("last_summary"):
st.markdown("#### Key insights from the last document")
st.markdown(st.session_state["last_summary"])
# Step 2 - query
with col_query:
st.subheader("Step 2. Ask a question")
st.caption(
f"Current project: `{st.session_state['current_project']}`. "
"Questions will search only within this project."
)
question = st.text_input(
"Ask a question about the ingested content:",
value="What is the main goal of the Vector Insight Engine project?",
)
top_k = st.slider(
"Number of context snippets to use",
min_value=1,
max_value=5,
value=3,
)
if st.button("Get insight"):
if not question.strip():
st.error("Please enter a question.")
else:
with st.spinner(
"Retrieving context from Qdrant and querying Gemini..."
):
result = answer_question(
question,
top_k=top_k,
project=st.session_state["current_project"],
)
answer_text = result["answer"]
st.session_state["history"].append(
{
"question": question,
"answer": answer_text,
"project": st.session_state["current_project"],
}
)
st.markdown("### Answer")
st.write(answer_text)
with st.expander("Show retrieved context"):
for hit in result["hits"]:
st.markdown(
f"[{hit['index']}] "
f"(score: {hit['score']:.4f}) "
f"[project: {hit['project']} | doc: {hit['document_name']}] "
f"{hit['text']}"
)
# History section
st.markdown("---")
st.subheader("Session history")
if not st.session_state["history"]:
st.write("No questions asked yet.")
else:
for i, item in enumerate(reversed(st.session_state["history"]), start=1):
st.markdown(
f"**Q{i} [{item['project']}]** {item['question']}"
)
st.write(item["answer"])
st.markdown("")
if __name__ == "__main__":
main()