-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
261 lines (205 loc) · 9.01 KB
/
server.py
File metadata and controls
261 lines (205 loc) · 9.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import os
import time
import uvicorn
from dotenv import load_dotenv
# Importing the current Date & Time
from datetime import datetime
# Importing the FastAPI & Google's Modules
from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.security.api_key import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List
from google.api_core.exceptions import ResourceExhausted
# Importing the Langchain Modules
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Importing Locally Written Search Functions
from functions.obsidian_searcher import local_search_notes # Offline-Chroma-db
# from functions.pinecone_searcher import pinecone_search_notes # Online-Pinecone
from functions.qdrant_searcher import qdrant_search_notes # Online-Qdrant
# Loading the Environment Variables
load_dotenv()
# Checking Local / Remote
IS_PRODUCTION = os.getenv("RENDER") == "true"
# Configuring the App
app = FastAPI(
title="Obsidian RAG API",
description="A Second Brain API that answers your questions based on your Local Obsidian Notes.",
version="1.0.0"
)
# Security Configuration
API_KEY_NAME = "SERVER_PASSWORD"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def get_api_key(api_key_header: str = Security(api_key_header)):
# 1. Accessing the real password from environment variables
SERVER_PASSWORD = os.getenv("SERVER_PASSWORD")
# 2. Check if the user provided the correct password
if api_key_header == SERVER_PASSWORD:
return api_key_header
else:
raise HTTPException(
status_code=403,
detail="Access Denied! You need a valid API Key to Access the Second Brain.\n"
)
# Enabling CORS for future front-end to talk with AI
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Step-1: We definte the data's models, basically what kind of input should be Accepted
# & what kind of output should be Returned
class Message(BaseModel):
role: str # Is either 'user' / 'ai'
content: str # The actual content of the message / query
class QueryRequest(BaseModel):
question: str
history: List[Message] = [] # Defaults to an empty list
top_k: int = 4 # No. of notes to refer for the answer, default is 4, can be increased
class AIResponse(BaseModel):
answer: str
context_used: str # For debugging purposes, will show sources
VAULT_PATH = os.getenv("VAULT_PATH")
# Defining the Write back Data Structure!
class NotePayLoad(BaseModel):
filename: str
content: str
folder: str = "My_Obs_RAG" # Specifying a subfolder to save chats
# Step-2: Setting up the Brain of the Resources, only need to initialize once
if not os.getenv("GOOGLE_API_KEY"):
raise ValueError("GOOGLE_API_KEY Not Found! Please check your '.env' file!")
# Setting the model up
llm = ChatGoogleGenerativeAI(
model="gemini-2.5-flash", # Again 2.5-flash for speed
temperature=0.3,
)
# Hypnotizing AI for best performance 👁️👄👁️ -> 😵💫 -> ⚡😎⚡
system_prompt="""
You are an intelligent "Second Brain" AI Assistant Agent.
You have access to the user's personal notes.
Here is the conversation history till now.
{chat_history}
Here is the context retrieved from the notes.
{context}
Current Question: {question}
Instructions:
- Answer the question using ONLY the context provided above.
- If the context doesn't contain the answer, admit that you do not know based on the notes.
- Cite the source (filename) if available in the context.
"""
prompt_template = ChatPromptTemplate.from_template(system_prompt)
@app.get("/")
async def health_check():
"""A simple heartbeat endpoint to check if the server is running."""
return {"status": "online", "model": "gemini-2.5-flash"}
@app.post("/chat", response_model=AIResponse)
async def chat_endpoint(
request: QueryRequest,
api_key: str= Depends(get_api_key)
):
"""
MAIN RAG Endpoint.
1. Receives the input question / prompt.
2. Searches the embedded VectorDB.
3. Generates Answer with Auto-Retry for Rate Limits.
"""
# Formatting the Chat History into a readable block of text!
formatted_history = ""
if request.history:
for msg in request.history[-4:]:
formatted_history += f"{msg.role.capitalize()}: {msg.content}\n"
try:
# A. Logging the request Serverside!
print(f"Request Received: {request.question}")
# B. Retrieving relevant information based on the Context provided
if IS_PRODUCTION:
print("☁️ Production Environment Detected: Routing to Pinecone Cloud DB...")
context_snippet = qdrant_search_notes(request.question, top_k=request.top_k)
else:
print("🏠 Local Environment Detected: Connecting to Local Chroma DB...")
context_snippet = local_search_notes(request.question)
print(f"Retrieved Context Length: {len(context_snippet)} chars.")
# C. Generating the Answer
prompt_chain = prompt_template | llm | StrOutputParser()
response_text = ""
max_retries = 3
for attempt in range(max_retries):
try:
# '.invoke()' is used instead of '.stream()' for standard HTTP requests
response_text = prompt_chain.invoke({
"chat_history": formatted_history,
"context": context_snippet,
"question": request.question
})
break # Breaking out of the retry loop upon Success!
except ResourceExhausted:
wait_time = 2 * (attempt + 1) # Short Exponential Backoff
print(f"⚠️Quota Hit! Retrying in {wait_time}s...")
time.sleep(wait_time)
if attempt == max_retries - 1:
raise HTTPException(status_code=429, detail="AI Overlaod! Please Try Again in a minute.")
# The Chat Saving Function getting called!
try:
# 1. Grabbing the current date & time for file creation!
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# 2. Formatting the User's & AI's messages
md_content = f"Chat Record: {timestamp}\n\n"
md_content += f"**User:** {request.question}\n\n"
md_content += f"**AI:** {response_text}\n\n"
md_content += f"---\n*Context Snippet:* {context_snippet[:50]}...\n"
# 3. Packaging the Chat ready to be saved into Pydantic Model
log_payload = NotePayLoad(
filename=f"Log_{timestamp}.md",
content=md_content,
folder="My_Obs_RAG"
)
# 4. Firing the save function! (Passing the API Key we already checked)
save_response = await save_obsidian_note(note=log_payload, api_key=api_key)
if save_response.get("status") == "success":
print(f"✅ Chat Successfully saved to: {save_response.get('file')}")
else:
print(f"❌ Auto-save Failed Internally: {save_response.get('message')}")
except Exception as e:
print(f"⚠️ Could not save chat log: {str(e)}")
# D. Returning a structured JSON
return AIResponse(
answer=response_text,
context_used=context_snippet[:500] + "..." # Sending back a limited snippet, for debugging
)
except Exception as e:
print(f"❌ Error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/notes/save")
async def save_obsidian_note(
note: NotePayLoad,
api_key: str = Depends(get_api_key) # Security Shield
):
try:
# ---> THE CLOUD SHORT-CIRCUIT <---
if IS_PRODUCTION:
print("☁️ Cloud Environment: Skipping local file save.")
return {"status": "skipped", "message": "Local saving disabled in prod"}
# Building the save location!
save_location = os.path.join(VAULT_PATH, note.folder)
os.makedirs(save_location, exist_ok=True)
# Making sure '.md' extension exists!
if not note.filename.endswith(".md"):
note.filename += ".md"
file_path = os.path.join(save_location, note.filename)
# Write to path
with open(file_path, "w", encoding="utf-8") as f1:
print(VAULT_PATH)
print(file_path)
f1.write(note.content)
return {"status": "success", "file": file_path}
except Exception as e:
return {"status": "error", "message": str(e)}
# Step-4: The Entry Point into the Program
if __name__=="__main__":
# With this we run the 'python server.py' directly
print("🚀 Starting the Second Brain API...")
uvicorn.run(app, host="0.0.0.0", port=8080)