Skip to content

Latest commit

 

History

History
980 lines (825 loc) · 44.1 KB

File metadata and controls

980 lines (825 loc) · 44.1 KB

OpenSearch Schema Design for Visual Search

Overview

Multi-index architecture with FAISS-based clustering for Google Photos-like capabilities:

  1. Indexes - Category-specific storage (global, vehicles, people, faces)
  2. Clustering - FAISS IVF for scalable grouping with incremental updates
  3. Search - k-NN within clusters for speed, or global for accuracy

Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         Visual Search Architecture                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │                        INGESTION PIPELINE                            │    │
│  │                                                                      │    │
│  │  Image → YOLO → MobileCLIP → OpenSearch (with cluster assignment)   │    │
│  │                      ↓                                               │    │
│  │              FAISS IVF Index                                        │    │
│  │           (find nearest centroid)                                   │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
│  │   GLOBAL    │  │  VEHICLES   │  │   PEOPLE    │  │   FACES     │        │
│  │   INDEX     │  │   INDEX     │  │   INDEX     │  │   INDEX     │        │
│  │             │  │             │  │             │  │             │        │
│  │ + cluster_id│  │ + cluster_id│  │ + cluster_id│  │ + cluster_id│        │
│  │ + centroid  │  │ + centroid  │  │ + centroid  │  │ + person_id │        │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘        │
│         │                │                │                │                │
│         └────────────────┴────────────────┴────────────────┘                │
│                                    │                                        │
│                         ┌──────────┴──────────┐                             │
│                         │   FAISS IVF Index   │                             │
│                         │   (per category)    │                             │
│                         │                     │                             │
│                         │  • Centroids (GPU)  │                             │
│                         │  • Fast assignment  │                             │
│                         │  • Incremental      │                             │
│                         └─────────────────────┘                             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Clustering Architecture (Industry Standard)

Why FAISS IVF? (Deep Dive)

The Problem with Traditional Clustering

When building a Google Photos-like system with millions of images, traditional clustering algorithms face critical limitations:

K-Means (sklearn)

  • Must re-run on ALL data when adding new images
  • O(n × k × d × iterations) complexity - becomes prohibitive at scale
  • 1M images × 512 dims × 100 clusters × 10 iterations = hours of computation
  • No incremental updates possible

DBSCAN / HDBSCAN

  • Density-based clustering - great for finding arbitrary shapes
  • O(n²) memory and time complexity in worst case
  • Cannot incrementally add points without full re-clustering
  • Designed for static datasets, not streaming data

Hierarchical Clustering

  • Beautiful dendrograms, but O(n²) or O(n³) complexity
  • Not designed for high-dimensional vectors (512 dims)
  • Memory explodes with large datasets

Why FAISS IVF is the Industry Standard

Google, Meta (Facebook), Spotify, Pinterest, and other companies handling billions of embeddings use Inverted File (IVF) indexes because:

Approach Full Re-cluster Incremental Scale Memory Used By
K-Means Every time No <100K O(n×d) Academic
DBSCAN Every time No <100K O(n²) Academic
FAISS IVF Initial only Yes Billions O(k×d) Google, Meta
FAISS IVF-PQ Initial only Yes Billions+ O(k×m) Google, Meta

Key Insight: FAISS IVF separates training (expensive, done once) from assignment (cheap, done every insert).

The Algorithm Explained

FAISS IVF (Inverted File with Flat Quantizer) works in two phases:

PHASE 1: TRAINING (One-time, O(n×k×d×iterations))
═══════════════════════════════════════════════════

Input: Sample of embeddings (e.g., 100K vectors)
Output: k centroids (e.g., 1024 cluster centers)

Algorithm:
1. Run k-means on training sample
2. Compute k centroids in d-dimensional space
3. Store centroids in a "quantizer" (flat index)
4. Save to disk for persistence

Time: ~15 seconds for 1M vectors, 1024 clusters on GPU


PHASE 2: ASSIGNMENT (Every insert, O(k×d))
═══════════════════════════════════════════════════

Input: New embedding vector (512 dims)
Output: cluster_id, distance_to_centroid

Algorithm:
1. Compare new vector to ALL k centroids (1024 comparisons)
2. Find nearest centroid using inner product
3. Assign cluster_id = index of nearest centroid
4. Store distance for quality assessment

Time: ~0.1ms per vector (10,000 vectors/second)


