Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .github/workflows/compare-branchs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: Compare Branch Behavior

on:
pull_request:
branches: [ main ]

jobs:
compare-branches:
runs-on: ubuntu-latest
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
env:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
ALLOW_PLAINTEXT_LISTENER: "yes"
ports:
- 9092:9092
options: >-
--health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list"
--health-interval 10s
--health-timeout 5s
--health-retries 10

steps:
- name: Checkout PR branch
uses: actions/checkout@v4
with:
path: feature

- name: Checkout main branch
uses: actions/checkout@v4
with:
ref: main
path: main

- name: Build Docker containers
run: |
docker build -t app_main ./main
docker build -t app_feature ./feature

- name: Run containers and inject messages
run: |
docker network create kafka_net

docker run -d --name app_main --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_main app_main
docker run -d --name app_feature --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_feature app_feature

# injeta a mesma mensagem nos dois
echo '{"valor": 10}' | kafka-console-producer.sh --broker-list localhost:9092 --topic entrada

# espera processamento
sleep 10

- name: Compare Results
run: |
pip install kafka-python
python compare/compare_outputs.py
4 changes: 3 additions & 1 deletion app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@
WAHA_ADMIN: Final[str] = os.getenv("WAHA_ADMIN")
ENV : Final[str] = os.getenv("ENV")
DATABASE_HOST : Final[str] = os.getenv("DATABASE_HOST")
DATABASE_PORT : Final[str] = os.getenv("DATABASE_PORT")
DATABASE_PORT : Final[str] = os.getenv("DATABASE_PORT")
KAFKA_BROKER : Final[str] = os.getenv('KAFKA_BROKER')
KAFKA_ADMIN_CLIENT : Final[str] = os.getenv('KAFKA_ADMIN_CLIENT')
4 changes: 2 additions & 2 deletions app/lib/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ def save_messages(self, data: dict) -> None:
if "_id" in data.keys():
del data['_id']
except Exception as e:
print(e)
return {'error': e}

def save_answer(self, data: str) -> None:
try:
self.db.answer.insert_one({"answer": data})
except Exception as e:
print(e)
return {'error': e}
33 changes: 18 additions & 15 deletions app/lib/waha_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@ class WhatsError(Exception):
pass

def send_message(chat_id: int, text: str) -> dict | None:
url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/sendText"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"session": WAHA_ADMIN,
"chatId": f"55{chat_id}@c.us",
"text": text
}
response = requests.post(url, headers=headers, json=data)
try:
url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/sendText"
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
"session": WAHA_ADMIN,
"chatId": f"55{chat_id}@c.us",
"text": text
}
response = requests.post(url, headers=headers, json=data)

if response.status_code <= 204:
return response.json()
else:
raise WahaAPI.WhatsError("Have any error in your datas, please check!")
if response.status_code <= 204:
return response.json()
else:
raise WahaAPI.WhatsError("Have any error in your datas, please check!")
except Exception as e:
return {"message": e}

def reply(chat_id: int, message_id: int, text: str):
url = f"http://{WAHA_HOST}:{WAHA_PORT}/api/reply/"
Expand Down
66 changes: 46 additions & 20 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,53 @@
from app.lib.gemini import Gemini
from app.lib.waha_api import WahaAPI
from app.lib.database import DataBase
from app.lib.sheets import Sheets
from fastapi import FastAPI, Request
from app.services.message_service import MessageService
from app.config.settings import KAFKA_ADMIN_CLIENT, KAFKA_BROKER
from app.models.producer import ProduceMessage
from app.utils.producer import produce_kafka_message, KAFKA_TOPIC
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Depends, BackgroundTasks
from fastapi.responses import JSONResponse
from kafka.admin import KafkaAdminClient, NewTopic

sheets = Sheets()
app = FastAPI()
gemini = Gemini()
db = DataBase()
@asynccontextmanager
async def lifespan(app: FastAPI):

@app.post("/receive_message")
async def receive_message(request: Request):
admin_client = KafkaAdminClient(
bootstrap_servers=KAFKA_BROKER,
client_id=KAFKA_ADMIN_CLIENT
)
if not KAFKA_TOPIC in admin_client.list_topics():
admin_client.create_topics(
new_topics=[
NewTopic(
name=KAFKA_TOPIC,
num_partitions=1,
replication_factor=1
)
],
validate_only=False
)
# admin_client.delete_topics(topics=[KAFKA_TOPIC])
yield # separation point

