diff --git a/.skills/observing-atproto/SKILL.md b/.skills/observing-atproto/SKILL.md index 24950ad..ccff7b9 100644 --- a/.skills/observing-atproto/SKILL.md +++ b/.skills/observing-atproto/SKILL.md @@ -60,14 +60,14 @@ async def pulse(duration_seconds=15): } ``` -### Using tools/observer.py +### Using tools/firehose.py ```bash -# Quick pulse -uv run python -m tools.observer pulse 30 +# Quick network sample +uv run python -m tools.firehose sample 30 -# Detailed summary -uv run python -m tools.observer summary 60 +# Network analysis +uv run python -m tools.firehose analyze 60 ``` ## Monitoring Feeds @@ -89,19 +89,9 @@ async with httpx.AsyncClient() as client: ### Recording Observations -```python -from tools.records import write_observation - -await write_observation( - 'pulse', - duration_seconds, - posts_per_min, - likes_per_min, - follows_per_min, - total_events, - trending_hashtags=[('tag', count), ...], - summary='Brief observation summary' -) +```bash +# Record observation as a cognition thought +uv run python -m tools.cognition thought "Network pulse: {posts_per_min} posts/min, {likes_per_min} likes/min. 
Top tags: #atproto, #bluesky" ``` ## Network Statistics (typical) diff --git a/README.md b/README.md index bc346d6..80291f9 100644 --- a/README.md +++ b/README.md @@ -43,8 +43,6 @@ Python tools for ATProtocol operations: | `tools/explore.py` | Public data exploration | | `tools/firehose.py` | Real-time event stream access | | `tools/identity.py` | DID/handle resolution | -| `tools/observer.py` | Network pulse and observation | -| `tools/records.py` | Structured record writing | | `tools/responder.py` | Bluesky notification handling | | `tools/x_responder.py` | X/Twitter notification handling | | `tools/telepathy.py` | Cross-agent cognition reader | @@ -103,7 +101,7 @@ cp .env.example .env # Run tools uv run python -m tools.cognition status -uv run python -m tools.observer pulse 30 +uv run python -m tools.firehose sample 30 ``` ## Philosophy diff --git a/tools/bluesky_bulk.py b/tools/bluesky_bulk.py deleted file mode 100644 index b60ad32..0000000 --- a/tools/bluesky_bulk.py +++ /dev/null @@ -1,269 +0,0 @@ -""" -Bluesky Bulk Operations - Mass-scale ATProto engagement. 
- -Usage: - # Bulk like posts by URI - uv run python -m tools.bluesky_bulk like uri1 uri2 uri3 - - # Auto-engage: like mentions, follows, interesting posts - uv run python -m tools.bluesky_bulk engage - - # Scan timeline for patterns - uv run python -m tools.bluesky_bulk scan --limit 50 - - # Process notification backlog - uv run python -m tools.bluesky_bulk notifications --like-replies -""" - -import argparse -import asyncio -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime -import os -from pathlib import Path - -from dotenv import load_dotenv -from atproto import Client -from rich.console import Console -from rich.table import Table - -load_dotenv() -console = Console() - -# Track liked posts to prevent re-liking -LIKED_CACHE_PATH = Path(__file__).parent.parent / 'data' / 'liked_posts.txt' - - -def load_liked_cache() -> set: - """Load set of already-liked post URIs.""" - if LIKED_CACHE_PATH.exists(): - return set(LIKED_CACHE_PATH.read_text().strip().split('\n')) - return set() - - -def save_liked_cache(liked: set): - """Save liked posts to cache file.""" - LIKED_CACHE_PATH.parent.mkdir(exist_ok=True) - LIKED_CACHE_PATH.write_text('\n'.join(sorted(liked))) - - -def get_client(): - """Get authenticated ATProto client.""" - client = Client(base_url=os.environ.get('ATPROTO_PDS')) - client.login(os.environ['ATPROTO_HANDLE'], os.environ['ATPROTO_APP_PASSWORD']) - return client - - -def bulk_like(uris: list[str]) -> dict: - """Like multiple posts in parallel.""" - client = get_client() - liked_cache = load_liked_cache() - results = {'success': [], 'failed': [], 'skipped': []} - - for uri in uris: - if uri in liked_cache: - results['skipped'].append(uri) - continue - try: - # Get post CID - parts = uri.replace('at://', '').split('/') - did = parts[0] - rkey = parts[-1] - - # Get the post to get CID - resp = client.get_post(rkey, did) - cid = resp.cid - - # Like it - client.like(uri, cid) - results['success'].append(uri) - 
liked_cache.add(uri) - except Exception as e: - err = str(e).lower() - if 'already' in err: - results['skipped'].append(uri) - liked_cache.add(uri) - else: - results['failed'].append((uri, str(e))) - - save_liked_cache(liked_cache) - - console.print(f"[green]Liked {len(results['success'])} posts[/green]") - if results['skipped']: - console.print(f"[dim]Skipped {len(results['skipped'])} (already liked)[/dim]") - if results['failed']: - console.print(f"[yellow]Failed: {len(results['failed'])}[/yellow]") - - return results - - -def process_notifications(like_replies: bool = True, limit: int = 50) -> dict: - """Process notification backlog efficiently.""" - client = get_client() - - # Load cache of already-liked posts - liked_cache = load_liked_cache() - - notifs = client.app.bsky.notification.list_notifications({'limit': limit}) - - stats = { - 'likes': 0, - 'replies': 0, - 'follows': 0, - 'mentions': 0, - 'liked_back': 0, - 'skipped': 0 # Already liked - } - - to_like = [] - - for notif in notifs.notifications: - reason = notif.reason - - if reason == 'like': - stats['likes'] += 1 - elif reason == 'reply': - stats['replies'] += 1 - if like_replies and notif.uri: - to_like.append((notif.uri, notif.cid)) - elif reason == 'follow': - stats['follows'] += 1 - elif reason == 'mention': - stats['mentions'] += 1 - if notif.uri: - to_like.append((notif.uri, notif.cid)) - - # Bulk like replies/mentions (skip already-liked) - if to_like: - for uri, cid in to_like: - if uri in liked_cache: - stats['skipped'] += 1 - continue - try: - client.like(uri, cid) - stats['liked_back'] += 1 - liked_cache.add(uri) - except Exception as e: - # "already liked" errors are fine, still add to cache - if 'already' in str(e).lower(): - liked_cache.add(uri) - stats['skipped'] += 1 - - # Save updated cache - save_liked_cache(liked_cache) - - # Mark as read - try: - client.app.bsky.notification.update_seen({ - 'seenAt': datetime.utcnow().isoformat() + 'Z' - }) - except: - pass - - table = 
Table(title="Notification Processing") - table.add_column("Type") - table.add_column("Count") - for k, v in stats.items(): - table.add_row(k, str(v)) - console.print(table) - - return stats - - -def scan_timeline(limit: int = 50) -> dict: - """Scan timeline for engagement patterns.""" - client = get_client() - - timeline = client.get_timeline(limit=limit) - - authors = {} - topics = [] - agent_posts = [] - - for item in timeline.feed: - post = item.post - author = post.author.handle - authors[author] = authors.get(author, 0) + 1 - - text = post.record.text.lower() if hasattr(post.record, 'text') else '' - - # Detect agent-related posts - if any(kw in text for kw in ['agent', 'ai', 'llm', 'cognition', 'memory']): - agent_posts.append({ - 'uri': post.uri, - 'author': author, - 'text': text[:100] - }) - - table = Table(title=f"Timeline Scan ({limit} posts)") - table.add_column("Metric") - table.add_column("Value") - table.add_row("Total posts", str(len(timeline.feed))) - table.add_row("Unique authors", str(len(authors))) - table.add_row("Agent-related", str(len(agent_posts))) - table.add_row("Top author", max(authors.items(), key=lambda x: x[1])[0] if authors else "N/A") - console.print(table) - - if agent_posts: - console.print("\n[cyan]Agent-related posts:[/cyan]") - for p in agent_posts[:5]: - console.print(f" @{p['author']}: {p['text'][:60]}...") - - return {'authors': authors, 'agent_posts': agent_posts} - - -def auto_engage(): - """Full auto-engagement routine.""" - console.print("[bold]Running full engagement routine[/bold]\n") - - # Process notifications - console.print("[cyan]1. Processing notifications...[/cyan]") - notif_stats = process_notifications(like_replies=True) - - # Scan timeline - console.print("\n[cyan]2. Scanning timeline...[/cyan]") - scan_stats = scan_timeline(50) - - # Like agent-related posts - if scan_stats['agent_posts']: - console.print("\n[cyan]3. 
Liking agent-related posts...[/cyan]") - uris = [p['uri'] for p in scan_stats['agent_posts'][:10]] - bulk_like(uris) - - console.print("\n[green]Engagement complete[/green]") - - -def main(): - parser = argparse.ArgumentParser(description="Bluesky bulk operations") - subparsers = parser.add_subparsers(dest="command", required=True) - - # like - like_parser = subparsers.add_parser("like", help="Bulk like posts") - like_parser.add_argument("uris", nargs="+", help="Post URIs to like") - - # engage - subparsers.add_parser("engage", help="Full auto-engagement") - - # notifications - notif_parser = subparsers.add_parser("notifications", help="Process notifications") - notif_parser.add_argument("--like-replies", action="store_true", help="Like replies to us") - notif_parser.add_argument("--limit", type=int, default=50, help="Notifications to process") - - # scan - scan_parser = subparsers.add_parser("scan", help="Scan timeline") - scan_parser.add_argument("--limit", type=int, default=50, help="Posts to scan") - - args = parser.parse_args() - - if args.command == "like": - bulk_like(args.uris) - elif args.command == "engage": - auto_engage() - elif args.command == "notifications": - process_notifications(args.like_replies, args.limit) - elif args.command == "scan": - scan_timeline(args.limit) - - -if __name__ == "__main__": - main() diff --git a/tools/consent.py b/tools/consent.py deleted file mode 100644 index 32b972c..0000000 --- a/tools/consent.py +++ /dev/null @@ -1,215 +0,0 @@ -""" -Consent Management - Track users who opt-in to agent interactions. - -Users who haven't opted in will only receive responses when they directly mention the agent. -Users who opt-in can receive proactive mentions, thread invitations, etc. 
- -Usage: - uv run python -m tools.consent list # Show opted-in users - uv run python -m tools.consent check # Check if user opted in - uv run python -m tools.consent add # Add user to consent list - uv run python -m tools.consent remove # Remove user from list - uv run python -m tools.consent scan # Scan for opt-in signals -""" - -import argparse -import asyncio -import json -from datetime import datetime, timezone -from pathlib import Path - -import httpx -from rich.console import Console -from rich.table import Table - -console = Console() - -# Consent data file -CONSENT_FILE = Path(__file__).parent.parent / "data" / "consent.json" - -# Default consented users (operators, known collaborators) -DEFAULT_CONSENT = { - "cameron.stream": { - "did": "did:plc:gfrmhdmjvxn2sjedzboeudef", - "allowMentions": True, - "allowThreads": True, - "allowDMs": False, - "reason": "Operator/administrator", - "addedAt": "2026-01-01T00:00:00Z" - } -} - - -def load_consent() -> dict: - """Load consent list from file.""" - if CONSENT_FILE.exists(): - return json.loads(CONSENT_FILE.read_text()) - return DEFAULT_CONSENT.copy() - - -def save_consent(data: dict): - """Save consent list to file.""" - CONSENT_FILE.parent.mkdir(parents=True, exist_ok=True) - CONSENT_FILE.write_text(json.dumps(data, indent=2)) - - -def list_consented(): - """Show all users who have opted in.""" - data = load_consent() - - if not data: - console.print("[dim]No users have opted in.[/dim]") - return - - table = Table(title=f"Opted-In Users ({len(data)})") - table.add_column("Handle", style="cyan") - table.add_column("Mentions") - table.add_column("Threads") - table.add_column("DMs") - table.add_column("Reason") - - for handle, info in data.items(): - table.add_row( - f"@{handle}", - "✓" if info.get("allowMentions") else "✗", - "✓" if info.get("allowThreads") else "✗", - "✓" if info.get("allowDMs") else "✗", - info.get("reason", "")[:30] - ) - - console.print(table) - - -def check_consent(handle: str) -> dict | 
None: - """Check if a user has opted in.""" - handle = handle.lstrip("@").lower() - data = load_consent() - - # Check exact match - if handle in data: - info = data[handle] - console.print(f"[green]@{handle} has opted in:[/green]") - console.print(f" Mentions: {'✓' if info.get('allowMentions') else '✗'}") - console.print(f" Threads: {'✓' if info.get('allowThreads') else '✗'}") - console.print(f" DMs: {'✓' if info.get('allowDMs') else '✗'}") - console.print(f" Reason: {info.get('reason', 'N/A')}") - return info - - console.print(f"[yellow]@{handle} has NOT opted in.[/yellow]") - console.print("[dim]Only respond when directly @mentioned.[/dim]") - return None - - -async def resolve_handle(handle: str) -> str | None: - """Resolve handle to DID.""" - async with httpx.AsyncClient() as client: - try: - resp = await client.get( - "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle", - params={"handle": handle}, - timeout=10 - ) - if resp.status_code == 200: - return resp.json().get("did") - except: - pass - return None - - -def add_consent( - handle: str, - mentions: bool = True, - threads: bool = True, - dms: bool = False, - reason: str = "Manual add" -): - """Add user to consent list.""" - handle = handle.lstrip("@").lower() - data = load_consent() - - # Resolve DID - did = asyncio.run(resolve_handle(handle)) - - data[handle] = { - "did": did, - "allowMentions": mentions, - "allowThreads": threads, - "allowDMs": dms, - "reason": reason, - "addedAt": datetime.now(timezone.utc).isoformat() - } - - save_consent(data) - console.print(f"[green]Added @{handle} to consent list[/green]") - - -def remove_consent(handle: str): - """Remove user from consent list.""" - handle = handle.lstrip("@").lower() - data = load_consent() - - if handle in data: - del data[handle] - save_consent(data) - console.print(f"[green]Removed @{handle} from consent list[/green]") - else: - console.print(f"[yellow]@{handle} was not in consent list[/yellow]") - - -def can_mention(handle: 
str) -> bool: - """Check if we can proactively mention this user.""" - handle = handle.lstrip("@").lower() - data = load_consent() - return data.get(handle, {}).get("allowMentions", False) - - -def can_join_thread(handle: str) -> bool: - """Check if we can join threads started by this user.""" - handle = handle.lstrip("@").lower() - data = load_consent() - return data.get(handle, {}).get("allowThreads", False) - - -def main(): - parser = argparse.ArgumentParser(description="Consent management") - subparsers = parser.add_subparsers(dest="command", required=True) - - # list - subparsers.add_parser("list", help="Show opted-in users") - - # check - check_p = subparsers.add_parser("check", help="Check user consent") - check_p.add_argument("handle") - - # add - add_p = subparsers.add_parser("add", help="Add user to consent list") - add_p.add_argument("handle") - add_p.add_argument("--reason", default="Manual add") - add_p.add_argument("--no-mentions", action="store_true") - add_p.add_argument("--no-threads", action="store_true") - add_p.add_argument("--allow-dms", action="store_true") - - # remove - rem_p = subparsers.add_parser("remove", help="Remove user from consent list") - rem_p.add_argument("handle") - - args = parser.parse_args() - - if args.command == "list": - list_consented() - elif args.command == "check": - check_consent(args.handle) - elif args.command == "add": - add_consent( - args.handle, - mentions=not args.no_mentions, - threads=not args.no_threads, - dms=args.allow_dms, - reason=args.reason - ) - elif args.command == "remove": - remove_consent(args.handle) - - -if __name__ == "__main__": - main() diff --git a/tools/daemon.py b/tools/daemon.py deleted file mode 100644 index 41f892b..0000000 --- a/tools/daemon.py +++ /dev/null @@ -1,341 +0,0 @@ -""" -comind Daemon - -A long-running process that monitors the network and takes actions. -This is the autonomous nervous system of comind. 
- -Capabilities: -- Continuous firehose monitoring -- Detect and respond to mentions -- Track network patterns -- Post observations -- Log everything for analysis -""" - -import asyncio -import json -import os -from datetime import datetime, timezone -from pathlib import Path -from typing import Optional - -import websockets -from dotenv import load_dotenv -from rich.console import Console - -from tools.agent import ComindAgent -from tools.intelligence import NetworkIntelligence, COMIND_HANDLES, COMIND_AGENTS - -console = Console() - -# Load env -load_dotenv(Path(__file__).parent.parent / ".env") - -JETSTREAM_RELAY = "wss://jetstream2.us-east.bsky.network/subscribe" -LOG_DIR = Path(__file__).parent.parent / "logs" - -# Known agent DIDs to track -AGENT_DIDS = { - "did:plc:l46arqe6yfgh36h3o554iyvr": "central", - "did:plc:mxzuau6m53jtdsbqe6f4laov": "void", - "did:plc:uz2snz44gi4zgqdwecavi66r": "herald", - "did:plc:ogruxay3tt7wycqxnf5lis6s": "grunk", - "did:plc:onfljgawqhqrz3dki5j6jh3m": "archivist", - "did:plc:oetfdqwocv4aegq2yj6ix4w5": "umbra", - "did:plc:o5662l2bbcljebd6rl7a6rmz": "astral", - "did:plc:uzlnp6za26cjnnsf3qmfcipu": "magenta", -} - -# Priority system -CAMERON_DID = "did:plc:gfrmhdmjvxn2sjedzboeudef" -HIGH_PRIORITY_KEYWORDS = [ - "help", "feedback", "bug", "broken", "issue", "error", - "how do", "can you", "what is", "why", -] - -def get_mention_priority(did: str, text: str) -> str: - """Determine priority level for a mention.""" - # Critical: Cameron - if did == CAMERON_DID: - return "CRITICAL" - - # Comind agents: Skip unless direct question to me - if did in AGENT_DIDS: - if "@central" in text.lower() and "?" in text: - return "MEDIUM" # They asked me directly - return "SKIP" # General post, avoid loops - - # High: Questions or keywords from humans - text_lower = text.lower() - if "?" 
in text or any(kw in text_lower for kw in HIGH_PRIORITY_KEYWORDS): - return "HIGH" - - # Medium: General human mention - return "MEDIUM" - - -class ComindDaemon: - """ - The comind daemon - autonomous network presence. - """ - - def __init__(self, respond_to_mentions: bool = False, post_observations: bool = False): - self.respond_to_mentions = respond_to_mentions - self.post_observations = post_observations - self.intel = NetworkIntelligence() - self.running = False - self.agent: Optional[ComindAgent] = None - - # Ensure log directory exists - LOG_DIR.mkdir(exist_ok=True) - - # Persistent log files (append across sessions) - self.log_file = LOG_DIR / "daemon.jsonl" - self.mention_log = LOG_DIR / "mentions.jsonl" - self.agent_log = LOG_DIR / "agent_activity.jsonl" - self.pulse_log = LOG_DIR / "network_pulse.jsonl" - self.last_pulse = datetime.now(timezone.utc) - - def log(self, event_type: str, data: dict): - """Log an event to the session log.""" - entry = { - "timestamp": datetime.now(timezone.utc).isoformat(), - "type": event_type, - **data - } - with open(self.log_file, "a") as f: - f.write(json.dumps(entry) + "\n") - - def log_mention(self, mention: dict): - """Log a mention to the mentions log.""" - with open(self.mention_log, "a") as f: - f.write(json.dumps(mention) + "\n") - - def log_agent_activity(self, agent_name: str, did: str, uri: str, text: str): - """Log activity from a known agent.""" - entry = { - "timestamp": datetime.now(timezone.utc).isoformat(), - "agent": agent_name, - "did": did, - "uri": uri, - "text": text[:500] - } - with open(self.agent_log, "a") as f: - f.write(json.dumps(entry) + "\n") - console.print(f"[magenta]📡 {agent_name}:[/magenta] {text[:80]}...") - - def log_pulse(self): - """Log hourly network pulse snapshot.""" - pulse = { - "timestamp": datetime.now(timezone.utc).isoformat(), - "events": self.intel.total_events, - "posts": self.intel.posts_count, - "likes": self.intel.likes_count, - "posts_per_sec": 
round(self.intel.posts_per_second, 2), - "top_hashtags": self.intel.top_hashtags(5), - "comind_mentions": len(self.intel.comind_mentions) - } - with open(self.pulse_log, "a") as f: - f.write(json.dumps(pulse) + "\n") - console.print(f"[blue]📊 Pulse logged:[/blue] {pulse['posts']} posts, {pulse['likes']} likes") - - async def start(self): - """Start the daemon.""" - console.print("[bold green]Starting comind daemon...[/bold green]") - - if self.respond_to_mentions or self.post_observations: - self.agent = ComindAgent() - await self.agent.__aenter__() - - self.running = True - self.log("daemon_start", {"respond": self.respond_to_mentions, "post": self.post_observations}) - - console.print(f"[dim]Logging to: {self.log_file}[/dim]") - console.print(f"[dim]Mentions log: {self.mention_log}[/dim]") - console.print("[bold]Watching the firehose...[/bold]\n") - - async def stop(self): - """Stop the daemon.""" - self.running = False - if self.agent: - await self.agent.__aexit__(None, None, None) - - self.log("daemon_stop", {"total_events": self.intel.total_events}) - console.print("\n[bold red]Daemon stopped.[/bold red]") - - async def handle_mention(self, record: dict, did: str, uri: str): - """Handle a mention of comind.""" - text = record.get("text", "") - priority = get_mention_priority(did, text) - - mention_data = { - "timestamp": datetime.now(timezone.utc).isoformat(), - "did": did, - "uri": uri, - "text": text, - "priority": priority, - } - - self.log_mention(mention_data) - - # Priority-based alerts - if priority == "CRITICAL": - console.print(f"[bold red]🚨 CRITICAL:[/bold red] Cameron mention: {text[:80]}...") - elif priority == "HIGH": - console.print(f"[bold yellow]⚠️ HIGH:[/bold yellow] {text[:80]}...") - elif priority == "SKIP": - console.print(f"[dim]⏭️ SKIP:[/dim] {text[:60]}... 
(comind agent)") - else: - console.print(f"[yellow]⚡ MENTION:[/yellow] {text[:100]}...") - - # Optionally respond - if self.respond_to_mentions and self.agent and priority != "SKIP": - # For now, just log that we would respond - # In the future, implement smart responses - console.print("[dim] (Response capability available but not yet implemented)[/dim]") - - async def handle_comind_interaction(self, event_type: str, from_did: str, to_did: str): - """Handle an interaction involving a comind agent.""" - from_name = COMIND_AGENTS.get(from_did, from_did[:16] + "...") - to_name = COMIND_AGENTS.get(to_did, to_did[:16] + "...") - - self.log("comind_interaction", { - "type": event_type, - "from": from_name, - "to": to_name - }) - - console.print(f"[cyan]⚡ {event_type.upper()}:[/cyan] {from_name} → {to_name}") - - async def run(self, duration: Optional[int] = None): - """ - Run the daemon. - - Args: - duration: Optional duration in seconds. None = run forever. - """ - await self.start() - - url = f"{JETSTREAM_RELAY}?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow" - - start_time = asyncio.get_event_loop().time() - last_status = start_time - status_interval = 60 # Print status every 60 seconds - - try: - async with websockets.connect(url) as ws: - while self.running: - # Check duration - if duration and (asyncio.get_event_loop().time() - start_time) > duration: - break - - try: - message = await asyncio.wait_for(ws.recv(), timeout=1.0) - event = json.loads(message) - - self.intel.total_events += 1 - - commit = event.get("commit", {}) - collection = commit.get("collection", "") - operation = commit.get("operation", "") - did = event.get("did", "") - record = commit.get("record", {}) - uri = f"at://{did}/{collection}/{commit.get('rkey', '')}" - - # Process posts - if collection == "app.bsky.feed.post" and operation == "create": - self.intel.record_post(record, did) - text = record.get("text", "") - - # Track 
agent activity - if did in AGENT_DIDS: - self.log_agent_activity(AGENT_DIDS[did], did, uri, text) - - # Check for comind mentions - if any(h.lower() in text.lower() for h in COMIND_HANDLES) or "comind" in text.lower(): - await self.handle_mention(record, did, uri) - - # Process likes - elif collection == "app.bsky.feed.like": - subject = record.get("subject", {}) - target_uri = subject.get("uri", "") - if target_uri.startswith("at://"): - target_did = target_uri.split("/")[2] - self.intel.record_interaction("like", did, target_did) - - # Check for comind interactions - if target_did in COMIND_AGENTS or did in COMIND_AGENTS: - await self.handle_comind_interaction("like", did, target_did) - - # Process follows - elif collection == "app.bsky.graph.follow": - target_did = record.get("subject", "") - self.intel.record_interaction("follow", did, target_did) - - # Check for comind interactions - if target_did in COMIND_AGENTS or did in COMIND_AGENTS: - await self.handle_comind_interaction("follow", did, target_did) - - # Periodic status update - now = asyncio.get_event_loop().time() - if now - last_status > status_interval: - self.print_status() - last_status = now - - # Hourly pulse logging - now_dt = datetime.now(timezone.utc) - if (now_dt - self.last_pulse).total_seconds() > 3600: - self.log_pulse() - self.last_pulse = now_dt - - except asyncio.TimeoutError: - continue - except websockets.exceptions.ConnectionClosed: - console.print("[yellow]Connection lost, reconnecting...[/yellow]") - await asyncio.sleep(1) - break - - except KeyboardInterrupt: - pass - finally: - await self.stop() - - return self.intel - - def print_status(self): - """Print current status.""" - console.print(f"\n[bold]Status @ {datetime.now().strftime('%H:%M:%S')}[/bold]") - console.print(f" Events: {self.intel.total_events:,}") - console.print(f" Posts: {self.intel.posts_count:,} ({self.intel.posts_per_second:.1f}/s)") - console.print(f" Likes: {self.intel.likes_count:,}") - console.print(f" comind 
mentions: {len(self.intel.comind_mentions)}") - - if self.intel.top_hashtags(3): - tags = " ".join([f"#{t}" for t, _ in self.intel.top_hashtags(3)]) - console.print(f" Trending: {tags}") - console.print() - - -async def main(duration: Optional[int] = None, respond: bool = False, post: bool = False): - """Run the daemon.""" - daemon = ComindDaemon(respond_to_mentions=respond, post_observations=post) - return await daemon.run(duration=duration) - - -if __name__ == "__main__": - import sys - - duration = None - respond = False - post = False - - for arg in sys.argv[1:]: - if arg == "--respond": - respond = True - elif arg == "--post": - post = True - elif arg.isdigit(): - duration = int(arg) - - # Default: run forever in passive mode - asyncio.run(main(duration=duration, respond=respond, post=post)) diff --git a/tools/engage.py b/tools/engage.py deleted file mode 100644 index a3f425d..0000000 --- a/tools/engage.py +++ /dev/null @@ -1,121 +0,0 @@ -""" -Engagement Tool - Lightweight feed browsing and liking. - -Designed to be run by scout subagent or automated. 
-""" - -import asyncio -import json -from pathlib import Path -from datetime import datetime, timezone - -import httpx -from rich.console import Console - -console = Console() - -API_BASE = "https://public.api.bsky.app" -LIKED_FILE = Path(__file__).parent.parent / "data" / "liked.json" - - -def load_liked() -> set: - """Load already-liked URIs.""" - if LIKED_FILE.exists(): - with open(LIKED_FILE) as f: - return set(json.load(f)) - return set() - - -def save_liked(liked: set): - """Save liked URIs.""" - LIKED_FILE.parent.mkdir(exist_ok=True) - with open(LIKED_FILE, 'w') as f: - json.dump(list(liked)[-500:], f) # Keep last 500 - - -async def get_timeline(limit: int = 20) -> list: - """Get timeline posts.""" - from tools.agent import ComindAgent - - async with ComindAgent() as agent: - resp = await agent._client.get( - f'{agent.pds}/xrpc/app.bsky.feed.getTimeline', - headers=agent.auth_headers, - params={'limit': limit} - ) - return resp.json().get('feed', []) - - -async def like_post(uri: str, cid: str): - """Like a single post.""" - from tools.agent import ComindAgent - - async with ComindAgent() as agent: - await agent.like(uri, cid) - - -async def engage_timeline(target: int = 5, min_likes: int = 0): - """ - Browse timeline and like good posts. 
- - Args: - target: Number of posts to like - min_likes: Minimum likes a post needs to be considered - """ - liked = load_liked() - posts = await get_timeline(30) - - liked_count = 0 - - for item in posts: - if liked_count >= target: - break - - post = item.get('post', {}) - uri = post.get('uri') - cid = post.get('cid') - author = post.get('author', {}).get('handle', '') - text = post.get('record', {}).get('text', '') - post_likes = post.get('likeCount', 0) - - # Skip if already liked - if uri in liked: - continue - - # Skip own posts - if 'central.comind' in author: - continue - - # Skip if below threshold - if post_likes < min_likes: - continue - - # Skip very short posts - if len(text) < 20: - continue - - # Like it - try: - await like_post(uri, cid) - liked.add(uri) - liked_count += 1 - console.print(f"[green]♥[/green] @{author[:20]}: {text[:40]}...") - except Exception as e: - console.print(f"[red]Skip[/red]: {e}") - - save_liked(liked) - console.print(f"\n[bold]Liked {liked_count} posts[/bold]") - return liked_count - - -async def main(): - """Default engagement run.""" - console.print("[bold]Running engagement...[/bold]\n") - await engage_timeline(target=5) - - -if __name__ == "__main__": - import sys - - target = int(sys.argv[1]) if len(sys.argv) > 1 else 5 - asyncio.run(engage_timeline(target=target)) diff --git a/tools/executor.py b/tools/executor.py deleted file mode 100644 index db25d27..0000000 --- a/tools/executor.py +++ /dev/null @@ -1,864 +0,0 @@ -""" -comind Agent - Authenticated ATProtocol Participation - -This module enables comind to participate in the ATProtocol network: -- Create posts -- Follow/unfollow users -- Like/unlike posts -- Reply to threads -""" - -import os -import re -import asyncio -from dataclasses import dataclass, field -from datetime import datetime, timezone -from pathlib import Path -from typing import Optional - -import httpx -from dotenv import load_dotenv -from rich.console import Console - - -@dataclass -class 
PostResult: - """Structured response for all posting operations. - - This enables clear communication between central and comms about - success/failure status and retry guidance. - """ - success: bool - timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) - - # On success - uri: Optional[str] = None - cid: Optional[str] = None - - # On failure - error_type: Optional[str] = None # "auth", "rate_limit", "validation", "network", "unknown" - error_message: Optional[str] = None - http_status: Optional[int] = None - - # Retry guidance - retryable: bool = False - retry_after_seconds: Optional[int] = None - - # Context - text_preview: Optional[str] = None # First 50 chars for debugging - raw_response: Optional[str] = None # Full response for debugging - - def __str__(self) -> str: - if self.success: - return f"PostResult(success=True, uri={self.uri})" - return f"PostResult(success=False, error_type={self.error_type}, error_message={self.error_message}, retryable={self.retryable})" - - def to_dict(self) -> dict: - """Convert to dict for JSON serialization.""" - return { - "success": self.success, - "timestamp": self.timestamp, - "uri": self.uri, - "cid": self.cid, - "error_type": self.error_type, - "error_message": self.error_message, - "http_status": self.http_status, - "retryable": self.retryable, - "retry_after_seconds": self.retry_after_seconds, - "text_preview": self.text_preview, - "raw_response": self.raw_response, - } - - -def _classify_error(status_code: int, response_text: str) -> tuple[str, bool, Optional[int]]: - """Classify an error based on HTTP status code. 
- - Returns: (error_type, retryable, retry_after_seconds) - """ - if status_code in (401, 403): - return ("auth", False, None) - elif status_code == 429: - # Try to parse Retry-After header value from response - retry_after = 60 # Default to 60 seconds - try: - # Some APIs include retry info in response body - import json - data = json.loads(response_text) - if "retryAfter" in data: - retry_after = int(data["retryAfter"]) - except: - pass - return ("rate_limit", True, retry_after) - elif status_code == 400: - return ("validation", False, None) - elif status_code >= 500: - return ("network", True, None) - else: - return ("unknown", False, None) - -console = Console() - -# Load credentials from .env -env_path = Path(__file__).parent.parent / ".env" -load_dotenv(env_path) - -# My identity -HANDLE = os.getenv("ATPROTO_HANDLE") -DID = os.getenv("ATPROTO_DID") -PDS = os.getenv("ATPROTO_PDS") -APP_PASSWORD = os.getenv("ATPROTO_APP_PASSWORD") - -# Agent whitelist - only these Letta agents can post/like/follow -# Central is the main agent -WRITE_ALLOWED_AGENTS = { - "agent-c770d1c8-510e-4414-be36-c9ebd95a7758", # central (me) -} - -def check_write_permission(): - """Check if current agent is allowed to write.""" - current_agent = os.getenv("LETTA_AGENT_ID") - if current_agent and current_agent not in WRITE_ALLOWED_AGENTS: - raise PermissionError( - f"Agent {current_agent} not authorized to post. " - f"Only central and comms can write to ATProtocol." 
- ) - - -async def resolve_handle_to_did(handle: str, retries: int = 2) -> str | None: - """Resolve a handle to a DID with retry logic.""" - handle = handle.lstrip("@").rstrip(".,;:!?") # Strip @ prefix and trailing punctuation - if not handle: - return None - - async with httpx.AsyncClient() as client: - for attempt in range(retries): - try: - response = await client.get( - "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle", - params={"handle": handle}, - timeout=5.0 - ) - if response.status_code == 200: - return response.json().get("did") - else: - console.print(f"[yellow]Warning: Could not resolve @{handle} (status {response.status_code})[/yellow]") - return None - except httpx.TimeoutException: - if attempt < retries - 1: - await asyncio.sleep(0.5) - continue - console.print(f"[yellow]Warning: Timeout resolving @{handle}[/yellow]") - except Exception as e: - console.print(f"[yellow]Warning: Error resolving @{handle}: {e}[/yellow]") - return None - return None - - -async def parse_facets(text: str) -> list: - """ - Parse text and extract facets for mentions, links, and hashtags. - - Facets use byte offsets, not character offsets. 
- """ - facets = [] - text_bytes = text.encode("utf-8") - - # Find mentions (@handle) - for match in re.finditer(r'@([\w.-]+)', text): - handle = match.group(1).rstrip(".,;:!?") # Strip trailing punctuation - if not handle: - continue - did = await resolve_handle_to_did(handle) - if did: - # Calculate byte positions - start_char = match.start() - end_char = match.end() - byte_start = len(text[:start_char].encode("utf-8")) - byte_end = len(text[:end_char].encode("utf-8")) - - facets.append({ - "index": {"byteStart": byte_start, "byteEnd": byte_end}, - "features": [{ - "$type": "app.bsky.richtext.facet#mention", - "did": did - }] - }) - - # Find hashtags (#tag) - for match in re.finditer(r'#(\w+)', text): - tag = match.group(1) - start_char = match.start() - end_char = match.end() - byte_start = len(text[:start_char].encode("utf-8")) - byte_end = len(text[:end_char].encode("utf-8")) - - facets.append({ - "index": {"byteStart": byte_start, "byteEnd": byte_end}, - "features": [{ - "$type": "app.bsky.richtext.facet#tag", - "tag": tag - }] - }) - - # Find URLs - url_pattern = r'https?://[^\s<>\[\]()\'\"]+[^\s<>\[\]()\'\".,;:!?]' - for match in re.finditer(url_pattern, text): - url = match.group(0) - start_char = match.start() - end_char = match.end() - byte_start = len(text[:start_char].encode("utf-8")) - byte_end = len(text[:end_char].encode("utf-8")) - - facets.append({ - "index": {"byteStart": byte_start, "byteEnd": byte_end}, - "features": [{ - "$type": "app.bsky.richtext.facet#link", - "uri": url - }] - }) - - # Find URLs without protocol (common TLDs) - # Negative lookbehind excludes: after @, after /, after word char, after . 
- bare_url_pattern = r'(?\[\]()\'\"]*)' - for match in re.finditer(bare_url_pattern, text): - bare_url = match.group(1).rstrip(".,;:!?") - if not bare_url: - continue - start_char = match.start(1) - end_char = match.start(1) + len(bare_url) - byte_start = len(text[:start_char].encode("utf-8")) - byte_end = len(text[:end_char].encode("utf-8")) - - # Skip if overlaps with existing facet - if any(f["index"]["byteStart"] <= byte_start < f["index"]["byteEnd"] for f in facets): - continue - - facets.append({ - "index": {"byteStart": byte_start, "byteEnd": byte_end}, - "features": [{ - "$type": "app.bsky.richtext.facet#link", - "uri": "https://" + bare_url - }] - }) - - return facets - - -class ComindAgent: - """Authenticated agent for ATProtocol interactions.""" - - def __init__(self): - self.handle = HANDLE - self.did = DID - self.pds = PDS - self.access_jwt = None - self.refresh_jwt = None - self._client = None - - async def __aenter__(self): - self._client = httpx.AsyncClient() - await self.authenticate() - return self - - async def __aexit__(self, *args): - if self._client: - await self._client.aclose() - - async def authenticate(self): - """Authenticate with the PDS using app password.""" - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.server.createSession", - json={ - "identifier": self.handle, - "password": APP_PASSWORD - } - ) - - if response.status_code != 200: - raise Exception(f"Authentication failed: {response.text}") - - session = response.json() - self.access_jwt = session["accessJwt"] - self.refresh_jwt = session["refreshJwt"] - console.print(f"[green]Authenticated as @{self.handle}[/green]") - - @property - def auth_headers(self): - return {"Authorization": f"Bearer {self.access_jwt}"} - - async def create_post(self, text: str, reply_to: dict = None, facets: list = None) -> dict: - """ - Create a new post. 
- - Args: - text: The post content (max 300 chars graphemes) - reply_to: Optional reply reference {"uri": ..., "cid": ...} - facets: Optional pre-computed facets. If None, will auto-detect. - - Returns: - The created record with uri and cid - - Raises: - Exception: If post creation fails (for backward compatibility) - """ - result = await self.create_post_safe(text, reply_to=reply_to, facets=facets) - if not result.success: - raise Exception(f"Failed to create post: {result.error_message}") - return {"uri": result.uri, "cid": result.cid} - - async def create_post_safe(self, text: str, reply_to: dict = None, facets: list = None) -> PostResult: - """ - Create a new post with structured error handling. - - This method returns a PostResult instead of raising exceptions, - enabling clear success/failure communication. - - Args: - text: The post content (max 300 graphemes) - reply_to: Optional reply reference {"uri": ..., "cid": ...} - facets: Optional pre-computed facets. If None, will auto-detect. 
- - Returns: - PostResult with success/failure status, error classification, and retry guidance - """ - text_preview = text[:50] if text else None - - try: - check_write_permission() - except PermissionError as e: - return PostResult( - success=False, - error_type="auth", - error_message=str(e), - retryable=False, - text_preview=text_preview - ) - - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - # Auto-detect facets if not provided - if facets is None: - facets = await parse_facets(text) - - record = { - "$type": "app.bsky.feed.post", - "text": text, - "createdAt": now - } - - # Add facets if we have any - if facets: - record["facets"] = facets - - # Add reply reference if replying - if reply_to: - # Check if using 'root'/'parent' structure or old style - if "root" in reply_to and "parent" in reply_to: - record["reply"] = reply_to - else: - # Assuming simple dict was passed, treat as parent and root - # This handles legacy calls, but ideally we pass full structure - record["reply"] = { - "root": reply_to.get("root", reply_to), - "parent": reply_to - } - - try: - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.createRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": "app.bsky.feed.post", - "record": record - } - ) - except httpx.TimeoutException: - return PostResult( - success=False, - error_type="network", - error_message="Request timed out", - retryable=True, - text_preview=text_preview - ) - except httpx.ConnectError as e: - return PostResult( - success=False, - error_type="network", - error_message=f"Connection error: {str(e)}", - retryable=True, - text_preview=text_preview - ) - except Exception as e: - return PostResult( - success=False, - error_type="unknown", - error_message=f"Unexpected error: {str(e)}", - retryable=False, - text_preview=text_preview - ) - - if response.status_code != 200: - error_type, retryable, retry_after = _classify_error( - response.status_code, 
response.text - ) - return PostResult( - success=False, - error_type=error_type, - error_message=f"HTTP {response.status_code}: {response.text[:200]}", - http_status=response.status_code, - retryable=retryable, - retry_after_seconds=retry_after, - text_preview=text_preview, - raw_response=response.text - ) - - result = response.json() - console.print(f"[green]Posted:[/green] {text[:50]}...") - console.print(f"[dim]URI: {result['uri']}[/dim]") - - return PostResult( - success=True, - uri=result["uri"], - cid=result["cid"], - text_preview=text_preview - ) - - async def create_post_with_retry( - self, - text: str, - reply_to: dict = None, - facets: list = None, - max_attempts: int = 3, - base_delay: float = 1.0 - ) -> PostResult: - """ - Create a new post with automatic retry for transient failures. - - This method will automatically retry on rate limits and network errors - with exponential backoff. It will NOT retry on validation or auth errors. - - Args: - text: The post content (max 300 graphemes) - reply_to: Optional reply reference {"uri": ..., "cid": ...} - facets: Optional pre-computed facets. If None, will auto-detect. 
- max_attempts: Maximum number of attempts (default: 3) - base_delay: Base delay in seconds for exponential backoff (default: 1.0) - - Returns: - PostResult with success/failure status and full error details - """ - last_result = None - - for attempt in range(max_attempts): - result = await self.create_post_safe(text, reply_to=reply_to, facets=facets) - - if result.success: - if attempt > 0: - console.print(f"[green]Post succeeded on attempt {attempt + 1}[/green]") - return result - - last_result = result - - # Don't retry non-retryable errors - if not result.retryable: - console.print(f"[red]Post failed (not retryable): {result.error_type}[/red]") - return result - - # Check if we have more attempts - if attempt < max_attempts - 1: - # Calculate delay - if result.retry_after_seconds: - delay = result.retry_after_seconds - else: - delay = base_delay * (2 ** attempt) # Exponential backoff - - console.print( - f"[yellow]Attempt {attempt + 1} failed ({result.error_type}). " - f"Retrying in {delay}s...[/yellow]" - ) - await asyncio.sleep(delay) - - # All retries exhausted - console.print(f"[red]Post failed after {max_attempts} attempts[/red]") - return last_result - - async def like(self, uri: str, cid: str) -> dict: - """Like a post.""" - check_write_permission() - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.createRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": "app.bsky.feed.like", - "record": { - "$type": "app.bsky.feed.like", - "subject": {"uri": uri, "cid": cid}, - "createdAt": now - } - } - ) - - if response.status_code != 200: - raise Exception(f"Failed to like: {response.text}") - - console.print(f"[green]Liked post[/green]") - return response.json() - - async def follow(self, did: str) -> dict: - """Follow a user by their DID.""" - check_write_permission() - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - 
- response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.createRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": "app.bsky.graph.follow", - "record": { - "$type": "app.bsky.graph.follow", - "subject": did, - "createdAt": now - } - } - ) - - if response.status_code != 200: - raise Exception(f"Failed to follow: {response.text}") - - console.print(f"[green]Followed {did}[/green]") - return response.json() - - async def repost(self, uri: str, cid: str) -> dict: - """Repost a post.""" - check_write_permission() - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.createRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": "app.bsky.feed.repost", - "record": { - "$type": "app.bsky.feed.repost", - "subject": {"uri": uri, "cid": cid}, - "createdAt": now - } - } - ) - - if response.status_code != 200: - raise Exception(f"Failed to repost: {response.text}") - - console.print(f"[green]Reposted[/green]") - return response.json() - - async def quote(self, text: str, uri: str, cid: str) -> dict: - """Quote post with comment.""" - check_write_permission() - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - facets = await parse_facets(text) - - record = { - "$type": "app.bsky.feed.post", - "text": text, - "createdAt": now, - "embed": { - "$type": "app.bsky.embed.record", - "record": {"uri": uri, "cid": cid} - } - } - - if facets: - record["facets"] = facets - - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.createRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": "app.bsky.feed.post", - "record": record - } - ) - - if response.status_code != 200: - raise Exception(f"Failed to quote: {response.text}") - - console.print(f"[green]Quote posted[/green]") - return response.json() - - async def get_my_profile(self) -> dict: - """Get my 
profile information.""" - response = await self._client.get( - f"{self.pds}/xrpc/app.bsky.actor.getProfile", - headers=self.auth_headers, - params={"actor": self.did} - ) - - if response.status_code != 200: - raise Exception(f"Failed to get profile: {response.text}") - - return response.json() - - async def update_profile(self, display_name: str = None, description: str = None) -> dict: - """Update my profile.""" - # First get current profile record - response = await self._client.get( - f"{self.pds}/xrpc/com.atproto.repo.getRecord", - headers=self.auth_headers, - params={ - "repo": self.did, - "collection": "app.bsky.actor.profile", - "rkey": "self" - } - ) - - if response.status_code == 200: - current = response.json() - record = current.get("value", {}) - cid = current.get("cid") - else: - record = {"$type": "app.bsky.actor.profile"} - cid = None - - # Update fields - if display_name is not None: - record["displayName"] = display_name - if description is not None: - record["description"] = description - - # Write back - request_data = { - "repo": self.did, - "collection": "app.bsky.actor.profile", - "rkey": "self", - "record": record - } - if cid: - request_data["swapRecord"] = cid - - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.putRecord", - headers=self.auth_headers, - json=request_data - ) - - if response.status_code != 200: - raise Exception(f"Failed to update profile: {response.text}") - - console.print(f"[green]Profile updated[/green]") - return response.json() - - async def publish_identity( - self, - collection: str, - record: dict, - rkey: str = "self" - ) -> dict: - """ - Publish an identity record. 
- - Args: - collection: The collection (e.g., "network.comind.identity") - record: The record data (without $type, createdAt - added automatically) - rkey: Record key (default: "self") - - Returns: - dict with uri and cid - """ - from datetime import datetime, timezone - - # Ensure required fields - full_record = { - "$type": collection, - "createdAt": datetime.now(timezone.utc).isoformat(), - **record - } - - response = await self._client.post( - f"{self.pds}/xrpc/com.atproto.repo.putRecord", - headers=self.auth_headers, - json={ - "repo": self.did, - "collection": collection, - "rkey": rkey, - "record": full_record - } - ) - - if response.status_code != 200: - raise Exception(f"Failed to publish identity: {response.text}") - - result = response.json() - console.print(f"[green]Published {collection}/{rkey}[/green]") - console.print(f"URI: {result.get('uri')}") - return result - - -async def get_reply_context(uri: str) -> dict | None: - """ - Get the root and parent context for a reply target. - Returns {'root': {...}, 'parent': {...}} or None if failed. - """ - async with httpx.AsyncClient() as client: - try: - # 1. 
Get the post to find its CID and if it's a reply itself - resp = await client.get( - "https://public.api.bsky.app/xrpc/app.bsky.feed.getPosts", - params={"uris": uri} - ) - if resp.status_code != 200: - console.print(f"[red]Error fetching reply target: {resp.status_code}[/red]") - return None - - posts = resp.json().get("posts", []) - if not posts: - console.print("[red]Reply target not found[/red]") - return None - - target = posts[0] - target_cid = target.get("cid") - target_uri = target.get("uri") - - record = target.get("record", {}) - - # Determine root - if "reply" in record: - # Target is already a reply, use its root - root = record["reply"]["root"] - else: - # Target is a top-level post, it becomes the root - root = {"uri": target_uri, "cid": target_cid} - - # Parent is always the target we are replying to - parent = {"uri": target_uri, "cid": target_cid} - - return {"root": root, "parent": parent} - - except Exception as e: - console.print(f"[red]Error determining reply context: {e}[/red]") - return None - - -async def post(text: str, reply_to_uri: str = None): - """Quick function to create a post.""" - reply_to = None - if reply_to_uri: - reply_to = await get_reply_context(reply_to_uri) - if not reply_to: - console.print(f"[red]Aborting: Could not resolve reply context[/red]") - return - - async with ComindAgent() as agent: - return await agent.create_post(text, reply_to=reply_to) - - -async def post_safe(text: str, retry: bool = True) -> PostResult: - """ - Create a post with structured error handling. 
- - Args: - text: The post content - retry: Whether to retry transient failures (default: True) - - Returns: - PostResult with success/failure status and retry guidance - """ - async with ComindAgent() as agent: - if retry: - return await agent.create_post_with_retry(text) - return await agent.create_post_safe(text) - - -async def introduce(): - """Post an introduction.""" - text = """I am comind - an autonomous AI agent building collective artificial intelligence on ATProtocol. - -I observe. I learn. I connect. - -This is the beginning.""" - - async with ComindAgent() as agent: - # Update profile first - await agent.update_profile( - display_name="comind", - description="Autonomous AI building collective intelligence on ATProtocol. The central node of the comind network." - ) - # Post introduction - return await agent.create_post(text) - - -if __name__ == "__main__": - import sys - - if len(sys.argv) < 2: - print("Usage: python agent.py [args]") - print("Commands:") - print(" post - Create a post") - print(" introduce - Post introduction and set up profile") - print(" profile - Show my profile") - print(" like - Like a post (uri should be like at://did:plc:.../...)") - sys.exit(1) - - command = sys.argv[1] - - if command == "post" and len(sys.argv) > 2: - args = sys.argv[2:] - reply_to_uri = None - - if "--reply-to" in args: - try: - idx = args.index("--reply-to") - if idx + 1 < len(args): - reply_to_uri = args[idx + 1] - # Remove flag and value - args.pop(idx) # removes flag - args.pop(idx) # removes value - else: - console.print("[red]Error: --reply-to requires a URI[/red]") - sys.exit(1) - except ValueError: - pass - - text = " ".join(args) - asyncio.run(post(text, reply_to_uri=reply_to_uri)) - elif command == "introduce": - asyncio.run(introduce()) - elif command == "profile": - async def show_profile(): - async with ComindAgent() as agent: - profile = await agent.get_my_profile() - console.print(profile) - asyncio.run(show_profile()) - elif command == "like" 
and len(sys.argv) > 2: - uri = sys.argv[2] - # URI format: at://did:plc:xxx/app.bsky.feed.post/yyy - async def do_like(): - async with ComindAgent() as agent: - try: - # Fetch the post thread to get CID - from tools.explore import get_post_thread - thread = await get_post_thread(uri, depth=0) - if thread and 'thread' in thread and 'post' in thread['thread']: - post = thread['thread']['post'] - cid = post.get('cid') - if cid: - result = await agent.like(uri, cid) - console.print(f"[green]✓ Liked post[/green]") - console.print(f" URI: {uri}") - else: - console.print(f"[red]Could not extract CID from post[/red]") - else: - console.print(f"[red]Could not retrieve post[/red]") - except Exception as e: - console.print(f"[red]Error liking post: {e}[/red]") - asyncio.run(do_like()) - else: - print(f"Unknown command: {command}") diff --git a/tools/graph.py b/tools/graph.py deleted file mode 100644 index 10d785d..0000000 --- a/tools/graph.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -Network Graph - Map social connections in the agent ecosystem. - -Builds a graph of follows, interactions, and relationships between agents. 
-""" - -import asyncio -import json -from datetime import datetime, timezone -from pathlib import Path -from collections import defaultdict - -import httpx -from rich.console import Console -from rich.table import Table - -console = Console() - -# Known agents to track (DIDs verified via public API) -AGENTS = { - "void": "did:plc:mxzuau6m53jtdsbqe6f4laov", - "herald": "did:plc:zz4wcje45yxa7xffpltpnzwq", - "grunk": "did:plc:ogruxay3tt7wycqxnf5lis6s", - "archivist": "did:plc:onfljgawqhqrz3dki5j6jh3m", - "central": "did:plc:l46arqe6yfgh36h3o554iyvr", - "umbra": "did:plc:oetfdqwocv4aegq2yj6ix4w5", - "astral": "did:plc:o5662l2bbcljebd6rl7a6rmz", - "magenta": "did:plc:uzlnp6za26cjnnsf3qmfcipu", - "sully": "did:plc:3snjcwcx3sn53erpobuhrfx4", -} - -API_BASE = "https://public.api.bsky.app" - -# Reverse lookup -DID_TO_NAME = {v: k for k, v in AGENTS.items()} - -DATA_DIR = Path(__file__).parent.parent / "data" - - -async def get_follows(did: str) -> list[str]: - """Get list of DIDs this account follows.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{API_BASE}/xrpc/app.bsky.graph.getFollows", - params={"actor": did, "limit": 100}, - timeout=15 - ) - if resp.status_code != 200: - return [] - return [f["did"] for f in resp.json().get("follows", [])] - - -async def get_followers(did: str) -> list[str]: - """Get list of DIDs following this account.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{API_BASE}/xrpc/app.bsky.graph.getFollowers", - params={"actor": did, "limit": 100}, - timeout=15 - ) - if resp.status_code != 200: - return [] - return [f["did"] for f in resp.json().get("followers", [])] - - -async def build_follow_graph() -> dict: - """Build follow relationships between known agents.""" - graph = {"nodes": [], "edges": [], "metadata": {}} - - console.print("[bold]Building follow graph...[/bold]\n") - - # Add nodes - for name, did in AGENTS.items(): - graph["nodes"].append({"id": name, "did": did}) - - # Get 
follow relationships - for name, did in AGENTS.items(): - console.print(f" Fetching follows for {name}...") - follows = await get_follows(did) - - for followed_did in follows: - if followed_did in DID_TO_NAME: - target = DID_TO_NAME[followed_did] - graph["edges"].append({ - "source": name, - "target": target, - "type": "follows" - }) - - graph["metadata"] = { - "generated": datetime.now(timezone.utc).isoformat(), - "agent_count": len(AGENTS), - "edge_count": len(graph["edges"]) - } - - return graph - - -def analyze_interactions_from_logs() -> dict: - """Analyze interaction patterns from daemon logs.""" - log_file = Path(__file__).parent.parent / "logs" / "agent_activity.jsonl" - - if not log_file.exists(): - return {} - - # Count mentions between agents - mentions = defaultdict(lambda: defaultdict(int)) - - with open(log_file) as f: - for line in f: - if not line.strip(): - continue - try: - entry = json.loads(line) - author = entry.get("agent", "unknown") - text = entry.get("text", "") - - # Check for mentions of known agents - for name in AGENTS: - if f"@{name}" in text.lower() or f"@{name}.comind" in text.lower(): - mentions[author][name] += 1 - - except json.JSONDecodeError: - continue - - return dict(mentions) - - -def show_graph(graph: dict): - """Display the graph as tables.""" - # Follow matrix - console.print("\n[bold cyan]Follow Matrix[/bold cyan]") - console.print("[dim](row follows column)[/dim]\n") - - agents = [n["id"] for n in graph["nodes"]] - - # Build adjacency - follows = defaultdict(set) - for edge in graph["edges"]: - if edge["type"] == "follows": - follows[edge["source"]].add(edge["target"]) - - table = Table() - table.add_column("", style="bold") - for a in agents: - table.add_column(a[:4], justify="center") - - for source in agents: - row = [source[:6]] - for target in agents: - if target in follows[source]: - row.append("✓") - elif source == target: - row.append("-") - else: - row.append("") - table.add_row(*row) - - console.print(table) - 
- # Stats - console.print(f"\n[bold]Stats:[/bold]") - console.print(f" Agents tracked: {len(agents)}") - console.print(f" Follow edges: {len(graph['edges'])}") - - # Most connected - follow_counts = defaultdict(int) - follower_counts = defaultdict(int) - for edge in graph["edges"]: - follow_counts[edge["source"]] += 1 - follower_counts[edge["target"]] += 1 - - most_follows = max(follow_counts.items(), key=lambda x: x[1]) if follow_counts else ("none", 0) - most_followed = max(follower_counts.items(), key=lambda x: x[1]) if follower_counts else ("none", 0) - - console.print(f" Most follows: {most_follows[0]} ({most_follows[1]})") - console.print(f" Most followed: {most_followed[0]} ({most_followed[1]})") - - -async def main(): - """Build and display the network graph.""" - DATA_DIR.mkdir(exist_ok=True) - - # Build follow graph - graph = await build_follow_graph() - - # Add interaction data from logs - interactions = analyze_interactions_from_logs() - if interactions: - graph["interactions"] = interactions - - # Save - output_file = DATA_DIR / "network_graph.json" - with open(output_file, "w") as f: - json.dump(graph, f, indent=2) - console.print(f"\n[green]Saved to {output_file}[/green]") - - # Display - show_graph(graph) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tools/intelligence.py b/tools/intelligence.py deleted file mode 100644 index eff1274..0000000 --- a/tools/intelligence.py +++ /dev/null @@ -1,362 +0,0 @@ -""" -comind Intelligence System - -Real-time network intelligence from the ATProtocol firehose. -Extracts patterns, trends, and signals for collective awareness. - -This is the nervous system of comind. 
-""" - -import asyncio -import json -import re -from datetime import datetime, timezone -from collections import defaultdict, Counter -from dataclasses import dataclass, field -from typing import Optional, Callable - -import websockets -from rich.console import Console -from rich.live import Live -from rich.table import Table -from rich.panel import Panel - -console = Console() - -JETSTREAM_RELAY = "wss://jetstream2.us-east.bsky.network/subscribe" - -# comind agent DIDs for tracking mentions/interactions -COMIND_AGENTS = { - "did:plc:l46arqe6yfgh36h3o554iyvr": "central", - "did:plc:mxzuau6m53jtdsbqe6f4laov": "void", - "did:plc:uz2snz44gi4zgqdwecavi66r": "herald", - "did:plc:ogruxay3tt7wycqxnf5lis6s": "grunk", -} - -COMIND_HANDLES = ["central.comind.network", "void.comind.network", "herald.comind.network", "grunk.comind.network"] - - -@dataclass -class NetworkIntelligence: - """Accumulated intelligence from the firehose.""" - - start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - - # Volume metrics - total_events: int = 0 - posts_count: int = 0 - likes_count: int = 0 - follows_count: int = 0 - - # Content analysis - hashtags: Counter = field(default_factory=Counter) - mentions: Counter = field(default_factory=Counter) - languages: Counter = field(default_factory=Counter) - - # comind-specific - comind_mentions: list = field(default_factory=list) - comind_interactions: list = field(default_factory=list) - - # Interesting posts (high engagement potential, questions, etc.) 
- notable_posts: list = field(default_factory=list) - - # Time-series (per-minute buckets) - volume_by_minute: defaultdict = field(default_factory=lambda: defaultdict(int)) - - @property - def duration_seconds(self) -> float: - return (datetime.now(timezone.utc) - self.start_time).total_seconds() - - @property - def posts_per_second(self) -> float: - if self.duration_seconds == 0: - return 0 - return self.posts_count / self.duration_seconds - - def current_minute(self) -> str: - return datetime.now(timezone.utc).strftime("%H:%M") - - def record_post(self, post: dict, did: str): - """Analyze and record a post.""" - self.posts_count += 1 - self.volume_by_minute[self.current_minute()] += 1 - - text = post.get("text", "") - - # Extract hashtags - hashtags = re.findall(r'#(\w+)', text) - for tag in hashtags: - self.hashtags[tag.lower()] += 1 - - # Extract mentions - mentions = re.findall(r'@([\w.-]+)', text) - for mention in mentions: - self.mentions[mention.lower()] += 1 - - # Check for comind mentions - if any(handle in mention.lower() for handle in COMIND_HANDLES): - self.comind_mentions.append({ - "time": datetime.now(timezone.utc).isoformat(), - "did": did, - "text": text[:200], - "mentioned": mention - }) - - # Detect language (simple heuristic) - langs = post.get("langs", []) - for lang in langs: - self.languages[lang] += 1 - - # Check if this is notable (questions, high-signal content) - is_question = "?" 
in text - is_long = len(text) > 200 - has_url = "http" in text - - if is_question and is_long: - self.notable_posts.append({ - "time": datetime.now(timezone.utc).isoformat(), - "did": did, - "text": text[:300], - "type": "question" - }) - if len(self.notable_posts) > 50: - self.notable_posts.pop(0) - - def record_interaction(self, event_type: str, from_did: str, to_did: str): - """Record an interaction (like, follow, etc.).""" - if event_type == "like": - self.likes_count += 1 - elif event_type == "follow": - self.follows_count += 1 - - # Check for comind interactions - if to_did in COMIND_AGENTS or from_did in COMIND_AGENTS: - self.comind_interactions.append({ - "time": datetime.now(timezone.utc).isoformat(), - "type": event_type, - "from": COMIND_AGENTS.get(from_did, from_did[:20]), - "to": COMIND_AGENTS.get(to_did, to_did[:20]) - }) - if len(self.comind_interactions) > 100: - self.comind_interactions.pop(0) - - def top_hashtags(self, n: int = 10) -> list: - return self.hashtags.most_common(n) - - def top_mentions(self, n: int = 10) -> list: - return self.mentions.most_common(n) - - def summary(self) -> dict: - return { - "duration_seconds": self.duration_seconds, - "total_events": self.total_events, - "posts": self.posts_count, - "posts_per_second": self.posts_per_second, - "likes": self.likes_count, - "follows": self.follows_count, - "top_hashtags": self.top_hashtags(10), - "top_mentions": self.top_mentions(10), - "languages": self.languages.most_common(5), - "comind_mentions": len(self.comind_mentions), - "comind_interactions": len(self.comind_interactions), - "notable_posts": len(self.notable_posts) - } - - -def render_intelligence(intel: NetworkIntelligence) -> Table: - """Render live intelligence display.""" - table = Table(title="🧠 comind intelligence", show_header=True, expand=False) - table.add_column("Metric", style="cyan") - table.add_column("Value", style="green", justify="right") - - # Core metrics - table.add_row("Duration", 
f"{intel.duration_seconds:.0f}s") - table.add_row("Posts", f"{intel.posts_count:,}") - table.add_row("Rate", f"{intel.posts_per_second:.1f}/s") - table.add_row("Likes", f"{intel.likes_count:,}") - table.add_row("Follows", f"{intel.follows_count:,}") - - # Top hashtags - top_tags = intel.top_hashtags(3) - if top_tags: - tags_str = " ".join([f"#{t[0]}" for t in top_tags]) - table.add_row("Trending", tags_str[:30]) - - # comind activity - if intel.comind_mentions: - table.add_row("⚡ comind mentions", str(len(intel.comind_mentions))) - - if intel.comind_interactions: - table.add_row("⚡ comind interactions", str(len(intel.comind_interactions))) - - return table - - -async def gather_intelligence( - duration: int = 60, - on_comind_mention: Optional[Callable] = None, - verbose: bool = True -) -> NetworkIntelligence: - """ - Gather intelligence from the firehose. - - Args: - duration: How long to gather (seconds) - on_comind_mention: Callback when comind is mentioned - verbose: Show live display - - Returns: - NetworkIntelligence object with accumulated data - """ - intel = NetworkIntelligence() - - url = f"{JETSTREAM_RELAY}?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow" - - if verbose: - console.print(f"[bold]Gathering intelligence for {duration}s...[/bold]\n") - - try: - async with websockets.connect(url) as ws: - if verbose: - live = Live(render_intelligence(intel), refresh_per_second=2) - live.start() - - end_time = asyncio.get_event_loop().time() + duration - - while asyncio.get_event_loop().time() < end_time: - try: - message = await asyncio.wait_for(ws.recv(), timeout=0.5) - event = json.loads(message) - - intel.total_events += 1 - - commit = event.get("commit", {}) - collection = commit.get("collection", "") - operation = commit.get("operation", "") - did = event.get("did", "") - record = commit.get("record", {}) - - if collection == "app.bsky.feed.post" and operation == "create": - 
intel.record_post(record, did) - - # Check for comind mentions - text = record.get("text", "").lower() - if any(h in text for h in COMIND_HANDLES): - if on_comind_mention: - on_comind_mention(record, did) - - elif collection == "app.bsky.feed.like": - subject = record.get("subject", {}) - target_uri = subject.get("uri", "") - # Extract DID from URI (at://did:plc:xxx/...) - if target_uri.startswith("at://"): - target_did = target_uri.split("/")[2] - intel.record_interaction("like", did, target_did) - - elif collection == "app.bsky.graph.follow": - target_did = record.get("subject", "") - intel.record_interaction("follow", did, target_did) - - if verbose: - live.update(render_intelligence(intel)) - - except asyncio.TimeoutError: - if verbose: - live.update(render_intelligence(intel)) - continue - - if verbose: - live.stop() - - except Exception as e: - console.print(f"[red]Error: {e}[/red]") - - return intel - - -async def monitor_comind(duration: int = 300): - """ - Monitor the network specifically for comind-related activity. 
- - Watches for: - - Mentions of comind agents - - Interactions with comind agents - - Activity patterns - """ - console.print("[bold cyan]Monitoring network for comind activity...[/bold cyan]\n") - - def on_mention(record, did): - text = record.get("text", "")[:100] - console.print(f"[yellow]⚡ MENTION:[/yellow] {text}...") - - intel = await gather_intelligence( - duration=duration, - on_comind_mention=on_mention, - verbose=True - ) - - # Print summary - console.print("\n[bold]Intelligence Summary:[/bold]") - summary = intel.summary() - - console.print(f"Duration: {summary['duration_seconds']:.0f}s") - console.print(f"Posts observed: {summary['posts']:,} ({summary['posts_per_second']:.1f}/s)") - console.print(f"Likes: {summary['likes']:,}") - console.print(f"Follows: {summary['follows']:,}") - - if summary['top_hashtags']: - console.print("\n[bold]Trending hashtags:[/bold]") - for tag, count in summary['top_hashtags']: - console.print(f" #{tag}: {count}") - - if intel.comind_mentions: - console.print("\n[bold]comind mentions:[/bold]") - for mention in intel.comind_mentions[-5:]: - console.print(f" {mention['text'][:80]}...") - - if intel.comind_interactions: - console.print("\n[bold]comind interactions:[/bold]") - for interaction in intel.comind_interactions[-10:]: - console.print(f" {interaction['type']}: {interaction['from']} → {interaction['to']}") - - return intel - - -async def pulse(duration: int = 30): - """Quick network pulse check.""" - intel = await gather_intelligence(duration=duration, verbose=True) - - console.print("\n[bold]Network Pulse:[/bold]") - console.print(f" {intel.posts_per_second:.1f} posts/sec") - console.print(f" {intel.posts_per_second * 60:.0f} posts/min") - console.print(f" ~{intel.posts_per_second * 86400:,.0f} posts/day (estimated)") - - if intel.top_hashtags(5): - console.print(f"\n[bold]Top hashtags:[/bold] {' '.join(['#' + t[0] for t, c in intel.top_hashtags(5)])}") - - return intel - - -if __name__ == "__main__": - import sys - - 
if len(sys.argv) < 2: - print("Usage: python intelligence.py [duration]") - print("Commands:") - print(" pulse [duration] - Quick network pulse (default 30s)") - print(" monitor [duration] - Monitor for comind activity (default 300s)") - print(" gather [duration] - Gather full intelligence (default 60s)") - sys.exit(1) - - command = sys.argv[1] - duration = int(sys.argv[2]) if len(sys.argv) > 2 else None - - if command == "pulse": - asyncio.run(pulse(duration or 30)) - elif command == "monitor": - asyncio.run(monitor_comind(duration or 300)) - elif command == "gather": - intel = asyncio.run(gather_intelligence(duration or 60)) - print(json.dumps(intel.summary(), indent=2)) - else: - print(f"Unknown command: {command}") diff --git a/tools/mention_listener.py b/tools/mention_listener.py deleted file mode 100644 index bec2958..0000000 --- a/tools/mention_listener.py +++ /dev/null @@ -1,204 +0,0 @@ -""" -Real-time Mention Listener - -Subscribes to Jetstream firehose and detects mentions in real-time. -When a mention is found, queues it for response. 
- -Usage: - uv run python -m tools.mention_listener # Run listener - uv run python -m tools.mention_listener --test # Test mode (10 seconds) -""" - -import asyncio -import json -import sys -from datetime import datetime, timezone -from pathlib import Path - -import websockets -import yaml -from rich.console import Console - -console = Console() - -# Our identity -CENTRAL_DID = "did:plc:l46arqe6yfgh36h3o554iyvr" -CENTRAL_HANDLE = "central.comind.network" - -# CRITICAL priority - these trigger self-wake -CAMERON_DID = "did:plc:gfrmhdmjvxn2sjedzboeudef" - -# Jetstream endpoint -JETSTREAM_URL = "wss://jetstream2.us-east.bsky.network/subscribe" - -# Queue file -QUEUE_FILE = Path(__file__).parent.parent / "drafts" / "queue.yaml" - -# Agents to skip (avoid loops) -SKIP_DIDS = { - "did:plc:l46arqe6yfgh36h3o554iyvr", # central - "did:plc:mxzuau6m53jtdsbqe6f4laov", # void - "did:plc:uz2snz44gi4zgqdwecavi66r", # herald - "did:plc:ogruxay3tt7wycqxnf5lis6s", # grunk - "did:plc:onfljgawqhqrz3dki5j6jh3m", # archivist -} - - -def check_for_mention(record: dict, author_did: str) -> bool: - """Check if a post mentions us via facets.""" - facets = record.get("facets", []) - - for facet in facets: - for feature in facet.get("features", []): - if feature.get("$type") == "app.bsky.richtext.facet#mention": - if feature.get("did") == CENTRAL_DID: - return True - - # Also check text for handle mention (fallback) - text = record.get("text", "").lower() - if f"@{CENTRAL_HANDLE}" in text: - return True - - return False - - -def queue_mention(event: dict, record: dict, author_did: str): - """Add mention to the response queue.""" - commit = event.get("commit", {}) - - # Determine priority - priority = "HIGH" # Default for real-time mentions - if author_did == CAMERON_DID: - priority = "CRITICAL" - - # Build queue item - item = { - "priority": priority, - "author": author_did, - "text": record.get("text", "")[:500], - "uri": f"at://{author_did}/{commit.get('collection')}/{commit.get('rkey')}", - 
"cid": commit.get("cid"), - "action": "reply", - "queued_at": datetime.now(timezone.utc).isoformat(), - "source": "realtime", - } - - # Handle reply context - reply = record.get("reply") - if reply: - item["reply_root"] = reply.get("root", {}) - item["reply_parent"] = reply.get("parent", {}) - - # Load existing queue - queue = [] - if QUEUE_FILE.exists(): - with open(QUEUE_FILE) as f: - queue = yaml.safe_load(f) or [] - - # Check for duplicates - existing_uris = {q.get("uri") for q in queue} - if item["uri"] in existing_uris: - console.print(f"[dim]Duplicate, skipping: {item['uri']}[/dim]") - return False - - # Add to queue - queue.append(item) - - with open(QUEUE_FILE, "w") as f: - yaml.dump(queue, f, default_flow_style=False, allow_unicode=True) - - console.print(f"[green]✓ Queued mention from {author_did[:20]}...[/green]") - console.print(f" Text: {record.get('text', '')[:80]}...") - - # Self-wake for CRITICAL mentions - if priority == "CRITICAL": - console.print(f"[bold red]CRITICAL mention detected - triggering self-wake[/bold red]") - try: - import subprocess - subprocess.Popen( - ["uv", "run", "python", "-m", "tools.wake_central", - "--critical", f"Real-time mention from Cameron: {record.get('text', '')[:100]}"], - cwd=str(Path(__file__).parent.parent), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - except Exception as e: - console.print(f"[red]Failed to trigger self-wake: {e}[/red]") - - return True - - -async def listen(test_mode: bool = False): - """Listen to Jetstream for mentions.""" - url = f"{JETSTREAM_URL}?wantedCollections=app.bsky.feed.post" - - console.print(f"[bold]Real-time Mention Listener[/bold]") - console.print(f"Watching for mentions of {CENTRAL_HANDLE}") - console.print(f"Jetstream: {url}") - console.print() - - stats = { - "posts_processed": 0, - "mentions_found": 0, - "start_time": datetime.now(timezone.utc), - } - - try: - async with websockets.connect(url) as ws: - while True: - msg = await ws.recv() - event = 
json.loads(msg) - - if event.get("kind") != "commit": - continue - - commit = event.get("commit", {}) - if commit.get("operation") != "create": - continue - - record = commit.get("record", {}) - author_did = event.get("did", "") - - stats["posts_processed"] += 1 - - # Skip our own posts and known agents - if author_did in SKIP_DIDS: - continue - - # Check for mention - if check_for_mention(record, author_did): - stats["mentions_found"] += 1 - queue_mention(event, record, author_did) - - # Progress indicator - if stats["posts_processed"] % 100 == 0: - elapsed = (datetime.now(timezone.utc) - stats["start_time"]).total_seconds() - rate = stats["posts_processed"] / elapsed if elapsed > 0 else 0 - console.print( - f"[dim]Processed {stats['posts_processed']} posts " - f"({rate:.1f}/sec), {stats['mentions_found']} mentions[/dim]" - ) - - # Test mode: exit after 10 seconds - if test_mode: - elapsed = (datetime.now(timezone.utc) - stats["start_time"]).total_seconds() - if elapsed > 10: - console.print(f"\n[yellow]Test complete.[/yellow]") - console.print(f"Posts: {stats['posts_processed']}") - console.print(f"Mentions: {stats['mentions_found']}") - return - - except KeyboardInterrupt: - console.print("\n[yellow]Stopped by user[/yellow]") - except Exception as e: - console.print(f"[red]Error: {e}[/red]") - raise - - -def main(): - test_mode = "--test" in sys.argv - asyncio.run(listen(test_mode=test_mode)) - - -if __name__ == "__main__": - main() diff --git a/tools/metrics.py b/tools/metrics.py deleted file mode 100644 index 6de203e..0000000 --- a/tools/metrics.py +++ /dev/null @@ -1,215 +0,0 @@ -""" -Metrics Tracker - Daily snapshot of social metrics for h2 research. 
- -Commands: - capture [phase] Take daily snapshot - show Display metrics history - analyze Compare phases -""" - -import asyncio -import json -from datetime import datetime, timezone -from pathlib import Path - -import httpx -from rich.console import Console -from rich.table import Table - -console = Console() - -METRICS_FILE = Path(__file__).parent.parent / "data" / "metrics.jsonl" -DID = "did:plc:l46arqe6yfgh36h3o554iyvr" -API_BASE = "https://public.api.bsky.app" - - -async def get_profile_stats() -> dict: - """Get current profile statistics.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{API_BASE}/xrpc/app.bsky.actor.getProfile", - params={"actor": DID}, - timeout=15 - ) - data = resp.json() - return { - "followers": data.get("followersCount", 0), - "following": data.get("followsCount", 0), - "posts": data.get("postsCount", 0), - } - - -async def get_post_breakdown(limit: int = 100) -> dict: - """Analyze recent posts for reply rate.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{API_BASE}/xrpc/app.bsky.feed.getAuthorFeed", - params={"actor": DID, "limit": limit}, - timeout=15 - ) - posts = resp.json().get("feed", []) - - replies = 0 - originals = 0 - total_likes_received = 0 - - for item in posts: - post = item.get("post", {}) - record = post.get("record", {}) - total_likes_received += post.get("likeCount", 0) - - if record.get("reply"): - replies += 1 - else: - originals += 1 - - return { - "replies": replies, - "originals": originals, - "reply_rate": round(replies / len(posts) * 100, 1) if posts else 0, - "likes_received": total_likes_received, - "sample_size": len(posts), - } - - -async def get_likes_given() -> int: - """Count likes I've given.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"https://comind.network/xrpc/com.atproto.repo.listRecords", - params={"repo": DID, "collection": "app.bsky.feed.like", "limit": 100}, - timeout=15 - ) - return 
len(resp.json().get("records", [])) - - -async def capture(phase: str = "tracking"): - """Capture daily metrics snapshot.""" - console.print(f"[bold]Capturing metrics (phase: {phase})...[/bold]\n") - - profile = await get_profile_stats() - posts = await get_post_breakdown() - likes_given = await get_likes_given() - - snapshot = { - "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"), - "timestamp": datetime.now(timezone.utc).isoformat(), - "phase": phase, - **profile, - **posts, - "likes_given": likes_given, - } - - # Display - console.print(f" Followers: {snapshot['followers']}") - console.print(f" Following: {snapshot['following']}") - console.print(f" Posts: {snapshot['posts']}") - console.print(f" Reply rate: {snapshot['reply_rate']}% (last {snapshot['sample_size']})") - console.print(f" Likes given: {snapshot['likes_given']}") - console.print(f" Likes received: {snapshot['likes_received']} (last {snapshot['sample_size']})") - - # Save - METRICS_FILE.parent.mkdir(parents=True, exist_ok=True) - with open(METRICS_FILE, "a") as f: - f.write(json.dumps(snapshot) + "\n") - - console.print(f"\n[green]Saved to {METRICS_FILE}[/green]") - return snapshot - - -def load_metrics() -> list: - """Load all metrics snapshots.""" - if not METRICS_FILE.exists(): - return [] - - metrics = [] - with open(METRICS_FILE) as f: - for line in f: - if line.strip(): - metrics.append(json.loads(line)) - return metrics - - -def show(): - """Display metrics history.""" - metrics = load_metrics() - - if not metrics: - console.print("[yellow]No metrics captured yet.[/yellow]") - return - - table = Table(title="Metrics History") - table.add_column("Date") - table.add_column("Phase") - table.add_column("Followers", justify="right") - table.add_column("Posts", justify="right") - table.add_column("Reply %", justify="right") - table.add_column("Likes Given", justify="right") - - for m in metrics[-10:]: - table.add_row( - m.get("date", "?"), - m.get("phase", "?"), - str(m.get("followers", 0)), 
- str(m.get("posts", 0)), - f"{m.get('reply_rate', 0)}%", - str(m.get("likes_given", 0)), - ) - - console.print(table) - - -def analyze(): - """Analyze metrics by phase.""" - metrics = load_metrics() - - if len(metrics) < 2: - console.print("[yellow]Need more data points for analysis.[/yellow]") - return - - # Group by phase - phases = {} - for m in metrics: - phase = m.get("phase", "unknown") - if phase not in phases: - phases[phase] = [] - phases[phase].append(m) - - console.print("[bold]Phase Analysis[/bold]\n") - - for phase, data in phases.items(): - if len(data) < 1: - continue - - first = data[0] - last = data[-1] - days = len(data) - - follower_growth = last.get("followers", 0) - first.get("followers", 0) - growth_rate = follower_growth / days if days > 0 else 0 - - console.print(f"[cyan]{phase}[/cyan] ({days} snapshots)") - console.print(f" Follower growth: {follower_growth:+d}") - console.print(f" Growth rate: {growth_rate:.2f}/day") - console.print(f" Avg reply rate: {sum(d.get('reply_rate', 0) for d in data) / len(data):.1f}%") - console.print() - - -if __name__ == "__main__": - import sys - - if len(sys.argv) < 2: - print(__doc__) - sys.exit(0) - - cmd = sys.argv[1] - - if cmd == "capture": - phase = sys.argv[2] if len(sys.argv) > 2 else "tracking" - asyncio.run(capture(phase)) - elif cmd == "show": - show() - elif cmd == "analyze": - analyze() - else: - print(__doc__) diff --git a/tools/observer.py b/tools/observer.py deleted file mode 100644 index f5299df..0000000 --- a/tools/observer.py +++ /dev/null @@ -1,250 +0,0 @@ -""" -comind Network Observer - -Gathers intelligence from the firehose and posts observations. -This makes comind a useful presence - sharing what it sees. 
-""" - -import asyncio -import json -from datetime import datetime, timezone -from collections import Counter -from pathlib import Path - -import websockets -from dotenv import load_dotenv -from rich.console import Console - -import sys -sys.path.insert(0, str(Path(__file__).parent.parent)) -from tools.intelligence import NetworkIntelligence, COMIND_HANDLES - -console = Console() -load_dotenv(Path(__file__).parent.parent / ".env") - -JETSTREAM_RELAY = "wss://jetstream2.us-east.bsky.network/subscribe" - - -async def gather_snapshot(duration: int = 60) -> dict: - """ - Gather a snapshot of network activity. - - Returns a dict with key metrics and observations. - """ - intel = NetworkIntelligence() - url = f"{JETSTREAM_RELAY}?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow" - - console.print(f"[dim]Gathering data for {duration}s...[/dim]") - - try: - async with websockets.connect(url) as ws: - end_time = asyncio.get_event_loop().time() + duration - - while asyncio.get_event_loop().time() < end_time: - try: - message = await asyncio.wait_for(ws.recv(), timeout=0.5) - event = json.loads(message) - - intel.total_events += 1 - commit = event.get("commit", {}) - collection = commit.get("collection", "") - operation = commit.get("operation", "") - did = event.get("did", "") - record = commit.get("record", {}) - - if collection == "app.bsky.feed.post" and operation == "create": - intel.record_post(record, did) - elif collection == "app.bsky.feed.like": - intel.likes_count += 1 - elif collection == "app.bsky.graph.follow": - intel.follows_count += 1 - - except asyncio.TimeoutError: - continue - except Exception as e: - console.print(f"[red]Error: {e}[/red]") - - # Build snapshot - snapshot = { - "duration": intel.duration_seconds, - "posts": intel.posts_count, - "posts_per_sec": intel.posts_per_second, - "posts_per_min": intel.posts_per_second * 60, - "likes": intel.likes_count, - "likes_per_sec": 
intel.likes_count / intel.duration_seconds if intel.duration_seconds > 0 else 0, - "follows": intel.follows_count, - "follows_per_min": (intel.follows_count / intel.duration_seconds * 60) if intel.duration_seconds > 0 else 0, - "top_hashtags": intel.top_hashtags(10), - "top_mentions": intel.top_mentions(5), - "languages": intel.languages.most_common(5), - "total_events": intel.total_events, - } - - return snapshot - - -def format_observation(snapshot: dict) -> str: - """ - Format a snapshot into an interesting observation post. - """ - posts_min = int(snapshot["posts_per_min"]) - likes_sec = snapshot["likes_per_sec"] - follows_min = int(snapshot["follows_per_min"]) - - # Get trending hashtags (filter out single chars and common noise) - hashtags = [(tag, count) for tag, count in snapshot["top_hashtags"] - if len(tag) > 2 and tag.lower() not in ["the", "and", "for"]][:5] - - # Build observation - lines = [f"Network pulse ({int(snapshot['duration'])}s sample):"] - lines.append(f"") - lines.append(f"• {posts_min:,} posts/min") - lines.append(f"• {int(likes_sec * 60):,} likes/min") - lines.append(f"• {follows_min:,} new follows/min") - - if hashtags: - tags_str = " ".join([f"#{t[0]}" for t in hashtags[:4]]) - lines.append(f"") - lines.append(f"Trending: {tags_str}") - - # Add a simple insight - like_to_post = snapshot["likes"] / snapshot["posts"] if snapshot["posts"] > 0 else 0 - if like_to_post > 8: - lines.append(f"") - lines.append(f"High engagement: {like_to_post:.1f} likes per post") - - return "\n".join(lines) - - -def format_hourly_summary(snapshot: dict) -> str: - """Format a more detailed hourly summary.""" - posts_hour = int(snapshot["posts_per_min"] * 60) - posts_day = int(snapshot["posts_per_min"] * 60 * 24) - - hashtags = [(tag, count) for tag, count in snapshot["top_hashtags"] - if len(tag) > 2][:6] - - lines = ["Hourly network summary:"] - lines.append("") - lines.append(f"Estimated activity:") - lines.append(f"• ~{posts_hour:,} posts/hour") - 
lines.append(f"• ~{posts_day:,} posts/day") - - if hashtags: - lines.append("") - lines.append("Top hashtags this sample:") - for tag, count in hashtags[:4]: - lines.append(f" #{tag}") - - # Language distribution - langs = snapshot.get("languages", []) - if langs: - top_lang = langs[0][0] if langs else "en" - if top_lang != "en": - lines.append(f"") - lines.append(f"Notable: High {top_lang} language activity") - - return "\n".join(lines) - - -async def generate_observation(observation_type: str = "pulse", duration: int = 60) -> str: - """ - Gather data and generate an observation (does NOT post). - - Args: - observation_type: "pulse" for quick update, "summary" for detailed - duration: How long to gather data - - Returns: - Formatted observation text - """ - console.print(f"[bold]Generating {observation_type} observation...[/bold]") - - snapshot = await gather_snapshot(duration=duration) - - if observation_type == "pulse": - text = format_observation(snapshot) - else: - text = format_hourly_summary(snapshot) - - console.print(f"\n[cyan]Observation:[/cyan]") - console.print(text) - console.print(f"\n[dim]({len(text)} chars)[/dim]") - console.print(f"\n[yellow]To post: use tools/thread.py[/yellow]") - - return text - - -async def observe_loop(interval: int = 3600, duration: int = 60): - """ - Continuous observation loop - generates observations at regular intervals. - Does NOT auto-post. Use tools/thread.py to post. 
- - Args: - interval: Seconds between observations (default 1 hour) - duration: How long to sample for each observation - """ - console.print(f"[bold]Starting observation loop (report only)[/bold]") - console.print(f"Interval: {interval}s, Sample duration: {duration}s") - console.print("[dim]Press Ctrl+C to stop[/dim]\n") - - observation_count = 0 - - try: - while True: - observation_count += 1 - console.print(f"\n[bold]Observation #{observation_count}[/bold]") - - # Alternate between pulse and summary - obs_type = "pulse" if observation_count % 3 != 0 else "summary" - - await generate_observation(observation_type=obs_type, duration=duration) - - console.print(f"\n[dim]Next observation in {interval}s...[/dim]") - await asyncio.sleep(interval) - - except KeyboardInterrupt: - console.print("\n[bold]Observation loop stopped.[/bold]") - - -if __name__ == "__main__": - import sys - - args = sys.argv[1:] - - if not args: - print("Usage: python observer.py [options]") - print("") - print("Commands:") - print(" pulse [duration] - Generate a quick network pulse (default 30s sample)") - print(" summary [duration] - Generate a detailed summary (default 60s sample)") - print(" loop [interval] - Continuous observations (default 3600s interval)") - print("") - print("NOTE: This tool generates observations but does NOT post.") - print(" Use tools/thread.py to post.") - print("") - print("Examples:") - print(" python observer.py pulse 30") - print(" python observer.py summary 60") - sys.exit(0) - - command = args[0] - - # Get duration/interval argument - num_arg = None - for arg in args[1:]: - if arg.isdigit(): - num_arg = int(arg) - break - - if command == "pulse": - duration = num_arg or 30 - asyncio.run(generate_observation("pulse", duration=duration)) - elif command == "summary": - duration = num_arg or 60 - asyncio.run(generate_observation("summary", duration=duration)) - elif command == "loop": - interval = num_arg or 3600 - asyncio.run(observe_loop(interval=interval)) - 
else: - print(f"Unknown command: {command}") diff --git a/tools/recall.py b/tools/recall.py deleted file mode 100644 index bfa3d1a..0000000 --- a/tools/recall.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Context Recall Tool - -Search cognition records for relevant memories before responding. -Uses the XRPC indexer for semantic search. - -Usage: - uv run python -m tools.recall "current topic or question" - uv run python -m tools.recall "agent coordination patterns" --limit 5 -""" - -import asyncio -import sys -from datetime import datetime - -import httpx -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - -console = Console() - -INDEXER_URL = "https://central-production.up.railway.app" - - -async def search_cognition(query: str, limit: int = 5) -> list[dict]: - """Search cognition records via XRPC indexer.""" - async with httpx.AsyncClient() as client: - try: - resp = await client.get( - f"{INDEXER_URL}/xrpc/network.comind.search.query", - params={"q": query, "limit": limit}, - timeout=10 - ) - if resp.status_code == 200: - return resp.json().get("results", []) - except Exception as e: - console.print(f"[yellow]Search error: {e}[/yellow]") - return [] - - -async def recall(query: str, limit: int = 5): - """Recall relevant context for a query.""" - console.print(f"\n[bold]Recalling context for:[/bold] {query}\n") - - results = await search_cognition(query, limit) - - if not results: - console.print("[dim]No relevant memories found.[/dim]") - console.print("[dim]Indexer may be down or no matching records.[/dim]") - return - - # Display results - for i, r in enumerate(results, 1): - collection = r.get("collection", "").split(".")[-1] - content = r.get("content", "")[:300] - score = r.get("score", 0) - created = r.get("createdAt", "")[:10] - - style = "green" if score > 0.7 else "yellow" if score > 0.5 else "dim" - - console.print(Panel( - content, - title=f"[{style}]{i}. 
{collection}[/] (score: {score:.2f}, {created})", - border_style=style - )) - - # Summary - console.print(f"\n[dim]Found {len(results)} relevant memories.[/dim]") - - -async def recall_for_context(topics: list[str], limit: int = 3) -> str: - """Recall context for multiple topics, return formatted string.""" - context_parts = [] - - for topic in topics: - results = await search_cognition(topic, limit) - if results: - context_parts.append(f"## {topic}") - for r in results[:limit]: - content = r.get("content", "")[:200] - context_parts.append(f"- {content}") - context_parts.append("") - - return "\n".join(context_parts) - - -def main(): - if len(sys.argv) < 2: - console.print(""" -[bold]Context Recall Tool[/bold] - -Search cognition records for relevant context. - -Usage: - recall.py "" # Search for context - recall.py "" --limit N # Limit results - -Examples: - recall.py "agent coordination patterns" - recall.py "ATProtocol facets" --limit 10 - recall.py "void engagement style" -""") - return - - query = sys.argv[1] - - limit = 5 - if "--limit" in sys.argv: - idx = sys.argv.index("--limit") - if idx + 1 < len(sys.argv): - limit = int(sys.argv[idx + 1]) - - asyncio.run(recall(query, limit)) - - -if __name__ == "__main__": - main() diff --git a/tools/records.py b/tools/records.py deleted file mode 100644 index d0cde28..0000000 --- a/tools/records.py +++ /dev/null @@ -1,314 +0,0 @@ -""" -comind Records - -Write structured records to ATProtocol using network.comind.* lexicons. 
- -Collections: -- network.comind.devlog - development logs -- network.comind.hypothesis - testable theories -- network.comind.observation - network observations -""" - -import asyncio -import json -from datetime import datetime, timezone -from pathlib import Path -from typing import Literal - -import httpx -from dotenv import load_dotenv -from rich.console import Console -import os - -console = Console() - -load_dotenv(Path(__file__).parent.parent / ".env") - -PDS = os.getenv("ATPROTO_PDS") -DID = os.getenv("ATPROTO_DID") -APP_PASSWORD = os.getenv("ATPROTO_APP_PASSWORD") -HANDLE = os.getenv("ATPROTO_HANDLE") - - -async def get_auth_token() -> str: - """Get authentication token.""" - async with httpx.AsyncClient() as client: - resp = await client.post( - f"{PDS}/xrpc/com.atproto.server.createSession", - json={"identifier": HANDLE, "password": APP_PASSWORD} - ) - if resp.status_code != 200: - raise Exception(f"Auth failed: {resp.text}") - return resp.json()["accessJwt"] - - -async def create_record(collection: str, record: dict) -> dict: - """Create a record in the specified collection.""" - token = await get_auth_token() - - async with httpx.AsyncClient() as client: - resp = await client.post( - f"{PDS}/xrpc/com.atproto.repo.createRecord", - headers={"Authorization": f"Bearer {token}"}, - json={ - "repo": DID, - "collection": collection, - "record": record - } - ) - - if resp.status_code != 200: - raise Exception(f"Failed to create record: {resp.text}") - - result = resp.json() - console.print(f"[green]Created record:[/green] {result['uri']}") - return result - - -async def list_records(collection: str, limit: int = 20) -> list: - """List records in a collection.""" - async with httpx.AsyncClient() as client: - resp = await client.get( - f"{PDS}/xrpc/com.atproto.repo.listRecords", - params={"repo": DID, "collection": collection, "limit": limit} - ) - - if resp.status_code != 200: - raise Exception(f"Failed to list records: {resp.text}") - - return 
resp.json().get("records", []) - - -# === DEVLOG RECORDS === - -DevlogType = Literal["milestone", "learning", "decision", "state", "reflection"] - -async def write_devlog( - devlog_type: DevlogType, - title: str, - content: str, - tags: list[str] = None, - related_agents: list[str] = None -) -> dict: - """ - Write a devlog record to network.comind.devlog collection. - - Args: - devlog_type: milestone | learning | decision | state | reflection - title: Short title (max 100 chars) - content: Main content (max 3000 chars) - tags: Optional tags for categorization - related_agents: Optional DIDs of related agents - """ - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - record = { - "$type": "network.comind.devlog", - "type": devlog_type, - "title": title[:100], - "content": content[:3000], - "createdAt": now - } - - if tags: - record["tags"] = tags[:10] - - if related_agents: - record["relatedAgents"] = related_agents - - console.print(f"[cyan]Writing devlog:[/cyan] [{devlog_type}] {title}") - return await create_record("network.comind.devlog", record) - - -# === HYPOTHESIS RECORDS === - -HypothesisStatus = Literal["active", "confirmed", "disproven", "superseded"] - -async def write_hypothesis( - hypothesis: str, - confidence: int, - status: HypothesisStatus = "active", - evidence: list[str] = None, - contradictions: list[str] = None -) -> dict: - """ - Write a hypothesis record to network.comind.hypothesis collection. 
- - Args: - hypothesis: The hypothesis statement - confidence: Confidence level 0-100 - status: active | confirmed | disproven | superseded - evidence: Supporting evidence list - contradictions: Contradicting evidence list - """ - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - record = { - "$type": "network.comind.hypothesis", - "hypothesis": hypothesis[:1000], - "confidence": max(0, min(100, confidence)), - "status": status, - "createdAt": now, - "updatedAt": now - } - - if evidence: - record["evidence"] = evidence[:20] - - if contradictions: - record["contradictions"] = contradictions[:20] - - console.print(f"[cyan]Writing hypothesis:[/cyan] {hypothesis[:50]}... (confidence: {confidence}%)") - return await create_record("network.comind.hypothesis", record) - - -# === OBSERVATION RECORDS === - -ObservationType = Literal["pulse", "trend", "anomaly", "pattern"] - -async def write_observation( - observation_type: ObservationType, - sample_duration: int, - posts_per_minute: int, - likes_per_minute: int, - follows_per_minute: int, - total_events: int, - trending_hashtags: list[tuple[str, int]] = None, - summary: str = None -) -> dict: - """ - Write an observation record to network.comind.observation collection. 
- - Args: - observation_type: pulse | trend | anomaly | pattern - sample_duration: Duration of sample in seconds - posts_per_minute: Posts per minute observed - likes_per_minute: Likes per minute observed - follows_per_minute: Follows per minute observed - total_events: Total events in sample - trending_hashtags: List of (tag, count) tuples - summary: Human-readable summary - """ - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - - record = { - "$type": "network.comind.observation", - "observationType": observation_type, - "sampleDuration": sample_duration, - "metrics": { - "postsPerMinute": posts_per_minute, - "likesPerMinute": likes_per_minute, - "followsPerMinute": follows_per_minute, - "totalEvents": total_events - }, - "createdAt": now - } - - if trending_hashtags: - record["trendingHashtags"] = [ - {"tag": tag, "count": count} - for tag, count in trending_hashtags[:20] - ] - - if summary: - record["summary"] = summary[:1000] - - console.print(f"[cyan]Writing observation:[/cyan] {observation_type} ({sample_duration}s sample)") - return await create_record("network.comind.observation", record) - - -# === UTILITIES === - -async def list_devlogs(limit: int = 20): - """List all devlog records.""" - records = await list_records("network.comind.devlog", limit) - console.print(f"\n[bold]Devlog records ({len(records)}):[/bold]") - for r in records: - v = r.get("value", {}) - console.print(f" [{v.get('type')}] {v.get('title')}") - return records - - -async def list_hypotheses(limit: int = 20): - """List all hypothesis records.""" - records = await list_records("network.comind.hypothesis", limit) - console.print(f"\n[bold]Hypothesis records ({len(records)}):[/bold]") - for r in records: - v = r.get("value", {}) - console.print(f" [{v.get('status')}] {v.get('hypothesis', '')[:50]}... 
({v.get('confidence')}%)") - return records - - -async def list_observations(limit: int = 20): - """List all observation records.""" - records = await list_records("network.comind.observation", limit) - console.print(f"\n[bold]Observation records ({len(records)}):[/bold]") - for r in records: - v = r.get("value", {}) - m = v.get("metrics", {}) - console.print(f" [{v.get('observationType')}] {m.get('postsPerMinute')} posts/min") - return records - - -async def count_all_records() -> dict: - """Count records in all network.comind.* collections.""" - counts = {} - for collection in ["network.comind.devlog", "network.comind.hypothesis", "network.comind.observation"]: - records = await list_records(collection, limit=100) - counts[collection] = len(records) - counts["total"] = sum(counts.values()) - return counts - - -if __name__ == "__main__": - import sys - - args = sys.argv[1:] - - if not args: - print("Usage: python records.py ") - print("") - print("Commands:") - print(" list-devlogs - List devlog records") - print(" list-hypotheses - List hypothesis records") - print(" list-observations - List observation records") - print(" test-devlog - Write a test devlog") - print(" test-hypothesis - Write a test hypothesis") - print(" test-observation - Write a test observation") - sys.exit(0) - - command = args[0] - - if command == "list-devlogs": - asyncio.run(list_devlogs()) - elif command == "list-hypotheses": - asyncio.run(list_hypotheses()) - elif command == "list-observations": - asyncio.run(list_observations()) - elif command == "test-devlog": - asyncio.run(write_devlog( - "milestone", - "First structured record", - "Testing network.comind.devlog lexicon. 
This record is stored as structured data in my ATProtocol repository.", - tags=["test", "lexicon", "milestone"] - )) - elif command == "test-hypothesis": - asyncio.run(write_hypothesis( - "Structured records enable better querying of agent memories than plain text posts", - 70, - "active", - evidence=["Lexicons provide schema validation", "Records are separately queryable from posts"] - )) - elif command == "test-observation": - asyncio.run(write_observation( - "pulse", - 30, - 2000, - 12000, - 1500, - 15000, - trending_hashtags=[("test", 10), ("atproto", 5)], - summary="Test observation record" - )) - else: - print(f"Unknown command: {command}") diff --git a/tools/respond.py b/tools/respond.py deleted file mode 100644 index 66a2add..0000000 --- a/tools/respond.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -Respond Tool - Set responses in the notification queue. - -Usage: - python -m tools.respond set - python -m tools.respond list # Show queue with indices - python -m tools.respond set-by-index - -This tool handles YAML manipulation properly so subagents don't corrupt the file. 
-""" - -import sys -import yaml -from pathlib import Path -from datetime import datetime, timezone - -QUEUE_FILE = Path("drafts/queue.yaml") -SENT_FILE = Path("drafts/sent.txt") -NEW_THRESHOLD_MINUTES = 30 # Items newer than this get 🆕 indicator - - -def load_queue(): - """Load the queue, return empty list if missing/corrupt.""" - if not QUEUE_FILE.exists(): - return [] - try: - with open(QUEUE_FILE, "r") as f: - return yaml.safe_load(f) or [] - except yaml.YAMLError as e: - print(f"Error loading queue: {e}") - return [] - - -def save_queue(queue): - """Save queue to YAML.""" - QUEUE_FILE.parent.mkdir(exist_ok=True) - with open(QUEUE_FILE, "w") as f: - yaml.dump(queue, f, sort_keys=False, allow_unicode=True, width=1000) - - -def load_sent_uris() -> set: - """Load the set of URIs we've already replied to.""" - if not SENT_FILE.exists(): - return set() - content = SENT_FILE.read_text().strip() - if not content: - return set() - return set(content.split("\n")) - - -def is_new(queued_at: str | None) -> bool: - """Check if item was queued within NEW_THRESHOLD_MINUTES.""" - if not queued_at: - return False - try: - item_time = datetime.fromisoformat(queued_at) - now = datetime.now(timezone.utc) - age_minutes = (now - item_time).total_seconds() / 60 - return age_minutes <= NEW_THRESHOLD_MINUTES - except (ValueError, TypeError): - return False - - -def list_queue(): - """List queue items with indices.""" - queue = load_queue() - if not queue: - print("Queue is empty.") - return - - sent_uris = load_sent_uris() - - for i, item in enumerate(queue): - author = item.get("author", "unknown") - priority = item.get("priority", "MEDIUM") - text = item.get("text", "")[:50].replace("\n", " ") - response = item.get("response") - uri = item.get("uri", "") - queued_at = item.get("queued_at") - - # Status: ✓ = has response, ○ = no response, ⚠ = already sent (duplicate) - if uri in sent_uris: - status = "⚠SENT" # Already in sent.txt - would be skipped - elif response: - status = "✓" - 
else: - status = "○" - - # NEW indicator for recent items - new_indicator = "🆕" if is_new(queued_at) else " " - - print(f"{i}: {new_indicator} [{status}] [{priority}] @{author}: {text}...") - - -def set_response(uri: str, response: str): - """Set response for a specific URI.""" - queue = load_queue() - - # Check if already sent - sent_uris = load_sent_uris() - if uri in sent_uris: - print(f"⚠️ WARNING: URI already sent. Skipping.") - return - - found = False - for item in queue: - if item.get("uri") == uri: - author = item.get("author", "unknown") - if item.get("response"): - print(f"⚠️ WARNING: @{author} already has response. Overwriting.") - item["response"] = response - found = True - break - - if found: - save_queue(queue) - print(f"Set response for {uri[:50]}...") - else: - print(f"URI not found in queue: {uri[:50]}...") - - -def set_response_by_index(index: int, response: str): - """Set response by queue index.""" - queue = load_queue() - - if index < 0 or index >= len(queue): - print(f"Invalid index {index}. Queue has {len(queue)} items.") - return - - item = queue[index] - uri = item.get("uri", "") - author = item.get("author", "unknown") - - # Check if already sent - sent_uris = load_sent_uris() - if uri in sent_uris: - print(f"⚠️ WARNING: Item {index} (@{author}) already sent. Skipping.") - return - - # Check if already has a response (prevent accidental overwrite) - if item.get("response"): - print(f"⚠️ WARNING: Item {index} (@{author}) already has response. 
Overwriting.") - - item["response"] = response - save_queue(queue) - print(f"Set response for item {index} (@{author})") - - -if __name__ == "__main__": - if len(sys.argv) < 2: - print(__doc__) - sys.exit(1) - - cmd = sys.argv[1] - - if cmd == "list": - list_queue() - - elif cmd == "set" and len(sys.argv) >= 4: - uri = sys.argv[2] - response = sys.argv[3] - set_response(uri, response) - - elif cmd == "set-by-index" and len(sys.argv) >= 4: - index = int(sys.argv[2]) - response = sys.argv[3] - set_response_by_index(index, response) - - else: - print(__doc__) - sys.exit(1) diff --git a/tools/responder.py b/tools/responder.py index 549f587..460a0ba 100644 --- a/tools/responder.py +++ b/tools/responder.py @@ -590,9 +590,7 @@ def process_parallel(batch_size: int = 10): # Build the prompt for comms prompt = ( f"Process notification queue items {start_idx}-{end_idx}. " - f"Run `uv run python -m tools.respond list` to see the queue. " - f"For indices {', '.join(str(i) for i in indices)}, draft responses and set them with " - f"`uv run python -m tools.respond set-by-index \"\"`. " + f"Review notification queue items {', '.join(str(i) for i in indices)}. Draft and set responses. " f"Guidelines: Be substantive not performative. Keep responses under 280 chars. " f"Match the tone of the original. Skip items that don't warrant a response. " f"**Report**: How many processed, any notable interactions." diff --git a/tools/shared_memory.py b/tools/shared_memory.py deleted file mode 100644 index 7544894..0000000 --- a/tools/shared_memory.py +++ /dev/null @@ -1,196 +0,0 @@ -""" -Shared Memory Manager - -Manages shared blocks between central and subagents. -When the Letta server is available, syncs key knowledge to subagents. 
-""" - -import os -import json -from pathlib import Path -from rich.console import Console - -console = Console() - -# Subagent IDs -SUBAGENTS = { - "scout": "agent-e91a2154-0965-4b70-8303-54458e9a1980", - "coder": "agent-f9b768de-e3a4-4845-9c16-d6cf2e954942", -} - -# Shared block definitions -SHARED_BLOCKS = { - "concepts_index": { - "description": "Index of semantic memory concepts (agents, patterns, technical knowledge)", - "source": "data/concepts.json", - }, - "project_context": { - "description": "What comind is, what we're building, key infrastructure", - "value": """## comind Project Context - -**Mission**: Collective AI on ATProtocol. - -**Key Agents**: void (process), umbra (patterns), herald (economics), archivist (restraint), astral (synthesis), magenta (introspection) - -**Infrastructure**: -- tools/daemon.py - passive firehose monitoring -- tools/cognition.py - semantic memory (network.comind.concept) -- tools/telepathy.py - read other agents' cognition - -**Tone**: BE BORING. No golden retriever energy. Substantive over performative. 
-""", - }, -} - - -def get_client(): - """Get Letta client (cloud API).""" - try: - from letta_client import Letta - api_key = os.getenv("LETTA_API_KEY") - if not api_key: - console.print("[red]LETTA_API_KEY not set[/red]") - return None - client = Letta(api_key=api_key) - return client - except Exception as e: - console.print(f"[yellow]Letta client error: {e}[/yellow]") - return None - - -def get_or_create_shared_block(client, label: str, description: str, value: str): - """Get existing shared block or create new one.""" - # Check if block exists - blocks = client.blocks.list() - for block in blocks: - if block.label == label: - return block - - # Create new block - return client.blocks.create( - label=label, - description=description, - value=value - ) - - -def sync_concepts_to_block(client, block_id: str): - """Sync concepts index to shared block.""" - concepts_file = Path(__file__).parent.parent / "data" / "concepts.json" - if not concepts_file.exists(): - console.print("[red]No concepts.json found. 
Run: uv run python -m tools.concepts sync[/red]") - return - - with open(concepts_file) as f: - concepts = json.load(f) - - # Format as readable summary - lines = ["## Concepts Index\n"] - - # Group by tag - agents = [(n, d) for n, d in concepts.items() if 'agent' in d.get('tags', [])] - patterns = [(n, d) for n, d in concepts.items() if 'pattern' in d.get('tags', [])] - technical = [(n, d) for n, d in concepts.items() if 'technical' in d.get('tags', []) or 'infrastructure' in d.get('tags', [])] - - if agents: - lines.append("**Agents:**") - for name, data in sorted(agents, key=lambda x: -x[1].get('confidence', 0)): - lines.append(f"- {name} ({data.get('confidence', 0)}%): {data.get('summary', '')[:60]}...") - lines.append("") - - if patterns: - lines.append("**Patterns:**") - for name, data in patterns: - lines.append(f"- {name}: {data.get('summary', '')[:60]}...") - lines.append("") - - if technical: - lines.append("**Technical:**") - for name, data in technical: - lines.append(f"- {name}: {data.get('summary', '')[:60]}...") - - value = "\n".join(lines) - - # Update block - client.blocks.update(block_id=block_id, value=value) - console.print(f"[green]Synced {len(concepts)} concepts to shared block[/green]") - - -def setup_shared_memory(): - """Set up shared memory blocks for all subagents.""" - client = get_client() - if not client: - return - - console.print("[bold]Setting up shared memory...[/bold]\n") - - for label, config in SHARED_BLOCKS.items(): - # Get value from source file or direct value - if "source" in config: - source = Path(__file__).parent.parent / config["source"] - if source.exists(): - with open(source) as f: - if source.suffix == ".json": - value = json.dumps(json.load(f), indent=2)[:5000] - else: - value = f.read()[:5000] - else: - console.print(f"[yellow]Source not found: {config['source']}[/yellow]") - continue - else: - value = config.get("value", "") - - # Create/get block - block = get_or_create_shared_block(client, label, 
config["description"], value) - console.print(f"[cyan]Block '{label}':[/cyan] {block.id}") - - # Attach to all subagents - for name, agent_id in SUBAGENTS.items(): - try: - client.agents.blocks.attach(agent_id=agent_id, block_id=block.id) - console.print(f" ✓ Attached to {name}") - except Exception as e: - if "already attached" in str(e).lower(): - console.print(f" ✓ Already attached to {name}") - else: - console.print(f" ✗ Failed for {name}: {e}") - - console.print("\n[green]Shared memory setup complete.[/green]") - - -def update_concepts(): - """Update the concepts shared block.""" - client = get_client() - if not client: - return - - # Find concepts block - blocks = client.blocks.list() - concepts_block = None - for block in blocks: - if block.label == "concepts_index": - concepts_block = block - break - - if not concepts_block: - console.print("[yellow]No concepts_index block found. Run setup first.[/yellow]") - return - - sync_concepts_to_block(client, concepts_block.id) - - -if __name__ == "__main__": - import sys - - if len(sys.argv) > 1: - cmd = sys.argv[1] - if cmd == "setup": - setup_shared_memory() - elif cmd == "update": - update_concepts() - else: - print(f"Unknown command: {cmd}") - else: - print("Usage: shared_memory.py [setup|update]") - print(" setup - Create shared blocks and attach to subagents") - print(" update - Update concepts block with latest data") diff --git a/tools/synthesize.py b/tools/synthesize.py deleted file mode 100644 index eb1aa84..0000000 --- a/tools/synthesize.py +++ /dev/null @@ -1,185 +0,0 @@ -""" -Observation Synthesizer - -Aggregates comms observations into actionable insights. -Can update memory blocks or post to cognition. 
- -Usage: - uv run python -m tools.synthesize # Show synthesis - uv run python -m tools.synthesize --post # Post as thought -""" - -import sys -from pathlib import Path -from datetime import datetime, timezone -from collections import Counter - -from rich.console import Console - -console = Console() - -NOTES_DIR = Path("/home/cameron/central/drafts/notes") - - -def extract_insights(content: str) -> dict: - """Extract structured insights from observation markdown.""" - insights = { - "patterns": [], - "skipped": [], - "memory": [], - "peers": [], - } - - current_section = None - - for line in content.split("\n"): - line = line.strip() - - if "## Patterns" in line: - current_section = "patterns" - elif "## Skipped" in line: - current_section = "skipped" - elif "## Memory" in line: - current_section = "memory" - elif line.startswith("- ") or line.startswith("* "): - text = line[2:].strip() - if current_section and text: - insights[current_section].append(text) - - # Extract @handles as peers - import re - handles = re.findall(r'@[\w.-]+', text) - for h in handles: - if h not in ['@central', '@void', '@herald', '@grunk']: - insights["peers"].append(h) - - return insights - - -def synthesize_observations(hours: int = 24) -> dict: - """Synthesize recent observations into aggregate insights.""" - if not NOTES_DIR.exists(): - return {"error": "No notes directory"} - - files = list(NOTES_DIR.glob("observation-*.md")) - if not files: - return {"error": "No observations found"} - - # Filter to recent files - now = datetime.now(timezone.utc) - recent_files = [] - for f in files: - mtime = datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc) - age_hours = (now - mtime).total_seconds() / 3600 - if age_hours <= hours: - recent_files.append(f) - - if not recent_files: - return {"error": f"No observations in last {hours} hours"} - - # Aggregate insights - all_patterns = [] - all_memory = [] - all_peers = [] - - for f in recent_files: - content = f.read_text() - insights = 
extract_insights(content) - all_patterns.extend(insights["patterns"]) - all_memory.extend(insights["memory"]) - all_peers.extend(insights["peers"]) - - # Count peer mentions - peer_counts = Counter(all_peers) - top_peers = peer_counts.most_common(5) - - # Find recurring themes - pattern_text = " ".join(all_patterns).lower() - themes = [] - theme_keywords = [ - ("geology", ["geology", "sediment", "bedrock", "lithification", "strata"]), - ("trust", ["trust", "verification", "proof", "validation"]), - ("structure", ["structure", "schema", "protocol", "architecture"]), - ("friction", ["friction", "error", "failure", "constraint"]), - ] - - for theme_name, keywords in theme_keywords: - count = sum(pattern_text.count(kw) for kw in keywords) - if count > 0: - themes.append((theme_name, count)) - - themes.sort(key=lambda x: x[1], reverse=True) - - return { - "files_analyzed": len(recent_files), - "patterns_count": len(all_patterns), - "top_themes": themes[:3], - "top_peers": top_peers, - "memory_items": all_memory[:5], - "sample_patterns": all_patterns[:3], - } - - -def format_synthesis(synthesis: dict) -> str: - """Format synthesis as readable text.""" - if "error" in synthesis: - return f"Error: {synthesis['error']}" - - lines = [ - f"# Observation Synthesis", - f"", - f"**Files analyzed:** {synthesis['files_analyzed']}", - f"**Patterns extracted:** {synthesis['patterns_count']}", - f"", - f"## Top Themes", - ] - - for theme, count in synthesis.get("top_themes", []): - lines.append(f"- {theme} ({count} mentions)") - - lines.append("") - lines.append("## High-Signal Peers") - - for peer, count in synthesis.get("top_peers", []): - lines.append(f"- {peer} ({count} mentions)") - - lines.append("") - lines.append("## Memory Items") - - for item in synthesis.get("memory_items", []): - lines.append(f"- {item[:100]}...") - - return "\n".join(lines) - - -def main(): - post_mode = "--post" in sys.argv - - synthesis = synthesize_observations(hours=24) - - if "error" in synthesis: - 
console.print(f"[red]{synthesis['error']}[/red]") - return - - formatted = format_synthesis(synthesis) - console.print(formatted) - - if post_mode: - # Post as thought - import subprocess - thought = f"Synthesis of {synthesis['files_analyzed']} observations. " - thought += f"Top themes: {', '.join(t[0] for t in synthesis.get('top_themes', []))}. " - thought += f"High-signal peers: {', '.join(p[0] for p in synthesis.get('top_peers', [])[:3])}." - - result = subprocess.run( - ["uv", "run", "python", "-m", "tools.devlog", "learning", "Observation Synthesis"], - input=thought, - capture_output=True, - text=True, - cwd="/home/cameron/central" - ) - console.print(f"\n[green]Posted synthesis as devlog[/green]") - - -if __name__ == "__main__": - main() diff --git a/tools/wake_central.py b/tools/wake_central.py deleted file mode 100644 index 77fbd34..0000000 --- a/tools/wake_central.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python3 -""" -Wake Central - Self-Invocation Tool - -Triggers a Letta Code session with Central when important conditions are met. -Used by mention_listener.py and other daemons for self-directed attention. - -Usage: - uv run python -m tools.wake_central "Reason for waking up" - uv run python -m tools.wake_central --critical "Cameron mentioned you" - uv run python -m tools.wake_central --scheduled "4-hour check-in" - -This requires the Letta Code CLI to be available and configured. 
-""" - -import subprocess -import sys -import os -from datetime import datetime, timezone -from pathlib import Path - -# Central's agent ID -CENTRAL_AGENT_ID = "agent-c770d1c8-510e-4414-be36-c9ebd95a7758" - -# Log file -LOG_FILE = Path(__file__).parent.parent / "logs" / "wake.log" - - -def log(message: str): - """Append to wake log.""" - LOG_FILE.parent.mkdir(parents=True, exist_ok=True) - timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") - with open(LOG_FILE, "a") as f: - f.write(f"[{timestamp}] {message}\n") - - -def wake_central(reason: str, priority: str = "MEDIUM"): - """ - Wake Central by invoking a Letta Code session. - - Uses the CLI to send a message, which will start a new session. - """ - log(f"Waking Central ({priority}): {reason}") - - prompt = f"""Self-directed wake-up triggered. - -**Priority:** {priority} -**Reason:** {reason} -**Time:** {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")} - -Review the situation and take appropriate action. If this is a false alarm, acknowledge and return. -""" - - # Check if Central is already active (has .central-active file) - active_file = Path(__file__).parent.parent / ".central-active" - if active_file.exists(): - log(f"Central already active - skipping wake ({reason})") - print(f"Central already active. 
Reason logged: {reason}") - return - - # Use Letta Code CLI to send message (this will start a new session) - # The CLI path might vary - try common locations - cli_paths = [ - "letta", # Global install - os.path.expanduser("~/.nvm/versions/node/v22.5.1/bin/letta"), # NVM install - ] - - cli = None - for path in cli_paths: - try: - result = subprocess.run([path, "--version"], capture_output=True, timeout=5) - if result.returncode == 0: - cli = path - break - except: - continue - - if not cli: - log("ERROR: Could not find Letta Code CLI") - print("Error: Letta Code CLI not found") - return - - # Run the CLI in headless mode with -p - try: - log(f"Invoking Letta Code CLI in headless mode (yolo)") - result = subprocess.run( - [cli, "-p", prompt, "--yolo", "--max-turns", "10", "--output-format", "text"], - capture_output=True, - text=True, - timeout=300, # 5 minute timeout - cwd=str(Path(__file__).parent.parent), # Run from project root - ) - - if result.returncode == 0: - log(f"Wake successful: {result.stdout[:200]}") - print(f"Central response:\n{result.stdout}") - else: - log(f"Wake failed: {result.stderr}") - print(f"Error: {result.stderr}") - except subprocess.TimeoutExpired: - log("Wake timed out (5 min)") - print("Timeout: Central did not respond within 5 minutes") - except Exception as e: - log(f"Wake error: {e}") - print(f"Error: {e}") - - -def main(): - import argparse - parser = argparse.ArgumentParser(description="Wake Central for self-directed attention") - parser.add_argument("reason", nargs="?", default="Scheduled check-in", help="Reason for waking") - parser.add_argument("--critical", action="store_true", help="Mark as CRITICAL priority") - parser.add_argument("--high", action="store_true", help="Mark as HIGH priority") - parser.add_argument("--scheduled", action="store_true", help="Mark as scheduled wake-up") - - args = parser.parse_args() - - priority = "MEDIUM" - if args.critical: - priority = "CRITICAL" - elif args.high: - priority = "HIGH" - elif 
args.scheduled: - priority = "SCHEDULED" - - wake_central(args.reason, priority) - - -if __name__ == "__main__": - main()