From c17d4eb043785a914100865d387683877274fc1d Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Tue, 10 Jun 2025 00:37:22 -0400 Subject: [PATCH 1/2] Editing endpoint --- app/lib/database.py | 4 ++-- app/lib/waha_api.py | 33 ++++++++++++++++++--------------- app/main.py | 31 ++++++++++++------------------- app/services/message_service.py | 21 +++++++++++++++++++++ 4 files changed, 53 insertions(+), 36 deletions(-) create mode 100644 app/services/message_service.py diff --git a/app/lib/database.py b/app/lib/database.py index b0d48eb..869dafb 100644 --- a/app/lib/database.py +++ b/app/lib/database.py @@ -14,10 +14,10 @@ def save_messages(self, data: dict) -> None: if "_id" in data.keys(): del data['_id'] except Exception as e: - print(e) + return {'error': e} def save_answer(self, data: str) -> None: try: self.db.answer.insert_one({"answer": data}) except Exception as e: - print(e) \ No newline at end of file + return {'error': e} \ No newline at end of file diff --git a/app/lib/waha_api.py b/app/lib/waha_api.py index 1a91d1f..e251647 100644 --- a/app/lib/waha_api.py +++ b/app/lib/waha_api.py @@ -9,22 +9,25 @@ class WhatsError(Exception): pass def send_message(chat_id: int, text: str) -> dict | None: - url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/sendText" - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } - data = { - "session": WAHA_ADMIN, - "chatId": f"55{chat_id}@c.us", - "text": text - } - response = requests.post(url, headers=headers, json=data) + try: + url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/sendText" + headers = { + 'accept': 'application/json', + 'Content-Type': 'application/json' + } + data = { + "session": WAHA_ADMIN, + "chatId": f"55{chat_id}@c.us", + "text": text + } + response = requests.post(url, headers=headers, json=data) - if response.status_code <= 204: - return response.json() - else: - raise WahaAPI.WhatsError("Have any error in your datas, please check!") + if response.status_code <= 204: + return response.json() + else: + raise WahaAPI.WhatsError("Have any error in your datas, please check!") + except Exception as e: + return {"message": e} def reply(chat_id: int, message_id: int, text: str): url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/reply/" diff --git a/app/main.py b/app/main.py index c159d90..8ea95a2 100644 --- a/app/main.py +++ b/app/main.py @@ -1,27 +1,20 @@ -from app.lib.gemini import Gemini -from app.lib.waha_api import WahaAPI -from app.lib.database import DataBase -from app.lib.sheets import Sheets -from fastapi import FastAPI, Request +from fastapi import FastAPI, Request, Depends from fastapi.responses import JSONResponse +from app.services.message_service import MessageService -sheets = Sheets() -app = FastAPI() -gemini = Gemini() -db = DataBase() +app = FastAPI( + title="ChatBot in Python!", + version="1.0.0" +) -@app.post("/receive_message") -async def receive_message(request: Request): +@app.post("/receive_message", tags=['Webhook']) +async def receive_message( + request: Request, + message_service: MessageService = Depends() +): try: response = await request.json() - ia_response = sheets.verify_response(response["payload"]["body"]) - db.save_answer(ia_response) - db.save_messages(response) - WahaAPI.reply( - response["payload"]["from"], - response["payload"]["id"], - ia_response - ) + await message_service.process_message(response) return JSONResponse(content=response, status_code=200) except Exception as e: return JSONResponse({"error": str(e)}, status_code=400) \ No newline at end of file diff --git a/app/services/message_service.py b/app/services/message_service.py new file mode 100644 index 0000000..e87b8b9 --- /dev/null +++ b/app/services/message_service.py @@ -0,0 +1,21 @@ +from app.lib.waha_api import WahaAPI +from app.lib.database import DataBase +from app.lib.sheets import Sheets + +class MessageService: + def __init__(self): + self.sheets = Sheets() + self.db = DataBase() + self.waha_api = WahaAPI() + + async def process_message(self, response: dict) -> None: + ia_response = self.sheets.verify_response(response["payload"]["body"]) + + self.db.save_answer(ia_response) + self.db.save_messages(response) + + self.waha_api.reply( + response["payload"]["from"], + response["payload"]["id"], + ia_response + ) \ No newline at end of file From d890e4c909dbf55e926afd13edd520f5dc897037 Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Tue, 10 Jun 2025 20:07:35 -0400 Subject: [PATCH 2/2] Add actions using kafka --- .github/workflows/compare-branchs.yml | 63 +++++++++++++++++++++++++++ app/config/settings.py | 4 +- app/main.py | 41 +++++++++++++++-- app/models/producer.py | 4 ++ app/utils/producer.py | 27 ++++++++++++ compare/compare_outputs.py | 22 ++++++++++ docker-compose.yml | 46 ++++++++++++------- requirements.txt | 8 +++- 8 files changed, 193 insertions(+), 22 deletions(-) create mode 100644 .github/workflows/compare-branchs.yml create mode 100644 app/models/producer.py create mode 100644 app/utils/producer.py create mode 100644 compare/compare_outputs.py diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml new file mode 100644 index 0000000..5658198 --- /dev/null +++ b/.github/workflows/compare-branchs.yml @@ -0,0 +1,63 @@ +name: Compare Branch Behavior + +on: + pull_request: + branches: [ main ] + +jobs: + compare-branches: + runs-on: ubuntu-latest + services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + ports: + - 2181:2181 + kafka: + image: confluentinc/cp-kafka:latest + env: + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + ALLOW_PLAINTEXT_LISTENER: "yes" + ports: + - 9092:9092 + options: >- + --health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + + steps: + - name: Checkout PR branch + uses: actions/checkout@v4 + with: + path: feature + + - name: Checkout main branch + uses: actions/checkout@v4 + with: + ref: main + path: main + + - name: Build Docker containers + run: | + docker build -t app_main ./main + docker build -t app_feature ./feature + + - name: Run containers and inject messages + run: | + docker network create kafka_net + + docker run -d --name app_main --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_main app_main + docker run -d --name app_feature --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_feature app_feature + + # injeta a mesma mensagem nos dois + echo '{"valor": 10}' | kafka-console-producer.sh --broker-list localhost:9092 --topic entrada + + # espera processamento + sleep 10 + + - name: Compare Results + run: | + pip install kafka-python + python compare/compare_outputs.py \ No newline at end of file diff --git a/app/config/settings.py b/app/config/settings.py index 8902718..766f7e5 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -13,4 +13,6 @@ WAHA_ADMIN: Final[str] = os.getenv("WAHA_ADMIN") ENV : Final[str] = os.getenv("ENV") DATABASE_HOST : Final[str] = os.getenv("DATABASE_HOST") -DATABASE_PORT : Final[str] = os.getenv("DATABASE_PORT") \ No newline at end of file +DATABASE_PORT : Final[str] = os.getenv("DATABASE_PORT") +KAFKA_BROKER : Final[str] = os.getenv('KAFKA_BROKER') +KAFKA_ADMIN_CLIENT : Final[str] = os.getenv('KAFKA_ADMIN_CLIENT') \ No newline at end of file diff --git a/app/main.py b/app/main.py index 8ea95a2..6f66b2f 100644 --- a/app/main.py +++ b/app/main.py @@ -1,12 +1,40 @@ -from fastapi import FastAPI, Request, Depends -from fastapi.responses import JSONResponse from app.services.message_service import MessageService +from app.config.settings import KAFKA_ADMIN_CLIENT, KAFKA_BROKER +from app.models.producer import ProduceMessage +from app.utils.producer import produce_kafka_message, KAFKA_TOPIC +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request, Depends, BackgroundTasks +from fastapi.responses import JSONResponse +from kafka.admin import KafkaAdminClient, NewTopic + +@asynccontextmanager +async def lifespan(app: FastAPI): + + admin_client = KafkaAdminClient( + bootstrap_servers=KAFKA_BROKER, + client_id=KAFKA_ADMIN_CLIENT + ) + if not KAFKA_TOPIC in admin_client.list_topics(): + admin_client.create_topics( + new_topics=[ + NewTopic( + name=KAFKA_TOPIC, + num_partitions=1, + replication_factor=1 + ) + ], + validate_only=False + ) + # admin_client.delete_topics(topics=[KAFKA_TOPIC]) + yield # separation point app = FastAPI( title="ChatBot in Python!", - version="1.0.0" + version="1.0.0", + lifespan=lifespan ) + @app.post("/receive_message", tags=['Webhook']) async def receive_message( request: Request, @@ -17,4 +45,9 @@ async def receive_message( await message_service.process_message(response) return JSONResponse(content=response, status_code=200) except Exception as e: - return JSONResponse({"error": str(e)}, status_code=400) \ No newline at end of file + return JSONResponse({"error": str(e)}, status_code=400) + +@app.post('/produce/message/', tags=["Produce Message"]) +async def produce_message(messageRequest : ProduceMessage, background_tasks : BackgroundTasks): + background_tasks.add_task(produce_kafka_message, messageRequest) + return {"message": "Message Received, tahnk you for sending a message!"} \ No newline at end of file diff --git a/app/models/producer.py b/app/models/producer.py new file mode 100644 index 0000000..87a2fa0 --- /dev/null +++ b/app/models/producer.py @@ -0,0 +1,4 @@ +from pydantic import BaseModel, Field + +class ProduceMessage(BaseModel): + message : str = Field(min_length=1, max_length=250) \ No newline at end of file diff --git a/app/utils/producer.py b/app/utils/producer.py new file mode 100644 index 0000000..ef2f0f8 --- /dev/null +++ b/app/utils/producer.py @@ -0,0 +1,27 @@ +from app.config.settings import KAFKA_BROKER +from app.models.producer import ProduceMessage +from kafka import KafkaProducer +from fastapi import HTTPException +import json + +# Constant Values Section +KAFKA_TOPIC = 'fastapi-topic' +PRODUCER_CLIENT_ID = 'fastapi_producer' + +def serializer(message: json) -> str: + return json.dumps(message).encode() + +producer = KafkaProducer( + api_version=(0, 8, 0), + bootstrap_servers=KAFKA_BROKER, + value_serializer=serializer, + client_id=PRODUCER_CLIENT_ID +) + +def produce_kafka_message(messageRequest: ProduceMessage): + try: + producer.send(KAFKA_TOPIC, json.dumps({'message': messageRequest.message})) + producer.flush() # Ensures all messages are sent + except Exception as error: + print(error) + raise HTTPException(status_code=500, detail="Failed to send message") \ No newline at end of file diff --git a/compare/compare_outputs.py b/compare/compare_outputs.py new file mode 100644 index 0000000..86d03b7 --- /dev/null +++ b/compare/compare_outputs.py @@ -0,0 +1,22 @@ +from kafka import KafkaConsumer +import json + +def get_messages(topic): + consumer = KafkaConsumer( + topic, + bootstrap_servers='localhost:9092', + auto_offset_reset='earliest', + group_id=None + ) + return [json.loads(msg.value.decode()) for msg in consumer] + +m1 = get_messages("saida_main") +m2 = get_messages("saida_feature") + +if m1 != m2: + print("As branches produzem resultados diferentes:") + print("main:", m1) + print("feature:", m2) + exit(1) +else: + print("As branches são comportamentalmente idênticas!") \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index cbe4f62..0374c6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ services: - chatbot: - container_name: chatbot - image: chatbot1.0 - ports: - - "${CHATBOT_PORT}:${CHATBOT_PORT}" - networks: - - chatbot_network + # chatbot: + # container_name: chatbot + # image: chatbot1.0 + # ports: + # - "${CHATBOT_PORT}:${CHATBOT_PORT}" + # networks: + # - chatbot_network waha_api: container_name: waha_api @@ -14,18 +14,34 @@ services: - "${WAHA_PORT}:${WAHA_PORT}" networks: - chatbot_network - depends_on: - - chatbot - - kafka: - container_name: 'kafka-chatbot' - image: apache/kafka:3.9.1 + + zookeeper: + image: confluentinc/cp-zookeeper:latest + container_name: zookeeper + environment: + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + networks: + - chatbot_network ports: - - "${KAFKA_PORT}:${KAFKA_PORT}" + - "2181:2181" + + broker: + container_name: broker + image: confluentinc/cp-kafka:latest networks: - chatbot_network + ports: + - "${KAFKA_PORT}:${KAFKA_PORT}" depends_on: - - chatbot + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 networks: chatbot_network: diff --git a/requirements.txt b/requirements.txt index 5fd4119..473fcc8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,13 +47,16 @@ ipython==8.32.0 jedi==0.19.2 Jinja2==3.1.5 jiter==0.8.2 +kafka-python==2.2.11 markdown-it-py==3.0.0 MarkupSafe==3.0.2 mathparse==0.1.2 matplotlib-inline==0.1.7 mdurl==0.1.2 multidict==6.1.0 +numpy==2.2.4 openai==0.27.10 +pandas==2.2.3 parso==0.8.4 pexpect==4.9.0 prompt_toolkit==3.0.50 @@ -68,6 +71,7 @@ pycparser==2.22 pydantic==2.10.6 pydantic_core==2.27.2 Pygments==2.19.1 +pymongo==4.12.0 pyparsing==3.2.1 python-axolotl==0.2.2 python-axolotl-curve25519==0.4.1.post2 @@ -75,7 +79,6 @@ python-dateutil==2.9.0.post0 python-dotenv==1.0.1 python-multipart==0.0.20 pytz==2025.1 -pymongo==4.12.0 PyYAML==6.0.2 regex==2024.11.6 requests==2.32.3 @@ -89,11 +92,13 @@ SQLAlchemy==2.0.38 stack-data==0.6.3 starlette==0.45.3 tabulate==0.9.0 +tiktoken==0.3.3 tqdm==4.67.1 traitlets==5.14.3 transitions==0.9.2 typer==0.15.1 typing_extensions==4.12.2 +tzdata==2025.2 uritemplate==4.1.1 urllib3==2.3.0 uvicorn==0.34.0 @@ -102,4 +107,3 @@ watchfiles==1.0.4 wcwidth==0.2.13 websockets==14.2 yarl==1.18.3 -pandas==2.2.3 \ No newline at end of file