128 changes: 128 additions & 0 deletions src/main.py
@@ -1,3 +1,15 @@

import os

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Dict, List

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
@@ -31,6 +43,7 @@
)
from src.fraud import check_fraud_rules


app = FastAPI(
    title="Veritix Microservice",
    version="0.1.0",
@@ -40,6 +53,87 @@
logger = logging.getLogger("veritix")

# Global model pipeline; created at startup
model_pipeline: Pipeline | None = None

mock_user_events: Dict[str, List[str]] = {
    "user1": ["concert_A", "concert_B"],
    "user2": ["concert_B", "concert_C"],
    "user3": ["concert_A", "concert_C", "concert_D"],
    "user4": ["concert_D", "concert_E"],
}
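
# Scoring idea behind /recommend-events (see below): each other user's events
# are weighted by how many events that user shares with the target user, and
# the highest-scoring events the target has not yet attended are recommended.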

class PredictRequest(BaseModel):
    """Request body for the /predict-scalper endpoint.

    Each record represents aggregated event signals for a buyer/session.
    """
    features: List[float] = Field(
        ...,
        description="Feature vector: e.g., [tickets_per_txn, txns_per_min, avg_price_ratio, account_age_days, zip_mismatch, device_changes]",
    )
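
# Example payload (hypothetical values, in the documented feature order):
#   {"features": [4, 2.5, 1.3, 30, 1, 2]}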


class PredictResponse(BaseModel):
    probability: float


class RecommendRequest(BaseModel):
    user_id: str


class RecommendResponse(BaseModel):
    recommendations: List[str]

def generate_synthetic_event_data(num_samples: int = 2000, random_seed: int = 42) -> tuple[np.ndarray, np.ndarray]:
    """Generate synthetic data for scalper detection.

    Features (example semantics):
      0: tickets_per_txn (1-12)
      1: txns_per_min (0-10)
      2: avg_price_ratio (0.5-2.0)
      3: account_age_days (0-3650)
      4: zip_mismatch (0 or 1)
      5: device_changes (0-6)
    """
    rng = np.random.default_rng(random_seed)

    tickets_per_txn = rng.integers(1, 13, size=num_samples)
    txns_per_min = rng.uniform(0, 10, size=num_samples)
    avg_price_ratio = rng.uniform(0.5, 2.0, size=num_samples)
    account_age_days = rng.integers(0, 3651, size=num_samples)
    zip_mismatch = rng.integers(0, 2, size=num_samples)
    device_changes = rng.integers(0, 7, size=num_samples)

    X = np.column_stack([
        tickets_per_txn,
        txns_per_min,
        avg_price_ratio,
        account_age_days,
        zip_mismatch,
        device_changes,
    ])

    # True underlying weights to simulate scalper probability
    weights = np.array([0.35, 0.45, 0.25, -0.002, 0.4, 0.3])
    bias = -2.0
    logits = X @ weights + bias
    probs = 1 / (1 + np.exp(-logits))
    y = rng.binomial(1, probs)
    return X.astype(float), y.astype(int)
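
# Sanity check (illustrative): with the weights above, an aggressive buyer such
# as [10, 8, 1.5, 5, 1, 4] has logit
#   10*0.35 + 8*0.45 + 1.5*0.25 + 5*(-0.002) + 1*0.4 + 4*0.3 - 2.0 ≈ 7.07,
# so sigmoid(7.07) ≈ 0.999 and the row is almost surely labeled a scalper.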


def train_logistic_regression_pipeline() -> Pipeline:
    """Train a simple standardize + logistic-regression pipeline on synthetic data."""
    X, y = generate_synthetic_event_data()
    X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=123)
    pipeline = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(max_iter=1000, solver="lbfgs")),
        ]
    )
    pipeline.fit(X_train, y_train)
    return pipeline
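
# Usage sketch (feature order as documented above; the values are hypothetical):
#   pipeline = train_logistic_regression_pipeline()
#   pipeline.predict_proba([[4, 2.5, 1.3, 30, 1, 2]])[0, 1]  # scalper probability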

etl_scheduler: "BackgroundScheduler | None" = None

@@ -88,6 +182,39 @@ def health_check():

@app.post("/predict-scalper", response_model=PredictResponse)
def predict_scalper(payload: PredictRequest):
    if model_pipeline is None:
        raise HTTPException(status_code=503, detail="Model not ready")
    features = np.array(payload.features, dtype=float).reshape(1, -1)
    # Reject malformed vectors instead of surfacing a sklearn error as a 500.
    if features.shape[1] != model_pipeline.n_features_in_:
        raise HTTPException(
            status_code=422,
            detail=f"Expected {model_pipeline.n_features_in_} features, got {features.shape[1]}",
        )
    proba = float(model_pipeline.predict_proba(features)[0, 1])
    return PredictResponse(probability=proba)
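
# Example request against a local dev server (hypothetical feature values):
#   curl -X POST http://localhost:8000/predict-scalper \
#        -H "Content-Type: application/json" \
#        -d '{"features": [4, 2.5, 1.3, 30, 1, 2]}'
# The returned probability depends on the synthetic training data.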

# If you run this file directly (e.g., in a local development environment outside Docker):
# if __name__ == "__main__":
#     import uvicorn
#     # Note: host="0.0.0.0" is required so the server is reachable from outside the Docker container
#     uvicorn.run(app, host="0.0.0.0", port=8000)

@app.post("/recommend-events", response_model=RecommendResponse)
def recommend_events(payload: RecommendRequest):
    user_id = payload.user_id
    if user_id not in mock_user_events:
        raise HTTPException(status_code=404, detail="User not found")
    user_events = set(mock_user_events[user_id])
    scores: Dict[str, int] = {}
    for other_user, events in mock_user_events.items():
        if other_user == user_id:
            continue
        overlap = len(user_events.intersection(events))
        if overlap == 0:
            # No shared events, so this user contributes no signal.
            continue
        for e in events:
            if e not in user_events:
                scores[e] = scores.get(e, 0) + overlap

    recommended = sorted(scores, key=scores.get, reverse=True)[:3]
    return RecommendResponse(recommendations=recommended)
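
# Example request (using the mock data above): "user1" attends concert_A and
# concert_B; user2 and user3 each share one event, so concert_C scores 2 and
# concert_D scores 1, while user4 shares nothing and is skipped.
#   curl -X POST http://localhost:8000/recommend-events \
#        -H "Content-Type: application/json" \
#        -d '{"user_id": "user1"}'
# -> {"recommendations": ["concert_C", "concert_D"]}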

    global model_pipeline
    try:
        if model_pipeline is None:
@@ -163,3 +290,4 @@ def on_shutdown() -> None:
        etl_scheduler.shutdown(wait=False)
    except Exception:
        pass

18 changes: 18 additions & 0 deletions tests/test_recommend.py
@@ -0,0 +1,18 @@
from fastapi.testclient import TestClient
from src.main import app

client = TestClient(app)

def test_recommend_events_valid_user():
    response = client.post("/recommend-events", json={"user_id": "user1"})
    assert response.status_code == 200
    data = response.json()
    assert "recommendations" in data
    assert isinstance(data["recommendations"], list)
    assert len(data["recommendations"]) <= 3

def test_recommend_events_user_not_found():
    response = client.post("/recommend-events", json={"user_id": "unknown"})
    assert response.status_code == 404
    data = response.json()
    assert data["detail"] == "User not found"
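
# Illustrative sketch (assumes the startup hook trains the pipeline, as
# suggested by src/main.py): entering TestClient as a context manager runs the
# app's startup events, so /predict-scalper should be ready to serve. The
# feature values below are hypothetical.
def test_predict_scalper_returns_probability():
    with TestClient(app) as started_client:
        response = started_client.post(
            "/predict-scalper", json={"features": [4, 2.5, 1.3, 30, 1, 2]}
        )
        assert response.status_code == 200
        assert 0.0 <= response.json()["probability"] <= 1.0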