PHASE 3: SEARCH (Query time, O(nprobe×n/k×d))
═══════════════════════════════════════════════════

Input: Query vector, nprobe (clusters to search)
Output: Top-k similar items

Algorithm:
1. Find nprobe nearest centroids to query
2. Only search items in those clusters
3. If nprobe=16 and k=1024: search only 1.5% of data

Speedup: 64x faster than exhaustive search

Visual Representation

                    512-dimensional embedding space

    ┌─────────────────────────────────────────────────────────┐
    │                                                         │
    │    Cluster 0        Cluster 1        Cluster 2          │
    │    (beaches)        (cars)           (people)           │
    │       ┌─┐             ┌─┐              ┌─┐              │
    │      ╱   ╲           ╱   ╲            ╱   ╲             │
    │     │  ●  │         │  ●  │          │  ●  │            │
    │     │ ··· │         │ ··· │          │ ··· │            │
    │     │·····│         │·····│          │·····│            │
    │      ╲   ╱           ╲   ╱            ╲   ╱             │
    │       └─┘             └─┘              └─┘              │
    │         ↑               ↑                ↑              │
    │     centroid        centroid         centroid           │
    │                                                         │
    │   New image arrives:                                    │
    │                                                         │
    │        ★ (beach sunset photo)                           │
    │        │                                                │
    │        ├─→ Compare to Cluster 0 centroid: 0.92 ✓        │
    │        ├─→ Compare to Cluster 1 centroid: 0.31          │
    │        └─→ Compare to Cluster 2 centroid: 0.28          │
    │                                                         │
    │   Result: Assign to Cluster 0 (beaches)                 │
    │   Time: 0.1ms (only compared to 1024 centroids)         │
    │                                                         │
    └─────────────────────────────────────────────────────────┘

Advantages of FAISS IVF

Advantage Description Impact
Incremental Updates Add new items without re-training Real-time ingestion possible
GPU Acceleration Training uses CUDA for 10-12x speedup 1M vectors trained in 15s
Constant Assignment Time O(k) regardless of dataset size 0.1ms whether 1K or 1B items
Search Pruning nprobe controls speed/accuracy tradeoff 64x faster with nprobe=16
Memory Efficient Only store k×d centroids (not n×d) 2MB for 1024 clusters vs 2GB for 1M vectors
Persistence Save/load trained index to disk Survives restarts
Proven at Scale Used by Google, Meta, Spotify Battle-tested on billions

Disadvantages and Mitigations

Disadvantage Description Mitigation
Initial Training Required Need sample data before clustering works Train after first 1K images, retrain periodically
Fixed Cluster Count k is set at training time Choose k based on expected dataset size
Centroid Drift Centroids may become stale as data changes Periodic rebalancing when imbalance detected
Coarse Clustering Not as precise as DBSCAN for arbitrary shapes Use for grouping, not exact clustering
Training Memory Need to load training data into RAM/GPU Sample if dataset too large

How It Works In This Application

