diff --git a/.github/README.md b/.github/README.md
index 35cca027..8f3727f7 100644
--- a/.github/README.md
+++ b/.github/README.md
@@ -3,8 +3,7 @@
> **You are reading the Github README!**
>
> - ๐ **Documentation**: See our [technical documentation](https://deepcritical.github.io/GradioDemo/) for detailed information
-> - ๐ **Demo README**: Check out the [Demo README](..README.md) for for more information about our MCP Hackathon submission
-> - ๐ **Hackathon Submission**: Keep reading below for more information about our MCP Hackathon submission
+> - ๐ **Demo README**: Check out the [Demo README](../README.md) for more information > - ๐ **Demo**: Kindly consider using our [Free Demo](https://hf.co/DataQuests/GradioDemo)
diff --git a/.github/scripts/deploy_to_hf_space.py b/.github/scripts/deploy_to_hf_space.py
index 839a38dd..a60ba216 100644
--- a/.github/scripts/deploy_to_hf_space.py
+++ b/.github/scripts/deploy_to_hf_space.py
@@ -3,6 +3,7 @@
import os
import shutil
import subprocess
+import tempfile
from pathlib import Path
from typing import Set
@@ -38,6 +39,7 @@ def get_excluded_dirs() -> Set[str]:
"dist",
".eggs",
"htmlcov",
+ "hf_space", # Exclude the cloned HF Space directory itself
}
@@ -48,7 +50,6 @@ def get_excluded_files() -> Set[str]:
"mkdocs.yml",
"uv.lock",
"AGENTS.txt",
- "CONTRIBUTING.md",
".env",
".env.local",
"*.local",
@@ -101,9 +102,22 @@ def deploy_to_hf_space() -> None:
hf_username = os.getenv("HF_USERNAME") # Can be username or organization name
space_name = os.getenv("HF_SPACE_NAME")
- if not all([hf_token, hf_username, space_name]):
+ # Check which variables are missing and provide helpful error message
+ missing = []
+ if not hf_token:
+ missing.append("HF_TOKEN (should be in repository secrets)")
+ if not hf_username:
+ missing.append("HF_USERNAME (should be in repository variables)")
+ if not space_name:
+ missing.append("HF_SPACE_NAME (should be in repository variables)")
+
+ if missing:
raise ValueError(
- "Missing required environment variables: HF_TOKEN, HF_USERNAME, HF_SPACE_NAME"
+ f"Missing required environment variables: {', '.join(missing)}\n"
+ f"Please configure:\n"
+ f" - HF_TOKEN in Settings > Secrets and variables > Actions > Secrets\n"
+ f" - HF_USERNAME in Settings > Secrets and variables > Actions > Variables\n"
+ f" - HF_SPACE_NAME in Settings > Secrets and variables > Actions > Variables"
)
# HF_USERNAME can be either a username or organization name
@@ -134,8 +148,36 @@ def deploy_to_hf_space() -> None:
)
print(f"✅ Created new Space: {repo_id}")
+ # Configure Git credential helper for authentication
+ # This is needed for Git LFS to work properly with fine-grained tokens
+ print("๐ Configuring Git credentials...")
+
+ # Use Git credential store to store the token
+ # This allows Git LFS to authenticate properly
+ temp_dir = Path(tempfile.gettempdir())
+ credential_store = temp_dir / ".git-credentials-hf"
+
+ # Write credentials in the format: https://username:token@huggingface.co
+ credential_store.write_text(f"https://{hf_username}:{hf_token}@huggingface.co\n", encoding="utf-8")
+ try:
+ credential_store.chmod(0o600) # Secure permissions (Unix only)
+ except OSError:
+ # Windows doesn't support chmod, skip
+ pass
+
+ # Configure Git to use the credential store
+ subprocess.run(
+ ["git", "config", "--global", "credential.helper", f"store --file={credential_store}"],
+ check=True,
+ capture_output=True,
+ )
+
+    # NOTE(review): Git and Git LFS do not read a GIT_CREDENTIAL_HELPER env var;
+ os.environ["GIT_CREDENTIAL_HELPER"] = f"store --file={credential_store}"
+
# Clone repository using git
- space_url = f"https://{hf_token}@huggingface.co/spaces/{repo_id}"
+ # Use the token in the URL for initial clone, but LFS will use credential store
+ space_url = f"https://{hf_username}:{hf_token}@huggingface.co/spaces/{repo_id}"
if Path(local_dir).exists():
print(f"๐งน Removing existing {local_dir} directory...")
@@ -150,10 +192,58 @@ def deploy_to_hf_space() -> None:
text=True,
)
print(f"✅ Cloned Space repository")
+
+ # After clone, configure the remote to use credential helper
+ # This ensures future operations (like push) use the credential store
+ os.chdir(local_dir)
+ subprocess.run(
+ ["git", "remote", "set-url", "origin", f"https://huggingface.co/spaces/{repo_id}"],
+ check=True,
+ capture_output=True,
+ )
+ os.chdir("..")
+
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else e.stdout if e.stdout else "Unknown error"
print(f"❌ Failed to clone Space repository: {error_msg}")
- raise RuntimeError(f"Git clone failed: {error_msg}") from e
+
+ # Try alternative: clone with LFS skip, then fetch LFS files separately
+ print("๐ Trying alternative clone method (skip LFS during clone)...")
+ try:
+ env = os.environ.copy()
+ env["GIT_LFS_SKIP_SMUDGE"] = "1" # Skip LFS during clone
+
+ subprocess.run(
+ ["git", "clone", space_url, local_dir],
+ check=True,
+ capture_output=True,
+ text=True,
+ env=env,
+ )
+    print(f"✅ Cloned Space repository (LFS skipped)")
+
+ # Configure remote
+ os.chdir(local_dir)
+ subprocess.run(
+ ["git", "remote", "set-url", "origin", f"https://huggingface.co/spaces/{repo_id}"],
+ check=True,
+ capture_output=True,
+ )
+
+ # Try to fetch LFS files with proper authentication
+ print("๐ฅ Fetching LFS files...")
+ subprocess.run(
+ ["git", "lfs", "pull"],
+ check=False, # Don't fail if LFS pull fails - we'll continue without LFS files
+ capture_output=True,
+ text=True,
+ )
+ os.chdir("..")
+    print(f"✅ Repository cloned (LFS files may be incomplete, but deployment can continue)")
+ except subprocess.CalledProcessError as e2:
+ error_msg2 = e2.stderr if e2.stderr else e2.stdout if e2.stdout else "Unknown error"
+    print(f"❌ Alternative clone method also failed: {error_msg2}")
+    raise RuntimeError(f"Git clone failed: {error_msg}; fallback clone also failed: {error_msg2}") from e2
# Get exclusion sets
excluded_dirs = get_excluded_dirs()
@@ -180,6 +270,10 @@ def deploy_to_hf_space() -> None:
if ".git" in item.parts:
continue
+ # Skip if in hf_space directory (the cloned Space directory)
+ if "hf_space" in item.parts:
+ continue
+
# Skip if should be excluded
if should_exclude(item, excluded_dirs, excluded_files):
continue
@@ -253,6 +347,12 @@ def deploy_to_hf_space() -> None:
capture_output=True,
)
print("๐ค Pushing to Hugging Face Space...")
+ # Ensure remote URL uses credential helper (not token in URL)
+ subprocess.run(
+ ["git", "remote", "set-url", "origin", f"https://huggingface.co/spaces/{repo_id}"],
+ check=True,
+ capture_output=True,
+ )
subprocess.run(
["git", "push"],
check=True,
@@ -273,6 +373,14 @@ def deploy_to_hf_space() -> None:
finally:
# Return to original directory
os.chdir(original_cwd)
+
+ # Clean up credential store for security
+ try:
+ if credential_store.exists():
+ credential_store.unlink()
+ except Exception:
+ # Ignore cleanup errors
+ pass
print(f"๐ Successfully deployed to: https://huggingface.co/spaces/{repo_id}")
diff --git a/.github/workflows/deploy-hf-space.yml b/.github/workflows/deploy-hf-space.yml
index 5e788686..e22f89ab 100644
--- a/.github/workflows/deploy-hf-space.yml
+++ b/.github/workflows/deploy-hf-space.yml
@@ -30,9 +30,12 @@ jobs:
- name: Deploy to Hugging Face Space
env:
+ # Token from secrets (sensitive data)
HF_TOKEN: ${{ secrets.HF_TOKEN }}
- HF_USERNAME: ${{ secrets.HF_USERNAME }}
- HF_SPACE_NAME: ${{ secrets.HF_SPACE_NAME }}
+ # Username/Organization from repository variables (non-sensitive)
+ HF_USERNAME: ${{ vars.HF_USERNAME }}
+ # Space name from repository variables (non-sensitive)
+ HF_SPACE_NAME: ${{ vars.HF_SPACE_NAME }}
run: |
python .github/scripts/deploy_to_hf_space.py
@@ -40,5 +43,5 @@ jobs:
if: success()
run: |
echo "✅ Deployment completed successfully!"
- echo "Space URL: https://huggingface.co/spaces/${{ secrets.HF_USERNAME }}/${{ secrets.HF_SPACE_NAME }}"
+ echo "Space URL: https://huggingface.co/spaces/${{ vars.HF_USERNAME }}/${{ vars.HF_SPACE_NAME }}"
diff --git a/README.md b/README.md
index aedb9d90..a4c0e90e 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,14 @@ app_file: src/app.py
hf_oauth: true
hf_oauth_expiration_minutes: 480
hf_oauth_scopes:
- - inference-api
+ # Required for HuggingFace Inference API (includes all third-party providers)
+ # This scope grants access to:
+ # - HuggingFace's own Inference API
+ # - Third-party inference providers (nebius, together, scaleway, hyperbolic, novita, nscale, sambanova, ovh, fireworks, etc.)
+ # - All models available through the Inference Providers API
+ - inference-api
+ # Optional: Uncomment if you need to access user's billing information
+ # - read-billing
pinned: true
license: mit
tags:
diff --git a/dev/__init__.py b/dev/__init__.py
index d1ee8372..cf9d92ad 100644
--- a/dev/__init__.py
+++ b/dev/__init__.py
@@ -4,3 +4,8 @@
+
+
+
+
+
diff --git a/docs/LICENSE.md b/docs/LICENSE.md
index 7a0c1fad..d12c3003 100644
--- a/docs/LICENSE.md
+++ b/docs/LICENSE.md
@@ -25,3 +25,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+
+
+
+
+
diff --git a/docs/api/orchestrators.md b/docs/api/orchestrators.md
index 9e3d22df..ec187453 100644
--- a/docs/api/orchestrators.md
+++ b/docs/api/orchestrators.md
@@ -23,9 +23,12 @@ Runs iterative research flow.
- `background_context`: Background context (default: "")
- `output_length`: Optional description of desired output length (default: "")
- `output_instructions`: Optional additional instructions for report generation (default: "")
+- `message_history`: Optional user conversation history in Pydantic AI `ModelMessage` format (default: None)
**Returns**: Final report string.
+**Note**: The `message_history` parameter enables multi-turn conversations by providing context from previous interactions.
+
**Note**: `max_iterations`, `max_time_minutes`, and `token_budget` are constructor parameters, not `run()` parameters.
## DeepResearchFlow
@@ -46,9 +49,12 @@ Runs deep research flow.
**Parameters**:
- `query`: Research query string
+- `message_history`: Optional user conversation history in Pydantic AI `ModelMessage` format (default: None)
**Returns**: Final report string.
+**Note**: The `message_history` parameter enables multi-turn conversations by providing context from previous interactions.
+
**Note**: `max_iterations_per_section`, `max_time_minutes`, and `token_budget` are constructor parameters, not `run()` parameters.
## GraphOrchestrator
@@ -69,10 +75,13 @@ Runs graph-based research orchestration.
**Parameters**:
- `query`: Research query string
+- `message_history`: Optional user conversation history in Pydantic AI `ModelMessage` format (default: None)
**Yields**: `AgentEvent` objects during graph execution.
-**Note**: `research_mode` and `use_graph` are constructor parameters, not `run()` parameters.
+**Note**:
+- `research_mode` and `use_graph` are constructor parameters, not `run()` parameters.
+- The `message_history` parameter enables multi-turn conversations by providing context from previous interactions. Message history is stored in `GraphExecutionContext` and passed to agents during execution.
## Orchestrator Factory
diff --git a/docs/architecture/graph_orchestration.md b/docs/architecture/graph_orchestration.md
index cf8c4dbd..cdbec2d1 100644
--- a/docs/architecture/graph_orchestration.md
+++ b/docs/architecture/graph_orchestration.md
@@ -4,6 +4,44 @@
DeepCritical implements a graph-based orchestration system for research workflows using Pydantic AI agents as nodes. This enables better parallel execution, conditional routing, and state management compared to simple agent chains.
+## Conversation History
+
+DeepCritical supports multi-turn conversations through Pydantic AI's native message history format. The system maintains two types of history:
+
+1. **User Conversation History**: Multi-turn user interactions (from Gradio chat interface) stored as `list[ModelMessage]`
+2. **Research Iteration History**: Internal research process state (existing `Conversation` model)
+
+### Message History Flow
+
+```
+Gradio Chat History โ convert_gradio_to_message_history() โ GraphOrchestrator.run(message_history)
+ ↓
+GraphExecutionContext (stores message_history)
+ ↓
+Agent Nodes (receive message_history via agent.run())
+ ↓
+WorkflowState (persists user_message_history)
+```
+
+### Usage
+
+Message history is automatically converted from Gradio format and passed through the orchestrator:
+
+```python
+# In app.py - automatic conversion
+message_history = convert_gradio_to_message_history(history) if history else None
+async for event in orchestrator.run(query, message_history=message_history):
+ yield event
+```
+
+Agents receive message history through their `run()` methods:
+
+```python
+# In agent execution
+if message_history:
+ result = await agent.run(input_data, message_history=message_history)
+```
+
## Graph Patterns
### Iterative Research Graph
diff --git a/src/agents/knowledge_gap.py b/src/agents/knowledge_gap.py
index 92114eaf..f7c12d46 100644
--- a/src/agents/knowledge_gap.py
+++ b/src/agents/knowledge_gap.py
@@ -9,6 +9,11 @@
import structlog
from pydantic_ai import Agent
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agent_factory.judges import get_model
from src.utils.exceptions import ConfigurationError
from src.utils.models import KnowledgeGapOutput
@@ -68,6 +73,7 @@ async def evaluate(
query: str,
background_context: str = "",
conversation_history: str = "",
+ message_history: list[ModelMessage] | None = None,
iteration: int = 0,
time_elapsed_minutes: float = 0.0,
max_time_minutes: int = 10,
@@ -78,7 +84,8 @@ async def evaluate(
Args:
query: The original research query
background_context: Optional background context
- conversation_history: History of actions, findings, and thoughts
+ conversation_history: History of actions, findings, and thoughts (backward compat)
+ message_history: Optional user conversation history (Pydantic AI format)
iteration: Current iteration number
time_elapsed_minutes: Time elapsed so far
max_time_minutes: Maximum time allowed
@@ -111,8 +118,11 @@ async def evaluate(
"""
try:
- # Run the agent
- result = await self.agent.run(user_message)
+ # Run the agent with message_history if provided
+ if message_history:
+ result = await self.agent.run(user_message, message_history=message_history)
+ else:
+ result = await self.agent.run(user_message)
evaluation = result.output
self.logger.info(
diff --git a/src/agents/thinking.py b/src/agents/thinking.py
index 225543b7..021b223f 100644
--- a/src/agents/thinking.py
+++ b/src/agents/thinking.py
@@ -9,6 +9,11 @@
import structlog
from pydantic_ai import Agent
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agent_factory.judges import get_model
from src.utils.exceptions import ConfigurationError
@@ -72,6 +77,7 @@ async def generate_observations(
query: str,
background_context: str = "",
conversation_history: str = "",
+ message_history: list[ModelMessage] | None = None,
iteration: int = 1,
) -> str:
"""
@@ -80,7 +86,8 @@ async def generate_observations(
Args:
query: The original research query
background_context: Optional background context
- conversation_history: History of actions, findings, and thoughts
+ conversation_history: History of actions, findings, and thoughts (backward compat)
+ message_history: Optional user conversation history (Pydantic AI format)
iteration: Current iteration number
Returns:
@@ -110,8 +117,11 @@ async def generate_observations(
"""
try:
- # Run the agent
- result = await self.agent.run(user_message)
+ # Run the agent with message_history if provided
+ if message_history:
+ result = await self.agent.run(user_message, message_history=message_history)
+ else:
+ result = await self.agent.run(user_message)
observations = result.output
self.logger.info("Observations generated", length=len(observations))
diff --git a/src/agents/tool_selector.py b/src/agents/tool_selector.py
index 3f06fe92..d95113b9 100644
--- a/src/agents/tool_selector.py
+++ b/src/agents/tool_selector.py
@@ -9,6 +9,11 @@
import structlog
from pydantic_ai import Agent
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agent_factory.judges import get_model
from src.utils.exceptions import ConfigurationError
from src.utils.models import AgentSelectionPlan
@@ -81,6 +86,7 @@ async def select_tools(
query: str,
background_context: str = "",
conversation_history: str = "",
+ message_history: list[ModelMessage] | None = None,
) -> AgentSelectionPlan:
"""
Select tools to address a knowledge gap.
@@ -89,7 +95,8 @@ async def select_tools(
gap: The knowledge gap to address
query: The original research query
background_context: Optional background context
- conversation_history: History of actions, findings, and thoughts
+ conversation_history: History of actions, findings, and thoughts (backward compat)
+ message_history: Optional user conversation history (Pydantic AI format)
Returns:
AgentSelectionPlan with tasks for selected agents
@@ -115,8 +122,11 @@ async def select_tools(
"""
try:
- # Run the agent
- result = await self.agent.run(user_message)
+ # Run the agent with message_history if provided
+ if message_history:
+ result = await self.agent.run(user_message, message_history=message_history)
+ else:
+ result = await self.agent.run(user_message)
selection_plan = result.output
self.logger.info(
diff --git a/src/app.py b/src/app.py
index ecb13412..542c21d7 100644
--- a/src/app.py
+++ b/src/app.py
@@ -37,8 +37,14 @@
from src.tools.search_handler import SearchHandler
from src.tools.neo4j_search import Neo4jSearchTool
from src.utils.config import settings
+from src.utils.message_history import convert_gradio_to_message_history
from src.utils.models import AgentEvent, OrchestratorConfig
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
logger = structlog.get_logger()
@@ -104,11 +110,29 @@ def configure_orchestrator(
# 2. API Key (OAuth or Env) - HuggingFace only (OAuth provides HF token)
# Priority: oauth_token > env vars
# On HuggingFace Spaces, OAuth token is available via request.oauth_token
+ #
+ # OAuth Scope Requirements:
+ # - 'inference-api': Required for HuggingFace Inference API access
+ # This scope grants access to:
+ # * HuggingFace's own Inference API
+ # * All third-party inference providers (nebius, together, scaleway, hyperbolic, novita, nscale, sambanova, ovh, fireworks, etc.)
+ # * All models available through the Inference Providers API
+ # See: https://huggingface.co/docs/hub/oauth#currently-supported-scopes
+ #
+ # Note: The hf_provider parameter is accepted but not used here because HuggingFaceProvider
+ # from pydantic-ai doesn't support provider selection. Provider selection happens at the
+ # InferenceClient level (used in HuggingFaceChatClient for advanced mode).
effective_api_key = oauth_token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_API_KEY")
+
+ # Log which authentication source is being used
+ if effective_api_key:
+ auth_source = "OAuth token" if oauth_token else ("HF_TOKEN env var" if os.getenv("HF_TOKEN") else "HUGGINGFACE_API_KEY env var")
+ logger.info("Using HuggingFace authentication", source=auth_source, has_token=bool(effective_api_key))
if effective_api_key:
# We have an API key (OAuth or env) - use pydantic-ai with JudgeHandler
- # This uses HuggingFace's own inference API, not third-party providers
+ # This uses HuggingFace Inference API, which includes access to all third-party providers
+ # via the Inference Providers API (router.huggingface.co)
model: Any | None = None
# Use selected model or fall back to env var/settings
model_name = (
@@ -126,6 +150,7 @@ def configure_orchestrator(
# Per https://ai.pydantic.dev/models/huggingface/#configure-the-provider
# HuggingFaceProvider accepts api_key parameter directly
# This is consistent with usage in src/utils/llm_factory.py and src/agent_factory/judges.py
+ # The OAuth token with 'inference-api' scope provides access to all inference providers
provider = HuggingFaceProvider(api_key=effective_api_key) # type: ignore[misc]
model = HuggingFaceModel(model_name, provider=provider) # type: ignore[misc]
backend_info = "API (HuggingFace OAuth)" if oauth_token else "API (Env Config)"
@@ -469,6 +494,7 @@ async def yield_auth_messages(
async def handle_orchestrator_events(
orchestrator: Any,
message: str,
+ conversation_history: list[ModelMessage] | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""
Handle orchestrator events and yield ChatMessages.
@@ -476,6 +502,7 @@ async def handle_orchestrator_events(
Args:
orchestrator: The orchestrator instance
message: The research question
+ conversation_history: Optional user conversation history
Yields:
ChatMessage objects from orchestrator events
@@ -483,7 +510,7 @@ async def handle_orchestrator_events(
# Track pending accordions for real-time updates
pending_accordions: dict[str, str] = {} # title -> accumulated content
- async for event in orchestrator.run(message):
+ async for event in orchestrator.run(message, message_history=conversation_history):
# Convert event to ChatMessage with metadata
chat_msg = event_to_chat_message(event)
@@ -591,11 +618,14 @@ async def research_agent(
# OAuthToken has a .token attribute containing the access token
if hasattr(oauth_token, "token"):
token_value = oauth_token.token
+ logger.debug("OAuth token extracted from oauth_token.token attribute")
elif isinstance(oauth_token, str):
# Handle case where oauth_token is already a string (shouldn't happen but defensive)
token_value = oauth_token
+ logger.debug("OAuth token extracted as string")
else:
token_value = None
+ logger.warning("OAuth token object present but token extraction failed", oauth_token_type=type(oauth_token).__name__)
if oauth_profile is not None:
# OAuthProfile has .username, .name, .profile_image attributes
@@ -608,6 +638,8 @@ async def research_agent(
else None
)
)
+ if username:
+ logger.info("OAuth user authenticated", username=username)
# Check if user is logged in (OAuth token or env var)
# Fallback to env vars for local development or Spaces with HF_TOKEN secret
@@ -687,10 +719,21 @@ async def research_agent(
model_id = hf_model if hf_model and hf_model.strip() else None
provider_name = hf_provider if hf_provider and hf_provider.strip() else None
+ # Log authentication source for debugging
+ auth_source = "OAuth" if token_value else ("Env (HF_TOKEN)" if os.getenv("HF_TOKEN") else ("Env (HUGGINGFACE_API_KEY)" if os.getenv("HUGGINGFACE_API_KEY") else "None"))
+ logger.info(
+ "Configuring orchestrator",
+ mode=effective_mode,
+ auth_source=auth_source,
+ has_oauth_token=bool(token_value),
+ model=model_id or "default",
+ provider=provider_name or "auto",
+ )
+
orchestrator, backend_name = configure_orchestrator(
use_mock=False, # Never use mock in production - HF Inference is the free fallback
mode=effective_mode,
- oauth_token=token_value, # Use extracted token value
+ oauth_token=token_value, # Use extracted token value - passed to all agents and services
hf_model=model_id, # None will use defaults in configure_orchestrator
hf_provider=provider_name, # None will use defaults in configure_orchestrator
graph_mode=graph_mode if graph_mode else None,
@@ -702,11 +745,21 @@ async def research_agent(
"content": f"๐ง **Backend**: {backend_name}\n\n",
}
+ # Convert Gradio history to message history
+ message_history = convert_gradio_to_message_history(history) if history else None
+ if message_history:
+ logger.info(
+ "Using conversation history",
+ turns=len(message_history) // 2, # Approximate turn count
+ )
+
# Handle orchestrator events and generate audio output
audio_output_data: tuple[int, np.ndarray] | None = None
final_message = ""
- async for msg in handle_orchestrator_events(orchestrator, processed_text):
+ async for msg in handle_orchestrator_events(
+ orchestrator, processed_text, conversation_history=message_history
+ ):
# Track final message for TTS
if isinstance(msg, dict) and msg.get("role") == "assistant":
content = msg.get("content", "")
diff --git a/src/legacy_orchestrator.py b/src/legacy_orchestrator.py
index ac1ee46c..b41ba8aa 100644
--- a/src/legacy_orchestrator.py
+++ b/src/legacy_orchestrator.py
@@ -6,6 +6,11 @@
import structlog
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.utils.config import settings
from src.utils.models import (
AgentEvent,
@@ -153,7 +158,9 @@ async def _run_analysis_phase(
iteration=iteration,
)
- async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: # noqa: PLR0915
+ async def run(
+ self, query: str, message_history: list[ModelMessage] | None = None
+ ) -> AsyncGenerator[AgentEvent, None]: # noqa: PLR0915
"""
Run the agent loop for a query.
@@ -161,11 +168,16 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: # noqa: PL
Args:
query: The user's research question
+ message_history: Optional user conversation history (for compatibility)
Yields:
AgentEvent objects for each step of the process
"""
- logger.info("Starting orchestrator", query=query)
+ logger.info(
+ "Starting orchestrator",
+ query=query,
+ has_history=bool(message_history),
+ )
yield AgentEvent(
type="started",
diff --git a/src/middleware/state_machine.py b/src/middleware/state_machine.py
index d43e131e..61473d76 100644
--- a/src/middleware/state_machine.py
+++ b/src/middleware/state_machine.py
@@ -11,6 +11,11 @@
import structlog
from pydantic import BaseModel, Field
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.utils.models import Citation, Conversation, Evidence
if TYPE_CHECKING:
@@ -28,6 +33,10 @@ class WorkflowState(BaseModel):
evidence: list[Evidence] = Field(default_factory=list)
conversation: Conversation = Field(default_factory=Conversation)
+ user_message_history: list[ModelMessage] = Field(
+ default_factory=list,
+ description="User conversation history (multi-turn interactions)",
+ )
# Type as Any to avoid circular imports/runtime resolution issues
# The actual object injected will be an EmbeddingService instance
embedding_service: Any = Field(default=None)
@@ -90,6 +99,31 @@ async def search_related(self, query: str, n_results: int = 5) -> list[Evidence]
return evidence_list
+ def add_user_message(self, message: ModelMessage) -> None:
+ """Add a user message to conversation history.
+
+ Args:
+ message: Message to add
+ """
+ self.user_message_history.append(message)
+
+ def get_user_history(self, max_messages: int | None = None) -> list[ModelMessage]:
+ """Get user conversation history.
+
+ Args:
+ max_messages: Maximum messages to return (None for all)
+
+ Returns:
+ List of messages
+ """
+ if max_messages is None:
+ return self.user_message_history.copy()
+ return (
+ self.user_message_history[-max_messages:]
+ if len(self.user_message_history) > max_messages
+ else self.user_message_history.copy()
+ )
+
# The ContextVar holds the WorkflowState for the current execution context
_workflow_state_var: ContextVar[WorkflowState | None] = ContextVar("workflow_state", default=None)
@@ -97,18 +131,26 @@ async def search_related(self, query: str, n_results: int = 5) -> list[Evidence]
def init_workflow_state(
embedding_service: "EmbeddingService | None" = None,
+ message_history: list[ModelMessage] | None = None,
) -> WorkflowState:
"""Initialize a new state for the current context.
Args:
embedding_service: Optional embedding service for semantic search.
+ message_history: Optional user conversation history.
Returns:
The initialized WorkflowState instance.
"""
state = WorkflowState(embedding_service=embedding_service)
+ if message_history:
+ state.user_message_history = message_history.copy()
_workflow_state_var.set(state)
- logger.debug("Workflow state initialized", has_embeddings=embedding_service is not None)
+ logger.debug(
+ "Workflow state initialized",
+ has_embeddings=embedding_service is not None,
+ has_history=bool(message_history),
+ )
return state
@@ -137,3 +179,4 @@ def get_workflow_state() -> WorkflowState:
+
diff --git a/src/orchestrator/graph_orchestrator.py b/src/orchestrator/graph_orchestrator.py
index 82650b9f..09136d4e 100644
--- a/src/orchestrator/graph_orchestrator.py
+++ b/src/orchestrator/graph_orchestrator.py
@@ -10,6 +10,11 @@
import structlog
+try:
+    from pydantic_ai.messages import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agent_factory.agents import (
create_input_parser_agent,
create_knowledge_gap_agent,
@@ -44,12 +49,18 @@
class GraphExecutionContext:
"""Context for managing graph execution state."""
- def __init__(self, state: WorkflowState, budget_tracker: BudgetTracker) -> None:
+ def __init__(
+ self,
+ state: WorkflowState,
+ budget_tracker: BudgetTracker,
+ message_history: list[ModelMessage] | None = None,
+ ) -> None:
"""Initialize execution context.
Args:
state: Current workflow state
budget_tracker: Budget tracker instance
+ message_history: Optional user conversation history
"""
self.current_node: str = ""
self.visited_nodes: set[str] = set()
@@ -57,6 +68,7 @@ def __init__(self, state: WorkflowState, budget_tracker: BudgetTracker) -> None:
self.state = state
self.budget_tracker = budget_tracker
self.iteration_count = 0
+ self.message_history: list[ModelMessage] = message_history or []
def set_node_result(self, node_id: str, result: Any) -> None:
"""Store result from node execution.
@@ -108,6 +120,31 @@ def update_state(
"""
self.state = updater(self.state, data)
+ def add_message(self, message: ModelMessage) -> None:
+ """Add a message to the history.
+
+ Args:
+ message: Message to add
+ """
+ self.message_history.append(message)
+
+ def get_message_history(self, max_messages: int | None = None) -> list[ModelMessage]:
+ """Get message history, optionally truncated.
+
+ Args:
+ max_messages: Maximum messages to return (None for all)
+
+ Returns:
+ List of messages
+ """
+ if max_messages is None:
+ return self.message_history.copy()
+ return (
+ self.message_history[-max_messages:]
+ if len(self.message_history) > max_messages
+ else self.message_history.copy()
+ )
+
class GraphOrchestrator:
"""
@@ -174,12 +211,15 @@ def _get_file_service(self) -> ReportFileService | None:
return None
return self._file_service
- async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
+ async def run(
+ self, query: str, message_history: list[ModelMessage] | None = None
+ ) -> AsyncGenerator[AgentEvent, None]:
"""
Run the research workflow.
Args:
query: The user's research query
+ message_history: Optional user conversation history
Yields:
AgentEvent objects for real-time UI updates
@@ -189,6 +229,7 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
query=query[:100],
mode=self.mode,
use_graph=self.use_graph,
+ has_history=bool(message_history),
)
yield AgentEvent(
@@ -205,10 +246,10 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
# Use graph execution if enabled, otherwise fall back to agent chains
if self.use_graph:
- async for event in self._run_with_graph(query, research_mode):
+ async for event in self._run_with_graph(query, research_mode, message_history):
yield event
else:
- async for event in self._run_with_chains(query, research_mode):
+ async for event in self._run_with_chains(query, research_mode, message_history):
yield event
except Exception as e:
@@ -220,13 +261,17 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
)
async def _run_with_graph(
- self, query: str, research_mode: Literal["iterative", "deep"]
+ self,
+ query: str,
+ research_mode: Literal["iterative", "deep"],
+ message_history: list[ModelMessage] | None = None,
) -> AsyncGenerator[AgentEvent, None]:
"""Run workflow using graph execution.
Args:
query: The research query
research_mode: The research mode
+ message_history: Optional user conversation history
Yields:
AgentEvent objects
@@ -235,7 +280,10 @@ async def _run_with_graph(
from src.services.embeddings import get_embedding_service
embedding_service = get_embedding_service()
- state = init_workflow_state(embedding_service=embedding_service)
+ state = init_workflow_state(
+ embedding_service=embedding_service,
+ message_history=message_history,
+ )
budget_tracker = BudgetTracker()
budget_tracker.create_budget(
loop_id="graph_execution",
@@ -245,7 +293,11 @@ async def _run_with_graph(
)
budget_tracker.start_timer("graph_execution")
- context = GraphExecutionContext(state, budget_tracker)
+ context = GraphExecutionContext(
+ state,
+ budget_tracker,
+ message_history=message_history or [],
+ )
# Build graph
self._graph = await self._build_graph(research_mode)
@@ -255,13 +307,17 @@ async def _run_with_graph(
yield event
async def _run_with_chains(
- self, query: str, research_mode: Literal["iterative", "deep"]
+ self,
+ query: str,
+ research_mode: Literal["iterative", "deep"],
+ message_history: list[ModelMessage] | None = None,
) -> AsyncGenerator[AgentEvent, None]:
"""Run workflow using agent chains (backward compatibility).
Args:
query: The research query
research_mode: The research mode
+ message_history: Optional user conversation history
Yields:
AgentEvent objects
@@ -282,7 +338,7 @@ async def _run_with_chains(
)
try:
- final_report = await self._iterative_flow.run(query)
+ final_report = await self._iterative_flow.run(query, message_history=message_history)
except Exception as e:
self.logger.error("Iterative flow failed", error=str(e), exc_info=True)
# Yield error event - outer handler will also catch and yield error event
@@ -318,7 +374,7 @@ async def _run_with_chains(
)
try:
- final_report = await self._deep_flow.run(query)
+ final_report = await self._deep_flow.run(query, message_history=message_history)
except Exception as e:
self.logger.error("Deep flow failed", error=str(e), exc_info=True)
# Yield error event before re-raising so test can capture it
@@ -862,9 +918,26 @@ async def _execute_agent_node(
if node.input_transformer:
input_data = node.input_transformer(input_data)
+ # Get message history from context (limit to most recent 10 messages for token efficiency)
+ message_history = context.get_message_history(max_messages=10)
+
# Execute agent with error handling
try:
- result = await node.agent.run(input_data)
+ # Pass message_history if available (Pydantic AI agents support this)
+ if message_history:
+ result = await node.agent.run(input_data, message_history=message_history)
+ else:
+ result = await node.agent.run(input_data)
+
+ # Accumulate new messages from agent result if available
+ if hasattr(result, "new_messages"):
+ try:
+ new_messages = result.new_messages()
+ for msg in new_messages:
+ context.add_message(msg)
+ except Exception as e:
+ # Don't fail if message accumulation fails
+ self.logger.debug("Failed to accumulate messages from agent result", error=str(e))
except Exception as e:
# Handle validation errors and API errors for planner node
if node.node_id == "planner":
diff --git a/src/orchestrator/research_flow.py b/src/orchestrator/research_flow.py
index 52756654..4a72bd55 100644
--- a/src/orchestrator/research_flow.py
+++ b/src/orchestrator/research_flow.py
@@ -10,6 +10,11 @@
import structlog
+try:
+ from pydantic_ai import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agent_factory.agents import (
create_graph_orchestrator,
create_knowledge_gap_agent,
@@ -137,6 +142,7 @@ async def run(
background_context: str = "",
output_length: str = "",
output_instructions: str = "",
+ message_history: list[ModelMessage] | None = None,
) -> str:
"""
Run the iterative research flow.
@@ -146,17 +152,18 @@ async def run(
background_context: Optional background context
output_length: Optional description of desired output length
output_instructions: Optional additional instructions
+ message_history: Optional user conversation history
Returns:
Final report string
"""
if self.use_graph:
return await self._run_with_graph(
- query, background_context, output_length, output_instructions
+ query, background_context, output_length, output_instructions, message_history
)
else:
return await self._run_with_chains(
- query, background_context, output_length, output_instructions
+ query, background_context, output_length, output_instructions, message_history
)
async def _run_with_chains(
@@ -165,6 +172,7 @@ async def _run_with_chains(
background_context: str = "",
output_length: str = "",
output_instructions: str = "",
+ message_history: list[ModelMessage] | None = None,
) -> str:
"""
Run the iterative research flow using agent chains.
@@ -174,6 +182,7 @@ async def _run_with_chains(
background_context: Optional background context
output_length: Optional description of desired output length
output_instructions: Optional additional instructions
+ message_history: Optional user conversation history
Returns:
Final report string
@@ -193,10 +202,10 @@ async def _run_with_chains(
self.conversation.add_iteration()
# 1. Generate observations
- await self._generate_observations(query, background_context)
+ await self._generate_observations(query, background_context, message_history)
# 2. Evaluate gaps
- evaluation = await self._evaluate_gaps(query, background_context)
+ evaluation = await self._evaluate_gaps(query, background_context, message_history)
# 3. Assess with judge (after tools execute, we'll assess again)
# For now, check knowledge gap evaluation
@@ -210,7 +219,7 @@ async def _run_with_chains(
# 4. Select tools for next gap
next_gap = evaluation.outstanding_gaps[0] if evaluation.outstanding_gaps else query
- selection_plan = await self._select_agents(next_gap, query, background_context)
+ selection_plan = await self._select_agents(next_gap, query, background_context, message_history)
# 5. Execute tools
await self._execute_tools(selection_plan.tasks)
@@ -250,6 +259,7 @@ async def _run_with_graph(
background_context: str = "",
output_length: str = "",
output_instructions: str = "",
+ message_history: list[ModelMessage] | None = None,
) -> str:
"""
Run the iterative research flow using graph execution.
@@ -313,7 +323,9 @@ def _check_constraints(self) -> bool:
return True
- async def _generate_observations(self, query: str, background_context: str = "") -> str:
+ async def _generate_observations(
+ self, query: str, background_context: str = "", message_history: list[ModelMessage] | None = None
+ ) -> str:
"""Generate observations from current research state."""
# Build input prompt for token estimation
conversation_history = self.conversation.compile_conversation_history()
@@ -335,6 +347,7 @@ async def _generate_observations(self, query: str, background_context: str = "")
query=query,
background_context=background_context,
conversation_history=conversation_history,
+ message_history=message_history,
iteration=self.iteration,
)
@@ -350,7 +363,9 @@ async def _generate_observations(self, query: str, background_context: str = "")
self.conversation.set_latest_thought(observations)
return observations
- async def _evaluate_gaps(self, query: str, background_context: str = "") -> KnowledgeGapOutput:
+ async def _evaluate_gaps(
+ self, query: str, background_context: str = "", message_history: list[ModelMessage] | None = None
+ ) -> KnowledgeGapOutput:
"""Evaluate knowledge gaps in current research."""
if self.start_time:
elapsed_minutes = (time.time() - self.start_time) / 60
@@ -377,6 +392,7 @@ async def _evaluate_gaps(self, query: str, background_context: str = "") -> Know
query=query,
background_context=background_context,
conversation_history=conversation_history,
+ message_history=message_history,
iteration=self.iteration,
time_elapsed_minutes=elapsed_minutes,
max_time_minutes=self.max_time_minutes,
@@ -437,7 +453,11 @@ async def _assess_with_judge(self, query: str) -> JudgeAssessment:
return assessment
async def _select_agents(
- self, gap: str, query: str, background_context: str = ""
+ self,
+ gap: str,
+ query: str,
+ background_context: str = "",
+ message_history: list[ModelMessage] | None = None,
) -> AgentSelectionPlan:
"""Select tools to address knowledge gap."""
# Build input prompt for token estimation
@@ -461,6 +481,7 @@ async def _select_agents(
query=query,
background_context=background_context,
conversation_history=conversation_history,
+ message_history=message_history,
)
# Track tokens for this iteration
@@ -775,27 +796,29 @@ def _get_file_service(self) -> ReportFileService | None:
return None
return self._file_service
- async def run(self, query: str) -> str:
+ async def run(self, query: str, message_history: list[ModelMessage] | None = None) -> str:
"""
Run the deep research flow.
Args:
query: The research query
+ message_history: Optional user conversation history
Returns:
Final report string
"""
if self.use_graph:
- return await self._run_with_graph(query)
+ return await self._run_with_graph(query, message_history)
else:
- return await self._run_with_chains(query)
+ return await self._run_with_chains(query, message_history)
- async def _run_with_chains(self, query: str) -> str:
+ async def _run_with_chains(self, query: str, message_history: list[ModelMessage] | None = None) -> str:
"""
Run the deep research flow using agent chains.
Args:
query: The research query
+ message_history: Optional user conversation history
Returns:
Final report string
@@ -812,11 +835,11 @@ async def _run_with_chains(self, query: str) -> str:
embedding_service = None
self.logger.debug("Embedding service unavailable, initializing state without it")
- init_workflow_state(embedding_service=embedding_service)
+ init_workflow_state(embedding_service=embedding_service, message_history=message_history)
self.logger.debug("Workflow state initialized for deep research")
# 1. Build report plan
- report_plan = await self._build_report_plan(query)
+ report_plan = await self._build_report_plan(query, message_history)
self.logger.info(
"Report plan created",
sections=len(report_plan.report_outline),
@@ -824,7 +847,7 @@ async def _run_with_chains(self, query: str) -> str:
)
# 2. Run parallel research loops with state synchronization
- section_drafts = await self._run_research_loops(report_plan)
+ section_drafts = await self._run_research_loops(report_plan, message_history)
# Verify state synchronization - log evidence count
state = get_workflow_state()
@@ -845,12 +868,13 @@ async def _run_with_chains(self, query: str) -> str:
return final_report
- async def _run_with_graph(self, query: str) -> str:
+ async def _run_with_graph(self, query: str, message_history: list[ModelMessage] | None = None) -> str:
"""
Run the deep research flow using graph execution.
Args:
query: The research query
+ message_history: Optional user conversation history
Returns:
Final report string
@@ -868,7 +892,7 @@ async def _run_with_graph(self, query: str) -> str:
# Run orchestrator and collect events
final_report = ""
- async for event in self._graph_orchestrator.run(query):
+ async for event in self._graph_orchestrator.run(query, message_history=message_history):
if event.type == "complete":
final_report = event.message
break
@@ -884,13 +908,17 @@ async def _run_with_graph(self, query: str) -> str:
return final_report
- async def _build_report_plan(self, query: str) -> ReportPlan:
+ async def _build_report_plan(
+ self, query: str, message_history: list[ModelMessage] | None = None
+ ) -> ReportPlan:
"""Build the initial report plan."""
self.logger.info("Building report plan")
# Build input prompt for token estimation
input_prompt = f"QUERY: {query}"
+ # The planner agent does not accept message_history yet, so we call the
+ # standard run(); thread history through here once planner support lands.
report_plan = await self.planner_agent.run(query)
# Track tokens for planner agent
@@ -913,7 +941,9 @@ async def _build_report_plan(self, query: str) -> ReportPlan:
return report_plan
- async def _run_research_loops(self, report_plan: ReportPlan) -> list[str]:
+ async def _run_research_loops(
+ self, report_plan: ReportPlan, message_history: list[ModelMessage] | None = None
+ ) -> list[str]:
"""Run parallel iterative research loops for each section."""
self.logger.info("Running research loops", sections=len(report_plan.report_outline))
@@ -950,10 +980,11 @@ async def run_research_for_section(config: dict[str, Any]) -> str:
judge_handler=self.judge_handler if not self.use_graph else None,
)
- # Run research
+ # Run research with message_history
result = await flow.run(
query=query,
background_context=background_context,
+ message_history=message_history,
)
# Sync evidence from flow to loop
diff --git a/src/orchestrator_hierarchical.py b/src/orchestrator_hierarchical.py
index bf3848ad..a7bfb85a 100644
--- a/src/orchestrator_hierarchical.py
+++ b/src/orchestrator_hierarchical.py
@@ -2,9 +2,15 @@
import asyncio
from collections.abc import AsyncGenerator
+from typing import Any
import structlog
+try:
+ from pydantic_ai import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
+
from src.agents.judge_agent_llm import LLMSubIterationJudge
from src.agents.magentic_agents import create_search_agent
from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam
@@ -38,8 +44,14 @@ def __init__(self) -> None:
self.judge = LLMSubIterationJudge()
self.middleware = SubIterationMiddleware(self.team, self.judge, max_iterations=5)
- async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
- logger.info("Starting hierarchical orchestrator", query=query)
+ async def run(
+ self, query: str, message_history: list[ModelMessage] | None = None
+ ) -> AsyncGenerator[AgentEvent, None]:
+ logger.info(
+ "Starting hierarchical orchestrator",
+ query=query,
+ has_history=bool(message_history),
+ )
try:
service = get_embedding_service()
@@ -58,6 +70,8 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
async def event_callback(event: AgentEvent) -> None:
await queue.put(event)
+ # Note: middleware.run() does not accept message_history, so only the query
+ # is forwarded; extend the middleware signature to thread history through.
task_future = asyncio.create_task(self.middleware.run(query, event_callback))
while not task_future.done():
diff --git a/src/orchestrator_magentic.py b/src/orchestrator_magentic.py
index fd9d4f72..416d8968 100644
--- a/src/orchestrator_magentic.py
+++ b/src/orchestrator_magentic.py
@@ -4,6 +4,11 @@
from typing import TYPE_CHECKING, Any
import structlog
+
+try:
+ from pydantic_ai import ModelMessage
+except ImportError:
+ ModelMessage = Any # type: ignore[assignment, misc]
from agent_framework import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
@@ -98,17 +103,24 @@ def _build_workflow(self) -> Any:
.build()
)
- async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
+ async def run(
+ self, query: str, message_history: list[ModelMessage] | None = None
+ ) -> AsyncGenerator[AgentEvent, None]:
"""
Run the Magentic workflow.
Args:
query: User's research question
+ message_history: Optional user conversation history (for compatibility)
Yields:
AgentEvent objects for real-time UI updates
"""
- logger.info("Starting Magentic orchestrator", query=query)
+ logger.info(
+ "Starting Magentic orchestrator",
+ query=query,
+ has_history=bool(message_history),
+ )
yield AgentEvent(
type="started",
@@ -122,7 +134,17 @@ async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
workflow = self._build_workflow()
- task = f"""Research query: {query}
+ # Include conversation history context if provided
+ history_context = ""
+ if message_history:
+ # Convert message history to string context for task
+ from src.utils.message_history import message_history_to_string
+
+ history_str = message_history_to_string(message_history, max_messages=5)
+ if history_str:
+ history_context = f"\n\nPrevious conversation context:\n{history_str}"
+
+ task = f"""Research query: {query}{history_context}
Workflow:
1. SearchAgent: Find evidence from available sources (automatically selects: web search, PubMed, ClinicalTrials.gov, Europe PMC, or RAG based on query)
diff --git a/src/tools/searchxng_web_search.py b/src/tools/searchxng_web_search.py
index 80cf8be5..b1368581 100644
--- a/src/tools/searchxng_web_search.py
+++ b/src/tools/searchxng_web_search.py
@@ -123,3 +123,8 @@ async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
+
+
+
+
+
diff --git a/src/tools/serper_web_search.py b/src/tools/serper_web_search.py
index 79e9449e..85e47670 100644
--- a/src/tools/serper_web_search.py
+++ b/src/tools/serper_web_search.py
@@ -123,3 +123,8 @@ async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
+
+
+
+
+
diff --git a/src/tools/vendored/crawl_website.py b/src/tools/vendored/crawl_website.py
index 32fc2d94..f0051382 100644
--- a/src/tools/vendored/crawl_website.py
+++ b/src/tools/vendored/crawl_website.py
@@ -135,3 +135,8 @@ async def fetch_page(url: str) -> str:
+
+
+
+
+
diff --git a/src/tools/vendored/searchxng_client.py b/src/tools/vendored/searchxng_client.py
index 2cf4ce83..95f6be55 100644
--- a/src/tools/vendored/searchxng_client.py
+++ b/src/tools/vendored/searchxng_client.py
@@ -104,3 +104,8 @@ async def search(
+
+
+
+
+
diff --git a/src/tools/vendored/serper_client.py b/src/tools/vendored/serper_client.py
index 6b54c10b..9430bf8e 100644
--- a/src/tools/vendored/serper_client.py
+++ b/src/tools/vendored/serper_client.py
@@ -100,3 +100,8 @@ async def search(
+
+
+
+
+
diff --git a/src/tools/vendored/web_search_core.py b/src/tools/vendored/web_search_core.py
index 391b5c2e..354f2e05 100644
--- a/src/tools/vendored/web_search_core.py
+++ b/src/tools/vendored/web_search_core.py
@@ -209,3 +209,8 @@ def is_valid_url(url: str) -> bool:
+
+
+
+
+
diff --git a/src/tools/web_search_factory.py b/src/tools/web_search_factory.py
index fae4c5ff..c82631d8 100644
--- a/src/tools/web_search_factory.py
+++ b/src/tools/web_search_factory.py
@@ -76,3 +76,8 @@ def create_web_search_tool() -> SearchTool | None:
+
+
+
+
+
diff --git a/src/utils/markdown.css b/src/utils/markdown.css
index b083c296..fad9564f 100644
--- a/src/utils/markdown.css
+++ b/src/utils/markdown.css
@@ -14,3 +14,8 @@ body {
+
+
+
+
+
diff --git a/src/utils/md_to_pdf.py b/src/utils/md_to_pdf.py
index 940d707d..0f062bc0 100644
--- a/src/utils/md_to_pdf.py
+++ b/src/utils/md_to_pdf.py
@@ -74,3 +74,8 @@ def md_to_pdf(md_text: str, pdf_file_path: str) -> None:
+
+
+
+
+
diff --git a/src/utils/message_history.py b/src/utils/message_history.py
new file mode 100644
index 00000000..600c0b8e
--- /dev/null
+++ b/src/utils/message_history.py
@@ -0,0 +1,174 @@
+"""Message history utilities for Pydantic AI integration."""
+
+from typing import Any
+
+import structlog
+
+try:
+ from pydantic_ai import ModelMessage, ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ _PYDANTIC_AI_AVAILABLE = True
+except ImportError:
+ # Fallback for older pydantic-ai versions
+ ModelMessage = Any # type: ignore[assignment, misc]
+ ModelRequest = Any # type: ignore[assignment, misc]
+ ModelResponse = Any # type: ignore[assignment, misc]
+ TextPart = Any # type: ignore[assignment, misc]
+ UserPromptPart = Any # type: ignore[assignment, misc]
+ _PYDANTIC_AI_AVAILABLE = False
+
+logger = structlog.get_logger()
+
+
+def convert_gradio_to_message_history(
+ history: list[dict[str, Any]],
+ max_messages: int = 20,
+) -> list[ModelMessage]:
+ """
+ Convert Gradio chat history to Pydantic AI message history.
+
+ Args:
+ history: Gradio chat history format [{"role": "user", "content": "..."}, ...]
+ max_messages: Maximum messages to include (most recent)
+
+ Returns:
+ List of ModelMessage objects for Pydantic AI
+ """
+ if not history:
+ return []
+
+ if not _PYDANTIC_AI_AVAILABLE:
+ logger.warning(
+ "Pydantic AI message history not available, returning empty list",
+ )
+ return []
+
+ messages: list[ModelMessage] = []
+
+ # Take most recent messages
+ recent = history[-max_messages:] if len(history) > max_messages else history
+
+ for msg in recent:
+ role = msg.get("role", "")
+ content = msg.get("content", "")
+
+ if not content or role not in ("user", "assistant"):
+ continue
+
+ # Convert content to string if needed
+ content_str = str(content)
+
+ if role == "user":
+ messages.append(
+ ModelRequest(parts=[UserPromptPart(content=content_str)]),
+ )
+ elif role == "assistant":
+ messages.append(
+ ModelResponse(parts=[TextPart(content=content_str)]),
+ )
+
+ logger.debug(
+ "Converted Gradio history to message history",
+ input_turns=len(history),
+ output_messages=len(messages),
+ )
+
+ return messages
+
+
+def message_history_to_string(
+ messages: list[ModelMessage],
+ max_messages: int = 5,
+ include_metadata: bool = False,
+) -> str:
+ """
+ Convert message history to string format for backward compatibility.
+
+ Used during transition period when some agents still expect strings.
+
+ Args:
+ messages: List of ModelMessage objects
+ max_messages: Maximum messages to include
+ include_metadata: Whether to include metadata
+
+ Returns:
+ Formatted string representation
+ """
+ if not messages:
+ return ""
+
+ recent = messages[-max_messages:] if len(messages) > max_messages else messages
+
+ parts = ["PREVIOUS CONVERSATION:", "---"]
+ turn_num = 1
+
+ for msg in recent:
+ # Extract text content
+ text = ""
+ if isinstance(msg, ModelRequest):
+ for part in msg.parts:
+ if hasattr(part, "content"):
+ text += str(part.content)
+ parts.append(f"[Turn {turn_num}]")
+ parts.append(f"User: {text}")
+ turn_num += 1
+ elif isinstance(msg, ModelResponse):
+ for part in msg.parts:
+ if hasattr(part, "content"):
+ text += str(part.content)
+ parts.append(f"Assistant: {text}")
+
+ parts.append("---")
+ return "\n".join(parts)
+
+
+def create_truncation_processor(max_messages: int = 10):
+ """Create a history processor that keeps only the most recent N messages.
+
+ Args:
+ max_messages: Maximum number of messages to keep
+
+ Returns:
+ Processor function that takes a list of messages and returns truncated list
+ """
+
+ def processor(messages: list[ModelMessage]) -> list[ModelMessage]:
+ return messages[-max_messages:] if len(messages) > max_messages else messages
+
+ return processor
+
+
+def create_relevance_processor(min_length: int = 10):
+ """Create a history processor that filters out very short messages.
+
+ Args:
+ min_length: Minimum message length to keep
+
+ Returns:
+ Processor function that filters messages by length
+ """
+
+ def processor(messages: list[ModelMessage]) -> list[ModelMessage]:
+ filtered = []
+ for msg in messages:
+ text = ""
+ if isinstance(msg, ModelRequest):
+ for part in msg.parts:
+ if hasattr(part, "content"):
+ text += str(part.content)
+ elif isinstance(msg, ModelResponse):
+ for part in msg.parts:
+ if hasattr(part, "content"):
+ text += str(part.content)
+
+ if len(text.strip()) >= min_length:
+ filtered.append(msg)
+ return filtered
+
+ return processor
+
+
+
+
+
diff --git a/src/utils/report_generator.py b/src/utils/report_generator.py
index ad283050..3bd6c2db 100644
--- a/src/utils/report_generator.py
+++ b/src/utils/report_generator.py
@@ -177,3 +177,8 @@ def generate_report_from_evidence(
+
+
+
+
+
diff --git a/tests/unit/middleware/test_budget_tracker_phase7.py b/tests/unit/middleware/test_budget_tracker_phase7.py
index 903addc1..8d881e80 100644
--- a/tests/unit/middleware/test_budget_tracker_phase7.py
+++ b/tests/unit/middleware/test_budget_tracker_phase7.py
@@ -160,3 +160,8 @@ def test_iteration_tokens_separate_per_loop(self) -> None:
+
+
+
+
+
diff --git a/tests/unit/middleware/test_state_machine.py b/tests/unit/middleware/test_state_machine.py
index 90fc3a4d..7d33f477 100644
--- a/tests/unit/middleware/test_state_machine.py
+++ b/tests/unit/middleware/test_state_machine.py
@@ -12,6 +12,14 @@
)
from src.utils.models import Citation, Conversation, Evidence, IterationData
+try:
+ from pydantic_ai import ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ _PYDANTIC_AI_AVAILABLE = True
+except ImportError:
+ _PYDANTIC_AI_AVAILABLE = False
+
@pytest.mark.unit
class TestWorkflowState:
@@ -143,6 +151,49 @@ async def test_search_related_handles_empty_authors(self) -> None:
assert len(results) == 1
assert results[0].citation.authors == []
+ @pytest.mark.skipif(not _PYDANTIC_AI_AVAILABLE, reason="pydantic_ai not available")
+ def test_user_message_history_initialization(self) -> None:
+ """WorkflowState should initialize with empty user_message_history."""
+ state = WorkflowState()
+ assert state.user_message_history == []
+
+ @pytest.mark.skipif(not _PYDANTIC_AI_AVAILABLE, reason="pydantic_ai not available")
+ def test_add_user_message(self) -> None:
+ """add_user_message should add messages to history."""
+ state = WorkflowState()
+ message = ModelRequest(parts=[UserPromptPart(content="Test message")])
+ state.add_user_message(message)
+ assert len(state.user_message_history) == 1
+ assert state.user_message_history[0] == message
+
+ @pytest.mark.skipif(not _PYDANTIC_AI_AVAILABLE, reason="pydantic_ai not available")
+ def test_get_user_history(self) -> None:
+ """get_user_history should return message history."""
+ state = WorkflowState()
+ for i in range(5):
+ message = ModelRequest(parts=[UserPromptPart(content=f"Message {i}")])
+ state.add_user_message(message)
+
+ # Get all history
+ all_history = state.get_user_history()
+ assert len(all_history) == 5
+
+ # Get limited history
+ limited = state.get_user_history(max_messages=3)
+ assert len(limited) == 3
+ # Should be most recent messages
+ assert limited[0].parts[0].content == "Message 2"
+
+ @pytest.mark.skipif(not _PYDANTIC_AI_AVAILABLE, reason="pydantic_ai not available")
+ def test_init_workflow_state_with_message_history(self) -> None:
+ """init_workflow_state should accept message_history parameter."""
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content="Question")]),
+ ModelResponse(parts=[TextPart(content="Answer")]),
+ ]
+ state = init_workflow_state(message_history=messages)
+ assert len(state.user_message_history) == 2
+
@pytest.mark.unit
class TestConversation:
@@ -358,3 +409,6 @@ def context2():
+
+
+
diff --git a/tests/unit/middleware/test_workflow_manager.py b/tests/unit/middleware/test_workflow_manager.py
index 3703390c..025065b3 100644
--- a/tests/unit/middleware/test_workflow_manager.py
+++ b/tests/unit/middleware/test_workflow_manager.py
@@ -286,3 +286,8 @@ async def test_get_shared_evidence(self, monkeypatch) -> None:
assert shared[0].content == "Shared"
+
+
+
+
+
diff --git a/tests/unit/orchestrator/test_graph_orchestrator.py b/tests/unit/orchestrator/test_graph_orchestrator.py
index 4136663f..2231d816 100644
--- a/tests/unit/orchestrator/test_graph_orchestrator.py
+++ b/tests/unit/orchestrator/test_graph_orchestrator.py
@@ -48,6 +48,82 @@ def test_visited_nodes_tracking(self):
context = GraphExecutionContext(WorkflowState(), BudgetTracker())
assert not context.has_visited("node1")
context.mark_visited("node1")
+
+ def test_message_history_initialization(self):
+ """Test message history initialization in context."""
+ from src.middleware.budget_tracker import BudgetTracker
+ from src.middleware.state_machine import WorkflowState
+
+ context = GraphExecutionContext(WorkflowState(), BudgetTracker())
+ assert context.message_history == []
+
+ def test_message_history_with_initial_history(self):
+ """Test context with initial message history."""
+ from src.middleware.budget_tracker import BudgetTracker
+ from src.middleware.state_machine import WorkflowState
+
+ try:
+ from pydantic_ai import ModelRequest
+ from pydantic_ai.messages import UserPromptPart
+
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content="Test message")])
+ ]
+ context = GraphExecutionContext(
+ WorkflowState(), BudgetTracker(), message_history=messages
+ )
+ assert len(context.message_history) == 1
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
+
+ def test_add_message(self):
+ """Test adding messages to context."""
+ from src.middleware.budget_tracker import BudgetTracker
+ from src.middleware.state_machine import WorkflowState
+
+ try:
+ from pydantic_ai import ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ context = GraphExecutionContext(WorkflowState(), BudgetTracker())
+ message1 = ModelRequest(parts=[UserPromptPart(content="Question")])
+ message2 = ModelResponse(parts=[TextPart(content="Answer")])
+
+ context.add_message(message1)
+ context.add_message(message2)
+
+ assert len(context.message_history) == 2
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
+
+ def test_get_message_history(self):
+ """Test getting message history with limits."""
+ from src.middleware.budget_tracker import BudgetTracker
+ from src.middleware.state_machine import WorkflowState
+
+ try:
+ from pydantic_ai import ModelRequest
+ from pydantic_ai.messages import UserPromptPart
+
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content=f"Message {i}")])
+ for i in range(10)
+ ]
+ context = GraphExecutionContext(
+ WorkflowState(), BudgetTracker(), message_history=messages
+ )
+
+ # Get all
+ all_messages = context.get_message_history()
+ assert len(all_messages) == 10
+
+ # Get limited
+ limited = context.get_message_history(max_messages=5)
+ assert len(limited) == 5
+ # Should be most recent
+ assert limited[0].parts[0].content == "Message 5"
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
assert context.has_visited("node1")
diff --git a/tests/unit/utils/test_message_history.py b/tests/unit/utils/test_message_history.py
new file mode 100644
index 00000000..19344e58
--- /dev/null
+++ b/tests/unit/utils/test_message_history.py
@@ -0,0 +1,151 @@
+"""Unit tests for message history utilities."""
+
+import pytest
+
+pytestmark = pytest.mark.unit
+
+from src.utils.message_history import (
+ convert_gradio_to_message_history,
+ create_relevance_processor,
+ create_truncation_processor,
+ message_history_to_string,
+)
+
+
+def test_convert_gradio_to_message_history_empty():
+ """Test conversion with empty history."""
+ result = convert_gradio_to_message_history([])
+ assert result == []
+
+
+def test_convert_gradio_to_message_history_single_turn():
+ """Test conversion with a single turn."""
+ gradio_history = [
+ {"role": "user", "content": "What is AI?"},
+ {"role": "assistant", "content": "AI is artificial intelligence."},
+ ]
+ result = convert_gradio_to_message_history(gradio_history)
+ assert len(result) == 2
+
+
+def test_convert_gradio_to_message_history_multiple_turns():
+ """Test conversion with multiple turns."""
+ gradio_history = [
+ {"role": "user", "content": "What is AI?"},
+ {"role": "assistant", "content": "AI is artificial intelligence."},
+ {"role": "user", "content": "Tell me more"},
+ {"role": "assistant", "content": "AI includes machine learning..."},
+ ]
+ result = convert_gradio_to_message_history(gradio_history)
+ assert len(result) == 4
+
+
+def test_convert_gradio_to_message_history_max_messages():
+ """Test conversion with max_messages limit."""
+ gradio_history = []
+ for i in range(15): # Create 15 turns
+ gradio_history.append({"role": "user", "content": f"Message {i}"})
+ gradio_history.append({"role": "assistant", "content": f"Response {i}"})
+
+ result = convert_gradio_to_message_history(gradio_history, max_messages=10)
+    # At most 10 of the most recent messages should remain after truncation
+ assert len(result) <= 10
+
+
+def test_convert_gradio_to_message_history_filters_invalid():
+ """Test that invalid entries are filtered out."""
+ gradio_history = [
+ {"role": "user", "content": "Valid message"},
+ {"role": "system", "content": "Should be filtered"},
+ {"role": "assistant", "content": ""}, # Empty content should be filtered
+ {"role": "assistant", "content": "Valid response"},
+ ]
+ result = convert_gradio_to_message_history(gradio_history)
+ # Should only have 2 valid messages (user + assistant)
+ assert len(result) == 2
+
+
+def test_message_history_to_string_empty():
+ """Test string conversion with empty history."""
+ result = message_history_to_string([])
+ assert result == ""
+
+
+def test_message_history_to_string_format():
+ """Test string conversion format."""
+ # Create mock message history
+ try:
+ from pydantic_ai import ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content="Question 1")]),
+ ModelResponse(parts=[TextPart(content="Answer 1")]),
+ ]
+ result = message_history_to_string(messages)
+ assert "PREVIOUS CONVERSATION" in result
+ assert "User:" in result
+ assert "Assistant:" in result
+ except ImportError:
+ # Skip if pydantic_ai not available
+ pytest.skip("pydantic_ai not available")
+
+
+def test_message_history_to_string_max_messages():
+ """Test string conversion with max_messages limit."""
+ try:
+ from pydantic_ai import ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ messages = []
+ for i in range(10): # Create 10 turns
+ messages.append(ModelRequest(parts=[UserPromptPart(content=f"Question {i}")]))
+ messages.append(ModelResponse(parts=[TextPart(content=f"Answer {i}")]))
+
+ result = message_history_to_string(messages, max_messages=3)
+        # With a 3-message limit (1.5 turns), the formatted output should still be non-empty
+ assert result != ""
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
+
+
+def test_create_truncation_processor():
+ """Test truncation processor factory."""
+ processor = create_truncation_processor(max_messages=5)
+ assert callable(processor)
+
+ try:
+ from pydantic_ai import ModelRequest
+ from pydantic_ai.messages import UserPromptPart
+
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content=f"Message {i}")])
+ for i in range(10)
+ ]
+ result = processor(messages)
+ assert len(result) == 5
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
+
+
+def test_create_relevance_processor():
+ """Test relevance processor factory."""
+ processor = create_relevance_processor(min_length=10)
+ assert callable(processor)
+
+ try:
+ from pydantic_ai import ModelRequest, ModelResponse
+ from pydantic_ai.messages import TextPart, UserPromptPart
+
+ messages = [
+ ModelRequest(parts=[UserPromptPart(content="Short")]), # Too short
+ ModelRequest(parts=[UserPromptPart(content="This is a longer message")]), # Valid
+ ModelResponse(parts=[TextPart(content="OK")]), # Too short
+ ModelResponse(parts=[TextPart(content="This is a valid response")]), # Valid
+ ]
+ result = processor(messages)
+ # Should only keep messages with length >= 10
+ assert len(result) == 2
+ except ImportError:
+ pytest.skip("pydantic_ai not available")
+