From a353b13485db9dddffc51d4df57b27c868da2606 Mon Sep 17 00:00:00 2001 From: hriztam Date: Tue, 30 Dec 2025 01:41:30 +0530 Subject: [PATCH 1/2] New IPC connection established --- daemon/src/bin/test_client.rs | 157 ++++++++++++++++ daemon/src/ipc/handlers.rs | 279 ++++++++++++++++++++++++++++ daemon/src/ipc/message.rs | 329 +++++++++++++++++++++++++++++++++ daemon/src/ipc/mod.rs | 13 +- daemon/src/ipc/protocol.rs | 137 -------------- daemon/src/ipc/server.rs | 334 +++++++++++++++++++++++----------- 6 files changed, 1000 insertions(+), 249 deletions(-) create mode 100644 daemon/src/bin/test_client.rs create mode 100644 daemon/src/ipc/handlers.rs create mode 100644 daemon/src/ipc/message.rs delete mode 100644 daemon/src/ipc/protocol.rs diff --git a/daemon/src/bin/test_client.rs b/daemon/src/bin/test_client.rs new file mode 100644 index 0000000..f81c086 --- /dev/null +++ b/daemon/src/bin/test_client.rs @@ -0,0 +1,157 @@ +//! IPC Test Client +//! +//! CLI tool for testing the daemon's IPC layer. +//! Usage: +//! test_client status - Get current mode +//! test_client set-mode - Change mode (idle|dictation|intelligent|agent) +//! test_client watch - Stream all events (heartbeats, dictation, etc.) + +use std::io::{BufRead, BufReader, Write}; +use std::os::unix::net::UnixStream; +use std::path::PathBuf; + +fn socket_path() -> PathBuf { + let home = std::env::var("HOME").expect("HOME not set"); + PathBuf::from(format!( + "{}/.local/share/second-brain/daemon.sock", + home + )) +} + +fn main() { + let args: Vec = std::env::args().collect(); + + if args.len() < 2 { + print_usage(); + std::process::exit(1); + } + + let command = &args[1]; + + match command.as_str() { + "set-mode" => { + if args.len() < 3 { + eprintln!("Error: set-mode requires a mode argument"); + eprintln!("Usage: test_client set-mode "); + std::process::exit(1); + } + set_mode(&args[2]); + } + "watch" => watch_events(), + "help" | "--help" | "-h" => print_usage(), + other => { + eprintln!("Unknown command: {}", other); + print_usage(); + std::process::exit(1); + } + } +} + +fn print_usage() { + eprintln!("IPC Test Client"); + eprintln!(); + eprintln!("Usage:"); + eprintln!(" test_client set-mode - Change mode (idle|dictation|intelligent|agent)"); + eprintln!(" test_client watch - Stream all events"); + eprintln!(" test_client help - Show this help"); +} + +fn connect() -> UnixStream { + let path = socket_path(); + println!("Connecting to {:?}...", path); + + match UnixStream::connect(&path) { + Ok(stream) => { + println!("Connected!"); + stream + } + Err(e) => { + eprintln!("Failed to connect: {}", e); + eprintln!("Is the daemon running?"); + std::process::exit(1); + } + } +} + +fn send_command(stream: &mut UnixStream, command: &str) { + println!("Sending: {}", command); + writeln!(stream, "{}", command).expect("Failed to write to socket"); + stream.flush().expect("Failed to flush socket"); +} + +fn read_response(stream: &mut UnixStream) -> String { + let mut reader = BufReader::new(stream.try_clone().expect("Failed to clone stream")); + let mut line = String::new(); + reader.read_line(&mut line).expect("Failed to read response"); + line.trim().to_string() +} + +fn set_mode(mode: &str) { + // Validate mode + let valid_modes = ["idle", "dictation", "intelligent", "agent"]; + if !valid_modes.contains(&mode) { + eprintln!("Invalid mode: {}", mode); + eprintln!("Valid modes: {:?}", valid_modes); + std::process::exit(1); + } + + let mut stream = connect(); + + // Build the command + let command = format!( + r#"{{"id":"cli-1","kind":"command","type":"SET_MODE","payload":{{"mode":"{}"}}}}"#, + mode + ); + + send_command(&mut stream, &command); + + // Read response + let response = read_response(&mut stream); + println!("Response: {}", response); + + // Pretty print if it's valid JSON + if let Ok(json) = serde_json::from_str::(&response) { + if let Ok(pretty) = serde_json::to_string_pretty(&json) { + println!("\nFormatted:"); + println!("{}", pretty); + } + } +} + +fn watch_events() { + let stream = connect(); + let reader = BufReader::new(stream); + + println!("Watching for events (Ctrl+C to stop)...\n"); + + for line in reader.lines() { + match line { + Ok(text) if !text.is_empty() => { + // Try to parse and pretty-print + if let Ok(json) = serde_json::from_str::(&text) { + let kind = json.get("kind").and_then(|v| v.as_str()).unwrap_or("?"); + let msg_type = json.get("type").and_then(|v| v.as_str()).unwrap_or("?"); + + // Color code by kind + let prefix = match kind { + "event" => "\x1b[34m[EVENT]\x1b[0m", + "response" => "\x1b[32m[RESPONSE]\x1b[0m", + "error" => "\x1b[31m[ERROR]\x1b[0m", + _ => "[???]", + }; + + println!("{} {} {}", prefix, msg_type, text); + } else { + println!("[RAW] {}", text); + } + } + Ok(_) => {} // Empty line + Err(e) => { + eprintln!("Read error: {}", e); + break; + } + } + } + + println!("\nConnection closed."); +} diff --git a/daemon/src/ipc/handlers.rs b/daemon/src/ipc/handlers.rs new file mode 100644 index 0000000..2cdb437 --- /dev/null +++ b/daemon/src/ipc/handlers.rs @@ -0,0 +1,279 @@ +//! IPC command handlers +//! +//! Processes incoming commands and produces responses. +//! All validation and state mutation logic is centralized here. + +use tracing::{debug, info, warn}; + +use super::message::{ + message_types, Envelope, InvalidModePayload, InvalidModeTransitionPayload, + MalformedMessagePayload, Mode, ModeChangedPayload, RawEnvelope, SetModePayload, + UnknownCommandPayload, +}; + +// ============================================================================= +// Command Handler Result +// ============================================================================= + +/// Result of processing a command +pub enum CommandResult { + /// Send a single response back to the client + Response(String), + /// No response needed (already handled) + NoResponse, +} + +// ============================================================================= +// Transition Validation +// ============================================================================= + +/// Checks if a mode transition is valid +/// +/// Valid transitions (per spec): +/// - idle -> any mode +/// - any mode -> idle (always allowed) +/// - dictation -> intelligent (upgrade allowed while holding keys) +/// - other transitions are invalid +pub fn is_valid_transition(from: Mode, to: Mode) -> bool { + // Same mode is always valid (no-op) + if from == to { + return true; + } + + // Transition to idle is always valid + if to == Mode::Idle { + return true; + } + + // From idle, can go to any mode + if from == Mode::Idle { + return true; + } + + // Upgrade from dictation to intelligent is valid + if from == Mode::Dictation && to == Mode::Intelligent { + return true; + } + + // All other transitions are invalid + // - dictation -> agent (must go through idle) + // - intelligent -> dictation (must release and re-press) + // - intelligent -> agent (must go through idle) + // - agent -> dictation (agent mode is toggle, must toggle off first) + // - agent -> intelligent (agent mode is toggle, must toggle off first) + false +} + +// ============================================================================= +// Command Processing +// ============================================================================= + +/// Process a raw envelope and produce a response +/// +/// Returns a JSON string to send back to the client. +/// The current mode is passed by reference and may be updated. +pub fn handle_command(raw: &RawEnvelope, current_mode: &mut Mode) -> CommandResult { + debug!( + id = %raw.id, + kind = ?raw.kind, + message_type = %raw.message_type, + "processing command" + ); + + let response_json = match raw.message_type.as_str() { + message_types::SET_MODE => handle_set_mode(raw, current_mode), + unknown => { + warn!(command = %unknown, "unknown command type"); + make_unknown_command_error(&raw.id, unknown) + } + }; + + CommandResult::Response(response_json) +} + +/// Handle SET_MODE command +fn handle_set_mode(raw: &RawEnvelope, current_mode: &mut Mode) -> String { + // Parse the payload + let payload: SetModePayload = match serde_json::from_value(raw.payload.clone()) { + Ok(p) => p, + Err(e) => { + warn!(error = %e, "failed to parse SET_MODE payload"); + return make_malformed_message_error(&raw.id, &format!("invalid SET_MODE payload: {}", e)); + } + }; + + let target_mode = payload.mode; + let from_mode = *current_mode; + + info!( + from = %from_mode, + to = %target_mode, + "SET_MODE request" + ); + + // Validate the transition + if !is_valid_transition(from_mode, target_mode) { + info!( + from = %from_mode, + to = %target_mode, + "rejecting invalid mode transition" + ); + return make_invalid_transition_error(&raw.id, from_mode, target_mode); + } + + // Apply the transition + *current_mode = target_mode; + info!(mode = %target_mode, "mode changed successfully"); + + // Build success response + let response = Envelope::response( + &raw.id, + message_types::MODE_CHANGED, + ModeChangedPayload { mode: target_mode }, + ); + + serde_json::to_string(&response).unwrap_or_else(|e| { + make_malformed_message_error(&raw.id, &format!("serialization error: {}", e)) + }) +} + +// ============================================================================= +// Error Response Helpers +// ============================================================================= + +fn make_invalid_transition_error(request_id: &str, from: Mode, to: Mode) -> String { + let response = Envelope::error( + request_id, + message_types::INVALID_MODE_TRANSITION, + InvalidModeTransitionPayload { from, to }, + ); + serde_json::to_string(&response).expect("error serialization should not fail") +} + +fn make_invalid_mode_error(request_id: &str, provided: &str) -> String { + let response = Envelope::error( + request_id, + message_types::INVALID_MODE, + InvalidModePayload { provided: provided.to_string() }, + ); + serde_json::to_string(&response).expect("error serialization should not fail") +} + +fn make_malformed_message_error(request_id: &str, reason: &str) -> String { + let response = Envelope::error( + request_id, + message_types::MALFORMED_MESSAGE, + MalformedMessagePayload { reason: reason.to_string() }, + ); + serde_json::to_string(&response).expect("error serialization should not fail") +} + +fn make_unknown_command_error(request_id: &str, command: &str) -> String { + let response = Envelope::error( + request_id, + message_types::UNKNOWN_COMMAND, + UnknownCommandPayload { command: command.to_string() }, + ); + serde_json::to_string(&response).expect("error serialization should not fail") +} + +/// Create a malformed message error for parse failures (used by server) +pub fn make_parse_error(request_id: &str, reason: &str) -> String { + make_malformed_message_error(request_id, reason) +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use crate::ipc::message::MessageKind; + + fn make_set_mode_command(id: &str, mode: &str) -> RawEnvelope { + let json = format!( + r#"{{"id":"{}","kind":"command","type":"SET_MODE","payload":{{"mode":"{}"}}}}"#, + id, mode + ); + RawEnvelope::from_line(&json).unwrap() + } + + #[test] + fn test_valid_transitions() { + // From idle + assert!(is_valid_transition(Mode::Idle, Mode::Dictation)); + assert!(is_valid_transition(Mode::Idle, Mode::Intelligent)); + assert!(is_valid_transition(Mode::Idle, Mode::Agent)); + + // To idle + assert!(is_valid_transition(Mode::Dictation, Mode::Idle)); + assert!(is_valid_transition(Mode::Intelligent, Mode::Idle)); + assert!(is_valid_transition(Mode::Agent, Mode::Idle)); + + // Dictation upgrade + assert!(is_valid_transition(Mode::Dictation, Mode::Intelligent)); + + // Same mode + assert!(is_valid_transition(Mode::Dictation, Mode::Dictation)); + } + + #[test] + fn test_invalid_transitions() { + // Cannot go directly between non-idle modes (except dictation->intelligent) + assert!(!is_valid_transition(Mode::Dictation, Mode::Agent)); + assert!(!is_valid_transition(Mode::Intelligent, Mode::Dictation)); + assert!(!is_valid_transition(Mode::Intelligent, Mode::Agent)); + assert!(!is_valid_transition(Mode::Agent, Mode::Dictation)); + assert!(!is_valid_transition(Mode::Agent, Mode::Intelligent)); + } + + #[test] + fn test_handle_set_mode_success() { + let mut current_mode = Mode::Idle; + let raw = make_set_mode_command("1", "dictation"); + + let result = handle_command(&raw, &mut current_mode); + + assert!(matches!(result, CommandResult::Response(_))); + assert_eq!(current_mode, Mode::Dictation); + + if let CommandResult::Response(json) = result { + assert!(json.contains("MODE_CHANGED")); + assert!(json.contains("\"kind\":\"response\"")); + assert!(json.contains("\"mode\":\"dictation\"")); + } + } + + #[test] + fn test_handle_set_mode_invalid_transition() { + let mut current_mode = Mode::Agent; + let raw = make_set_mode_command("1", "dictation"); + + let result = handle_command(&raw, &mut current_mode); + + // Mode should not change + assert_eq!(current_mode, Mode::Agent); + + if let CommandResult::Response(json) = result { + assert!(json.contains("INVALID_MODE_TRANSITION")); + assert!(json.contains("\"kind\":\"error\"")); + assert!(json.contains("\"from\":\"agent\"")); + assert!(json.contains("\"to\":\"dictation\"")); + } + } + + #[test] + fn test_handle_unknown_command() { + let mut current_mode = Mode::Idle; + let json = r#"{"id":"1","kind":"command","type":"UNKNOWN_THING","payload":{}}"#; + let raw = RawEnvelope::from_line(json).unwrap(); + + let result = handle_command(&raw, &mut current_mode); + + if let CommandResult::Response(json) = result { + assert!(json.contains("UNKNOWN_COMMAND")); + assert!(json.contains("\"kind\":\"error\"")); + } + } +} diff --git a/daemon/src/ipc/message.rs b/daemon/src/ipc/message.rs new file mode 100644 index 0000000..a1b687b --- /dev/null +++ b/daemon/src/ipc/message.rs @@ -0,0 +1,329 @@ +//! IPC message types using unified envelope format +//! +//! All messages use newline-delimited JSON (JSONL) with a common envelope: +//! ```json +//! {"id": "uuid", "kind": "command|event|response|error", "type": "MESSAGE_TYPE", "payload": {...}} +//! ``` + +use serde::{Deserialize, Serialize}; + +use crate::state::State; + +// ============================================================================= +// Message Envelope +// ============================================================================= + +/// The kind of message being sent +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MessageKind { + /// Client -> Server: Request an action + Command, + /// Server -> Client: Push notification (streaming, heartbeat) + Event, + /// Server -> Client: Success response to a command + Response, + /// Server -> Client: Error response to a command + Error, +} + +/// Universal message envelope +/// +/// Every IPC message follows this structure. The payload is generic +/// to allow different content based on message type. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Envelope

