[FEATURE] Automatic Bucket Processing with User Access Controls #26

@davidamacey

Automatic Bucket Processing and User Access Control

Problem Statement

Users need a way to upload files directly to MinIO and have them picked up for processing automatically, with access controls governing which user may use which bucket.

Solution

Implement a bucket watcher service with user access controls.

1. Bucket Watcher Service

New Module: app/services/bucket_watcher.py

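import logging

from minio.notificationconfig import (
    NotificationConfig,
    PrefixFilterRule,
    QueueConfig,
    SuffixFilterRule,
)

# Assumed project-local imports; adjust the module paths to the actual codebase.
from app.core.config import settings
from app.services.minio_service import minio_client

logger = logging.getLogger(__name__)

# S3-style notification filters take one suffix per rule, so each extension
# gets its own queue configuration instead of a comma-separated string.
MEDIA_SUFFIXES = (".mp3", ".wav", ".m4a", ".mp4", ".mov")


class BucketWatcher:
    def __init__(self):
        self.bucket_name = settings.MEDIA_BUCKET_NAME
        self.input_prefix = "auto-ingest/"

    def start_watching(self):
        """Register ObjectCreated notifications for media files under the ingest prefix."""
        config = NotificationConfig(
            queue_config_list=[
                QueueConfig(
                    "arn:minio:sqs::1:webhook",
                    ["s3:ObjectCreated:*"],
                    config_id=f"auto-ingest-{suffix.lstrip('.')}",
                    prefix_filter_rule=PrefixFilterRule(f"{self.input_prefix}user-"),
                    suffix_filter_rule=SuffixFilterRule(suffix),
                )
                for suffix in MEDIA_SUFFIXES
            ]
        )
        minio_client.set_bucket_notification(self.bucket_name, config)
        logger.info("Watching %s/%s", self.bucket_name, self.input_prefix)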
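
Once notifications are registered, something has to consume the webhook payload. The sketch below shows one way to turn an S3-style event body into (user, file) pairs. It assumes keys look like auto-ingest/user-<id>/<filename>; the issue only specifies the "auto-ingest/user-" prefix, so the rest of that layout, the IngestEvent type, and the parse_ingest_events name are hypothetical. Keys in S3-style events are URL-encoded, which is why the key is decoded before matching.

```python
import re
from dataclasses import dataclass
from typing import Any, Dict, List
from urllib.parse import unquote_plus

# Assumed key layout (not specified in the issue): auto-ingest/user-<id>/<filename>
KEY_PATTERN = re.compile(r"auto-ingest/user-(?P<user_id>\d+)/(?P<filename>[^/]+)")


@dataclass(frozen=True)
class IngestEvent:
    """One uploaded object, attributed to the user encoded in its key."""
    bucket: str
    key: str
    user_id: int
    filename: str


def parse_ingest_events(payload: Dict[str, Any]) -> List[IngestEvent]:
    """Extract ingest events from an S3-style notification body.

    Records that lack a bucket or key, or whose key does not follow the
    assumed layout, are skipped rather than raising.
    """
    events: List[IngestEvent] = []
    for record in payload.get("Records", []):
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name")
        raw_key = s3.get("object", {}).get("key")
        if not bucket or not raw_key:
            continue
        key = unquote_plus(raw_key)  # event keys arrive URL-encoded
        match = KEY_PATTERN.fullmatch(key)
        if match is None:
            continue
        events.append(
            IngestEvent(bucket, key, int(match["user_id"]), match["filename"])
        )
    return events
```

Skipping malformed records keeps one bad key from blocking the rest of a batch; whether such records should instead be logged or rejected is a design choice the issue leaves open.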

2. User Access Controls

New Model: app/models/access.py

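from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, func
from sqlalchemy.orm import relationship

# Assumed project-local import; adjust the path to wherever the declarative Base lives.
from app.db.base import Base


class UserBucket(Base):
    """Maps a MinIO bucket to the user who owns it."""

    __tablename__ = "user_buckets"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
    bucket_name = Column(String, unique=True, nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=func.now())

    # Requires a matching `buckets` relationship on the User model.
    user = relationship("User", back_populates="buckets")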
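
The model records who owns a bucket, but the issue does not spell out the check that uses it. A minimal sketch of one plausible rule follows: a user may use a bucket only if an active record ties that bucket to them. BucketRecord is a hypothetical in-memory stand-in for a UserBucket row so the example runs without a database, and can_access_bucket is an illustrative name, not part of the proposal.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class BucketRecord:
    """In-memory stand-in for a UserBucket row (hypothetical, for illustration)."""
    user_id: int
    bucket_name: str
    is_active: bool = True


def can_access_bucket(
    user_id: int, bucket_name: str, records: Iterable[BucketRecord]
) -> bool:
    """Return True only if an active record assigns this bucket to this user."""
    return any(
        r.bucket_name == bucket_name and r.user_id == user_id and r.is_active
        for r in records
    )


records = [
    BucketRecord(user_id=1, bucket_name="alice-ingest"),
    BucketRecord(user_id=2, bucket_name="bob-ingest", is_active=False),
]
owner_allowed = can_access_bucket(1, "alice-ingest", records)
other_user_allowed = can_access_bucket(2, "alice-ingest", records)
deactivated_allowed = can_access_bucket(2, "bob-ingest", records)
```

Against the real table, the same rule would be a single filtered query on user_id, bucket_name, and is_active.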

3. API Endpoints

New Module: app/api/endpoints/buckets.py

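from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

# Assumed project-local imports; adjust the module paths to the actual codebase.
from app.api.endpoints.auth import get_current_active_user
from app.db.base import get_db
from app.models.access import UserBucket
from app.models.user import User

router = APIRouter()


@router.post("/buckets/")
def create_bucket(
    bucket_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Create a new user bucket."""
    if db.query(UserBucket).filter(UserBucket.bucket_name == bucket_name).first():
        raise HTTPException(status_code=400, detail="Bucket name already exists")

    bucket = UserBucket(user_id=current_user.id, bucket_name=bucket_name)
    db.add(bucket)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent request claimed the same name first (unique constraint).
        db.rollback()
        raise HTTPException(status_code=400, detail="Bucket name already exists")
    return {"message": "Bucket created"}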
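
The endpoint accepts any string as bucket_name, yet MinIO follows S3 bucket naming rules and will reject names that break them. A validation step before the database insert could look like the sketch below. It covers only the core rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starts and ends with a letter or digit; no adjacent dots; not shaped like an IPv4 address). The function name is hypothetical, and any further restrictions would need checking against the MinIO documentation.

```python
import ipaddress
import re

# First and last character must be a lowercase letter or digit; total length 3..63.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    """Check a candidate name against the core S3-style bucket naming rules."""
    if _BUCKET_RE.fullmatch(name) is None:
        return False
    if ".." in name:
        return False
    try:
        ipaddress.IPv4Address(name)
    except ValueError:
        return True  # not an IP address, so the name is acceptable
    return False  # names formatted like 192.168.1.1 are not allowed
```

In the endpoint, a failed check would naturally map to a 400 or 422 response before the uniqueness query runs.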

4. Configuration

Add to .env:

AUTO_INGEST_BUCKET=opentranscribe-ingest
AUTO_INGEST_PREFIX=uploads/
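
A small sketch of how the service might read these two variables is shown below. The AutoIngestConfig type and load_auto_ingest_config function are hypothetical, and the defaults simply mirror the values above. Note that the BucketWatcher sample earlier uses settings.MEDIA_BUCKET_NAME and a hard-coded "auto-ingest/" prefix; how those relate to these variables is not stated in the issue.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class AutoIngestConfig:
    bucket: str
    prefix: str


def load_auto_ingest_config(env: Mapping[str, str] = os.environ) -> AutoIngestConfig:
    """Read the auto-ingest settings, falling back to the values shown in .env."""
    bucket = env.get("AUTO_INGEST_BUCKET", "opentranscribe-ingest")
    raw_prefix = env.get("AUTO_INGEST_PREFIX", "uploads/")
    # Normalize so a non-empty prefix has no leading slash and exactly one trailing slash.
    trimmed = raw_prefix.strip("/")
    prefix = f"{trimmed}/" if trimmed else ""
    return AutoIngestConfig(bucket=bucket, prefix=prefix)
```

Passing the environment as a parameter keeps the loader easy to exercise in tests without touching the real process environment.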

5. Deployment

Update docker-compose.yml:

services:
  bucket-watcher:
    build: ./backend
    command: python -m app.services.bucket_watcher
    environment:
      - MINIO_HOST=minio
      - MINIO_PORT=9000
    depends_on:
      - minio
      - redis
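
In the list form used above, depends_on only orders container startup; it does not wait for MinIO to accept connections. The watcher may therefore need to retry before registering notifications. The helper below is a minimal sketch of a retry loop with exponential backoff. wait_until_ready and its parameters are hypothetical names, and the readiness check itself is passed in by the caller.

```python
import time
from typing import Callable


def wait_until_ready(
    check: Callable[[], bool],
    attempts: int = 10,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call check() until it returns True or the attempts run out.

    Exceptions raised by check() are treated as "not ready yet". The delay
    doubles after each failed attempt, capped at max_delay.
    """
    delay = base_delay
    for attempt in range(attempts):
        try:
            if check():
                return True
        except Exception:
            pass  # e.g. connection refused while the dependency is still starting
        if attempt < attempts - 1:
            sleep(delay)
            delay = min(delay * 2, max_delay)
    return False
```

In the watcher's entry point, check could wrap a cheap call such as minio_client.bucket_exists(bucket_name), with start_watching() invoked only after it succeeds.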

Testing Plan

  1. Unit Tests

    • Test bucket creation
    • Test file processing
    • Test access controls

  2. Integration Tests

    • Test complete upload flow
    • Test permission checks

  3. Manual Testing

    • Upload files via MinIO client
    • Verify processing
    • Check access controls
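
As an illustration of the unit-test level, the sketch below tests a small extension filter using the standard unittest module. is_supported_media is a hypothetical helper; the suffix list is taken from the watcher's notification filter. Matching is case-insensitive here by choice, which is more lenient than a case-sensitive suffix rule on the bucket would be.

```python
import unittest
from pathlib import PurePosixPath

# Extensions listed in the watcher's notification filter.
SUPPORTED_SUFFIXES = {".mp3", ".wav", ".m4a", ".mp4", ".mov"}


def is_supported_media(key: str) -> bool:
    """Return True if the object key ends in a supported media extension."""
    return PurePosixPath(key).suffix.lower() in SUPPORTED_SUFFIXES


class SupportedMediaTests(unittest.TestCase):
    def test_accepts_listed_extensions(self):
        for name in ("a.mp3", "b.wav", "c.m4a", "d.mp4", "e.mov"):
            self.assertTrue(is_supported_media(f"auto-ingest/user-1/{name}"))

    def test_is_case_insensitive(self):
        self.assertTrue(is_supported_media("auto-ingest/user-1/talk.MP3"))

    def test_rejects_other_files(self):
        self.assertFalse(is_supported_media("auto-ingest/user-1/notes.txt"))
        self.assertFalse(is_supported_media("auto-ingest/user-1/no_extension"))
```

The same cases translate directly to pytest if that is the project's test runner.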

Labels: backend, enhancement, security
