diff --git a/.current-upstream-version b/.current-upstream-version new file mode 100644 index 000000000..fb7a04cff --- /dev/null +++ b/.current-upstream-version @@ -0,0 +1 @@ +v0.4.0 diff --git a/.github/workflows/sync-build.yml b/.github/workflows/sync-build.yml new file mode 100644 index 000000000..66b59d72b --- /dev/null +++ b/.github/workflows/sync-build.yml @@ -0,0 +1,104 @@ +name: Sync upstream & build custom image + +on: + schedule: + - cron: '0 */6 * * *' # ogni 6 ore + workflow_dispatch: # trigger manuale + push: + branches: [main] # rebuild ad ogni push su main + +jobs: + sync-and-build: + runs-on: ubuntu-latest + steps: + - name: Checkout fork + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.PAT_TOKEN }} + + - name: Notify Telegram (start) + run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="🚀 *OpenFang Build Started*%0A%0ATrigger: \`${{ github.event_name }}\`%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" + + - name: Fetch upstream tags + if: github.event_name != 'push' + run: | + git remote add upstream https://github.com/RightNow-AI/openfang.git || true + git fetch upstream --tags + + - name: Check for new release + id: check + if: github.event_name != 'push' + run: | + LATEST_TAG=$(git tag -l 'v*' --sort=-v:refname | head -1) + CURRENT=$(cat .current-upstream-version 2>/dev/null || echo "none") + echo "latest=$LATEST_TAG" >> "$GITHUB_OUTPUT" + echo "current=$CURRENT" >> "$GITHUB_OUTPUT" + if [ "$LATEST_TAG" != "$CURRENT" ]; then + echo "new_release=true" >> "$GITHUB_OUTPUT" + else + echo "new_release=false" >> "$GITHUB_OUTPUT" + fi + + - name: Rebase on latest tag + if: github.event_name != 'push' && steps.check.outputs.new_release == 'true' + run: | + git config user.name "github-actions" + git config 
user.email "actions@github.com" + GIT_SEQUENCE_EDITOR=true git rebase ${{ steps.check.outputs.latest }} || while git rebase --skip 2>/dev/null; do :; done + echo "${{ steps.check.outputs.latest }}" > .current-upstream-version + git add .current-upstream-version + git commit -m "chore: sync to upstream ${{ steps.check.outputs.latest }}" || true + git push --force + + - name: Set up Docker Buildx + if: github.event_name == 'push' || steps.check.outputs.new_release == 'true' + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + if: github.event_name == 'push' || steps.check.outputs.new_release == 'true' + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push + if: github.event_name == 'push' || steps.check.outputs.new_release == 'true' + uses: docker/build-push-action@v6 + with: + context: . + push: true + tags: | + fliva/openfang:latest + fliva/openfang:${{ steps.check.outputs.latest || 'custom' }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Update Docker Hub description + if: github.event_name == 'push' || steps.check.outputs.new_release == 'true' + uses: peter-evans/dockerhub-description@v4 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + repository: fliva/openfang + readme-filepath: ./DOCKER_README.md + + - name: Notify Telegram (success) + if: success() && (github.event_name == 'push' || steps.check.outputs.new_release == 'true') + run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="✅ *OpenFang Build OK*%0A%0ATag: \`${{ steps.check.outputs.latest || 'custom' }}\`%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" + + - name: Notify Telegram (failure) + if: failure() + 
run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="❌ *OpenFang Build FAILED*%0A%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" diff --git a/DOCKER_README.md b/DOCKER_README.md new file mode 100644 index 000000000..d3e09a054 --- /dev/null +++ b/DOCKER_README.md @@ -0,0 +1,42 @@ +# OpenFang for Lazycat NAS + +Custom [OpenFang](https://github.com/RightNow-AI/openfang) Docker image optimized for deployment on Lazycat LCMD Microserver. + +**Automatically rebuilt on every new upstream release via GitHub Actions.** + +## What's included + +- **OpenFang Agent OS** — Rust-based autonomous AI agent daemon +- **Claude Code CLI** — Anthropic's CLI for Claude, as LLM provider +- **Node.js 22** — JavaScript runtime +- **Python 3** — Python runtime +- **Go** — via Homebrew +- **Homebrew** — package manager for additional tools +- **uv** — fast Python package manager +- **gh** — GitHub CLI +- **gog** — [Google Workspace CLI](https://gogcli.sh/) (Gmail, Calendar, Drive, Sheets, etc.) +- **ffmpeg** — multimedia processing +- **jq** — JSON processor +- **git, curl, wget** — standard utilities + +## Non-root execution + +The image uses `gosu` to drop root privileges to the `openfang` user at runtime. This is required because Claude Code's `--dangerously-skip-permissions` flag refuses to run as root. + +The `openfang` user has passwordless `sudo` access, so it can still install system packages when needed. 
+ +## Usage + +```bash +docker run -d \ + -p 4200:4200 \ + -v openfang-data:/data \ + -v openfang-home:/home/openfang \ + -e OPENFANG_HOME=/data \ + fliva/openfang:latest +``` + +## Source + +- **This fork**: [github.com/f-liva/openfang](https://github.com/f-liva/openfang) +- **Upstream**: [github.com/RightNow-AI/openfang](https://github.com/RightNow-AI/openfang) diff --git a/Dockerfile b/Dockerfile index d794943ed..b56fe096e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,11 +10,36 @@ COPY packages ./packages RUN cargo build --release --bin openfang FROM debian:bookworm-slim -RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y ca-certificates curl git ffmpeg python3 python3-pip chromium gosu sudo procps build-essential jq && rm -rf /var/lib/apt/lists/* +RUN ln -s /usr/bin/python3 /usr/bin/python +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +RUN (type -p wget >/dev/null || (apt-get update && apt-get install -y wget)) && \ + mkdir -p -m 755 /etc/apt/keyrings && \ + out=$(mktemp) && wget -qO "$out" https://cli.github.com/packages/githubcli-archive-keyring.gpg && \ + cat "$out" | tee /etc/apt/keyrings/githubcli-archive-keyring.gpg > /dev/null && \ + chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg && \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" | tee /etc/apt/sources.list.d/github-cli.list > /dev/null && \ + apt-get update && apt-get install -y gh && rm -rf /var/lib/apt/lists/* +RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - && \ + apt-get install -y nodejs && \ + npm install -g @anthropic-ai/claude-code @qwen-code/qwen-code && \ + rm -rf /var/lib/apt/lists/* +RUN useradd -m -s /bin/bash openfang && echo "openfang ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/openfang +USER openfang +RUN NONINTERACTIVE=1 /bin/bash -c "$(curl -fsSL 
https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" +RUN eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)" && brew install steipete/tap/gogcli +USER root +RUN echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> /home/openfang/.bashrc && \ + echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> /root/.bashrc && \ + echo 'export PATH="/data/npm-global/bin:$PATH"' >> /home/openfang/.bashrc && \ + echo 'export PATH="/data/npm-global/bin:$PATH"' >> /root/.bashrc COPY --from=builder /build/target/release/openfang /usr/local/bin/ COPY --from=builder /build/agents /opt/openfang/agents +RUN mkdir -p /data && chown openfang:openfang /data +COPY entrypoint.sh /usr/local/bin/entrypoint.sh +RUN chmod +x /usr/local/bin/entrypoint.sh EXPOSE 4200 VOLUME /data ENV OPENFANG_HOME=/data -ENTRYPOINT ["openfang"] +ENTRYPOINT ["entrypoint.sh"] CMD ["start"] diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 68e728673..919b5dc6d 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -358,27 +358,69 @@ pub async fn send_message( } } + // Convert metadata from the gateway into a SenderContext for the kernel + let sender_context = req.metadata.as_ref().map(|meta| { + openfang_types::message::SenderContext { + channel: meta.get("channel").and_then(|v| v.as_str().map(String::from)), + sender_id: meta.get("sender").and_then(|v| v.as_str().map(String::from)), + sender_name: meta.get("sender_name").and_then(|v| v.as_str().map(String::from)), + } + }); + + // SECURITY: Check allowed_users for channel-based messages (WhatsApp, etc.) + // If allowed_users is empty, all senders are permitted (open mode). 
+ if let Some(ref ctx) = sender_context { + if let Some(ref sender_id) = ctx.sender_id { + if let Some(ref channel) = ctx.channel { + let channels_config = state.channels_config.read().await; + let blocked = match channel.as_str() { + "whatsapp" => channels_config.whatsapp.as_ref().map_or(false, |wa| { + !wa.allowed_users.is_empty() + && !wa.allowed_users.iter().any(|u| u == sender_id) + }), + _ => false, + }; + if blocked { + tracing::warn!( + "Rejected message from unlisted {channel} user {sender_id}" + ); + return ( + StatusCode::FORBIDDEN, + Json(serde_json::json!({"error": "Sender not in allowed_users list"})), + ); + } + } + } + } + let kernel_handle: Arc = state.kernel.clone() as Arc; match state .kernel - .send_message_with_handle(agent_id, &req.message, Some(kernel_handle)) + .send_message_with_handle_and_blocks(agent_id, &req.message, Some(kernel_handle), None, sender_context) .await { Ok(result) => { // Strip ... blocks from model output let cleaned = crate::ws::strip_think_tags(&result.response); - // Guard: ensure we never return an empty response to the client - let response = if cleaned.trim().is_empty() { - format!( - "[The agent completed processing but returned no text response. ({} in / {} out | {} iter)]", - result.total_usage.input_tokens, - result.total_usage.output_tokens, - result.iterations, - ) - } else { - cleaned - }; + // If the agent intentionally chose not to reply (NO_REPLY / [[silent]]), + // OR if the response is empty after stripping tags (agent only + // produced internal reasoning), treat as silent — never leak debug info. 
+ if result.silent || cleaned.trim().is_empty() { + return ( + StatusCode::OK, + Json(serde_json::json!(MessageResponse { + response: String::new(), + input_tokens: result.total_usage.input_tokens, + output_tokens: result.total_usage.output_tokens, + iterations: result.iterations, + cost_usd: result.cost_usd, + silent: true, + })), + ); + } + + let response = cleaned; ( StatusCode::OK, Json(serde_json::json!(MessageResponse { @@ -387,6 +429,7 @@ pub async fn send_message( output_tokens: result.total_usage.output_tokens, iterations: result.iterations, cost_usd: result.cost_usd, + silent: false, })), ) } @@ -7023,6 +7066,7 @@ pub async fn set_provider_key( model: model_id, api_key_env: env_var.clone(), base_url: None, + vision_model: None, }; let mut guard = state .kernel @@ -7186,6 +7230,7 @@ pub async fn test_provider( Some(base_url) }, skip_permissions: true, + profiles: vec![], }; match openfang_runtime::drivers::create_driver(&driver_config) { diff --git a/crates/openfang-api/src/types.rs b/crates/openfang-api/src/types.rs index 80b71140a..503b8e6f6 100644 --- a/crates/openfang-api/src/types.rs +++ b/crates/openfang-api/src/types.rs @@ -42,6 +42,15 @@ pub struct MessageRequest { /// Optional file attachments (uploaded via /upload endpoint). #[serde(default)] pub attachments: Vec, + /// Optional channel metadata (sender identity, channel type). + /// + /// Used by external gateways (e.g. WhatsApp) to forward sender information + /// so the agent knows who is writing. Expected keys: + /// - `channel`: channel name (e.g. "whatsapp", "telegram") + /// - `sender`: platform-specific sender ID (e.g. phone number) + /// - `sender_name`: human-readable sender name + #[serde(default)] + pub metadata: Option>, } /// Response from sending a message. 
@@ -53,6 +62,8 @@ pub struct MessageResponse { pub iterations: u32, #[serde(skip_serializing_if = "Option::is_none")] pub cost_usd: Option, + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub silent: bool, } /// Request to install a skill from the marketplace. diff --git a/crates/openfang-api/src/ws.rs b/crates/openfang-api/src/ws.rs index bb3752c63..ca3bcf74d 100644 --- a/crates/openfang-api/src/ws.rs +++ b/crates/openfang-api/src/ws.rs @@ -636,17 +636,22 @@ async fn handle_text_message( // (e.g. MiniMax, DeepSeek reasoning tokens) let cleaned_response = strip_think_tags(&result.response); - // Guard: ensure we never send an empty response - let content = if cleaned_response.trim().is_empty() { - format!( - "[The agent completed processing but returned no text response. ({} in / {} out | {} iter)]", - result.total_usage.input_tokens, - result.total_usage.output_tokens, - result.iterations, + // If response is empty after stripping think tags, + // treat as silent — don't leak debug info + if cleaned_response.trim().is_empty() { + let _ = send_json( + sender, + &serde_json::json!({ + "type": "silent_complete", + "input_tokens": result.total_usage.input_tokens, + "output_tokens": result.total_usage.output_tokens, + }), ) - } else { - cleaned_response - }; + .await; + return; + } + + let content = cleaned_response; // Estimate context pressure from last call let per_call = if result.iterations > 0 { diff --git a/crates/openfang-api/static/css/layout.css b/crates/openfang-api/static/css/layout.css index 880e6ca3f..33919dac4 100644 --- a/crates/openfang-api/static/css/layout.css +++ b/crates/openfang-api/static/css/layout.css @@ -50,6 +50,11 @@ transition: opacity 0.2s, transform 0.2s; } +[data-theme="light"] .sidebar-logo img, +[data-theme="light"] .message-avatar img { + filter: invert(1); +} + .sidebar-logo img:hover { opacity: 1; transform: scale(1.05); diff --git a/crates/openfang-api/static/logo.png b/crates/openfang-api/static/logo.png index 
cfe72c0b5..7f900d403 100644 Binary files a/crates/openfang-api/static/logo.png and b/crates/openfang-api/static/logo.png differ diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 60b295622..1db733868 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -529,26 +529,35 @@ async fn dispatch_message( return; } - // For images: download, base64 encode, and send as multimodal content blocks + // For images: build content blocks with the image URL for vision models. + // We pass the original URL rather than downloading + base64-encoding because + // many providers (DashScope/Qwen, OpenAI) prefer or require direct URLs. if let ChannelContent::Image { ref url, ref caption } = message.content { - let blocks = download_image_to_blocks(url, caption.as_deref()).await; - if blocks.iter().any(|b| matches!(b, ContentBlock::Image { .. })) { - // We have actual image data — send as structured blocks for vision - dispatch_with_blocks( - blocks, - message, - handle, - router, - adapter, - ct_str, - thread_id, - output_format, - lifecycle_reactions, - ) - .await; - return; + let mut blocks = Vec::new(); + if let Some(cap) = caption { + if !cap.is_empty() { + blocks.push(ContentBlock::Text { + text: cap.clone(), + provider_metadata: None, + }); + } } - // Image download failed — fall through to text description below + blocks.push(ContentBlock::ImageUrl { + url: url.clone(), + }); + dispatch_with_blocks( + blocks, + message, + handle, + router, + adapter, + ct_str, + thread_id, + output_format, + lifecycle_reactions, + ) + .await; + return; } let text = match &message.content { diff --git a/crates/openfang-channels/src/whatsapp.rs b/crates/openfang-channels/src/whatsapp.rs index 82ad5840d..b8508c3c3 100644 --- a/crates/openfang-channels/src/whatsapp.rs +++ b/crates/openfang-channels/src/whatsapp.rs @@ -162,8 +162,8 @@ impl WhatsAppAdapter { } /// Check if a phone number is allowed. 
- #[allow(dead_code)] - fn is_allowed(&self, phone: &str) -> bool { + /// Returns true if allowed_users is empty (open mode) or phone is in the list. + pub fn is_allowed(&self, phone: &str) -> bool { self.allowed_users.is_empty() || self.allowed_users.iter().any(|u| u == phone) } diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index 99f5de91f..1587fef20 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -582,6 +582,7 @@ impl OpenFangKernel { .clone() .or_else(|| config.provider_urls.get(&config.default_model.provider).cloned()), skip_permissions: true, + profiles: vec![], }; // Primary driver failure is non-fatal: the dashboard should remain accessible // even if the LLM provider is misconfigured. Users can fix config via dashboard. @@ -603,6 +604,7 @@ impl OpenFangKernel { api_key: std::env::var(env_var).ok(), base_url: config.provider_urls.get(provider).cloned(), skip_permissions: true, + profiles: vec![], }; match drivers::create_driver(&auto_config) { Ok(d) => { @@ -648,6 +650,7 @@ impl OpenFangKernel { .clone() .or_else(|| config.provider_urls.get(&fb.provider).cloned()), skip_permissions: true, + profiles: vec![], }; match drivers::create_driver(&fb_config) { Ok(d) => { @@ -1393,6 +1396,27 @@ impl OpenFangKernel { .await } + /// Send a message with channel sender context (from external gateways like WhatsApp). + /// + /// The sender context is injected into the agent's system prompt so the agent + /// can identify who is writing and apply appropriate privacy rules. + pub async fn send_message_with_sender_context( + &self, + agent_id: AgentId, + message: &str, + sender_context: openfang_types::message::SenderContext, + kernel_handle: Option>, + ) -> KernelResult { + self.send_message_with_handle_and_blocks( + agent_id, + message, + kernel_handle, + None, + Some(sender_context), + ) + .await + } + /// Send a multimodal message (text + images) to an agent and get a response. 
/// /// Used by channel bridges when a user sends a photo — the image is downloaded, @@ -1408,7 +1432,7 @@ impl OpenFangKernel { .get() .and_then(|w| w.upgrade()) .map(|arc| arc as Arc); - self.send_message_with_handle_and_blocks(agent_id, message, handle, Some(blocks)) + self.send_message_with_handle_and_blocks(agent_id, message, handle, Some(blocks), None) .await } @@ -1419,7 +1443,7 @@ impl OpenFangKernel { message: &str, kernel_handle: Option>, ) -> KernelResult { - self.send_message_with_handle_and_blocks(agent_id, message, kernel_handle, None) + self.send_message_with_handle_and_blocks(agent_id, message, kernel_handle, None, None) .await } @@ -1429,6 +1453,9 @@ impl OpenFangKernel { /// multimodal content (text + images) instead of just a text string. This /// enables vision models to process images sent from channels like Telegram. /// + /// When `sender_context` is `Some`, the sender identity is injected into the + /// agent's system prompt so it can distinguish between its owner and other users. + /// /// Per-agent locking ensures that concurrent messages for the same agent /// are serialized (preventing session corruption), while messages for /// different agents run in parallel. @@ -1438,6 +1465,7 @@ impl OpenFangKernel { message: &str, kernel_handle: Option>, content_blocks: Option>, + sender_context: Option, ) -> KernelResult { // Acquire per-agent lock to serialize concurrent messages for the same agent. 
// This prevents session corruption when multiple messages arrive in quick @@ -1467,7 +1495,7 @@ impl OpenFangKernel { self.execute_python_agent(&entry, agent_id, message).await } else { // Default: LLM agent loop (builtin:chat or any unrecognized module) - self.execute_llm_agent(&entry, agent_id, message, kernel_handle, content_blocks) + self.execute_llm_agent(&entry, agent_id, message, kernel_handle, content_blocks, sender_context) .await }; @@ -1710,6 +1738,8 @@ impl OpenFangKernel { .and_then(|(s, _)| s), user_name, channel_type: None, + sender_id: None, + sender_name: None, is_subagent: manifest .metadata .get("is_subagent") @@ -2076,6 +2106,7 @@ impl OpenFangKernel { message: &str, kernel_handle: Option>, content_blocks: Option>, + sender_context: Option, ) -> KernelResult { // Check metering quota before starting self.metering @@ -2180,7 +2211,9 @@ impl OpenFangKernel { .ok() .and_then(|(s, _)| s), user_name, - channel_type: None, + channel_type: sender_context.as_ref().and_then(|sc| sc.channel.clone()), + sender_id: sender_context.as_ref().and_then(|sc| sc.sender_id.clone()), + sender_name: sender_context.as_ref().and_then(|sc| sc.sender_name.clone()), is_subagent: manifest .metadata .get("is_subagent") @@ -2274,6 +2307,55 @@ impl OpenFangKernel { } } + // Vision model selection for image content. + // Priority: 1) explicit vision_model from config (forced override) + // 2) current agent model if it supports vision (no swap needed) + // 3) error — no vision capability available + if let Some(ref blocks) = content_blocks { + let has_images = blocks.iter().any(|b| { + matches!( + b, + openfang_types::message::ContentBlock::Image { .. } + | openfang_types::message::ContentBlock::ImageUrl { .. 
} + ) + }); + if has_images { + if let Some(ref vision_model) = self.config.default_model.vision_model { + // Explicit vision_model configured — always use it + info!( + agent = %manifest.name, + current_model = %manifest.model.model, + vision_model = %vision_model, + "Swapping to configured vision model for image content" + ); + manifest.model.model = vision_model.clone(); + manifest.model.provider = self.config.default_model.provider.clone(); + } else { + // No vision_model forced — check if current model handles vision + let current_supports_vision = self + .model_catalog + .read() + .ok() + .and_then(|cat| cat.find_model(&manifest.model.model).map(|m| m.supports_vision)) + .unwrap_or(false); + + if current_supports_vision { + info!( + agent = %manifest.name, + model = %manifest.model.model, + "Current model supports vision — no swap needed" + ); + } else { + warn!( + agent = %manifest.name, + model = %manifest.model.model, + "Image received but no vision_model configured and current model lacks vision support" + ); + } + } + } + } + let driver = self.resolve_driver(&manifest)?; // Look up model's actual context window from the catalog @@ -4342,6 +4424,7 @@ impl OpenFangKernel { api_key, base_url, skip_permissions: true, + profiles: vec![], }; match drivers::create_driver(&driver_config) { @@ -4388,6 +4471,7 @@ impl OpenFangKernel { .clone() .or_else(|| self.lookup_provider_url(&fb.provider)), skip_permissions: true, + profiles: vec![], }; match drivers::create_driver(&config) { Ok(d) => chain.push((d, fb.model.clone())), @@ -5133,7 +5217,7 @@ fn apply_budget_defaults( budget: &openfang_types::config::BudgetConfig, resources: &mut ResourceQuota, ) { - // Only override hourly if agent has unlimited (0.0) and global is set + // Only override hourly if agent has unlimited default (0.0) and global is set if budget.max_hourly_usd > 0.0 && resources.max_cost_per_hour_usd == 0.0 { resources.max_cost_per_hour_usd = budget.max_hourly_usd; } diff --git 
a/crates/openfang-memory/src/session.rs b/crates/openfang-memory/src/session.rs index 74862c372..a7d1a83aa 100644 --- a/crates/openfang-memory/src/session.rs +++ b/crates/openfang-memory/src/session.rs @@ -584,6 +584,9 @@ impl SessionStore { ContentBlock::Image { media_type, .. } => { text_parts.push(format!("[image: {media_type}]")); } + ContentBlock::ImageUrl { ref url } => { + text_parts.push(format!("[image: {url}]")); + } ContentBlock::Thinking { thinking } => { text_parts.push(format!( "[thinking: {}]", diff --git a/crates/openfang-runtime/src/compactor.rs b/crates/openfang-runtime/src/compactor.rs index 855705469..e9e246a86 100644 --- a/crates/openfang-runtime/src/compactor.rs +++ b/crates/openfang-runtime/src/compactor.rs @@ -399,6 +399,9 @@ fn build_conversation_text(messages: &[Message], config: &CompactionConfig) -> S ContentBlock::Image { media_type, .. } => { conversation_text.push_str(&format!("[Image: {media_type}]\n\n")); } + ContentBlock::ImageUrl { url } => { + conversation_text.push_str(&format!("[Image: {url}]\n\n")); + } ContentBlock::Thinking { .. } => {} ContentBlock::Unknown => {} } diff --git a/crates/openfang-runtime/src/drivers/anthropic.rs b/crates/openfang-runtime/src/drivers/anthropic.rs index 857774e26..4d79c2a81 100644 --- a/crates/openfang-runtime/src/drivers/anthropic.rs +++ b/crates/openfang-runtime/src/drivers/anthropic.rs @@ -573,6 +573,12 @@ fn convert_message(msg: &Message) -> ApiMessage { data: data.clone(), }, }), + ContentBlock::ImageUrl { url } => { + // Anthropic requires base64; pass as text description for now. + Some(ApiContentBlock::Text { + text: format!("[Image: {url}]"), + }) + } ContentBlock::ToolUse { id, name, input, .. 
} => Some(ApiContentBlock::ToolUse { id: id.clone(), name: name.clone(), diff --git a/crates/openfang-runtime/src/drivers/claude_code.rs b/crates/openfang-runtime/src/drivers/claude_code.rs index 1cdfe3b44..262c1c2a8 100644 --- a/crates/openfang-runtime/src/drivers/claude_code.rs +++ b/crates/openfang-runtime/src/drivers/claude_code.rs @@ -7,8 +7,9 @@ use crate::llm_driver::{CompletionRequest, CompletionResponse, LlmDriver, LlmError, StreamEvent}; use async_trait::async_trait; -use openfang_types::message::{ContentBlock, Role, StopReason, TokenUsage}; +use openfang_types::message::{ContentBlock, MessageContent, Role, StopReason, TokenUsage}; use serde::Deserialize; +use std::path::PathBuf; use tokio::io::AsyncBufReadExt; use tracing::{debug, warn}; @@ -90,8 +91,12 @@ impl ClaudeCodeDriver { } /// Build a text prompt from the completion request messages. - fn build_prompt(request: &CompletionRequest) -> String { + /// + /// Image content blocks are referenced using Claude Code's `@path` syntax, + /// which tells the CLI to read the local file directly. + pub fn build_prompt(request: &CompletionRequest, image_files: &[PathBuf]) -> String { let mut parts = Vec::new(); + let mut img_idx = 0; if let Some(ref sys) = request.system { parts.push(format!("[System]\n{sys}")); @@ -103,17 +108,134 @@ impl ClaudeCodeDriver { Role::Assistant => "Assistant", Role::System => "System", }; - let text = msg.content.text_content(); - if !text.is_empty() { - parts.push(format!("[{role_label}]\n{text}")); + + let mut msg_parts = Vec::new(); + + match &msg.content { + MessageContent::Text(s) => { + if !s.is_empty() { + msg_parts.push(s.clone()); + } + } + MessageContent::Blocks(blocks) => { + for block in blocks { + match block { + ContentBlock::Text { text, .. } => { + if !text.is_empty() { + msg_parts.push(text.clone()); + } + } + ContentBlock::Image { .. } | ContentBlock::ImageUrl { .. 
} => { + if img_idx < image_files.len() { + let path = &image_files[img_idx]; + msg_parts.push(format!("@{}", path.display())); + img_idx += 1; + } + } + _ => {} + } + } + } + } + + if !msg_parts.is_empty() { + let combined = msg_parts.join("\n"); + parts.push(format!("[{role_label}]\n{combined}")); } } parts.join("\n\n") } + /// Extract image content blocks from messages and write them to temp files. + /// + /// Returns the list of temp file paths. The caller is responsible for + /// cleaning them up after the CLI finishes. + pub async fn extract_images_to_temp(request: &CompletionRequest) -> Vec { + use base64::Engine; + + let mut paths = Vec::new(); + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(0); + + for msg in &request.messages { + if let MessageContent::Blocks(blocks) = &msg.content { + for (i, block) in blocks.iter().enumerate() { + match block { + ContentBlock::Image { media_type, data } => { + let ext = media_type + .strip_prefix("image/") + .unwrap_or("png"); + let path = PathBuf::from(format!( + "/tmp/openfang-img-{ts}-{i}.{ext}" + )); + if let Ok(bytes) = + base64::engine::general_purpose::STANDARD.decode(data) + { + if std::fs::write(&path, &bytes).is_ok() { + paths.push(path); + } + } + } + ContentBlock::ImageUrl { url } => { + // If it's a data URI, decode it; otherwise download + if let Some(rest) = url.strip_prefix("data:") { + // data:image/png;base64, + if let Some((meta, b64)) = rest.split_once(",") { + let ext = meta + .split(';') + .next() + .and_then(|m| m.strip_prefix("image/")) + .unwrap_or("png"); + let path = PathBuf::from(format!( + "/tmp/openfang-img-{ts}-{i}.{ext}" + )); + if let Ok(bytes) = + base64::engine::general_purpose::STANDARD.decode(b64) + { + if std::fs::write(&path, &bytes).is_ok() { + paths.push(path); + } + } + } + } else { + // HTTP(S) URL — try to download + let path = PathBuf::from(format!( + "/tmp/openfang-img-{ts}-{i}.jpg" + )); + match 
reqwest::get(url).await { + Ok(resp) => { + if let Ok(bytes) = resp.bytes().await { + if std::fs::write(&path, &bytes).is_ok() { + paths.push(path); + } + } + } + Err(e) => { + warn!(url = %url, error = %e, "Failed to download image for Claude CLI"); + } + } + } + } + _ => {} + } + } + } + } + paths + } + + /// Clean up temporary image files. + pub fn cleanup_temp_images(paths: &[PathBuf]) { + for p in paths { + let _ = std::fs::remove_file(p); + } + } + /// Map a model ID like "claude-code/opus" to CLI --model flag value. - fn model_flag(model: &str) -> Option { + pub fn model_flag(model: &str) -> Option { let stripped = model .strip_prefix("claude-code/") .unwrap_or(model); @@ -125,12 +247,48 @@ impl ClaudeCodeDriver { } } + /// Build the CLI argument list for a completion request. + /// + /// Exposed as a testable method so unit tests can verify that + /// `--dangerously-skip-permissions`, `--model`, and output format flags + /// are set correctly. + pub fn build_args( + &self, + prompt: &str, + model_flag: Option<&str>, + streaming: bool, + ) -> Vec { + let mut args = vec![ + "-p".to_string(), + prompt.to_string(), + ]; + + if self.skip_permissions { + args.push("--dangerously-skip-permissions".to_string()); + } + + args.push("--output-format".to_string()); + if streaming { + args.push("stream-json".to_string()); + args.push("--verbose".to_string()); + } else { + args.push("json".to_string()); + } + + if let Some(model) = model_flag { + args.push("--model".to_string()); + args.push(model.to_string()); + } + + args + } + /// Apply security env filtering to a command. /// /// Instead of `env_clear()` (which breaks Node.js, NVM, SSL, proxies), /// we keep the full environment and only remove known sensitive API keys /// from other LLM providers. 
- fn apply_env_filter(cmd: &mut tokio::process::Command) { + pub fn apply_env_filter(cmd: &mut tokio::process::Command) { for key in SENSITIVE_ENV_EXACT { cmd.env_remove(key); } @@ -196,22 +354,14 @@ impl LlmDriver for ClaudeCodeDriver { &self, request: CompletionRequest, ) -> Result { - let prompt = Self::build_prompt(&request); + let image_files = Self::extract_images_to_temp(&request).await; + let prompt = Self::build_prompt(&request, &image_files); let model_flag = Self::model_flag(&request.model); - let mut cmd = tokio::process::Command::new(&self.cli_path); - cmd.arg("-p") - .arg(&prompt) - .arg("--output-format") - .arg("json"); + let args = self.build_args(&prompt, model_flag.as_deref(), false); - if self.skip_permissions { - cmd.arg("--dangerously-skip-permissions"); - } - - if let Some(ref model) = model_flag { - cmd.arg("--model").arg(model); - } + let mut cmd = tokio::process::Command::new(&self.cli_path); + cmd.args(&args); Self::apply_env_filter(&mut cmd); @@ -230,6 +380,7 @@ impl LlmDriver for ClaudeCodeDriver { )))?; if !output.status.success() { + Self::cleanup_temp_images(&image_files); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); let detail = if !stderr.is_empty() { &stderr } else { &stdout }; @@ -261,6 +412,7 @@ impl LlmDriver for ClaudeCodeDriver { }); } + Self::cleanup_temp_images(&image_files); let stdout = String::from_utf8_lossy(&output.stdout); // Try JSON parse first @@ -299,23 +451,14 @@ impl LlmDriver for ClaudeCodeDriver { request: CompletionRequest, tx: tokio::sync::mpsc::Sender, ) -> Result { - let prompt = Self::build_prompt(&request); + let image_files = Self::extract_images_to_temp(&request).await; + let prompt = Self::build_prompt(&request, &image_files); let model_flag = Self::model_flag(&request.model); - let mut cmd = tokio::process::Command::new(&self.cli_path); - cmd.arg("-p") - .arg(&prompt) - .arg("--output-format") - 
.arg("stream-json") - .arg("--verbose"); - - if self.skip_permissions { - cmd.arg("--dangerously-skip-permissions"); - } + let args = self.build_args(&prompt, model_flag.as_deref(), true); - if let Some(ref model) = model_flag { - cmd.arg("--model").arg(model); - } + let mut cmd = tokio::process::Command::new(&self.cli_path); + cmd.args(&args); Self::apply_env_filter(&mut cmd); @@ -412,6 +555,8 @@ impl LlmDriver for ClaudeCodeDriver { .await .map_err(|e| LlmError::Http(format!("Claude CLI wait failed: {e}")))?; + Self::cleanup_temp_images(&image_files); + if !status.success() { warn!(code = ?status.code(), "Claude CLI exited with error"); } @@ -486,7 +631,7 @@ mod tests { thinking: None, }; - let prompt = ClaudeCodeDriver::build_prompt(&request); + let prompt = ClaudeCodeDriver::build_prompt(&request, &[]); assert!(prompt.contains("[System]")); assert!(prompt.contains("You are helpful.")); assert!(prompt.contains("[User]")); @@ -538,6 +683,34 @@ mod tests { assert!(!driver.skip_permissions); } + #[test] + fn test_build_args_with_skip_permissions() { + let driver = ClaudeCodeDriver::new(None, true); + let args = driver.build_args("hello", Some("opus"), false); + assert!(args.contains(&"--dangerously-skip-permissions".to_string()), + "should contain --dangerously-skip-permissions when skip_permissions=true"); + assert!(args.contains(&"json".to_string())); + assert!(args.contains(&"--model".to_string())); + assert!(args.contains(&"opus".to_string())); + } + + #[test] + fn test_build_args_without_skip_permissions() { + let driver = ClaudeCodeDriver::new(None, false); + let args = driver.build_args("hello", Some("sonnet"), false); + assert!(!args.contains(&"--dangerously-skip-permissions".to_string()), + "should NOT contain --dangerously-skip-permissions when skip_permissions=false"); + } + + #[test] + fn test_build_args_streaming() { + let driver = ClaudeCodeDriver::new(None, true); + let args = driver.build_args("hello", None, true); + 
assert!(args.contains(&"stream-json".to_string())); + assert!(args.contains(&"--verbose".to_string())); + assert!(!args.contains(&"--model".to_string()), "no model flag when model_flag is None"); + } + #[test] fn test_sensitive_env_list_coverage() { // Ensure all major provider keys are in the strip list diff --git a/crates/openfang-runtime/src/drivers/gemini.rs b/crates/openfang-runtime/src/drivers/gemini.rs index 05144d30e..51369ee90 100644 --- a/crates/openfang-runtime/src/drivers/gemini.rs +++ b/crates/openfang-runtime/src/drivers/gemini.rs @@ -304,6 +304,14 @@ fn convert_messages( }, }); } + ContentBlock::ImageUrl { url } => { + // Gemini supports fileData for URL-based images; + // fall back to a text description if not supported. + parts.push(GeminiPart::Text { + text: format!("[Image: {url}]"), + thought_signature: None, + }); + } ContentBlock::ToolResult { content, tool_name, .. } => { diff --git a/crates/openfang-runtime/src/drivers/mod.rs b/crates/openfang-runtime/src/drivers/mod.rs index 4ecb7c0c0..9dfebffd4 100644 --- a/crates/openfang-runtime/src/drivers/mod.rs +++ b/crates/openfang-runtime/src/drivers/mod.rs @@ -7,6 +7,7 @@ pub mod anthropic; pub mod claude_code; pub mod copilot; +pub mod multi_profile; pub mod fallback; pub mod gemini; pub mod openai; @@ -151,6 +152,11 @@ fn provider_defaults(provider: &str) -> Option { api_key_env: "", key_required: false, }), + "qwen-code" => Some(ProviderDefaults { + base_url: "", + api_key_env: "", + key_required: false, + }), "moonshot" | "kimi" | "kimi2" => Some(ProviderDefaults { base_url: MOONSHOT_BASE_URL, api_key_env: "MOONSHOT_API_KEY", @@ -301,9 +307,18 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr return Ok(Arc::new(openai::OpenAIDriver::new(api_key, base_url))); } - // Claude Code CLI — subprocess-based, no API key needed + // Claude Code CLI — subprocess-based, no API key needed. 
+ // When multiple profiles are configured, use the multi-profile driver + // for automatic rotation on rate-limit errors. if provider == "claude-code" { let cli_path = config.base_url.clone(); + if config.profiles.len() > 1 { + return Ok(Arc::new(multi_profile::MultiProfileDriver::new( + cli_path, + config.skip_permissions, + config.profiles.clone(), + ))); + } return Ok(Arc::new(claude_code::ClaudeCodeDriver::new( cli_path, config.skip_permissions, @@ -421,7 +436,7 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr "Unknown provider '{}'. Supported: anthropic, gemini, openai, groq, openrouter, \ deepseek, together, mistral, fireworks, ollama, vllm, lmstudio, perplexity, \ cohere, ai21, cerebras, sambanova, huggingface, xai, replicate, github-copilot, \ - chutes, venice, codex, claude-code. Or set base_url for a custom OpenAI-compatible endpoint.", + chutes, venice, codex, claude-code, qwen-code. Or set base_url for a custom OpenAI-compatible endpoint.", provider ), }) @@ -537,6 +552,7 @@ mod tests { api_key: Some("test".to_string()), base_url: Some("http://localhost:9999/v1".to_string()), skip_permissions: true, + profiles: vec![], }; let driver = create_driver(&config); assert!(driver.is_ok()); @@ -549,6 +565,7 @@ mod tests { api_key: None, base_url: None, skip_permissions: true, + profiles: vec![], }; let driver = create_driver(&config); assert!(driver.is_err()); @@ -651,6 +668,7 @@ mod tests { api_key: None, // not explicitly passed base_url: Some("https://integrate.api.nvidia.com/v1".to_string()), skip_permissions: true, + profiles: vec![], }; let driver = create_driver(&config); assert!(driver.is_ok(), "Custom provider with env var convention should succeed"); @@ -665,6 +683,7 @@ mod tests { api_key: None, base_url: None, skip_permissions: true, + profiles: vec![], }; let driver = create_driver(&config); assert!(driver.is_err()); @@ -680,6 +699,7 @@ mod tests { api_key: None, base_url: None, skip_permissions: true, + profiles: vec![], }; let 
result = create_driver(&config); assert!(result.is_err()); @@ -704,6 +724,7 @@ mod tests { api_key: Some("explicit-key".to_string()), base_url: Some("https://api.example.com/v1".to_string()), skip_permissions: true, + profiles: vec![], }; let driver = create_driver(&config); assert!(driver.is_ok()); diff --git a/crates/openfang-runtime/src/drivers/multi_profile.rs b/crates/openfang-runtime/src/drivers/multi_profile.rs new file mode 100644 index 000000000..b7b6a0a13 --- /dev/null +++ b/crates/openfang-runtime/src/drivers/multi_profile.rs @@ -0,0 +1,1111 @@ +//! Multi-profile wrapper for CLI-based LLM drivers (Claude Code, Qwen Code). +//! +//! When a user has multiple subscriptions, each with its own OAuth token stored +//! in a separate config directory, this wrapper automatically rotates between +//! them on rate-limit errors. Cooldown timestamps are derived from the +//! Anthropic usage API (`/api/oauth/usage`) so profiles are re-enabled at +//! exactly the right time. + +use crate::llm_driver::{CompletionRequest, CompletionResponse, LlmDriver, LlmError, StreamEvent}; +use async_trait::async_trait; +use serde::Deserialize; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; + +// ── Anthropic usage API ────────────────────────────────────────────────── + +const ANTHROPIC_USAGE_URL: &str = "https://api.anthropic.com/api/oauth/usage"; +const ANTHROPIC_BETA_HEADER: &str = "oauth-2025-04-20"; + +/// Utilisation data for a single rate-limit window. +#[derive(Debug, Deserialize)] +struct UsageWindow { + /// Percentage of the window consumed (0.0 – 100.0). + #[serde(default)] + utilization: f64, + /// ISO-8601 timestamp when the window resets. + #[serde(default)] + resets_at: Option, +} + +/// Response from `GET /api/oauth/usage`. +#[derive(Debug, Deserialize)] +struct UsageResponse { + #[serde(default)] + five_hour: Option, + #[serde(default)] + seven_day: Option, +} + +/// Parsed usage info we care about. 
+#[derive(Debug, Clone)] +pub struct ProfileUsage { + pub five_hour_utilization: f64, + pub five_hour_resets_at: Option>, + pub seven_day_utilization: f64, +} + +// ── Profile state ──────────────────────────────────────────────────────── + +/// Runtime state for a single credential profile. +struct ProfileState { + /// Display name (directory basename). + name: String, + /// Absolute path to the config directory containing `.credentials.json`. + config_dir: PathBuf, + /// When `Some`, the profile is in cooldown until this instant. + cooldown_until: Option, + /// Cached OAuth access token (read once from disk). + access_token: Option, +} + +impl ProfileState { + fn new(config_dir: PathBuf) -> Self { + let name = config_dir + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| "default".to_string()); + + Self { + name, + config_dir, + cooldown_until: None, + access_token: None, + } + } + + /// Read the OAuth access token from the credentials file on disk. + fn load_token(&mut self) -> Option<&str> { + if self.access_token.is_some() { + return self.access_token.as_deref(); + } + + let cred_paths = [ + self.config_dir.join(".credentials.json"), + self.config_dir.join("credentials.json"), + ]; + + for path in &cred_paths { + if let Ok(contents) = std::fs::read_to_string(path) { + if let Ok(json) = serde_json::from_str::(&contents) { + if let Some(token) = json + .get("claudeAiOauth") + .and_then(|o| o.get("accessToken")) + .and_then(|t| t.as_str()) + { + self.access_token = Some(token.to_string()); + debug!(profile = %self.name, path = %path.display(), "Loaded OAuth token"); + return self.access_token.as_deref(); + } + } + } + } + + warn!(profile = %self.name, "No valid credentials found"); + None + } + + /// Check if this profile is currently in cooldown. 
+ fn is_available(&self) -> bool { + match self.cooldown_until { + Some(until) => std::time::Instant::now() >= until, + None => true, + } + } + + /// Put this profile in cooldown until the given instant. + fn set_cooldown(&mut self, until: std::time::Instant) { + info!( + profile = %self.name, + cooldown_secs = until.duration_since(std::time::Instant::now()).as_secs(), + "Profile entering cooldown" + ); + self.cooldown_until = Some(until); + } + + /// Clear cooldown (e.g. after a successful request). + fn clear_cooldown(&mut self) { + if self.cooldown_until.is_some() { + info!(profile = %self.name, "Profile cooldown cleared"); + self.cooldown_until = None; + } + } +} + +// ── Usage API client ───────────────────────────────────────────────────── + +/// Fetch usage from the Anthropic API for a given OAuth token. +async fn fetch_usage(access_token: &str) -> Result { + let client = reqwest::Client::new(); + let resp = client + .get(ANTHROPIC_USAGE_URL) + .header("Authorization", format!("Bearer {access_token}")) + .header("anthropic-beta", ANTHROPIC_BETA_HEADER) + .send() + .await + .map_err(|e| format!("HTTP request failed: {e}"))?; + + if resp.status() == 401 { + return Err("OAuth token expired — re-authenticate this profile".to_string()); + } + + let data: UsageResponse = resp + .json() + .await + .map_err(|e| format!("Failed to parse usage response: {e}"))?; + + let (five_util, five_resets) = match data.five_hour { + Some(w) => { + let resets = w.resets_at.and_then(|s| { + chrono::DateTime::parse_from_rfc3339(&s) + .ok() + .map(|dt| dt.with_timezone(&chrono::Utc)) + }); + (w.utilization, resets) + } + None => (0.0, None), + }; + + let seven_util = data.seven_day.map(|w| w.utilization).unwrap_or(0.0); + + Ok(ProfileUsage { + five_hour_utilization: five_util, + five_hour_resets_at: five_resets, + seven_day_utilization: seven_util, + }) +} + +/// Convert a future UTC reset time to a `std::time::Instant`. 
+fn utc_to_instant(dt: chrono::DateTime) -> std::time::Instant { + let now_utc = chrono::Utc::now(); + if dt <= now_utc { + return std::time::Instant::now(); + } + let delta = (dt - now_utc).to_std().unwrap_or(std::time::Duration::from_secs(0)); + // Add a small buffer (30s) to avoid racing the reset + std::time::Instant::now() + delta + std::time::Duration::from_secs(30) +} + +/// Default cooldown when we can't determine the reset time (30 minutes). +const FALLBACK_COOLDOWN_SECS: u64 = 30 * 60; + +// ── Multi-profile driver ───────────────────────────────────────────────── + +/// A wrapper driver that manages multiple credential profiles for a +/// CLI-based LLM provider, rotating on rate-limit errors. +pub struct MultiProfileDriver { + /// The underlying driver factory. For each request we pick a profile, + /// set `CLAUDE_CONFIG_DIR`, and delegate to a fresh driver instance. + cli_path: String, + skip_permissions: bool, + /// Mutable profile state, protected by a mutex. + profiles: Arc>>, + /// Index of the currently active profile. + current: Arc>, +} + +impl MultiProfileDriver { + /// Create a new multi-profile driver. + /// + /// `profile_dirs` must contain at least one path. Each path should be + /// a directory containing Claude OAuth credentials. If only one path + /// is given, the driver behaves identically to a plain `ClaudeCodeDriver` + /// but with rate-limit detection and reporting. 
+ pub fn new( + cli_path: Option, + skip_permissions: bool, + profile_dirs: Vec, + ) -> Self { + let profiles: Vec = profile_dirs + .into_iter() + .map(|dir| { + let expanded = expand_tilde(&dir); + ProfileState::new(PathBuf::from(expanded)) + }) + .collect(); + + let names: Vec<&str> = profiles.iter().map(|p| p.name.as_str()).collect(); + info!(profiles = ?names, "Multi-profile Claude driver initialized"); + + Self { + cli_path: cli_path + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "claude".to_string()), + skip_permissions, + profiles: Arc::new(Mutex::new(profiles)), + current: Arc::new(Mutex::new(0)), + } + } + + /// Pick the next available (non-cooldown) profile index. + /// Returns `None` if all profiles are in cooldown. + async fn next_available(&self) -> Option<(usize, PathBuf)> { + let profiles = self.profiles.lock().await; + let count = profiles.len(); + let current = *self.current.lock().await; + + // Try from current, then wrap around + for offset in 0..count { + let idx = (current + offset) % count; + if profiles[idx].is_available() { + return Some((idx, profiles[idx].config_dir.clone())); + } + } + None + } + + /// Mark a profile as rate-limited, querying the API for the exact + /// cooldown duration. 
+ async fn handle_rate_limit(&self, profile_idx: usize) { + let mut profiles = self.profiles.lock().await; + let profile = &mut profiles[profile_idx]; + + // Try to get exact reset time from API + let cooldown_until = if let Some(token) = profile.load_token() { + let token = token.to_string(); + // Drop lock before the async call + drop(profiles); + + match fetch_usage(&token).await { + Ok(usage) => { + info!( + profile_idx, + five_hour_pct = usage.five_hour_utilization, + seven_day_pct = usage.seven_day_utilization, + resets_at = ?usage.five_hour_resets_at, + "Usage API response" + ); + usage + .five_hour_resets_at + .map(utc_to_instant) + .unwrap_or_else(|| { + std::time::Instant::now() + + std::time::Duration::from_secs(FALLBACK_COOLDOWN_SECS) + }) + } + Err(e) => { + warn!(error = %e, "Failed to fetch usage — using fallback cooldown"); + std::time::Instant::now() + + std::time::Duration::from_secs(FALLBACK_COOLDOWN_SECS) + } + } + } else { + drop(profiles); + std::time::Instant::now() + + std::time::Duration::from_secs(FALLBACK_COOLDOWN_SECS) + }; + + // Re-acquire lock and set cooldown + let mut profiles = self.profiles.lock().await; + profiles[profile_idx].set_cooldown(cooldown_until); + + // Advance current to next available + let count = profiles.len(); + for offset in 1..count { + let next = (profile_idx + offset) % count; + if profiles[next].is_available() { + *self.current.lock().await = next; + info!( + from = profiles[profile_idx].name, + to = profiles[next].name, + "Rotated to next profile" + ); + return; + } + } + warn!("All profiles are in cooldown — requests will fail until a window resets"); + } + + /// Check if an error is a rate-limit error. 
+ fn is_rate_limit_error(err: &LlmError) -> bool { + match err { + LlmError::Api { status, message } => { + *status == 429 + || message.to_lowercase().contains("rate limit") + || message.to_lowercase().contains("quota") + || message.to_lowercase().contains("too many requests") + || message.to_lowercase().contains("usage limit") + || message.to_lowercase().contains("overloaded") + } + LlmError::Http(msg) => { + let lower = msg.to_lowercase(); + lower.contains("rate limit") + || lower.contains("429") + || lower.contains("quota") + || lower.contains("usage limit") + } + _ => false, + } + } + +} + +#[async_trait] +impl LlmDriver for MultiProfileDriver { + async fn complete( + &self, + request: CompletionRequest, + ) -> Result { + let profiles_count = self.profiles.lock().await.len(); + + // Try each available profile + for _attempt in 0..profiles_count { + let (idx, config_dir) = match self.next_available().await { + Some(pair) => pair, + None => { + return Err(LlmError::Api { + status: 429, + message: "All Claude Code profiles are rate-limited. \ + Requests will resume when a rate-limit window resets." 
+ .to_string(), + }); + } + }; + + let profile_name = { + let profiles = self.profiles.lock().await; + profiles[idx].name.clone() + }; + + debug!(profile = %profile_name, config_dir = %config_dir.display(), "Using profile for completion"); + + let result = complete_with_config_dir( + &self.cli_path, + self.skip_permissions, + &config_dir, + request.clone(), + ) + .await; + + match &result { + Ok(_) => { + // Success — clear any stale cooldown + let mut profiles = self.profiles.lock().await; + profiles[idx].clear_cooldown(); + *self.current.lock().await = idx; + return result; + } + Err(e) if Self::is_rate_limit_error(e) => { + warn!(profile = %profile_name, error = %e, "Rate limit hit — rotating"); + self.handle_rate_limit(idx).await; + continue; + } + Err(_) => { + // Non-rate-limit error — don't rotate, just return + return result; + } + } + } + + Err(LlmError::Api { + status: 429, + message: "All Claude Code profiles exhausted after rotation attempts".to_string(), + }) + } + + async fn stream( + &self, + request: CompletionRequest, + tx: tokio::sync::mpsc::Sender, + ) -> Result { + let profiles_count = self.profiles.lock().await.len(); + + for _attempt in 0..profiles_count { + let (idx, config_dir) = match self.next_available().await { + Some(pair) => pair, + None => { + return Err(LlmError::Api { + status: 429, + message: "All Claude Code profiles are rate-limited. \ + Requests will resume when a rate-limit window resets." 
+ .to_string(), + }); + } + }; + + let profile_name = { + let profiles = self.profiles.lock().await; + profiles[idx].name.clone() + }; + + debug!(profile = %profile_name, "Using profile for streaming"); + + let result = stream_with_config_dir( + &self.cli_path, + self.skip_permissions, + &config_dir, + request.clone(), + tx.clone(), + ) + .await; + + match &result { + Ok(_) => { + let mut profiles = self.profiles.lock().await; + profiles[idx].clear_cooldown(); + *self.current.lock().await = idx; + return result; + } + Err(e) if Self::is_rate_limit_error(e) => { + warn!(profile = %profile_name, error = %e, "Rate limit hit — rotating"); + self.handle_rate_limit(idx).await; + continue; + } + Err(_) => return result, + } + } + + Err(LlmError::Api { + status: 429, + message: "All Claude Code profiles exhausted after rotation attempts".to_string(), + }) + } +} + +// ── Helpers: spawn claude CLI with CLAUDE_CONFIG_DIR ────────────────────── + +/// Run a non-streaming completion using the Claude CLI with a specific config dir. 
+async fn complete_with_config_dir( + cli_path: &str, + skip_permissions: bool, + config_dir: &Path, + request: CompletionRequest, +) -> Result { + use super::claude_code::ClaudeCodeDriver; + use openfang_types::message::{ContentBlock, StopReason, TokenUsage}; + + let driver = ClaudeCodeDriver::new(Some(cli_path.to_string()), skip_permissions); + let image_files = ClaudeCodeDriver::extract_images_to_temp(&request).await; + let prompt = ClaudeCodeDriver::build_prompt(&request, &image_files); + let model_flag = ClaudeCodeDriver::model_flag(&request.model); + let args = driver.build_args(&prompt, model_flag.as_deref(), false); + + let mut cmd = tokio::process::Command::new(cli_path); + cmd.args(&args); + cmd.env("CLAUDE_CONFIG_DIR", config_dir); + ClaudeCodeDriver::apply_env_filter(&mut cmd); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + + debug!(config_dir = %config_dir.display(), "Spawning Claude CLI with profile"); + + let output = cmd.output().await.map_err(|e| { + LlmError::Http(format!("Claude Code CLI failed to start: {e}")) + })?; + + ClaudeCodeDriver::cleanup_temp_images(&image_files); + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let detail = if !stderr.is_empty() { &stderr } else { &stdout }; + let code = output.status.code().unwrap_or(1); + + return Err(LlmError::Api { + status: code as u16, + message: format!("Claude Code CLI exited with code {code}: {detail}"), + }); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + + // Try JSON parse + if let Ok(parsed) = serde_json::from_str::(&stdout) { + let text = parsed + .get("result") + .or_else(|| parsed.get("content")) + .or_else(|| parsed.get("text")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + let (input_tokens, output_tokens) = parsed + .get("usage") + .map(|u| { + ( + 
u.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0), + u.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0), + ) + }) + .unwrap_or((0, 0)); + + return Ok(CompletionResponse { + content: vec![ContentBlock::Text { + text, + provider_metadata: None, + }], + stop_reason: StopReason::EndTurn, + tool_calls: Vec::new(), + usage: TokenUsage { + input_tokens, + output_tokens, + }, + }); + } + + // Fallback: plain text + Ok(CompletionResponse { + content: vec![ContentBlock::Text { + text: stdout.trim().to_string(), + provider_metadata: None, + }], + stop_reason: StopReason::EndTurn, + tool_calls: Vec::new(), + usage: TokenUsage { + input_tokens: 0, + output_tokens: 0, + }, + }) +} + +/// Run a streaming completion using the Claude CLI with a specific config dir. +async fn stream_with_config_dir( + cli_path: &str, + skip_permissions: bool, + config_dir: &Path, + request: CompletionRequest, + tx: tokio::sync::mpsc::Sender, +) -> Result { + use super::claude_code::ClaudeCodeDriver; + use openfang_types::message::{ContentBlock, StopReason, TokenUsage}; + use tokio::io::AsyncBufReadExt; + + let driver = ClaudeCodeDriver::new(Some(cli_path.to_string()), skip_permissions); + let image_files = ClaudeCodeDriver::extract_images_to_temp(&request).await; + let prompt = ClaudeCodeDriver::build_prompt(&request, &image_files); + let model_flag = ClaudeCodeDriver::model_flag(&request.model); + let args = driver.build_args(&prompt, model_flag.as_deref(), true); + + let mut cmd = tokio::process::Command::new(cli_path); + cmd.args(&args); + cmd.env("CLAUDE_CONFIG_DIR", config_dir); + ClaudeCodeDriver::apply_env_filter(&mut cmd); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + + let mut child = cmd.spawn().map_err(|e| { + LlmError::Http(format!("Claude Code CLI failed to start: {e}")) + })?; + + let stdout = child + .stdout + .take() + .ok_or_else(|| LlmError::Http("No stdout from claude CLI".to_string()))?; + + let reader = 
tokio::io::BufReader::new(stdout); + let mut lines = reader.lines(); + + let mut full_text = String::new(); + let mut final_usage = TokenUsage { + input_tokens: 0, + output_tokens: 0, + }; + + while let Ok(Some(line)) = lines.next_line().await { + if line.trim().is_empty() { + continue; + } + + if let Ok(event) = serde_json::from_str::(&line) { + let event_type = event.get("type").and_then(|v| v.as_str()).unwrap_or(""); + + match event_type { + "content" | "text" | "assistant" | "content_block_delta" => { + if let Some(content) = event.get("content").and_then(|v| v.as_str()) { + full_text.push_str(content); + let _ = tx.send(StreamEvent::TextDelta { text: content.to_string() }).await; + } + } + "result" | "done" | "complete" => { + if let Some(result) = event.get("result").and_then(|v| v.as_str()) { + if full_text.is_empty() { + full_text = result.to_string(); + let _ = tx.send(StreamEvent::TextDelta { text: result.to_string() }).await; + } + } + if let Some(usage) = event.get("usage") { + final_usage = TokenUsage { + input_tokens: usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0), + output_tokens: usage.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0), + }; + } + } + _ => { + if let Some(content) = event.get("content").and_then(|v| v.as_str()) { + full_text.push_str(content); + let _ = tx.send(StreamEvent::TextDelta { text: content.to_string() }).await; + } + } + } + } else { + full_text.push_str(&line); + let _ = tx.send(StreamEvent::TextDelta { text: line }).await; + } + } + + let status = child.wait().await.map_err(|e| { + LlmError::Http(format!("Claude CLI wait failed: {e}")) + })?; + + ClaudeCodeDriver::cleanup_temp_images(&image_files); + + if !status.success() { + let code = status.code().unwrap_or(1); + // Check if the text we got so far indicates rate limiting + let lower = full_text.to_lowercase(); + if lower.contains("rate limit") || lower.contains("quota") || lower.contains("usage limit") { + return Err(LlmError::Api { + status: 
429, + message: full_text, + }); + } + if full_text.is_empty() { + return Err(LlmError::Api { + status: code as u16, + message: format!("Claude Code CLI exited with code {code}"), + }); + } + } + + let _ = tx + .send(StreamEvent::ContentComplete { + stop_reason: StopReason::EndTurn, + usage: final_usage, + }) + .await; + + Ok(CompletionResponse { + content: vec![ContentBlock::Text { + text: full_text, + provider_metadata: None, + }], + stop_reason: StopReason::EndTurn, + tool_calls: Vec::new(), + usage: final_usage, + }) +} + +/// Expand a leading `~` to the user's home directory. +fn expand_tilde(path: &str) -> String { + if let Some(rest) = path.strip_prefix("~/") { + if let Ok(home) = std::env::var("HOME") { + return format!("{home}/{rest}"); + } + } else if path == "~" { + if let Ok(home) = std::env::var("HOME") { + return home; + } + } + path.to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_profile_state_new() { + let p = ProfileState::new(PathBuf::from("/home/user/.claude")); + assert_eq!(p.name, ".claude"); + assert!(p.is_available()); + assert!(p.access_token.is_none()); + } + + #[test] + fn test_profile_cooldown() { + let mut p = ProfileState::new(PathBuf::from("/tmp/test-profile")); + assert!(p.is_available()); + + // Set cooldown 1 hour from now + let future = std::time::Instant::now() + std::time::Duration::from_secs(3600); + p.set_cooldown(future); + assert!(!p.is_available()); + + // Set cooldown in the past + let past = std::time::Instant::now() - std::time::Duration::from_secs(1); + p.cooldown_until = Some(past); + assert!(p.is_available()); + + // Clear cooldown + p.set_cooldown(future); + p.clear_cooldown(); + assert!(p.is_available()); + } + + #[test] + fn test_utc_to_instant_future() { + let future = chrono::Utc::now() + chrono::Duration::hours(3); + let instant = utc_to_instant(future); + // Should be roughly 3 hours + 30s buffer from now + let expected_secs = 3 * 3600 + 30; + let actual_secs = 
instant.duration_since(std::time::Instant::now()).as_secs(); + assert!(actual_secs >= expected_secs - 5 && actual_secs <= expected_secs + 5); + } + + #[test] + fn test_utc_to_instant_past() { + let past = chrono::Utc::now() - chrono::Duration::hours(1); + let instant = utc_to_instant(past); + // Should be approximately now + let diff = instant + .saturating_duration_since(std::time::Instant::now()) + .as_secs(); + assert!(diff <= 1); + } + + #[test] + fn test_is_rate_limit_error() { + assert!(MultiProfileDriver::is_rate_limit_error(&LlmError::Api { + status: 429, + message: "Too many requests".to_string(), + })); + + assert!(MultiProfileDriver::is_rate_limit_error(&LlmError::Api { + status: 1, + message: "You have exceeded your rate limit".to_string(), + })); + + assert!(MultiProfileDriver::is_rate_limit_error( + &LlmError::Http("Error 429: quota exceeded".to_string()) + )); + + assert!(!MultiProfileDriver::is_rate_limit_error(&LlmError::Api { + status: 500, + message: "Internal server error".to_string(), + })); + + assert!(!MultiProfileDriver::is_rate_limit_error( + &LlmError::Http("Connection refused".to_string()) + )); + } + + #[test] + fn test_multi_profile_driver_new() { + let driver = MultiProfileDriver::new( + None, + true, + vec![ + "~/.claude".to_string(), + "~/.claude-profiles/account-2".to_string(), + ], + ); + assert_eq!(driver.cli_path, "claude"); + assert!(driver.skip_permissions); + } + + #[test] + fn test_load_token_from_credentials() { + // Create a temp credentials file + let dir = std::env::temp_dir().join("openfang-test-profile"); + std::fs::create_dir_all(&dir).unwrap(); + let cred_path = dir.join(".credentials.json"); + std::fs::write( + &cred_path, + r#"{"claudeAiOauth":{"accessToken":"test-token-123","refreshToken":"rt","expiresAt":9999999999}}"#, + ) + .unwrap(); + + let mut profile = ProfileState::new(dir.clone()); + let token = profile.load_token(); + assert_eq!(token, Some("test-token-123")); + + // Cleanup + 
std::fs::remove_dir_all(&dir).unwrap(); + } + + #[tokio::test] + async fn test_next_available_all_in_cooldown() { + let driver = MultiProfileDriver::new( + None, + true, + vec!["/tmp/p1".to_string(), "/tmp/p2".to_string()], + ); + + // Put all in cooldown + { + let mut profiles = driver.profiles.lock().await; + let future = std::time::Instant::now() + std::time::Duration::from_secs(3600); + profiles[0].set_cooldown(future); + profiles[1].set_cooldown(future); + } + + assert!(driver.next_available().await.is_none()); + } + + #[tokio::test] + async fn test_next_available_skips_cooldown() { + let driver = MultiProfileDriver::new( + None, + true, + vec![ + "/tmp/pa".to_string(), + "/tmp/pb".to_string(), + "/tmp/pc".to_string(), + ], + ); + + // Put first profile in cooldown + { + let mut profiles = driver.profiles.lock().await; + let future = std::time::Instant::now() + std::time::Duration::from_secs(3600); + profiles[0].set_cooldown(future); + } + + let (idx, _) = driver.next_available().await.unwrap(); + assert_eq!(idx, 1); // Should skip index 0 + } + + // ── expand_tilde tests ─────────────────────────────────────────── + + #[test] + fn test_expand_tilde_home() { + let home = std::env::var("HOME").unwrap(); + assert_eq!(expand_tilde("~/.claude"), format!("{home}/.claude")); + assert_eq!( + expand_tilde("~/.claude-profiles/acct2"), + format!("{home}/.claude-profiles/acct2") + ); + } + + #[test] + fn test_expand_tilde_bare() { + let home = std::env::var("HOME").unwrap(); + assert_eq!(expand_tilde("~"), home); + } + + #[test] + fn test_expand_tilde_absolute_passthrough() { + assert_eq!(expand_tilde("/etc/claude"), "/etc/claude"); + assert_eq!(expand_tilde("/tmp/profile"), "/tmp/profile"); + } + + #[test] + fn test_expand_tilde_no_prefix() { + assert_eq!(expand_tilde("relative/path"), "relative/path"); + assert_eq!(expand_tilde(""), ""); + } + + // ── fetch_usage parsing tests ──────────────────────────────────── + + #[test] + fn test_parse_usage_response() { + let 
json = r#"{ + "five_hour": { + "utilization": 66.0, + "resets_at": "2026-03-14T05:00:00Z" + }, + "seven_day": { + "utilization": 14.0, + "resets_at": "2026-03-20T05:00:00Z" + } + }"#; + + let resp: UsageResponse = serde_json::from_str(json).unwrap(); + let five = resp.five_hour.unwrap(); + assert!((five.utilization - 66.0).abs() < 0.1); + assert_eq!(five.resets_at.as_deref(), Some("2026-03-14T05:00:00Z")); + + let seven = resp.seven_day.unwrap(); + assert!((seven.utilization - 14.0).abs() < 0.1); + } + + #[test] + fn test_parse_usage_response_empty_windows() { + let json = r#"{}"#; + let resp: UsageResponse = serde_json::from_str(json).unwrap(); + assert!(resp.five_hour.is_none()); + assert!(resp.seven_day.is_none()); + } + + #[test] + fn test_parse_usage_response_partial() { + let json = r#"{"five_hour": {"utilization": 99.5}}"#; + let resp: UsageResponse = serde_json::from_str(json).unwrap(); + let five = resp.five_hour.unwrap(); + assert!((five.utilization - 99.5).abs() < 0.1); + assert!(five.resets_at.is_none()); // no resets_at field + assert!(resp.seven_day.is_none()); + } + + // ── Profile rotation logic tests ───────────────────────────────── + + #[tokio::test] + async fn test_next_available_wraps_around() { + // If current=2 and profile 2 is in cooldown, should wrap to 0 + let driver = MultiProfileDriver::new( + None, + true, + vec![ + "/tmp/x0".to_string(), + "/tmp/x1".to_string(), + "/tmp/x2".to_string(), + ], + ); + + // Set current to 2, put profile 2 in cooldown + { + *driver.current.lock().await = 2; + let mut profiles = driver.profiles.lock().await; + let future = std::time::Instant::now() + std::time::Duration::from_secs(3600); + profiles[2].set_cooldown(future); + } + + let (idx, _) = driver.next_available().await.unwrap(); + assert_eq!(idx, 0); // Wrapped around to 0 + } + + #[tokio::test] + async fn test_next_available_prefers_current() { + // If current profile is available, it should be returned + let driver = MultiProfileDriver::new( + None, 
+ true, + vec!["/tmp/y0".to_string(), "/tmp/y1".to_string()], + ); + + *driver.current.lock().await = 1; + + let (idx, _) = driver.next_available().await.unwrap(); + assert_eq!(idx, 1); // Should prefer current + } + + #[tokio::test] + async fn test_expired_cooldown_becomes_available() { + let driver = MultiProfileDriver::new( + None, + true, + vec!["/tmp/z0".to_string(), "/tmp/z1".to_string()], + ); + + // Put profile 0 in cooldown that has already expired + { + let mut profiles = driver.profiles.lock().await; + profiles[0].cooldown_until = + Some(std::time::Instant::now() - std::time::Duration::from_secs(1)); + } + + let (idx, _) = driver.next_available().await.unwrap(); + assert_eq!(idx, 0); // Expired cooldown = available + } + + // ── Credentials file tests ─────────────────────────────────────── + + #[test] + fn test_load_token_alternate_filename() { + // Test credentials.json (without leading dot) + let dir = std::env::temp_dir().join("openfang-test-profile-alt"); + std::fs::create_dir_all(&dir).unwrap(); + let cred_path = dir.join("credentials.json"); + std::fs::write( + &cred_path, + r#"{"claudeAiOauth":{"accessToken":"alt-token-456"}}"#, + ) + .unwrap(); + + let mut profile = ProfileState::new(dir.clone()); + let token = profile.load_token(); + assert_eq!(token, Some("alt-token-456")); + + std::fs::remove_dir_all(&dir).unwrap(); + } + + #[test] + fn test_load_token_missing_dir() { + let mut profile = ProfileState::new(PathBuf::from("/nonexistent/path")); + assert!(profile.load_token().is_none()); + } + + #[test] + fn test_load_token_malformed_json() { + let dir = std::env::temp_dir().join("openfang-test-profile-bad"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join(".credentials.json"), "not json").unwrap(); + + let mut profile = ProfileState::new(dir.clone()); + assert!(profile.load_token().is_none()); + + std::fs::remove_dir_all(&dir).unwrap(); + } + + #[test] + fn test_load_token_missing_oauth_field() { + let dir = 
std::env::temp_dir().join("openfang-test-profile-nooauth"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write( + dir.join(".credentials.json"), + r#"{"someOtherField": "value"}"#, + ) + .unwrap(); + + let mut profile = ProfileState::new(dir.clone()); + assert!(profile.load_token().is_none()); + + std::fs::remove_dir_all(&dir).unwrap(); + } + + #[test] + fn test_load_token_cached_on_second_call() { + let dir = std::env::temp_dir().join("openfang-test-profile-cache"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write( + dir.join(".credentials.json"), + r#"{"claudeAiOauth":{"accessToken":"cached-tok"}}"#, + ) + .unwrap(); + + let mut profile = ProfileState::new(dir.clone()); + assert_eq!(profile.load_token(), Some("cached-tok")); + + // Remove the file — second call should still return cached token + std::fs::remove_dir_all(&dir).unwrap(); + assert_eq!(profile.load_token(), Some("cached-tok")); + } + + // ── Rate-limit error detection edge cases ──────────────────────── + + #[test] + fn test_is_rate_limit_overloaded() { + assert!(MultiProfileDriver::is_rate_limit_error(&LlmError::Api { + status: 529, + message: "API is overloaded".to_string(), + })); + } + + #[test] + fn test_is_rate_limit_usage_limit_in_http() { + assert!(MultiProfileDriver::is_rate_limit_error( + &LlmError::Http("Your usage limit has been reached".to_string()) + )); + } + + #[test] + fn test_is_not_rate_limit_auth_error() { + assert!(!MultiProfileDriver::is_rate_limit_error(&LlmError::Api { + status: 401, + message: "Unauthorized".to_string(), + })); + } + + // ── DriverConfig profiles field ────────────────────────────────── + + #[test] + fn test_driver_config_profiles_default_empty() { + let json = r#"{"provider":"claude-code"}"#; + let config: crate::llm_driver::DriverConfig = serde_json::from_str(json).unwrap(); + assert!(config.profiles.is_empty()); + } + + #[test] + fn test_driver_config_profiles_deserialized() { + let json = r#"{ + "provider": "claude-code", + "profiles": 
["~/.claude", "~/.claude-profiles/acct2"] + }"#; + let config: crate::llm_driver::DriverConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.profiles.len(), 2); + assert_eq!(config.profiles[0], "~/.claude"); + assert_eq!(config.profiles[1], "~/.claude-profiles/acct2"); + } +} diff --git a/crates/openfang-runtime/src/drivers/openai.rs b/crates/openfang-runtime/src/drivers/openai.rs index 15a5a6657..c52e772cf 100644 --- a/crates/openfang-runtime/src/drivers/openai.rs +++ b/crates/openfang-runtime/src/drivers/openai.rs @@ -278,6 +278,13 @@ impl LlmDriver for OpenAIDriver { }, }); } + ContentBlock::ImageUrl { url } => { + parts.push(OaiContentPart::ImageUrl { + image_url: OaiImageUrl { + url: url.clone(), + }, + }); + } ContentBlock::Thinking { .. } => {} _ => {} } diff --git a/crates/openfang-runtime/src/drivers/qwen_code.rs b/crates/openfang-runtime/src/drivers/qwen_code.rs index a5ee8e06c..7c78350c1 100644 --- a/crates/openfang-runtime/src/drivers/qwen_code.rs +++ b/crates/openfang-runtime/src/drivers/qwen_code.rs @@ -12,8 +12,8 @@ use serde::Deserialize; use tokio::io::AsyncBufReadExt; use tracing::{debug, warn}; -/// Environment variable names to strip from the subprocess to prevent -/// leaking API keys from other providers. +/// Environment variable names (and suffixes) to strip from the subprocess +/// to prevent leaking API keys from other providers. const SENSITIVE_ENV_EXACT: &[&str] = &[ "OPENAI_API_KEY", "ANTHROPIC_API_KEY", @@ -87,23 +87,37 @@ impl QwenCodeDriver { } } - /// Build the CLI arguments for a given request. - pub fn build_args(&self, prompt: &str, model: &str, streaming: bool) -> Vec { - let mut args = vec!["-p".to_string(), prompt.to_string()]; + /// Build the CLI argument list for a completion request. + /// + /// Exposed as a testable method so unit tests can verify that `--yolo`, + /// `--model`, and output format flags are set correctly. 
+ pub fn build_args( + &self, + prompt: &str, + model: &str, + streaming: bool, + ) -> Vec<String> { + let model_flag = Self::model_flag(model); + + let mut args = vec![ + "-p".to_string(), + prompt.to_string(), + "--output-format".to_string(), + if streaming { + "stream-json".to_string() + } else { + "json".to_string() + }, + ]; - args.push("--output-format".to_string()); if streaming { - args.push("stream-json".to_string()); args.push("--verbose".to_string()); - } else { - args.push("json".to_string()); } if self.skip_permissions { args.push("--yolo".to_string()); } - let model_flag = Self::model_flag(model); if let Some(ref m) = model_flag { args.push("--model".to_string()); args.push(m.clone()); @@ -147,10 +161,15 @@ } /// Apply security env filtering to a command. + /// + /// Instead of `env_clear()` (which breaks Node.js, NVM, SSL, proxies), + /// we keep the full environment and only remove known sensitive API keys + /// from other LLM providers. fn apply_env_filter(cmd: &mut tokio::process::Command) { for key in SENSITIVE_ENV_EXACT { cmd.env_remove(key); } + // Remove any env var with a sensitive suffix, unless it's QWEN_* for (key, _) in std::env::vars() { if key.starts_with("QWEN_") { continue; } @@ -167,6 +186,9 @@ } /// JSON output from `qwen -p --output-format json`. +/// +/// The CLI may return the response text in different fields depending on +/// version: `result`, `content`, or `text`. We try all three.
#[derive(Debug, Deserialize)] struct QwenJsonOutput { result: Option<String>, @@ -213,9 +235,7 @@ impl LlmDriver for QwenCodeDriver { let args = self.build_args(&prompt, &request.model, false); let mut cmd = tokio::process::Command::new(&self.cli_path); - for arg in &args { - cmd.arg(arg); - } + cmd.args(&args); Self::apply_env_filter(&mut cmd); @@ -239,6 +259,7 @@ let detail = if !stderr.is_empty() { &stderr } else { &stdout }; let code = output.status.code().unwrap_or(1); + // Provide actionable error messages let message = if detail.contains("not authenticated") || detail.contains("auth") || detail.contains("login") { format!( "Qwen Code CLI is not authenticated. Run: qwen auth\nDetail: {detail}" ) + } else if detail.contains("permission") + || detail.contains("--yolo") + { + format!( + "Qwen Code CLI requires permissions acceptance. \ + Run: qwen --yolo (once to accept)\nDetail: {detail}" + ) } else { format!("Qwen Code CLI exited with code {code}: {detail}") }; @@ -259,9 +287,9 @@ let stdout = String::from_utf8_lossy(&output.stdout); + // Try JSON parse first if let Ok(parsed) = serde_json::from_str::<QwenJsonOutput>(&stdout) { - let text = parsed - .result + let text = parsed.result .or(parsed.content) .or(parsed.text) .unwrap_or_default(); @@ -280,6 +308,7 @@ }); } + // Fallback: treat entire stdout as plain text let text = stdout.trim().to_string(); Ok(CompletionResponse { content: vec![ContentBlock::Text { @@ -304,9 +333,7 @@ let args = self.build_args(&prompt, &request.model, true); let mut cmd = tokio::process::Command::new(&self.cli_path); - for arg in &args { - cmd.arg(arg); - } + cmd.args(&args); Self::apply_env_filter(&mut cmd); @@ -343,47 +370,51 @@ } match serde_json::from_str::(&line) { - Ok(event) => match event.r#type.as_str() { -
"content" | "text" | "assistant" | "content_block_delta" => { - if let Some(ref content) = event.content { - full_text.push_str(content); - let _ = tx - .send(StreamEvent::TextDelta { - text: content.clone(), - }) - .await; - } - } - "result" | "done" | "complete" => { - if let Some(ref result) = event.result { - if full_text.is_empty() { - full_text = result.clone(); + Ok(event) => { + match event.r#type.as_str() { + "content" | "text" | "assistant" | "content_block_delta" => { + if let Some(ref content) = event.content { + full_text.push_str(content); let _ = tx .send(StreamEvent::TextDelta { - text: result.clone(), + text: content.clone(), }) .await; } } - if let Some(usage) = event.usage { - final_usage = TokenUsage { - input_tokens: usage.input_tokens, - output_tokens: usage.output_tokens, - }; + "result" | "done" | "complete" => { + if let Some(ref result) = event.result { + if full_text.is_empty() { + full_text = result.clone(); + let _ = tx + .send(StreamEvent::TextDelta { + text: result.clone(), + }) + .await; + } + } + if let Some(usage) = event.usage { + final_usage = TokenUsage { + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + }; + } } - } - _ => { - if let Some(ref content) = event.content { - full_text.push_str(content); - let _ = tx - .send(StreamEvent::TextDelta { - text: content.clone(), - }) - .await; + _ => { + // Unknown event type — try content field as fallback + if let Some(ref content) = event.content { + full_text.push_str(content); + let _ = tx + .send(StreamEvent::TextDelta { + text: content.clone(), + }) + .await; + } } } - }, + } Err(e) => { + // Not valid JSON — treat as raw text warn!(line = %line, error = %e, "Non-JSON line from Qwen CLI"); full_text.push_str(&line); let _ = tx @@ -393,6 +424,7 @@ impl LlmDriver for QwenCodeDriver { } } + // Wait for process to finish let status = child .wait() .await @@ -421,16 +453,19 @@ impl LlmDriver for QwenCodeDriver { } } -/// Check if the Qwen Code CLI is 
available. +/// Check if the Qwen Code CLI is available and authenticated. pub fn qwen_code_available() -> bool { QwenCodeDriver::detect().is_some() || qwen_credentials_exist() } /// Check if Qwen credentials exist. +/// +/// Qwen Code stores session/credentials in `~/.qwen/` directory. fn qwen_credentials_exist() -> bool { if let Some(home) = home_dir() { let qwen_dir = home.join(".qwen"); - qwen_dir.join("credentials.json").exists() + qwen_dir.exists() + || qwen_dir.join("credentials.json").exists() || qwen_dir.join(".credentials.json").exists() || qwen_dir.join("auth.json").exists() } else { @@ -482,6 +517,39 @@ mod tests { assert!(prompt.contains("Hello")); } + #[test] + fn test_build_prompt_multi_turn() { + use openfang_types::message::{Message, MessageContent}; + + let request = CompletionRequest { + model: "qwen-code/qwen3-coder".to_string(), + messages: vec![ + Message { + role: Role::User, + content: MessageContent::text("What is 2+2?"), + }, + Message { + role: Role::Assistant, + content: MessageContent::text("4"), + }, + Message { + role: Role::User, + content: MessageContent::text("And 3+3?"), + }, + ], + tools: vec![], + max_tokens: 1024, + temperature: 0.7, + system: None, + thinking: None, + }; + + let prompt = QwenCodeDriver::build_prompt(&request); + assert!(prompt.contains("[User]\nWhat is 2+2?")); + assert!(prompt.contains("[Assistant]\n4")); + assert!(prompt.contains("[User]\nAnd 3+3?")); + } + #[test] fn test_model_flag_mapping() { assert_eq!( @@ -531,29 +599,21 @@ mod tests { assert!(!driver.skip_permissions); } - #[test] - fn test_sensitive_env_list_coverage() { - assert!(SENSITIVE_ENV_EXACT.contains(&"OPENAI_API_KEY")); - assert!(SENSITIVE_ENV_EXACT.contains(&"ANTHROPIC_API_KEY")); - assert!(SENSITIVE_ENV_EXACT.contains(&"GEMINI_API_KEY")); - assert!(SENSITIVE_ENV_EXACT.contains(&"GROQ_API_KEY")); - assert!(SENSITIVE_ENV_EXACT.contains(&"DEEPSEEK_API_KEY")); - } - #[test] fn test_build_args_with_yolo() { let driver = 
QwenCodeDriver::new(None, true); let args = driver.build_args("test prompt", "qwen-code/qwen3-coder", false); - assert!(args.contains(&"--yolo".to_string())); + assert!(args.contains(&"--yolo".to_string()), "should contain --yolo when skip_permissions=true"); assert!(args.contains(&"json".to_string())); assert!(args.contains(&"--model".to_string())); + assert!(args.contains(&"qwen3-coder".to_string())); } #[test] fn test_build_args_without_yolo() { let driver = QwenCodeDriver::new(None, false); let args = driver.build_args("test prompt", "qwen-code/qwen3-coder", false); - assert!(!args.contains(&"--yolo".to_string())); + assert!(!args.contains(&"--yolo".to_string()), "should NOT contain --yolo when skip_permissions=false"); } #[test] @@ -564,6 +624,15 @@ mod tests { assert!(args.contains(&"--verbose".to_string())); } + #[test] + fn test_sensitive_env_list_coverage() { + assert!(SENSITIVE_ENV_EXACT.contains(&"OPENAI_API_KEY")); + assert!(SENSITIVE_ENV_EXACT.contains(&"ANTHROPIC_API_KEY")); + assert!(SENSITIVE_ENV_EXACT.contains(&"GEMINI_API_KEY")); + assert!(SENSITIVE_ENV_EXACT.contains(&"GROQ_API_KEY")); + assert!(SENSITIVE_ENV_EXACT.contains(&"DEEPSEEK_API_KEY")); + } + #[test] fn test_json_output_deserialization() { let json = r#"{"result":"Hello world","usage":{"input_tokens":10,"output_tokens":5}}"#; diff --git a/crates/openfang-runtime/src/llm_driver.rs b/crates/openfang-runtime/src/llm_driver.rs index 9178be9d4..07b6aeb5d 100644 --- a/crates/openfang-runtime/src/llm_driver.rs +++ b/crates/openfang-runtime/src/llm_driver.rs @@ -176,6 +176,12 @@ pub struct DriverConfig { /// restricts what agents can do, making this safe. #[serde(default = "default_skip_permissions")] pub skip_permissions: bool, + /// Optional list of config directory paths for CLI-based providers + /// (claude-code, qwen-code). Each directory contains its own OAuth + /// credentials. 
When multiple profiles are provided the driver + automatically rotates to the next profile on rate-limit errors. + #[serde(default)] + pub profiles: Vec<String>, } fn default_skip_permissions() -> bool { @@ -190,6 +196,7 @@ impl std::fmt::Debug for DriverConfig { .field("api_key", &self.api_key.as_ref().map(|_| "<redacted>")) .field("base_url", &self.base_url) .field("skip_permissions", &self.skip_permissions) + .field("profiles", &self.profiles) .finish() } } diff --git a/crates/openfang-runtime/src/model_catalog.rs b/crates/openfang-runtime/src/model_catalog.rs index 5846c4d57..cc5ed0af4 100644 --- a/crates/openfang-runtime/src/model_catalog.rs +++ b/crates/openfang-runtime/src/model_catalog.rs @@ -56,15 +56,16 @@ impl ModelCatalog { /// Only checks presence — never reads or stores the actual secret. pub fn detect_auth(&mut self) { for provider in &mut self.providers { - // Claude Code is special: no API key needed, but we probe for CLI - // installation so the dashboard shows "Configured" vs "Not Installed".
+ // Claude Code: detect CLI installation + authentication if provider.id == "claude-code" { - provider.auth_status = - if crate::drivers::claude_code::claude_code_available() { - AuthStatus::Configured - } else { - AuthStatus::Missing - }; + let cli_installed = crate::drivers::claude_code::ClaudeCodeDriver::detect().is_some(); + if cli_installed && crate::drivers::claude_code::claude_code_available() { + provider.auth_status = AuthStatus::Configured; + } else if cli_installed { + provider.auth_status = AuthStatus::Missing; + } else { + provider.auth_status = AuthStatus::NotRequired; + } continue; } if provider.id == "qwen-code" { @@ -77,6 +78,19 @@ impl ModelCatalog { continue; } + // Qwen Code: detect CLI installation + authentication + if provider.id == "qwen-code" { + let cli_installed = crate::drivers::qwen_code::QwenCodeDriver::detect().is_some(); + if cli_installed && crate::drivers::qwen_code::qwen_code_available() { + provider.auth_status = AuthStatus::Configured; + } else if cli_installed { + provider.auth_status = AuthStatus::Missing; + } else { + provider.auth_status = AuthStatus::NotRequired; + } + continue; + } + if !provider.key_required { provider.auth_status = AuthStatus::NotRequired; continue; @@ -92,7 +106,6 @@ impl ModelCatalog { std::env::var("OPENAI_API_KEY").is_ok() || read_codex_credential().is_some() } - // claude-code is handled above (before key_required check) _ => false, }; @@ -850,8 +863,11 @@ fn builtin_aliases() -> HashMap { // Qwen Code aliases ("qwen-code", "qwen-code/qwen3-coder"), ("qwen-coder", "qwen-code/qwen3-coder"), + ("qwen-code-qwen3", "qwen-code/qwen3-coder"), ("qwen-coder-plus", "qwen-code/qwen-coder-plus"), + ("qwen-code-plus", "qwen-code/qwen-coder-plus"), ("qwq", "qwen-code/qwq-32b"), + ("qwen-code-qwq", "qwen-code/qwq-32b"), ]; pairs .into_iter() @@ -3457,10 +3473,10 @@ fn builtin_models() -> Vec { // Qwen Code CLI (3) — subprocess-based, free via Qwen OAuth // 
══════════════════════════════════════════════════════════════ ModelCatalogEntry { - id: "qwen-code/qwen-coder-plus".into(), - display_name: "Qwen Coder Plus (CLI)".into(), + id: "qwen-code/qwen3-coder".into(), + display_name: "Qwen3 Coder (CLI)".into(), provider: "qwen-code".into(), - tier: ModelTier::Frontier, + tier: ModelTier::Smart, context_window: 131_072, max_output_tokens: 65_536, input_cost_per_m: 0.0, @@ -3468,13 +3484,13 @@ fn builtin_models() -> Vec { supports_tools: false, supports_vision: false, supports_streaming: true, - aliases: vec!["qwen-coder-plus".into()], + aliases: vec!["qwen-code".into(), "qwen-coder".into(), "qwen-code-qwen3".into()], }, ModelCatalogEntry { - id: "qwen-code/qwen3-coder".into(), - display_name: "Qwen3 Coder (CLI)".into(), + id: "qwen-code/qwen-coder-plus".into(), + display_name: "Qwen Coder Plus (CLI)".into(), provider: "qwen-code".into(), - tier: ModelTier::Smart, + tier: ModelTier::Frontier, context_window: 131_072, max_output_tokens: 65_536, input_cost_per_m: 0.0, @@ -3482,7 +3498,7 @@ fn builtin_models() -> Vec { supports_tools: false, supports_vision: false, supports_streaming: true, - aliases: vec!["qwen-code".into(), "qwen-coder".into()], + aliases: vec!["qwen-coder-plus".into(), "qwen-code-plus".into()], }, ModelCatalogEntry { id: "qwen-code/qwq-32b".into(), @@ -3496,7 +3512,7 @@ fn builtin_models() -> Vec { supports_tools: false, supports_vision: false, supports_streaming: true, - aliases: vec!["qwq".into()], + aliases: vec!["qwq".into(), "qwen-code-qwq".into()], }, // ══════════════════════════════════════════════════════════════ // Chutes.ai (5) diff --git a/crates/openfang-runtime/src/prompt_builder.rs b/crates/openfang-runtime/src/prompt_builder.rs index e0a8bd2a6..91b8a7b3b 100644 --- a/crates/openfang-runtime/src/prompt_builder.rs +++ b/crates/openfang-runtime/src/prompt_builder.rs @@ -37,6 +37,10 @@ pub struct PromptContext { pub user_name: Option, /// Channel type (telegram, discord, web, etc.). 
pub channel_type: Option<String>, + /// Platform-specific sender ID (e.g. phone number) — from channel gateway metadata. + pub sender_id: Option<String>, + /// Human-readable sender display name — from channel gateway metadata. + pub sender_name: Option<String>, /// Whether this agent was spawned as a subagent. pub is_subagent: bool, /// Whether this agent has autonomous config. @@ -144,7 +148,21 @@ pub fn build_system_prompt(ctx: &PromptContext) -> String { // Section 9 — Channel Awareness (skip for subagents) if !ctx.is_subagent { if let Some(ref channel) = ctx.channel_type { - sections.push(build_channel_section(channel)); + let mut section = build_channel_section(channel); + // Append sender identity when available (from channel gateway metadata) + if ctx.sender_id.is_some() || ctx.sender_name.is_some() { + section.push_str("\n\n### Current Message Sender\n"); + if let Some(ref name) = ctx.sender_name { + section.push_str(&format!("- **Name**: {name}\n")); + } + if let Some(ref id) = ctx.sender_id { + section.push_str(&format!("- **Platform ID**: {id}\n")); + } + section.push_str("\nIMPORTANT: Check this sender identity against your USER.md to determine \ + if this is your owner/master or someone else. 
If it is NOT your owner, \ + read and follow PRIVACY-RULES.md before responding."); + } + sections.push(section); } } diff --git a/crates/openfang-types/src/agent.rs b/crates/openfang-types/src/agent.rs index 5f002b778..f8d59fcfd 100644 --- a/crates/openfang-types/src/agent.rs +++ b/crates/openfang-types/src/agent.rs @@ -267,7 +267,7 @@ impl Default for ResourceQuota { max_tool_calls_per_minute: 60, max_llm_tokens_per_hour: 0, // unlimited by default max_network_bytes_per_hour: 100 * 1024 * 1024, // 100 MB - max_cost_per_hour_usd: 0.0, // unlimited by default + max_cost_per_hour_usd: 0.0, // unlimited max_cost_per_day_usd: 0.0, // unlimited max_cost_per_month_usd: 0.0, // unlimited } diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 421253b97..512a98626 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1454,6 +1454,10 @@ pub struct DefaultModelConfig { pub api_key_env: String, /// Optional base URL override. pub base_url: Option<String>, + /// Optional vision-capable model for image messages. + /// When set, agents receiving images will automatically use this model + /// instead of the default (which may not support vision). + pub vision_model: Option<String>, } impl Default for DefaultModelConfig { @@ -1463,6 +1467,7 @@ model: "claude-sonnet-4-20250514".to_string(), api_key_env: "ANTHROPIC_API_KEY".to_string(), base_url: None, + vision_model: None, } } } diff --git a/crates/openfang-types/src/message.rs b/crates/openfang-types/src/message.rs index 99be59571..ac8fcd9bd 100644 --- a/crates/openfang-types/src/message.rs +++ b/crates/openfang-types/src/message.rs @@ -2,6 +2,20 @@ use serde::{Deserialize, Serialize}; +/// Sender context forwarded from channel gateways (e.g. WhatsApp, Telegram). +/// +/// Carries the identity of the person who sent the message so the agent +/// can distinguish between its owner and other users. 
+#[derive(Debug, Clone, Default)] +pub struct SenderContext { + /// Channel name (e.g. "whatsapp", "telegram"). + pub channel: Option<String>, + /// Platform-specific sender ID (e.g. phone number, Telegram user ID). + pub sender_id: Option<String>, + /// Human-readable sender display name. + pub sender_name: Option<String>, +} + /// A message in an LLM conversation. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { @@ -56,6 +70,12 @@ pub enum ContentBlock { /// Base64-encoded image data. data: String, }, + /// A URL-referenced image (for providers like DashScope that prefer URLs over base64). + #[serde(rename = "image_url")] + ImageUrl { + /// The URL of the image. + url: String, + }, /// A tool use request from the assistant. #[serde(rename = "tool_use")] ToolUse { @@ -144,6 +164,7 @@ impl MessageContent { ContentBlock::Thinking { thinking } => thinking.len(), ContentBlock::ToolUse { .. } | ContentBlock::Image { .. } + | ContentBlock::ImageUrl { .. } | ContentBlock::Unknown => 0, }) .sum(), diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 000000000..621b91e31 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,5 @@ +#!/bin/bash +# Drop root privileges and run openfang as the openfang user +chown -R openfang:openfang /data 2>/dev/null +chown -R openfang:openfang /home/openfang 2>/dev/null +exec gosu openfang openfang "$@" diff --git a/packages/whatsapp-gateway/index.js b/packages/whatsapp-gateway/index.js index 18e845735..fdcce3e3f 100644 --- a/packages/whatsapp-gateway/index.js +++ b/packages/whatsapp-gateway/index.js @@ -5,7 +5,7 @@ import { randomUUID } from 'node:crypto'; import fs from 'node:fs'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; -import makeWASocket, { useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion } from '@whiskeysockets/baileys'; +import makeWASocket, { useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage } from '@whiskeysockets/baileys'; import QRCode
from 'qrcode'; import pino from 'pino'; @@ -17,7 +17,8 @@ const __dirname = path.dirname(__filename); // --------------------------------------------------------------------------- const PORT = parseInt(process.env.WHATSAPP_GATEWAY_PORT || '3009', 10); const OPENFANG_URL = (process.env.OPENFANG_URL || 'http://127.0.0.1:4200').replace(/\/+$/, ''); -const DEFAULT_AGENT = process.env.OPENFANG_DEFAULT_AGENT || 'assistant'; +const DEFAULT_AGENT_NAME = process.env.OPENFANG_DEFAULT_AGENT || 'ambrogio'; +let resolvedAgentId = null; // will be resolved on first use // --------------------------------------------------------------------------- // State @@ -124,27 +125,81 @@ async function startConnection() { if (msg.key.fromMe) continue; if (msg.key.remoteJid === 'status@broadcast') continue; - const sender = msg.key.remoteJid || ''; + const remoteJid = msg.key.remoteJid || ''; + const isGroup = remoteJid.endsWith('@g.us'); + + // Detect media messages + const mediaInfo = msg.message?.imageMessage + ? { type: 'image', mime: msg.message.imageMessage.mimetype || 'image/jpeg', caption: msg.message.imageMessage.caption || '' } + : msg.message?.videoMessage + ? { type: 'video', mime: msg.message.videoMessage.mimetype || 'video/mp4', caption: msg.message.videoMessage.caption || '' } + : msg.message?.audioMessage + ? { type: 'audio', mime: msg.message.audioMessage.mimetype || 'audio/ogg', caption: '' } + : msg.message?.documentMessage + ? { type: 'document', mime: msg.message.documentMessage.mimetype || 'application/octet-stream', caption: msg.message.documentMessage.caption || '', filename: msg.message.documentMessage.fileName || 'document' } + : msg.message?.stickerMessage + ? 
{ type: 'sticker', mime: msg.message.stickerMessage.mimetype || 'image/webp', caption: '' } + : null; + const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text - || msg.message?.imageMessage?.caption + || mediaInfo?.caption || ''; - if (!text) continue; + // Skip if no text AND no media + if (!text && !mediaInfo) continue; - // Extract phone number from JID (e.g. "1234567890@s.whatsapp.net" → "+1234567890") - const phone = '+' + sender.replace(/@.*$/, ''); + // For groups: real sender is in participant; for DMs: it's remoteJid + const senderJid = isGroup ? (msg.key.participant || '') : remoteJid; + const phone = '+' + senderJid.replace(/@.*$/, ''); const pushName = msg.pushName || phone; - console.log(`[gateway] Incoming from ${pushName} (${phone}): ${text.substring(0, 80)}`); + // Build metadata with group context + const metadata = { + channel: 'whatsapp', + sender: phone, + sender_name: pushName, + }; + + if (isGroup) { + metadata.group_jid = remoteJid; + metadata.group_name = msg.key.remoteJid; // basic group ID + metadata.is_group = true; + } + + // Download and upload media if present + let attachments = []; + if (mediaInfo) { + try { + const buffer = await downloadMediaMessage(msg, 'buffer', {}); + const ext = mediaInfo.mime.split('/').pop()?.split(';')[0] || 'bin'; + const filename = mediaInfo.filename || `${mediaInfo.type}.${ext}`; + const agentId = await resolveAgentId(DEFAULT_AGENT_NAME); + const fileId = await uploadToOpenFang(agentId, buffer, mediaInfo.mime, filename); + attachments.push({ file_id: fileId, filename, content_type: mediaInfo.mime }); + console.log(`[gateway] Uploaded ${mediaInfo.type} (${(buffer.length / 1024).toFixed(1)}KB) → ${fileId}`); + } catch (err) { + console.error(`[gateway] Media download/upload failed:`, err.message); + // Still forward the text/caption if available + } + } + + const logText = text ? 
text.substring(0, 80) : `[${mediaInfo?.type || 'media'}]`; + if (isGroup) { + console.log(`[gateway] Group msg from ${pushName} (${phone}) in ${remoteJid}: ${logText}`); + } else { + console.log(`[gateway] Incoming from ${pushName} (${phone}): ${logText}`); + } // Forward to OpenFang agent + const messageText = text || `[${mediaInfo?.type || 'media'} received]`; try { - const response = await forwardToOpenFang(text, phone, pushName); - if (response && sock) { - // Send agent response back to WhatsApp - await sock.sendMessage(sender, { text: response }); - console.log(`[gateway] Replied to ${pushName}`); + const response = await forwardToOpenFang(messageText, phone, pushName, metadata, attachments); + if (response && response !== '__SILENT__' && sock) { + // Reply in the same context: group → group, DM → DM + const replyJid = isGroup ? remoteJid : senderJid.replace(/@.*$/, '') + '@s.whatsapp.net'; + await sock.sendMessage(replyJid, { text: response }); + console.log(`[gateway] Replied to ${pushName}${isGroup ? ' in group ' + remoteJid : ' privately'}`); } } catch (err) { console.error(`[gateway] Forward/reply failed:`, err.message); @@ -153,21 +208,109 @@ async function startConnection() { }); } +// --------------------------------------------------------------------------- +// Resolve agent name to UUID via OpenFang API +// --------------------------------------------------------------------------- +async function resolveAgentId(agentName) { + if (resolvedAgentId) return resolvedAgentId; + + return new Promise((resolve, reject) => { + const url = new URL(`${OPENFANG_URL}/api/agents`); + const req = http.request( + { hostname: url.hostname, port: url.port || 4200, path: url.pathname, method: 'GET', timeout: 10_000 }, + (res) => { + let body = ''; + res.on('data', (chunk) => (body += chunk)); + res.on('end', () => { + try { + const agents = JSON.parse(body); + const list = Array.isArray(agents) ? 
agents : agents.agents || []; + const match = list.find((a) => a.name === agentName || a.id === agentName); + if (match) { + resolvedAgentId = match.id; + console.log(`[gateway] Resolved agent "${agentName}" → ${match.id}`); + resolve(match.id); + } else { + reject(new Error(`Agent "${agentName}" not found`)); + } + } catch (e) { + reject(new Error('Failed to parse agents list')); + } + }); + }, + ); + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error('Agent resolve timeout')); }); + req.end(); + }); +} + +// --------------------------------------------------------------------------- +// Upload media to OpenFang API, return file_id +// --------------------------------------------------------------------------- +async function uploadToOpenFang(agentId, buffer, contentType, filename) { + return new Promise((resolve, reject) => { + const url = new URL(`${OPENFANG_URL}/api/agents/${encodeURIComponent(agentId)}/upload`); + + const req = http.request( + { + hostname: url.hostname, + port: url.port || 4200, + path: url.pathname, + method: 'POST', + headers: { + 'Content-Type': contentType, + 'Content-Length': buffer.length, + 'X-Filename': filename, + }, + timeout: 30_000, + }, + (res) => { + let body = ''; + res.on('data', (chunk) => (body += chunk)); + res.on('end', () => { + try { + const data = JSON.parse(body); + if (res.statusCode >= 400) { + reject(new Error(`Upload failed (${res.statusCode}): ${data.error || body}`)); + } else { + resolve(data.file_id || data.id || ''); + } + } catch { + reject(new Error(`Upload parse error: ${body}`)); + } + }); + }, + ); + + req.on('error', reject); + req.on('timeout', () => { req.destroy(); reject(new Error('Upload timeout')); }); + req.write(buffer); + req.end(); + }); +} + // --------------------------------------------------------------------------- // Forward incoming message to OpenFang API, return agent response // 
--------------------------------------------------------------------------- -function forwardToOpenFang(text, phone, pushName) { +async function forwardToOpenFang(text, phone, pushName, metadata, attachments) { + const agentId = await resolveAgentId(DEFAULT_AGENT_NAME); + return new Promise((resolve, reject) => { - const payload = JSON.stringify({ + const body = { message: text, - metadata: { + metadata: metadata || { channel: 'whatsapp', sender: phone, sender_name: pushName, }, - }); + }; + if (attachments && attachments.length > 0) { + body.attachments = attachments; + } + const payload = JSON.stringify(body); - const url = new URL(`${OPENFANG_URL}/api/agents/${encodeURIComponent(DEFAULT_AGENT)}/message`); + const url = new URL(`${OPENFANG_URL}/api/agents/${encodeURIComponent(agentId)}/message`); const req = http.request( { @@ -187,6 +330,11 @@ function forwardToOpenFang(text, phone, pushName) { res.on('end', () => { try { const data = JSON.parse(body); + // If the agent intentionally chose silence, signal the caller + if (data.silent) { + resolve('__SILENT__'); + return; + } // The /api/agents/{id}/message endpoint returns { response: "..." 
} resolve(data.response || data.message || data.text || ''); } catch { @@ -214,12 +362,56 @@ async function sendMessage(to, text) { throw new Error('WhatsApp not connected'); } - // Normalize phone → JID: "+1234567890" → "1234567890@s.whatsapp.net" - const jid = to.replace(/^\+/, '').replace(/@.*$/, '') + '@s.whatsapp.net'; + // If already a full JID (group or user), use as-is; otherwise normalize phone → JID + let jid; + if (to.includes('@')) { + jid = to; + } else { + jid = to.replace(/^\+/, '') + '@s.whatsapp.net'; + } await sock.sendMessage(jid, { text }); } +// --------------------------------------------------------------------------- +// Send media (audio/image/file) via Baileys +// --------------------------------------------------------------------------- +async function sendMedia(to, filePath, mimetype, options = {}) { + if (!sock || connStatus !== 'connected') { + throw new Error('WhatsApp not connected'); + } + + let jid; + if (to.includes('@')) { + jid = to; + } else { + jid = to.replace(/^\+/, '') + '@s.whatsapp.net'; + } + + const buffer = fs.readFileSync(filePath); + + // Determine message type from mimetype + if (mimetype.startsWith('audio/')) { + await sock.sendMessage(jid, { + audio: buffer, + mimetype: mimetype, + ptt: options.ptt !== false, // voice note by default + }); + } else if (mimetype.startsWith('image/')) { + await sock.sendMessage(jid, { + image: buffer, + mimetype: mimetype, + caption: options.caption || '', + }); + } else { + await sock.sendMessage(jid, { + document: buffer, + mimetype: mimetype, + fileName: options.filename || path.basename(filePath), + }); + } +} + // --------------------------------------------------------------------------- // HTTP server // --------------------------------------------------------------------------- @@ -315,6 +507,20 @@ const server = http.createServer(async (req, res) => { return jsonResponse(res, 200, { success: true, message: 'Sent' }); } + // POST /message/send-media — send audio/image/file 
via Baileys + if (req.method === 'POST' && pathname === '/message/send-media') { + const body = await parseBody(req); + const { to, file_path: fp, mimetype, ptt, caption, filename } = body; + + if (!to || !fp) { + return jsonResponse(res, 400, { error: 'Missing "to" or "file_path" field' }); + } + + const mime = mimetype || 'application/octet-stream'; + await sendMedia(to, fp, mime, { ptt, caption, filename }); + return jsonResponse(res, 200, { success: true, message: 'Media sent' }); + } + // GET /health — health check if (req.method === 'GET' && pathname === '/health') { return jsonResponse(res, 200, { @@ -335,7 +541,7 @@ const server = http.createServer(async (req, res) => { server.listen(PORT, '127.0.0.1', () => { console.log(`[gateway] WhatsApp Web gateway listening on http://127.0.0.1:${PORT}`); console.log(`[gateway] OpenFang URL: ${OPENFANG_URL}`); - console.log(`[gateway] Default agent: ${DEFAULT_AGENT}`); + console.log(`[gateway] Default agent: ${DEFAULT_AGENT_NAME}`); // Auto-connect if credentials already exist from a previous session const credsPath = path.join(__dirname, 'auth_store', 'creds.json');