From 1b2021deeb59ece356b5cf80fd0e36dd18fe3940 Mon Sep 17 00:00:00 2001
From: Wenjing Yu
Date: Wed, 23 Jul 2025 14:42:56 -0700
Subject: [PATCH 1/2] fix Claude Code with Gemini models

Gemini models can return tool calls with empty IDs, which previously
caused the corresponding tool_use blocks to be dropped. Generate a
placeholder tool ID in the response converter (mirroring the streaming
logic), emit streamed tool blocks even when their ID is a placeholder,
and log the tool IDs with the final request status.
---
 app/utils/anthropic_converter.py | 13 +++++++++++--
 app/utils/anthropic_streaming.py | 13 +++++++++----
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/app/utils/anthropic_converter.py b/app/utils/anthropic_converter.py
index ba55e60..7ea3209 100644
--- a/app/utils/anthropic_converter.py
+++ b/app/utils/anthropic_converter.py
@@ -392,7 +392,7 @@ def convert_openai_to_anthropic_response(
 
     # Handle tool calls
     if message.get("tool_calls"):
-        for call in message["tool_calls"]:
+        for i, call in enumerate(message["tool_calls"]):
             if call.get("type") == "function":
                 tool_input_dict: Dict[str, Any] = {}
                 try:
@@ -408,9 +408,18 @@
                     logger.error(f"Failed to parse JSON arguments for tool '{call['function']['name']}': {e}")
                     tool_input_dict = {"error_parsing_arguments": call["function"]["arguments"]}
 
+                # Handle empty tool ID by generating a placeholder, similar to streaming logic
+                tool_id = call.get("id")
+                if not tool_id or tool_id.strip() == "":
+                    tool_id = f"tool_ph_{request_id}_{i}" if request_id else f"tool_ph_{uuid.uuid4().hex}_{i}"
+                    logger.debug(
+                        f"Generated placeholder tool ID '{tool_id}' for tool '{call['function']['name']}' due to empty ID from provider",
+                        extra={"request_id": request_id} if request_id else {}
+                    )
+
                 anthropic_content.append(ContentBlockToolUse(
                     type="tool_use",
-                    id=call["id"],
+                    id=tool_id,
                     name=call["function"]["name"],
                     input=tool_input_dict,
                 ))
diff --git a/app/utils/anthropic_streaming.py b/app/utils/anthropic_streaming.py
index 0fe7ebe..7b9d00d 100644
--- a/app/utils/anthropic_streaming.py
+++ b/app/utils/anthropic_streaming.py
@@ -127,8 +127,10 @@ async def handle_anthropic_streaming_response_from_openai_stream(
                         next_anthropic_block_idx += 1
                         openai_tool_idx_to_anthropic_block_idx[openai_tc_idx] = current_anthropic_tool_block_idx
 
+                        tool_id = tool_delta.get("id") or f"tool_ph_{request_id}_{current_anthropic_tool_block_idx}"
+
                         tool_states[current_anthropic_tool_block_idx] = {
-                            "id": tool_delta.get("id") or f"tool_ph_{request_id}_{current_anthropic_tool_block_idx}",
+                            "id": tool_id,
                             "name": "",
                             "arguments_buffer": "",
                         }
@@ -154,7 +156,6 @@ async def handle_anthropic_streaming_response_from_openai_stream(
                     if (
                         current_anthropic_tool_block_idx not in sent_tool_block_starts
                         and tool_state["id"]
-                        and not tool_state["id"].startswith("tool_ph_")
                         and tool_state["name"]
                     ):
                         start_tool_event = {
@@ -205,9 +206,12 @@ async def handle_anthropic_streaming_response_from_openai_stream(
     if text_block_anthropic_idx is not None:
         yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': text_block_anthropic_idx})}\n\n"
 
+    # Collect tool IDs for logging
+    tool_ids = []
     for anthropic_tool_idx in sent_tool_block_starts:
         tool_state_to_finalize = tool_states.get(anthropic_tool_idx)
         if tool_state_to_finalize:
+            tool_ids.append(f"{tool_state_to_finalize.get('name', 'unknown')}:{tool_state_to_finalize.get('id', 'unknown')}")
             try:
                 json.loads(tool_state_to_finalize["arguments_buffer"])
             except json.JSONDecodeError:
@@ -255,9 +259,10 @@ async def handle_anthropic_streaming_response_from_openai_stream(
         "input_tokens": estimated_input_tokens,
         "output_tokens": output_token_count,
         "stop_reason": final_anthropic_stop_reason,
+        "tool_ids": tool_ids,
     }
     if stream_status_code == 200:
-        logger.info(f"Streaming request completed successfully: {log_data}")
+        logger.info("Streaming request completed successfully", extra={"request_id": request_id, "data": log_data})
     else:
-        logger.error(f"Streaming request failed: {log_data}")
\ No newline at end of file
+        logger.error("Streaming request failed", extra={"request_id": request_id, "data": log_data})
\ No newline at end of file

From fcdbdff5bf42c8dc1adb4109e89976718bd0ac62 Mon Sep 17 00:00:00 2001
From: Wenjing Yu
Date: Thu, 24 Jul 2025 10:51:55 -0700
Subject: [PATCH 2/2] fix Anthropic tool calls

Add tool-call support to the Anthropic adapter: convert OpenAI tools,
tool_choice, tool-role messages, and assistant tool_calls into the
Anthropic format; translate tool_use content blocks back into OpenAI
tool calls for streaming and non-streaming responses; and map Anthropic
stop reasons to OpenAI finish reasons. Also harden content conversion
against None values and stop sending an empty stop_sequences list by
default.
---
 app/services/providers/anthropic_adapter.py | 412 +++++++++++++++++---
 tests/unit_tests/test_anthropic_provider.py |   2 -
 2 files changed, 362 insertions(+), 52 deletions(-)

diff --git a/app/services/providers/anthropic_adapter.py b/app/services/providers/anthropic_adapter.py
index 69bb4fb..253fb35 100644
--- a/app/services/providers/anthropic_adapter.py
+++ b/app/services/providers/anthropic_adapter.py
@@ -8,7 +8,10 @@
 import aiohttp
 
 from app.core.logger import get_logger
-from app.exceptions.exceptions import ProviderAPIException, InvalidCompletionRequestException
+from app.exceptions.exceptions import (
+    ProviderAPIException,
+    InvalidCompletionRequestException,
+)
 
 from .base import ProviderAdapter
@@ -59,17 +62,26 @@ def convert_openai_image_content_to_anthropic(
 
     @staticmethod
     def convert_openai_content_to_anthropic(
-        content: list[dict[str, Any]] | str,
-    ) -> list[dict[str, Any]]:
+        content: list[dict[str, Any]] | str | None,
+    ) -> list[dict[str, Any]] | str:
         """Convert OpenAI content model to Anthropic content model"""
+        if content is None:
+            return ""
+
         if isinstance(content, str):
             return content
 
+        if not isinstance(content, list):
+            return str(content)
+
         result = []
         for msg in content:
-            _type = msg["type"]
+            if not msg or not isinstance(msg, dict):
+                continue
+
+            _type = msg.get("type")
             if _type == "text":
-                result.append({"type": "text", "text": msg["text"]})
+                result.append({"type": "text", "text": msg.get("text", "")})
             elif _type == "image_url":
                 result.append(
                     AnthropicAdapter.convert_openai_image_content_to_anthropic(msg)
@@ -78,8 +90,7 @@
                 error_message = f"{_type} is not supported"
                 logger.error(error_message)
                 raise InvalidCompletionRequestException(
-                    provider_name="anthropic",
-                    error=ValueError(error_message)
+                    provider_name="anthropic", error=ValueError(error_message)
                 )
 
         return result
@@ -104,11 +115,13 @@ async def list_models(self, api_key: str) -> list[str]:
         ):
             if response.status != HTTPStatus.OK:
                 error_text = await response.text()
-                logger.error(f"List Models API error for {self.provider_name}: {error_text}")
+                logger.error(
+                    f"List Models API error for {self.provider_name}: {error_text}"
+                )
                 raise ProviderAPIException(
                     provider_name=self.provider_name,
                     error_code=response.status,
-                    error_message=error_text
+                    error_message=error_text,
                 )
             resp = await response.json()
             self.CLAUDE_MODEL_MAPPING = {
@@ -120,17 +133,91 @@
             self.cache_models(api_key, self._base_url, models)
 
         return models
-    
+
     @staticmethod
     def convert_openai_payload_to_anthropic(payload: dict[str, Any]) -> dict[str, Any]:
-        """Convert Anthropic completion payload to OpenAI format"""
+        """Convert OpenAI completion payload to Anthropic format"""
         anthropic_payload = {
             "model": payload["model"],
-            "max_tokens": payload.get("max_completion_tokens", payload.get("max_tokens", ANTHROPIC_DEFAULT_MAX_TOKENS)),
+            "max_tokens": payload.get(
+                "max_completion_tokens",
+                payload.get("max_tokens", ANTHROPIC_DEFAULT_MAX_TOKENS),
+            ),
             "temperature": payload.get("temperature", 1.0),
-            "stop_sequences": payload.get("stop", []),
         }
 
+        # Handle optional parameters
+        if payload.get("top_p") is not None:
+            anthropic_payload["top_p"] = payload["top_p"]
+        if payload.get("top_k") is not None:
+            anthropic_payload["top_k"] = payload["top_k"]
+        if payload.get("stream") is not None:
+            anthropic_payload["stream"] = payload["stream"]
+        if payload.get("stop"):
+            anthropic_payload["stop_sequences"] = (
+                payload["stop"]
+                if isinstance(payload["stop"], list)
+                else [payload["stop"]]
+            )
+
+        # Convert tools if present
+        tools = payload.get("tools")
+        if tools:
+            anthropic_tools = []
+            for tool in tools:
+                if not tool or not isinstance(tool, dict):
+                    continue
+
+                if tool.get("type") == "function" and tool.get("function"):
+                    func = tool.get("function", {})
+                    if not func or not isinstance(func, dict):
+                        continue
+
+                    params = func.get("parameters", {})
+                    if params is None:
+                        params = {}
+
+                    anthropic_tool = {
+                        "name": func.get("name", ""),
+                        "description": func.get("description", ""),
+                        "input_schema": {
+                            "type": params.get("type", "object"),
+                            "properties": params.get("properties", {}),
+                        },
+                    }
+                    required = params.get("required")
+                    if required and isinstance(required, list):
+                        anthropic_tool["input_schema"]["required"] = required
+
+                    anthropic_tools.append(anthropic_tool)
+
+            if anthropic_tools:
+                anthropic_payload["tools"] = anthropic_tools
+
+        # Handle tool_choice conversion
+        tool_choice = payload.get("tool_choice")
+        if tool_choice:
+            if isinstance(tool_choice, str):
+                if tool_choice == "auto":
+                    anthropic_payload["tool_choice"] = {"type": "auto"}
+                elif tool_choice == "any":
+                    anthropic_payload["tool_choice"] = {"type": "any"}
+                elif tool_choice == "none":
+                    # Anthropic doesn't have explicit "none", just omit tools
+                    pass
+            elif (
+                isinstance(tool_choice, dict)
+                and tool_choice.get("type") == "function"
+            ):
+                func_choice = tool_choice.get("function", {})
+                anthropic_payload["tool_choice"] = {
+                    "type": "tool",
+                    "name": func_choice.get("name"),
+                }
+            else:
+                # Default to auto when tools are present
+                anthropic_payload["tool_choice"] = {"type": "auto"}
+
         # Handle chat vs. completion
         if "messages" in payload:
             # Use Anthropic's messages API format
@@ -140,17 +227,97 @@ def convert_openai_payload_to_anthropic(payload: dict[str, An
             for msg in payload["messages"]:
                 role = msg["role"]
                 content = msg["content"]
-                content = AnthropicAdapter.convert_openai_content_to_anthropic(content)
 
                 if role == "system":
                     # Anthropic requires a system message to be string
-                    # https://docs.anthropic.com/en/docs/build-with-claude/prompt-engineering/system-prompts
-                    assert isinstance(content, str)
-                    system_message = content
-                elif role == "user":
-                    anthropic_messages.append({"role": "user", "content": content})
-                elif role == "assistant":
-                    anthropic_messages.append({"role": "assistant", "content": content})
+                    if isinstance(content, str):
+                        system_message = content
+                    elif isinstance(content, list):
+                        # Extract text from content blocks
+                        text_parts = []
+                        for part in content:
+                            if isinstance(part, dict) and part.get("type") == "text":
+                                text_parts.append(part.get("text", ""))
+                        system_message = "\n".join(text_parts)
+                    continue
+
+                elif role == "tool":
+                    # Tool responses should be converted to user messages with tool_result content
+                    tool_call_id = msg.get("tool_call_id", "")
+                    tool_content = content if isinstance(content, str) else str(content)
+
+                    anthropic_messages.append(
+                        {
+                            "role": "user",
+                            "content": [
+                                {
+                                    "type": "tool_result",
+                                    "tool_use_id": tool_call_id,
+                                    "content": tool_content,
+                                }
+                            ],
+                        }
+                    )
+                    continue
+
+                # Convert content for user/assistant messages
+                if isinstance(content, str):
+                    anthropic_content = content
+                else:
+                    anthropic_content = (
+                        AnthropicAdapter.convert_openai_content_to_anthropic(content)
+                    )
+
+                anthropic_message = {"role": role, "content": anthropic_content}
+
+                # Handle tool calls in assistant messages
+                tool_calls = msg.get("tool_calls")
+                if role == "assistant" and tool_calls:
+                    # Ensure content is a list for tool calls
+                    if isinstance(anthropic_content, str):
+                        if anthropic_content and anthropic_content.strip():
+                            message_content = [
+                                {"type": "text", "text": anthropic_content}
+                            ]
+                        else:
+                            message_content = []
+                    else:
+                        message_content = (
+                            anthropic_content
+                            if isinstance(anthropic_content, list)
+                            else []
+                        )
+
+                    # Add tool use blocks - ensure tool_calls is not None
+                    if tool_calls is not None:
+                        for tool_call in tool_calls:
+                            if tool_call and tool_call.get("type") == "function":
+                                func = tool_call.get("function", {})
+                                if func:  # Ensure function is not None
+                                    try:
+                                        # Parse arguments JSON
+                                        args_str = func.get("arguments", "{}")
+                                        args = json.loads(args_str) if args_str else {}
+                                    except json.JSONDecodeError:
+                                        logger.warning(
+                                            f"Failed to parse tool call arguments: {func.get('arguments')}"
+                                        )
+                                        args = {}
+
+                                    message_content.append(
+                                        {
+                                            "type": "tool_use",
+                                            "id": tool_call.get(
+                                                "id", f"tool_{uuid.uuid4().hex[:8]}"
+                                            ),
+                                            "name": func.get("name", ""),
+                                            "input": args,
+                                        }
+                                    )
+
+                    anthropic_message["content"] = message_content
+
+                anthropic_messages.append(anthropic_message)
 
             # Add system message if present
             if system_message:
@@ -199,7 +366,11 @@ async def process_completion(
 
     @staticmethod
     async def stream_anthropic_response(
-        url, headers, anthropic_payload, model_name, error_handler: Callable[[str, int], Any] | None = None
+        url,
+        headers,
+        anthropic_payload,
+        model_name,
+        error_handler: Callable[[str, int], Any] | None = None,
     ):
         """Handle streaming response from Anthropic API, including usage data."""
 
@@ -260,9 +431,11 @@ async def stream_response() -> AsyncGenerator[bytes, None]:
                                         "output_tokens", captured_output_tokens
                                     )
 
-                            elif event_type == "content_block_delta":
-                                delta_content = data.get("delta", {}).get("text", "")
-                                if delta_content:
+                            elif event_type == "content_block_start":
+                                # Handle start of content blocks (text or tool_use)
+                                content_block = data.get("content_block", {})
+                                if content_block.get("type") == "tool_use":
+                                    # Start of a tool call
                                     openai_chunk = {
                                         "id": request_id,
                                         "object": "chat.completion.chunk",
                                         "created": int(time.time()),
                                         "model": model_name,
                                         "choices": [
                                             {
                                                 "index": 0,
-                                                "delta": {"content": delta_content},
+                                                "delta": {
+                                                    "tool_calls": [
+                                                        {
+                                                            "index": data.get(
+                                                                "index", 0
+                                                            ),
+                                                            "id": content_block.get(
+                                                                "id",
+                                                                f"call_{uuid.uuid4().hex[:8]}",
+                                                            ),
+                                                            "type": "function",
+                                                            "function": {
+                                                                "name": content_block.get(
+                                                                    "name", ""
+                                                                ),
+                                                                "arguments": "",
+                                                            },
+                                                        }
+                                                    ]
+                                                },
                                                 "finish_reason": None,
                                             }
                                         ],
                                     }
+                            elif event_type == "content_block_delta":
+                                delta = data.get("delta", {})
+                                if delta.get("type") == "text_delta":
+                                    # Text content delta
+                                    delta_content = delta.get("text", "")
+                                    if delta_content:
+                                        openai_chunk = {
+                                            "id": request_id,
+                                            "object": "chat.completion.chunk",
+                                            "created": int(time.time()),
+                                            "model": model_name,
+                                            "choices": [
+                                                {
+                                                    "index": 0,
+                                                    "delta": {"content": delta_content},
+                                                    "finish_reason": None,
+                                                }
+                                            ],
+                                        }
+                                elif delta.get("type") == "input_json_delta":
+                                    # Tool arguments delta
+                                    partial_json = delta.get("partial_json", "")
+                                    if partial_json:
+                                        openai_chunk = {
+                                            "id": request_id,
+                                            "object": "chat.completion.chunk",
+                                            "created": int(time.time()),
+                                            "model": model_name,
+                                            "choices": [
+                                                {
+                                                    "index": 0,
+                                                    "delta": {
+                                                        "tool_calls": [
+                                                            {
+                                                                "index": data.get(
+                                                                    "index", 0
+                                                                ),
+                                                                "function": {
+                                                                    "arguments": partial_json
+                                                                },
+                                                            }
+                                                        ]
+                                                    },
+                                                    "finish_reason": None,
+                                                }
+                                            ],
+                                        }
 
                             # Capture Output Tokens & Finish Reason from message_delta
                             elif event_type == "message_delta":
                                 delta_data = data.get("delta", {})
-                                finish_reason = delta_data.get(
-                                    "stop_reason"
-                                )  # Get finish reason from delta
-                                if finish_reason:
-                                    finish_reason = finish_reason.lower()
+                                anthropic_stop_reason = delta_data.get("stop_reason")
+                                if anthropic_stop_reason:
+                                    # Map Anthropic stop reason to OpenAI finish reason
+                                    finish_reason_map = {
+                                        "end_turn": "stop",
+                                        "stop_sequence": "stop",
+                                        "max_tokens": "length",
+                                        "tool_use": "tool_calls",
+                                    }
+                                    finish_reason = finish_reason_map.get(
+                                        anthropic_stop_reason, "stop"
+                                    )
 
                                 # Check for usage at the TOP LEVEL of the message_delta event data
                                 if "usage" in data:
@@ -297,7 +544,21 @@
                             # Capture Finish Reason from message_stop (backup for usage)
                             elif event_type == "message_stop":
-                                finish_reason = data.get("stop_reason", "stop").lower()
+                                # Map Anthropic stop reason to OpenAI finish reason if not already set
+                                if not finish_reason:
+                                    anthropic_stop_reason = data.get(
+                                        "stop_reason", "end_turn"
+                                    )
+                                    finish_reason_map = {
+                                        "end_turn": "stop",
+                                        "stop_sequence": "stop",
+                                        "max_tokens": "length",
+                                        "tool_use": "tool_calls",
+                                    }
+                                    finish_reason = finish_reason_map.get(
+                                        anthropic_stop_reason, "stop"
+                                    )
+
                                 if not usage_info_complete and "usage" in data:
                                     usage = data["usage"]
                                     captured_input_tokens = usage.get(
@@ -315,9 +576,9 @@
                 # --- Yielding Logic ---
                 if openai_chunk:
                     if finish_reason:
-                        openai_chunk["choices"][0][
-                            "finish_reason"
-                        ] = finish_reason
+                        openai_chunk["choices"][0]["finish_reason"] = (
+                            finish_reason
+                        )
                     yield f"data: {json.dumps(openai_chunk)}\n\n".encode()
 
                 # Check if usage info is complete *after* potential content chunk
@@ -325,7 +586,8 @@
                     final_usage_data = {
                         "prompt_tokens": captured_input_tokens,
                         "completion_tokens": captured_output_tokens,
-                        "total_tokens": captured_input_tokens + captured_output_tokens,
+                        "total_tokens": captured_input_tokens
+                        + captured_output_tokens,
                     }
                     usage_chunk = {
                         "id": request_id,
@@ -340,7 +602,9 @@
                     usage_info_complete = False
 
             except json.JSONDecodeError as e:
-                logger.warning(f"Stream API error for {self.provider_name}: Failed to parse JSON: {e}")
+                logger.warning(
+                    f"Stream API error for {self.provider_name}: Failed to parse JSON: {e}"
+                )
                 continue
             except Exception as e:
                 continue
@@ -352,7 +616,11 @@
 
     @staticmethod
     async def process_regular_response(
-        url: str, headers: dict[str, str], anthropic_payload: dict[str, Any], model_name: str, error_handler: Callable[[str, int], Any] | None = None
+        url: str,
+        headers: dict[str, str],
+        anthropic_payload: dict[str, Any],
+        model_name: str,
+        error_handler: Callable[[str, int], Any] | None = None,
     ):
         """Handle regular (non-streaming) response from Anthropic API"""
         # Single with statement for multiple contexts
@@ -379,14 +647,61 @@
                 # Messages API response
                 content = anthropic_response.get("content", [])
                 text_content = ""
+                tool_calls = []
 
-                # Extract text from content blocks
-                for block in content:
-                    if block.get("type") == "text":
-                        text_content += block.get("text", "")
+                # Extract text and tool calls from content blocks
+                if content:  # Ensure content is not None
+                    for block in content:
+                        if not block or not isinstance(block, dict):
+                            continue
 
-                input_tokens = anthropic_response.get("usage", {}).get("input_tokens", 0)
-                output_tokens = anthropic_response.get("usage", {}).get("output_tokens", 0)
+                        if block.get("type") == "text":
+                            text_content += block.get("text", "")
+                        elif block.get("type") == "tool_use":
+                            # Convert Anthropic tool use to OpenAI tool call format
+                            tool_calls.append(
+                                {
+                                    "id": block.get(
+                                        "id", f"call_{uuid.uuid4().hex[:8]}"
+                                    ),
+                                    "type": "function",
+                                    "function": {
+                                        "name": block.get("name", ""),
+                                        "arguments": json.dumps(block.get("input", {})),
+                                    },
+                                }
+                            )
+
+                # Map Anthropic stop reason to OpenAI finish reason
+                stop_reason = anthropic_response.get("stop_reason", "end_turn")
+                finish_reason_map = {
+                    "end_turn": "stop",
+                    "stop_sequence": "stop",
+                    "max_tokens": "length",
+                    "tool_use": "tool_calls",
+                }
+                finish_reason = finish_reason_map.get(stop_reason, "stop")
+
+                # Build message content
+                message_content = {
+                    "role": "assistant",
+                    "content": text_content if text_content else None,
+                }
+
+                # Add tool calls if present
+                if tool_calls:
+                    message_content["tool_calls"] = tool_calls
+                    if not text_content:
+                        message_content["content"] = (
+                            None  # OpenAI expects null content when tool calls are present
+                        )
+
+                input_tokens = anthropic_response.get("usage", {}).get(
+                    "input_tokens", 0
+                )
+                output_tokens = anthropic_response.get("usage", {}).get(
+                    "output_tokens", 0
+                )
                 return {
                     "id": completion_id,
                     "object": "chat.completion",
@@ -395,11 +710,8 @@
                     "choices": [
                         {
                             "index": 0,
-                            "message": {
-                                "role": "assistant",
-                                "content": text_content,
-                            },
-                            "finish_reason": "stop",
+                            "message": message_content,
+                            "finish_reason": finish_reason,
                         }
                     ],
                     "usage": {
@@ -431,7 +743,7 @@ async def process_regular_response(
                         "total_tokens": -1,
                     },
                 }
-    
+
     async def process_embeddings(
         self,
         endpoint: str,
diff --git a/tests/unit_tests/test_anthropic_provider.py b/tests/unit_tests/test_anthropic_provider.py
index 7fa01a8..298ae03 100644
--- a/tests/unit_tests/test_anthropic_provider.py
+++ b/tests/unit_tests/test_anthropic_provider.py
@@ -80,7 +80,6 @@ async def test_chat_completion(self):
             "messages": [{"role": "user", "content": "Hello, how are you?"}],
             "max_tokens": 4096,
             "temperature": 1.0,
-            "stop_sequences": [],
         }
 
     async def test_chat_completion_streaming(self):
@@ -113,6 +112,5 @@ async def test_chat_completion_streaming(self):
             "messages": [{"role": "user", "content": "Hello, how are you?"}],
             "max_tokens": 4096,
             "temperature": 1.0,
-            "stop_sequences": [],
             "stream": True,
         }