app = FastAPI(
title="ChatBot in Python!",
version="1.0.0",
lifespan=lifespan
)


@app.post("/receive_message", tags=['Webhook'])
async def receive_message(
request: Request,
message_service: MessageService = Depends()
):
try:
response = await request.json()
ia_response = sheets.verify_response(response["payload"]["body"])
db.save_answer(ia_response)
db.save_messages(response)
WahaAPI.reply(
response["payload"]["from"],
response["payload"]["id"],
ia_response
)
await message_service.process_message(response)
return JSONResponse(content=response, status_code=200)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=400)
return JSONResponse({"error": str(e)}, status_code=400)

@app.post('/produce/message/', tags=["Produce Message"])
async def produce_message(messageRequest : ProduceMessage, background_tasks : BackgroundTasks):
background_tasks.add_task(produce_kafka_message, messageRequest)
return {"message": "Message Received, tahnk you for sending a message!"}
4 changes: 4 additions & 0 deletions app/models/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pydantic import BaseModel, Field

class ProduceMessage(BaseModel):
message : str = Field(min_length=1, max_length=250)
21 changes: 21 additions & 0 deletions app/services/message_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from app.lib.waha_api import WahaAPI
from app.lib.database import DataBase
from app.lib.sheets import Sheets

class MessageService:
def __init__(self):
self.sheets = Sheets()
self.db = DataBase()
self.waha_api = WahaAPI()

async def process_message(self, response: dict) -> None:
ia_response = self.sheets.verify_response(response["payload"]["body"])

self.db.save_answer(ia_response)
self.db.save_messages(response)

self.waha_api.reply(
response["payload"]["from"],
response["payload"]["id"],
ia_response
)
27 changes: 27 additions & 0 deletions app/utils/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from app.config.settings import KAFKA_BROKER
from app.models.producer import ProduceMessage
from kafka import KafkaProducer
from fastapi import HTTPException
import json

# Constant Values Section
KAFKA_TOPIC = 'fastapi-topic'
PRODUCER_CLIENT_ID = 'fastapi_producer'

def serializer(message: json) -> str:
return json.dumps(message).encode()

producer = KafkaProducer(
api_version=(0, 8, 0),
bootstrap_servers=KAFKA_BROKER,
value_serializer=serializer,
client_id=PRODUCER_CLIENT_ID
)

def produce_kafka_message(messageRequest: ProduceMessage):
try:
producer.send(KAFKA_TOPIC, json.dumps({'message': messageRequest.message}))
producer.flush() # Ensures all messages are sent
except Exception as error:
print(error)
raise HTTPException(status_code=500, detail="Failed to send message")
22 changes: 22 additions & 0 deletions compare/compare_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from kafka import KafkaConsumer
import json

def get_messages(topic):
consumer = KafkaConsumer(
topic,
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
group_id=None
)
return [json.loads(msg.value.decode()) for msg in consumer]

m1 = get_messages("saida_main")
m2 = get_messages("saida_feature")

if m1 != m2:
print("As branches produzem resultados diferentes:")
print("main:", m1)
print("feature:", m2)
exit(1)
else:
print("As branches são comportamentalmente idênticas!")
46 changes: 31 additions & 15 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
services:
chatbot:
container_name: chatbot
image: chatbot1.0
ports:
- "${CHATBOT_PORT}:${CHATBOT_PORT}"
networks:
- chatbot_network
# chatbot:
# container_name: chatbot
# image: chatbot1.0
# ports:
# - "${CHATBOT_PORT}:${CHATBOT_PORT}"
# networks:
# - chatbot_network

waha_api:
container_name: waha_api
Expand All @@ -14,18 +14,34 @@ services:
- "${WAHA_PORT}:${WAHA_PORT}"
networks:
- chatbot_network
depends_on:
- chatbot

kafka:
container_name: 'kafka-chatbot'
image: apache/kafka:3.9.1

zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- chatbot_network
ports:
- "${KAFKA_PORT}:${KAFKA_PORT}"
- "2181:2181"

broker:
container_name: broker
image: confluentinc/cp-kafka:latest
networks:
- chatbot_network
ports:
- "${KAFKA_PORT}:${KAFKA_PORT}"
depends_on:
- chatbot
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

networks:
chatbot_network:
Expand Down
Loading
Loading