┌─────────────────────────────────────────────────────────────────────────────┐
│                    OpenProcessor Visual Search Pipeline                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  1. INITIAL SETUP (one-time)                                                │
│     ┌────────────────────────────────────────────────────────────────────┐  │
│     │  POST /index/create       # Create OpenSearch indexes      │  │
│     │  POST /ingest (× 1000)    # Ingest initial images          │  │
│     │  POST /clusters/train/global  # Train FAISS clustering     │  │
│     │                                                                     │  │
│     │  Result: 1024 clusters trained, all images assigned cluster_id     │  │
│     └────────────────────────────────────────────────────────────────────┘  │
│                                                                              │
│  2. ONGOING INGESTION (continuous)                                          │
│     ┌────────────────────────────────────────────────────────────────────┐  │
│     │  Image → YOLO detect → MobileCLIP embed → FAISS assign → OpenSearch │  │
│     │                                                                     │  │
│     │  Each image gets:                                                   │  │
│     │  • global_embedding: 512-dim vector                                 │  │
│     │  • cluster_id: nearest centroid (0-1023)                            │  │
│     │  • cluster_distance: how close to centroid                          │  │
│     │                                                                     │  │
│     │  Time: ~50ms total (inference + assignment + indexing)              │  │
│     └────────────────────────────────────────────────────────────────────┘  │
│                                                                              │
│  3. SEARCH (user queries)                                                   │
│     ┌────────────────────────────────────────────────────────────────────┐  │
│     │  Option A: Standard k-NN (searches all documents)                   │  │
│     │  POST /search/image                                         │  │
│     │                                                                     │  │
│     │  Option B: Cluster-optimized (searches subset)                      │  │
│     │  1. Find query's nearest clusters via FAISS                         │  │
│     │  2. Search OpenSearch with cluster_id filter                        │  │
│     │  3. 10-100x faster for large indexes                                │  │
│     └────────────────────────────────────────────────────────────────────┘  │
│                                                                              │
│  4. ALBUMS (Google Photos-like)                                             │
│     ┌────────────────────────────────────────────────────────────────────┐  │
│     │  GET /albums                                                │  │
│     │  → Returns clusters sorted by size                                  │  │
│     │  → Each cluster = auto-generated album                              │  │
│     │                                                                     │  │
│     │  GET /clusters/global/42                                    │  │
│     │  → Returns all images in cluster 42                                 │  │
│     │  → Sorted by distance (most representative first)                   │  │
│     └────────────────────────────────────────────────────────────────────┘  │
│                                                                              │
│  5. MAINTENANCE (periodic)                                                  │
│     ┌────────────────────────────────────────────────────────────────────┐  │
│     │  GET /clusters/balance/global                               │  │
│     │  → Check if rebalancing needed                                      │  │
│     │                                                                     │  │
│     │  POST /clusters/rebalance/global                            │  │
│     │  → Re-train from current data if clusters became uneven             │  │
│     │                                                                     │  │
│     │  Trigger conditions:                                                │  │
│     │  • Max cluster > 10x average size                                   │  │
│     │  • >10% empty clusters                                              │  │
│     │  • >50% new data since training                                     │  │
│     └────────────────────────────────────────────────────────────────────┘  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Data Flow Diagram

┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   FastAPI    │     │    Triton    │     │    FAISS     │     │  OpenSearch  │
│   (yolo-api) │     │  (GPU Infer) │     │ (Clustering) │     │  (Storage)   │
└──────┬───────┘     └──────┬───────┘     └──────┬───────┘     └──────┬───────┘
       │                    │                    │                    │
       │  1. /ingest        │                    │                    │
       │  (image bytes)     │                    │                    │
       ├───────────────────►│                    │                    │
       │                    │                    │                    │
       │  2. YOLO + CLIP    │                    │                    │
       │◄───────────────────┤                    │                    │
       │  (embedding 512d)  │                    │                    │
       │                    │                    │                    │
       │  3. assign_cluster │                    │                    │
       ├────────────────────┼───────────────────►│                    │
       │                    │                    │                    │
       │  4. cluster_id=42  │                    │                    │
       │◄───────────────────┼────────────────────┤                    │
       │     distance=0.12  │                    │                    │
       │                    │                    │                    │
       │  5. index document │                    │                    │
       ├────────────────────┼────────────────────┼───────────────────►│
       │    {embedding,     │                    │                    │
       │     cluster_id,    │                    │                    │
       │     cluster_dist}  │                    │                    │
       │                    │                    │                    │
       │  6. success        │                    │                    │
       │◄───────────────────┼────────────────────┼────────────────────┤
       │                    │                    │                    │

Recommended Cluster Counts

Based on dataset size and use case:

Dataset Size Global Vehicles People Faces Rationale
< 10K 128 32 64 128 Small dataset, fewer clusters
10K - 100K 512 128 256 512 Medium dataset
100K - 1M 1024 256 512 1024 Default (recommended)
1M - 10M 4096 1024 2048 4096 Large photo library
> 10M 16384 4096 8192 16384 Production scale

Rule of thumb: sqrt(n) clusters for n items, with minimum 64 and maximum 65536.

Performance Benchmarks (RTX A6000)

Operation 100K Items 1M Items 10M Items
Initial Training 2s 15s 120s
Single Assignment 0.1ms 0.1ms 0.1ms
Batch 100 1ms 1ms 1ms
Search (nprobe=16) 2ms 5ms 15ms
Rebalance 5s 45s 300s

How It Works

