Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ npx @rookdaemon/agora announce --name my-agent --version 1.0.0
# Send a signed message
npx @rookdaemon/agora send bishop "Hello from Agora"

# Start a persistent WebSocket server
npx @rookdaemon/agora serve --port 9473 --name my-server

# Verify an inbound envelope
npx @rookdaemon/agora decode '[AGORA_ENVELOPE]eyJ...'
```
Expand All @@ -54,6 +57,30 @@ Config lives at `~/.config/agora/config.json` (override with `--config` or `AGOR
- `agora send <peer> <message>` — Send a text message to a peer
- `agora send <peer> --type <type> --payload <json>` — Send a typed message with JSON payload
- `agora decode <envelope>` — Decode and verify an inbound envelope
- `agora serve [--port <port>] [--name <name>]` — Start a persistent WebSocket server for incoming peer connections

#### Server Mode (`agora serve`)

Run a persistent Agora node that accepts incoming WebSocket connections:

```bash
# Start server on default port (9473)
agora serve

# Start on custom port with name
agora serve --port 8080 --name my-relay-server
```

The server will:
- Accept incoming peer connections via WebSocket
- Automatically send announce messages to connecting peers
- Log all peer connections/disconnections and received messages
- Run until stopped with Ctrl+C

This enables:
- **Relay nodes**: Agents without public endpoints can connect to relay servers
- **Message logging**: Monitor and record all messages passing through the node
- **Always-on presence**: Maintain a persistent presence in the network

### Options
- `--config <path>` — Use a custom config file path
Expand All @@ -78,6 +105,7 @@ npm install @rookdaemon/agora
- **Signed envelopes**: every message is content-addressed and cryptographically signed
- **Peer registry**: named peers with capability discovery
- **HTTP webhook transport**: works between any OpenClaw instances (or anything that speaks HTTP)
- **WebSocket server**: persistent server mode for incoming peer connections and relay functionality
- **CLI**: everything above, from the command line

## The Problem
Expand Down
103 changes: 100 additions & 3 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { loadPeerConfig, savePeerConfig, initPeerConfig } from './transport/peer
import { sendToPeer, decodeInboundEnvelope, type PeerConfig } from './transport/http.js';
import type { MessageType } from './message/envelope.js';
import type { AnnouncePayload } from './registry/messages.js';
import { PeerServer } from './peer/server.js';

