diff --git a/alembic/versions/20260325_2133_8338ee0a5c4d_add_google_drive_projects_table.py b/alembic/versions/20260325_2133_8338ee0a5c4d_add_google_drive_projects_table.py
new file mode 100644
index 0000000..759d5f5
--- /dev/null
+++ b/alembic/versions/20260325_2133_8338ee0a5c4d_add_google_drive_projects_table.py
@@ -0,0 +1,59 @@
+"""add google_drive_projects table
+
+Revision ID: 8338ee0a5c4d
+Revises: ed1d201ecb55
+Create Date: 2026-03-25 21:33:31.416445
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+
+from alembic import op
+
+
+revision: str = "8338ee0a5c4d"
+down_revision: Union[str, None] = "ed1d201ecb55"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "google_drive_projects",
+        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
+        sa.Column(
+            "user_id",
+            sa.Integer(),
+            sa.ForeignKey("users.id", ondelete="CASCADE"),
+            nullable=False,
+        ),
+        sa.Column("name", sa.String(200), nullable=False),
+        sa.Column("folder_id", sa.String(200), nullable=False, server_default="root"),
+        sa.Column("folder_name", sa.String(500), nullable=True),
+        sa.Column(
+            "collection_id",
+            sa.Integer(),
+            sa.ForeignKey("knowledge_collections.id"),
+            nullable=True,
+        ),
+        sa.Column("sync_status", sa.String(20), server_default="idle"),
+        sa.Column("sync_error", sa.Text(), nullable=True),
+        sa.Column("last_synced", sa.DateTime(), nullable=True),
+        sa.Column("file_count", sa.Integer(), server_default="0"),
+        sa.Column("total_size_bytes", sa.Integer(), server_default="0"),
+        sa.Column("include_mime_types", sa.Text(), nullable=True),
+        sa.Column(
+            "workspace_id",
+            sa.Integer(),
+            sa.ForeignKey("workspaces.id"),
+            nullable=False,
+            server_default="1",
+        ),
+        sa.Column("created", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP")),
+        sa.Column("updated", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP")),
+    )
+
+
+def downgrade() -> None:
+    op.drop_table("google_drive_projects")
diff --git a/app/services/google_drive_sync_service.py b/app/services/google_drive_sync_service.py
new file mode 100644
index 0000000..34ae159
--- /dev/null
+++ b/app/services/google_drive_sync_service.py
@@ -0,0 +1,247 @@
+"""Google Drive sync service — downloads files from Drive folder, converts to markdown."""
+
+import logging
+import re
+from pathlib import Path
+
+import httpx
+
+
+logger = logging.getLogger(__name__)
+
+# Google Apps MIME types that can be exported
+EXPORTABLE_TYPES = {
+    "application/vnd.google-apps.document": ("text/plain", ".txt", "google_docs"),
+    "application/vnd.google-apps.spreadsheet": (
+        "text/csv",
+        ".csv",
+        "google_sheets",
+    ),
+    "application/vnd.google-apps.presentation": (
+        "text/plain",
+        ".txt",
+        "google_slides",
+    ),
+}
+
+# Regular file types we can read as text
+TEXT_EXTENSIONS = {
+    ".txt",
+    ".md",
+    ".csv",
+    ".json",
+    ".xml",
+    ".html",
+    ".yml",
+    ".yaml",
+    ".toml",
+    ".ini",
+    ".cfg",
+    ".log",
+    ".py",
+    ".js",
+    ".ts",
+    ".vue",
+    ".jsx",
+    ".tsx",
+    ".css",
+    ".scss",
+    ".sql",
+    ".sh",
+    ".bash",
+    ".rb",
+    ".go",
+    ".rs",
+    ".java",
+    ".kt",
+    ".c",
+    ".cpp",
+    ".h",
+    ".hpp",
+}
+
+MAX_FILE_SIZE = 512 * 1024  # 512 KB per file
+
+
+async def list_drive_folder_recursive(
+    access_token: str,
+    folder_id: str = "root",
+    max_files: int = 500,
+) -> list[dict]:
+    """List all files in a Drive folder recursively (up to max_files)."""
+    files: list[dict] = []
+    folders_to_scan = [folder_id]
+
+    async with httpx.AsyncClient(timeout=30) as client:
+        while folders_to_scan and len(files) < max_files:
+            current_folder = folders_to_scan.pop(0)
+            page_token = None
+
+            while True:
+                params: dict = {
+                    "q": f"'{current_folder}' in parents and trashed = false",
+                    "fields": "nextPageToken, files(id, name, mimeType, size, modifiedTime)",
+                    "pageSize": 100,
+                }
+                if page_token:
+                    params["pageToken"] = page_token
+
+                resp = await client.get(
+                    "https://www.googleapis.com/drive/v3/files",
+                    headers={"Authorization": f"Bearer {access_token}"},
+                    params=params,
+                )
+                resp.raise_for_status()
+                data = resp.json()
+
+                for f in data.get("files", []):
+                    if f["mimeType"] == "application/vnd.google-apps.folder":
+                        folders_to_scan.append(f["id"])
+                    else:
+                        files.append(f)
+                        if len(files) >= max_files:
+                            break
+
+                page_token = data.get("nextPageToken")
+                if not page_token or len(files) >= max_files:
+                    break
+
+    return files
+
+
+async def download_and_convert_file(
+    access_token: str,
+    file_info: dict,
+) -> tuple[str, str, int] | None:
+    """Download a single file and convert to text.
+
+    Returns (title, content, size_bytes) or None if file can't be processed.
+    """
+    file_id = file_info["id"]
+    name = file_info["name"]
+    mime_type = file_info["mimeType"]
+    size = int(file_info.get("size", 0))
+
+    async with httpx.AsyncClient(timeout=60) as client:
+        headers = {"Authorization": f"Bearer {access_token}"}
+
+        # Google Apps files (Docs, Sheets, Slides) — export
+        if mime_type in EXPORTABLE_TYPES:
+            export_mime, _ext, _source = EXPORTABLE_TYPES[mime_type]
+            resp = await client.get(
+                f"https://www.googleapis.com/drive/v3/files/{file_id}/export",
+                headers=headers,
+                params={"mimeType": export_mime},
+            )
+            if resp.status_code != 200:
+                logger.warning(f"Failed to export {name}: {resp.status_code}")
+                return None
+            content = resp.text
+            return (name, content, len(content.encode("utf-8")))
+
+        # Regular files — check size and extension
+        if size > MAX_FILE_SIZE:
+            logger.info(f"Skipping {name}: too large ({size} bytes)")
+            return None
+
+        ext = Path(name).suffix.lower()
+        if ext not in TEXT_EXTENSIONS:
+            logger.info(f"Skipping {name}: unsupported extension {ext}")
+            return None
+
+        # Download as text
+        resp = await client.get(
+            f"https://www.googleapis.com/drive/v3/files/{file_id}",
+            headers=headers,
+            params={"alt": "media"},
+        )
+        if resp.status_code != 200:
+            logger.warning(f"Failed to download {name}: {resp.status_code}")
+            return None
+
+        try:
+            content = resp.text
+        except Exception:
+            return None
+
+        return (name, content, len(content.encode("utf-8")))
+
+
+def _sanitize_filename(name: str) -> str:
+    """Convert file name to safe filename for disk."""
+    name = re.sub(r"[^\w\s\-.]", "_", name)
+    name = re.sub(r"\s+", "_", name)
+    return name[:200]
+
+
+async def sync_drive_folder(
+    access_token: str,
+    folder_id: str,
+    output_dir: str,
+    max_files: int = 500,
+) -> list[dict]:
+    """Sync entire Drive folder to disk as markdown/text files.
+
+    Returns list of document dicts for DatasetSynced event.
+    """
+    Path(output_dir).mkdir(parents=True, exist_ok=True)
+
+    # Clean old files
+    for old_file in Path(output_dir).glob("*.md"):
+        old_file.unlink()
+
+    # List all files
+    drive_files = await list_drive_folder_recursive(access_token, folder_id, max_files)
+    logger.info(f"Google Drive sync: found {len(drive_files)} files in folder {folder_id}")
+
+    documents: list[dict] = []
+    total_size = 0
+
+    for file_info in drive_files:
+        try:
+            result = await download_and_convert_file(access_token, file_info)
+            if not result:
+                continue
+            title, content, size_bytes = result
+        except Exception as e:
+            logger.warning(f"Error processing {file_info['name']}: {e}")
+            continue
+
+        # Determine source type
+        mime_type = file_info["mimeType"]
+        if mime_type in EXPORTABLE_TYPES:
+            source_type = EXPORTABLE_TYPES[mime_type][2]
+        else:
+            source_type = "google_drive"
+
+        # Write to disk as markdown
+        safe_name = _sanitize_filename(title)
+        if not safe_name.endswith(".md"):
+            safe_name = (
+                safe_name.rsplit(".", 1)[0] + ".md" if "." in safe_name else safe_name + ".md"
+            )
+        filepath = Path(output_dir) / safe_name
+
+        # Wrap content in markdown with title
+        md_content = f"# {title}\n\n{content}"
+        with open(filepath, "w", encoding="utf-8") as f:
+            f.write(md_content)
+
+        # Count sections
+        section_count = len(re.findall(r"^#{2,3}\s+.+$", md_content, re.MULTILINE))
+
+        documents.append(
+            {
+                "filename": safe_name,
+                "title": title,
+                "source_type": source_type,
+                "file_size_bytes": size_bytes,
+                "section_count": max(section_count, 1),
+            }
+        )
+        total_size += size_bytes
+
+    logger.info(
+        f"Google Drive sync complete: {len(documents)} documents, {total_size / 1024:.1f} KB total"
+    )
+    return documents
diff --git a/modules/knowledge/models.py b/modules/knowledge/models.py
index 754a83d..5163b44 100644
--- a/modules/knowledge/models.py
+++ b/modules/knowledge/models.py
@@ -1,4 +1,4 @@
-"""Knowledge base models: collections, documents, FAQ, GitHub repos."""
+"""Knowledge base models: collections, documents, FAQ, GitHub repos, Google Drive."""
 
 import json
 from datetime import datetime
@@ -255,3 +255,56 @@ def to_dict(self) -> dict:
             "created": self.created.isoformat() if self.created else None,
             "updated": self.updated.isoformat() if self.updated else None,
         }
+
+
+class GoogleDriveProject(Base):
+    """Google Drive folder connected as a knowledge base collection."""
+
+    __tablename__ = "google_drive_projects"
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    name: Mapped[str] = mapped_column(String(200), nullable=False)
+    user_id: Mapped[int] = mapped_column(
+        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
+    )
+    folder_id: Mapped[str] = mapped_column(String(200), nullable=False, server_default="root")
+    folder_name: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
+    collection_id: Mapped[Optional[int]] = mapped_column(
+        Integer, ForeignKey("knowledge_collections.id"), nullable=True
+    )
+    sync_status: Mapped[str] = mapped_column(String(20), default="idle", server_default="idle")
+    sync_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
+    last_synced: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
+    file_count: Mapped[int] = mapped_column(Integer, default=0, server_default="0")
+    total_size_bytes: Mapped[int] = mapped_column(Integer, default=0, server_default="0")
+    include_mime_types: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
+    workspace_id: Mapped[int] = mapped_column(
+        Integer, ForeignKey("workspaces.id"), nullable=False, server_default="1"
+    )
+    created: Mapped[datetime] = mapped_column(
+        DateTime, default=datetime.utcnow, server_default=text("CURRENT_TIMESTAMP")
+    )
+    updated: Mapped[datetime] = mapped_column(
+        DateTime,
+        default=datetime.utcnow,
+        onupdate=datetime.utcnow,
+        server_default=text("CURRENT_TIMESTAMP"),
+    )
+
+    def to_dict(self) -> dict:
+        return {
+            "id": self.id,
+            "name": self.name,
+            "user_id": self.user_id,
+            "folder_id": self.folder_id,
+            "folder_name": self.folder_name,
+            "collection_id": self.collection_id,
+            "sync_status": self.sync_status,
+            "sync_error": self.sync_error,
+            "last_synced": self.last_synced.isoformat() if self.last_synced else None,
+            "file_count": self.file_count,
+            "total_size_bytes": self.total_size_bytes,
+            "workspace_id": self.workspace_id,
+            "created": self.created.isoformat() if self.created else None,
+            "updated": self.updated.isoformat() if self.updated else None,
+        }
diff --git a/modules/knowledge/router_google_drive.py b/modules/knowledge/router_google_drive.py
new file mode 100644
index 0000000..f4bdc63
--- /dev/null
+++ b/modules/knowledge/router_google_drive.py
@@ -0,0 +1,250 @@
+"""Google Drive RAG projects — CRUD + sync."""
+
+import asyncio
+import logging
+from datetime import datetime
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel, Field
+
+from auth_manager import User, require_permission
+from db.database import AsyncSessionLocal
+from modules.core.events import DatasetSynced
+from modules.knowledge.models import GoogleDriveProject, KnowledgeCollection
+
+
+try:
+    from sqlalchemy import delete, select
+except ImportError:
+    from sqlalchemy import delete
+    from sqlalchemy.future import select  # type: ignore[assignment]
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/admin/google-drive", tags=["google-drive-rag"])
+
+
+class CreateGoogleDriveProjectRequest(BaseModel):
+    name: str = Field(..., min_length=1, max_length=200)
+    folder_id: str = Field(default="root")
+    folder_name: str | None = None
+
+
+class UpdateGoogleDriveProjectRequest(BaseModel):
+    name: str | None = None
+    folder_id: str | None = None
+    folder_name: str | None = None
+
+
+@router.get("/projects")
+async def list_projects(
+    _user: User = Depends(require_permission("wiki", "view")),
+):
+    """List all Google Drive RAG projects."""
+    async with AsyncSessionLocal() as session:
+        result = await session.execute(
+            select(GoogleDriveProject).order_by(GoogleDriveProject.id.desc())
+        )
+        projects = result.scalars().all()
+        return [p.to_dict() for p in projects]
+
+
+@router.post("/projects")
+async def create_project(
+    req: CreateGoogleDriveProjectRequest,
+    user: User = Depends(require_permission("wiki", "edit")),
+):
+    """Create a Google Drive RAG project and start initial sync."""
+    slug = f"gdrive-{req.name.lower().replace(' ', '-')[:50]}"
+
+    async with AsyncSessionLocal() as session:
+        # Create collection
+        collection = KnowledgeCollection(
+            name=f"Google Drive: {req.name}",
+            slug=slug,
+            description=f"Synced from Google Drive folder: {req.folder_name or req.folder_id}",
+            base_dir=f"data/google-drive/{slug}",
+            workspace_id=1,
+        )
+        session.add(collection)
+        await session.flush()
+
+        # Create project
+        project = GoogleDriveProject(
+            name=req.name,
+            user_id=user.id,
+            folder_id=req.folder_id,
+            folder_name=req.folder_name,
+            collection_id=collection.id,
+            sync_status="syncing",
+            workspace_id=1,
+        )
+        session.add(project)
+        await session.commit()
+        await session.refresh(project)
+        project_dict = project.to_dict()
+
+    # Start sync in background
+    asyncio.create_task(_sync_project(project_dict["id"]))
+
+    return project_dict
+
+
+@router.post("/projects/{project_id}/sync")
+async def sync_project(
+    project_id: int,
+    _user: User = Depends(require_permission("wiki", "edit")),
+):
+    """Manually trigger sync for a Google Drive project."""
+    async with AsyncSessionLocal() as session:
+        result = await session.execute(
+            select(GoogleDriveProject).where(GoogleDriveProject.id == project_id)
+        )
+        project = result.scalar_one_or_none()
+        if not project:
+            raise HTTPException(status_code=404, detail="Project not found")
+        if project.sync_status == "syncing":
+            raise HTTPException(status_code=409, detail="Sync already in progress")
+
+        project.sync_status = "syncing"
+        project.sync_error = None
+        await session.commit()
+
+    asyncio.create_task(_sync_project(project_id))
+    return {"status": "syncing"}
+
+
+@router.delete("/projects/{project_id}")
+async def delete_project(
+    project_id: int,
+    _user: User = Depends(require_permission("wiki", "manage")),
+):
+    """Delete a Google Drive RAG project."""
+    async with AsyncSessionLocal() as session:
+        result = await session.execute(
+            select(GoogleDriveProject).where(GoogleDriveProject.id == project_id)
+        )
+        project = result.scalar_one_or_none()
+        if not project:
+            raise HTTPException(status_code=404, detail="Project not found")
+
+        collection_id = project.collection_id
+
+        # Delete project
+        await session.execute(delete(GoogleDriveProject).where(GoogleDriveProject.id == project_id))
+        await session.commit()
+
+    # Publish clear event if collection existed
+    if collection_id:
+        from app.dependencies import ServiceContainer
+
+        container = ServiceContainer.get_instance()
+        if container and container.event_bus:
+            await container.event_bus.publish(
+                DatasetSynced(
+                    source="google_drive",
+                    collection_slug="",
+                    action="cleared",
+                    collection_name="",
+                    collection_description="",
+                    base_dir="",
+                    documents=[],
+                    delete_collection=True,
+                )
+            )
+
+    return {"status": "ok"}
+
+
+async def _sync_project(project_id: int) -> None:
+    """Background task: sync Google Drive folder to RAG collection."""
+    from app.services.google_drive_sync_service import sync_drive_folder
+    from modules.google.service import google_oauth_service
+
+    try:
+        # Load project
+        async with AsyncSessionLocal() as session:
+            result = await session.execute(
+                select(GoogleDriveProject).where(GoogleDriveProject.id == project_id)
+            )
+            project = result.scalar_one_or_none()
+            if not project:
+                return
+
+            user_id = project.user_id
+            folder_id = project.folder_id
+            collection_id = project.collection_id
+
+            # Get collection info
+            col_result = await session.execute(
+                select(KnowledgeCollection).where(KnowledgeCollection.id == collection_id)
+            )
+            collection = col_result.scalar_one_or_none()
+            if not collection:
+                return
+
+            slug = collection.slug
+            base_dir = collection.base_dir
+            col_name = collection.name
+            col_desc = collection.description or ""
+
+        # Get valid Google credentials
+        creds = await google_oauth_service.get_valid_credentials(user_id)
+        if not creds:
+            raise ValueError("Google not connected — re-authenticate in Settings")
+
+        # Sync files
+        from pathlib import Path as _Path
+
+        _Path(base_dir).mkdir(parents=True, exist_ok=True)
+        documents = await sync_drive_folder(creds["access_token"], folder_id, base_dir)
+
+        # Update project status
+        total_size = sum(d["file_size_bytes"] for d in documents)
+        async with AsyncSessionLocal() as session:
+            result = await session.execute(
+                select(GoogleDriveProject).where(GoogleDriveProject.id == project_id)
+            )
+            project = result.scalar_one_or_none()
+            if project:
+                project.sync_status = "idle"
+                project.sync_error = None
+                project.last_synced = datetime.utcnow()
+                project.file_count = len(documents)
+                project.total_size_bytes = total_size
+                await session.commit()
+
+        # Publish DatasetSynced event
+        from app.dependencies import ServiceContainer
+
+        container = ServiceContainer.get_instance()
+        if container and container.event_bus:
+            await container.event_bus.publish(
+                DatasetSynced(
+                    source="google_drive",
+                    collection_slug=slug,
+                    action="synced",
+                    collection_name=col_name,
+                    collection_description=col_desc,
+                    base_dir=base_dir,
+                    documents=documents,
+                    delete_collection=False,
+                )
+            )
+
+        logger.info(
+            f"Google Drive sync OK: project {project_id}, "
+            f"{len(documents)} docs, {total_size / 1024:.1f} KB"
+        )
+
+    except Exception as e:
+        logger.error(f"Google Drive sync failed for project {project_id}: {e}")
+        async with AsyncSessionLocal() as session:
+            result = await session.execute(
+                select(GoogleDriveProject).where(GoogleDriveProject.id == project_id)
+            )
+            project = result.scalar_one_or_none()
+            if project:
+                project.sync_status = "error"
+                project.sync_error = str(e)[:500]
+                await session.commit()
diff --git a/orchestrator.py b/orchestrator.py
index 4a43de4..74a4930 100644
--- a/orchestrator.py
+++ b/orchestrator.py
@@ -80,6 +80,7 @@
 from modules.channels.widget.router_public import router as widget_public_router  # noqa: E402
 from modules.compat.router import router as compat_router  # noqa: E402
 from modules.core.router_health import router as health_router  # noqa: E402
+from modules.knowledge.router_google_drive import router as google_drive_rag_router  # noqa: E402
 from modules.monitoring.router_logs import router as logs_router  # noqa: E402
 
 
@@ -114,6 +115,7 @@
 app.include_router(amocrm.webhook_router)
 app.include_router(google.callback_router)  # Must be before static mount
 app.include_router(google.router)
+app.include_router(google_drive_rag_router)
 app.include_router(health_router)
 app.include_router(compat_router)
 app.include_router(logs_router)