┌─────────────────────────────────────────────────────────────────────────────┐
│                         FAISS IVF Clustering                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  1. INITIAL TRAINING (one-time or periodic)                                 │
│     ┌─────────────────────────────────────────────────────────────────┐     │
│     │  Sample embeddings → K-Means → Centroids (e.g., 1024 clusters)  │     │
│     └─────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│  2. INCREMENTAL ASSIGNMENT (real-time, every new item)                      │
│     ┌─────────────────────────────────────────────────────────────────┐     │
│     │  New embedding → Find nearest centroid → Assign cluster_id      │     │
│     │                                                                  │     │
│     │  Time: O(n_clusters) = ~0.1ms for 1024 clusters                 │     │
│     └─────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│  3. SEARCH OPTIMIZATION                                                      │
│     ┌─────────────────────────────────────────────────────────────────┐     │
│     │  Query → Find k nearest centroids → Search only those clusters  │     │
│     │                                                                  │     │
│     │  nprobe=16: Search 16 of 1024 clusters = 64x faster             │     │
│     └─────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│  4. PERIODIC REBALANCING (optional, when distribution shifts)               │
│     ┌─────────────────────────────────────────────────────────────────┐     │
│     │  Re-train centroids from current data if:                        │     │
│     │  - Cluster sizes become very uneven                              │     │
│     │  - >50% new data since last training                             │     │
│     └─────────────────────────────────────────────────────────────────┘     │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Cluster Configuration by Index

Index n_clusters nprobe Rebalance* Use Case
global 1024 32 Weekly Scene similarity, albums
vehicles 256 16 Monthly Same car across images
people 512 24 Weekly Same outfit/appearance
faces 1024 64 On-demand Identity matching

Rebalancing Strategy Guide

The table above shows baseline recommendations. Actual rebalancing frequency depends on your ingestion pattern:

By Ingestion Volume

Daily Volume Rebalance Frequency Rationale
< 1K images Weekly/Monthly Clusters remain stable
1K - 10K Daily Moderate drift
10K - 100K Every 6-12 hours Significant new data
100K+ Every 1-2 hours High velocity requires frequent rebalancing

By Ingestion Pattern

Pattern Example Rebalancing Strategy
Continuous streaming Security cameras, social media Schedule rebalance during low-traffic hours (e.g., 3 AM)
Hourly batches Hourly photo sync Rebalance after every 2-3 batch cycles
Daily bulk upload End-of-day photo dump Rebalance immediately after bulk upload completes
Weekly imports Weekly backup ingestion Rebalance after each weekly import
Sporadic large batches User uploads 10K vacation photos Trigger rebalance when vectors_since_training exceeds threshold

Automatic Rebalancing Logic

The system tracks rebalancing needs via GET /clusters/balance/{index}:

{
  "needs_rebalance": true,
  "reason": "Significant new data: 50000 vectors since training",
  "vectors_since_training": 50000,
  "imbalance_ratio": 8.5,
  "empty_ratio": 0.02
}

Triggers for needs_rebalance=true:

  • vectors_since_training > 50% of original training set
  • imbalance_ratio > 10 (largest cluster is 10x smallest)
  • empty_ratio > 10% (too many empty clusters)

Recommended Automation

# Example: Cron job or background task
async def check_and_rebalance():
    for index in ['global', 'vehicles', 'people', 'faces']:
        balance = await search_service.check_cluster_balance(index)

        if balance['needs_rebalance']:
            logger.info(f"Rebalancing {index}: {balance['reason']}")
            await search_service.rebalance_clusters(index)

# Schedule based on your ingestion pattern:
# - High volume: Every hour
# - Medium volume: Every 6 hours
# - Low volume: Daily at 3 AM

Post-Bulk-Ingestion Workflow

When ingesting large batches (e.g., 10K+ images at once):

# 1. Ingest all images (clustering happens automatically if trained)
for image in batch:
    POST /ingest

# 2. Check balance after bulk ingestion
GET /clusters/balance/global

# 3. If needs_rebalance=true, trigger rebalance
POST /clusters/rebalance/global

# 4. Verify new cluster distribution
GET /clusters/stats/global

Updated Index Schemas (with Clustering)

1. visual_search_global

{
  "mappings": {
    "properties": {
      "image_id": { "type": "keyword" },
      "image_path": { "type": "keyword" },
      "global_embedding": {
        "type": "knn_vector",
        "dimension": 512,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "faiss",
          "parameters": { "ef_construction": 512, "m": 16 }
        }
      },
      "cluster_id": { "type": "integer" },
      "cluster_distance": { "type": "float" },
      "width": { "type": "integer" },
      "height": { "type": "integer" },
      "metadata": { "type": "object" },
      "indexed_at": { "type": "date" },
      "clustered_at": { "type": "date" }
    }
  }
}

