forked from Walkover-Web-Solution/gtwy-ai
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.py
More file actions
145 lines (120 loc) · 4.91 KB
/
index.py
File metadata and controls
145 lines (120 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import src.routes.rag_routes
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import asyncio
from contextlib import asynccontextmanager
from src.services.utils.batch_script import repeat_function
from config import Config
from src.routes.chatBot_routes import router as chatbot_router
from src.routes.v2.modelRouter import router as v2_router
from src.services.commonServices.queueService.queueService import queue_obj
from src.services.commonServices.queueService.queueLogService import sub_queue_obj
from src.services.utils.logger import logger
from src.routes.rag_routes import router as rag_routes
from src.routes.image_process_routes import router as image_process_routes
from models.Timescale.connections import init_async_dbservice
from src.configs.model_configuration import init_model_configuration, background_listen_for_changes
from globals import *
# Atatus (APM) is optional: it is only set up when the environment is
# PRODUCTION and a license key is configured. Both names stay None otherwise,
# which is what the middleware registration further down checks for.
atatus_client = None
AtatusMiddleware = None

if (Config.ENVIROMENT or "").upper() == 'PRODUCTION' and Config.ATATUS_LICENSE_KEY:
    try:
        # Imported lazily so the package is only required when it is used.
        import atatus
        from atatus.contrib.starlette import create_client, Atatus as _Atatus

        logger.info("Initializing Atatus client...")
        _atatus_settings = {
            "APP_NAME": "Python - GTWY - Backend - PROD",
            "LICENSE_KEY": Config.ATATUS_LICENSE_KEY,
            "ANALYTICS": True,
            "ANALYTICS_CAPTURE_OUTGOING": True,
            "LOG_BODY": "response",
            "INSTRUMENTATIONS": {
                "httpx": False,
            },
        }
        atatus_client = create_client(_atatus_settings)
    except Exception as init_error:
        # Best effort: a monitoring failure must not stop the app from booting.
        logger.error(f"Failed to initialize Atatus: {init_error}")
    else:
        # Only expose the middleware class once the client exists.
        AtatusMiddleware = _Atatus
async def consume_messages_in_executor():
    """Await ``queue_obj.consume_messages()``.

    Despite the name, no executor is involved: the coroutine is awaited
    directly. It exists so the consumer can be wrapped in a task.
    """
    await queue_obj.consume_messages()
async def consume_sub_messages_in_executor():
    """Await ``sub_queue_obj.consume_messages()``.

    Despite the name, no executor is involved: the coroutine is awaited
    directly. It exists so the consumer can be wrapped in a task.
    """
    await sub_queue_obj.consume_messages()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup before ``yield`` and shutdown after it.

    Startup:
        * loads the model configuration;
        * connects ``queue_obj`` and ``sub_queue_obj`` and calls
          ``create_queue_if_not_exists`` on each;
        * starts both queue consumers as tasks when
          ``Config.CONSUMER_STATUS`` equals "true" (case-insensitive);
        * initialises the async DB service (in the background when
          ``Config.ENVIROMENT`` is 'LOCAL', awaited otherwise);
        * starts ``repeat_function`` and the change stream listener as
          background tasks.

    Shutdown:
        * cancels the change stream listener and any consumer tasks;
        * disconnects both queues;
        * waits for the cancelled tasks, logging the outcome of each.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    logger.info("Starting up...")
    await init_model_configuration()

    await queue_obj.connect()
    await queue_obj.create_queue_if_not_exists()
    await sub_queue_obj.connect()
    await sub_queue_obj.create_queue_if_not_exists()

    # The event loop only keeps weak references to tasks, so every
    # fire-and-forget task is stored here to keep it from being garbage
    # collected while the application is running.
    background_tasks = []

    # Run the consumers in the background without blocking the event loop.
    # A missing CONSUMER_STATUS is treated as "consumers disabled".
    consumer_tasks = []
    if str(Config.CONSUMER_STATUS or "").lower() == "true":
        consumer_tasks.append(asyncio.create_task(consume_messages_in_executor()))
        consumer_tasks.append(asyncio.create_task(consume_sub_messages_in_executor()))

    if Config.ENVIROMENT == 'LOCAL':
        # Locally the DB service is initialised in the background so startup
        # does not wait on it.
        background_tasks.append(asyncio.create_task(init_async_dbservice()))
    else:
        await init_async_dbservice()

    background_tasks.append(asyncio.create_task(repeat_function()))

    logger.info("Starting MongoDB change stream listener as a background task.")
    change_stream_task = asyncio.create_task(background_listen_for_changes())

    yield  # Startup logic is complete

    # Shutdown logic
    logger.info("Shutting down...")
    logger.info("Shutting down MongoDB change stream listener.")
    change_stream_task.cancel()
    for consumer in consumer_tasks:
        consumer.cancel()

    await queue_obj.disconnect()
    await sub_queue_obj.disconnect()

    if consumer_tasks:
        # return_exceptions=True makes sure every consumer is awaited even
        # when an earlier one was cancelled or had already failed.
        outcomes = await asyncio.gather(*consumer_tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, asyncio.CancelledError):
                # Cancellation is the expected result of the cancel() above.
                logger.info("Consumer task was cancelled during shutdown.")
            elif isinstance(outcome, BaseException):
                logger.error(f"Consumer task failed: {outcome}")

    try:
        await change_stream_task
    except asyncio.CancelledError:
        logger.info("MongoDB change stream listener task successfully cancelled.")
    except Exception as listener_error:
        # The listener had already crashed; report it instead of failing shutdown.
        logger.error(f"MongoDB change stream listener task failed: {listener_error}")
# Initialize the FastAPI app.
# Debug mode makes the framework return tracebacks on server errors, so it is
# turned off when the environment is PRODUCTION and left on everywhere else.
app = FastAPI(
    debug=(Config.ENVIROMENT or "").upper() != 'PRODUCTION',
    lifespan=lifespan,
)

# Atatus middleware is only registered when its client was created.
if AtatusMiddleware and atatus_client:
    app.add_middleware(AtatusMiddleware, client=atatus_client)

# CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True appears
# to permit credentialed requests from any site - confirm this is intended
# before tightening; the settings themselves are left unchanged here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    max_age=86400,  # seconds a browser may cache the preflight response (24 h)
)
# Healthcheck route
@app.get("/healthcheck")
async def healthcheck():
    """Return a fixed 200 JSON body with a status string and version tag."""
    payload = {"status": "OK running good... v1.2"}
    return JSONResponse(content=payload, status_code=200)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc: RequestValidationError):
    """Return a 400 JSON response describing a request validation failure.

    Args:
        request: The incoming request. Exception handlers are invoked as
            ``handler(request, exc)``, so it must be accepted even though
            it is not used here.
        exc: The validation error raised while parsing the request.

    Returns:
        A ``JSONResponse`` with status 400 whose body carries a fixed
        ``detail`` message and the entries from ``exc.errors()``.
    """
    # Imported locally so this handler needs no change to the module imports.
    # Error entries may hold values the JSON encoder cannot serialise
    # directly, so they are converted to JSON-compatible data first.
    from fastapi.encoders import jsonable_encoder

    return JSONResponse(
        status_code=400,
        content={
            "detail": "Custom error message",
            "errors": jsonable_encoder(exc.errors()),
        },
    )
# Include routers
# Each entry is (router, URL prefix); registration order matches the list.
# The image-processing router is intentionally listed under two prefixes.
_ROUTER_MOUNTS = (
    (v2_router, "/api/v2/model"),
    (chatbot_router, "/chatbot"),
    (image_process_routes, "/image/processing"),
    (image_process_routes, "/files"),
    (rag_routes, "/rag"),
)
for _mounted_router, _mount_prefix in _ROUTER_MOUNTS:
    app.include_router(_mounted_router, prefix=_mount_prefix)
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces at the configured port.
    PORT = int(Config.PORT)
    uvicorn.run(app=app, host="0.0.0.0", port=PORT)