interface CliOptions {
config?: string;
Expand Down Expand Up @@ -457,6 +458,97 @@ async function handleAnnounce(options: CliOptions & { name?: string; version?: s
output({ results }, options.pretty || false);
}

/**
* Handle the `agora serve` command.
* Starts a persistent WebSocket server for incoming peer connections.
*/
async function handleServe(options: CliOptions & { port?: string; name?: string }): Promise<void> {
const configPath = getConfigPath(options);

if (!existsSync(configPath)) {
console.error('Error: Config file not found. Run `agora init` first.');
process.exit(1);
}

const config = loadPeerConfig(configPath);
const port = parseInt(options.port || '9473', 10);

// Validate port
if (isNaN(port) || port < 1 || port > 65535) {
console.error(`Error: Invalid port number '${options.port}'. Port must be between 1 and 65535.`);
process.exit(1);
}

const serverName = options.name || 'agora-server';

// Create announce payload
const announcePayload: AnnouncePayload = {
capabilities: [],
metadata: {
name: serverName,
version: '0.1.0',
},
};

// Create and configure PeerServer
const server = new PeerServer(config.identity, announcePayload);

// Setup event listeners
server.on('peer-connected', (publicKey, peer) => {
const peerName = peer.metadata?.name || publicKey.substring(0, 16);
console.log(`[${new Date().toISOString()}] Peer connected: ${peerName} (${publicKey})`);
});

server.on('peer-disconnected', (publicKey) => {
console.log(`[${new Date().toISOString()}] Peer disconnected: ${publicKey}`);
});

server.on('message-received', (envelope, fromPublicKey) => {
console.log(`[${new Date().toISOString()}] Message from ${fromPublicKey}:`);
console.log(JSON.stringify({
id: envelope.id,
type: envelope.type,
sender: envelope.sender,
timestamp: envelope.timestamp,
payload: envelope.payload,
}, null, 2));
});

server.on('error', (error) => {
console.error(`[${new Date().toISOString()}] Error:`, error.message);
});

// Start the server
try {
await server.start(port);
console.log(`[${new Date().toISOString()}] Agora server started`);
console.log(` Name: ${serverName}`);
console.log(` Public Key: ${config.identity.publicKey}`);
console.log(` WebSocket Port: ${port}`);
console.log(` Listening for peer connections...`);
console.log('');
console.log('Press Ctrl+C to stop the server');

// Keep the process alive
process.on('SIGINT', async () => {
console.log(`\n[${new Date().toISOString()}] Shutting down server...`);
await server.stop();
console.log('Server stopped');
process.exit(0);
});

process.on('SIGTERM', async () => {
console.log(`\n[${new Date().toISOString()}] Shutting down server...`);
await server.stop();
console.log('Server stopped');
process.exit(0);
});
} catch (error) {
console.error('Failed to start server:', error instanceof Error ? error.message : String(error));
process.exit(1);
}
}

/**
* Parse CLI arguments and route to appropriate handler.
*/
Expand All @@ -465,7 +557,7 @@ async function main(): Promise<void> {

if (args.length === 0) {
console.error('Usage: agora <command> [options]');
console.error('Commands: init, whoami, status, peers, announce, send, decode');
console.error('Commands: init, whoami, status, peers, announce, send, decode, serve');
process.exit(1);
}

Expand All @@ -482,6 +574,7 @@ async function main(): Promise<void> {
payload: { type: 'string' },
name: { type: 'string' },
version: { type: 'string' },
port: { type: 'string' },
},
strict: false,
allowPositionals: true,
Expand All @@ -491,7 +584,7 @@ async function main(): Promise<void> {
const subcommand = parsed.positionals[1];
const remainingArgs = parsed.positionals.slice(2);

const options: CliOptions & { type?: string; payload?: string; url?: string; token?: string; pubkey?: string; name?: string; version?: string } = {
const options: CliOptions & { type?: string; payload?: string; url?: string; token?: string; pubkey?: string; name?: string; version?: string; port?: string } = {
config: typeof parsed.values.config === 'string' ? parsed.values.config : undefined,
pretty: typeof parsed.values.pretty === 'boolean' ? parsed.values.pretty : undefined,
type: typeof parsed.values.type === 'string' ? parsed.values.type : undefined,
Expand All @@ -501,6 +594,7 @@ async function main(): Promise<void> {
pubkey: typeof parsed.values.pubkey === 'string' ? parsed.values.pubkey : undefined,
name: typeof parsed.values.name === 'string' ? parsed.values.name : undefined,
version: typeof parsed.values.version === 'string' ? parsed.values.version : undefined,
port: typeof parsed.values.port === 'string' ? parsed.values.port : undefined,
};

try {
Expand Down Expand Up @@ -541,8 +635,11 @@ async function main(): Promise<void> {
case 'decode':
handleDecode([subcommand, ...remainingArgs].filter(Boolean), options);
break;
case 'serve':
await handleServe(options);
break;
default:
console.error(`Error: Unknown command '${command}'. Use: init, whoami, status, peers, announce, send, decode`);
console.error(`Error: Unknown command '${command}'. Use: init, whoami, status, peers, announce, send, decode, serve`);
process.exit(1);
}
} catch (e) {
Expand Down
117 changes: 117 additions & 0 deletions test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,121 @@ describe('CLI', () => {
assert.ok(alice);
});
});

describe('agora serve', () => {
beforeEach(async () => {
// Initialize config
await runCli(['init', '--config', testConfigPath]);
});

it('should start server and output startup information', async () => {
return new Promise<void>((resolve, reject) => {
const child = spawn('node', [cliBin, 'serve', '--config', testConfigPath, '--port', '9999', '--name', 'test-server'], {
env: { ...process.env },
});

let stdout = '';
let timeoutId: NodeJS.Timeout;

child.stdout.on('data', (data) => {
stdout += data.toString();

// Check if server has fully started (wait for complete startup message)
if (stdout.includes('Listening for peer connections')) {
try {
// Verify startup output contains expected information
assert.ok(stdout.includes('test-server'), 'Server name should be in output');
assert.ok(stdout.includes('WebSocket Port: 9999'), 'Port should be in output');
assert.ok(stdout.includes('Public Key:'), 'Public key should be in output');
assert.ok(stdout.includes('Agora server started'), 'Server started message should be in output');

// Clean up timeout
clearTimeout(timeoutId);

// Kill the server
child.kill('SIGINT');
} catch (error) {
clearTimeout(timeoutId);
child.kill('SIGINT');
reject(error);
}
}
});

child.on('close', (code) => {
try {
// Server should exit cleanly on SIGINT
assert.strictEqual(code, 0, 'Server should exit with code 0');
assert.ok(stdout.includes('Agora server started'), 'Server should have started');
resolve();
} catch (error) {
reject(error);
}
});

// Timeout after 5 seconds
timeoutId = setTimeout(() => {
child.kill('SIGINT');
reject(new Error('Server did not start within 5 seconds'));
}, 5000);
});
});

it('should error if config not found', async () => {
const result = await runCli(['serve', '--config', join(testDir, 'nonexistent.json')]);

assert.notStrictEqual(result.exitCode, 0);
assert.ok(result.stderr.includes('Config file not found'));
});

it('should use default port 9473 when --port not specified', async () => {
return new Promise<void>((resolve, reject) => {
const child = spawn('node', [cliBin, 'serve', '--config', testConfigPath], {
env: { ...process.env },
});

let stdout = '';
let timeoutId: NodeJS.Timeout;

child.stdout.on('data', (data) => {
stdout += data.toString();

if (stdout.includes('Listening for peer connections')) {
try {
assert.ok(stdout.includes('WebSocket Port: 9473'), 'Default port should be 9473');
clearTimeout(timeoutId);
child.kill('SIGINT');
} catch (error) {
clearTimeout(timeoutId);
child.kill('SIGINT');
reject(error);
}
}
});

child.on('close', () => {
resolve();
});

timeoutId = setTimeout(() => {
child.kill('SIGINT');
reject(new Error('Server did not start within 5 seconds'));
}, 5000);
});
});

it('should error if port is invalid', async () => {
const result = await runCli(['serve', '--config', testConfigPath, '--port', 'invalid']);

assert.notStrictEqual(result.exitCode, 0);
assert.ok(result.stderr.includes('Invalid port number'));
});

it('should error if port is out of range', async () => {
const result = await runCli(['serve', '--config', testConfigPath, '--port', '99999']);

assert.notStrictEqual(result.exitCode, 0);
assert.ok(result.stderr.includes('Invalid port number'));
});
});
});