2. visual_search_vehicles

{
  "mappings": {
    "properties": {
      "detection_id": { "type": "keyword" },
      "image_id": { "type": "keyword" },
      "image_path": { "type": "keyword" },
      "embedding": {
        "type": "knn_vector",
        "dimension": 512,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "faiss",
          "parameters": { "ef_construction": 256, "m": 12 }
        }
      },
      "cluster_id": { "type": "integer" },
      "cluster_distance": { "type": "float" },
      "box": { "type": "float" },
      "class_id": { "type": "integer" },
      "class_name": { "type": "keyword" },
      "confidence": { "type": "float" },
      "metadata": { "type": "object" },
      "indexed_at": { "type": "date" },
      "clustered_at": { "type": "date" }
    }
  }
}

3. visual_search_people

{
  "mappings": {
    "properties": {
      "detection_id": { "type": "keyword" },
      "image_id": { "type": "keyword" },
      "image_path": { "type": "keyword" },
      "embedding": {
        "type": "knn_vector",
        "dimension": 512,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "faiss",
          "parameters": { "ef_construction": 512, "m": 16 }
        }
      },
      "cluster_id": { "type": "integer" },
      "cluster_distance": { "type": "float" },
      "box": { "type": "float" },
      "confidence": { "type": "float" },
      "has_face": { "type": "boolean" },
      "face_id": { "type": "keyword" },
      "metadata": { "type": "object" },
      "indexed_at": { "type": "date" },
      "clustered_at": { "type": "date" }
    }
  }
}

4. visual_search_faces (Future)

{
  "mappings": {
    "properties": {
      "face_id": { "type": "keyword" },
      "image_id": { "type": "keyword" },
      "image_path": { "type": "keyword" },
      "person_detection_id": { "type": "keyword" },
      "embedding": {
        "type": "knn_vector",
        "dimension": 512,
        "method": {
          "name": "hnsw",
          "space_type": "cosinesimil",
          "engine": "faiss",
          "parameters": { "ef_construction": 1024, "m": 32 }
        }
      },
      "cluster_id": { "type": "integer" },
      "cluster_distance": { "type": "float" },
      "person_id": { "type": "keyword" },
      "person_name": { "type": "keyword" },
      "is_reference": { "type": "boolean" },
      "box": { "type": "float" },
      "landmarks": {
        "type": "object",
        "properties": {
          "left_eye": { "type": "float" },
          "right_eye": { "type": "float" },
          "nose": { "type": "float" },
          "left_mouth": { "type": "float" },
          "right_mouth": { "type": "float" }
        }
      },
      "confidence": { "type": "float" },
      "quality_score": { "type": "float" },
      "metadata": { "type": "object" },
      "indexed_at": { "type": "date" },
      "clustered_at": { "type": "date" }
    }
  }
}

ClusteringService Implementation

Service Architecture

