Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Producer API image: FastAPI app served by uvicorn on port 5000.
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached while producer.py changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY producer.py .

EXPOSE 5000

# producer:app — the FastAPI instance named `app` inside producer.py.
CMD ["uvicorn", "producer:app", "--host", "0.0.0.0", "--port", "5000"]
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# event-intelligence
Cloud-native real-time streaming core built on Apache Kafka and Apache Flink. Designed for scalable event processing with native support for AI models and agents.
------------------------

## How to run the project
165 changes: 165 additions & 0 deletions consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from kafka import KafkaConsumer
from langfuse import Langfuse
import requests
import psycopg2
import json
import sys
import os
import time

# Get topic name from command line argument, default to 'test-topic'
TOPIC_NAME = sys.argv[1] if len(sys.argv) > 1 else 'test-topic'

# PostgreSQL connection settings; port 5433 is the host-side mapping of the
# dockerized postgres (see docker-compose).
# NOTE(review): credentials are hard-coded — consider environment variables.
DB_CONFIG = {
    'host': 'localhost',
    'port': 5433,
    'database': 'kafka_events',
    'user': 'admin',
    'password': 'admin123'
}

# Ollama configuration (local inference server + model tag to query)
OLLAMA_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL = "gemma:2b"

# Langfuse configuration — client is created at import time; keys come from
# the environment so they are never committed to the repo.
langfuse = Langfuse(
    public_key=os.getenv('LANGFUSE_PUBLIC_KEY'),
    secret_key=os.getenv('LANGFUSE_SECRET_KEY'),
    host="http://localhost:3000"
)

def get_db_connection():
    """Open and return a fresh connection to the events database."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection

def analyze_with_llm(message_text, trace_name):
    """Send message to Ollama for analysis with Langfuse tracing.

    Args:
        message_text: Raw message text to analyze.
        trace_name: Name for the Langfuse trace (one trace per Kafka record).

    Returns:
        The model's analysis text, or an ``"LLM Error: ..."`` string when the
        request fails — callers receive a string either way and never see an
        exception.
    """
    start_time = time.time()

    prompt = f"""You are a message analyzer. Analyze the given message and provide:
1) Category (e.g., greeting, question, command, information)
2) Sentiment (positive/negative/neutral)
3) Brief summary

Keep response concise in JSON format.

Analyze this message: {message_text}"""

    # Create Langfuse trace (v2 API)
    trace = langfuse.trace(
        name=trace_name,
        metadata={"source": "kafka-consumer", "topic": TOPIC_NAME}
    )

    def _log_generation(output, metadata):
        # Single place to record a generation (v2 API — created from the
        # trace); previously this was duplicated in success and error paths.
        generation = trace.generation(
            name="ollama-analysis",
            model=OLLAMA_MODEL,
            input=prompt,
            output=output,
            metadata=metadata
        )
        generation.end()

    try:
        response = requests.post(
            OLLAMA_URL,
            json={
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": False
            },
            timeout=60
        )
        response.raise_for_status()
        result = response.json().get("response", "No response")

        latency_ms = int((time.time() - start_time) * 1000)
        _log_generation(result, {"latency_ms": latency_ms})
        return result

    except Exception as e:
        # Fix: record latency on the error path too, so failed calls are
        # measurable in Langfuse (the original only logged the error string).
        latency_ms = int((time.time() - start_time) * 1000)
        error_msg = f"LLM Error: {str(e)}"
        _log_generation(error_msg, {"error": str(e), "latency_ms": latency_ms})
        return error_msg

def insert_message(conn, message_data, ai_analysis):
    """Persist one processed Kafka message plus its AI analysis.

    Args:
        conn: Open psycopg2 connection.
        message_data: Decoded Kafka payload; missing keys insert as NULL via
            ``dict.get``.
        ai_analysis: Analysis text returned by ``analyze_with_llm``.

    Raises:
        psycopg2.Error: Propagated to the caller, which handles reconnecting.
    """
    # Fix: use the cursor as a context manager so it is closed even when
    # execute() raises — the original leaked the cursor on error.
    with conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO kafka_messages (original_message, transformed_message, processed_at, ai_analysis)
            VALUES (%s, %s, %s, %s)
            """,
            (
                message_data.get('original_message'),
                message_data.get('transformed_message'),
                message_data.get('processed_at'),
                ai_analysis
            )
        )
    conn.commit()

# Create Kafka consumer
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # replay from the start when no committed offset exists
    enable_auto_commit=True,       # NOTE(review): offsets commit even if DB insert fails — confirm at-most-once is acceptable
    group_id=f'consumer-{TOPIC_NAME}',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print(f"Starting consumer, listening to topic: {TOPIC_NAME}")
print(f"Using Ollama ({OLLAMA_MODEL}) for AI analysis")
print("Langfuse tracing: http://localhost:3000")
print("Waiting for messages... (Press Ctrl+C to stop)")

