diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index 5658198..3e5912e 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -1,11 +1,10 @@ -name: Compare Branch Behavior +name: Compare API Responses via Kafka on: pull_request: - branches: [ main ] jobs: - compare-branches: + compare: runs-on: ubuntu-latest services: zookeeper: @@ -14,50 +13,40 @@ jobs: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest - env: - KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - ALLOW_PLAINTEXT_LISTENER: "yes" ports: - 9092:9092 - options: >- - --health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list" - --health-interval 10s - --health-timeout 5s - --health-retries 10 + env: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 steps: - name: Checkout PR branch uses: actions/checkout@v4 with: - path: feature + ref: ${{ github.head_ref }} + path: pr_branch - name: Checkout main branch uses: actions/checkout@v4 with: - ref: main - path: main - - - name: Build Docker containers - run: | - docker build -t app_main ./main - docker build -t app_feature ./feature - - - name: Run containers and inject messages - run: | - docker network create kafka_net - - docker run -d --name app_main --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_main app_main - docker run -d --name app_feature --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_feature app_feature - - # injeta a mesma mensagem nos dois - echo '{"valor": 10}' | kafka-console-producer.sh --broker-list localhost:9092 --topic entrada + ref: ${{ github.base_ref }} + path: main_branch - # espera processamento - sleep 10 - - - name: Compare Results + - name: Build and run both APIs run: | - pip install kafka-python - python compare/compare_outputs.py \ No newline at end of file + cd 
pr_branch + docker build -t api-pr . + docker run -d -p 8001:8000 --name api-pr api-pr + cd ../main_branch + docker build -t api-main . + docker run -d -p 8002:8000 --name api-main api-main + + - name: Install dependencies + run: pip install confluent-kafka httpx + + - name: Run Kafka test comparator + run: python pr_branch/compare/kafka_comparator.py + env: + KAFKA_BROKER: localhost:9092 + API_MAIN_URL: http://localhost:8002 # ajuste a porta se necessário + API_BRANCH_URL: http://localhost:8001 # ajuste a porta se necessário \ No newline at end of file diff --git a/app/main.py b/app/main.py index 6f66b2f..f4b2d2e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,53 +1,58 @@ -from app.services.message_service import MessageService -from app.config.settings import KAFKA_ADMIN_CLIENT, KAFKA_BROKER -from app.models.producer import ProduceMessage -from app.utils.producer import produce_kafka_message, KAFKA_TOPIC -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request, Depends, BackgroundTasks +from threading import Thread +from app.config.settings import KAFKA_BROKER +from consumer.consumer import KAFKA_TOPIC +from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from kafka.admin import KafkaAdminClient, NewTopic - -@asynccontextmanager -async def lifespan(app: FastAPI): - - admin_client = KafkaAdminClient( - bootstrap_servers=KAFKA_BROKER, - client_id=KAFKA_ADMIN_CLIENT - ) - if not KAFKA_TOPIC in admin_client.list_topics(): - admin_client.create_topics( - new_topics=[ - NewTopic( - name=KAFKA_TOPIC, - num_partitions=1, - replication_factor=1 - ) - ], - validate_only=False - ) - # admin_client.delete_topics(topics=[KAFKA_TOPIC]) - yield # separation point +from kafka import KafkaProducer +from confluent_kafka import Consumer +import json + app = FastAPI( title="ChatBot in Python!", - version="1.0.0", - lifespan=lifespan + version="1.0.0" +) + +producer = KafkaProducer( + bootstrap_servers=KAFKA_BROKER, + value_serializer=lambda 
v: json.dumps(v).encode('utf-8') ) +messages = [] + +def kafka_consumer(): + consumer = Consumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'mygroup', + 'auto.offset.reset': 'earliest' + }) + consumer.subscribe(['test-topic']) + + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + print(f"Consumer error: {msg.error()}") + continue + # Armazena a mensagem em memória + messages.append(msg.value().decode('utf-8')) + +# Inicia o consumer em background +Thread(target=kafka_consumer, daemon=True).start() @app.post("/receive_message", tags=['Webhook']) async def receive_message( - request: Request, - message_service: MessageService = Depends() + request: Request ): try: - response = await request.json() - await message_service.process_message(response) - return JSONResponse(content=response, status_code=200) + payload = await request.json() + producer.send(KAFKA_TOPIC, payload) + producer.flush() + return JSONResponse(content={"message": "The message send to broker!"}, status_code=200) except Exception as e: return JSONResponse({"error": str(e)}, status_code=400) -@app.post('/produce/message/', tags=["Produce Message"]) -async def produce_message(messageRequest : ProduceMessage, background_tasks : BackgroundTasks): - background_tasks.add_task(produce_kafka_message, messageRequest) - return {"message": "Message Received, tahnk you for sending a message!"} \ No newline at end of file +@app.get("/results") +async def results(): + return {"messages": messages} \ No newline at end of file diff --git a/app/utils/producer.py b/app/utils/producer.py deleted file mode 100644 index ef2f0f8..0000000 --- a/app/utils/producer.py +++ /dev/null @@ -1,27 +0,0 @@ -from app.config.settings import KAFKA_BROKER -from app.models.producer import ProduceMessage -from kafka import KafkaProducer -from fastapi import HTTPException -import json - -# Constant Values Section -KAFKA_TOPIC = 'fastapi-topic' -PRODUCER_CLIENT_ID = 'fastapi_producer' - 
-def serializer(message: json) -> str: - return json.dumps(message).encode() - -producer = KafkaProducer( - api_version=(0, 8, 0), - bootstrap_servers=KAFKA_BROKER, - value_serializer=serializer, - client_id=PRODUCER_CLIENT_ID -) - -def produce_kafka_message(messageRequest: ProduceMessage): - try: - producer.send(KAFKA_TOPIC, json.dumps({'message': messageRequest.message})) - producer.flush() # Ensures all messages are sent - except Exception as error: - print(error) - raise HTTPException(status_code=500, detail="Failed to send message") \ No newline at end of file diff --git a/compare/compare_outputs.py b/compare/compare_outputs.py deleted file mode 100644 index 86d03b7..0000000 --- a/compare/compare_outputs.py +++ /dev/null @@ -1,22 +0,0 @@ -from kafka import KafkaConsumer -import json - -def get_messages(topic): - consumer = KafkaConsumer( - topic, - bootstrap_servers='localhost:9092', - auto_offset_reset='earliest', - group_id=None - ) - return [json.loads(msg.value.decode()) for msg in consumer] - -m1 = get_messages("saida_main") -m2 = get_messages("saida_feature") - -if m1 != m2: - print("As branches produzem resultados diferentes:") - print("main:", m1) - print("feature:", m2) - exit(1) -else: - print("As branches são comportamentalmente idênticas!") \ No newline at end of file diff --git a/compare/kafka_comparator.py b/compare/kafka_comparator.py new file mode 100644 index 0000000..7bd762f --- /dev/null +++ b/compare/kafka_comparator.py @@ -0,0 +1,40 @@ +import os +import json +from confluent_kafka import Producer +import httpx +import time + +KAFKA_BROKER = os.environ['KAFKA_BROKER'] +API_MAIN_URL = os.environ['API_MAIN_URL'] +API_BRANCH_URL = os.environ['API_BRANCH_URL'] +TOPIC = 'test-topic' + +producer = Producer({'bootstrap.servers': KAFKA_BROKER}) + +def send_test_message(payload): + producer.produce(TOPIC, value=json.dumps(payload).encode('utf-8')) + producer.flush() + +test_payloads = [ + {"input": "teste1"}, + {"input": "teste2"}, + {"input": 
"teste3"}, +] + +for payload in test_payloads: + send_test_message(payload) + +print("Esperando 5 segundos para ler as mensagens ") +time.sleep(5) + +with httpx.Client() as client: + main_resp = client.get(f"{API_MAIN_URL}/results").json() + branch_resp = client.get(f"{API_BRANCH_URL}/results").json() + +if main_resp != branch_resp: + print("Diferença encontrada!") + print("Main:", main_resp) + print("Branch:", branch_resp) + exit(1) +else: + print("Respostas iguais.") \ No newline at end of file diff --git a/consumer/consumer.py b/consumer/consumer.py new file mode 100644 index 0000000..dded36c --- /dev/null +++ b/consumer/consumer.py @@ -0,0 +1,27 @@ +from app.config.settings import KAFKA_BROKER +from app.services.message_service import MessageService +from kafka import KafkaConsumer +import json + + +# Constant Values Section +KAFKA_TOPIC = 'message-chatbot' +KAFKA_GROUP_ID = 'group_chatbot' + +class ConsumerKafka: + + def __init__(self): + self.service = MessageService() + self.consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers='localhost:9092', + auto_offset_reset='earliest', + group_id=KAFKA_GROUP_ID + ) + + def process_messages(self): + if self.consumer: + for msg in self.consumer: + data = json.loads(msg.value.decode('utf-8')) + self.service.process_message(data) + print("Mensagem Processada!") diff --git a/docker-compose.yml b/docker-compose.yml index 0374c6c..a9c48b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,13 @@ services: - # chatbot: - # container_name: chatbot - # image: chatbot1.0 - # ports: - # - "${CHATBOT_PORT}:${CHATBOT_PORT}" - # networks: - # - chatbot_network + chatbot: + container_name: chatbot + build: + context: . 
+ dockerfile: Dockerfile + ports: + - "${CHATBOT_PORT}:${CHATBOT_PORT}" + networks: + - chatbot_network waha_api: container_name: waha_api @@ -14,7 +16,7 @@ services: - "${WAHA_PORT}:${WAHA_PORT}" networks: - chatbot_network - + zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper diff --git a/requirements.txt b/requirements.txt index 473fcc8..d619971 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ cffi==1.17.1 charset-normalizer==3.4.1 ChatterBot==1.2.0 click==8.1.8 +confluent-kafka==2.10.1 consonance==0.1.2 cryptography==44.0.0 decorator==5.1.1