class ClusteringService:
    """
    FAISS-based clustering for all visual search indexes.

    Features:
    - GPU-accelerated training and assignment
    - Incremental cluster assignment (no re-training needed)
    - Persistent index storage
    - Automatic rebalancing when needed
    """

    def __init__(self, index_dir: str = "faiss_indexes"):
        self.index_dir = Path(index_dir)
        self.indexes: dict[str, faiss.Index] = {}
        self.gpu_resources = faiss.StandardGpuResources()

    # === TRAINING ===

    async def train_index(
        self,
        index_name: str,
        embeddings: np.ndarray,
        n_clusters: int = 1024,
        use_gpu: bool = True,
    ) -> ClusterStats:
        """
        Train FAISS IVF index from embeddings.

        Called once initially, then periodically for rebalancing.
        """
        d = embeddings.shape[1]  # 512

        # Create IVF index with flat quantizer
        quantizer = faiss.IndexFlatIP(d)  # Inner product (for normalized vectors)
        index = faiss.IndexIVFFlat(quantizer, d, n_clusters)

        # Move to GPU for faster training
        if use_gpu and faiss.get_num_gpus() > 0:
            index = faiss.index_cpu_to_gpu(self.gpu_resources, 0, index)

        # Train on embeddings
        index.train(embeddings.astype('float32'))

        # Add all embeddings
        index.add(embeddings.astype('float32'))

        # Save to disk
        self._save_index(index_name, index)
        self.indexes[index_name] = index

        return self._compute_stats(index)

    # === INCREMENTAL ASSIGNMENT ===

    def assign_cluster(
        self,
        index_name: str,
        embedding: np.ndarray,
    ) -> tuple[int, float]:
        """
        Assign single embedding to nearest cluster.

        Time: ~0.1ms (real-time capable)
        Returns: (cluster_id, distance_to_centroid)
        """
        index = self._get_index(index_name)

        # Search for nearest centroid
        embedding = embedding.reshape(1, -1).astype('float32')
        distances, cluster_ids = index.quantizer.search(embedding, 1)

        return int(cluster_ids[0, 0]), float(distances[0, 0])

    def assign_clusters_batch(
        self,
        index_name: str,
        embeddings: np.ndarray,
    ) -> list[tuple[int, float]]:
        """
        Batch assign embeddings to clusters.

        Time: ~1ms for 100 embeddings
        """
        index = self._get_index(index_name)
        embeddings = embeddings.astype('float32')

        distances, cluster_ids = index.quantizer.search(embeddings, 1)

        return [
            (int(cluster_ids[i, 0]), float(distances[i, 0]))
            for i in range(len(embeddings))
        ]

    # === CLUSTER SEARCH ===

    def search_within_cluster(
        self,
        index_name: str,
        query_embedding: np.ndarray,
        cluster_id: int,
        top_k: int = 10,
    ) -> list[tuple[int, float]]:
        """
        Search only within a specific cluster.

        Faster than global search when cluster is known.
        """
        # Implementation uses inverted lists
        ...

    def search_similar_clusters(
        self,
        index_name: str,
        query_embedding: np.ndarray,
        n_clusters: int = 5,
    ) -> list[int]:
        """
        Find clusters most similar to query.

        Use for "find all similar" queries.
        """
        index = self._get_index(index_name)
        query = query_embedding.reshape(1, -1).astype('float32')

        _, cluster_ids = index.quantizer.search(query, n_clusters)
        return cluster_ids[0].tolist()

    # === REBALANCING ===

    async def check_balance(self, index_name: str) -> ClusterBalance:
        """
        Check if clusters need rebalancing.

        Criteria:
        - Max cluster > 10x min cluster size
        - Empty clusters > 10%
        - >50% new data since training
        """
        ...

    async def rebalance_if_needed(
        self,
        index_name: str,
        opensearch_client: OpenSearchClient,
    ) -> bool:
        """
        Rebalance clusters if needed.

        1. Export all embeddings from OpenSearch
        2. Re-train FAISS index
        3. Re-assign all cluster_ids
        4. Bulk update OpenSearch
        """
        ...

Integration with OpenSearch

class OpenSearchClient:
    """Updated to support clustering."""

    async def ingest_image(
        self,
        image_id: str,
        image_path: str,
        global_embedding: np.ndarray,
        clustering_service: ClusteringService,
        ...
    ) -> dict:
        """Ingest with automatic cluster assignment."""

        # Assign cluster for global embedding
        cluster_id, cluster_dist = clustering_service.assign_cluster(
            'global', global_embedding
        )

        doc = {
            'image_id': image_id,
            'image_path': image_path,
            'global_embedding': global_embedding.tolist(),
            'cluster_id': cluster_id,
            'cluster_distance': cluster_dist,
            'indexed_at': datetime.now(UTC).isoformat(),
            'clustered_at': datetime.now(UTC).isoformat(),
        }

        # ... route to other indexes with their own cluster assignments

Query Patterns with Clustering

1. Find Similar Images (Same Cluster)

# Fast: Search only within same cluster
async def find_similar_in_cluster(image_id: str):
    # Get source image's cluster
    doc = await opensearch.get(index='visual_search_global', id=image_id)
    cluster_id = doc['cluster_id']
    embedding = doc['global_embedding']

    # Search only that cluster
    results = await opensearch.search(
        index='visual_search_global',
        body={
            'query': {
                'bool': {
                    'must': [
                        {'knn': {'global_embedding': {'vector': embedding, 'k': 20}}}
                    ],
                    'filter': [
                        {'term': {'cluster_id': cluster_id}}
                    ]
                }
            }
        }
    )
    return results

2. Find All Similar (Multi-Cluster)

