From d53e4365295e1b5ee84dac48642d794e0e61d64c Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 03:44:06 -0500 Subject: [PATCH 01/12] Create config.yaml --- config.yaml | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 config.yaml diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..e69de29 From b4f0a6f5d55cd46e5eeeac96ba9c61cb20d34091 Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:35:09 -0500 Subject: [PATCH 02/12] Update code.py --- src/source_agent/agents/code.py | 36 ++++++++++++--------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/src/source_agent/agents/code.py b/src/source_agent/agents/code.py index 60fe151..9f71d18 100644 --- a/src/source_agent/agents/code.py +++ b/src/source_agent/agents/code.py @@ -13,7 +13,6 @@ def __init__( base_url=None, model=None, temperature=0.3, - prompt=None, ): self.api_key = api_key self.base_url = base_url @@ -25,17 +24,9 @@ def __init__( # self.presence_penalty = 0.0005 self.messages = [] - self.prompt = prompt self.system_prompt = Path("AGENTS.md").read_text(encoding="utf-8") - self.user_prompt = ( - "You are a helpful code assistant. Think step-by-step and use tools when needed.\n" - "Stop when you have completed your analysis.\n" - f"The user's prompt is:\n\n{self.prompt}" - ) - # Initialize system and user messages self.messages.append({"role": "system", "content": self.system_prompt}) - self.messages.append({"role": "user", "content": self.user_prompt}) # Load tools from the registry self.tools = source_agent.tools.tool_registry.registry.get_tools() @@ -47,19 +38,23 @@ def __init__( api_key=self.api_key, ) - def run(self, max_steps=50): + def run(self, user_prompt: str = None, max_steps: int = 50): + """ + If user_prompt is provided, seed it; + otherwise assume messages already has the last user turn. 
+ Then run the full react loop to completion. + """ + if user_prompt is not None: + self.messages.append({"role": "user", "content": user_prompt}) + for step in range(max_steps): print(f"🔄 Agent iteration {step}/{max_steps}") - response = self.call_llm(self.messages) - choice = response.choices[0] message = choice.message self.messages.append(message) - print("🤖 Agent:", message.content) - # If the agent is using a tool, run it and loop again if message.tool_calls: for tool_call in message.tool_calls: print(f"🔧 Calling: {tool_call.function.name}") @@ -68,22 +63,17 @@ def run(self, max_steps=50): result = self.handle_tool_call(tool_call) self.messages.append(result) - print(f"✅ Result: {result}") + print("✅ Result:", result) - # Check if this was the task completion tool if tool_call.function.name == "task_mark_complete": print("💯 Task marked complete!") return result else: - print("💭 Agent responded without tool calls - continuing loop") + print("💭 No tools; continuing") - print(f"\n{'-'*40}\n") - - print( - "🚨 Max steps reached without task completion" - " - consider refining the prompt or tools." - ) + print("\n" + "-" * 40 + "\n") + print("🚨 Max steps reached without task completion.") return {"error": "Max steps reached without task completion."} def handle_tool_call(self, tool_call): From 9c0be0a18f295c0e040bc204d2175ac9be36bd01 Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:35:11 -0500 Subject: [PATCH 03/12] Update entrypoint.py --- src/source_agent/entrypoint.py | 107 ++++++++++++++++----------------- 1 file changed, 51 insertions(+), 56 deletions(-) diff --git a/src/source_agent/entrypoint.py b/src/source_agent/entrypoint.py index e161803..fb4e102 100644 --- a/src/source_agent/entrypoint.py +++ b/src/source_agent/entrypoint.py @@ -24,7 +24,7 @@ def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: Raises: ValueError: If the provider is unknown or the API key is missing. 
""" - PROVIDER_KEYS = { + provider_keys = { "xai": "XAI_API_KEY", "google": "GEMINI_API_KEY", "google_vertex": "GOOGLE_VERTEX_API_KEY", @@ -38,7 +38,7 @@ def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: "openrouter": "OPENROUTER_API_KEY", } - PROVIDER_BASE_URLS = { + provider_base_urls = { "xai": "https://api.x.ai/v1", "google": "https://generativelanguage.googleapis.com/v1beta", "google_vertex": "https://generativelanguage.googleapis.com/v1beta", @@ -52,7 +52,7 @@ def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: "openrouter": "https://openrouter.ai/api/v1", } - provider_key = PROVIDER_KEYS.get(provider_name.lower()) + provider_key = provider_keys.get(provider_name.lower()) if not provider_key: raise ValueError(f"Unknown provider: {provider_name}") @@ -60,74 +60,53 @@ def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: if not api_key: raise ValueError(f"Missing API key for provider: {provider_name}") - base_url = PROVIDER_BASE_URLS.get(provider_name.lower()) + base_url = provider_base_urls.get(provider_name.lower()) if not base_url: raise ValueError(f"Missing base URL for provider: {provider_name}") return api_key, base_url -def dispatch_agent( - prompt: str, - provider: str = "openrouter", - model: str = "moonshotai/kimi-k2", - temperature: float = 0.3, -) -> str: +def dispatch_agent(agent, prompt) -> str: """ Dispatch the agent with the given prompt. Args: - prompt: The prompt to send to the agent. - provider: The AI provider to use. - model: The model to use. - temperature: The temperature for the model. + agent: The agent instance to run. + prompt: The prompt to provide to the agent. Returns: The response from the agent. - - Raises: - Exception: If agent execution fails. 
""" print("Starting Source Agent") - print(f"Using provider: {provider}, model: {model}, temperature: {temperature}") - api_key, provider_url = get_provider(provider) - - agent = source_agent.agents.code.CodeAgent( - api_key=api_key, - base_url=provider_url, - model=model, - prompt=prompt, - temperature=temperature, + user_prompt = ( + "You are a helpful code assistant. Think step-by-step and use tools when needed.\n" + "Stop when you have completed your analysis.\n" + f"The user's prompt is:\n\n{prompt}" ) - result = agent.run() + result = agent.run(user_prompt=user_prompt) print("Agent execution completed successfully") - return result - -def validate_prompt(prompt: str, max_length: int = 10000) -> str: - """ - Validate and sanitize the prompt. - - Args: - prompt: The prompt to validate. + return result - Returns: - The validated prompt. - Raises: - ValueError: If prompt is invalid. - """ - prompt = prompt.strip() - if not prompt: - raise ValueError("Prompt cannot be empty or whitespace only") +def interactive_session(agent): + print("Entering interactive mode. 
Type your prompt and ↵; type 'q' to quit.")
+    while True:
+        user_input = input("\n> ").strip()
+        if user_input.lower() == "q":
+            print("Exiting interactive session.")
+            return
+
+        # reset the conversation to just the system prompt + the new user prompt
+        agent.messages = [{"role": "system", "content": agent.system_prompt}]
+        agent.messages.append({"role": "user", "content": user_input})
+
+        # run full react loop
+        agent.run()
+        print("\n🔚 Run completed.\n")


 def main() -> int:
@@ -182,25 +161,41 @@ def main() -> int:
         default=False,
         help="Enable verbose logging",
     )
+    parser.add_argument(
+        "-i",
+        "--interactive",
+        action="store_true",
+        default=False,
+        help="Run in interactive step-through mode",
+    )
+    parser.add_argument(
+        "-H",
+        "--heavy",
+        action="store_true",
+        default=False,
+        help="Enable heavy mode",
+    )
     args = parser.parse_args()

     # if args.verbose:
     #     logging.getLogger().setLevel(logging.DEBUG)

-    # Validate prompt
-    prompt = validate_prompt(args.prompt)
-
-    # Run agent
-    result = dispatch_agent(
-        prompt=prompt,
-        provider=args.provider,
+    api_key, base_url = get_provider(args.provider)
+    agent = source_agent.agents.code.CodeAgent(
+        api_key=api_key,
+        base_url=base_url,
         model=args.model,
         temperature=args.temperature,
     )

-    print(result)
-    return 0
+    if args.interactive:
+        # Run in interactive mode
+        return interactive_session(agent)
+
+    else:
+        # Let the agent run autonomously
+        return dispatch_agent(agent=agent, prompt=args.prompt)


 if __name__ == "__main__":

From 64c96868200c56534aa7b9be4f9df0c1006819d2 Mon Sep 17 00:00:00 2001
From: Chris <363708+christopherwoodall@users.noreply.github.com>
Date: Wed, 16 Jul 2025 04:35:13 -0500
Subject: [PATCH 04/12] Create orchestrator.py

---
 src/source_agent/orchestrator.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 src/source_agent/orchestrator.py
diff --git a/src/source_agent/orchestrator.py b/src/source_agent/orchestrator.py new file mode 100644 index 0000000..e69de29 From 34f195daff589ee9060ab791e367fbeed2978c3d Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:37:53 -0500 Subject: [PATCH 05/12] Update pyproject.toml --- pyproject.toml | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ba84371..192038d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,11 +5,11 @@ build-backend = "hatchling.build" [project.scripts] source-agent = "source_agent.entrypoint:main" - +heavy-agent = "source_agent.heavy:main" [project] requires-python = ">=3.10" -version = "0.0.10" +version = "0.0.11" name = "source-agent" description = "Simple coding agent." readme = ".github/README.md" @@ -21,9 +21,7 @@ dependencies = [ "openai", "pathspec", "requests", - - # "python-dotenv", - # "pyyaml", + "pyyaml", ] [project.optional-dependencies] @@ -32,10 +30,8 @@ developer = [ "black", "hatch", "mypy", - # "pre-commit", "pytest", "ruff", - "uv", ] From a0e529393bfbfa3c2dcd4bf0a7f8fc35f4b74b6e Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:37:56 -0500 Subject: [PATCH 06/12] Create heavy.py --- src/source_agent/heavy.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 src/source_agent/heavy.py diff --git a/src/source_agent/heavy.py b/src/source_agent/heavy.py new file mode 100644 index 0000000..92fd75c --- /dev/null +++ b/src/source_agent/heavy.py @@ -0,0 +1,11 @@ +import sys + + +def main(): + print("Heavy agent running...") + # Add heavy agent logic here + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 6d2b9f42e1fa12d1c60941715baf628092308caf Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:39:53 -0500 Subject: [PATCH 07/12] Update 
NOTES.md --- .github/NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/NOTES.md b/.github/NOTES.md index a82587a..77d1052 100644 --- a/.github/NOTES.md +++ b/.github/NOTES.md @@ -3,6 +3,7 @@ ## Agents - https://github.com/Doriandarko/make-it-heavy - https://github.com/frdel/agent-zero + - https://github.com/kris-hansen/comanda - [Flowise - Build AI Agents, Visually](https://github.com/FlowiseAI/Flowise) - [AWS Multiagent AI Framework](https://github.com/awslabs/agent-squad) - [Google Agent Development Kit](https://github.com/google/adk-python) From 26f9f554134e94d3ead71b7f5af82fed787d5a63 Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:56:01 -0500 Subject: [PATCH 08/12] Update config.yaml --- config.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/config.yaml b/config.yaml index e69de29..1e0649b 100644 --- a/config.yaml +++ b/config.yaml @@ -0,0 +1,10 @@ +orchestrator: + parallel_agents: 4 + task_timeout: 60 + aggregation_strategy: consensus + question_generation_prompt: | + Generate {num_agents} JSON-array questions to explore: "{user_input}" + synthesis_prompt: | + You are to synthesize {num_responses} inputs: + {agent_responses} + Produce a final coherent summary. 
\ No newline at end of file From cb76185d0cf281ee29f9ab7e23425064757812bf Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 04:56:04 -0500 Subject: [PATCH 09/12] Update heavy.py --- src/source_agent/heavy.py | 328 +++++++++++++++++++++++++++++++++++++- 1 file changed, 324 insertions(+), 4 deletions(-) diff --git a/src/source_agent/heavy.py b/src/source_agent/heavy.py index 92fd75c..b6902c5 100644 --- a/src/source_agent/heavy.py +++ b/src/source_agent/heavy.py @@ -1,11 +1,331 @@ -import sys +import argparse +import os +import json +import yaml +import time +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Dict, Any +from source_agent.agents.code import CodeAgent -def main(): - print("Heavy agent running...") - # Add heavy agent logic here + +def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: + """ + Get the API key and base URL for the specified provider. + + Args: + provider_name: The name of the AI provider. + + Returns: + A tuple containing the API key and base URL for the provider. + + Raises: + ValueError: If the provider is unknown or the API key is missing. 
+ """ + provider_keys = { + "xai": "XAI_API_KEY", + "google": "GEMINI_API_KEY", + "google_vertex": "GOOGLE_VERTEX_API_KEY", + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "mistral": "MISTRAL_API_KEY", + "deepseek": "DEEPSEEK_API_KEY", + "cerebras": "CEREBRAS_API_KEY", + "groq": "GROQ_API_KEY", + "vercel": "VERCEL_API_KEY", + "openrouter": "OPENROUTER_API_KEY", + } + + provider_base_urls = { + "xai": "https://api.x.ai/v1", + "google": "https://generativelanguage.googleapis.com/v1beta", + "google_vertex": "https://generativelanguage.googleapis.com/v1beta", + "openai": "https://api.openai.com/v1", + "anthropic": "https://api.anthropic.com/v1", + "mistral": "https://api.mistral.ai/v1", + "deepseek": "https://api.deepseek.com/v1", + "cerebras": "https://api.cerebras.net/v1", + "groq": "https://api.groq.com/v1", + "vercel": "https://api.vercel.ai/v1", + "openrouter": "https://openrouter.ai/api/v1", + } + + provider_key = provider_keys.get(provider_name.lower()) + if not provider_key: + raise ValueError(f"Unknown provider: {provider_name}") + + api_key = os.getenv(provider_key) + if not api_key: + raise ValueError(f"Missing API key for provider: {provider_name}") + + base_url = provider_base_urls.get(provider_name.lower()) + if not base_url: + raise ValueError(f"Missing base URL for provider: {provider_name}") + + return api_key, base_url + + +class TaskOrchestrator: + def __init__( + self, + config_path: str = "config.yaml", + provider: str = "openrouter", + model: str = "moonshotai/kimi-k2", + temperature: float = 0.3, + silent: bool = False, + ): + # load orchestrator config + with open(config_path, "r") as f: + self.config = yaml.safe_load(f)["orchestrator"] + + self.num_agents = self.config["parallel_agents"] + self.task_timeout = self.config["task_timeout"] + self.aggregation_strategy = self.config.get("aggregation_strategy", "consensus") + self.silent = silent + + # prepare agent factory args + api_key, base_url = get_provider(provider) + 
self.agent_kwargs = { + "api_key": api_key, + "base_url": base_url, + "model": model, + "temperature": temperature, + } + + # thread-safe progress & results + self.progress_lock = threading.Lock() + self.agent_progress: Dict[int, str] = {} + self.agent_results: Dict[int, str] = {} + + def _make_agent(self) -> CodeAgent: + """Instantiate a fresh CodeAgent for a subtask.""" + agent = CodeAgent(**self.agent_kwargs) + if self.silent: + # suppress internal prints by redirecting stdout temporarily + import sys, io + agent._print = print + agent._stdout_backup = sys.stdout + sys.stdout = io.StringIO() + return agent + + def decompose_task(self, user_input: str, num_agents: int) -> List[str]: + """Use AI to generate N sub‑prompts for parallel agents.""" + question_agent = self._make_agent() + + template = self.config["question_generation_prompt"] + prompt = template.format(user_input=user_input, num_agents=num_agents) + + # remove the task‐complete tool if present + question_agent.tools = [ + t for t in question_agent.tools + if getattr(t, "function", {}).get("name") != "task_mark_complete" + ] + question_agent.tool_mapping = { + name: fn for name, fn in question_agent.tool_mapping.items() + if name != "task_mark_complete" + } + + try: + raw = question_agent.run(prompt) + questions = json.loads(raw.strip()) + if len(questions) != num_agents: + raise ValueError(f"expected {num_agents} questions, got {len(questions)}") + return questions + except Exception: + # fallback simple templates + variations = [ + f"Research comprehensive information about: {user_input}", + f"Analyze and provide insights about: {user_input}", + f"Find alternative perspectives on: {user_input}", + f"Verify and cross-check facts about: {user_input}", + ] + return variations[:num_agents] + + def update_agent_progress(self, agent_id: int, status: str, result: str = None): + with self.progress_lock: + self.agent_progress[agent_id] = status + if result is not None: + self.agent_results[agent_id] = result + 
+ def run_agent_parallel(self, agent_id: int, subtask: str) -> Dict[str, Any]: + """Execute one CodeAgent on the given subtask.""" + try: + self.update_agent_progress(agent_id, "PROCESSING") + agent = self._make_agent() + + start = time.time() + response = agent.run(subtask) + duration = time.time() - start + + self.update_agent_progress(agent_id, "COMPLETED", response) + return { + "agent_id": agent_id, + "status": "success", + "response": response, + "execution_time": duration, + } + except Exception as e: + return { + "agent_id": agent_id, + "status": "error", + "response": f"Error: {e}", + "execution_time": 0.0, + } + + def aggregate_results(self, results: List[Dict[str, Any]]) -> str: + """Combine all agent outputs into one final answer.""" + successes = [r for r in results if r["status"] == "success"] + if not successes: + return "All agents failed—please try again." + + responses = [r["response"] for r in successes] + if self.aggregation_strategy == "consensus": + return self._aggregate_consensus(responses) + # future strategies can go here + return self._aggregate_consensus(responses) + + def _aggregate_consensus(self, responses: List[str]) -> str: + """Use a final AI call to synthesize all successful agent responses.""" + if len(responses) == 1: + return responses[0] + + synth_agent = self._make_agent() + # strip out all tools to force plain completion + synth_agent.tools = [] + synth_agent.tool_mapping = {} + + header = [] + for i, resp in enumerate(responses, start=1): + header.append(f"=== AGENT {i} RESPONSE ===\n{resp}\n") + all_text = "\n".join(header) + + template = self.config["synthesis_prompt"] + prompt = template.format( + num_responses=len(responses), + agent_responses=all_text, + ) + + try: + return synth_agent.run(prompt) + except Exception as e: + # fallback to concatenation + fallback = [] + for i, resp in enumerate(responses, start=1): + fallback.append(f"=== AGENT {i} ===") + fallback.append(resp) + fallback.append("") + return 
"\n".join(fallback) + + def get_progress_status(self) -> Dict[int, str]: + with self.progress_lock: + return dict(self.agent_progress) + + def orchestrate(self, user_input: str) -> str: + """Top-level entry: decompose, run in parallel, aggregate.""" + # reset + self.agent_progress.clear() + self.agent_results.clear() + + # 1) task decomposition + subtasks = self.decompose_task(user_input, self.num_agents) + for idx in range(self.num_agents): + self.agent_progress[idx] = "QUEUED" + + # 2) parallel execution + results: List[Dict[str, Any]] = [] + with ThreadPoolExecutor(max_workers=self.num_agents) as exe: + futures = { + exe.submit(self.run_agent_parallel, idx, subtasks[idx]): idx + for idx in range(self.num_agents) + } + for fut in as_completed(futures, timeout=self.task_timeout): + try: + results.append(fut.result()) + except Exception as e: + idx = futures[fut] + results.append({ + "agent_id": idx, + "status": "timeout", + "response": f"Agent {idx} timed out: {e}", + "execution_time": self.task_timeout, + }) + + # sort + aggregate + results.sort(key=lambda r: r["agent_id"]) + return self.aggregate_results(results) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Parallel task orchestrator") + parser.add_argument( + "--prompt", + type=str, + required=True, + help="The user query to orchestrate across agents" + ) + parser.add_argument( + "-c", "--config", + type=str, + default="config.yaml", + help="Path to YAML config file (default: config.yaml)" + ) + parser.add_argument( + "--provider", + type=str, + default="openrouter", + help="AI provider to use (must match entrypoint providers)" + ) + parser.add_argument( + "--model", + type=str, + default="moonshotai/kimi-k2", + help="Model name to use for all agents" + ) + parser.add_argument( + "--temperature", + type=float, + default=0.3, + help="Sampling temperature for all agents" + ) + parser.add_argument( + "--parallel_agents", + type=int, + help="Override number of parallel agents from config" 
+ ) + parser.add_argument( + "--timeout", + type=float, + help="Override per-task timeout (seconds) from config" + ) + parser.add_argument( + "--silent", + action="store_true", + help="Suppress intermediate agent logs" + ) + args = parser.parse_args() + + orchestrator = TaskOrchestrator( + config_path=args.config, + provider=args.provider, + model=args.model, + temperature=args.temperature, + silent=args.silent + ) + + # optional overrides + if args.parallel_agents: + orchestrator.num_agents = args.parallel_agents + if args.timeout: + orchestrator.task_timeout = args.timeout + + print("🔍 Decomposing and running tasks in parallel...") + final = orchestrator.orchestrate(args.prompt) + print("\n🎯 Final aggregated result:\n") + print(final) return 0 if __name__ == "__main__": + import sys sys.exit(main()) From 4cf3b45b3bfc13d8cb63b2994a683ea93ca71acd Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 05:01:25 -0500 Subject: [PATCH 10/12] Update config.yaml --- config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.yaml b/config.yaml index 1e0649b..4077028 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,6 @@ orchestrator: - parallel_agents: 4 - task_timeout: 60 + parallel_agents: 2 + task_timeout: 300 aggregation_strategy: consensus question_generation_prompt: | Generate {num_agents} JSON-array questions to explore: "{user_input}" From ff456e91f348bf91c46989c714a961eb40c3bfde Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 05:04:14 -0500 Subject: [PATCH 11/12] Update README.md --- .github/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/README.md b/.github/README.md index 485228b..e2dc9d1 100644 --- a/.github/README.md +++ b/.github/README.md @@ -73,6 +73,11 @@ source-agent \ --prompt "Review the error handling in this codebase" ``` +### Interactive Mode +```bash +source-agent 
--interactive +``` + --- ## Supported Providers From 4589c44aa8ff02987e0cab5d9bfc4169f4f62fbc Mon Sep 17 00:00:00 2001 From: Chris <363708+christopherwoodall@users.noreply.github.com> Date: Wed, 16 Jul 2025 05:11:09 -0500 Subject: [PATCH 12/12] Update heavy.py --- src/source_agent/heavy.py | 328 +------------------------------------- 1 file changed, 4 insertions(+), 324 deletions(-) diff --git a/src/source_agent/heavy.py b/src/source_agent/heavy.py index b6902c5..92fd75c 100644 --- a/src/source_agent/heavy.py +++ b/src/source_agent/heavy.py @@ -1,331 +1,11 @@ -import argparse -import os -import json -import yaml -import time -import threading -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import List, Dict, Any +import sys -from source_agent.agents.code import CodeAgent - -def get_provider(provider_name: str = "openrouter") -> tuple[str, str]: - """ - Get the API key and base URL for the specified provider. - - Args: - provider_name: The name of the AI provider. - - Returns: - A tuple containing the API key and base URL for the provider. - - Raises: - ValueError: If the provider is unknown or the API key is missing. 
- """ - provider_keys = { - "xai": "XAI_API_KEY", - "google": "GEMINI_API_KEY", - "google_vertex": "GOOGLE_VERTEX_API_KEY", - "openai": "OPENAI_API_KEY", - "anthropic": "ANTHROPIC_API_KEY", - "mistral": "MISTRAL_API_KEY", - "deepseek": "DEEPSEEK_API_KEY", - "cerebras": "CEREBRAS_API_KEY", - "groq": "GROQ_API_KEY", - "vercel": "VERCEL_API_KEY", - "openrouter": "OPENROUTER_API_KEY", - } - - provider_base_urls = { - "xai": "https://api.x.ai/v1", - "google": "https://generativelanguage.googleapis.com/v1beta", - "google_vertex": "https://generativelanguage.googleapis.com/v1beta", - "openai": "https://api.openai.com/v1", - "anthropic": "https://api.anthropic.com/v1", - "mistral": "https://api.mistral.ai/v1", - "deepseek": "https://api.deepseek.com/v1", - "cerebras": "https://api.cerebras.net/v1", - "groq": "https://api.groq.com/v1", - "vercel": "https://api.vercel.ai/v1", - "openrouter": "https://openrouter.ai/api/v1", - } - - provider_key = provider_keys.get(provider_name.lower()) - if not provider_key: - raise ValueError(f"Unknown provider: {provider_name}") - - api_key = os.getenv(provider_key) - if not api_key: - raise ValueError(f"Missing API key for provider: {provider_name}") - - base_url = provider_base_urls.get(provider_name.lower()) - if not base_url: - raise ValueError(f"Missing base URL for provider: {provider_name}") - - return api_key, base_url - - -class TaskOrchestrator: - def __init__( - self, - config_path: str = "config.yaml", - provider: str = "openrouter", - model: str = "moonshotai/kimi-k2", - temperature: float = 0.3, - silent: bool = False, - ): - # load orchestrator config - with open(config_path, "r") as f: - self.config = yaml.safe_load(f)["orchestrator"] - - self.num_agents = self.config["parallel_agents"] - self.task_timeout = self.config["task_timeout"] - self.aggregation_strategy = self.config.get("aggregation_strategy", "consensus") - self.silent = silent - - # prepare agent factory args - api_key, base_url = get_provider(provider) - 
self.agent_kwargs = { - "api_key": api_key, - "base_url": base_url, - "model": model, - "temperature": temperature, - } - - # thread-safe progress & results - self.progress_lock = threading.Lock() - self.agent_progress: Dict[int, str] = {} - self.agent_results: Dict[int, str] = {} - - def _make_agent(self) -> CodeAgent: - """Instantiate a fresh CodeAgent for a subtask.""" - agent = CodeAgent(**self.agent_kwargs) - if self.silent: - # suppress internal prints by redirecting stdout temporarily - import sys, io - agent._print = print - agent._stdout_backup = sys.stdout - sys.stdout = io.StringIO() - return agent - - def decompose_task(self, user_input: str, num_agents: int) -> List[str]: - """Use AI to generate N sub‑prompts for parallel agents.""" - question_agent = self._make_agent() - - template = self.config["question_generation_prompt"] - prompt = template.format(user_input=user_input, num_agents=num_agents) - - # remove the task‐complete tool if present - question_agent.tools = [ - t for t in question_agent.tools - if getattr(t, "function", {}).get("name") != "task_mark_complete" - ] - question_agent.tool_mapping = { - name: fn for name, fn in question_agent.tool_mapping.items() - if name != "task_mark_complete" - } - - try: - raw = question_agent.run(prompt) - questions = json.loads(raw.strip()) - if len(questions) != num_agents: - raise ValueError(f"expected {num_agents} questions, got {len(questions)}") - return questions - except Exception: - # fallback simple templates - variations = [ - f"Research comprehensive information about: {user_input}", - f"Analyze and provide insights about: {user_input}", - f"Find alternative perspectives on: {user_input}", - f"Verify and cross-check facts about: {user_input}", - ] - return variations[:num_agents] - - def update_agent_progress(self, agent_id: int, status: str, result: str = None): - with self.progress_lock: - self.agent_progress[agent_id] = status - if result is not None: - self.agent_results[agent_id] = result - 
- def run_agent_parallel(self, agent_id: int, subtask: str) -> Dict[str, Any]: - """Execute one CodeAgent on the given subtask.""" - try: - self.update_agent_progress(agent_id, "PROCESSING") - agent = self._make_agent() - - start = time.time() - response = agent.run(subtask) - duration = time.time() - start - - self.update_agent_progress(agent_id, "COMPLETED", response) - return { - "agent_id": agent_id, - "status": "success", - "response": response, - "execution_time": duration, - } - except Exception as e: - return { - "agent_id": agent_id, - "status": "error", - "response": f"Error: {e}", - "execution_time": 0.0, - } - - def aggregate_results(self, results: List[Dict[str, Any]]) -> str: - """Combine all agent outputs into one final answer.""" - successes = [r for r in results if r["status"] == "success"] - if not successes: - return "All agents failed—please try again." - - responses = [r["response"] for r in successes] - if self.aggregation_strategy == "consensus": - return self._aggregate_consensus(responses) - # future strategies can go here - return self._aggregate_consensus(responses) - - def _aggregate_consensus(self, responses: List[str]) -> str: - """Use a final AI call to synthesize all successful agent responses.""" - if len(responses) == 1: - return responses[0] - - synth_agent = self._make_agent() - # strip out all tools to force plain completion - synth_agent.tools = [] - synth_agent.tool_mapping = {} - - header = [] - for i, resp in enumerate(responses, start=1): - header.append(f"=== AGENT {i} RESPONSE ===\n{resp}\n") - all_text = "\n".join(header) - - template = self.config["synthesis_prompt"] - prompt = template.format( - num_responses=len(responses), - agent_responses=all_text, - ) - - try: - return synth_agent.run(prompt) - except Exception as e: - # fallback to concatenation - fallback = [] - for i, resp in enumerate(responses, start=1): - fallback.append(f"=== AGENT {i} ===") - fallback.append(resp) - fallback.append("") - return 
"\n".join(fallback) - - def get_progress_status(self) -> Dict[int, str]: - with self.progress_lock: - return dict(self.agent_progress) - - def orchestrate(self, user_input: str) -> str: - """Top-level entry: decompose, run in parallel, aggregate.""" - # reset - self.agent_progress.clear() - self.agent_results.clear() - - # 1) task decomposition - subtasks = self.decompose_task(user_input, self.num_agents) - for idx in range(self.num_agents): - self.agent_progress[idx] = "QUEUED" - - # 2) parallel execution - results: List[Dict[str, Any]] = [] - with ThreadPoolExecutor(max_workers=self.num_agents) as exe: - futures = { - exe.submit(self.run_agent_parallel, idx, subtasks[idx]): idx - for idx in range(self.num_agents) - } - for fut in as_completed(futures, timeout=self.task_timeout): - try: - results.append(fut.result()) - except Exception as e: - idx = futures[fut] - results.append({ - "agent_id": idx, - "status": "timeout", - "response": f"Agent {idx} timed out: {e}", - "execution_time": self.task_timeout, - }) - - # sort + aggregate - results.sort(key=lambda r: r["agent_id"]) - return self.aggregate_results(results) - - -def main() -> int: - parser = argparse.ArgumentParser(description="Parallel task orchestrator") - parser.add_argument( - "--prompt", - type=str, - required=True, - help="The user query to orchestrate across agents" - ) - parser.add_argument( - "-c", "--config", - type=str, - default="config.yaml", - help="Path to YAML config file (default: config.yaml)" - ) - parser.add_argument( - "--provider", - type=str, - default="openrouter", - help="AI provider to use (must match entrypoint providers)" - ) - parser.add_argument( - "--model", - type=str, - default="moonshotai/kimi-k2", - help="Model name to use for all agents" - ) - parser.add_argument( - "--temperature", - type=float, - default=0.3, - help="Sampling temperature for all agents" - ) - parser.add_argument( - "--parallel_agents", - type=int, - help="Override number of parallel agents from config" 
- ) - parser.add_argument( - "--timeout", - type=float, - help="Override per-task timeout (seconds) from config" - ) - parser.add_argument( - "--silent", - action="store_true", - help="Suppress intermediate agent logs" - ) - args = parser.parse_args() - - orchestrator = TaskOrchestrator( - config_path=args.config, - provider=args.provider, - model=args.model, - temperature=args.temperature, - silent=args.silent - ) - - # optional overrides - if args.parallel_agents: - orchestrator.num_agents = args.parallel_agents - if args.timeout: - orchestrator.task_timeout = args.timeout - - print("🔍 Decomposing and running tasks in parallel...") - final = orchestrator.orchestrate(args.prompt) - print("\n🎯 Final aggregated result:\n") - print(final) +def main(): + print("Heavy agent running...") + # Add heavy agent logic here return 0 if __name__ == "__main__": - import sys sys.exit(main())