
Unified Real-Time Feature Platform

A production-ready reference architecture for real-time feature engineering, combining streaming processing with low-latency serving.

πŸ— Architecture

Data Flow: Kafka (Ingestion) → Flink (Stateful Processing) → Redis (Feature Store) → FastAPI (Serving)

Observability: Jaeger (Distributed Tracing) + Prometheus (Metrics) + Grafana (Dashboards)

Key Features

  • Exactly-once: Flink checkpointing + transactional connectors + idempotent Redis writes.
  • Backfill: Unified Source API (supports rewinding offsets or swapping to S3/Iceberg sources).
  • Schema Evolution: Confluent Schema Registry (Avro) ensures data compatibility.
  • Latency Monitoring: End-to-end tracing (OpenTelemetry) + Prometheus metrics + Grafana dashboards.
  • Cost Model: Serving layer tracks compute/network latency per feature with Prometheus histograms.
  • Batch Inference: High-performance batch API using Redis pipelines.
  • SDK Compiler: Auto-generates Flink SQL jobs from Python feature definitions.
  • 🤖 LLM Copilot: Natural-language feature generation, recommendations, and an interactive assistant.
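The idempotent-write half of the exactly-once story is worth spelling out. The sketch below uses a plain dict as a stand-in for Redis (the key layout and field names are illustrative, not the project's actual schema): replaying a SET-style write of a full aggregate is harmless, while replaying an INCR-style delta double-counts.

```python
store = {}

def idempotent_write(user_id: str, click_count: int) -> None:
    # SET-style: the value is the full aggregate, so replays are safe.
    store[f"features:{user_id}"] = {"click_count": click_count}

def delta_write(user_id: str, delta: int) -> None:
    # INCR-style: replaying the same delta after a failure double-counts.
    entry = store.setdefault(f"features:{user_id}", {"click_count": 0})
    entry["click_count"] += delta

# Simulate the stream processor replaying the last window after recovery:
for _ in range(2):          # same aggregate delivered twice
    idempotent_write("1", 5)
for _ in range(2):          # same delta delivered twice
    delta_write("2", 5)

print(store["features:1"]["click_count"])  # 5 (correct)
print(store["features:2"]["click_count"])  # 10 (double-counted)
```

This is why the sink writes full window aggregates rather than increments: combined with checkpointing, redelivery becomes a no-op.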

🚀 Quick Start

Prerequisites

  • Docker & Docker Compose
  • Java 11 (Required for PyFlink Local Execution)
  • Python 3.8+

1. Start Infrastructure

Launch Kafka, Zookeeper, Redis, Schema Registry, and Monitoring stack.

cd deployment
docker-compose up -d

2. Setup Processing Engine (PyFlink)

PyFlink requires specific JARs to interact with Kafka and Schema Registry.

cd processing
# 1. Install Python Deps
pip install -r requirements.txt

# 2. Download Java Dependencies
bash download_libs.sh

3. Run the Pipeline

Terminal 1: Real-Time Feature Engine (Flink)

Note: Set JAVA_HOME to your Java 11 installation.

# macOS Example
export JAVA_HOME="/opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home"
export PATH="$JAVA_HOME/bin:$PATH"

python processing/feature_job.py

Expected output: the job prints "Submitting Job..." and keeps running in the foreground.

Terminal 2: Data Generator (Producer)

Simulate user behavior (Clicks/Views).

python ingestion/producer.py
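The real producer serializes events with Avro against the Schema Registry; the event shape it simulates (matching the `user_id` / `action_type` / `timestamp` schema shown later in the LLM API example) can be sketched with plain dicts, with no Kafka dependency. Field set is an assumption; the actual Avro schema may carry more fields.

```python
import random
import time

def make_event(user_id: str) -> dict:
    # Hypothetical event shape mirroring the schema in the LLM example.
    return {
        "user_id": user_id,
        "action_type": random.choice(["click", "view"]),
        "timestamp": int(time.time() * 1000),  # epoch millis (Avro BIGINT)
    }

event = make_event("1")
assert event["action_type"] in ("click", "view")
```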

Terminal 3: Feature Serving API

Start the HTTP Service.

python -m uvicorn serving.app:app --host 0.0.0.0 --port 8000

4. Verify & Explore

Single User Feature Fetch

curl http://localhost:8000/features/1
# {"user_id":"1","click_count":5,"source":"feature_store"}
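The `source` field in the response records whether the lookup hit Redis or fell back to defaults, which is what feeds the hit-rate metric. A minimal sketch of that lookup, with a dict standing in for Redis; the handler name and default values are illustrative:

```python
def get_features(store: dict, user_id: str) -> dict:
    # On a miss, serve defaults and tag the response so the
    # feature-store hit rate can be computed from metrics.
    value = store.get(f"features:{user_id}")
    if value is None:
        return {"user_id": user_id, "click_count": 0, "source": "default"}
    return {"user_id": user_id, **value, "source": "feature_store"}

store = {"features:1": {"click_count": 5}}
print(get_features(store, "1"))  # {'user_id': '1', 'click_count': 5, 'source': 'feature_store'}
print(get_features(store, "2")["source"])  # default
```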

Batch Feature Fetch (NEW!)

Optimized for ranking models: fetches features for many users in a single request.

curl -X POST http://localhost:8000/features/batch \
  -H "Content-Type: application/json" \
  -d '{"user_ids": ["1", "2", "3", "4", "5"]}'
# {"results":[...], "latency_ms": 1.23, "batch_size": 5}
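What makes the batch endpoint fast is that it issues one pipelined (MGET-style) round trip to Redis for the whole batch instead of one per user. A stand-in sketch of that effect; the `FakeRedis` class exists only to count round trips and is not part of the project:

```python
class FakeRedis:
    """In-memory stand-in that counts network round trips."""

    def __init__(self, data):
        self.data = data
        self.round_trips = 0

    def mget(self, keys):
        self.round_trips += 1  # one trip for the whole batch
        return [self.data.get(k) for k in keys]

def batch_features(r, user_ids):
    values = r.mget([f"features:{u}" for u in user_ids])
    return [
        {"user_id": u, "click_count": v if v is not None else 0}
        for u, v in zip(user_ids, values)
    ]

r = FakeRedis({"features:1": 5, "features:3": 2})
results = batch_features(r, ["1", "2", "3", "4", "5"])
print(r.round_trips)  # 1, regardless of batch size
```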

Prometheus Metrics

curl http://localhost:8000/metrics

📊 Grafana Dashboards

A pre-built dashboard is available at monitoring/grafana-dashboard.json.

Import Dashboard

  1. Open Grafana: http://localhost:3000 (admin/admin)
  2. Go to Dashboards → Import
  3. Upload monitoring/grafana-dashboard.json
  4. Select Prometheus as the data source

Dashboard Panels

  • Feature Latency (p50/p95/p99) - Time series of serving latency
  • Request Rate by Endpoint - Single vs Batch requests
  • Feature Store Hit Rate - Percentage of requests served from Redis vs defaults
  • Batch Request Size Distribution - Histogram of batch sizes
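The p50/p95/p99 panels are computed from the cumulative bucket counts a Prometheus histogram exposes, interpolating linearly inside the bucket that contains the target rank; this is the idea behind PromQL's `histogram_quantile`, sketched here with hypothetical latency buckets:

```python
def histogram_quantile(q: float, buckets) -> float:
    # buckets: sorted (upper_bound, cumulative_count) pairs, like a
    # Prometheus *_bucket series. Interpolates linearly within the
    # bucket that contains the q-th rank.
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            width = bound - prev_bound
            in_bucket = count - prev_count
            return prev_bound + width * (rank - prev_count) / in_bucket
        prev_bound, prev_count = bound, count
    return float(buckets[-1][0])

# Hypothetical serving-latency buckets in milliseconds.
buckets = [(1, 40), (5, 90), (10, 99), (50, 100)]
p95 = histogram_quantile(0.95, buckets)  # lands in the 5-10 ms bucket
```

One design consequence: quantile accuracy is bounded by bucket granularity, so the serving layer's bucket bounds should straddle its latency SLO.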

🛠 SDK Compiler

Define features in Python and auto-generate Flink SQL jobs from them.

List Available Features

python -m sdk.compiler --list
# Available FeatureViews:
#   - user_click_stats
#       • click_count_1m: Total clicks in the last minute
#       • view_count_1m: Total views in the last minute
#   - user_purchase_stats
#       • purchase_count_5m: Total purchases in the last 5 minutes

Generate Flink Job

python -m sdk.compiler -f user_click_stats -o processing/generated_user_click_stats.py

Define New Features

Edit sdk/feature_definition.py:

from sdk import FeatureView, Feature, Source, Window, AggregationType, WindowType

my_feature = FeatureView(
    name="my_feature_view",
    source=Source(topic="my_topic"),
    features=[
        Feature(
            name="my_count",
            description="Count of events",
            aggregation=AggregationType.COUNT,
            source_field="*"
        )
    ],
    window=Window(type=WindowType.TUMBLE, size_minutes=5),
    entity_key="user_id"
)
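A definition like the one above is what the compiler turns into a windowed Flink SQL job. The sketch below shows the shape of that transpilation as plain string generation; the emitted SQL (sink naming, `event_time` column) is an assumption about the output style, not the exact text `sdk/compiler.py` produces.

```python
def compile_to_flink_sql(view_name, topic, entity_key, agg, field, minutes):
    # Windowed aggregation in Flink SQL's TUMBLE syntax.
    return (
        f"INSERT INTO {view_name}_sink\n"
        f"SELECT\n"
        f"  {entity_key},\n"
        f"  {agg}({field}) AS my_count,\n"
        f"  TUMBLE_END(event_time, INTERVAL '{minutes}' MINUTE) AS window_end\n"
        f"FROM {topic}\n"
        f"GROUP BY\n"
        f"  {entity_key},\n"
        f"  TUMBLE(event_time, INTERVAL '{minutes}' MINUTE)"
    )

# Mirrors the my_feature_view definition above.
sql = compile_to_flink_sql("my_feature_view", "my_topic", "user_id", "COUNT", "*", 5)
print(sql.splitlines()[0])  # INSERT INTO my_feature_view_sink
```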

🤖 LLM Copilot (AI-Powered Feature Engineering)

The platform includes an LLM-powered assistant for feature engineering.

Setup

# Install LLM dependencies
pip install openai anthropic

# Set API key
export OPENAI_API_KEY="sk-..."
# OR
export ANTHROPIC_API_KEY="..."

CLI: Interactive Copilot

Chat with the SDK Copilot for help with feature definitions:

python sdk/cli.py chat
# 🤖 Feature Platform SDK Copilot
# Ask me anything about feature engineering!
# 
# 🧑 You: How do I create a feature that counts purchases per hour?
# 🤖 Copilot: Here's how you can create a purchase count feature...

CLI: Generate Features from Natural Language

python sdk/cli.py generate
# πŸ“ Describe your features:
# > I want to track clicks and views per user in 5-minute windows
# 
# πŸ“¦ Generated Code:
# from sdk import FeatureView, Feature, ...

CLI: Get Feature Recommendations

python sdk/cli.py recommend
# 🎯 Use Case: CTR prediction for e-commerce
# 
# 💡 Feature Recommendations:
# 1. click_through_rate_1h - Recent engagement signal
# 2. purchase_frequency_7d - Long-term conversion indicator
# ...

REST API Endpoints

The LLM features are also available via REST API:

  Endpoint            Method  Description
  /api/llm/generate   POST    Generate feature code from a description
  /api/llm/recommend  POST    Get AI-powered feature recommendations
  /api/llm/chat       POST    Chat with the SDK Copilot
  /api/llm/health     GET     Check LLM API availability

Example API call:

curl -X POST http://localhost:8000/api/llm/generate \
  -H "Content-Type: application/json" \
  -d '{
    "description": "Count user clicks and purchases in the last 10 minutes",
    "source_schema": {
      "user_id": "STRING",
      "action_type": "STRING",
      "timestamp": "BIGINT"
    }
  }'

📂 Project Structure

online_data_infra/
├── ingestion/              # Kafka Producers (Avro + Schema Registry)
├── processing/             # PyFlink Jobs (SQL + DataStream Hybrid API)
│   ├── feature_job.py      # Main Pipeline: Aggregation -> Redis Sink
│   ├── generated_*.py      # Auto-generated jobs from SDK
│   └── lib/                # Dependency JARs
├── serving/                # FastAPI + OpenTelemetry + Prometheus Metrics
│   ├── app.py              # Single + Batch endpoints
│   └── llm_api.py          # LLM-powered feature engineering API
├── sdk/                    # Feature Definition SDK
│   ├── feature_definition.py  # Define features here
│   ├── compiler.py         # Transpiles Python → Flink SQL
│   ├── llm_integration.py  # LLM client and agents
│   └── cli.py              # Interactive CLI tool
├── monitoring/             # Grafana dashboards, Prometheus configs
│   └── grafana-dashboard.json
└── deployment/             # Docker Compose Environment

🔗 Access Points

  Service              URL
  Feature API          http://localhost:8000
  API Docs (Swagger)   http://localhost:8000/docs
  LLM API Docs         http://localhost:8000/api/docs
  Prometheus Metrics   http://localhost:8000/metrics
  Grafana              http://localhost:3000
  Jaeger (Traces)      http://localhost:16686
  Schema Registry      http://localhost:8082
  Flink Dashboard      http://localhost:8081

🛠 Tech Stack

  • Streaming: Apache Flink 1.18 (PyFlink)
  • Messaging: Kafka + Confluent Schema Registry
  • Storage: Redis (Online Store)
  • Serving: FastAPI + Prometheus + OpenTelemetry
  • Observability: Jaeger, Prometheus, Grafana
  • AI/LLM: OpenAI GPT-4o / Anthropic Claude 3

πŸ“ Next Steps

See NEXT_ITERATION_PLAN.md for the roadmap towards v2.
