From 6da014e8a12b6b692165c410e2a52f7598f03dc9 Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Tue, 10 Jun 2025 20:17:11 -0400 Subject: [PATCH 1/8] Testing other image in actions --- .github/workflows/compare-branchs.yml | 4 ++-- compare/compare_outputs.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index 5658198..fde1528 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -9,11 +9,11 @@ jobs: runs-on: ubuntu-latest services: zookeeper: - image: confluentinc/cp-zookeeper:latest + image: bitnami/zookeeper:latest ports: - 2181:2181 kafka: - image: confluentinc/cp-kafka:latest + image: bitnami/kafka:latest env: KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 diff --git a/compare/compare_outputs.py b/compare/compare_outputs.py index 86d03b7..0fc39f5 100644 --- a/compare/compare_outputs.py +++ b/compare/compare_outputs.py @@ -1,7 +1,7 @@ from kafka import KafkaConsumer import json -def get_messages(topic): +def get_messages(topic: str) -> list: consumer = KafkaConsumer( topic, bootstrap_servers='localhost:9092', From 9d61581634442a006188d7d55d49d1039ccfa90d Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Tue, 10 Jun 2025 20:20:23 -0400 Subject: [PATCH 2/8] edit --- .github/workflows/compare-branchs.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index fde1528..7c10b0c 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -10,6 +10,8 @@ jobs: services: zookeeper: image: bitnami/zookeeper:latest + env: + ALLOW_ANONYMOUS_LOGIN: "yes" ports: - 2181:2181 kafka: From 288e76b62a786950b531e00f2bd1c65c0c36415f Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Tue, 10 Jun 2025 20:21:27 -0400 Subject: [PATCH 3/8] edit --- .github/workflows/compare-branchs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index 7c10b0c..4ae2356 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -11,7 +11,7 @@ jobs: zookeeper: image: bitnami/zookeeper:latest env: - ALLOW_ANONYMOUS_LOGIN: "yes" + ALLOW_ANONYMOUS_LOGIN: yes ports: - 2181:2181 kafka: From 402f729feac63e09b25f4bee8ab09e5c07393080 Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Sat, 14 Jun 2025 10:52:18 -0400 Subject: [PATCH 4/8] Testing a GitHubActions --- .github/workflows/compare-branchs.yml | 69 +++++++++------------- app/main.py | 83 ++++++++++++++------------- app/utils/producer.py | 27 --------- compare/compare_outputs.py | 22 ------- compare/kafka_comparator.py | 40 +++++++++++++ consumer/consumer.py | 27 +++++++++ docker-compose.yml | 18 +++--- requirements.txt | 1 + 8 files changed, 150 insertions(+), 137 deletions(-) delete mode 100644 app/utils/producer.py delete mode 100644 compare/compare_outputs.py create mode 100644 compare/kafka_comparator.py create mode 100644 consumer/consumer.py diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index 4ae2356..9babe6b 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -1,65 +1,52 @@ -name: Compare Branch Behavior +name: Compare API Responses via Kafka on: pull_request: - branches: [ main ] jobs: - compare-branches: + compare: runs-on: ubuntu-latest services: zookeeper: - image: bitnami/zookeeper:latest - env: - ALLOW_ANONYMOUS_LOGIN: yes + image: wurstmeister/zookeeper:3.4.6 ports: - 2181:2181 kafka: - image: bitnami/kafka:latest - env: - KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - ALLOW_PLAINTEXT_LISTENER: "yes" + image: wurstmeister/kafka:2.12-2.2.1 ports: - 9092:9092 - options: >- - --health-cmd "kafka-topics.sh --bootstrap-server localhost:9092 --list" - --health-interval 10s - --health-timeout 5s - --health-retries 10 + env: + KAFKA_ADVERTISED_HOST_NAME: localhost + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 steps: - name: Checkout PR branch uses: actions/checkout@v4 with: - path: feature + ref: ${{ github.head_ref }} + path: pr_branch - name: Checkout main branch uses: actions/checkout@v4 with: - ref: main - path: main + ref: ${{ github.base_ref }} + path: main_branch - - name: Build Docker containers + - name: Build and run both APIs run: | - docker build -t app_main ./main - docker build -t app_feature ./feature - - - name: Run containers and inject messages - run: | - docker network create kafka_net - - docker run -d --name app_main --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_main app_main - docker run -d --name app_feature --network kafka_net -e INPUT_TOPIC=entrada -e OUTPUT_TOPIC=saida_feature app_feature - - # injeta a mesma mensagem nos dois - echo '{"valor": 10}' | kafka-console-producer.sh --broker-list localhost:9092 --topic entrada - - # espera processamento - sleep 10 - - - name: Compare Results - run: | - pip install kafka-python - python compare/compare_outputs.py \ No newline at end of file + cd pr_branch + docker build -t api-pr . + docker run -d -p 8001:8000 --name api-pr api-pr + cd ../main_branch + docker build -t api-main . + docker run -d -p 8002:8000 --name api-main api-main + + - name: Install dependencies + run: pip install confluent-kafka httpx + + - name: Run Kafka test comparator + run: python main_branch/compare/kafka_comparator.py + env: + KAFKA_BROKER: localhost:9092 + API_MAIN_URL: http://localhost:8001 # ajuste a porta se necessário + API_BRANCH_URL: http://localhost:8002 # ajuste a porta se \ No newline at end of file diff --git a/app/main.py b/app/main.py index 6f66b2f..f4b2d2e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,53 +1,58 @@ -from app.services.message_service import MessageService -from app.config.settings import KAFKA_ADMIN_CLIENT, KAFKA_BROKER -from app.models.producer import ProduceMessage -from app.utils.producer import produce_kafka_message, KAFKA_TOPIC -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request, Depends, BackgroundTasks +from threading import Thread +from app.config.settings import KAFKA_BROKER +from consumer.consumer import KAFKA_TOPIC +from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from kafka.admin import KafkaAdminClient, NewTopic - -@asynccontextmanager -async def lifespan(app: FastAPI): - - admin_client = KafkaAdminClient( - bootstrap_servers=KAFKA_BROKER, - client_id=KAFKA_ADMIN_CLIENT - ) - if not KAFKA_TOPIC in admin_client.list_topics(): - admin_client.create_topics( - new_topics=[ - NewTopic( - name=KAFKA_TOPIC, - num_partitions=1, - replication_factor=1 - ) - ], - validate_only=False - ) - # admin_client.delete_topics(topics=[KAFKA_TOPIC]) - yield # separation point +from kafka import KafkaProducer +from confluent_kafka import Consumer +import json + app = FastAPI( title="ChatBot in Python!", - version="1.0.0", - lifespan=lifespan + version="1.0.0" +) + +producer = KafkaProducer( + bootstrap_servers=KAFKA_BROKER, + value_serializer=lambda v: json.dumps(v).encode('utf-8') ) +messages = [] + +def kafka_consumer(): + consumer = Consumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'mygroup', + 'auto.offset.reset': 'earliest' + }) + consumer.subscribe(['test-topic']) + + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + print(f"Consumer error: {msg.error()}") + continue + # Armazena a mensagem em memória + messages.append(msg.value().decode('utf-8')) + +# Inicia o consumer em background +Thread(target=kafka_consumer, daemon=True).start() @app.post("/receive_message", tags=['Webhook']) async def receive_message( - request: Request, - message_service: MessageService = Depends() + request: Request ): try: - response = await request.json() - await message_service.process_message(response) - return JSONResponse(content=response, status_code=200) + payload = await request.json() + producer.send(KAFKA_TOPIC, payload) + producer.flush() + return JSONResponse(content={"message": "The message send to broker!"}, status_code=200) except Exception as e: return JSONResponse({"error": str(e)}, status_code=400) -@app.post('/produce/message/', tags=["Produce Message"]) -async def produce_message(messageRequest : ProduceMessage, background_tasks : BackgroundTasks): - background_tasks.add_task(produce_kafka_message, messageRequest) - return {"message": "Message Received, tahnk you for sending a message!"} \ No newline at end of file +@app.get("/results") +async def results(): + return {"messages": messages} \ No newline at end of file diff --git a/app/utils/producer.py b/app/utils/producer.py deleted file mode 100644 index ef2f0f8..0000000 --- a/app/utils/producer.py +++ /dev/null @@ -1,27 +0,0 @@ -from app.config.settings import KAFKA_BROKER -from app.models.producer import ProduceMessage -from kafka import KafkaProducer -from fastapi import HTTPException -import json - -# Constant Values Section -KAFKA_TOPIC = 'fastapi-topic' -PRODUCER_CLIENT_ID = 'fastapi_producer' - -def serializer(message: json) -> str: - return json.dumps(message).encode() - -producer = KafkaProducer( - api_version=(0, 8, 0), - bootstrap_servers=KAFKA_BROKER, - value_serializer=serializer, - client_id=PRODUCER_CLIENT_ID -) - -def produce_kafka_message(messageRequest: ProduceMessage): - try: - producer.send(KAFKA_TOPIC, json.dumps({'message': messageRequest.message})) - producer.flush() # Ensures all messages are sent - except Exception as error: - print(error) - raise HTTPException(status_code=500, detail="Failed to send message") \ No newline at end of file diff --git a/compare/compare_outputs.py b/compare/compare_outputs.py deleted file mode 100644 index 0fc39f5..0000000 --- a/compare/compare_outputs.py +++ /dev/null @@ -1,22 +0,0 @@ -from kafka import KafkaConsumer -import json - -def get_messages(topic: str) -> list: - consumer = KafkaConsumer( - topic, - bootstrap_servers='localhost:9092', - auto_offset_reset='earliest', - group_id=None - ) - return [json.loads(msg.value.decode()) for msg in consumer] - -m1 = get_messages("saida_main") -m2 = get_messages("saida_feature") - -if m1 != m2: - print("As branches produzem resultados diferentes:") - print("main:", m1) - print("feature:", m2) - exit(1) -else: - print("As branches são comportamentalmente idênticas!") \ No newline at end of file diff --git a/compare/kafka_comparator.py b/compare/kafka_comparator.py new file mode 100644 index 0000000..92979e1 --- /dev/null +++ b/compare/kafka_comparator.py @@ -0,0 +1,40 @@ +import os +import json +from kafka import KafkaProducer +import httpx +import time + +KAFKA_BROKER = os.environ['KAFKA_BROKER'] +API_MAIN_URL = os.environ['API_MAIN_URL'] +API_BRANCH_URL = os.environ['API_BRANCH_URL'] +TOPIC = 'test-topic' + +producer = KafkaProducer({'bootstrap.servers': KAFKA_BROKER}) + +def send_test_message(payload): + producer.send(TOPIC, value=json.dumps(payload).encode('utf-8')) + producer.flush() + +test_payloads = [ + {"input": "teste1"}, + {"input": "teste2"}, + {"input": "teste3"}, +] + +for payload in test_payloads: + send_test_message(payload) + +print("Esperando 5 segundos para ler as mensagens ") +time.sleep(5) + +with httpx.Client() as client: + main_resp = client.get(f"{API_MAIN_URL}/results").json() + branch_resp = client.get(f"{API_BRANCH_URL}/results").json() + +if main_resp != branch_resp: + print("Diferença encontrada!") + print("Main:", main_resp) + print("Branch:", branch_resp) + exit(1) +else: + print("Respostas iguais.") \ No newline at end of file diff --git a/consumer/consumer.py b/consumer/consumer.py new file mode 100644 index 0000000..dded36c --- /dev/null +++ b/consumer/consumer.py @@ -0,0 +1,27 @@ +from app.config.settings import KAFKA_BROKER +from app.services.message_service import MessageService +from kafka import KafkaConsumer +import json + + +# Constant Values Section +KAFKA_TOPIC = 'message-chatbot' +KAFKA_GROUP_ID = 'group_chatbot' + +class ConsumerKafka: + + def __init__(self): + self.service = MessageService() + self.consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers='localhost:9092', + auto_offset_reset='earliest', + group_id=KAFKA_GROUP_ID + ) + + def process_messages(self): + if self.consumer: + for msg in self.consumer: + data = json.loads(msg.value.decode('utf-8')) + self.service.process_message(data) + print("Mensagem Processada!") diff --git a/docker-compose.yml b/docker-compose.yml index 0374c6c..a9c48b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,13 @@ services: - # chatbot: - # container_name: chatbot - # image: chatbot1.0 - # ports: - # - "${CHATBOT_PORT}:${CHATBOT_PORT}" - # networks: - # - chatbot_network + chatbot: + container_name: chatbot + build: + context: . + dockerfile: Dockerfile + ports: + - "${CHATBOT_PORT}:${CHATBOT_PORT}" + networks: + - chatbot_network waha_api: container_name: waha_api @@ -14,7 +16,7 @@ services: - "${WAHA_PORT}:${WAHA_PORT}" networks: - chatbot_network - + zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper diff --git a/requirements.txt b/requirements.txt index 473fcc8..d619971 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ cffi==1.17.1 charset-normalizer==3.4.1 ChatterBot==1.2.0 click==8.1.8 +confluent-kafka==2.10.1 consonance==0.1.2 cryptography==44.0.0 decorator==5.1.1 From 261bf499cbe95b54344462ffa9a3dbbf25d90ab8 Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Sat, 14 Jun 2025 10:55:03 -0400 Subject: [PATCH 5/8] Switch images --- .github/workflows/compare-branchs.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index 9babe6b..f223eaa 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -8,11 +8,11 @@ jobs: runs-on: ubuntu-latest services: zookeeper: - image: wurstmeister/zookeeper:3.4.6 + image: confluentinc/cp-zookeeper:latest ports: - 2181:2181 kafka: - image: wurstmeister/kafka:2.12-2.2.1 + image: confluentinc/cp-kafka:latest ports: - 9092:9092 env: From 72df152d7121e33ed551b8650e6fc077a6d4e551 Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Sat, 14 Jun 2025 10:59:25 -0400 Subject: [PATCH 6/8] Switch path because not stay in the main --- .github/workflows/compare-branchs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index f223eaa..d359197 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -45,7 +45,7 @@ jobs: run: pip install confluent-kafka httpx - name: Run Kafka test comparator - run: python main_branch/compare/kafka_comparator.py + run: python pr_branch/compare/kafka_comparator.py env: KAFKA_BROKER: localhost:9092 API_MAIN_URL: http://localhost:8001 # ajuste a porta se necessário From 65f8377f5c60121aff2eebb0ecd9e30bc0aa1eea Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Sat, 14 Jun 2025 11:04:39 -0400 Subject: [PATCH 7/8] Switch path because not stay in the main --- .github/workflows/compare-branchs.yml | 2 +- compare/kafka_comparator.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/compare-branchs.yml b/.github/workflows/compare-branchs.yml index d359197..3e5912e 100644 --- a/.github/workflows/compare-branchs.yml +++ b/.github/workflows/compare-branchs.yml @@ -42,7 +42,7 @@ jobs: docker run -d -p 8002:8000 --name api-main api-main - name: Install dependencies - run: pip install confluent-kafka httpx + run: pip install confluent-kafka httpx - name: Run Kafka test comparator run: python pr_branch/compare/kafka_comparator.py diff --git a/compare/kafka_comparator.py b/compare/kafka_comparator.py index 92979e1..7749d9c 100644 --- a/compare/kafka_comparator.py +++ b/compare/kafka_comparator.py @@ -1,6 +1,6 @@ import os import json -from kafka import KafkaProducer +from confluent_kafka import Producer import httpx import time @@ -9,7 +9,7 @@ API_BRANCH_URL = os.environ['API_BRANCH_URL'] TOPIC = 'test-topic' -producer = KafkaProducer({'bootstrap.servers': KAFKA_BROKER}) +producer = Producer({'bootstrap.servers': KAFKA_BROKER}) def send_test_message(payload): producer.send(TOPIC, value=json.dumps(payload).encode('utf-8')) From 4701a18b344dc3dabe4ecc79beefd5abdcfebc4d Mon Sep 17 00:00:00 2001 From: eulucaslim Date: Sat, 14 Jun 2025 11:10:14 -0400 Subject: [PATCH 8/8] Edit Produce --- compare/kafka_comparator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compare/kafka_comparator.py b/compare/kafka_comparator.py index 7749d9c..7bd762f 100644 --- a/compare/kafka_comparator.py +++ b/compare/kafka_comparator.py @@ -12,7 +12,7 @@ producer = Producer({'bootstrap.servers': KAFKA_BROKER}) def send_test_message(payload): - producer.send(TOPIC, value=json.dumps(payload).encode('utf-8')) + producer.produce(TOPIC, value=json.dumps(payload).encode('utf-8')) producer.flush() test_payloads = [