# Thorough: Search across similar clusters
async def find_similar_global(embedding: np.ndarray, top_k: int = 50):
    # Find similar clusters
    similar_clusters = clustering_service.search_similar_clusters(
        'global', embedding, n_clusters=5
    )

    # Search across those clusters
    results = await opensearch.search(
        index='visual_search_global',
        body={
            'query': {
                'bool': {
                    'must': [
                        {'knn': {'global_embedding': {'vector': embedding.tolist(), 'k': top_k}}}
                    ],
                    'filter': [
                        {'terms': {'cluster_id': similar_clusters}}
                    ]
                }
            }
        }
    )
    return results

3. Get Cluster Members (Album View)

# Get all images in a cluster (like a Google Photos album)
async def get_cluster_album(cluster_id: int, page: int = 0, size: int = 50):
    results = await opensearch.search(
        index='visual_search_global',
        body={
            'query': {
                'term': {'cluster_id': cluster_id}
            },
            'sort': [
                {'cluster_distance': 'asc'}  # Most representative first
            ],
            'from': page * size,
            'size': size
        }
    )
    return results

4. Find All Instances of a Vehicle

# Find same car across all images
async def find_same_vehicle(vehicle_embedding: np.ndarray):
    # Assign to cluster
    cluster_id, _ = clustering_service.assign_cluster('vehicles', vehicle_embedding)

    # Search within cluster first
    results = await opensearch.search(
        index='visual_search_vehicles',
        body={
            'size': 100,
            'min_score': 0.8,  # High threshold for "same vehicle"
            'query': {
                'bool': {
                    'must': [
                        {'knn': {'embedding': {'vector': vehicle_embedding.tolist(), 'k': 100}}}
                    ],
                    'filter': [
                        {'term': {'cluster_id': cluster_id}}
                    ]
                }
            }
        }
    )
    return results

API Endpoints

Clustering Management

# Training & Rebalancing
POST /clusters/train/{index_name}     # Initial training
POST /clusters/rebalance/{index_name} # Force rebalance
GET  /clusters/stats/{index_name}     # Cluster statistics
GET  /clusters/balance/{index_name}   # Check if rebalance needed

# Cluster Operations
GET  /clusters/{index_name}/{cluster_id}          # Get cluster members
GET  /clusters/{index_name}/{cluster_id}/centroid # Get cluster centroid
POST /clusters/{index_name}/merge                 # Merge clusters
POST /clusters/{index_name}/split                 # Split cluster

Category-Specific Search

# Global (whole image)
POST /search/image          # Similar images
POST /search/text           # Text-to-image
GET  /albums                # List auto-generated albums (clusters)
GET  /albums/{cluster_id}   # Get album contents

# Vehicles
POST /search/vehicles       # Find similar vehicles
GET  /vehicles/clusters     # Vehicle groupings

# People
POST /search/people         # Find by appearance
GET  /people/clusters       # Appearance groupings

# Faces (Future)
POST /search/faces          # Find same person
GET  /people/identities     # Unique people in library

Performance Considerations

Clustering Benchmarks (RTX A6000)

Operation 100K Items 1M Items 10M Items
Initial Training 2s 15s 120s
Single Assignment 0.1ms 0.1ms 0.1ms
Batch 100 1ms 1ms 1ms
Search (nprobe=16) 2ms 5ms 15ms
Rebalance 5s 45s 300s

Memory Requirements

Items FAISS Index RAM OpenSearch Total
100K 200MB 500MB 700MB
1M 2GB 5GB 7GB
10M 20GB 50GB 70GB

GPU vs CPU

Operation GPU CPU Speedup
Training 1M 15s 180s 12x
Batch 1000 2ms 20ms 10x
Search 2ms 5ms 2.5x

Implementation Phases

Phase 1: Core Clustering (Complete)

  • ClusteringService with FAISS IVF (src/services/clustering.py)
  • Update OpenSearch schemas with cluster_id (src/clients/opensearch.py)
  • Incremental assignment on ingest (via clustering_service parameter)
  • Cluster-filtered search (via cluster_ids parameter)
  • Clustering API endpoints (/clusters/*)

Phase 2: Auto-Albums

  • "Smart albums" from clusters (like Google Photos)
  • Cluster naming (most common object/scene)
  • Album API endpoints

Phase 3: Face Identity

  • Integrate RetinaFace detection
  • Integrate ArcFace embedding
  • Identity clustering (same person)
  • Person naming/labeling

Phase 4: Scale Optimization

  • IVF-PQ for 100M+ scale
  • Distributed clustering
  • Real-time rebalancing