try:
    conn = get_db_connection()
    print("Connected to PostgreSQL")

    # Main loop: for each record — analyze with the LLM, persist to
    # PostgreSQL, then flush the Langfuse buffer.
    for message in consumer:
        print(f"Received: {message.value}")
        print(f"  Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

        # Trace name encodes topic/partition/offset so each record maps to
        # exactly one Langfuse trace.
        trace_name = f"kafka-{message.topic}-{message.partition}-{message.offset}"

        # Get AI analysis of the original (pre-transform) message text.
        original_msg = message.value.get('original_message', '')
        print(f"  -> Analyzing with Ollama (trace: {trace_name})...")
        ai_analysis = analyze_with_llm(original_msg, trace_name)
        print(f"  -> AI Analysis: {ai_analysis}")

        # Write to PostgreSQL; on failure, reopen the connection for the
        # next record. NOTE(review): the failed record itself is not
        # retried and is effectively dropped — confirm this is intended.
        try:
            insert_message(conn, message.value, ai_analysis)
            print("  -> Saved to PostgreSQL")
        except Exception as db_error:
            print(f"  -> DB Error: {db_error}")
            conn = get_db_connection()

        # Flush Langfuse so traces appear promptly (one flush per record).
        langfuse.flush()

        print("-" * 50)

except KeyboardInterrupt:
    print("\nConsumer stopped by user")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Best-effort cleanup; 'conn' may be unbound if the first connect failed.
    consumer.close()
    langfuse.shutdown()
    if 'conn' in locals():
        conn.close()
    print("Consumer and database connection closed")
135 changes: 135 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Local development stack: Kafka (+Zookeeper), the producer API, a Flink
# cluster with a PyFlink job, Kafka UI, a results database, and the
# Langfuse observability stack plus Ollama for local LLM inference.
# NOTE(review): the `version` key is ignored by Compose v2 — harmless.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"    # host clients (producer.py / consumer.py use localhost:9092)
      - "29092:29092"  # in-network clients (other containers use kafka:29092)
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Dual listeners: one advertised to containers, one to the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single-broker dev cluster, so replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  # FastAPI producer (see Dockerfile at repo root).
  producer-api:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: producer-api
    depends_on:
      - kafka
    ports:
      - "5000:5000"
    restart: on-failure  # retry until kafka is actually accepting connections

  jobmanager:
    image: flink:1.18-java11
    container_name: jobmanager
    ports:
      - "8081:8081"  # Flink web UI
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - ./flink-job:/opt/flink/jobs

  taskmanager:
    image: flink:1.18-java11
    container_name: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - ./flink-job:/opt/flink/jobs

  # One-shot container that submits the PyFlink job (see flink-job/Dockerfile).
  flink-job:
    build:
      context: ./flink-job
      dockerfile: Dockerfile
    container_name: flink-job
    depends_on:
      - kafka
      - jobmanager
      - taskmanager
    restart: on-failure

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

  # Results database used by consumer.py (host port 5433 to avoid clashing
  # with a locally installed postgres on 5432).
  # NOTE(review): credentials are hard-coded for dev; no data volume, so
  # results are lost on container removal — confirm that is intended.
  postgres:
    image: postgres:15
    container_name: postgres
    ports:
      - "5433:5432"
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin123
      POSTGRES_DB: kafka_events
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  # Dedicated database for Langfuse (not exposed to the host).
  langfuse-db:
    image: postgres:15
    container_name: langfuse-db
    environment:
      POSTGRES_USER: langfuse
      POSTGRES_PASSWORD: langfuse123
      POSTGRES_DB: langfuse
    volumes:
      - langfuse_postgres:/var/lib/postgresql/data

  langfuse:
    image: langfuse/langfuse:2
    container_name: langfuse
    depends_on:
      - langfuse-db
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://langfuse:langfuse123@langfuse-db:5432/langfuse
      # NOTE(review): dev-only secrets committed to the repo — rotate for
      # any non-local deployment.
      NEXTAUTH_SECRET: mysecretkey123456789012345678901234
      NEXTAUTH_URL: http://localhost:3000
      SALT: mysaltvalue1234567890123456789012

  ollama:
    image: ollama/ollama:latest
    container_name: ollama
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama  # persist pulled models across restarts

volumes:
  langfuse_postgres:
  ollama_data:
16 changes: 16 additions & 0 deletions flink-job/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# PyFlink job image: Flink base plus Python, PyFlink, and the Kafka connector.
FROM flink:1.18-java11

# Install Python and PyFlink.
# Fix: --no-install-recommends / --no-cache-dir / removing the apt lists
# keeps the image noticeably smaller; the original cached pip wheels and
# apt package lists in the layer.
RUN apt-get update && \
    apt-get install -y --no-install-recommends python3 python3-pip && \
    pip3 install --no-cache-dir apache-flink==1.18.1 && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Download Kafka connector (SQL connector jar matching Flink 1.18)
RUN wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar

WORKDIR /opt/flink/jobs

COPY flink_job.py .

CMD ["python3", "flink_job.py"]
Loading