From 66540ad03a96c07f198c508e76c55c6506ba20f1 Mon Sep 17 00:00:00 2001 From: Bkhalaf Date: Tue, 3 Feb 2026 15:41:49 +0100 Subject: [PATCH 1/2] Pushed first project version --- Dockerfile | 12 +++ README.md | 3 + consumer.py | 165 +++++++++++++++++++++++++++++++++++++++++ docker_compose.yaml | 126 +++++++++++++++++++++++++++++++ flink-job/Dockerfile | 16 ++++ flink-job/flink_job.py | 88 ++++++++++++++++++++++ init.sql | 8 ++ producer.py | 51 +++++++++++++ requirements.txt | 6 ++ 9 files changed, 475 insertions(+) create mode 100644 Dockerfile create mode 100644 consumer.py create mode 100644 docker_compose.yaml create mode 100644 flink-job/Dockerfile create mode 100644 flink-job/flink_job.py create mode 100644 init.sql create mode 100644 producer.py create mode 100644 requirements.txt diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3545ea4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY producer.py . + +EXPOSE 5000 + +CMD ["uvicorn", "producer:app", "--host", "0.0.0.0", "--port", "5000"] diff --git a/README.md b/README.md index e5afc95..1f4a87f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # event-intelligence Cloud-native real-time streaming core built on Apache Kafka and Apache Flink. Designed for scalable event processing with native support for AI models and agents. +------------------------ + +## How to run the project
\ No newline at end of file diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..dbb5a6c --- /dev/null +++ b/consumer.py @@ -0,0 +1,165 @@ +from kafka import KafkaConsumer +from langfuse import Langfuse +import requests +import psycopg2 +import json +import sys +import os +import time + +# Get topic name from command line argument, default to 'output-topic' +TOPIC_NAME = sys.argv[1] if len(sys.argv) > 1 else 'output-topic' + +# PostgreSQL connection +DB_CONFIG = { + 'host': 'localhost', + 'port': 5433, + 'database': 'kafka_events', + 'user': 'admin', + 'password': 'admin123' +} + +# Ollama configuration +OLLAMA_URL = "http://localhost:11434/api/generate" +OLLAMA_MODEL = "gemma3:1b" + +# Langfuse configuration +langfuse = Langfuse( + public_key=os.getenv('LANGFUSE_PUBLIC_KEY'), + secret_key=os.getenv('LANGFUSE_SECRET_KEY'), + host="http://localhost:3000" +) + +def get_db_connection(): + return psycopg2.connect(**DB_CONFIG) + +def analyze_with_llm(message_text, trace_name): + """Send message to Ollama for analysis with Langfuse tracing""" + start_time = time.time() + + prompt = f"""You are a message analyzer. Analyze the given message and provide: +1) Category (e.g., greeting, question, command, information) +2) Sentiment (positive/negative/neutral) +3) Brief summary + +Keep response concise in JSON format. 
+ +Analyze this message: {message_text}""" + + # Create Langfuse trace (v2 API) + trace = langfuse.trace( + name=trace_name, + metadata={"source": "kafka-consumer", "topic": TOPIC_NAME} + ) + + try: + response = requests.post( + OLLAMA_URL, + json={ + "model": OLLAMA_MODEL, + "prompt": prompt, + "stream": False + }, + timeout=60 + ) + response.raise_for_status() + result = response.json().get("response", "No response") + + latency_ms = int((time.time() - start_time) * 1000) + + # Log generation to Langfuse (v2 API - create from trace) + generation = trace.generation( + name="ollama-analysis", + model=OLLAMA_MODEL, + input=prompt, + output=result, + metadata={"latency_ms": latency_ms} + ) + generation.end() + + return result + + except Exception as e: + error_msg = f"LLM Error: {str(e)}" + generation = trace.generation( + name="ollama-analysis", + model=OLLAMA_MODEL, + input=prompt, + output=error_msg, + metadata={"error": str(e)} + ) + generation.end() + return error_msg + +def insert_message(conn, message_data, ai_analysis): + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO kafka_messages (original_message, transformed_message, processed_at, ai_analysis) + VALUES (%s, %s, %s, %s) + """, + ( + message_data.get('original_message'), + message_data.get('transformed_message'), + message_data.get('processed_at'), + ai_analysis + ) + ) + conn.commit() + cursor.close() + +# Create Kafka consumer +consumer = KafkaConsumer( + TOPIC_NAME, + bootstrap_servers=['localhost:9092'], + auto_offset_reset='earliest', + enable_auto_commit=True, + group_id=f'consumer-{TOPIC_NAME}', + value_deserializer=lambda x: json.loads(x.decode('utf-8')) +) + +print(f"Starting consumer, listening to topic: {TOPIC_NAME}") +print(f"Using Ollama ({OLLAMA_MODEL}) for AI analysis") +print("Langfuse tracing: http://localhost:3000") +print("Waiting for messages... 
(Press Ctrl+C to stop)") + +try: + conn = get_db_connection() + print("Connected to PostgreSQL") + + for message in consumer: + print(f"Received: {message.value}") + print(f" Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}") + + # Generate trace name + trace_name = f"kafka-{message.topic}-{message.partition}-{message.offset}" + + # Get AI analysis + original_msg = message.value.get('original_message', '') + print(f" -> Analyzing with Ollama (trace: {trace_name})...") + ai_analysis = analyze_with_llm(original_msg, trace_name) + print(f" -> AI Analysis: {ai_analysis}") + + # Write to PostgreSQL + try: + insert_message(conn, message.value, ai_analysis) + print(" -> Saved to PostgreSQL") + except Exception as db_error: + print(f" -> DB Error: {db_error}") + conn = get_db_connection() + + # Flush Langfuse + langfuse.flush() + + print("-" * 50) + +except KeyboardInterrupt: + print("\nConsumer stopped by user") +except Exception as e: + print(f"Error: {e}") +finally: + consumer.close() + langfuse.shutdown() + if 'conn' in locals(): + conn.close() + print("Consumer and database connection closed") diff --git a/docker_compose.yaml b/docker_compose.yaml new file mode 100644 index 0000000..159281e --- /dev/null +++ b/docker_compose.yaml @@ -0,0 +1,126 @@ +version: '3.8' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:7.5.0 + container_name: kafka + depends_on: + - zookeeper + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + 
producer-api: + build: + context: . + dockerfile: Dockerfile + container_name: producer-api + depends_on: + - kafka + ports: + - "5000:5000" + restart: on-failure + + jobmanager: + image: flink:1.18-java11 + container_name: jobmanager + ports: + - "8081:8081" + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + volumes: + - ./flink-job:/opt/flink/jobs + + taskmanager: + image: flink:1.18-java11 + container_name: taskmanager + depends_on: + - jobmanager + command: taskmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: 2 + volumes: + - ./flink-job:/opt/flink/jobs + + flink-job: + build: + context: ./flink-job + dockerfile: Dockerfile + container_name: flink-job + depends_on: + - kafka + - jobmanager + - taskmanager + restart: on-failure + + kafka-ui: + image: provectuslabs/kafka-ui:latest + container_name: kafka-ui + depends_on: + - kafka + ports: + - "8080:8080" + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 + + postgres: + image: postgres:15 + container_name: postgres + ports: + - "5433:5432" + environment: + POSTGRES_USER: admin + POSTGRES_PASSWORD: admin123 + POSTGRES_DB: kafka_events + volumes: + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + + langfuse-db: + image: postgres:15 + container_name: langfuse-db + environment: + POSTGRES_USER: langfuse + POSTGRES_PASSWORD: langfuse123 + POSTGRES_DB: langfuse + volumes: + - langfuse_postgres:/var/lib/postgresql/data + + langfuse: + image: langfuse/langfuse:2 + container_name: langfuse + depends_on: + - langfuse-db + ports: + - "3000:3000" + environment: + DATABASE_URL: postgresql://langfuse:langfuse123@langfuse-db:5432/langfuse + NEXTAUTH_SECRET: mysecretkey123456789012345678901234 + NEXTAUTH_URL: http://localhost:3000 + SALT: mysaltvalue1234567890123456789012 + +volumes: + langfuse_postgres: diff --git 
a/flink-job/Dockerfile b/flink-job/Dockerfile new file mode 100644 index 0000000..44d741e --- /dev/null +++ b/flink-job/Dockerfile @@ -0,0 +1,16 @@ +FROM flink:1.18-java11 + +# Install Python and PyFlink +RUN apt-get update && \ + apt-get install -y python3 python3-pip && \ + pip3 install apache-flink==1.18.1 && \ + apt-get clean + +# Download Kafka connector +RUN wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar + +WORKDIR /opt/flink/jobs + +COPY flink_job.py . + +CMD ["python3", "flink_job.py"] diff --git a/flink-job/flink_job.py b/flink-job/flink_job.py new file mode 100644 index 0000000..a46974b --- /dev/null +++ b/flink-job/flink_job.py @@ -0,0 +1,88 @@ +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment, EnvironmentSettings +import time +import sys + +def create_kafka_source_ddl(): + return """ + CREATE TABLE kafka_source ( + `data` ROW<`message` STRING>, + `timestamp` DOUBLE + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'test-topic', + 'properties.bootstrap.servers' = 'kafka:29092', + 'properties.group.id' = 'flink-consumer-group', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'json', + 'json.ignore-parse-errors' = 'true' + ) + """ + +def create_kafka_sink_ddl(): + return """ + CREATE TABLE kafka_sink ( + `original_message` STRING, + `transformed_message` STRING, + `processed_at` TIMESTAMP(3) + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'output-topic', + 'properties.bootstrap.servers' = 'kafka:29092', + 'format' = 'json' + ) + """ + +def create_transformation_query(): + return """ + INSERT INTO kafka_sink + SELECT + `data`.`message` as original_message, + UPPER(`data`.`message`) as transformed_message, + CURRENT_TIMESTAMP as processed_at + FROM kafka_source + """ + +def main(): + print("Waiting for Kafka to be ready...") + time.sleep(30) # Wait for Kafka to be fully ready + + 
print("Initializing Flink environment...") + sys.stdout.flush() + + try: + env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(1) + + settings = EnvironmentSettings.new_instance().in_streaming_mode().build() + t_env = StreamTableEnvironment.create(env, environment_settings=settings) + + # Add Kafka connector JAR + kafka_jar = "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar" + t_env.get_config().set("pipeline.jars", kafka_jar) + + print("Creating source table (test-topic)...") + sys.stdout.flush() + t_env.execute_sql(create_kafka_source_ddl()) + + print("Creating sink table (output-topic)...") + sys.stdout.flush() + t_env.execute_sql(create_kafka_sink_ddl()) + + print("Starting Flink job: Reading from 'test-topic', transforming to UPPERCASE, writing to 'output-topic'") + print("Job is running... waiting for messages") + sys.stdout.flush() + + # Execute and wait for the job to run + table_result = t_env.execute_sql(create_transformation_query()) + table_result.wait() # This blocks and keeps the job running + + except Exception as e: + print(f"ERROR: {e}") + sys.stdout.flush() + import traceback + traceback.print_exc() + raise + +if __name__ == "__main__": + main() diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..76613a1 --- /dev/null +++ b/init.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS kafka_messages ( + id SERIAL PRIMARY KEY, + original_message TEXT, + transformed_message TEXT, + processed_at TIMESTAMP, + ai_analysis TEXT, + inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); diff --git a/producer.py b/producer.py new file mode 100644 index 0000000..dede531 --- /dev/null +++ b/producer.py @@ -0,0 +1,51 @@ +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from kafka import KafkaProducer +import json +import time + +app = FastAPI(title="Kafka Producer API", description="Send messages to Kafka") + +TOPIC_NAME = 'test-topic' +KAFKA_BROKER = 'kafka:29092' + +producer = 
None + +class Message(BaseModel): + message: str + +def get_producer(): + global producer + if producer is None: + producer = KafkaProducer( + bootstrap_servers=[KAFKA_BROKER], + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + return producer + +@app.post("/send") +def send_message(data: Message): + try: + message = { + 'data': data.model_dump(), + 'timestamp': time.time() + } + + kafka_producer = get_producer() + future = kafka_producer.send(TOPIC_NAME, value=message) + result = future.get(timeout=10) + + return { + 'status': 'success', + 'message': 'Message sent to Kafka', + 'topic': TOPIC_NAME, + 'partition': result.partition, + 'offset': result.offset + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/health") +def health(): + return {'status': 'healthy'} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e517c79 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +kafka-python +fastapi +uvicorn +psycopg2-binary +openai +langfuse>=2.50,<3.0 From ffb3f5e37ce490297440f171a6bbd825fec43f07 Mon Sep 17 00:00:00 2001 From: Haneen Date: Thu, 19 Feb 2026 12:56:10 +0200 Subject: [PATCH 2/2] Update docker-compose with Ollama and fix indentation --- consumer.py | 4 ++-- docker_compose.yaml => docker-compose.yaml | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) rename docker_compose.yaml => docker-compose.yaml (94%) diff --git a/consumer.py b/consumer.py index dbb5a6c..3925657 100644 --- a/consumer.py +++ b/consumer.py @@ -8,7 +8,7 @@ import time # Get topic name from command line argument, default to 'output-topic' -TOPIC_NAME = sys.argv[1] if len(sys.argv) > 1 else 'output-topic' +TOPIC_NAME = sys.argv[1] if len(sys.argv) > 1 else 'test-topic' # PostgreSQL connection DB_CONFIG = { @@ -21,7 +21,7 @@ # Ollama configuration OLLAMA_URL = "http://localhost:11434/api/generate" -OLLAMA_MODEL = "gemma3:1b" +OLLAMA_MODEL = "gemma:2b" # Langfuse configuration langfuse = 
Langfuse( diff --git a/docker_compose.yaml b/docker-compose.yaml similarity index 94% rename from docker_compose.yaml rename to docker-compose.yaml index 159281e..a303c0b 100644 --- a/docker_compose.yaml +++ b/docker-compose.yaml @@ -122,5 +122,14 @@ services: NEXTAUTH_URL: http://localhost:3000 SALT: mysaltvalue1234567890123456789012 + ollama: + image: ollama/ollama:latest + container_name: ollama + ports: + - "11434:11434" + volumes: + - ollama_data:/root/.ollama + volumes: langfuse_postgres: + ollama_data: \ No newline at end of file