-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdev_server.py
More file actions
169 lines (135 loc) · 5.84 KB
/
dev_server.py
File metadata and controls
169 lines (135 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Local dev server for SSE streaming.
Usage: python3 dev_server.py
"""
import json
import logging
import os
from dotenv import load_dotenv
load_dotenv()
import flask
from flask import Flask, request, Response, stream_with_context
from flask_cors import CORS
from factory.core.chatbot import generate_response_stream
from factory.core.retrieval import retrieve_relevant_chunks
from factory.core.bot_utils import load_bot_config, log_chat_interaction
from factory.core.auth import validate_api_key
from factory.core.self_heal import invoke_self_heal_async, get_pending_result
# Root logger at INFO so log output from the factory.core modules is visible locally.
logging.getLogger().setLevel(logging.INFO)

app = Flask(__name__)

# Only enable Flask CORS locally — in prod, the Lambda Function URL handles CORS
if os.getenv("APP_ENV", "local") != "production":
    CORS(app)
@app.route("/")
def index():
    """Serve the local chat debug UI; return a JSON status in production.

    In production the streaming service has no HTML page to serve, so a
    simple status payload is returned instead.
    """
    if os.getenv("APP_ENV", "local") == "production":
        return {"status": "ok", "service": "bot-factory-stream"}
    # Explicit encoding so the page decodes identically on every platform
    # (the default encoding is locale-dependent without it).
    with open("app/chat_stream.html", encoding="utf-8") as f:
        return Response(f.read(), content_type="text/html")
@app.route("/demo/<bot_id>")
def demo(bot_id):
    """Serve a bot's demo page (local dev only).

    Rejects path-like bot ids: the URL converter already blocks "/", but a
    bot_id of ".." would still resolve via os.path.join to the parent of
    scripts/bots — refuse anything that isn't a plain directory name.
    """
    if os.getenv("APP_ENV", "local") == "production":
        return {"error": "not available"}, 404
    if bot_id in (".", "..") or os.sep in bot_id or (os.altsep and os.altsep in bot_id):
        return {"error": "not available"}, 404
    demo_dir = os.path.join("scripts", "bots", bot_id)
    return flask.send_from_directory(demo_dir, "demo.html")
@app.route("/demo/<bot_id>/<path:filename>")
def demo_static(bot_id, filename):
    """Serve static assets for a bot's demo page (local dev only).

    `send_from_directory` sanitizes `filename`, but `bot_id` is joined
    into the base directory ourselves, so reject path-like ids (".."
    would escape scripts/bots) before building the path.
    """
    if os.getenv("APP_ENV", "local") == "production":
        return {"error": "not available"}, 404
    if bot_id in (".", "..") or os.sep in bot_id or (os.altsep and os.altsep in bot_id):
        return {"error": "not available"}, 404
    demo_dir = os.path.join("scripts", "bots", bot_id)
    return flask.send_from_directory(demo_dir, filename)
@app.route("/bot_scripts/<path:filename>")
def bot_scripts(filename):
    """Serve shared bot scripts from app/bot_scripts (local dev only)."""
    env = os.getenv("APP_ENV", "local")
    if env == "production":
        return {"error": "not available"}, 404
    return flask.send_from_directory("app/bot_scripts", filename)
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe — always reports the service as up."""
    payload = {"status": "ok", "service": "bot-factory"}
    return payload
@app.route("/chat", methods=["POST"])
def chat():
    """Stream a chat completion as Server-Sent Events.

    Expects a JSON body with ``bot_id``, ``message``, and an optional
    ``conversation_history`` list, plus an ``X-API-Key`` header. Emits
    SSE ``data:`` frames in order: an optional self-heal notice, the
    retrieved sources (for the local debug UI), one frame per generated
    token, then ``[DONE]``. Errors are streamed as an SSE frame too, so
    the client's EventSource parser never receives a bare HTML error.
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # Flask raising a 400 HTML error; normalize to {} so the .get() calls
    # below fall through to the structured "required" SSE error response.
    body = request.get_json(silent=True) or {}
    bot_id = body.get("bot_id")
    message = body.get("message")
    conversation_history = body.get("conversation_history", [])
    if not bot_id or not message:
        return Response(
            f'data: {json.dumps({"error": "bot_id and message are required"})}\n\n',
            status=400,
            content_type="text/event-stream",
        )
    api_key = request.headers.get("X-API-Key", "")
    if not validate_api_key(api_key, bot_id):
        return Response(
            f'data: {json.dumps({"error": "unauthorized"})}\n\n',
            status=401,
            content_type="text/event-stream",
        )
    config = load_bot_config(bot_id)
    rag = config.get("bot", {}).get("rag", {})
    top_k = rag.get("top_k", 5)
    similarity_threshold = rag.get("similarity_threshold", 0.3)
    # Check for pending self-heal results from a previous request
    pending = get_pending_result(bot_id)
    # Self-heal config
    agentic = config.get("bot", {}).get("agentic", {})
    self_heal_enabled = agentic.get("self_heal", False)
    confidence_threshold = agentic.get("confidence_threshold", 0.5)

    def generate():
        try:
            # Piggyback: notify user about previously auto-learned content
            if pending:
                topic = pending.get("topic", "a new topic")
                heal_msg = f"I just learned about {topic}! Try asking me again."
                yield f"data: {json.dumps({'type': 'self_heal', 'message': heal_msg})}\n\n"
            # Emit sources for the local debug UI. threshold=0.0 on purpose:
            # the UI shows *all* candidate chunks with their scores, while
            # the real generation below applies the configured threshold.
            chunks = retrieve_relevant_chunks(
                bot_id=bot_id,
                query=message,
                top_k=top_k,
                similarity_threshold=0.0,
            )
            sources = [
                {"heading": c.get("heading", ""), "category": c["category"], "similarity": round(c["similarity"], 4)}
                for c in chunks
            ]
            yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"
            metadata = {}  # filled in-place by generate_response_stream (metadata_out)
            full_response = []
            for token in generate_response_stream(
                bot_id=bot_id,
                user_message=message,
                top_k=top_k,
                similarity_threshold=similarity_threshold,
                conversation_history=conversation_history,
                metadata_out=metadata,
            ):
                full_response.append(token)
                yield f"data: {json.dumps({'token': token})}\n\n"
            # Log the interaction (client IP from the proxy header when present)
            client_ip = request.headers.get("X-Forwarded-For", request.remote_addr) or ""
            log_chat_interaction(bot_id, message, "".join(full_response), sources, client_ip=client_ip)
            # Invoke self-heal BEFORE [DONE] — all tokens are already streamed,
            # but Lambda can freeze the container after [DONE] so we must fire first.
            top_score = metadata.get("top_score", 1.0)
            if self_heal_enabled and top_score < confidence_threshold:
                app.logger.info(
                    f"[self_heal:{bot_id}] low confidence ({top_score:.3f} < {confidence_threshold}) "
                    f"— spawning self-heal"
                )
                invoke_self_heal_async(bot_id, message, config)
            yield "data: [DONE]\n\n"
        except Exception as e:
            # Best-effort: surface the failure as a final SSE frame rather
            # than killing the stream mid-response with a broken connection.
            yield f'data: {json.dumps({"error": str(e)})}\n\n'

    return Response(
        stream_with_context(generate()),
        content_type="text/event-stream",
        # X-Accel-Buffering: no — stop reverse proxies from buffering the stream
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
if __name__ == "__main__":
    # Port defaults to 8001 unless DEV_PORT overrides it.
    listen_port = int(os.getenv("DEV_PORT", "8001"))
    print(f"🎸 Dev server (streaming) on http://localhost:{listen_port}")
    app.run(host="0.0.0.0", port=listen_port, debug=True, threaded=True)