3 changes: 2 additions & 1 deletion deploy/docker/cp-ai/Dockerfile
@@ -7,7 +7,8 @@ RUN yum update -y && \
unzip \
git \
nginx \
bzip2
bzip2 \
redis

# Install MongoDB
RUN dnf install -y dnf-utils && \
38 changes: 38 additions & 0 deletions deploy/docker/cp-ai/cp_ai/agents/agents.py
@@ -78,3 +78,41 @@ def map_message(m: Union[ChatMessage, str]):
    return await default_agent.astream_chat(
        message=prompt,
    )

def answer_using_default_agent_sync(
        message: str | ChatMessage | None = None,
        messages: list[Union[ChatMessage, str]] | None = None,
        **kwargs
) -> StreamingAgentChatResponse:
    def map_message(m: Union[ChatMessage, str]):
        if isinstance(m, str):
            return ChatMessage(content=m)
        return m

    if messages is None:
        messages = []
    _messages = [map_message(m) for m in messages]
    if message is not None:
        _messages.append(map_message(message))

    last_message = _messages.pop()
    history = '\n\n'.join(str(m) for m in _messages)
    last_launch_payload = extract_launch_payload(history)
    user_query = last_message.content
    prompt = f'''
This is the user query; provide an answer for it using the provided agents or general LLM knowledge:
-------------
{history}
{user_query}
-------------
IMPORTANT:
- You must include any and all blocks that look like `<<<...>>>` verbatim and exactly as they appeared in the tool output.
- Do not rephrase, omit, or filter out these `<<<...>>>` blocks. Treat them as immutable text.
- If the tool output contains **references**, include those references in the final response.
- If 'LaunchException' is returned from any of the agents, STOP execution and ask the user for details based on the LaunchException info.
- Use Markdown format; include all available links and references (prefer the format [entity name](entity url)).
    '''
    default_agent = get_default_agent(launch_payload=last_launch_payload,
                                      **kwargs)
    return default_agent.stream_chat(
        message=prompt,
    )
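Note: a minimal usage sketch for the new synchronous entry point, mirroring how the Celery task below consumes it (the message values and the `bearer` kwarg here are placeholders, forwarded via `**kwargs`):

```python
# Illustrative only: consume the streaming response synchronously.
resp = answer_using_default_agent_sync(
    messages=['earlier user message', 'earlier assistant reply'],
    message='current user question',
    bearer='<token>',  # placeholder; passed through to get_default_agent
)
for chunk in resp.response_gen:      # tokens arrive incrementally
    print(chunk, end='', flush=True)
full_text = resp.response            # full answer, available after the stream ends
```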
40 changes: 39 additions & 1 deletion deploy/docker/cp-ai/cp_ai/api/api.py
@@ -4,14 +4,15 @@
from http.cookies import SimpleCookie
from fastapi import FastAPI, Depends, Cookie
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from llama_index.core.llms import ChatMessage
from pydantic import BaseModel
from cp_ai.common.types import Chat, Message
import cp_ai.database.chats.methods as chats
from cp_ai.agents import answer_using_default_agent
from cp_ai.api.utilities.logger import api_logger
from cp_ai.api.utilities.decorators import app_response

from cp_ai.celery.tasks import assist, get_celery_task_status, stream_celery_task_response

app = FastAPI()
sio_server = socketio.AsyncServer(
@@ -139,6 +140,43 @@ async def handle_message(sid, request_data: dict):

    await sio_server.emit('done', 'done', to=sid)

@sio_server.on('assistant_celery')
async def handle_message(sid, request_data: dict):
    try:
        bearer = await _get_bearer_from_socket_session(sid)
        api_logger.info(f'socket "assistant_celery" event:\nsid: {sid}\nbearer: {bearer}\n\nPayload:\n{repr(request_data)}')
        chat_id = request_data.get('chat_id')
        if chat_id is None:
            raise RuntimeError('chat identifier is not specified')
        chat_messages = chats.get_messages(chat_id)
        messages = [ChatMessage.from_str(dict(m)['content'], dict(m)['role']) for m in chat_messages]
        task = assist.delay(messages=messages,
                            chat_id=chat_id,
                            bearer=bearer)
        await sio_server.emit('task_id', task.id, to=sid)

    except Exception as e:
        api_logger.error('error handling user message', exc_info=e)
        await sio_server.emit('error', str(e), to=sid)


@app.get("/assistant_celery/{task_id}/status")
async def get_task_status(task_id: str):
    try:
        return get_celery_task_status(task_id=task_id)
    except Exception as e:
        api_logger.error('error getting llm status', exc_info=e)
        return {"error": str(e)}

@app.get("/assistant_celery/{task_id}/result")
async def get_task_result(task_id: str):
    try:
        return StreamingResponse(stream_celery_task_response(task_id),
                                 media_type="text/plain")
    except Exception as e:
        api_logger.error('error getting llm result', exc_info=e)
        return str(e)

@sio_server.on('assistant_test')
async def handle_message(sid, request_data: dict):
    try:
Empty file.
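Note: a rough client-side sketch of the intended flow for the two new HTTP endpoints (the task id arrives via the `task_id` socket event; the base URL and polling cadence are assumptions):

```python
# Client-side sketch, not part of the PR.
import time
import requests

BASE = 'http://localhost:8000'  # assumed deployment URL

def read_answer(task_id: str) -> str:
    # Wait until the Celery task is no longer pending.
    while requests.get(f'{BASE}/assistant_celery/{task_id}/status').json().get('status') == 'PENDING':
        time.sleep(0.5)
    # The result endpoint streams text/plain chunks as the worker produces them.
    parts = []
    with requests.get(f'{BASE}/assistant_celery/{task_id}/result', stream=True) as r:
        for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
            parts.append(chunk)
    return ''.join(parts)
```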
61 changes: 61 additions & 0 deletions deploy/docker/cp-ai/cp_ai/celery/tasks.py
@@ -0,0 +1,61 @@
import asyncio
import time
from typing import Any

from cp_ai.agents import answer_using_default_agent_sync
import cp_ai.database.chats.methods as chats
from cp_ai.common.types import Message
from celery import Celery, current_task
from celery.result import AsyncResult
from cp_ai.common.settings import cp_ai_settings

import redis
import redis.asyncio as aioredis

cp_ai_service_broker = cp_ai_settings.CP_AI_SERVICE_BROKER
cp_ai_service_backend = cp_ai_settings.CP_AI_SERVICE_BACKEND
celery_app = Celery(
    "worker",
    broker=cp_ai_service_broker,
    backend=cp_ai_service_backend
)
celery_app.conf.update(
    task_track_started=True
)
# Chunk handoff uses the same Redis instance as the Celery result backend.
r = redis.Redis.from_url(cp_ai_service_backend)
aior = aioredis.Redis.from_url(cp_ai_service_backend)


@celery_app.task
def assist(messages, chat_id, bearer):
    task_id = current_task.request.id
    resp = answer_using_default_agent_sync(
        messages=messages,
        bearer=bearer
    )
    for chunk in resp.response_gen:
        r.rpush(task_id, chunk)
    # "done" is the end-of-stream sentinel consumed by stream_celery_task_response.
    r.rpush(task_id, "done")
    message = Message.create_message(
        resp.response,
        chat_id=chat_id,
        is_assistant=True
    )
    chats.save_message(chat_id, message)


async def stream_celery_task_response(task_id: str):
    start = time.time()
    while True:
        if time.time() - start > cp_ai_settings.CP_AI_RESPONSE_TIMEOUT:
            raise asyncio.TimeoutError("timed out waiting for the llm result")
        chunk = await aior.lpop(task_id)
        if chunk is None:
            await asyncio.sleep(0.5)
            continue
        if chunk.decode() == "done":
            break
        yield chunk


def get_celery_task_status(task_id: str) -> dict[str, Any]:
    result = AsyncResult(task_id)
    return {"status": result.status}
6 changes: 6 additions & 0 deletions deploy/docker/cp-ai/cp_ai/common/settings.py
@@ -54,6 +54,12 @@ class CpAiSettings(BaseModel):
    CHATS_DB_NAME: str = os.environ.get('CHATS_DB_NAME', 'chatbot-db')
    # ------------------

    # Celery settings
    CP_AI_SERVICE_BROKER: str | None = os.environ.get('CP_AI_SERVICE_BROKER', 'redis://localhost:6379')
    CP_AI_SERVICE_BACKEND: str | None = os.environ.get('CP_AI_SERVICE_BACKEND', 'redis://localhost:6379')
    CP_AI_RESPONSE_TIMEOUT: int | None = int(os.environ.get('CP_AI_RESPONSE_TIMEOUT', 30))
    # ------------------

    # General settings
    CP_AI_LOGS_DIR: str = os.environ.get('CP_AI_LOGS_DIR', '/var/log')
    _CP_AI_API_LOGS: str | None = os.environ.get('CP_AI_API_LOGS', None)
3 changes: 3 additions & 0 deletions deploy/docker/cp-ai/launch.sh
@@ -53,6 +53,9 @@ EOM
mongod --config "$MONGODB_CONFIG_PATH" &
MONGO_PID=$!

echo "Starting celery worker"
celery -A cp_ai.celery.tasks worker -l info &

echo "Creating documents index"
python -m cp_ai.database.create

4 changes: 3 additions & 1 deletion deploy/docker/cp-ai/requirements.txt
@@ -11,4 +11,6 @@ python-dotenv==1.0.1
pydantic==2.10.3
chromadb==0.6.3
requests==2.32.4
pymongo==4.13.0
pymongo==4.13.0
celery==5.4.0
redis==5.2.1