The World's First Neuro-Semantic Event Bus
"Don't send events. Send understanding."
NeuroBUS is a revolutionary event bus that transforms message passing into meaning passing for cognitive AI systems. Unlike traditional event buses (Redis, RabbitMQ, Kafka), NeuroBUS understands the semantic meaning of events and intelligently routes them based on context, not just pattern matching.
Traditional event buses are dumb pipes - they match strings and forward messages. NeuroBUS is intelligent - it understands what events mean, maintains context across conversations, remembers past interactions, and can even reason about events using LLMs.
Perfect for:
- 🤖 AI Agent Systems - Multi-agent coordination with semantic understanding
- 🔄 Microservices - Intelligent service-to-service communication
- 📊 Data Pipelines - Context-aware stream processing
- 🎮 Real-time Systems - Low-latency semantic routing
- 🧪 Event Sourcing - Time-travel debugging with causality tracking
Events are matched by meaning, not just exact strings. Using transformer embeddings, NeuroBUS understands that "user_logged_in" and "authentication_successful" are semantically similar.

```python
@bus.subscribe("user authentication", semantic=True, threshold=0.8)
async def handle_auth(event: Event):
    # Matches: user_login, auth_success, sign_in_complete, etc.
    pass
```

Maintain state across 4 hierarchical scopes (global/session/user/event) with automatic context merging and DSL-based filtering.

```python
# Set context at different scopes
bus.context.set_global("app_version", "1.0.0")
bus.context.set_session("user_id", "alice", session_id="sess_123")
# Filter events based on context
@bus.subscribe("alert", filter="priority >= 5 AND user.role == 'admin'")
async def handle_critical_alert(event: Event):
    pass
```
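For example, an event like the following would reach the handler, while lower-priority or non-admin events are dropped (a sketch; it assumes the filter DSL reads `priority` from the event's data and `user.role` from its context):

```python
# Assumed: filter fields resolve against event data and context
await bus.publish(Event(
    topic="alert",
    data={"priority": 7},
    context={"user": {"role": "admin"}}
))  # passes the filter

await bus.publish(Event(topic="alert", data={"priority": 2}))  # filtered out
```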
Time-travel debugging with event replay, causality tracking, and temporal queries.

```python
# Replay events from the past
async for event in bus.temporal.replay(
    from_time=yesterday,
    to_time=now,
    speed=10.0  # 10x faster
):
    # Re-process historical events
    pass
# Track event causality
chain = bus.temporal.causality.get_causal_chain(event_id)
root = bus.temporal.causality.get_root(event_id)
```

Native support for Qdrant and LanceDB for semantic event search and long-term memory.

```python
from neurobus.memory import QdrantAdapter
# Store events in vector database
adapter = QdrantAdapter(url="http://localhost:6333")
await adapter.store_event(event, embedding)
# Search semantically similar past events
results = await adapter.search_similar(query_embedding, k=5)
```
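LanceDB is used the same way; the sketch below assumes a `LanceDBAdapter` mirroring `QdrantAdapter`'s interface (the class name and constructor argument are illustrative, not confirmed API):

```python
# Hypothetical adapter: assumes the same store/search interface as QdrantAdapter
from neurobus.memory import LanceDBAdapter

adapter = LanceDBAdapter(uri="./lancedb")  # illustrative constructor argument
await adapter.store_event(event, embedding)
results = await adapter.search_similar(query_embedding, k=5)
```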
Automatically trigger LLM reasoning when events match patterns - no manual integration needed.

```python
from neurobus.llm import LLMBridge
bridge = LLMBridge(provider="openai", api_key="sk-...")
@bridge.hook("error.*", "Analyze this error: {topic}\nData: {data}")
async def analyze_error(event, reasoning):
print(f"LLM Analysis: {reasoning}")
# Automatically invoked on any error.* eventHorizontal scaling with Redis-based multi-node clustering, leader election, and distributed locking.
config = NeuroBusConfig(
    distributed={
        "enabled": True,
        "redis_url": "redis://localhost:6379",
    }
)
bus = NeuroBus(config=config)
# Events automatically broadcast across all nodes
```

Built-in metrics, comprehensive logging, and high test coverage (173 tests, 95% coverage).

```python
from neurobus.monitoring.metrics import get_metrics
metrics = get_metrics()
stats = metrics.get_histogram_stats("dispatch_latency_seconds")
# Returns: min, max, mean, p50, p95, p99
```

Install from PyPI:

```bash
pip install neurobus
```

A minimal publish/subscribe example:

```python
import asyncio
from neurobus import NeuroBus, Event
async def main():
    # Create bus
    bus = NeuroBus()

    # Subscribe to events
    @bus.subscribe("user.login")
    async def handle_login(event: Event):
        print(f"User {event.data['username']} logged in")

    # Start bus
    async with bus:
        # Publish event
        await bus.publish(Event(
            topic="user.login",
            data={"username": "alice"}
        ))
        await asyncio.sleep(0.1)
asyncio.run(main())
```

Semantic routing in action - topics are matched by embedding similarity rather than exact strings:

```python
import asyncio

from neurobus import NeuroBus, Event, NeuroBusConfig
async def main():
    # Enable semantic routing
    config = NeuroBusConfig(semantic={"enabled": True})
    bus = NeuroBus(config=config)

    # Subscribe with semantic matching
    @bus.subscribe("greeting", semantic=True, threshold=0.75)
    async def handle_greeting(event: Event):
        print(f"Got greeting: {event.topic}")

    async with bus:
        # All these will match semantically!
        await bus.publish(Event(topic="hello", data={}))
        await bus.publish(Event(topic="hi_there", data={}))
        await bus.publish(Event(topic="good_morning", data={}))
        await asyncio.sleep(0.5)
asyncio.run(main())
```

Hooking an LLM into the event flow:

```python
import asyncio

from neurobus import NeuroBus, Event
from neurobus.llm import LLMBridge
async def main():
    bus = NeuroBus()

    # Setup LLM bridge
    llm = LLMBridge(provider="openai", api_key="sk-...")
    await llm.initialize()

    # Hook LLM to error events
    @llm.hook("error.*", "Diagnose: {topic}\nDetails: {data}")
    async def diagnose_error(event, reasoning):
        print(f"LLM says: {reasoning}")

    async with bus:
        # This will automatically trigger LLM analysis
        await bus.publish(Event(
            topic="error.database",
            data={"error": "Connection timeout"}
        ))
        await asyncio.sleep(1)
asyncio.run(main())
```

Install the base package, then add optional extras as needed:

```bash
# Base installation
pip install neurobus

# Semantic routing (includes sentence-transformers and torch)
pip install neurobus[semantic]

# Vector memory backends
pip install neurobus[qdrant]   # Qdrant support
pip install neurobus[lancedb]  # LanceDB support
pip install neurobus[memory]   # Both

# LLM providers
pip install neurobus[openai]     # OpenAI GPT
pip install neurobus[anthropic]  # Anthropic Claude
pip install neurobus[ollama]     # Local LLMs via Ollama
pip install neurobus[llm]        # All LLM providers

# Distributed clustering
pip install neurobus[distributed]  # Redis clustering

# Everything
pip install neurobus[all]
```

Further documentation:

- API Documentation - Complete API reference
- Examples - 15+ working examples
- Architecture - System design and internals
- Contributing - How to contribute
- Changelog - Version history
Events are the fundamental unit of communication in NeuroBUS. Each event has a topic, data, optional context, and metadata.

```python
event = Event(
topic="user.action.completed",
data={"action": "purchase", "amount": 99.99},
context={"user_id": "alice", "session": "xyz"},
metadata={"source": "web", "version": "2.0"}
)
```

Subscribe to events using exact patterns, wildcards, or semantic similarity.

```python
# Exact match
@bus.subscribe("user.login")
# Wildcard
@bus.subscribe("user.*")
# Semantic (requires sentence-transformers)
@bus.subscribe("user authentication", semantic=True)
# With context filtering
@bus.subscribe("alert", filter="priority > 5")Hierarchical state management across 4 scopes with automatic merging.
# Global context (shared across all)
bus.context.set_global("app_name", "MyApp")
# Session context (per-session)
bus.context.set_session("user_id", "alice", session_id="sess_1")
# User context (per-user)
bus.context.set_user("preferences", {"theme": "dark"}, user_id="alice")
# Event context (per-event)
event = Event(topic="action", data={}, context={"trace_id": "abc"})Time-travel debugging with event persistence, replay, and causality tracking.
Time-travel debugging with event persistence, replay, and causality tracking.

```python
# Store events (automatic with config)
config = NeuroBusConfig(temporal={"enabled": True})
# Query past events
events = await bus.temporal.query_events(
topic="user.*",
from_time=yesterday,
to_time=now
)
# Replay events
await bus.temporal.replay_events(from_time, to_time, speed=5.0)
# Track causality
chain = bus.temporal.causality.get_causal_chain(event_id)
```

Long-term event storage with vector similarity search.

```python
# Enable memory
config = NeuroBusConfig(memory={"enabled": True})
# Search similar events
results = await bus.memory.search("user authentication issues", k=5)
# Get recent memories
recent = bus.memory.get_recent(limit=10)
```

```
┌───────────────────────────────────────────────────────────┐
│                         NeuroBUS                          │
├───────────────────────────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │ Semantic │  │ Context  │  │ Temporal │  │   LLM    │   │
│  │  Router  │  │  Engine  │  │  Store   │  │  Bridge  │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │  Memory  │  │ Metrics  │  │ Cluster  │  │   Core   │   │
│  │  Engine  │  │Collector │  │ Manager  │  │   Bus    │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
└───────────────────────────────────────────────────────────┘
        │             │             │             │
        ▼             ▼             ▼             ▼
   ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
   │ Qdrant  │   │  Redis  │   │ SQLite  │   │   LLM   │
   │ LanceDB │   │ Cluster │   │   WAL   │   │Provider │
   └─────────┘   └─────────┘   └─────────┘   └─────────┘
```
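Each subsystem is toggled independently through `NeuroBusConfig`. Combining the options shown elsewhere in this README (whether all four are typically enabled together is an assumption here):

```python
from neurobus import NeuroBus, NeuroBusConfig

config = NeuroBusConfig(
    semantic={"enabled": True},
    temporal={"enabled": True},
    memory={"enabled": True},
    distributed={"enabled": True, "redis_url": "redis://localhost:6379"},
)
bus = NeuroBus(config=config)
```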
- Latency: <2ms P95 for event dispatch
- Throughput: 10,000+ events/second
- Memory: <100MB base footprint
- Scalability: Horizontal scaling via Redis clustering
- Semantic: <5ms embedding generation (cached)
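To sanity-check throughput on your own hardware, here is a minimal benchmark sketch that uses only the publish/subscribe API from the Quick Start (absolute numbers depend heavily on machine and configuration):

```python
import asyncio
import time

from neurobus import NeuroBus, Event

async def benchmark(n: int = 10_000) -> None:
    bus = NeuroBus()
    done = asyncio.Event()
    received = 0

    @bus.subscribe("bench.tick")
    async def handle(event: Event):
        nonlocal received
        received += 1
        if received == n:
            done.set()

    async with bus:
        start = time.perf_counter()
        for i in range(n):
            await bus.publish(Event(topic="bench.tick", data={"i": i}))
        await done.wait()
        elapsed = time.perf_counter() - start
        print(f"{n / elapsed:,.0f} events/sec, {elapsed / n * 1000:.3f} ms mean dispatch")

asyncio.run(benchmark())
```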
NeuroBUS has comprehensive test coverage:

```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=neurobus --cov-report=html
# Run specific test suite
pytest tests/unit/
pytest tests/integration/
```

Test Statistics:
- 173 tests (100% passing)
- 95% code coverage
- 100% type coverage (mypy strict)
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
Quick Start for Contributors:

```bash
# Clone repository
git clone https://github.com/eshanized/neurobus.git
cd neurobus
# Install in development mode
pip install -e ".[dev,all]"
# Run tests
pytest
# Format code
black neurobus/ tests/
ruff check neurobus/ tests/
# Type check
mypy neurobus/
```

NeuroBUS is released under the MIT License.
- Eshan Roy (@eshanized) - Creator & Lead Developer
- TIVerse Labs - Cognitive Infrastructure Division
Special thanks to:
- The sentence-transformers team for semantic embeddings
- Qdrant and LanceDB teams for vector database support
- OpenAI and Anthropic for LLM capabilities
- The Python async community
- Email: eshanized@proton.me
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Discord: Join our community (coming soon)
Planned features:

- GraphQL API
- Admin Dashboard
- Enhanced monitoring (Grafana dashboards)
- Performance benchmarks suite
- Multi-tenancy support
- Rate limiting per subscription
- Schema evolution
- Additional vector DB adapters (Pinecone, Weaviate)
- Streaming support
- Plugin architecture
- Cloud-native deployment templates
- Enterprise features
If you find NeuroBUS useful, please star the repository on GitHub!
Example use cases:

```python
# Multiple agents communicating semantically
@agent1.subscribe("help needed", semantic=True)
async def assist(event):
    # Responds to "need help", "assistance required", etc.
    pass
```

```python
# Service-to-service with context
bus.context.set_session("request_id", req_id)
await bus.publish(Event("order.created", data=order_data))# Semantic sensor fusion
@bus.subscribe("temperature reading", semantic=True)
async def process_temp(event):
    # Matches various sensor formats
    pass
```

```python
# Time-travel for debugging
from datetime import timedelta
events = await bus.temporal.query_events(
topic="order.*",
from_time=incident_time - 1hour,
to_time=incident_time + 1hour
)Built with ❤️ by TIVerse Labs - Building Cognitive Infrastructure for AI
NeuroBUS: Where Events Meet Intelligence 🧠✨