Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions .github/workflows/compare-branchs.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
name: Compare Branch Behavior
name: Compare API Responses via Kafka

on:
pull_request:
branches: [ main ]

jobs:
compare-branches:
compare:
runs-on: ubuntu-latest
services:
zookeeper:
Expand All @@ -14,50 +13,40 @@ jobs:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
env:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
ALLOW_PLAINTEXT_LISTENER: "yes"
ports:
- 9092:9092
options: >-
--health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list"
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

steps:
- name: Checkout PR branch
uses: actions/checkout@v4
with:
path: feature
ref: ${{ github.head_ref }}
path: pr_branch

- name: Checkout main branch
uses: actions/checkout@v4
with:
ref: main
path: main

- name: Build Docker containers
run: |
docker build -t app_main ./main
docker build -t app_feature ./feature

- name: Run containers and inject messages
run: |
docker network create kafka_net

docker run -d --name app_main --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_main app_main
docker run -d --name app_feature --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_feature app_feature

# injeta a mesma mensagem nos dois
echo '{"valor": 10}' | kafka-console-producer.sh --broker-list localhost:9092 --topic entrada
ref: ${{ github.base_ref }}
path: main_branch

# espera processamento
sleep 10

- name: Compare Results
- name: Build and run both APIs
run: |
pip install kafka-python
python compare/compare_outputs.py
cd pr_branch
docker build -t api-pr .
docker run -d -p 8001:8000 --name api-pr api-pr
cd ../main_branch
docker build -t api-main .
docker run -d -p 8002:8000 --name api-main api-main

- name: Install dependencies
run: pip install confluent-kafka httpx

- name: Run Kafka test comparator
run: python pr_branch/compare/kafka_comparator.py
env:
KAFKA_BROKER: localhost:9092
API_MAIN_URL: http://localhost:8001 # ajuste a porta se necessário
API_BRANCH_URL: http://localhost:8002 # ajuste a porta se
83 changes: 44 additions & 39 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,58 @@
from app.services.message_service import MessageService
from app.config.settings import KAFKA_ADMIN_CLIENT, KAFKA_BROKER
from app.models.producer import ProduceMessage
from app.utils.producer import produce_kafka_message, KAFKA_TOPIC
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Depends, BackgroundTasks
from threading import Thread
from app.config.settings import KAFKA_BROKER
from consumer.consumer import KAFKA_TOPIC
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from kafka.admin import KafkaAdminClient, NewTopic

@asynccontextmanager
async def lifespan(app: FastAPI):

admin_client = KafkaAdminClient(
bootstrap_servers=KAFKA_BROKER,
client_id=KAFKA_ADMIN_CLIENT
)
if not KAFKA_TOPIC in admin_client.list_topics():
admin_client.create_topics(
new_topics=[
NewTopic(
name=KAFKA_TOPIC,
num_partitions=1,
replication_factor=1
)
],
validate_only=False
)
# admin_client.delete_topics(topics=[KAFKA_TOPIC])
yield # separation point
from kafka import KafkaProducer
from confluent_kafka import Consumer
import json


app = FastAPI(
title="ChatBot in Python!",
version="1.0.0",
lifespan=lifespan
version="1.0.0"
)

producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

messages = []

def kafka_consumer():
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['test-topic'])

while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
# Armazena a mensagem em memória
messages.append(msg.value().decode('utf-8'))

# Inicia o consumer em background
Thread(target=kafka_consumer, daemon=True).start()

@app.post("/receive_message", tags=['Webhook'])
async def receive_message(
request: Request,
message_service: MessageService = Depends()
request: Request
):
try:
response = await request.json()
await message_service.process_message(response)
return JSONResponse(content=response, status_code=200)
payload = await request.json()
producer.send(KAFKA_TOPIC, payload)
producer.flush()
return JSONResponse(content={"message": "The message send to broker!"}, status_code=200)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=400)

@app.post('/produce/message/', tags=["Produce Message"])
async def produce_message(messageRequest : ProduceMessage, background_tasks : BackgroundTasks):
background_tasks.add_task(produce_kafka_message, messageRequest)
return {"message": "Message Received, tahnk you for sending a message!"}
@app.get("/results")
async def results():
return {"messages": messages}
27 changes: 0 additions & 27 deletions app/utils/producer.py

This file was deleted.

22 changes: 0 additions & 22 deletions compare/compare_outputs.py

This file was deleted.

40 changes: 40 additions & 0 deletions compare/kafka_comparator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import json
from confluent_kafka import Producer
import httpx
import time

KAFKA_BROKER = os.environ['KAFKA_BROKER']
API_MAIN_URL = os.environ['API_MAIN_URL']
API_BRANCH_URL = os.environ['API_BRANCH_URL']
TOPIC = 'test-topic'

producer = Producer({'bootstrap.servers': KAFKA_BROKER})

def send_test_message(payload):
producer.produce(TOPIC, value=json.dumps(payload).encode('utf-8'))
producer.flush()

test_payloads = [
{"input": "teste1"},
{"input": "teste2"},
{"input": "teste3"},
]

for payload in test_payloads:
send_test_message(payload)

print("Esperando 5 segundos para ler as mensagens ")
time.sleep(5)

with httpx.Client() as client:
main_resp = client.get(f"{API_MAIN_URL}/results").json()
branch_resp = client.get(f"{API_BRANCH_URL}/results").json()

if main_resp != branch_resp:
print("Diferença encontrada!")
print("Main:", main_resp)
print("Branch:", branch_resp)
exit(1)
else:
print("Respostas iguais.")
27 changes: 27 additions & 0 deletions consumer/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from app.config.settings import KAFKA_BROKER
from app.services.message_service import MessageService
from kafka import KafkaConsumer
import json


# Constant Values Section
KAFKA_TOPIC = 'message-chatbot'
KAFKA_GROUP_ID = 'group_chatbot'

class ConsumerKafka:

def __init__(self):
self.service = MessageService()
self.consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
group_id=KAFKA_GROUP_ID
)

def process_messages(self):
if self.consumer:
for msg in self.consumer:
data = json.loads(msg.value.decode('utf-8'))
self.service.process_message(data)
print("Mensagem Processada!")
18 changes: 10 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
services:
# chatbot:
# container_name: chatbot
# image: chatbot1.0
# ports:
# - "${CHATBOT_PORT}:${CHATBOT_PORT}"
# networks:
# - chatbot_network
chatbot:
container_name: chatbot
build:
context: .
dockerfile: Dockerfile
ports:
- "${CHATBOT_PORT}:${CHATBOT_PORT}"
networks:
- chatbot_network

waha_api:
container_name: waha_api
Expand All @@ -14,7 +16,7 @@ services:
- "${WAHA_PORT}:${WAHA_PORT}"
networks:
- chatbot_network

zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cffi==1.17.1
charset-normalizer==3.4.1
ChatterBot==1.2.0
click==8.1.8
confluent-kafka==2.10.1
consonance==0.1.2
cryptography==44.0.0
decorator==5.1.1
Expand Down
Loading