Cooperative multi-agent orchestration with pipeline workflows, shared memory, and task execution
A multi-agent orchestration framework where specialized agents (Researcher, Writer, Coder, Reviewer) collaborate through pipeline workflows with shared memory, message passing, and cooperative task completion. Built with production patterns including retry logic, middleware, serialization, and factory-based instantiation.
- **4 Specialized Agents** – Researcher, Writer, Coder, Reviewer with distinct roles
- **Pipeline Orchestration** – Sequential, conditional, broadcast, and round-robin workflows
- **Shared Memory** – Conversation history, key-value context, task queues, event bus
- **Message Passing** – Structured inter-agent communication protocol with metadata
- **Performance Metrics** – Agent stats, latency, quality scoring, run comparison
- **Retry & Resilience** – Exponential backoff with jitter for transient failure recovery
- **Middleware System** – Composable pre/post-processing hooks (logging, validation, token budget)
- **Agent Factory** – Configuration-driven agent creation with extensible registry
- **Run Persistence** – Save/load pipeline runs to JSON for auditing and replay
- **CLI Interface** – Run pipelines from the terminal with verbose and JSON output modes
- **REST API** – FastAPI endpoints for remote pipeline execution
- **90+ Tests** – Comprehensive test coverage with parametrized tests and edge cases
```
┌──────────────────────────────────────────────────────────┐
│                       Entry Points                       │
│  CLI (cli.py) │ REST API (api/) │ Streamlit (streamlit/) │
├──────────────────────────────────────────────────────────┤
│                     Middleware Stack                     │
│  LoggingMW → ValidationMW → TokenBudgetMW → Agent        │
├──────────────────────────────────────────────────────────┤
│                    Orchestration Layer                   │
│  Pipeline │ Workflow │ Coordinator │ ExecutionTracer     │
├──────────────────────────────────────────────────────────┤
│                        Agent Layer                       │
│  Researcher │ Writer │ Coder │ Reviewer │ (custom...)    │
├──────────────────────────────────────────────────────────┤
│                       Infrastructure                     │
│  Memory │ EventBus │ TaskQueue │ SharedContext │ Retry   │
│  Factory │ Serialization │ Evaluation │ Exceptions       │
└──────────────────────────────────────────────────────────┘
```
```
multi-agent-system/
├── src/
│   ├── __init__.py            # Package exports and version
│   ├── agent_base.py          # AgentMessage, BaseAgent ABC, Tool
│   ├── agents.py              # Researcher, Writer, Coder, Reviewer
│   ├── orchestration.py       # Pipeline, Workflow, Coordinator, Tracer
│   ├── memory.py              # ConversationMemory, SharedContext, TaskQueue, EventBus
│   ├── evaluation.py          # Metrics, latency, quality scoring, reports
│   ├── middleware.py          # Middleware ABC and built-in middleware
│   ├── retry.py               # @retry decorator and RetryPolicy
│   ├── factory.py             # AgentFactory and create_agent/create_team
│   ├── serialization.py       # PipelineRun save/load to JSON
│   ├── cli.py                 # Command-line interface
│   ├── config.py              # Centralized configuration
│   ├── exceptions.py          # Custom exception hierarchy
│   └── api/
│       └── main.py            # FastAPI REST endpoints
├── tests/
│   ├── conftest.py            # Shared fixtures
│   ├── test_agent_base.py     # Base class and message tests
│   ├── test_agents.py         # Agent implementation tests
│   ├── test_orchestration.py  # Pipeline, Workflow, Coordinator tests
│   ├── test_memory.py         # Memory subsystem tests
│   ├── test_evaluation.py     # Metrics and report tests
│   ├── test_middleware.py     # Middleware stack tests
│   ├── test_retry.py          # Retry logic tests
│   ├── test_factory.py        # Factory pattern tests
│   ├── test_serialization.py  # Persistence tests
│   └── test_api.py            # API endpoint tests
├── streamlit_app/             # Interactive dashboard
├── requirements.txt
└── README.md
```
```bash
git clone https://github.com/mohamed-elkholy95/multi-agent-system.git
cd multi-agent-system
pip install -r requirements.txt
python -m pytest tests/ -v
```

```bash
# Default pipeline (all agents)
python -m src.cli "Analyze the impact of transformers on NLP"

# Custom agent selection
python -m src.cli "Write a sorting algorithm" --agents coder reviewer

# Verbose output with execution trace
python -m src.cli "Research AI trends" --verbose --trace --report

# Save results to file
python -m src.cli "Analyze data" --output outputs/run_001.json --json
```

```bash
uvicorn src.api.main:app --reload --port 8004
# Then visit http://localhost:8004/docs for interactive API docs
```

```bash
streamlit run streamlit_app/app.py
```

```python
from src import Pipeline, create_team

# Create agents using the factory
team = create_team(["researcher", "writer", "reviewer"])

# Build a pipeline
pipeline = Pipeline("research_pipeline")
for agent in team:
    pipeline.add_agent(agent)

# Run the pipeline
messages = pipeline.run("Analyze current AI safety research")

# Print results
for msg in messages:
    print(f"[{msg.agent_name or 'user'}] {msg.content[:80]}")
```

```python
from src import AgentMessage, MiddlewareStack, LoggingMiddleware, TokenBudgetMiddleware, create_agent

stack = MiddlewareStack()
stack.add(LoggingMiddleware(log_content=True))
stack.add(TokenBudgetMiddleware(max_tokens=5000))

agent = create_agent("researcher")
messages = [AgentMessage("user", "What are the latest ML trends?")]

# Execute with middleware wrapping
response = stack.execute(agent, messages)
print(response.metadata["token_budget"])  # Token usage stats
```

```python
from src import PipelineRun, save_run, load_run

# After running a pipeline...
run = PipelineRun("experiment_001", messages, metadata={"model": "gpt-4"})
run.finalize()
save_run(run, "outputs/experiment_001.json")

# Later, load and analyze
loaded = load_run("outputs/experiment_001.json")
print(loaded.summary)
print(f"Agents: {loaded.agent_names}")
```

This project demonstrates several GoF and production design patterns:
| Pattern | Where | Purpose |
|---|---|---|
| Strategy | `BaseAgent.process()` | Agents are interchangeable strategies |
| Chain of Responsibility | `Pipeline` | Sequential agent processing |
| Mediator | `Coordinator` | Centralized agent communication |
| Observer | `EventBus`, `ExecutionTracer` | Decoupled event notifications |
| Factory | `AgentFactory`, `create_agent()` | Config-driven agent creation |
| Decorator | `@retry`, `Middleware` | Transparent behavior extension |
| Template Method | `BaseAgent` | Common structure, custom processing |
| Method | Endpoint | Description |
|---|---|---|
| GET | `/health` | System health check |
| GET | `/agents` | List registered agents |
| POST | `/pipeline/run` | Execute the default pipeline |
| POST | `/broadcast` | Send message to all agents |
| POST | `/round-robin` | Multi-turn agent discussion |
| GET | `/metrics/summary` | Last run metrics |
| GET | `/events/log` | Event bus history |
Agents are specialized workers that each handle one aspect of a task. The orchestration layer decides *who* does *what* and in *what order*, turning individual capabilities into collaborative workflows.
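As a framework-independent sketch of this idea, the snippet below chains three stand-in "agents" so each stage's output feeds the next. All names here (`run_pipeline`, the stage functions) are illustrative, not the project's actual API:

```python
# Minimal sketch of sequential orchestration: each "agent" handles one
# aspect of the task, and the orchestrator fixes who runs in what order.

def researcher(task: str) -> str:
    return f"findings for: {task}"

def writer(findings: str) -> str:
    return f"article based on ({findings})"

def reviewer(article: str) -> str:
    return f"approved: {article}"

def run_pipeline(task: str, stages) -> str:
    """Feed each stage's output into the next (a sequential workflow)."""
    result = task
    for stage in stages:
        result = stage(result)
    return result

output = run_pipeline("AI safety", [researcher, writer, reviewer])
print(output)  # approved: article based on (findings for: AI safety)
```

Swapping the stage list is what makes agents composable: the same `run_pipeline` loop serves any ordering or subset of agents.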
Composable processing layers wrap agent execution (like Express.js middleware). Before hooks can validate inputs or check budgets; after hooks can log results or track tokens. The onion model ensures proper ordering.
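The onion model can be sketched in a few lines of plain Python: before-hooks run outermost-first on the way in, after-hooks run in reverse on the way out. This is an illustrative skeleton; the project's `MiddlewareStack` API may differ in detail:

```python
# Onion-model middleware: layers wrap the handler symmetrically.

class Middleware:
    def before(self, request: str) -> str:   # pre-processing hook
        return request
    def after(self, response: str) -> str:   # post-processing hook
        return response

class UppercaseResponse(Middleware):
    def after(self, response: str) -> str:
        return response.upper()

class TagValidated(Middleware):
    def before(self, request: str) -> str:
        return f"[validated] {request}"

def execute(layers, handler, request):
    for mw in layers:                 # before-hooks, outermost first
        request = mw.before(request)
    response = handler(request)       # the wrapped "agent"
    for mw in reversed(layers):       # after-hooks unwind in reverse
        response = mw.after(response)
    return response

result = execute([UppercaseResponse(), TagValidated()],
                 lambda r: f"echo: {r}", "hello")
print(result)  # ECHO: [VALIDATED] HELLO
```

Reversing the layer list on the way out is what gives the symmetric "onion" ordering the text describes.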
When agents call external services that fail transiently, the retry decorator waits progressively longer between attempts (1s → 2s → 4s) with random jitter to prevent thundering-herd problems.
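A generic version of this pattern fits in one decorator. The sketch below is plain Python and stands in for the project's `@retry`/`RetryPolicy`, whose exact signatures may differ:

```python
import functools
import random
import time

def retry(max_attempts: int = 4, base_delay: float = 1.0, jitter: float = 0.1):
    """Retry on any exception with exponential backoff plus random jitter."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # budget exhausted: surface the failure
                    # base, 2*base, 4*base, ... plus jitter to de-synchronize callers
                    delay = base_delay * (2 ** attempt) + random.uniform(0, jitter)
                    time.sleep(delay)
        return wrapper
    return decorator

calls = {"n": 0}

@retry(max_attempts=3, base_delay=0.01, jitter=0.01)
def flaky_service():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(flaky_service())  # succeeds on the third attempt
```

The jitter term matters when many agents retry at once: without it, all callers would hit the failed service again at the same instants.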
Complete pipeline runs can be serialized to JSON for audit trails, debugging, caching, and A/B comparison between runs.
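The mechanics reduce to a round-trippable record. The sketch below uses a hypothetical `Run` dataclass with illustrative fields, not the project's actual `PipelineRun` schema:

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field

@dataclass
class Run:
    run_id: str
    messages: list
    metadata: dict = field(default_factory=dict)

def save_run(run: Run, path: str) -> None:
    # Dump the run as pretty-printed JSON for audit trails and diffing
    with open(path, "w") as f:
        json.dump(asdict(run), f, indent=2)

def load_run(path: str) -> Run:
    with open(path) as f:
        return Run(**json.load(f))

path = os.path.join(tempfile.gettempdir(), "experiment_001.json")
run = Run("experiment_001", ["draft", "review"], {"model": "demo"})
save_run(run, path)
loaded = load_run(path)
print(loaded.run_id, loaded.metadata)
```

Because the record is plain JSON, two saved runs can be diffed or compared field-by-field for the A/B analysis mentioned above.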
Mohamed Elkholy – GitHub · melkholy@techmatrix.com