{ + /// Unique identifier for request/response correlation + pub id: String, + /// Message classification + pub kind: MessageKind, + /// Specific message type (e.g., "SET_MODE", "MODE_CHANGED") + #[serde(rename = "type")] + pub message_type: String, + /// Type-specific payload + pub payload: P, +} + +impl

Envelope

{ + /// Create a new envelope + pub fn new(id: impl Into, kind: MessageKind, message_type: impl Into, payload: P) -> Self { + Self { + id: id.into(), + kind, + message_type: message_type.into(), + payload, + } + } + + /// Create a response envelope for a given request ID + pub fn response(request_id: impl Into, message_type: impl Into, payload: P) -> Self { + Self::new(request_id, MessageKind::Response, message_type, payload) + } + + /// Create an error envelope for a given request ID + pub fn error(request_id: impl Into, message_type: impl Into, payload: P) -> Self { + Self::new(request_id, MessageKind::Error, message_type, payload) + } + + /// Create an event envelope with a new ID + pub fn event(id: impl Into, message_type: impl Into, payload: P) -> Self { + Self::new(id, MessageKind::Event, message_type, payload) + } +} + +// ============================================================================= +// Message Type Constants +// ============================================================================= + +pub mod message_types { + // Commands (Client -> Server) + pub const SET_MODE: &str = "SET_MODE"; + + // Responses (Server -> Client) + pub const MODE_CHANGED: &str = "MODE_CHANGED"; + + // Errors (Server -> Client) + pub const INVALID_MODE_TRANSITION: &str = "INVALID_MODE_TRANSITION"; + pub const INVALID_MODE: &str = "INVALID_MODE"; + pub const MALFORMED_MESSAGE: &str = "MALFORMED_MESSAGE"; + pub const UNKNOWN_COMMAND: &str = "UNKNOWN_COMMAND"; + + // Events (Server -> Client) + pub const DICTATION_PARTIAL: &str = "DICTATION_PARTIAL"; + pub const DICTATION_FINAL: &str = "DICTATION_FINAL"; + pub const HEARTBEAT: &str = "HEARTBEAT"; +} + +// ============================================================================= +// Mode +// ============================================================================= + +/// Operating modes of the daemon +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Mode { + Idle, + Dictation, + Intelligent, + Agent, +} + +impl Default for Mode { + fn default() -> Self { + Self::Idle + } +} + +impl std::fmt::Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Mode::Idle => write!(f, "idle"), + Mode::Dictation => write!(f, "dictation"), + Mode::Intelligent => write!(f, "intelligent"), + Mode::Agent => write!(f, "agent"), + } + } +} + +impl From for Mode { + fn from(state: State) -> Self { + match state { + State::Idle => Mode::Idle, + State::DictationActive => Mode::Dictation, + State::IntelligentActive => Mode::Intelligent, + State::AgentActive => Mode::Agent, + } + } +} + +impl Mode { + /// Parse a mode from a string (case-insensitive) + pub fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "idle" => Some(Mode::Idle), + "dictation" => Some(Mode::Dictation), + "intelligent" => Some(Mode::Intelligent), + "agent" => Some(Mode::Agent), + _ => None, + } + } +} + +// ============================================================================= +// Command Payloads (Client -> Server) +// ============================================================================= + +/// Payload for SET_MODE command +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SetModePayload { + pub mode: Mode, +} + +// ============================================================================= +// Response Payloads (Server -> Client) +// ============================================================================= + +/// Payload for MODE_CHANGED response +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ModeChangedPayload { + pub mode: Mode, +} + +// ============================================================================= +// Error Payloads (Server -> Client) +// ============================================================================= + +/// Payload for INVALID_MODE_TRANSITION error +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InvalidModeTransitionPayload { + pub from: Mode, + pub to: Mode, +} + +/// Payload for INVALID_MODE error +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InvalidModePayload { + pub provided: String, +} + +/// Payload for MALFORMED_MESSAGE error +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MalformedMessagePayload { + pub reason: String, +} + +/// Payload for UNKNOWN_COMMAND error +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UnknownCommandPayload { + pub command: String, +} + +// ============================================================================= +// Event Payloads (Server -> Client) +// ============================================================================= + +/// Payload for DICTATION_PARTIAL and DICTATION_FINAL events +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DictationPayload { + pub text: String, +} + +/// Payload for HEARTBEAT event +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HeartbeatPayload { + pub uptime_ms: u64, +} + +// ============================================================================= +// Raw Message Parsing +// ============================================================================= + +/// Raw envelope for initial parsing (payload as generic Value) +#[derive(Debug, Clone, Deserialize)] +pub struct RawEnvelope { + pub id: String, + pub kind: MessageKind, + #[serde(rename = "type")] + pub message_type: String, + pub payload: serde_json::Value, +} + +impl RawEnvelope { + /// Parse from a JSON line + pub fn from_line(line: &str) -> Result { + serde_json::from_str(line) + } +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_envelope_serialization() { + let envelope = Envelope::new( + "test-1", + MessageKind::Command, + message_types::SET_MODE, + SetModePayload { mode: Mode::Dictation }, + ); + + let json = serde_json::to_string(&envelope).unwrap(); + assert!(json.contains("\"id\":\"test-1\"")); + assert!(json.contains("\"kind\":\"command\"")); + assert!(json.contains("\"type\":\"SET_MODE\"")); + assert!(json.contains("\"mode\":\"dictation\"")); + } + + #[test] + fn test_raw_envelope_parsing() { + let json = r#"{"id":"1","kind":"command","type":"SET_MODE","payload":{"mode":"dictation"}}"#; + let raw = RawEnvelope::from_line(json).unwrap(); + + assert_eq!(raw.id, "1"); + assert_eq!(raw.kind, MessageKind::Command); + assert_eq!(raw.message_type, "SET_MODE"); + } + + #[test] + fn test_mode_response_envelope() { + let envelope = Envelope::response( + "1", + message_types::MODE_CHANGED, + ModeChangedPayload { mode: Mode::Dictation }, + ); + + let json = serde_json::to_string(&envelope).unwrap(); + assert!(json.contains("\"kind\":\"response\"")); + assert!(json.contains("\"type\":\"MODE_CHANGED\"")); + } + + #[test] + fn test_error_envelope() { + let envelope = Envelope::error( + "1", + message_types::INVALID_MODE_TRANSITION, + InvalidModeTransitionPayload { + from: Mode::Dictation, + to: Mode::Agent, + }, + ); + + let json = serde_json::to_string(&envelope).unwrap(); + assert!(json.contains("\"kind\":\"error\"")); + assert!(json.contains("\"type\":\"INVALID_MODE_TRANSITION\"")); + assert!(json.contains("\"from\":\"dictation\"")); + assert!(json.contains("\"to\":\"agent\"")); + } + + #[test] + fn test_heartbeat_event() { + let envelope = Envelope::event( + "sys", + message_types::HEARTBEAT, + HeartbeatPayload { uptime_ms: 123456 }, + ); + + let json = serde_json::to_string(&envelope).unwrap(); + assert!(json.contains("\"kind\":\"event\"")); + assert!(json.contains("\"type\":\"HEARTBEAT\"")); + assert!(json.contains("\"uptime_ms\":123456")); + } + + #[test] + fn test_dictation_event() { + let envelope = Envelope::event( + "stream-1", + message_types::DICTATION_PARTIAL, + DictationPayload { text: "hello this is".to_string() }, + ); + + let json = serde_json::to_string(&envelope).unwrap(); + assert!(json.contains("\"type\":\"DICTATION_PARTIAL\"")); + assert!(json.contains("\"text\":\"hello this is\"")); + } +} diff --git a/daemon/src/ipc/mod.rs b/daemon/src/ipc/mod.rs index 7da4c5f..b19a7cd 100644 --- a/daemon/src/ipc/mod.rs +++ b/daemon/src/ipc/mod.rs @@ -1,7 +1,16 @@ //! IPC module for daemon-UI communication +//! +//! Uses newline-delimited JSON (JSONL) with a unified message envelope. +//! See `message.rs` for the envelope format and message types. -mod protocol; +mod handlers; +mod message; mod server; -pub use protocol::{Request, Response, DaemonStatus, Mode, Notification}; +// Re-export the public API +pub use message::{ + message_types, DictationPayload, Envelope, HeartbeatPayload, InvalidModePayload, + InvalidModeTransitionPayload, MalformedMessagePayload, MessageKind, Mode, + ModeChangedPayload, RawEnvelope, SetModePayload, UnknownCommandPayload, +}; pub use server::Server; diff --git a/daemon/src/ipc/protocol.rs b/daemon/src/ipc/protocol.rs deleted file mode 100644 index 27ca655..0000000 --- a/daemon/src/ipc/protocol.rs +++ /dev/null @@ -1,137 +0,0 @@ -//! IPC message protocol definitions -//! -//! All messages are JSON-encoded, prefixed with a 4-byte little-endian length. - -use serde::{Deserialize, Serialize}; - -use crate::events::StateEvent; -use crate::state::State; - -/// Current operating mode of the daemon -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum Mode { - /// No active mode, waiting for hotkey - Idle, - /// Dictation mode: low-latency transcription - Dictation, - /// Intelligent mode: LLM response generation - Intelligent, - /// Agent mode: multi-step task execution - Agent, -} - -impl Default for Mode { - fn default() -> Self { - Self::Idle - } -} - -/// Requests from UI to daemon -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Request { - /// Request current daemon status - GetStatus, - - /// Set the active mode - SetMode { mode: Mode }, - - /// Ping to check connectivity - Ping, - - /// Subscribe to state change notifications - Subscribe, -} - -/// Responses from daemon to UI -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Response { - /// Current daemon status - Status(DaemonStatus), - - /// Mode change notification - ModeChange { mode: Mode, active: bool }, - - /// Pong response to ping - Pong, - - /// Subscription confirmed - Subscribed, - - /// Error response - Error { code: String, message: String }, -} - -/// Push notification from daemon to UI (for subscribed clients) -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Notification { - /// Mode has changed - ModeChanged { - mode: Mode, - previous: Mode, - }, - /// State event occurred - StateEvent(StateEvent), -} - -/// Full daemon status snapshot -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DaemonStatus { - /// Daemon version - pub version: String, - - /// Current mode - pub mode: Mode, - - /// Whether hotkey is registered - pub hotkey_registered: bool, - - /// Uptime in seconds - pub uptime_secs: u64, -} - -impl Default for DaemonStatus { - fn default() -> Self { - Self { - version: env!("CARGO_PKG_VERSION").to_string(), - mode: Mode::default(), - hotkey_registered: false, - uptime_secs: 0, - } - } -} - -/// Convert internal State to IPC Mode -impl From for Mode { - fn from(state: State) -> Self { - match state { - State::Idle => Mode::Idle, - State::DictationActive => Mode::Dictation, - State::IntelligentActive => Mode::Intelligent, - State::AgentActive => Mode::Agent, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_request_serialization() { - let req = Request::SetMode { mode: Mode::Dictation }; - let json = serde_json::to_string(&req).unwrap(); - assert!(json.contains("set_mode")); - assert!(json.contains("dictation")); - } - - #[test] - fn test_response_serialization() { - let resp = Response::Status(DaemonStatus::default()); - let json = serde_json::to_string(&resp).unwrap(); - assert!(json.contains("status")); - } -} diff --git a/daemon/src/ipc/server.rs b/daemon/src/ipc/server.rs index 0431f7b..82a96e5 100644 --- a/daemon/src/ipc/server.rs +++ b/daemon/src/ipc/server.rs @@ -1,21 +1,46 @@ //! Unix domain socket server for IPC //! -//! Provides request-response communication and push notifications for -//! state change events to subscribed clients. +//! Uses newline-delimited JSON (JSONL) for all communication. +//! One task per client connection, with heartbeat and streaming support. use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::{Duration, Instant}; use anyhow::{Context, Result}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{broadcast, RwLock}; +use tokio::time::interval; use tracing::{debug, error, info, warn}; -use crate::events::StateEvent; -use crate::state::State; +use super::handlers::{handle_command, make_parse_error, CommandResult}; +use super::message::{ + message_types, DictationPayload, Envelope, HeartbeatPayload, MessageKind, Mode, RawEnvelope, +}; -use super::protocol::{DaemonStatus, Mode, Notification, Request, Response}; +// ============================================================================= +// Configuration +// ============================================================================= + +/// Heartbeat interval +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// Simulated dictation interval (for fake streaming) +const DICTATION_INTERVAL: Duration = Duration::from_millis(500); + +/// Simulated dictation phrases +const DICTATION_PHRASES: &[&str] = &[ + "hello", + "hello this", + "hello this is", + "hello this is a", + "hello this is a test", +]; + +// ============================================================================= +// Server +// ============================================================================= /// IPC Server handling client connections pub struct Server { @@ -23,16 +48,14 @@ pub struct Server { listener: Option, state: Arc>, shutdown_tx: broadcast::Sender<()>, - /// Channel for receiving state events to broadcast to subscribed clients - event_rx: Option>, } /// Shared server state struct ServerState { - status: DaemonStatus, - start_time: std::time::Instant, - /// Current internal state (for mode tracking) - current_state: State, + /// Current operating mode + current_mode: Mode, + /// Server start time for uptime calculation + start_time: Instant, } impl Server { @@ -40,18 +63,16 @@ impl Server { pub fn new(socket_path: &Path) -> Result { // Ensure parent directory exists if let Some(parent) = socket_path.parent() { - std::fs::create_dir_all(parent) - .context("failed to create socket directory")?; + std::fs::create_dir_all(parent).context("failed to create socket directory")?; } // Remove stale socket if it exists if socket_path.exists() { - std::fs::remove_file(socket_path) - .context("failed to remove stale socket")?; + std::fs::remove_file(socket_path).context("failed to remove stale socket")?; } - let listener = UnixListener::bind(socket_path) - .context("failed to bind Unix socket")?; + let listener = + UnixListener::bind(socket_path).context("failed to bind Unix socket")?; // Set socket permissions to owner-only (0600) #[cfg(unix)] @@ -63,9 +84,8 @@ impl Server { let (shutdown_tx, _) = broadcast::channel(1); let state = Arc::new(RwLock::new(ServerState { - status: DaemonStatus::default(), - start_time: std::time::Instant::now(), - current_state: State::Idle, + current_mode: Mode::default(), + start_time: Instant::now(), })); info!(?socket_path, "IPC server listening"); @@ -75,52 +95,57 @@ impl Server { listener: Some(listener), state, shutdown_tx, - event_rx: None, }) } - /// Create a new IPC server with state event subscription - pub fn with_events(socket_path: &Path, event_rx: broadcast::Receiver) -> Result { - let mut server = Self::new(socket_path)?; - server.event_rx = Some(event_rx); + /// Create server with initial mode + pub fn with_mode(socket_path: &Path, mode: Mode) -> Result { + let server = Self::new(socket_path)?; + { + // Set initial mode synchronously since we're in construction + let state = server.state.clone(); + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let mut s = state.write().await; + s.current_mode = mode; + }); + }); + } Ok(server) } - /// Update the current mode in server state - pub async fn set_state(&self, state: State) { - let mut server_state = self.state.write().await; - let old_state = server_state.current_state; - server_state.current_state = state; - server_state.status.mode = state.into(); - server_state.status.hotkey_registered = true; - - if old_state != state { - info!( - from = ?old_state, - to = ?state, - "IPC server: mode updated" - ); + /// Get current mode + pub async fn get_mode(&self) -> Mode { + self.state.read().await.current_mode + } + + /// Set current mode (for external state machine updates) + pub async fn set_mode(&self, mode: Mode) { + let mut state = self.state.write().await; + if state.current_mode != mode { + info!(from = %state.current_mode, to = %mode, "mode updated externally"); + state.current_mode = mode; } } /// Run the server, accepting connections pub async fn run(&self) -> Result<()> { - let listener = self.listener.as_ref() - .context("server not initialized")?; + let listener = self.listener.as_ref().context("server not initialized")?; loop { match listener.accept().await { Ok((stream, _addr)) => { - debug!("client connected"); + info!("client connected"); let state = Arc::clone(&self.state); let mut shutdown_rx = self.shutdown_tx.subscribe(); - + tokio::spawn(async move { tokio::select! { result = Self::handle_client(stream, state) => { if let Err(e) = result { warn!(?e, "client handler error"); } + info!("client disconnected"); } _ = shutdown_rx.recv() => { debug!("client handler shutting down"); @@ -136,97 +161,186 @@ impl Server { } /// Handle a single client connection - async fn handle_client(mut stream: UnixStream, state: Arc>) -> Result<()> { - let mut len_buf = [0u8; 4]; - let mut is_subscribed = false; + async fn handle_client(stream: UnixStream, state: Arc>) -> Result<()> { + let (reader, writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let writer = Arc::new(tokio::sync::Mutex::new(writer)); + + let mut line = String::new(); + let mut heartbeat_timer = interval(HEARTBEAT_INTERVAL); + let mut dictation_timer = interval(DICTATION_INTERVAL); + let mut dictation_phrase_index = 0; + let mut stream_id = 0u64; loop { - // Read message length (4-byte little-endian) - match stream.read_exact(&mut len_buf).await { - Ok(_) => {} - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { - debug!("client disconnected"); - return Ok(()); + tokio::select! { + // Read incoming commands + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + // EOF - client disconnected + return Ok(()); + } + Ok(_) => { + let trimmed = line.trim(); + if !trimmed.is_empty() { + Self::process_line(trimmed, &state, &writer).await?; + } + line.clear(); + } + Err(e) => { + warn!(?e, "read error"); + return Err(e.into()); + } + } + } + + // Send heartbeat + _ = heartbeat_timer.tick() => { + let uptime_ms = { + let s = state.read().await; + s.start_time.elapsed().as_millis() as u64 + }; + + let heartbeat = Envelope::event( + "sys", + message_types::HEARTBEAT, + HeartbeatPayload { uptime_ms }, + ); + + Self::send_message(&writer, &heartbeat).await?; + debug!(uptime_ms, "sent heartbeat"); + } + + // Simulate dictation streaming when in dictation mode + _ = dictation_timer.tick() => { + let current_mode = { + state.read().await.current_mode + }; + + if current_mode == Mode::Dictation { + let phrase = DICTATION_PHRASES[dictation_phrase_index]; + let is_final = dictation_phrase_index == DICTATION_PHRASES.len() - 1; + let stream_id_str = format!("stream-{}", stream_id); + + let message_type = if is_final { + message_types::DICTATION_FINAL + } else { + message_types::DICTATION_PARTIAL + }; + + let event = Envelope::event( + &stream_id_str, + message_type, + DictationPayload { text: phrase.to_string() }, + ); + + Self::send_message(&writer, &event).await?; + debug!(text = phrase, is_final, "sent dictation event"); + + if is_final { + // Reset for next cycle + dictation_phrase_index = 0; + stream_id += 1; + } else { + dictation_phrase_index += 1; + } + } else { + // Reset when not in dictation mode + dictation_phrase_index = 0; + } } - Err(e) => return Err(e.into()), } + } + } + + /// Process a single JSON line + async fn process_line( + line: &str, + state: &Arc>, + writer: &Arc>, + ) -> Result<()> { + debug!(line, "received message"); - let len = u32::from_le_bytes(len_buf) as usize; - if len > 1024 * 1024 { - warn!(len, "message too large, disconnecting"); + // Parse the raw envelope + let raw = match RawEnvelope::from_line(line) { + Ok(r) => r, + Err(e) => { + warn!(?e, "failed to parse message"); + let error_json = make_parse_error("unknown", &format!("parse error: {}", e)); + Self::send_line(writer, &error_json).await?; return Ok(()); } + }; - // Read message body - let mut msg_buf = vec![0u8; len]; - stream.read_exact(&mut msg_buf).await?; - - // Parse request - let request: Request = serde_json::from_slice(&msg_buf) - .context("failed to parse request")?; - - debug!(?request, "received request"); - - // Process request - let (response, subscribe) = Self::process_request(request, &state).await; - if subscribe { - is_subscribed = true; - debug!("client subscribed to notifications"); - } + // Only process commands + if raw.kind != MessageKind::Command { + debug!(kind = ?raw.kind, "ignoring non-command message"); + return Ok(()); + } + + // Get current mode and process command + let response = { + let mut s = state.write().await; + handle_command(&raw, &mut s.current_mode) + }; - // Send response - Self::send_message(&mut stream, &response).await?; + // Send response if any + if let CommandResult::Response(json) = response { + Self::send_line(writer, &json).await?; } - } - /// Send a length-prefixed JSON message - async fn send_message(stream: &mut UnixStream, msg: &T) -> Result<()> { - let msg_bytes = serde_json::to_vec(msg)?; - let msg_len = (msg_bytes.len() as u32).to_le_bytes(); - - stream.write_all(&msg_len).await?; - stream.write_all(&msg_bytes).await?; - Ok(()) } - /// Process a request and return a response - /// Returns (Response, should_subscribe) - async fn process_request(request: Request, state: &Arc>) -> (Response, bool) { - match request { - Request::Ping => (Response::Pong, false), - - Request::GetStatus => { - let mut state = state.write().await; - state.status.uptime_secs = state.start_time.elapsed().as_secs(); - (Response::Status(state.status.clone()), false) - } - - Request::SetMode { mode } => { - let mut state = state.write().await; - let old_mode = state.status.mode; - state.status.mode = mode; - info!(?old_mode, ?mode, "mode changed via IPC"); - (Response::ModeChange { mode, active: mode != Mode::Idle }, false) - } - - Request::Subscribe => { - (Response::Subscribed, true) - } - } + /// Send a serializable message as JSONL + async fn send_message( + writer: &Arc>, + message: &T, + ) -> Result<()> { + let json = serde_json::to_string(message)?; + Self::send_line(writer, &json).await + } + + /// Send a raw JSON string as a line + async fn send_line( + writer: &Arc>, + json: &str, + ) -> Result<()> { + let mut w = writer.lock().await; + w.write_all(json.as_bytes()).await?; + w.write_all(b"\n").await?; + w.flush().await?; + Ok(()) } /// Gracefully shutdown the server pub async fn shutdown(&self) { let _ = self.shutdown_tx.send(()); - + // Remove socket file if self.socket_path.exists() { if let Err(e) = std::fs::remove_file(&self.socket_path) { warn!(?e, "failed to remove socket file"); } } - + info!("IPC server shutdown complete"); } } + +// For compatibility with existing main.rs +impl Server { + /// Create a new IPC server with event subscription (compatibility shim) + pub fn with_events( + socket_path: &Path, + _event_rx: broadcast::Receiver, + ) -> Result { + Self::new(socket_path) + } + + /// Update the current mode in server state (compatibility with main.rs) + pub async fn set_state(&self, state: crate::state::State) { + self.set_mode(state.into()).await; + } +} From 63b350e44e9dce74e99b82b976b94262f7515440 Mon Sep 17 00:00:00 2001 From: hriztam Date: Tue, 30 Dec 2025 01:42:30 +0530 Subject: [PATCH 2/2] swift changes --- SecondBrainUI/Sources/IPC/IPCClient.swift | 350 ++++++++++++++++++++ SecondBrainUI/Sources/IPC/IPCHandlers.swift | 96 ++++++ SecondBrainUI/Sources/IPC/IPCMessage.swift | 257 ++++++++++++++ 3 files changed, 703 insertions(+) create mode 100644 SecondBrainUI/Sources/IPC/IPCClient.swift create mode 100644 SecondBrainUI/Sources/IPC/IPCHandlers.swift create mode 100644 SecondBrainUI/Sources/IPC/IPCMessage.swift diff --git a/SecondBrainUI/Sources/IPC/IPCClient.swift b/SecondBrainUI/Sources/IPC/IPCClient.swift new file mode 100644 index 0000000..90d387e --- /dev/null +++ b/SecondBrainUI/Sources/IPC/IPCClient.swift @@ -0,0 +1,350 @@ +// +// IPCClient.swift +// SecondBrain +// +// Unix domain socket client for daemon communication. +// Uses newline-delimited JSON (JSONL) with persistent connection. +// + +import Foundation +import Network +import os.log + +/// Client for communicating with the second-brain daemon over Unix domain socket +class IPCClient { + + // MARK: - Types + + /// Connection state + enum ConnectionState { + case disconnected + case connecting + case connected + case failed(Error) + } + + /// Delegate for receiving IPC events + weak var delegate: IPCClientDelegate? + + // MARK: - Properties + + private let socketPath: String + private var connection: NWConnection? + private let queue = DispatchQueue(label: "com.secondbrain.ipc-client", qos: .userInitiated) + private let logger = Logger(subsystem: "com.secondbrain", category: "IPCClient") + + /// Buffer for incomplete lines + private var lineBuffer = Data() + + /// Current connection state + private(set) var state: ConnectionState = .disconnected + + /// Counter for generating request IDs + private var requestCounter: UInt64 = 0 + + /// Pending responses keyed by request ID + private var pendingRequests: [String: CheckedContinuation] = [:] + private let pendingLock = NSLock() + + // MARK: - Initialization + + init() { + // Socket path matches daemon config + let home = FileManager.default.homeDirectoryForCurrentUser.path + self.socketPath = "\(home)/.local/share/second-brain/daemon.sock" + } + + // MARK: - Connection Management + + /// Connect to the daemon + func connect() async throws { + guard state != .connected else { return } + + state = .connecting + logger.info("Connecting to daemon at \(self.socketPath)") + + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let endpoint = NWEndpoint.unix(path: socketPath) + let parameters = NWParameters.tcp + parameters.allowLocalEndpointReuse = true + + connection = NWConnection(to: endpoint, using: parameters) + + var didResume = false + connection?.stateUpdateHandler = { [weak self] newState in + guard let self = self, !didResume else { return } + + switch newState { + case .ready: + didResume = true + self.state = .connected + self.logger.info("Connected to daemon") + self.startReadLoop() + continuation.resume() + + case .failed(let error): + didResume = true + self.state = .failed(error) + self.logger.error("Connection failed: \(error.localizedDescription)") + continuation.resume(throwing: IPCError.connectionFailed(error)) + + case .cancelled: + didResume = true + self.state = .disconnected + continuation.resume(throwing: IPCError.notConnected) + + case .waiting(let error): + self.logger.warning("Connection waiting: \(error.localizedDescription)") + + default: + break + } + } + + connection?.start(queue: queue) + } + } + + /// Disconnect from the daemon + func disconnect() { + logger.info("Disconnecting from daemon") + connection?.cancel() + connection = nil + state = .disconnected + + // Cancel all pending requests + pendingLock.lock() + let pending = pendingRequests + pendingRequests.removeAll() + pendingLock.unlock() + + for (_, continuation) in pending { + continuation.resume(throwing: IPCError.notConnected) + } + } + + // MARK: - Read Loop + + private func startReadLoop() { + guard let connection = connection else { return } + + readData(from: connection) + } + + private func readData(from connection: NWConnection) { + connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { [weak self] data, _, isComplete, error in + guard let self = self else { return } + + if let error = error { + self.logger.error("Read error: \(error.localizedDescription)") + self.handleDisconnect() + return + } + + if let data = data, !data.isEmpty { + self.processIncomingData(data) + } + + if isComplete { + self.logger.info("Connection closed by daemon") + self.handleDisconnect() + return + } + + // Continue reading + self.readData(from: connection) + } + } + + private func processIncomingData(_ data: Data) { + lineBuffer.append(data) + + // Process complete lines + while let newlineIndex = lineBuffer.firstIndex(of: UInt8(ascii: "\n")) { + let lineData = lineBuffer[lineBuffer.startIndex.. String { + requestCounter += 1 + return "swift-\(requestCounter)" + } + + /// Send a command and wait for a response + private func sendCommand(type: String, payload: P) async throws -> RawEnvelope { + guard let connection = connection, state == .connected else { + throw IPCError.notConnected + } + + let requestId = nextRequestId() + let envelope = IPCEnvelope.command(id: requestId, type: type, payload: payload) + + // Encode to JSON line + let jsonString = try envelope.toJSONString() + let lineData = (jsonString + "\n").data(using: .utf8)! + + logger.debug("Sending: \(jsonString)") + + // Register for response before sending + let response: RawEnvelope = try await withCheckedThrowingContinuation { continuation in + pendingLock.lock() + pendingRequests[requestId] = continuation + pendingLock.unlock() + + connection.send(content: lineData, completion: .contentProcessed { [weak self] error in + if let error = error { + self?.pendingLock.lock() + self?.pendingRequests.removeValue(forKey: requestId) + self?.pendingLock.unlock() + continuation.resume(throwing: error) + } + }) + } + + return response + } + + // MARK: - High-Level API + + /// Set the daemon mode + func setMode(_ mode: Mode) async throws { + let response = try await sendCommand( + type: MessageType.setMode, + payload: SetModePayload(mode: mode) + ) + + switch response.kind { + case .response: + if response.type == MessageType.modeChanged { + logger.info("Mode changed to \(mode.rawValue)") + } + case .error: + throw IPCError.daemonError(type: response.type, payload: response.payload) + default: + throw IPCError.unexpectedMessageType(response.type) + } + } +} + +// MARK: - Delegate Protocol + +/// Protocol for receiving IPC events +protocol IPCClientDelegate: AnyObject { + /// Called when a heartbeat is received + func ipcClient(_ client: IPCClient, didReceiveHeartbeat uptimeMs: UInt64) + + /// Called when a dictation partial result is received + func ipcClient(_ client: IPCClient, didReceiveDictationPartial text: String, streamId: String) + + /// Called when a dictation final result is received + func ipcClient(_ client: IPCClient, didReceiveDictationFinal text: String, streamId: String) + + /// Called when a mode change notification is received + func ipcClient(_ client: IPCClient, didReceiveModeChange mode: Mode) + + /// Called when an unknown event is received + func ipcClient(_ client: IPCClient, didReceiveUnknownEvent type: String, payload: [String: AnyCodable]) + + /// Called when the connection is lost + func ipcClientDidDisconnect(_ client: IPCClient) +} + +// MARK: - Default Delegate Implementations + +extension IPCClientDelegate { + func ipcClient(_ client: IPCClient, didReceiveHeartbeat uptimeMs: UInt64) {} + func ipcClient(_ client: IPCClient, didReceiveDictationPartial text: String, streamId: String) {} + func ipcClient(_ client: IPCClient, didReceiveDictationFinal text: String, streamId: String) {} + func ipcClient(_ client: IPCClient, didReceiveModeChange mode: Mode) {} + func ipcClient(_ client: IPCClient, didReceiveUnknownEvent type: String, payload: [String: AnyCodable]) {} + func ipcClientDidDisconnect(_ client: IPCClient) {} +} diff --git a/SecondBrainUI/Sources/IPC/IPCHandlers.swift b/SecondBrainUI/Sources/IPC/IPCHandlers.swift new file mode 100644 index 0000000..b90f249 --- /dev/null +++ b/SecondBrainUI/Sources/IPC/IPCHandlers.swift @@ -0,0 +1,96 @@ +// +// IPCHandlers.swift +// SecondBrain +// +// Handler implementations for IPC events. +// Provides a concrete implementation of IPCClientDelegate with logging. +// + +import Foundation +import os.log + +/// Default IPC event handler with logging +class IPCEventHandler: IPCClientDelegate { + + private let logger = Logger(subsystem: "com.secondbrain", category: "IPCHandler") + + /// Current mode (updated when mode change notifications are received) + private(set) var currentMode: Mode = .idle + + /// Last known uptime (from heartbeat) + private(set) var lastUptimeMs: UInt64 = 0 + + /// Current partial dictation text + private(set) var currentDictation: String = "" + + /// Callback for mode changes + var onModeChanged: ((Mode) -> Void)? + + /// Callback for dictation updates + var onDictationUpdate: ((String, Bool) -> Void)? + + /// Callback for heartbeat + var onHeartbeat: ((UInt64) -> Void)? + + /// Callback for disconnect + var onDisconnect: (() -> Void)? + + // MARK: - IPCClientDelegate + + func ipcClient(_ client: IPCClient, didReceiveHeartbeat uptimeMs: UInt64) { + lastUptimeMs = uptimeMs + + let uptimeSecs = uptimeMs / 1000 + let hours = uptimeSecs / 3600 + let mins = (uptimeSecs % 3600) / 60 + let secs = uptimeSecs % 60 + + logger.debug("Heartbeat: uptime \(hours)h \(mins)m \(secs)s") + onHeartbeat?(uptimeMs) + } + + func ipcClient(_ client: IPCClient, didReceiveDictationPartial text: String, streamId: String) { + currentDictation = text + logger.info("Dictation partial [\(streamId)]: \(text)") + onDictationUpdate?(text, false) + } + + func ipcClient(_ client: IPCClient, didReceiveDictationFinal text: String, streamId: String) { + currentDictation = text + logger.info("Dictation final [\(streamId)]: \(text)") + onDictationUpdate?(text, true) + + // Clear after delivering final + currentDictation = "" + } + + func ipcClient(_ client: IPCClient, didReceiveModeChange mode: Mode) { + let oldMode = currentMode + currentMode = mode + logger.info("Mode changed: \(oldMode.rawValue) -> \(mode.rawValue)") + onModeChanged?(mode) + } + + func ipcClient(_ client: IPCClient, didReceiveUnknownEvent type: String, payload: [String: AnyCodable]) { + logger.warning("Unknown event type: \(type), payload: \(payload)") + } + + func ipcClientDidDisconnect(_ client: IPCClient) { + logger.warning("Disconnected from daemon") + currentMode = .idle + currentDictation = "" + onDisconnect?() + } +} + +// MARK: - Convenience Extensions + +extension IPCClient { + /// Create a client with a default event handler + static func withDefaultHandler() -> (IPCClient, IPCEventHandler) { + let client = IPCClient() + let handler = IPCEventHandler() + client.delegate = handler + return (client, handler) + } +} diff --git a/SecondBrainUI/Sources/IPC/IPCMessage.swift b/SecondBrainUI/Sources/IPC/IPCMessage.swift new file mode 100644 index 0000000..c2d71a7 --- /dev/null +++ b/SecondBrainUI/Sources/IPC/IPCMessage.swift @@ -0,0 +1,257 @@ +// +// IPCMessage.swift +// SecondBrain +// +// IPC message types using unified envelope format. +// All messages are newline-delimited JSON (JSONL). +// + +import Foundation + +// MARK: - Message Kind + +/// Classification of IPC messages +enum MessageKind: String, Codable { + case command + case event + case response + case error +} + +// MARK: - Message Types + +/// Message type constants matching the Rust daemon +enum MessageType { + // Commands (Client -> Server) + static let setMode = "SET_MODE" + + // Responses (Server -> Client) + static let modeChanged = "MODE_CHANGED" + + // Errors (Server -> Client) + static let invalidModeTransition = "INVALID_MODE_TRANSITION" + static let invalidMode = "INVALID_MODE" + static let malformedMessage = "MALFORMED_MESSAGE" + static let unknownCommand = "UNKNOWN_COMMAND" + + // Events (Server -> Client) + static let dictationPartial = "DICTATION_PARTIAL" + static let dictationFinal = "DICTATION_FINAL" + static let heartbeat = "HEARTBEAT" +} + +// MARK: - Mode + +/// Operating modes of the daemon +enum Mode: String, Codable { + case idle + case dictation + case intelligent + case agent + + var displayName: String { + switch self { + case .idle: return "Idle" + case .dictation: return "Dictation" + case .intelligent: return "Intelligent" + case .agent: return "Agent" + } + } +} + +// MARK: - Message Envelope + +/// Universal message envelope for IPC communication +/// All messages follow this structure. +struct IPCEnvelope: Codable { + let id: String + let kind: MessageKind + let type: String + let payload: P + + /// Create a command envelope + static func command(id: String, type: String, payload: P) -> IPCEnvelope

{ + IPCEnvelope(id: id, kind: .command, type: type, payload: payload) + } + + /// Encode the envelope to a JSON string (for sending) + func toJSONString() throws -> String { + let encoder = JSONEncoder() + let data = try encoder.encode(self) + guard let string = String(data: data, encoding: .utf8) else { + throw IPCError.encodingFailed + } + return string + } +} + +/// Raw envelope for initial parsing (payload as dictionary) +struct RawEnvelope: Decodable { + let id: String + let kind: MessageKind + let type: String + let payload: [String: AnyCodable] + + /// Parse from a JSON line + static func from(line: String) throws -> RawEnvelope { + guard let data = line.data(using: .utf8) else { + throw IPCError.invalidData + } + let decoder = JSONDecoder() + return try decoder.decode(RawEnvelope.self, from: data) + } +} + +// MARK: - Command Payloads + +/// Payload for SET_MODE command +struct SetModePayload: Codable { + let mode: Mode +} + +// MARK: - Response Payloads + +/// Payload for MODE_CHANGED response +struct ModeChangedPayload: Codable { + let mode: Mode +} + +// MARK: - Error Payloads + +/// Payload for INVALID_MODE_TRANSITION error +struct InvalidModeTransitionPayload: Codable { + let from: Mode + let to: Mode +} + +/// Payload for INVALID_MODE error +struct InvalidModePayload: Codable { + let provided: String +} + +/// Payload for MALFORMED_MESSAGE error +struct MalformedMessagePayload: Codable { + let reason: String +} + +/// Payload for UNKNOWN_COMMAND error +struct UnknownCommandPayload: Codable { + let command: String +} + +// MARK: - Event Payloads + +/// Payload for DICTATION_PARTIAL and DICTATION_FINAL events +struct DictationPayload: Codable { + let text: String +} + +/// Payload for HEARTBEAT event +struct HeartbeatPayload: Codable { + let uptimeMs: UInt64 + + private enum CodingKeys: String, CodingKey { + case uptimeMs = "uptime_ms" + } +} + +// MARK: - IPC Errors + +enum IPCError: LocalizedError { + case notConnected + case connectionFailed(Error) + case encodingFailed + case invalidData + case decodingFailed(Error) + case unexpectedMessageType(String) + case daemonError(type: String, payload: [String: AnyCodable]) + + var errorDescription: String? { + switch self { + case .notConnected: + return "Not connected to daemon" + case .connectionFailed(let error): + return "Connection failed: \(error.localizedDescription)" + case .encodingFailed: + return "Failed to encode message" + case .invalidData: + return "Invalid data received" + case .decodingFailed(let error): + return "Failed to decode message: \(error.localizedDescription)" + case .unexpectedMessageType(let type): + return "Unexpected message type: \(type)" + case .daemonError(let type, _): + return "Daemon error: \(type)" + } + } +} + +// MARK: - AnyCodable Helper + +/// Type-erased Codable for raw payload access +struct AnyCodable: Codable { + let value: Any + + init(_ value: Any) { + self.value = value + } + + init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + + if container.decodeNil() { + self.value = NSNull() + } else if let bool = try? container.decode(Bool.self) { + self.value = bool + } else if let int = try? container.decode(Int.self) { + self.value = int + } else if let double = try? container.decode(Double.self) { + self.value = double + } else if let string = try? container.decode(String.self) { + self.value = string + } else if let array = try? container.decode([AnyCodable].self) { + self.value = array.map { $0.value } + } else if let dict = try? container.decode([String: AnyCodable].self) { + self.value = dict.mapValues { $0.value } + } else { + throw DecodingError.dataCorruptedError( + in: container, + debugDescription: "Could not decode AnyCodable" + ) + } + } + + func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + + switch value { + case is NSNull: + try container.encodeNil() + case let bool as Bool: + try container.encode(bool) + case let int as Int: + try container.encode(int) + case let double as Double: + try container.encode(double) + case let string as String: + try container.encode(string) + case let array as [Any]: + try container.encode(array.map { AnyCodable($0) }) + case let dict as [String: Any]: + try container.encode(dict.mapValues { AnyCodable($0) }) + default: + throw EncodingError.invalidValue( + value, + EncodingError.Context( + codingPath: container.codingPath, + debugDescription: "Could not encode AnyCodable" + ) + ) + } + } + + /// Get the value as a specific type + func as(_ type: T.Type) -> T? { + return value as? T + } +}