From baa02470b60645c627d8432193fe462e8cab6428 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:10:13 +0900 Subject: [PATCH 01/12] feat(kernel): add proactive signal types, filter, and context pack (#786) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restructure proactive module: proactive.rs → proactive/judgment.rs - Add ProactiveSignal enum with 5 signal kinds - Add ProactiveConfig (bon::Builder + Deserialize, no Default) - Add ProactiveFilter with quiet hours, cooldown, and rate limiting - Add KernelEvent::ProactiveSignal variant + constructor - Add structured context pack builder for Mita - Reformat heartbeat message as structured context pack - Wire handle_proactive_signal + deliver_proactive_to_mita in kernel --- crates/kernel/src/event.rs | 19 ++ crates/kernel/src/kernel.rs | 62 +++++- crates/kernel/src/proactive/config.rs | 156 +++++++++++++ crates/kernel/src/proactive/context.rs | 176 +++++++++++++++ crates/kernel/src/proactive/filter.rs | 210 ++++++++++++++++++ .../{proactive.rs => proactive/judgment.rs} | 0 crates/kernel/src/proactive/mod.rs | 34 +++ crates/kernel/src/proactive/signal.rs | 63 ++++++ 8 files changed, 716 insertions(+), 4 deletions(-) create mode 100644 crates/kernel/src/proactive/config.rs create mode 100644 crates/kernel/src/proactive/context.rs create mode 100644 crates/kernel/src/proactive/filter.rs rename crates/kernel/src/{proactive.rs => proactive/judgment.rs} (100%) create mode 100644 crates/kernel/src/proactive/mod.rs create mode 100644 crates/kernel/src/proactive/signal.rs diff --git a/crates/kernel/src/event.rs b/crates/kernel/src/event.rs index 58067f639..9f707566f 100644 --- a/crates/kernel/src/event.rs +++ b/crates/kernel/src/event.rs @@ -362,6 +362,13 @@ pub enum KernelEvent { /// delivers a heartbeat message to it. MitaHeartbeat, + // === Proactive === + /// Proactive signal for Mita orchestration. + /// + /// Emitted by internal event sources (idle check, task failure, time + /// triggers) after passing through the `ProactiveFilter`. + ProactiveSignal(crate::proactive::ProactiveSignal), + // === System === /// Periodic idle check — transitions Ready sessions to Suspended. IdleCheck, @@ -389,6 +396,7 @@ impl KernelEvent { | Self::ScheduledTask { .. } | Self::MitaDirective { .. } | Self::MitaHeartbeat + | Self::ProactiveSignal(_) | Self::IdleCheck => EventPriority::Low, } } @@ -592,6 +600,14 @@ impl KernelEventEnvelope { } } + /// Create a `ProactiveSignal` event. + pub fn proactive_signal(signal: crate::proactive::ProactiveSignal) -> Self { + Self { + base: EventBase::from(SessionKey::new()), + kind: KernelEvent::ProactiveSignal(signal), + } + } + /// Create a `MitaHeartbeat` event. pub fn mita_heartbeat() -> Self { Self { @@ -682,6 +698,9 @@ impl KernelEventEnvelope { format!("mita directive for session {}", self.base.session_key) } KernelEvent::MitaHeartbeat => "periodic mita heartbeat".to_string(), + KernelEvent::ProactiveSignal(signal) => { + format!("proactive signal: {}", signal.kind_name()) + } KernelEvent::IdleCheck => "periodic idle check".to_string(), KernelEvent::Shutdown => "shutdown requested".to_string(), } diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 5c85bb777..d3bc7f2fb 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -121,6 +121,10 @@ pub struct KernelConfig { /// Mita heartbeat interval. `None` disables the heartbeat. #[default(_code = "None")] pub mita_heartbeat_interval: Option, + /// Proactive signal filter configuration. `None` disables proactive + /// signals (heartbeat-only mode). + #[default(_code = "None")] + pub proactive: Option, // Event queue configuration. Controls whether the kernel uses a single // global queue (`num_shards = 0`) or sharded parallel processing. pub event_queue: ShardedEventQueueConfig, @@ -191,6 +195,9 @@ pub struct Kernel { trace_service: crate::trace::TraceService, /// Provider for generating the skills prompt block. skill_prompt_provider: crate::handle::SkillPromptProvider, + /// Optional proactive signal filter. `None` when proactive config is + /// absent, which disables all proactive signals. + proactive_filter: std::sync::Mutex>, } impl Kernel { @@ -254,6 +261,13 @@ impl Kernel { dynamic_tool_provider, ); + let proactive_filter = std::sync::Mutex::new( + config + .proactive + .as_ref() + .map(|c| crate::proactive::ProactiveFilter::new(c.clone())), + ); + Self { config, process_table: Arc::new(SessionTable::new()), @@ -273,6 +287,7 @@ impl Kernel { guard_pipeline, trace_service, skill_prompt_provider, + proactive_filter, } } @@ -683,6 +698,9 @@ impl Kernel { KernelEvent::MitaHeartbeat => { self.handle_mita_heartbeat().await; } + KernelEvent::ProactiveSignal(signal) => { + self.handle_proactive_signal(signal).await; + } KernelEvent::IdleCheck => { // Periodic idle check — handled by session table reaping. self.process_table @@ -1669,11 +1687,47 @@ impl Kernel { return; } - // Deliver heartbeat message to the existing Mita session. + // Deliver structured heartbeat context pack to the existing Mita session. + let active_count = self + .process_table + .list() + .iter() + .filter(|p| matches!(p.state, SessionState::Active | SessionState::Ready)) + .count(); + let context = crate::proactive::build_heartbeat_context_pack(active_count); let msg = InboundMessage::synthetic( - "Heartbeat triggered. Analyze active sessions and determine if any proactive actions \ - are needed. Review your previous tape entries to avoid repeating recent actions." - .to_string(), + context, + crate::identity::UserId("system".to_string()), + session_key, + ); + + self.deliver_to_session(session_key, msg).await; + } + + /// Handle a proactive signal — build context pack and deliver to Mita. + async fn handle_proactive_signal(&self, signal: crate::proactive::ProactiveSignal) { + info!(kind = signal.kind_name(), "handling proactive signal"); + + let context = crate::proactive::build_context_pack(&signal, None); + self.deliver_proactive_to_mita(&context).await; + } + + /// Deliver a proactive context pack to the Mita session. + /// + /// Similar to `handle_mita_heartbeat` delivery but with structured + /// context pack text instead of a one-line instruction. + async fn deliver_proactive_to_mita(&self, context: &str) { + let session_key = crate::session::SessionKey::deterministic("mita"); + + if !self.process_table.contains(&session_key) { + // Mita session not running — the next heartbeat will bootstrap it. + // Drop this signal rather than spawning Mita just for one event. + info!("proactive signal dropped: Mita session not yet running"); + return; + } + + let msg = InboundMessage::synthetic( + context.to_string(), crate::identity::UserId("system".to_string()), session_key, ); diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs new file mode 100644 index 000000000..23915f813 --- /dev/null +++ b/crates/kernel/src/proactive/config.rs @@ -0,0 +1,156 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Configuration for the proactive event filter. +//! +//! Loaded from YAML; when absent, proactive signals are disabled entirely. +//! Do NOT derive `Default` — the absence of config means "feature off". + +use std::{collections::HashMap, time::Duration}; + +use serde::{Deserialize, Serialize}; + +/// Configuration for the proactive signal filter. +/// +/// Controls quiet hours, per-signal cooldowns, global rate limiting, +/// and work-hour boundaries for time-based signals. +/// +/// This struct intentionally does NOT derive `Default`. If the proactive +/// config section is missing from the YAML file, the feature is disabled. +#[derive(Debug, Clone, bon::Builder, Serialize, Deserialize)] +pub struct ProactiveConfig { + /// Quiet hours — suppress all proactive signals. + /// + /// Format: `["HH:MM", "HH:MM"]` (start, end). Handles midnight + /// wrap-around (e.g. `["23:00", "08:00"]`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub quiet_hours: Option<(String, String)>, + + /// Per-signal-kind minimum interval in seconds. + /// + /// Keys are signal kind names (e.g. `"session_idle"`, `"task_failed"`). + /// Values are seconds. + #[serde(default, deserialize_with = "deserialize_cooldowns")] + pub cooldowns: HashMap, + + /// Global rate limit — maximum signals per hour. + pub max_hourly: u32, + + /// Work hours start time (e.g. `"09:00"`). + pub work_hours_start: String, + + /// Work hours end time (e.g. `"18:00"`). + pub work_hours_end: String, + + /// IANA timezone for time calculations (e.g. `"Asia/Shanghai"`). + pub timezone: String, +} + +impl ProactiveConfig { + /// Parse `work_hours_start` as a `jiff::civil::Time`. + pub fn parsed_work_start(&self) -> Option { + parse_time_str(&self.work_hours_start) + } + + /// Parse `work_hours_end` as a `jiff::civil::Time`. + pub fn parsed_work_end(&self) -> Option { + parse_time_str(&self.work_hours_end) + } + + /// Parse quiet hours start as a `jiff::civil::Time`. + pub fn parsed_quiet_start(&self) -> Option { + self.quiet_hours + .as_ref() + .and_then(|(s, _)| parse_time_str(s)) + } + + /// Parse quiet hours end as a `jiff::civil::Time`. + pub fn parsed_quiet_end(&self) -> Option { + self.quiet_hours + .as_ref() + .and_then(|(_, e)| parse_time_str(e)) + } + + /// Parse the configured timezone as a `jiff::tz::TimeZone`. + pub fn parsed_timezone(&self) -> Option { + jiff::tz::TimeZone::get(&self.timezone).ok() + } +} + +/// Parse an "HH:MM" string into a `jiff::civil::Time`. +fn parse_time_str(s: &str) -> Option { + let parts: Vec<&str> = s.split(':').collect(); + if parts.len() != 2 { + return None; + } + let hour: i8 = parts[0].parse().ok()?; + let minute: i8 = parts[1].parse().ok()?; + jiff::civil::Time::new(hour, minute, 0, 0).ok() +} + +/// Deserialize cooldown values as seconds (u64 → Duration). +fn deserialize_cooldowns<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let raw: HashMap = HashMap::deserialize(deserializer)?; + Ok(raw + .into_iter() + .map(|(k, secs)| (k, Duration::from_secs(secs))) + .collect()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_time_str_valid() { + let t = parse_time_str("09:30").expect("should parse"); + assert_eq!(t.hour(), 9); + assert_eq!(t.minute(), 30); + } + + #[test] + fn parse_time_str_invalid() { + assert!(parse_time_str("invalid").is_none()); + assert!(parse_time_str("25:00").is_none()); + } + + #[test] + fn deserialize_config_from_yaml() { + let yaml = r#" +quiet_hours: ["23:00", "08:00"] +cooldowns: + session_idle: 3600 + task_failed: 600 +max_hourly: 5 +work_hours_start: "09:00" +work_hours_end: "18:00" +timezone: "Asia/Shanghai" +"#; + let config: ProactiveConfig = serde_yaml::from_str(yaml).expect("should parse"); + assert_eq!(config.max_hourly, 5); + assert_eq!( + config.cooldowns.get("session_idle"), + Some(&Duration::from_secs(3600)) + ); + assert_eq!( + config.cooldowns.get("task_failed"), + Some(&Duration::from_secs(600)) + ); + assert!(config.parsed_work_start().is_some()); + assert!(config.parsed_quiet_start().is_some()); + } +} diff --git a/crates/kernel/src/proactive/context.rs b/crates/kernel/src/proactive/context.rs new file mode 100644 index 000000000..48aeb273a --- /dev/null +++ b/crates/kernel/src/proactive/context.rs @@ -0,0 +1,176 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Structured context pack builder for Mita proactive signals. +//! +//! Converts a [`ProactiveSignal`] + optional session context into a +//! human-readable structured text block that Mita can reason about. + +use jiff::Timestamp; + +use super::signal::ProactiveSignal; + +/// Optional session context attached to a proactive signal. +#[derive(Debug, Clone)] +pub struct SessionContext { + /// Human-readable session name (if available). + pub session_name: Option, + /// Session key for routing. + pub session_key: String, + /// When the session became idle (human-readable). + pub idle_since: Option, + /// Last user message in the session (for context). + pub last_user_message: Option, +} + +/// Build a structured context pack for Mita from a proactive signal. +/// +/// The output is a multi-section text block that Mita can parse to +/// decide what action (if any) to take. +pub fn build_context_pack( + signal: &ProactiveSignal, + session_context: Option<&SessionContext>, +) -> String { + let now = Timestamp::now(); + let mut sections = Vec::new(); + + // [Proactive Event] section + sections.push(format!( + "[Proactive Event]\nkind: {}\ntimestamp: {}", + signal.kind_name(), + now, + )); + + // Signal-specific details + match signal { + ProactiveSignal::SessionIdle { idle_duration } => { + let mins = idle_duration.as_secs() / 60; + sections + .last_mut() + .unwrap() + .push_str(&format!("\nidle_duration: {}m", mins,)); + } + ProactiveSignal::TaskFailed { error } => { + sections + .last_mut() + .unwrap() + .push_str(&format!("\nerror: {}", truncate(error, 200),)); + } + ProactiveSignal::SessionCompleted { summary } => { + sections + .last_mut() + .unwrap() + .push_str(&format!("\nsummary: {}", truncate(summary, 200),)); + } + ProactiveSignal::MorningGreeting | ProactiveSignal::DailySummary => { + // No extra fields for time events. + } + } + + // [Context] section (if session context is available) + if let Some(ctx) = session_context { + let mut context_lines = Vec::new(); + let name_display = ctx + .session_name + .as_deref() + .map(|n| format!("\"{}\" ({})", n, ctx.session_key)) + .unwrap_or_else(|| ctx.session_key.clone()); + context_lines.push(format!("session: {}", name_display)); + + if let Some(idle) = &ctx.idle_since { + context_lines.push(format!("idle_since: {}", idle)); + } + if let Some(msg) = &ctx.last_user_message { + context_lines.push(format!("last_user_message: \"{}\"", truncate(msg, 100),)); + } + sections.push(format!("[Context]\n{}", context_lines.join("\n"))); + } + + // [Available Actions] section + sections.push( + "[Available Actions]\n- dispatch_rara: send a message to user through a session\n- \ + notify: push notification to user's device\n- (no action): decide this event doesn't \ + need intervention" + .to_string(), + ); + + sections.join("\n\n") +} + +/// Build a structured context pack for a heartbeat patrol. +/// +/// Replaces the previous one-line heartbeat message with the same +/// structured format used by proactive signals. +pub fn build_heartbeat_context_pack(active_session_count: usize) -> String { + let now = Timestamp::now(); + + format!( + "[Proactive Event]\nkind: heartbeat_patrol\ntimestamp: \ + {now}\n\n[Context]\nactive_sessions: {active_session_count}\nAnalyze active sessions and \ + determine if any proactive actions are needed. Review your previous tape entries to \ + avoid repeating recent actions.\n\n[Available Actions]\n- dispatch_rara: send a message \ + to user through a session\n- notify: push notification to user's device\n- (no action): \ + decide this event doesn't need intervention" + ) +} + +/// Truncate a string to at most `max` characters. +fn truncate(s: &str, max: usize) -> &str { + match s.char_indices().nth(max) { + Some((idx, _)) => &s[..idx], + None => s, + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[test] + fn context_pack_session_idle() { + let signal = ProactiveSignal::SessionIdle { + idle_duration: Duration::from_secs(7200), + }; + let ctx = SessionContext { + session_name: Some("PR review".to_string()), + session_key: "session-abc".to_string(), + idle_since: Some("2h ago".to_string()), + last_user_message: Some("check that PR".to_string()), + }; + let pack = build_context_pack(&signal, Some(&ctx)); + assert!(pack.contains("kind: session_idle")); + assert!(pack.contains("idle_duration: 120m")); + assert!(pack.contains("\"PR review\"")); + assert!(pack.contains("[Available Actions]")); + } + + #[test] + fn context_pack_morning_greeting() { + let signal = ProactiveSignal::MorningGreeting; + let pack = build_context_pack(&signal, None); + assert!(pack.contains("kind: morning_greeting")); + assert!(pack.contains("[Available Actions]")); + // No [Context] section when no session context. + assert!(!pack.contains("[Context]")); + } + + #[test] + fn heartbeat_context_pack() { + let pack = build_heartbeat_context_pack(3); + assert!(pack.contains("kind: heartbeat_patrol")); + assert!(pack.contains("active_sessions: 3")); + } +} diff --git a/crates/kernel/src/proactive/filter.rs b/crates/kernel/src/proactive/filter.rs new file mode 100644 index 000000000..37a310b21 --- /dev/null +++ b/crates/kernel/src/proactive/filter.rs @@ -0,0 +1,210 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Pure rule-based filter for proactive signals. Zero LLM cost. +//! +//! Checks quiet hours, per-kind cooldowns, and global hourly rate limits +//! before allowing a signal through to Mita. + +use std::{collections::HashMap, time::Duration}; + +use jiff::Timestamp; +use tracing::debug; + +use super::{config::ProactiveConfig, signal::ProactiveSignal}; + +/// Pure rule-based filter for proactive signals. +/// +/// All checks are deterministic with zero LLM cost. Signals that fail +/// any check are silently dropped. +pub struct ProactiveFilter { + /// Filter configuration (quiet hours, cooldowns, rate limits). + config: ProactiveConfig, + /// Last fire time per signal kind (for cooldown dedup). + last_fired: HashMap, + /// Number of signals passed in the current hourly window. + hourly_count: u32, + /// Start of the current hourly window. + hourly_window_start: Timestamp, +} + +impl ProactiveFilter { + /// Create a new filter from the given configuration. + pub fn new(config: ProactiveConfig) -> Self { + Self { + config, + last_fired: HashMap::new(), + hourly_count: 0, + hourly_window_start: Timestamp::now(), + } + } + + /// Check whether a signal should pass through all filter rules. + /// + /// Returns `true` if the signal passes quiet hours, cooldown, and + /// rate limit checks. Does NOT update internal state — call + /// [`Self::record_fired`] after successfully emitting the signal. + pub fn should_pass(&mut self, signal: &ProactiveSignal) -> bool { + let now = Timestamp::now(); + + // 1. Quiet hours check. + if self.is_quiet_hours(now) { + debug!( + kind = signal.kind_name(), + "proactive filter: suppressed by quiet hours" + ); + return false; + } + + // 2. Per-kind cooldown dedup. + let kind = signal.kind_name(); + if let Some(cooldown) = self.config.cooldowns.get(kind) { + if let Some(last) = self.last_fired.get(kind) { + let elapsed_secs = now + .since(*last) + .ok() + .and_then(|s| s.total(jiff::Unit::Second).ok()) + .unwrap_or(0.0); + if Duration::from_secs_f64(elapsed_secs) < *cooldown { + debug!( + kind, + elapsed_secs = elapsed_secs as u64, + cooldown_secs = cooldown.as_secs(), + "proactive filter: suppressed by cooldown" + ); + return false; + } + } + } + + // 3. Global hourly rate limit. + self.maybe_reset_hourly_window(now); + if self.hourly_count >= self.config.max_hourly { + debug!( + kind, + hourly_count = self.hourly_count, + max_hourly = self.config.max_hourly, + "proactive filter: suppressed by hourly rate limit" + ); + return false; + } + + true + } + + /// Record that a signal was successfully emitted. + /// + /// Updates the cooldown timestamp and hourly counter. + pub fn record_fired(&mut self, signal: &ProactiveSignal) { + let now = Timestamp::now(); + self.last_fired.insert(signal.kind_name().to_string(), now); + self.maybe_reset_hourly_window(now); + self.hourly_count += 1; + } + + /// Check if the current time falls within configured quiet hours. + /// + /// Handles midnight wrap-around: if start > end (e.g. 23:00–08:00), + /// quiet hours span midnight. + fn is_quiet_hours(&self, now: Timestamp) -> bool { + let (start, end) = match ( + self.config.parsed_quiet_start(), + self.config.parsed_quiet_end(), + ) { + (Some(s), Some(e)) => (s, e), + _ => return false, + }; + + let tz = match self.config.parsed_timezone() { + Some(tz) => tz, + None => return false, + }; + + let local = now.to_zoned(tz); + let current_time = local.time(); + + if start <= end { + // Normal range (e.g. 01:00–06:00). + current_time >= start && current_time < end + } else { + // Midnight wrap-around (e.g. 23:00–08:00). + current_time >= start || current_time < end + } + } + + /// Reset the hourly window if more than one hour has elapsed. + fn maybe_reset_hourly_window(&mut self, now: Timestamp) { + let elapsed_secs = now + .since(self.hourly_window_start) + .and_then(|s| s.total(jiff::Unit::Second)) + .unwrap_or(0.0); + if elapsed_secs >= 3600.0 { + self.hourly_window_start = now; + self.hourly_count = 0; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> ProactiveConfig { + ProactiveConfig::builder() + .cooldowns(HashMap::from([( + "session_idle".to_string(), + Duration::from_secs(3600), + )])) + .max_hourly(3) + .work_hours_start("09:00".to_string()) + .work_hours_end("18:00".to_string()) + .timezone("UTC".to_string()) + .build() + } + + #[test] + fn pass_when_no_cooldown_hit() { + let mut filter = ProactiveFilter::new(test_config()); + let signal = ProactiveSignal::MorningGreeting; + assert!(filter.should_pass(&signal)); + } + + #[test] + fn block_after_rate_limit() { + let mut filter = ProactiveFilter::new(test_config()); + let signal = ProactiveSignal::MorningGreeting; + + // Exhaust the hourly limit. + for _ in 0..3 { + assert!(filter.should_pass(&signal)); + filter.record_fired(&signal); + } + // Fourth should be blocked. + assert!(!filter.should_pass(&signal)); + } + + #[test] + fn cooldown_blocks_same_kind() { + let mut filter = ProactiveFilter::new(test_config()); + let signal = ProactiveSignal::SessionIdle { + idle_duration: Duration::from_secs(600), + }; + + assert!(filter.should_pass(&signal)); + filter.record_fired(&signal); + + // Same kind within cooldown should be blocked. + assert!(!filter.should_pass(&signal)); + } +} diff --git a/crates/kernel/src/proactive.rs b/crates/kernel/src/proactive/judgment.rs similarity index 100% rename from crates/kernel/src/proactive.rs rename to crates/kernel/src/proactive/judgment.rs diff --git a/crates/kernel/src/proactive/mod.rs b/crates/kernel/src/proactive/mod.rs new file mode 100644 index 000000000..cf2975b7c --- /dev/null +++ b/crates/kernel/src/proactive/mod.rs @@ -0,0 +1,34 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Proactive subsystem — event-driven signals + group-chat judgment. +//! +//! This module contains: +//! - **Signal** — typed proactive events emitted by the kernel +//! - **Config** — YAML-driven filter configuration +//! - **Filter** — pure rule-based gate (quiet hours, cooldowns, rate limits) +//! - **Context** — structured context pack builder for Mita +//! - **Judgment** — lightweight LLM judgment for group-chat replies + +mod config; +mod context; +mod filter; +mod judgment; +mod signal; + +pub use config::ProactiveConfig; +pub use context::{SessionContext, build_context_pack, build_heartbeat_context_pack}; +pub use filter::ProactiveFilter; +pub use judgment::{ProactiveJudgment, should_reply}; +pub use signal::ProactiveSignal; diff --git a/crates/kernel/src/proactive/signal.rs b/crates/kernel/src/proactive/signal.rs new file mode 100644 index 000000000..daa1163d5 --- /dev/null +++ b/crates/kernel/src/proactive/signal.rs @@ -0,0 +1,63 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Typed proactive signals emitted by the kernel for Mita orchestration. + +use std::time::Duration; + +use serde::Serialize; + +/// Proactive signal for Mita orchestration. +/// +/// Each variant represents a distinct event that may warrant proactive +/// action from the Mita background agent. Signals flow through the +/// [`super::ProactiveFilter`] before reaching Mita. +#[derive(Debug, Clone, Serialize)] +pub enum ProactiveSignal { + /// Session has been idle beyond threshold. + SessionIdle { + /// How long the session has been idle. + idle_duration: Duration, + }, + /// Scheduled task agent failed to spawn. + TaskFailed { + /// Error description from the spawn failure. + error: String, + }, + /// Conversation naturally completed (turn ended without pending work). + SessionCompleted { + /// Brief summary of what the session accomplished. + summary: String, + }, + /// Daily morning greeting trigger (fires at work hours start). + MorningGreeting, + /// End-of-day summary trigger (fires at work hours end). + DailySummary, +} + +impl ProactiveSignal { + /// Returns a stable string key for this signal kind. + /// + /// Used as the cooldown map key in [`super::ProactiveFilter`] so that + /// rate limiting is per-kind, not per-instance. + pub fn kind_name(&self) -> &'static str { + match self { + Self::SessionIdle { .. } => "session_idle", + Self::TaskFailed { .. } => "task_failed", + Self::SessionCompleted { .. } => "session_completed", + Self::MorningGreeting => "morning_greeting", + Self::DailySummary => "daily_summary", + } + } +} From b3e631ae90b162a8fadafb2c7ed3c35ccef67c41 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:18:54 +0900 Subject: [PATCH 02/12] feat(kernel): wire proactive signal emit points and time scheduler (#786) - IdleCheck: emit SessionIdle for sessions idle >30 min - handle_scheduled_task: emit TaskFailed on spawn failure - TurnCompleted: emit SessionCompleted for worker sessions - Processor 0 scheduler: compute next MorningGreeting/DailySummary from work_hours + timezone config - Add ProactiveConfig to MitaConfig + wire to KernelConfig - Update kernel AGENT.md with proactive V2 section Closes #786 --- crates/app/src/lib.rs | 6 ++ crates/kernel/AGENT.md | 43 ++++++++++ crates/kernel/src/kernel.rs | 159 +++++++++++++++++++++++++++++++++++- 3 files changed, 206 insertions(+), 2 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 750c6fe21..036a7ea6e 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -113,6 +113,11 @@ pub struct MitaConfig { serialize_with = "humantime_serde::serialize" )] pub heartbeat_interval: Duration, + + /// Proactive signal filter configuration. When absent, event-driven + /// proactive signals are disabled (heartbeat-only mode). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub proactive: Option, } /// Configuration for the gateway supervisor. @@ -347,6 +352,7 @@ pub async fn start_with_options( let kernel_config = rara_kernel::kernel::KernelConfig { mita_heartbeat_interval: Some(config.mita.heartbeat_interval), + proactive: config.mita.proactive.clone(), context_folding: config.context_folding.clone(), ..Default::default() }; diff --git a/crates/kernel/AGENT.md b/crates/kernel/AGENT.md index c79226704..03c835bea 100644 --- a/crates/kernel/AGENT.md +++ b/crates/kernel/AGENT.md @@ -237,3 +237,46 @@ Detects when the agent is stuck calling the same tool repeatedly without progres - Do NOT publish TaskReport without going through the syscall — `exec_publish_report` enforces `source_session` and `tags` invariants - Do NOT use `UserId("system")` in synthetic messages for ProactiveTurn — always use the subscription owner's identity to prevent privilege escalation on session restore - Do NOT construct `TaskNotification` outside `handle_publish_task_report` — it builds the `TaskReportRef` and coordinates result persistence + +--- + +## Proactive V2 — Event-Driven Proactive Signals + +### What + +Event-driven proactive signals that supplement the polling heartbeat. Internal kernel events (idle sessions, task failures, time triggers) emit `ProactiveSignal` variants, which pass through a pure rule-based `ProactiveFilter` before being delivered to Mita as structured context packs. + +### Key Files + +| File | Role | +|------|------| +| `proactive/signal.rs` | `ProactiveSignal` enum (5 signal kinds) | +| `proactive/config.rs` | `ProactiveConfig` — YAML-driven filter settings | +| `proactive/filter.rs` | `ProactiveFilter` — quiet hours, cooldowns, rate limiting | +| `proactive/context.rs` | `build_context_pack()` / `build_heartbeat_context_pack()` | +| `proactive/judgment.rs` | Group-chat LLM judgment (pre-existing, unchanged) | +| `kernel.rs` | Signal emit points + `handle_proactive_signal` + scheduler time events | + +### Signal Flow + +``` +Kernel event (IdleCheck / TaskFailed / Scheduler) + → ProactiveSignal created + → ProactiveFilter::should_pass() (quiet hours → cooldown → rate limit) + → KernelEvent::ProactiveSignal pushed to event queue + → handle_proactive_signal() builds context pack + → deliver_proactive_to_mita() sends to Mita session +``` + +### Critical Invariants + +- `ProactiveFilter` is behind `std::sync::Mutex>` in the Kernel — `None` when proactive config is absent, disabling all signals +- `try_emit_proactive_signal()` is the single entry point for all signal emission — do NOT push `ProactiveSignal` events directly to the queue +- `ProactiveConfig` does NOT derive `Default` — absence means "feature off" +- Time events (MorningGreeting/DailySummary) are computed from `work_hours_start`/`work_hours_end` + timezone in the processor 0 scheduler + +### What NOT To Do + +- Do NOT push `KernelEventEnvelope::proactive_signal()` without going through `try_emit_proactive_signal()` — it enforces filter checks and records fire timestamps +- Do NOT derive `Default` on `ProactiveConfig` — the feature is opt-in via YAML config +- Do NOT add new signal kinds without adding a `kind_name()` match arm — cooldown keys depend on it diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index d3bc7f2fb..bee04e629 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -454,6 +454,10 @@ impl Kernel { let mita_interval = self.config.mita_heartbeat_interval; let mut next_mita = mita_interval.map(|d| tokio::time::Instant::now() + d); + // Proactive time events (morning greeting / daily summary). + let mut next_proactive_time: Option<(tokio::time::Instant, bool)> = + self.compute_next_proactive_time_event(); + loop { // Compute next wake time for the unified scheduler (processor 0 only). let scheduler_sleep = if id == 0 { @@ -477,8 +481,10 @@ impl Kernel { tokio::time::Instant::now() + clamped }); - // Find the earliest deadline among: mita heartbeat, next scheduled job. - let earliest = [next_mita, next_job_instant] + // Find the earliest deadline among: mita heartbeat, next scheduled + // job, and next proactive time event. + let next_proactive_instant = next_proactive_time.map(|(inst, _)| inst); + let earliest = [next_mita, next_job_instant, next_proactive_instant] .into_iter() .flatten() .min() @@ -523,6 +529,19 @@ impl Kernel { } } + // Check if a proactive time event is due. + if let Some((proactive_at, is_morning)) = next_proactive_time { + if now >= proactive_at { + let signal = if is_morning { + crate::proactive::ProactiveSignal::MorningGreeting + } else { + crate::proactive::ProactiveSignal::DailySummary + }; + self.try_emit_proactive_signal(signal); + next_proactive_time = self.compute_next_proactive_time_event(); + } + } + // Evict expired rate-limiter entries. self.io.gc_rate_limiter(); @@ -705,6 +724,9 @@ impl Kernel { // Periodic idle check — handled by session table reaping. self.process_table .reap_terminal(std::time::Duration::from_secs(300)); + + // Emit SessionIdle signals for sessions idle beyond threshold. + self.emit_idle_signals(); } KernelEvent::Shutdown => { info!("shutdown event received"); @@ -1467,6 +1489,10 @@ impl Kernel { error = %e, "failed to spawn scheduled job agent" ); + // Emit TaskFailed proactive signal so Mita is aware. + self.try_emit_proactive_signal(crate::proactive::ProactiveSignal::TaskFailed { + error: format!("job {job_id}: {e}"), + }); } } } @@ -1735,6 +1761,120 @@ impl Kernel { self.deliver_to_session(session_key, msg).await; } + /// Compute the next proactive time event (morning greeting or daily + /// summary) based on the configured work hours and timezone. + /// + /// Returns `Some((instant, is_morning))` where `is_morning` is `true` + /// for a morning greeting and `false` for a daily summary. Returns + /// `None` when proactive config is absent or work hours are invalid. + fn compute_next_proactive_time_event(&self) -> Option<(tokio::time::Instant, bool)> { + let config = self.config.proactive.as_ref()?; + let tz = config.parsed_timezone()?; + let work_start = config.parsed_work_start()?; + let work_end = config.parsed_work_end()?; + + let now = jiff::Timestamp::now(); + let local_now = now.to_zoned(tz.clone()); + let today = local_now.date(); + let current_time = local_now.time(); + + // Determine the next event: whichever of morning/daily is soonest. + let (target_time, is_morning) = if current_time < work_start { + // Before work hours — next event is morning greeting today. + (work_start, true) + } else if current_time < work_end { + // During work hours — next event is daily summary today. + (work_end, false) + } else { + // After work hours — next event is morning greeting tomorrow. + (work_start, true) + }; + + // Build the target zoned datetime. + let target_date = if !is_morning || current_time < work_start { + today + } else { + // Tomorrow for morning greeting when we're past work_end. + today.tomorrow().ok()? + }; + + let target_dt = target_date + .to_zoned(tz) + .ok() + .and_then(|zdt| zdt.with().time(target_time).build().ok())?; + let target_ts = target_dt.timestamp(); + let delta = target_ts.since(now).ok()?; + let delta_secs = delta.total(jiff::Unit::Second).ok()?; + if delta_secs <= 0.0 { + return None; + } + + let instant = tokio::time::Instant::now() + Duration::from_secs_f64(delta_secs); + Some((instant, is_morning)) + } + + /// Emit `SessionIdle` signals for sessions that have been idle beyond + /// a reasonable threshold (30 minutes). + fn emit_idle_signals(&self) { + let idle_threshold = Duration::from_secs(30 * 60); + let now = jiff::Timestamp::now(); + + for stats in self.process_table.list() { + // Only check Ready sessions (Active means currently running). + if stats.state != SessionState::Ready { + continue; + } + // Skip the Mita session itself. + if stats.session_key == crate::session::SessionKey::deterministic("mita") { + continue; + } + + let idle_duration = stats + .last_activity + .and_then(|la| { + now.since(la) + .ok() + .and_then(|s| s.total(jiff::Unit::Second).ok()) + }) + .map(|secs| Duration::from_secs_f64(secs)) + .unwrap_or(Duration::ZERO); + + if idle_duration >= idle_threshold { + let signal = crate::proactive::ProactiveSignal::SessionIdle { idle_duration }; + self.try_emit_proactive_signal(signal); + } + } + } + + /// Try to emit a proactive signal through the filter. + /// + /// Checks the proactive filter; if the signal passes, pushes a + /// `ProactiveSignal` event into the kernel event queue and records + /// the fire. + fn try_emit_proactive_signal(&self, signal: crate::proactive::ProactiveSignal) { + let mut guard = self + .proactive_filter + .lock() + .unwrap_or_else(|e| e.into_inner()); + + let filter = match guard.as_mut() { + Some(f) => f, + None => return, // Proactive disabled. + }; + + if !filter.should_pass(&signal) { + return; + } + filter.record_fired(&signal); + + if let Err(e) = self + .event_queue + .try_push(KernelEventEnvelope::proactive_signal(signal)) + { + error!(%e, "failed to push ProactiveSignal event"); + } + } + /// Deliver a message to a live process: buffer if the process is paused /// or busy (Running state), otherwise start a new LLM turn. async fn deliver_to_session(&self, session_key: SessionKey, msg: InboundMessage) { @@ -2683,6 +2823,21 @@ impl Kernel { .unwrap_or(false); if has_result_tx { + // Worker/child session completed — emit SessionCompleted signal. + if !_turn_failed { + let summary = self + .process_table + .with(&session_key, |p| { + p.result + .as_ref() + .map(|r| r.output.chars().take(100).collect::()) + .unwrap_or_default() + }) + .unwrap_or_default(); + self.try_emit_proactive_signal( + crate::proactive::ProactiveSignal::SessionCompleted { summary }, + ); + } self.cleanup_process(session_key).await; return; } From 4e3a36dd8abc3adc1129321ef124afad50cda0fb Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 20 Mar 2026 18:35:20 +0900 Subject: [PATCH 03/12] fix(kernel): address code review findings for proactive v2 (#786) - Fix SessionCompleted feedback loop: skip Mita child sessions - Fix per-kind cooldown: use session-scoped cooldown keys so idle detection works across multiple sessions independently - Move idle threshold from hardcoded 30m to ProactiveConfig field - Replace .unwrap() with .expect() in context.rs - Add warning logs for invalid config values - Fix should_pass doc comment about state mutation - Add std::sync::Mutex rationale comment Closes #786 --- crates/kernel/src/kernel.rs | 72 ++++++++++++++++++-------- crates/kernel/src/proactive/config.rs | 33 ++++++++++-- crates/kernel/src/proactive/context.rs | 6 +-- crates/kernel/src/proactive/filter.rs | 47 ++++++++++------- crates/kernel/src/proactive/signal.rs | 13 +++++ 5 files changed, 122 insertions(+), 49 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index bee04e629..db49bc01e 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -197,6 +197,8 @@ pub struct Kernel { skill_prompt_provider: crate::handle::SkillPromptProvider, /// Optional proactive signal filter. `None` when proactive config is /// absent, which disables all proactive signals. + /// Uses `std::sync::Mutex` (not `tokio::sync::Mutex`) because the lock + /// is held only for brief, non-async filter checks — no `.await` inside. proactive_filter: std::sync::Mutex>, } @@ -537,7 +539,7 @@ impl Kernel { } else { crate::proactive::ProactiveSignal::DailySummary }; - self.try_emit_proactive_signal(signal); + self.try_emit_proactive_signal(signal, None); next_proactive_time = self.compute_next_proactive_time_event(); } } @@ -1490,9 +1492,12 @@ impl Kernel { "failed to spawn scheduled job agent" ); // Emit TaskFailed proactive signal so Mita is aware. - self.try_emit_proactive_signal(crate::proactive::ProactiveSignal::TaskFailed { - error: format!("job {job_id}: {e}"), - }); + self.try_emit_proactive_signal( + crate::proactive::ProactiveSignal::TaskFailed { + error: format!("job {job_id}: {e}"), + }, + None, + ); } } } @@ -1814,9 +1819,13 @@ impl Kernel { } /// Emit `SessionIdle` signals for sessions that have been idle beyond - /// a reasonable threshold (30 minutes). + /// the configured threshold. fn emit_idle_signals(&self) { - let idle_threshold = Duration::from_secs(30 * 60); + let idle_threshold_secs = match self.config.proactive.as_ref() { + Some(c) => c.idle_threshold_secs, + None => return, // Proactive disabled. + }; + let idle_threshold = Duration::from_secs(idle_threshold_secs); let now = jiff::Timestamp::now(); for stats in self.process_table.list() { @@ -1841,7 +1850,8 @@ impl Kernel { if idle_duration >= idle_threshold { let signal = crate::proactive::ProactiveSignal::SessionIdle { idle_duration }; - self.try_emit_proactive_signal(signal); + let sk = stats.session_key.to_string(); + self.try_emit_proactive_signal(signal, Some(&sk)); } } } @@ -1850,8 +1860,13 @@ impl Kernel { /// /// Checks the proactive filter; if the signal passes, pushes a /// `ProactiveSignal` event into the kernel event queue and records - /// the fire. - fn try_emit_proactive_signal(&self, signal: crate::proactive::ProactiveSignal) { + /// the fire. The optional `session_key` enables per-session cooldown + /// tracking for session-scoped signals. + fn try_emit_proactive_signal( + &self, + signal: crate::proactive::ProactiveSignal, + session_key: Option<&str>, + ) { let mut guard = self .proactive_filter .lock() @@ -1862,10 +1877,10 @@ impl Kernel { None => return, // Proactive disabled. }; - if !filter.should_pass(&signal) { + if !filter.should_pass(&signal, session_key) { return; } - filter.record_fired(&signal); + filter.record_fired(&signal, session_key); if let Err(e) = self .event_queue @@ -2823,20 +2838,31 @@ impl Kernel { .unwrap_or(false); if has_result_tx { - // Worker/child session completed — emit SessionCompleted signal. + // Worker/child session completed — emit SessionCompleted signal, + // but only for non-Mita children to avoid feedback loops where + // Mita dispatches agent → completes → signal → Mita again. if !_turn_failed { - let summary = self + let mita_key = crate::session::SessionKey::deterministic("mita"); + let is_mita_child = self .process_table - .with(&session_key, |p| { - p.result - .as_ref() - .map(|r| r.output.chars().take(100).collect::()) - .unwrap_or_default() - }) - .unwrap_or_default(); - self.try_emit_proactive_signal( - crate::proactive::ProactiveSignal::SessionCompleted { summary }, - ); + .with(&session_key, |p| p.parent_id.as_ref() == Some(&mita_key)) + .unwrap_or(false); + + if !is_mita_child { + let summary = self + .process_table + .with(&session_key, |p| { + p.result + .as_ref() + .map(|r| r.output.chars().take(100).collect::()) + .unwrap_or_default() + }) + .unwrap_or_default(); + self.try_emit_proactive_signal( + crate::proactive::ProactiveSignal::SessionCompleted { summary }, + None, + ); + } } self.cleanup_process(session_key).await; return; diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs index 23915f813..1944d55a9 100644 --- a/crates/kernel/src/proactive/config.rs +++ b/crates/kernel/src/proactive/config.rs @@ -20,6 +20,7 @@ use std::{collections::HashMap, time::Duration}; use serde::{Deserialize, Serialize}; +use tracing::warn; /// Configuration for the proactive signal filter. /// @@ -55,17 +56,35 @@ pub struct ProactiveConfig { /// IANA timezone for time calculations (e.g. `"Asia/Shanghai"`). pub timezone: String, + + /// Idle threshold in seconds — sessions idle beyond this duration + /// trigger a `SessionIdle` signal. + pub idle_threshold_secs: u64, } impl ProactiveConfig { /// Parse `work_hours_start` as a `jiff::civil::Time`. pub fn parsed_work_start(&self) -> Option { - parse_time_str(&self.work_hours_start) + let result = parse_time_str(&self.work_hours_start); + if result.is_none() { + warn!( + value = self.work_hours_start.as_str(), + "proactive config: invalid work_hours_start, time events disabled" + ); + } + result } /// Parse `work_hours_end` as a `jiff::civil::Time`. pub fn parsed_work_end(&self) -> Option { - parse_time_str(&self.work_hours_end) + let result = parse_time_str(&self.work_hours_end); + if result.is_none() { + warn!( + value = self.work_hours_end.as_str(), + "proactive config: invalid work_hours_end, time events disabled" + ); + } + result } /// Parse quiet hours start as a `jiff::civil::Time`. @@ -84,7 +103,14 @@ impl ProactiveConfig { /// Parse the configured timezone as a `jiff::tz::TimeZone`. pub fn parsed_timezone(&self) -> Option { - jiff::tz::TimeZone::get(&self.timezone).ok() + let result = jiff::tz::TimeZone::get(&self.timezone).ok(); + if result.is_none() { + warn!( + value = self.timezone.as_str(), + "proactive config: invalid timezone, proactive features disabled" + ); + } + result } } @@ -139,6 +165,7 @@ max_hourly: 5 work_hours_start: "09:00" work_hours_end: "18:00" timezone: "Asia/Shanghai" +idle_threshold_secs: 1800 "#; let config: ProactiveConfig = serde_yaml::from_str(yaml).expect("should parse"); assert_eq!(config.max_hourly, 5); diff --git a/crates/kernel/src/proactive/context.rs b/crates/kernel/src/proactive/context.rs index 48aeb273a..07f67c1ff 100644 --- a/crates/kernel/src/proactive/context.rs +++ b/crates/kernel/src/proactive/context.rs @@ -58,19 +58,19 @@ pub fn build_context_pack( let mins = idle_duration.as_secs() / 60; sections .last_mut() - .unwrap() + .expect("sections is non-empty") .push_str(&format!("\nidle_duration: {}m", mins,)); } ProactiveSignal::TaskFailed { error } => { sections .last_mut() - .unwrap() + .expect("sections is non-empty") .push_str(&format!("\nerror: {}", truncate(error, 200),)); } ProactiveSignal::SessionCompleted { summary } => { sections .last_mut() - .unwrap() + .expect("sections is non-empty") .push_str(&format!("\nsummary: {}", truncate(summary, 200),)); } ProactiveSignal::MorningGreeting | ProactiveSignal::DailySummary => { diff --git a/crates/kernel/src/proactive/filter.rs b/crates/kernel/src/proactive/filter.rs index 37a310b21..4c279e7d1 100644 --- a/crates/kernel/src/proactive/filter.rs +++ b/crates/kernel/src/proactive/filter.rs @@ -53,9 +53,10 @@ impl ProactiveFilter { /// Check whether a signal should pass through all filter rules. /// /// Returns `true` if the signal passes quiet hours, cooldown, and - /// rate limit checks. Does NOT update internal state — call - /// [`Self::record_fired`] after successfully emitting the signal. - pub fn should_pass(&mut self, signal: &ProactiveSignal) -> bool { + /// rate limit checks. Note: the hourly window may be reset as a + /// side effect. Call [`Self::record_fired`] after successfully + /// emitting the signal. + pub fn should_pass(&mut self, signal: &ProactiveSignal, session_key: Option<&str>) -> bool { let now = Timestamp::now(); // 1. Quiet hours check. @@ -67,10 +68,10 @@ impl ProactiveFilter { return false; } - // 2. Per-kind cooldown dedup. - let kind = signal.kind_name(); - if let Some(cooldown) = self.config.cooldowns.get(kind) { - if let Some(last) = self.last_fired.get(kind) { + // 2. Per-kind cooldown dedup (session-scoped for session signals). + let cooldown_key = signal.cooldown_key(session_key); + if let Some(cooldown) = self.config.cooldowns.get(signal.kind_name()) { + if let Some(last) = self.last_fired.get(&cooldown_key) { let elapsed_secs = now .since(*last) .ok() @@ -78,7 +79,8 @@ impl ProactiveFilter { .unwrap_or(0.0); if Duration::from_secs_f64(elapsed_secs) < *cooldown { debug!( - kind, + kind = signal.kind_name(), + cooldown_key = cooldown_key.as_str(), elapsed_secs = elapsed_secs as u64, cooldown_secs = cooldown.as_secs(), "proactive filter: suppressed by cooldown" @@ -92,7 +94,7 @@ impl ProactiveFilter { self.maybe_reset_hourly_window(now); if self.hourly_count >= self.config.max_hourly { debug!( - kind, + kind = signal.kind_name(), hourly_count = self.hourly_count, max_hourly = self.config.max_hourly, "proactive filter: suppressed by hourly rate limit" @@ -106,9 +108,10 @@ impl ProactiveFilter { /// Record that a signal was successfully emitted. /// /// Updates the cooldown timestamp and hourly counter. - pub fn record_fired(&mut self, signal: &ProactiveSignal) { + pub fn record_fired(&mut self, signal: &ProactiveSignal, session_key: Option<&str>) { let now = Timestamp::now(); - self.last_fired.insert(signal.kind_name().to_string(), now); + self.last_fired + .insert(signal.cooldown_key(session_key), now); self.maybe_reset_hourly_window(now); self.hourly_count += 1; } @@ -170,6 +173,7 @@ mod tests { .work_hours_start("09:00".to_string()) .work_hours_end("18:00".to_string()) .timezone("UTC".to_string()) + .idle_threshold_secs(1800) .build() } @@ -177,7 +181,7 @@ mod tests { fn pass_when_no_cooldown_hit() { let mut filter = ProactiveFilter::new(test_config()); let signal = ProactiveSignal::MorningGreeting; - assert!(filter.should_pass(&signal)); + assert!(filter.should_pass(&signal, None)); } #[test] @@ -187,24 +191,27 @@ mod tests { // Exhaust the hourly limit. for _ in 0..3 { - assert!(filter.should_pass(&signal)); - filter.record_fired(&signal); + assert!(filter.should_pass(&signal, None)); + filter.record_fired(&signal, None); } // Fourth should be blocked. - assert!(!filter.should_pass(&signal)); + assert!(!filter.should_pass(&signal, None)); } #[test] - fn cooldown_blocks_same_kind() { + fn cooldown_blocks_same_session_only() { let mut filter = ProactiveFilter::new(test_config()); let signal = ProactiveSignal::SessionIdle { idle_duration: Duration::from_secs(600), }; - assert!(filter.should_pass(&signal)); - filter.record_fired(&signal); + assert!(filter.should_pass(&signal, Some("session-a"))); + filter.record_fired(&signal, Some("session-a")); + + // Same session within cooldown should be blocked. + assert!(!filter.should_pass(&signal, Some("session-a"))); - // Same kind within cooldown should be blocked. - assert!(!filter.should_pass(&signal)); + // Different session should still pass. + assert!(filter.should_pass(&signal, Some("session-b"))); } } diff --git a/crates/kernel/src/proactive/signal.rs b/crates/kernel/src/proactive/signal.rs index daa1163d5..da22ff6d3 100644 --- a/crates/kernel/src/proactive/signal.rs +++ b/crates/kernel/src/proactive/signal.rs @@ -60,4 +60,17 @@ impl ProactiveSignal { Self::DailySummary => "daily_summary", } } + + /// Returns a cooldown key that distinguishes per-session signals. + /// + /// For session-scoped signals like `SessionIdle`, the key includes + /// the session identifier so that cooldowns apply per-session rather + /// than globally blocking all sessions of the same kind. + pub fn cooldown_key(&self, session_key: Option<&str>) -> String { + match (self, session_key) { + (Self::SessionIdle { .. }, Some(sk)) => format!("session_idle:{sk}"), + (Self::SessionCompleted { .. }, Some(sk)) => format!("session_completed:{sk}"), + _ => self.kind_name().to_string(), + } + } } From 4437c5ca79103ee7f286e5482a02b10dcce6a39a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 20 Mar 2026 19:02:25 +0900 Subject: [PATCH 04/12] fix(kernel): address round-2 review findings for proactive v2 (#786) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename _turn_failed → turn_failed (variable is now used) - Build SessionContext in handle_proactive_signal from process table so Mita receives session name and idle duration in context pack - Embed session_key in SessionIdle/SessionCompleted signal variants, removing the separate session_key parameter from filter API - Cache parsed timezone/time values in ProactiveFilter::new() to avoid re-parsing IANA timezone on every signal check - Evict stale cooldown entries in maybe_reset_hourly_window to prevent unbounded last_fired HashMap growth - Hoist SessionKey::deterministic("mita") outside emit_idle_signals loop - Deduplicate truncate() — shared via proactive/mod.rs - Add ProactiveConfig::validate() for startup-time config validation - Refactor build_heartbeat_context_pack to sections style with shared AVAILABLE_ACTIONS constant Closes #786 --- crates/kernel/src/kernel.rs | 85 ++++++++++++++++--------- crates/kernel/src/proactive/config.rs | 27 ++++++++ crates/kernel/src/proactive/context.rs | 51 ++++++++------- crates/kernel/src/proactive/filter.rs | 80 +++++++++++++++++------ crates/kernel/src/proactive/judgment.rs | 8 +-- crates/kernel/src/proactive/mod.rs | 8 +++ crates/kernel/src/proactive/signal.rs | 22 +++++-- 7 files changed, 192 insertions(+), 89 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index db49bc01e..ce59cd721 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -539,7 +539,7 @@ impl Kernel { } else { crate::proactive::ProactiveSignal::DailySummary }; - self.try_emit_proactive_signal(signal, None); + self.try_emit_proactive_signal(signal); next_proactive_time = self.compute_next_proactive_time_event(); } } @@ -1492,12 +1492,9 @@ impl Kernel { "failed to spawn scheduled job agent" ); // Emit TaskFailed proactive signal so Mita is aware. - self.try_emit_proactive_signal( - crate::proactive::ProactiveSignal::TaskFailed { - error: format!("job {job_id}: {e}"), - }, - None, - ); + self.try_emit_proactive_signal(crate::proactive::ProactiveSignal::TaskFailed { + error: format!("job {job_id}: {e}"), + }); } } } @@ -1736,10 +1733,37 @@ impl Kernel { } /// Handle a proactive signal — build context pack and deliver to Mita. + /// + /// For session-scoped signals, looks up session info from the process + /// table to populate the context pack with session name, idle duration, + /// and last activity time. async fn handle_proactive_signal(&self, signal: crate::proactive::ProactiveSignal) { info!(kind = signal.kind_name(), "handling proactive signal"); - let context = crate::proactive::build_context_pack(&signal, None); + let session_ctx = match &signal { + crate::proactive::ProactiveSignal::SessionIdle { + session_key, + idle_duration, + } => self + .process_table + .with(session_key, |p| crate::proactive::SessionContext { + session_name: Some(p.manifest.name.clone()), + session_key: session_key.to_string(), + idle_since: Some(format!("{}m ago", idle_duration.as_secs() / 60)), + last_user_message: None, + }), + crate::proactive::ProactiveSignal::SessionCompleted { session_key, .. } => self + .process_table + .with(session_key, |p| crate::proactive::SessionContext { + session_name: Some(p.manifest.name.clone()), + session_key: session_key.to_string(), + idle_since: None, + last_user_message: None, + }), + _ => None, + }; + + let context = crate::proactive::build_context_pack(&signal, session_ctx.as_ref()); self.deliver_proactive_to_mita(&context).await; } @@ -1827,6 +1851,7 @@ impl Kernel { }; let idle_threshold = Duration::from_secs(idle_threshold_secs); let now = jiff::Timestamp::now(); + let mita_key = crate::session::SessionKey::deterministic("mita"); for stats in self.process_table.list() { // Only check Ready sessions (Active means currently running). @@ -1834,7 +1859,7 @@ impl Kernel { continue; } // Skip the Mita session itself. - if stats.session_key == crate::session::SessionKey::deterministic("mita") { + if stats.session_key == mita_key { continue; } @@ -1849,9 +1874,11 @@ impl Kernel { .unwrap_or(Duration::ZERO); if idle_duration >= idle_threshold { - let signal = crate::proactive::ProactiveSignal::SessionIdle { idle_duration }; - let sk = stats.session_key.to_string(); - self.try_emit_proactive_signal(signal, Some(&sk)); + let signal = crate::proactive::ProactiveSignal::SessionIdle { + session_key: stats.session_key.clone(), + idle_duration, + }; + self.try_emit_proactive_signal(signal); } } } @@ -1860,13 +1887,9 @@ impl Kernel { /// /// Checks the proactive filter; if the signal passes, pushes a /// `ProactiveSignal` event into the kernel event queue and records - /// the fire. The optional `session_key` enables per-session cooldown - /// tracking for session-scoped signals. - fn try_emit_proactive_signal( - &self, - signal: crate::proactive::ProactiveSignal, - session_key: Option<&str>, - ) { + /// the fire. Session-scoped cooldowns use the session key embedded + /// in the signal variant. + fn try_emit_proactive_signal(&self, signal: crate::proactive::ProactiveSignal) { let mut guard = self .proactive_filter .lock() @@ -1877,10 +1900,10 @@ impl Kernel { None => return, // Proactive disabled. }; - if !filter.should_pass(&signal, session_key) { + if !filter.should_pass(&signal) { return; } - filter.record_fired(&signal, session_key); + filter.record_fired(&signal); if let Err(e) = self .event_queue @@ -2694,7 +2717,7 @@ impl Kernel { // Track whether the turn errored so we can choose the right terminal // state below (Completed vs Failed). - let mut _turn_failed = false; + let mut turn_failed = false; let agent_name = self .process_table @@ -2798,8 +2821,8 @@ impl Kernel { } Err(err_msg) => { span.record("success", false); - _turn_failed = !interrupted; - if _turn_failed { + turn_failed = !interrupted; + if turn_failed { error!(session_key = %session_key, error = %err_msg, "turn failed"); } else { info!(session_key = %session_key, "turn interrupted by user"); @@ -2810,7 +2833,7 @@ impl Kernel { // already sent a confirmation message) and when // origin_endpoint is None (same rationale as reply delivery // above). - if _turn_failed && origin_endpoint.is_some() { + if turn_failed && origin_endpoint.is_some() { let envelope = OutboundEnvelope::error( in_reply_to, user.clone(), @@ -2841,7 +2864,7 @@ impl Kernel { // Worker/child session completed — emit SessionCompleted signal, // but only for non-Mita children to avoid feedback loops where // Mita dispatches agent → completes → signal → Mita again. - if !_turn_failed { + if !turn_failed { let mita_key = crate::session::SessionKey::deterministic("mita"); let is_mita_child = self .process_table @@ -2859,8 +2882,10 @@ impl Kernel { }) .unwrap_or_default(); self.try_emit_proactive_signal( - crate::proactive::ProactiveSignal::SessionCompleted { summary }, - None, + crate::proactive::ProactiveSignal::SessionCompleted { + session_key: session_key.clone(), + summary, + }, ); } } @@ -2876,7 +2901,7 @@ impl Kernel { // After each successful turn, spawn an async task to extract long-term // memories from the conversation tape. Failures are logged but never // block the main event loop. - if !_turn_failed { + if !turn_failed { let tape_service = self.tape_service.clone(); let knowledge = Arc::clone(&self.knowledge); let driver_registry = Arc::clone(self.syscall.driver_registry()); @@ -2929,7 +2954,7 @@ impl Kernel { // After the first successful turn, spawn a task to auto-generate a // human-readable title for the session using LLM. Only fires when // the session has no title yet. - if !_turn_failed { + if !turn_failed { let session_index = Arc::clone(self.io.session_index()); let needs_title = match session_index.get_session(&session_key).await { Ok(Some(entry)) => entry.title.is_none(), diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs index 1944d55a9..53bc6c580 100644 --- a/crates/kernel/src/proactive/config.rs +++ b/crates/kernel/src/proactive/config.rs @@ -63,6 +63,33 @@ pub struct ProactiveConfig { } impl ProactiveConfig { + /// Validate the configuration at startup. + /// + /// Logs warnings for any invalid values so operators know what's + /// misconfigured rather than silently disabling features. + pub fn validate(&self) { + if self.parsed_timezone().is_none() { + // Warning already logged by parsed_timezone(). + } + if self.parsed_work_start().is_none() || self.parsed_work_end().is_none() { + // Warnings already logged by parsed_work_start/end(). + } + if let Some((start, end)) = &self.quiet_hours { + if parse_time_str(start).is_none() { + warn!( + value = start.as_str(), + "proactive config: invalid quiet_hours start, quiet hours disabled" + ); + } + if parse_time_str(end).is_none() { + warn!( + value = end.as_str(), + "proactive config: invalid quiet_hours end, quiet hours disabled" + ); + } + } + } + /// Parse `work_hours_start` as a `jiff::civil::Time`. pub fn parsed_work_start(&self) -> Option { let result = parse_time_str(&self.work_hours_start); diff --git a/crates/kernel/src/proactive/context.rs b/crates/kernel/src/proactive/context.rs index 07f67c1ff..2194776a8 100644 --- a/crates/kernel/src/proactive/context.rs +++ b/crates/kernel/src/proactive/context.rs @@ -19,7 +19,13 @@ use jiff::Timestamp; -use super::signal::ProactiveSignal; +use super::{signal::ProactiveSignal, truncate}; + +/// Shared available actions block appended to all context packs. +const AVAILABLE_ACTIONS: &str = "[Available Actions]\n- dispatch_rara: send a message to user \ + through a session\n- notify: push notification to user's \ + device\n- (no action): decide this event doesn't need \ + intervention"; /// Optional session context attached to a proactive signal. #[derive(Debug, Clone)] @@ -54,7 +60,7 @@ pub fn build_context_pack( // Signal-specific details match signal { - ProactiveSignal::SessionIdle { idle_duration } => { + ProactiveSignal::SessionIdle { idle_duration, .. } => { let mins = idle_duration.as_secs() / 60; sections .last_mut() @@ -67,7 +73,7 @@ pub fn build_context_pack( .expect("sections is non-empty") .push_str(&format!("\nerror: {}", truncate(error, 200),)); } - ProactiveSignal::SessionCompleted { summary } => { + ProactiveSignal::SessionCompleted { summary, .. } => { sections .last_mut() .expect("sections is non-empty") @@ -98,12 +104,7 @@ pub fn build_context_pack( } // [Available Actions] section - sections.push( - "[Available Actions]\n- dispatch_rara: send a message to user through a session\n- \ - notify: push notification to user's device\n- (no action): decide this event doesn't \ - need intervention" - .to_string(), - ); + sections.push(AVAILABLE_ACTIONS.to_string()); sections.join("\n\n") } @@ -114,23 +115,22 @@ pub fn build_context_pack( /// structured format used by proactive signals. pub fn build_heartbeat_context_pack(active_session_count: usize) -> String { let now = Timestamp::now(); + let mut sections = Vec::new(); - format!( - "[Proactive Event]\nkind: heartbeat_patrol\ntimestamp: \ - {now}\n\n[Context]\nactive_sessions: {active_session_count}\nAnalyze active sessions and \ - determine if any proactive actions are needed. Review your previous tape entries to \ - avoid repeating recent actions.\n\n[Available Actions]\n- dispatch_rara: send a message \ - to user through a session\n- notify: push notification to user's device\n- (no action): \ - decide this event doesn't need intervention" - ) -} + sections.push(format!( + "[Proactive Event]\nkind: heartbeat_patrol\ntimestamp: {}", + now, + )); -/// Truncate a string to at most `max` characters. -fn truncate(s: &str, max: usize) -> &str { - match s.char_indices().nth(max) { - Some((idx, _)) => &s[..idx], - None => s, - } + sections.push(format!( + "[Context]\nactive_sessions: {}\nAnalyze active sessions and determine if any proactive \ + actions are needed. Review your previous tape entries to avoid repeating recent actions.", + active_session_count, + )); + + sections.push(AVAILABLE_ACTIONS.to_string()); + + sections.join("\n\n") } #[cfg(test)] @@ -141,7 +141,10 @@ mod tests { #[test] fn context_pack_session_idle() { + use crate::session::SessionKey; + let signal = ProactiveSignal::SessionIdle { + session_key: SessionKey::deterministic("session-abc"), idle_duration: Duration::from_secs(7200), }; let ctx = SessionContext { diff --git a/crates/kernel/src/proactive/filter.rs b/crates/kernel/src/proactive/filter.rs index 4c279e7d1..fbb03efce 100644 --- a/crates/kernel/src/proactive/filter.rs +++ b/crates/kernel/src/proactive/filter.rs @@ -27,10 +27,17 @@ use super::{config::ProactiveConfig, signal::ProactiveSignal}; /// Pure rule-based filter for proactive signals. /// /// All checks are deterministic with zero LLM cost. Signals that fail -/// any check are silently dropped. +/// any check are silently dropped. Parsed config values (timezone, times) +/// are cached at construction to avoid re-parsing on every signal check. pub struct ProactiveFilter { /// Filter configuration (quiet hours, cooldowns, rate limits). config: ProactiveConfig, + /// Cached parsed timezone (avoids IANA lookup per signal). + timezone: Option, + /// Cached parsed quiet hours start. + quiet_start: Option, + /// Cached parsed quiet hours end. + quiet_end: Option, /// Last fire time per signal kind (for cooldown dedup). last_fired: HashMap, /// Number of signals passed in the current hourly window. @@ -41,9 +48,18 @@ pub struct ProactiveFilter { impl ProactiveFilter { /// Create a new filter from the given configuration. + /// + /// Parses timezone and time strings once and caches the results. pub fn new(config: ProactiveConfig) -> Self { + config.validate(); + let timezone = config.parsed_timezone(); + let quiet_start = config.parsed_quiet_start(); + let quiet_end = config.parsed_quiet_end(); Self { config, + timezone, + quiet_start, + quiet_end, last_fired: HashMap::new(), hourly_count: 0, hourly_window_start: Timestamp::now(), @@ -56,7 +72,7 @@ impl ProactiveFilter { /// rate limit checks. Note: the hourly window may be reset as a /// side effect. Call [`Self::record_fired`] after successfully /// emitting the signal. - pub fn should_pass(&mut self, signal: &ProactiveSignal, session_key: Option<&str>) -> bool { + pub fn should_pass(&mut self, signal: &ProactiveSignal) -> bool { let now = Timestamp::now(); // 1. Quiet hours check. @@ -69,7 +85,7 @@ impl ProactiveFilter { } // 2. Per-kind cooldown dedup (session-scoped for session signals). - let cooldown_key = signal.cooldown_key(session_key); + let cooldown_key = signal.cooldown_key(); if let Some(cooldown) = self.config.cooldowns.get(signal.kind_name()) { if let Some(last) = self.last_fired.get(&cooldown_key) { let elapsed_secs = now @@ -108,10 +124,9 @@ impl ProactiveFilter { /// Record that a signal was successfully emitted. /// /// Updates the cooldown timestamp and hourly counter. - pub fn record_fired(&mut self, signal: &ProactiveSignal, session_key: Option<&str>) { + pub fn record_fired(&mut self, signal: &ProactiveSignal) { let now = Timestamp::now(); - self.last_fired - .insert(signal.cooldown_key(session_key), now); + self.last_fired.insert(signal.cooldown_key(), now); self.maybe_reset_hourly_window(now); self.hourly_count += 1; } @@ -121,16 +136,13 @@ impl ProactiveFilter { /// Handles midnight wrap-around: if start > end (e.g. 23:00–08:00), /// quiet hours span midnight. fn is_quiet_hours(&self, now: Timestamp) -> bool { - let (start, end) = match ( - self.config.parsed_quiet_start(), - self.config.parsed_quiet_end(), - ) { + let (start, end) = match (self.quiet_start, self.quiet_end) { (Some(s), Some(e)) => (s, e), _ => return false, }; - let tz = match self.config.parsed_timezone() { - Some(tz) => tz, + let tz = match &self.timezone { + Some(tz) => tz.clone(), None => return false, }; @@ -147,6 +159,9 @@ impl ProactiveFilter { } /// Reset the hourly window if more than one hour has elapsed. + /// + /// Also evicts stale cooldown entries to prevent unbounded growth + /// of the `last_fired` map from per-session cooldown keys. fn maybe_reset_hourly_window(&mut self, now: Timestamp) { let elapsed_secs = now .since(self.hourly_window_start) @@ -155,6 +170,22 @@ impl ProactiveFilter { if elapsed_secs >= 3600.0 { self.hourly_window_start = now; self.hourly_count = 0; + + // Evict cooldown entries older than the maximum cooldown duration + // to prevent unbounded growth from session-scoped keys. + let max_cooldown = self + .config + .cooldowns + .values() + .max() + .copied() + .unwrap_or(Duration::from_secs(3600)); + self.last_fired.retain(|_, ts| { + now.since(*ts) + .and_then(|s| s.total(jiff::Unit::Second)) + .unwrap_or(0.0) + < max_cooldown.as_secs_f64() * 2.0 + }); } } } @@ -181,7 +212,7 @@ mod tests { fn pass_when_no_cooldown_hit() { let mut filter = ProactiveFilter::new(test_config()); let signal = ProactiveSignal::MorningGreeting; - assert!(filter.should_pass(&signal, None)); + assert!(filter.should_pass(&signal)); } #[test] @@ -191,27 +222,34 @@ mod tests { // Exhaust the hourly limit. for _ in 0..3 { - assert!(filter.should_pass(&signal, None)); - filter.record_fired(&signal, None); + assert!(filter.should_pass(&signal)); + filter.record_fired(&signal); } // Fourth should be blocked. - assert!(!filter.should_pass(&signal, None)); + assert!(!filter.should_pass(&signal)); } #[test] fn cooldown_blocks_same_session_only() { + use crate::session::SessionKey; + let mut filter = ProactiveFilter::new(test_config()); - let signal = ProactiveSignal::SessionIdle { + let signal_a = ProactiveSignal::SessionIdle { + session_key: SessionKey::deterministic("session-a"), + idle_duration: Duration::from_secs(600), + }; + let signal_b = ProactiveSignal::SessionIdle { + session_key: SessionKey::deterministic("session-b"), idle_duration: Duration::from_secs(600), }; - assert!(filter.should_pass(&signal, Some("session-a"))); - filter.record_fired(&signal, Some("session-a")); + assert!(filter.should_pass(&signal_a)); + filter.record_fired(&signal_a); // Same session within cooldown should be blocked. - assert!(!filter.should_pass(&signal, Some("session-a"))); + assert!(!filter.should_pass(&signal_a)); // Different session should still pass. - assert!(filter.should_pass(&signal, Some("session-b"))); + assert!(filter.should_pass(&signal_b)); } } diff --git a/crates/kernel/src/proactive/judgment.rs b/crates/kernel/src/proactive/judgment.rs index f2837b0d7..069421c80 100644 --- a/crates/kernel/src/proactive/judgment.rs +++ b/crates/kernel/src/proactive/judgment.rs @@ -260,13 +260,7 @@ fn parse_judgment(text: &str) -> ProactiveJudgment { } } -/// Truncate a string to at most `max` characters. -fn truncate(s: &str, max: usize) -> &str { - match s.char_indices().nth(max) { - Some((idx, _)) => &s[..idx], - None => s, - } -} +use super::truncate; #[cfg(test)] mod tests { diff --git a/crates/kernel/src/proactive/mod.rs b/crates/kernel/src/proactive/mod.rs index cf2975b7c..28bf07bab 100644 --- a/crates/kernel/src/proactive/mod.rs +++ b/crates/kernel/src/proactive/mod.rs @@ -32,3 +32,11 @@ pub use context::{SessionContext, build_context_pack, build_heartbeat_context_pa pub use filter::ProactiveFilter; pub use judgment::{ProactiveJudgment, should_reply}; pub use signal::ProactiveSignal; + +/// Truncate a string to at most `max` characters. +pub(crate) fn truncate(s: &str, max: usize) -> &str { + match s.char_indices().nth(max) { + Some((idx, _)) => &s[..idx], + None => s, + } +} diff --git a/crates/kernel/src/proactive/signal.rs b/crates/kernel/src/proactive/signal.rs index da22ff6d3..65e0a3eb1 100644 --- a/crates/kernel/src/proactive/signal.rs +++ b/crates/kernel/src/proactive/signal.rs @@ -18,6 +18,8 @@ use std::time::Duration; use serde::Serialize; +use crate::session::SessionKey; + /// Proactive signal for Mita orchestration. /// /// Each variant represents a distinct event that may warrant proactive @@ -27,6 +29,8 @@ use serde::Serialize; pub enum ProactiveSignal { /// Session has been idle beyond threshold. SessionIdle { + /// The session that has been idle. + session_key: SessionKey, /// How long the session has been idle. idle_duration: Duration, }, @@ -37,8 +41,10 @@ pub enum ProactiveSignal { }, /// Conversation naturally completed (turn ended without pending work). SessionCompleted { + /// The session that completed. + session_key: SessionKey, /// Brief summary of what the session accomplished. - summary: String, + summary: String, }, /// Daily morning greeting trigger (fires at work hours start). MorningGreeting, @@ -64,12 +70,14 @@ impl ProactiveSignal { /// Returns a cooldown key that distinguishes per-session signals. /// /// For session-scoped signals like `SessionIdle`, the key includes - /// the session identifier so that cooldowns apply per-session rather - /// than globally blocking all sessions of the same kind. - pub fn cooldown_key(&self, session_key: Option<&str>) -> String { - match (self, session_key) { - (Self::SessionIdle { .. }, Some(sk)) => format!("session_idle:{sk}"), - (Self::SessionCompleted { .. }, Some(sk)) => format!("session_completed:{sk}"), + /// the embedded session identifier so that cooldowns apply per-session + /// rather than globally blocking all sessions of the same kind. + pub fn cooldown_key(&self) -> String { + match self { + Self::SessionIdle { session_key, .. } => format!("session_idle:{session_key}"), + Self::SessionCompleted { session_key, .. } => { + format!("session_completed:{session_key}") + } _ => self.kind_name().to_string(), } } From ac42dfb7e628673e39a63bee00fb37239a192138 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:36:40 +0900 Subject: [PATCH 05/12] fix(kernel): move SessionCompleted to idle-based trigger (#786) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SessionCompleted previously fired when a worker/child session finished (has_result_tx branch). Now it fires from emit_idle_signals() when a user conversation is idle for session_completed_secs (default 600s), detected alongside SessionIdle with a two-threshold approach: - idle >= completed_threshold AND < idle_threshold → SessionCompleted - idle >= idle_threshold → SessionIdle (not both) Closes #786 --- crates/kernel/src/kernel.rs | 43 ++++++++------------------- crates/kernel/src/proactive/config.rs | 6 ++++ crates/kernel/src/proactive/filter.rs | 1 + 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index ce59cd721..7d98d9140 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -1845,11 +1845,12 @@ impl Kernel { /// Emit `SessionIdle` signals for sessions that have been idle beyond /// the configured threshold. fn emit_idle_signals(&self) { - let idle_threshold_secs = match self.config.proactive.as_ref() { - Some(c) => c.idle_threshold_secs, + let proactive_config = match self.config.proactive.as_ref() { + Some(c) => c, None => return, // Proactive disabled. }; - let idle_threshold = Duration::from_secs(idle_threshold_secs); + let completed_threshold = Duration::from_secs(proactive_config.session_completed_secs); + let idle_threshold = Duration::from_secs(proactive_config.idle_threshold_secs); let now = jiff::Timestamp::now(); let mita_key = crate::session::SessionKey::deterministic("mita"); @@ -1873,12 +1874,20 @@ impl Kernel { .map(|secs| Duration::from_secs_f64(secs)) .unwrap_or(Duration::ZERO); + // Emit at most one signal per session: SessionCompleted for the + // shorter window, SessionIdle for the longer one (not both). if idle_duration >= idle_threshold { let signal = crate::proactive::ProactiveSignal::SessionIdle { session_key: stats.session_key.clone(), idle_duration, }; self.try_emit_proactive_signal(signal); + } else if idle_duration >= completed_threshold { + let signal = crate::proactive::ProactiveSignal::SessionCompleted { + session_key: stats.session_key.clone(), + summary: String::new(), + }; + self.try_emit_proactive_signal(signal); } } } @@ -2861,34 +2870,6 @@ impl Kernel { .unwrap_or(false); if has_result_tx { - // Worker/child session completed — emit SessionCompleted signal, - // but only for non-Mita children to avoid feedback loops where - // Mita dispatches agent → completes → signal → Mita again. - if !turn_failed { - let mita_key = crate::session::SessionKey::deterministic("mita"); - let is_mita_child = self - .process_table - .with(&session_key, |p| p.parent_id.as_ref() == Some(&mita_key)) - .unwrap_or(false); - - if !is_mita_child { - let summary = self - .process_table - .with(&session_key, |p| { - p.result - .as_ref() - .map(|r| r.output.chars().take(100).collect::()) - .unwrap_or_default() - }) - .unwrap_or_default(); - self.try_emit_proactive_signal( - crate::proactive::ProactiveSignal::SessionCompleted { - session_key: session_key.clone(), - summary, - }, - ); - } - } self.cleanup_process(session_key).await; return; } diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs index 53bc6c580..a5f5f7a5e 100644 --- a/crates/kernel/src/proactive/config.rs +++ b/crates/kernel/src/proactive/config.rs @@ -60,6 +60,11 @@ pub struct ProactiveConfig { /// Idle threshold in seconds — sessions idle beyond this duration /// trigger a `SessionIdle` signal. pub idle_threshold_secs: u64, + + /// Session completed threshold in seconds — sessions idle beyond this + /// duration trigger a `SessionCompleted` signal (conversation ended). + /// Typically shorter than `idle_threshold_secs` (e.g. 600 = 10 minutes). + pub session_completed_secs: u64, } impl ProactiveConfig { @@ -193,6 +198,7 @@ work_hours_start: "09:00" work_hours_end: "18:00" timezone: "Asia/Shanghai" idle_threshold_secs: 1800 +session_completed_secs: 600 "#; let config: ProactiveConfig = serde_yaml::from_str(yaml).expect("should parse"); assert_eq!(config.max_hourly, 5); diff --git a/crates/kernel/src/proactive/filter.rs b/crates/kernel/src/proactive/filter.rs index fbb03efce..c7c570c44 100644 --- a/crates/kernel/src/proactive/filter.rs +++ b/crates/kernel/src/proactive/filter.rs @@ -205,6 +205,7 @@ mod tests { .work_hours_end("18:00".to_string()) .timezone("UTC".to_string()) .idle_threshold_secs(1800) + .session_completed_secs(600) .build() } From ebda5ae8d0b434882c2569b5a34b1eaed03a26b3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:39:52 +0900 Subject: [PATCH 06/12] fix(kernel): populate last_user_message from tape in context pack (#786) Read the last user message from the session tape when building the proactive context pack for SessionIdle and SessionCompleted signals, so Mita has conversation context for its judgment. Closes #786 --- crates/kernel/src/kernel.rs | 61 +++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 7d98d9140..75461fb03 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -1744,22 +1744,26 @@ impl Kernel { crate::proactive::ProactiveSignal::SessionIdle { session_key, idle_duration, - } => self - .process_table - .with(session_key, |p| crate::proactive::SessionContext { - session_name: Some(p.manifest.name.clone()), - session_key: session_key.to_string(), - idle_since: Some(format!("{}m ago", idle_duration.as_secs() / 60)), - last_user_message: None, - }), - crate::proactive::ProactiveSignal::SessionCompleted { session_key, .. } => self - .process_table - .with(session_key, |p| crate::proactive::SessionContext { - session_name: Some(p.manifest.name.clone()), - session_key: session_key.to_string(), - idle_since: None, - last_user_message: None, - }), + } => { + let last_msg = self.last_user_message_from_tape(session_key).await; + self.process_table + .with(session_key, |p| crate::proactive::SessionContext { + session_name: Some(p.manifest.name.clone()), + session_key: session_key.to_string(), + idle_since: Some(format!("{}m ago", idle_duration.as_secs() / 60)), + last_user_message: last_msg, + }) + } + crate::proactive::ProactiveSignal::SessionCompleted { session_key, .. } => { + let last_msg = self.last_user_message_from_tape(session_key).await; + self.process_table + .with(session_key, |p| crate::proactive::SessionContext { + session_name: Some(p.manifest.name.clone()), + session_key: session_key.to_string(), + idle_since: None, + last_user_message: last_msg, + }) + } _ => None, }; @@ -1767,6 +1771,31 @@ impl Kernel { self.deliver_proactive_to_mita(&context).await; } + /// Read the last user message from a session's tape. + async fn last_user_message_from_tape( + &self, + session_key: &crate::session::SessionKey, + ) -> Option { + let tape_name = session_key.to_string(); + let entries = self + .tape_service + .from_last_anchor(&tape_name, Some(&[crate::memory::TapEntryKind::Message])) + .await + .ok()?; + + // Walk backwards to find the last user message. + entries.iter().rev().find_map(|entry| { + let payload = entry.payload.as_object()?; + let role = payload.get("role")?.as_str()?; + if role == "user" { + let content = payload.get("content")?.as_str()?; + Some(content.chars().take(200).collect()) + } else { + None + } + }) + } + /// Deliver a proactive context pack to the Mita session. /// /// Similar to `handle_mita_heartbeat` delivery but with structured From 0cb8ee4db37abd7b2c7e62ca381c49790f32ae9e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:45:19 +0900 Subject: [PATCH 07/12] feat(kernel): add Mita history section to proactive context pack (#786) Add [Mita History] section to both signal and heartbeat context packs so Mita can see its recent actions and avoid redundant behavior. - Add MitaHistory struct to proactive::context - Wire build_mita_history() into handle_proactive_signal and handle_mita_heartbeat - Extract last 5 tool calls from the Mita tape as recent action lines - Add tests for history inclusion, empty-history omission, and heartbeat variant Closes #786 --- crates/kernel/src/kernel.rs | 40 ++++++++++++- crates/kernel/src/proactive/context.rs | 80 ++++++++++++++++++++++++-- crates/kernel/src/proactive/mod.rs | 2 +- 3 files changed, 115 insertions(+), 7 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 75461fb03..10d0aca2e 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -1722,7 +1722,9 @@ impl Kernel { .iter() .filter(|p| matches!(p.state, SessionState::Active | SessionState::Ready)) .count(); - let context = crate::proactive::build_heartbeat_context_pack(active_count); + let mita_history = self.build_mita_history().await; + let context = + crate::proactive::build_heartbeat_context_pack(active_count, mita_history.as_ref()); let msg = InboundMessage::synthetic( context, crate::identity::UserId("system".to_string()), @@ -1767,10 +1769,44 @@ impl Kernel { _ => None, }; - let context = crate::proactive::build_context_pack(&signal, session_ctx.as_ref()); + let mita_history = self.build_mita_history().await; + let context = crate::proactive::build_context_pack( + &signal, + session_ctx.as_ref(), + mita_history.as_ref(), + ); self.deliver_proactive_to_mita(&context).await; } + /// Extract recent Mita actions from the Mita tape. + async fn build_mita_history(&self) -> Option { + let mita_tape = crate::session::SessionKey::deterministic("mita").to_string(); + let entries = self + .tape_service + .from_last_anchor(&mita_tape, Some(&[crate::memory::TapEntryKind::ToolCall])) + .await + .ok()?; + + // Take last 5 tool calls as recent actions. + let recent_actions: Vec = entries + .iter() + .rev() + .take(5) + .filter_map(|entry| { + let payload = entry.payload.as_object()?; + let tool_name = payload.get("name")?.as_str()?; + let timestamp = entry.timestamp; + Some(format!("{}: called {}", timestamp, tool_name)) + }) + .collect(); + + if recent_actions.is_empty() { + None + } else { + Some(crate::proactive::MitaHistory { recent_actions }) + } + } + /// Read the last user message from a session's tape. async fn last_user_message_from_tape( &self, diff --git a/crates/kernel/src/proactive/context.rs b/crates/kernel/src/proactive/context.rs index 2194776a8..e6323e889 100644 --- a/crates/kernel/src/proactive/context.rs +++ b/crates/kernel/src/proactive/context.rs @@ -27,6 +27,13 @@ const AVAILABLE_ACTIONS: &str = "[Available Actions]\n- dispatch_rara: send a me device\n- (no action): decide this event doesn't need \ intervention"; +/// Recent Mita action history for context pack. +#[derive(Debug, Clone)] +pub struct MitaHistory { + /// Recent Mita actions as human-readable lines. + pub recent_actions: Vec, +} + /// Optional session context attached to a proactive signal. #[derive(Debug, Clone)] pub struct SessionContext { @@ -47,6 +54,7 @@ pub struct SessionContext { pub fn build_context_pack( signal: &ProactiveSignal, session_context: Option<&SessionContext>, + mita_history: Option<&MitaHistory>, ) -> String { let now = Timestamp::now(); let mut sections = Vec::new(); @@ -103,6 +111,16 @@ pub fn build_context_pack( sections.push(format!("[Context]\n{}", context_lines.join("\n"))); } + // [Mita History] section + if let Some(history) = mita_history { + if !history.recent_actions.is_empty() { + sections.push(format!( + "[Mita History]\n{}", + history.recent_actions.join("\n") + )); + } + } + // [Available Actions] section sections.push(AVAILABLE_ACTIONS.to_string()); @@ -113,7 +131,10 @@ pub fn build_context_pack( /// /// Replaces the previous one-line heartbeat message with the same /// structured format used by proactive signals. -pub fn build_heartbeat_context_pack(active_session_count: usize) -> String { +pub fn build_heartbeat_context_pack( + active_session_count: usize, + mita_history: Option<&MitaHistory>, +) -> String { let now = Timestamp::now(); let mut sections = Vec::new(); @@ -128,6 +149,16 @@ pub fn build_heartbeat_context_pack(active_session_count: usize) -> String { active_session_count, )); + // [Mita History] section + if let Some(history) = mita_history { + if !history.recent_actions.is_empty() { + sections.push(format!( + "[Mita History]\n{}", + history.recent_actions.join("\n") + )); + } + } + sections.push(AVAILABLE_ACTIONS.to_string()); sections.join("\n\n") @@ -153,7 +184,7 @@ mod tests { idle_since: Some("2h ago".to_string()), last_user_message: Some("check that PR".to_string()), }; - let pack = build_context_pack(&signal, Some(&ctx)); + let pack = build_context_pack(&signal, Some(&ctx), None); assert!(pack.contains("kind: session_idle")); assert!(pack.contains("idle_duration: 120m")); assert!(pack.contains("\"PR review\"")); @@ -163,7 +194,7 @@ mod tests { #[test] fn context_pack_morning_greeting() { let signal = ProactiveSignal::MorningGreeting; - let pack = build_context_pack(&signal, None); + let pack = build_context_pack(&signal, None, None); assert!(pack.contains("kind: morning_greeting")); assert!(pack.contains("[Available Actions]")); // No [Context] section when no session context. @@ -172,8 +203,49 @@ mod tests { #[test] fn heartbeat_context_pack() { - let pack = build_heartbeat_context_pack(3); + let pack = build_heartbeat_context_pack(3, None); assert!(pack.contains("kind: heartbeat_patrol")); assert!(pack.contains("active_sessions: 3")); } + + #[test] + fn context_pack_with_mita_history() { + let signal = ProactiveSignal::MorningGreeting; + let history = MitaHistory { + recent_actions: vec![ + "2026-03-21T08:00:00Z: called dispatch_rara".to_string(), + "2026-03-21T07:30:00Z: called notify".to_string(), + ], + }; + let pack = build_context_pack(&signal, None, Some(&history)); + assert!(pack.contains("[Mita History]")); + assert!(pack.contains("called dispatch_rara")); + assert!(pack.contains("called notify")); + // History should appear before Available Actions. + let history_pos = pack.find("[Mita History]").expect("history section exists"); + let actions_pos = pack + .find("[Available Actions]") + .expect("actions section exists"); + assert!(history_pos < actions_pos); + } + + #[test] + fn context_pack_empty_history_omitted() { + let signal = ProactiveSignal::MorningGreeting; + let history = MitaHistory { + recent_actions: vec![], + }; + let pack = build_context_pack(&signal, None, Some(&history)); + assert!(!pack.contains("[Mita History]")); + } + + #[test] + fn heartbeat_context_pack_with_history() { + let history = MitaHistory { + recent_actions: vec!["2026-03-21T09:00:00Z: called notify".to_string()], + }; + let pack = build_heartbeat_context_pack(2, Some(&history)); + assert!(pack.contains("[Mita History]")); + assert!(pack.contains("called notify")); + } } diff --git a/crates/kernel/src/proactive/mod.rs b/crates/kernel/src/proactive/mod.rs index 28bf07bab..970b4bdbe 100644 --- a/crates/kernel/src/proactive/mod.rs +++ b/crates/kernel/src/proactive/mod.rs @@ -28,7 +28,7 @@ mod judgment; mod signal; pub use config::ProactiveConfig; -pub use context::{SessionContext, build_context_pack, build_heartbeat_context_pack}; +pub use context::{MitaHistory, SessionContext, build_context_pack, build_heartbeat_context_pack}; pub use filter::ProactiveFilter; pub use judgment::{ProactiveJudgment, should_reply}; pub use signal::ProactiveSignal; From 1a0111ec2cc224f71aa5a010c50e82a350fd3729 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:51:05 +0900 Subject: [PATCH 08/12] feat(kernel): add lightweight LLM judgment layer for proactive signals (#786) Add a signal_judgment module that pre-filters proactive signals with a cheap LLM call before routing them through a full Mita agent turn. This prevents noise from low-value signals (e.g. idle sessions with no actionable context). - New SignalJudgment enum (ShouldAct/ShouldDrop) with YES/NO parsing - Optional judgment_model field in ProactiveConfig (backward compatible) - Wired into handle_proactive_signal after context pack is built - Defaults to ShouldDrop on LLM errors (fail-safe for silence) - 6 unit tests for parse function Closes #786 --- crates/kernel/src/kernel.rs | 45 +++- crates/kernel/src/proactive/config.rs | 5 + crates/kernel/src/proactive/mod.rs | 3 + .../kernel/src/proactive/signal_judgment.rs | 217 ++++++++++++++++++ 4 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 crates/kernel/src/proactive/signal_judgment.rs diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 10d0aca2e..64fe43253 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -45,7 +45,7 @@ use futures::FutureExt; use jiff::Timestamp; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, error, info, info_span, warn}; +use tracing::{Instrument, debug, error, info, info_span, warn}; use crate::{ KernelError, @@ -1775,6 +1775,49 @@ impl Kernel { session_ctx.as_ref(), mita_history.as_ref(), ); + + // Lightweight LLM judgment: is this signal worth a full Mita turn? + if let Some(model) = self + .config + .proactive + .as_ref() + .and_then(|c| c.judgment_model.as_deref()) + { + match self + .syscall + .driver_registry() + .resolve("__proactive_judgment__", None, None) + { + Ok((driver, resolved_model)) => { + let model_name = if model.is_empty() { + &resolved_model + } else { + model + }; + match crate::proactive::should_act(&driver, model_name, &context).await { + crate::proactive::SignalJudgment::ShouldDrop { reason } => { + info!( + kind = signal.kind_name(), + reason = %reason, + "signal dropped by LLM judgment" + ); + return; + } + crate::proactive::SignalJudgment::ShouldAct { reason } => { + debug!( + kind = signal.kind_name(), + reason = %reason, + "signal approved by LLM judgment" + ); + } + } + } + Err(e) => { + debug!(error = %e, "signal judgment: driver not available, passing through"); + } + } + } + self.deliver_proactive_to_mita(&context).await; } diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs index a5f5f7a5e..86a0ab81d 100644 --- a/crates/kernel/src/proactive/config.rs +++ b/crates/kernel/src/proactive/config.rs @@ -65,6 +65,11 @@ pub struct ProactiveConfig { /// duration trigger a `SessionCompleted` signal (conversation ended). /// Typically shorter than `idle_threshold_secs` (e.g. 600 = 10 minutes). pub session_completed_secs: u64, + + /// Optional model name for lightweight signal judgment LLM calls. + /// When absent, signals pass directly to Mita without LLM pre-filtering. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub judgment_model: Option, } impl ProactiveConfig { diff --git a/crates/kernel/src/proactive/mod.rs b/crates/kernel/src/proactive/mod.rs index 970b4bdbe..b68c8657b 100644 --- a/crates/kernel/src/proactive/mod.rs +++ b/crates/kernel/src/proactive/mod.rs @@ -20,18 +20,21 @@ //! - **Filter** — pure rule-based gate (quiet hours, cooldowns, rate limits) //! - **Context** — structured context pack builder for Mita //! - **Judgment** — lightweight LLM judgment for group-chat replies +//! - **Signal judgment** — lightweight LLM pre-filter for proactive signals mod config; mod context; mod filter; mod judgment; mod signal; +mod signal_judgment; pub use config::ProactiveConfig; pub use context::{MitaHistory, SessionContext, build_context_pack, build_heartbeat_context_pack}; pub use filter::ProactiveFilter; pub use judgment::{ProactiveJudgment, should_reply}; pub use signal::ProactiveSignal; +pub use signal_judgment::{SignalJudgment, should_act}; /// Truncate a string to at most `max` characters. pub(crate) fn truncate(s: &str, max: usize) -> &str { diff --git a/crates/kernel/src/proactive/signal_judgment.rs b/crates/kernel/src/proactive/signal_judgment.rs new file mode 100644 index 000000000..a2739c5fb --- /dev/null +++ b/crates/kernel/src/proactive/signal_judgment.rs @@ -0,0 +1,217 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Lightweight LLM pre-judgment for proactive signals. +//! +//! Before routing a proactive signal through a full Mita agent turn, a +//! cheap LLM call decides whether the signal is worth acting on. This +//! avoids waking Mita for signals that would just produce noise. +//! +//! # Flow +//! +//! ```text +//! proactive signal +//! -> rule filter (quiet hours, cooldowns, rate limit) +//! -> context pack built +//! -> lightweight LLM call: "should Mita act?" +//! -> no -> silently drop +//! -> yes -> deliver to Mita (full agent turn) +//! ``` + +use tracing::{debug, info, warn}; + +use crate::llm::{ + self, + driver::LlmDriverRef, + types::{CompletionRequest, ToolChoice}, +}; + +/// System prompt for the lightweight signal judgment. +const SIGNAL_JUDGMENT_PROMPT: &str = r#"You are a judgment module for Mita, a background orchestration agent. + +Your task: Given a proactive event and its context, decide whether Mita should take action. + +Mita should act when: +- The user likely needs a reminder or follow-up +- Important information should be communicated proactively +- The event indicates something the user would want to know about + +Mita should NOT act when: +- The idle session has no actionable context (user just said "ok" or "thanks") +- The event is routine and doesn't need user attention +- Acting would be annoying rather than helpful + +Respond with EXACTLY one line: +- "YES: " if Mita should act on this event +- "NO: " if this event should be silently dropped + +Be conservative — when in doubt, drop. The user should feel helped, not pestered."#; + +/// Result of the lightweight signal judgment. +#[derive(Debug, Clone)] +pub enum SignalJudgment { + /// Mita should act on this signal. + ShouldAct { reason: String }, + /// The signal should be silently dropped. + ShouldDrop { reason: String }, +} + +/// Run a lightweight LLM judgment to decide whether a proactive signal +/// is worth a full Mita agent turn. +/// +/// The `context_pack` should be the already-built context string that +/// would be delivered to Mita. +/// +/// Returns [`SignalJudgment::ShouldDrop`] on any error — better to +/// silently drop than to spam the user. +pub async fn should_act(driver: &LlmDriverRef, model: &str, context_pack: &str) -> SignalJudgment { + let messages = vec![ + llm::Message::system(SIGNAL_JUDGMENT_PROMPT), + llm::Message::user(format!( + "Proactive event context:\n{context_pack}\n\nShould Mita act on this event?" + )), + ]; + + let request = CompletionRequest { + model: model.to_string(), + messages, + tools: Vec::new(), + temperature: Some(0.0), + max_tokens: Some(100), + thinking: None, + tool_choice: ToolChoice::None, + parallel_tool_calls: false, + frequency_penalty: None, + }; + + let response = match driver.complete(request).await { + Ok(resp) => resp, + Err(e) => { + warn!(error = %e, "signal judgment: LLM call failed, dropping signal"); + return SignalJudgment::ShouldDrop { + reason: "LLM call failed".into(), + }; + } + }; + + let reply_text = response.content.unwrap_or_default(); + parse_signal_judgment(&reply_text) +} + +/// Parse the LLM response into a [`SignalJudgment`]. +/// +/// Expected format: `"YES: reason"` or `"NO: reason"`. +/// Defaults to drop on unparseable responses. +fn parse_signal_judgment(text: &str) -> SignalJudgment { + let trimmed = text.trim(); + let upper = trimmed.to_uppercase(); + + if upper.starts_with("YES") { + let reason = trimmed + .get(3..) + .map(|s| s.trim_start_matches(':').trim()) + .unwrap_or("") + .to_string(); + info!(reason = %reason, "signal judgment = ACT"); + SignalJudgment::ShouldAct { + reason: if reason.is_empty() { + "LLM decided to act".into() + } else { + reason + }, + } + } else { + let reason = if upper.starts_with("NO") { + trimmed + .get(2..) + .map(|s| s.trim_start_matches(':').trim()) + .unwrap_or("") + .to_string() + } else { + debug!(raw_response = %trimmed, "signal judgment: unparseable response, defaulting to drop"); + format!("unparseable response: {}", truncate(trimmed, 80)) + }; + info!(reason = %reason, "signal judgment = DROP"); + SignalJudgment::ShouldDrop { + reason: if reason.is_empty() { + "LLM decided to drop".into() + } else { + reason + }, + } + } +} + +use super::truncate; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_yes_with_reason() { + match parse_signal_judgment("YES: user has a pending task reminder") { + SignalJudgment::ShouldAct { reason } => { + assert!(reason.contains("pending task")); + } + other => panic!("expected ShouldAct, got {other:?}"), + } + } + + #[test] + fn parse_no_with_reason() { + match parse_signal_judgment("NO: user just said thanks, nothing actionable") { + SignalJudgment::ShouldDrop { reason } => { + assert!(reason.contains("thanks")); + } + other => panic!("expected ShouldDrop, got {other:?}"), + } + } + + #[test] + fn parse_yes_lowercase() { + match parse_signal_judgment("yes: important deadline approaching") { + SignalJudgment::ShouldAct { reason } => { + assert!(reason.contains("deadline")); + } + other => panic!("expected ShouldAct, got {other:?}"), + } + } + + #[test] + fn parse_no_bare() { + match parse_signal_judgment("NO") { + SignalJudgment::ShouldDrop { reason } => { + assert_eq!(reason, "LLM decided to drop"); + } + other => panic!("expected ShouldDrop, got {other:?}"), + } + } + + #[test] + fn parse_garbage_defaults_to_drop() { + match parse_signal_judgment("I'm not sure what to do here") { + SignalJudgment::ShouldDrop { .. } => {} + other => panic!("expected ShouldDrop, got {other:?}"), + } + } + + #[test] + fn parse_empty_defaults_to_drop() { + match parse_signal_judgment("") { + SignalJudgment::ShouldDrop { .. } => {} + other => panic!("expected ShouldDrop, got {other:?}"), + } + } +} From fd916fbb028daa2bec208545e4c95800d971f513 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:55:27 +0900 Subject: [PATCH 09/12] feat(app): add mita_update_proactive_config tool (#786) Add a Mita-exclusive tool that allows dynamic updates to the proactive filter configuration (quiet hours, cooldowns, rate limits). The tool reads/writes config_dir()/mita/proactive.yaml. Closes #786 --- crates/agents/src/lib.rs | 1 + .../src/tools/mita_update_proactive_config.rs | 177 ++++++++++++++++++ crates/app/src/tools/mod.rs | 4 + 3 files changed, 182 insertions(+) create mode 100644 crates/app/src/tools/mita_update_proactive_config.rs diff --git a/crates/agents/src/lib.rs b/crates/agents/src/lib.rs index 106e53685..4abef7a70 100644 --- a/crates/agents/src/lib.rs +++ b/crates/agents/src/lib.rs @@ -136,6 +136,7 @@ static MITA_MANIFEST: LazyLock = LazyLock::new(|| AgentManifest { "update-soul-state".to_string(), "evolve-soul".to_string(), "update-session-title".to_string(), + "update-proactive-config".to_string(), ], max_children: Some(0), max_context_tokens: None, diff --git a/crates/app/src/tools/mita_update_proactive_config.rs b/crates/app/src/tools/mita_update_proactive_config.rs new file mode 100644 index 000000000..ca1c48685 --- /dev/null +++ b/crates/app/src/tools/mita_update_proactive_config.rs @@ -0,0 +1,177 @@ +// Copyright 2025 Rararulab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Mita-exclusive tool for dynamically updating the proactive filter +//! configuration (quiet hours, cooldowns, rate limits). +//! +//! Reads/writes `config_dir()/mita/proactive.yaml` so changes persist +//! across restarts without touching the main config file. + +use std::{collections::HashMap, path::PathBuf, time::Duration}; + +use async_trait::async_trait; +use rara_kernel::{ + proactive::ProactiveConfig, + tool::{ToolContext, ToolExecute}, +}; +use rara_tool_macro::ToolDef; +use schemars::JsonSchema; +use serde::Deserialize; +use serde_json::{Value, json}; +use tracing::info; + +use super::notify::push_notification; + +/// Valid field names that can be updated. +const UPDATABLE_FIELDS: &[&str] = &["quiet_hours", "max_hourly", "cooldowns"]; + +/// Input parameters for the update-proactive-config tool. +#[derive(Debug, Deserialize, JsonSchema)] +pub struct UpdateProactiveConfigParams { + /// Field to update. One of: "quiet_hours", "max_hourly", "cooldowns". + field: String, + /// New value as JSON (will be parsed according to field type). + value: Value, +} + +/// Mita-exclusive tool: update a specific field in the proactive filter config. +#[derive(ToolDef)] +#[tool( + name = "update-proactive-config", + description = "Update proactive filter configuration. Adjusts quiet hours, cooldowns, or rate \ + limits based on user preferences.", + bypass_interceptor +)] +pub struct UpdateProactiveConfigTool; + +impl UpdateProactiveConfigTool { + /// Create a new instance. + pub fn new() -> Self { Self } +} + +/// Resolve the proactive config file path: `config_dir()/mita/proactive.yaml`. +fn config_path() -> PathBuf { rara_paths::config_dir().join("mita").join("proactive.yaml") } + +/// Load the current proactive config from disk, returning `None` if the file +/// does not exist. +fn load_config() -> anyhow::Result> { + let path = config_path(); + if !path.exists() { + return Ok(None); + } + let contents = std::fs::read_to_string(&path)?; + let config: ProactiveConfig = serde_yaml::from_str(&contents)?; + Ok(Some(config)) +} + +/// Write the proactive config to disk, creating parent directories if needed. +fn save_config(config: &ProactiveConfig) -> anyhow::Result<()> { + let path = config_path(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let yaml = serde_yaml::to_string(config)?; + std::fs::write(&path, yaml)?; + Ok(()) +} + +#[async_trait] +impl ToolExecute for UpdateProactiveConfigTool { + type Output = Value; + type Params = UpdateProactiveConfigParams; + + async fn run( + &self, + params: UpdateProactiveConfigParams, + context: &ToolContext, + ) -> anyhow::Result { + if !UPDATABLE_FIELDS.contains(¶ms.field.as_str()) { + anyhow::bail!( + "invalid field '{}': must be one of {}", + params.field, + UPDATABLE_FIELDS.join(", ") + ); + } + + let mut config = load_config()?.ok_or_else(|| { + anyhow::anyhow!( + "proactive config not found at {}; cannot update a non-existent config", + config_path().display() + ) + })?; + + match params.field.as_str() { + "quiet_hours" => { + // Accept null to disable, or ["HH:MM", "HH:MM"] to set. + let quiet: Option<(String, String)> = serde_json::from_value(params.value.clone()) + .map_err(|e| { + anyhow::anyhow!( + "invalid quiet_hours value: {e}. Expected null or [\"HH:MM\", \ + \"HH:MM\"]" + ) + })?; + config.quiet_hours = quiet; + info!( + quiet_hours = ?config.quiet_hours, + "proactive config: quiet_hours updated" + ); + } + "max_hourly" => { + let max: u32 = serde_json::from_value(params.value.clone()).map_err(|e| { + anyhow::anyhow!("invalid max_hourly value: {e}. Expected a positive integer") + })?; + config.max_hourly = max; + info!(max_hourly = max, "proactive config: max_hourly updated"); + } + "cooldowns" => { + // Accept a map of signal_kind -> seconds. + let raw: HashMap = serde_json::from_value(params.value.clone()) + .map_err(|e| { + anyhow::anyhow!( + "invalid cooldowns value: {e}. Expected an object mapping signal \ + names to seconds" + ) + })?; + // Merge into existing cooldowns rather than replacing. + for (key, secs) in &raw { + config + .cooldowns + .insert(key.clone(), Duration::from_secs(*secs)); + } + info!( + merged_keys = raw.len(), + total = config.cooldowns.len(), + "proactive config: cooldowns updated" + ); + } + _ => unreachable!(), + } + + save_config(&config)?; + + push_notification( + context, + format!( + "\u{2699}\u{fe0f} Proactive config updated: {}", + params.field + ), + ); + + Ok(json!({ + "status": "ok", + "field": params.field, + "message": format!("Proactive config field '{}' updated.", params.field) + })) + } +} diff --git a/crates/app/src/tools/mod.rs b/crates/app/src/tools/mod.rs index 7605ed9c8..09efc5205 100644 --- a/crates/app/src/tools/mod.rs +++ b/crates/app/src/tools/mod.rs @@ -36,6 +36,7 @@ mod mita_distill_user_notes; mod mita_evolve_soul; mod mita_list_sessions; mod mita_read_tape; +mod mita_update_proactive_config; mod mita_update_session_title; mod mita_update_soul_state; mod mita_write_user_note; @@ -69,6 +70,7 @@ use mita_distill_user_notes::DistillUserNotesTool; use mita_evolve_soul::EvolveSoulTool; use mita_list_sessions::ListSessionsTool; use mita_read_tape::ReadTapeTool; +use mita_update_proactive_config::UpdateProactiveConfigTool; use mita_update_session_title::UpdateSessionTitleTool; use mita_update_soul_state::UpdateSoulStateTool; use mita_write_user_note::MitaWriteUserNoteTool; @@ -240,6 +242,8 @@ pub fn register_all(registry: &mut ToolRegistry, deps: ToolDeps) -> ToolRegistra // Mita soul evolution tools Arc::new(UpdateSoulStateTool::new()), Arc::new(EvolveSoulTool::new()), + // Mita proactive config tool + Arc::new(UpdateProactiveConfigTool::new()), // ACP delegation Arc::new(AcpDelegateTool::new(deps.acp_registry.clone())), // ACP management tools From 255065218674e3899cb1273599a160b82f711890 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 01:59:36 +0900 Subject: [PATCH 10/12] refactor(kernel): use dynamic tool list in proactive context pack (#786) Replace hardcoded AVAILABLE_ACTIONS with dynamic tool list from Mita's agent manifest when available, falling back to the static const. Closes #786 --- crates/kernel/src/kernel.rs | 14 +++++- crates/kernel/src/proactive/context.rs | 63 ++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 64fe43253..9cf9269ec 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -1723,8 +1723,14 @@ impl Kernel { .filter(|p| matches!(p.state, SessionState::Active | SessionState::Ready)) .count(); let mita_history = self.build_mita_history().await; - let context = - crate::proactive::build_heartbeat_context_pack(active_count, mita_history.as_ref()); + let mita_tools: Option> = + self.agent_registry.get("mita").map(|m| m.tools.clone()); + let tools_ref = mita_tools.as_deref(); + let context = crate::proactive::build_heartbeat_context_pack( + active_count, + mita_history.as_ref(), + tools_ref, + ); let msg = InboundMessage::synthetic( context, crate::identity::UserId("system".to_string()), @@ -1770,10 +1776,14 @@ impl Kernel { }; let mita_history = self.build_mita_history().await; + let mita_tools: Option> = + self.agent_registry.get("mita").map(|m| m.tools.clone()); + let tools_ref = mita_tools.as_deref(); let context = crate::proactive::build_context_pack( &signal, session_ctx.as_ref(), mita_history.as_ref(), + tools_ref, ); // Lightweight LLM judgment: is this signal worth a full Mita turn? diff --git a/crates/kernel/src/proactive/context.rs b/crates/kernel/src/proactive/context.rs index e6323e889..0c633a336 100644 --- a/crates/kernel/src/proactive/context.rs +++ b/crates/kernel/src/proactive/context.rs @@ -55,6 +55,7 @@ pub fn build_context_pack( signal: &ProactiveSignal, session_context: Option<&SessionContext>, mita_history: Option<&MitaHistory>, + available_tools: Option<&[String]>, ) -> String { let now = Timestamp::now(); let mut sections = Vec::new(); @@ -121,8 +122,16 @@ pub fn build_context_pack( } } - // [Available Actions] section - sections.push(AVAILABLE_ACTIONS.to_string()); + // [Available Actions] section — prefer dynamic tool list from agent manifest. + if let Some(tools) = available_tools { + let tool_lines: Vec = tools.iter().map(|t| format!("- {t}")).collect(); + sections.push(format!( + "[Available Actions]\n{}\n- (no action): decide this event doesn't need intervention", + tool_lines.join("\n") + )); + } else { + sections.push(AVAILABLE_ACTIONS.to_string()); + } sections.join("\n\n") } @@ -134,6 +143,7 @@ pub fn build_context_pack( pub fn build_heartbeat_context_pack( active_session_count: usize, mita_history: Option<&MitaHistory>, + available_tools: Option<&[String]>, ) -> String { let now = Timestamp::now(); let mut sections = Vec::new(); @@ -159,7 +169,15 @@ pub fn build_heartbeat_context_pack( } } - sections.push(AVAILABLE_ACTIONS.to_string()); + if let Some(tools) = available_tools { + let tool_lines: Vec = tools.iter().map(|t| format!("- {t}")).collect(); + sections.push(format!( + "[Available Actions]\n{}\n- (no action): decide this event doesn't need intervention", + tool_lines.join("\n") + )); + } else { + sections.push(AVAILABLE_ACTIONS.to_string()); + } sections.join("\n\n") } @@ -184,7 +202,7 @@ mod tests { idle_since: Some("2h ago".to_string()), last_user_message: Some("check that PR".to_string()), }; - let pack = build_context_pack(&signal, Some(&ctx), None); + let pack = build_context_pack(&signal, Some(&ctx), None, None); assert!(pack.contains("kind: session_idle")); assert!(pack.contains("idle_duration: 120m")); assert!(pack.contains("\"PR review\"")); @@ -194,7 +212,7 @@ mod tests { #[test] fn context_pack_morning_greeting() { let signal = ProactiveSignal::MorningGreeting; - let pack = build_context_pack(&signal, None, None); + let pack = build_context_pack(&signal, None, None, None); assert!(pack.contains("kind: morning_greeting")); assert!(pack.contains("[Available Actions]")); // No [Context] section when no session context. @@ -203,7 +221,7 @@ mod tests { #[test] fn heartbeat_context_pack() { - let pack = build_heartbeat_context_pack(3, None); + let pack = build_heartbeat_context_pack(3, None, None); assert!(pack.contains("kind: heartbeat_patrol")); assert!(pack.contains("active_sessions: 3")); } @@ -217,7 +235,7 @@ mod tests { "2026-03-21T07:30:00Z: called notify".to_string(), ], }; - let pack = build_context_pack(&signal, None, Some(&history)); + let pack = build_context_pack(&signal, None, Some(&history), None); assert!(pack.contains("[Mita History]")); assert!(pack.contains("called dispatch_rara")); assert!(pack.contains("called notify")); @@ -235,16 +253,43 @@ mod tests { let history = MitaHistory { recent_actions: vec![], }; - let pack = build_context_pack(&signal, None, Some(&history)); + let pack = build_context_pack(&signal, None, Some(&history), None); assert!(!pack.contains("[Mita History]")); } + #[test] + fn context_pack_with_dynamic_tools() { + let signal = ProactiveSignal::MorningGreeting; + let tools = vec![ + "dispatch_rara".to_string(), + "notify".to_string(), + "schedule_reminder".to_string(), + ]; + let pack = build_context_pack(&signal, None, None, Some(&tools)); + assert!(pack.contains("[Available Actions]")); + assert!(pack.contains("- dispatch_rara")); + assert!(pack.contains("- notify")); + assert!(pack.contains("- schedule_reminder")); + assert!(pack.contains("- (no action)")); + // Should NOT contain the hardcoded fallback wording. + assert!(!pack.contains("push notification to user")); + } + + #[test] + fn heartbeat_context_pack_with_dynamic_tools() { + let tools = vec!["dispatch_rara".to_string(), "notify".to_string()]; + let pack = build_heartbeat_context_pack(1, None, Some(&tools)); + assert!(pack.contains("- dispatch_rara")); + assert!(pack.contains("- notify")); + assert!(pack.contains("- (no action)")); + } + #[test] fn heartbeat_context_pack_with_history() { let history = MitaHistory { recent_actions: vec!["2026-03-21T09:00:00Z: called notify".to_string()], }; - let pack = build_heartbeat_context_pack(2, Some(&history)); + let pack = build_heartbeat_context_pack(2, Some(&history), None); assert!(pack.contains("[Mita History]")); assert!(pack.contains("called notify")); } From f04ea91c0809927f8e86fa608682379046fbdde8 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 21 Mar 2026 02:02:51 +0900 Subject: [PATCH 11/12] docs(kernel): update AGENT.md and design doc for proactive v2 fixes (#786) - Add SAFETY comment to proactive_filter Mutex field - Update AGENT.md: signal judgment layer, signal_judgment.rs, idle-based SessionCompleted, new invariants for judgment bypass and ProactiveConfig - Add addendum to design doc covering LLM judgment, Mita History, idle-based SessionCompleted, mita_update_proactive_config tool, dynamic actions Closes #786 --- crates/kernel/AGENT.md | 9 +- crates/kernel/src/kernel.rs | 3 + docs/plans/2026-03-20-proactive-v2-design.md | 231 +++++++++++++++++++ 3 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 docs/plans/2026-03-20-proactive-v2-design.md diff --git a/crates/kernel/AGENT.md b/crates/kernel/AGENT.md index 03c835bea..37fa00ea1 100644 --- a/crates/kernel/AGENT.md +++ b/crates/kernel/AGENT.md @@ -244,7 +244,7 @@ Detects when the agent is stuck calling the same tool repeatedly without progres ### What -Event-driven proactive signals that supplement the polling heartbeat. Internal kernel events (idle sessions, task failures, time triggers) emit `ProactiveSignal` variants, which pass through a pure rule-based `ProactiveFilter` before being delivered to Mita as structured context packs. +Event-driven proactive signals that supplement the polling heartbeat. Internal kernel events (idle sessions, task failures, time triggers) emit `ProactiveSignal` variants, which pass through a pure rule-based `ProactiveFilter`, then an optional lightweight LLM judgment layer (`signal_judgment.rs`), before being delivered to Mita as structured context packs. ### Key Files @@ -255,6 +255,7 @@ Event-driven proactive signals that supplement the polling heartbeat. Internal k | `proactive/filter.rs` | `ProactiveFilter` — quiet hours, cooldowns, rate limiting | | `proactive/context.rs` | `build_context_pack()` / `build_heartbeat_context_pack()` | | `proactive/judgment.rs` | Group-chat LLM judgment (pre-existing, unchanged) | +| `proactive/signal_judgment.rs` | Lightweight LLM pre-filter for proactive signals | | `kernel.rs` | Signal emit points + `handle_proactive_signal` + scheduler time events | ### Signal Flow @@ -265,9 +266,12 @@ Kernel event (IdleCheck / TaskFailed / Scheduler) → ProactiveFilter::should_pass() (quiet hours → cooldown → rate limit) → KernelEvent::ProactiveSignal pushed to event queue → handle_proactive_signal() builds context pack + → signal_judgment (optional LLM pre-filter, lightweight model) → deliver_proactive_to_mita() sends to Mita session ``` +`SessionCompleted` is idle-based: fires after `session_completed_secs` (~10min) of inactivity, not on turn completion. + ### Critical Invariants - `ProactiveFilter` is behind `std::sync::Mutex>` in the Kernel — `None` when proactive config is absent, disabling all signals @@ -278,5 +282,6 @@ Kernel event (IdleCheck / TaskFailed / Scheduler) ### What NOT To Do - Do NOT push `KernelEventEnvelope::proactive_signal()` without going through `try_emit_proactive_signal()` — it enforces filter checks and records fire timestamps -- Do NOT derive `Default` on `ProactiveConfig` — the feature is opt-in via YAML config +- Do NOT derive `Default` on `ProactiveConfig` — absence means feature off, and `judgment_model` absence means no LLM pre-filter - Do NOT add new signal kinds without adding a `kind_name()` match arm — cooldown keys depend on it +- Do NOT bypass signal judgment by calling `deliver_proactive_to_mita` directly — always go through `handle_proactive_signal` which enforces the judgment layer diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index 9cf9269ec..e9256a1c4 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -199,6 +199,9 @@ pub struct Kernel { /// absent, which disables all proactive signals. /// Uses `std::sync::Mutex` (not `tokio::sync::Mutex`) because the lock /// is held only for brief, non-async filter checks — no `.await` inside. + /// + /// SAFETY: MUST NOT hold this lock across `.await` points. The only call + /// site is `try_emit_proactive_signal` which is a synchronous function. proactive_filter: std::sync::Mutex>, } diff --git a/docs/plans/2026-03-20-proactive-v2-design.md b/docs/plans/2026-03-20-proactive-v2-design.md new file mode 100644 index 000000000..14288ac3c --- /dev/null +++ b/docs/plans/2026-03-20-proactive-v2-design.md @@ -0,0 +1,231 @@ +# Proactive V2 — Event-Driven Proactive Architecture + +## Problem + +Rara's current proactive mechanism is insufficient: + +1. **Polling-only** — Mita heartbeat fires every 30 minutes, no real-time event response +2. **No signal sources** — Mita can only see process table and tape, blind to external changes +3. **Vague strategy** — Heartbeat message is "Analyze active sessions...", LLM freewheels every time +4. **No memory** — Mita doesn't remember past decisions, starts from zero each heartbeat + +## Design + +### Architecture + +``` +Internal/External Signals + │ + ▼ +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ Event Source │────▶│ Event Filter │────▶│ Mita │ +│ (signals) │ │ (rule-based) │ │ (orchestrate)│ +└─────────────┘ └──────────────┘ └──────┬──────┘ + │ dispatch_rara / notify + ▼ + ┌─────────────┐ + │ Rara │ + │ (user-facing) │ + └──────┬──────┘ + │ user reactions in tape + ▼ + ┌─────────────┐ + │ Tape/Soul │ + │ (feedback) │ + └─────────────┘ +``` + +Three layers: + +- **Event Source** — Produces structured events via existing `KernelEvent` pipeline +- **Event Filter** — Pure rules, no LLM: quiet hours, cooldowns, rate limiting +- **Mita Orchestration** — Receives filtered events + context pack, decides action via existing tools + +Heartbeat demoted from "sole driver" to "fallback patrol" — catches anything events missed. + +### Role Separation + +- **Rara** = front-stage, all user-facing communication (including proactive outreach) +- **Mita** = back-stage orchestrator, decides when/what Rara should do, never talks to user directly + +Feedback loop: Rara delivers proactive message → user reacts → reaction captured in tape → Mita reads tape on next cycle to calibrate. + +## Event Source Layer + +### KernelEvent Extension + +Proactive signals are new `KernelEvent` variants, not a separate event system: + +```rust +pub enum KernelEvent { + // ... existing variants ... + + /// Proactive signal for Mita orchestration. + ProactiveSignal(ProactiveSignal), +} + +pub enum ProactiveSignal { + // --- Internal session events (Phase 1) --- + /// Session has been idle beyond threshold. + SessionIdle { idle_duration: Duration }, + /// Scheduled task agent failed. + TaskFailed { error: String }, + /// Conversation naturally completed. + SessionCompleted { summary: String }, + + // --- Time events (Phase 1) --- + /// Daily morning greeting trigger. + MorningGreeting, + /// End-of-day summary trigger. + DailySummary, + + // --- GitHub events (Phase 2) --- + /// PR review requested. + PrReviewRequested { repo: String, pr_number: u64 }, + /// CI status changed. + CiStatusChanged { repo: String, status: String }, + /// Issue assigned. + IssueAssigned { repo: String, issue_number: u64 }, +} +``` + +### Signal Emit Points (Phase 1) + +**Session events:** + +| Signal | Emit location | Trigger condition | +|--------|--------------|-------------------| +| `SessionIdle` | `IdleCheck` handler | Session idle beyond threshold; emit alongside existing Ready→Suspended transition | +| `TaskFailed` | `handle_scheduled_task` error path | ScheduledJobAgent execution failure | +| `SessionCompleted` | `TurnCompleted` handler | Conversation naturally ends | + +**Time events:** + +| Signal | Emit location | Trigger condition | +|--------|--------------|-------------------| +| `MorningGreeting` | Processor 0 scheduler | Work hours start (from proactive.yaml) | +| `DailySummary` | Processor 0 scheduler | Work hours end (from proactive.yaml) | + +Time events: Processor 0 already computes `min(next_mita_heartbeat, next_job_deadline)`. Add `next_proactive_time_event` to the calculation. No new threads or processors. + +## Event Filter Layer + +Pure rule-based, zero LLM cost. Runs in `handle_event` after matching `ProactiveSignal`, before forwarding to Mita. + +```rust +pub struct ProactiveFilter { + /// Quiet hours — suppress all proactive signals. + quiet_hours: Option<(NaiveTime, NaiveTime)>, + /// Per-signal-kind minimum interval (dedup). + cooldowns: HashMap, + /// Last fire time per signal kind. + last_fired: HashMap, + /// Global rate limit. + max_hourly: u32, + /// Hourly counter. + hourly_count: u32, + hourly_window_start: Timestamp, +} + +impl ProactiveFilter { + /// Returns true = pass through, false = silently drop. + pub fn should_pass(&mut self, signal: &ProactiveSignal) -> bool { + // 1. Quiet hours check + // 2. Cooldown dedup (e.g., SessionIdle same session max once per hour) + // 3. Global hourly rate limit + } +} +``` + +### Configuration + +Stored at `config_dir()/mita/proactive.yaml` — this is Mita's own runtime config, not app config: + +```yaml +# ~/.config/rara/mita/proactive.yaml +quiet_hours: ["23:00", "08:00"] +cooldowns: + session_idle: 1h + daily_summary: 20h + morning_greeting: 20h + task_failed: 10m +max_hourly: 5 +work_hours: + start: "09:00" + end: "18:00" + timezone: "Asia/Shanghai" +``` + +Mita can hot-update this config via a new `mita_update_proactive_config` tool — e.g., user says "weekends don't bother me", Mita adjusts quiet_hours. + +## Mita Context Pack + +When a filtered event reaches Mita, it's delivered as a structured message (replacing the current one-line "Analyze active sessions..."): + +``` +[Proactive Event] +kind: session_idle +timestamp: 2026-03-20T14:30:00Z + +[Context] +session: "PR review discussion" (session-abc123) +idle_since: 2h ago +last_user_message: "我看看那个 PR" +user_status: online (last seen 10m ago) + +[Mita History] +last_action: 4h ago — dispatched daily summary → user replied positively +last_action_on_this_session: none + +[Available Actions] +- dispatch_rara: send a message to user through this session +- notify: push notification to user's device +- (no action): decide this event doesn't need intervention +``` + +Heartbeat patrol also uses this format with `kind: heartbeat_patrol`, context containing process table summary and changes since last patrol. + +## Change Scope + +### New files + +- `kernel/src/proactive/signal.rs` — `ProactiveSignal` enum + context pack builder +- `kernel/src/proactive/filter.rs` — `ProactiveFilter` rule engine +- `kernel/src/proactive/config.rs` — Load from `config_dir()/mita/proactive.yaml` +- `kernel/src/proactive/mod.rs` — Re-exports (existing `proactive.rs` moves into directory) +- `app/src/tools/mita_update_proactive_config.rs` — Mita tool to adjust filter config + +### Modified files + +- `kernel/src/event.rs` — Add `ProactiveSignal` variant to `KernelEvent` +- `kernel/src/kernel.rs` scheduler — Add time event trigger points (morning/daily) +- `kernel/src/kernel.rs` `handle_event` — Match `ProactiveSignal`, run filter → build context → deliver to Mita +- `kernel/src/kernel.rs` `IdleCheck` handler — Emit `SessionIdle` signal +- `kernel/src/kernel.rs` `handle_mita_heartbeat` — Reformat as structured context pack + +### Unchanged + +- Mita's existing tool interface (dispatch_rara, notify, read_tape, etc.) +- JobWheel / ScheduledTask system +- Event queue / Processor architecture +- GroupPolicy / ProactiveJudgment (group chat judgment) + +## Phases + +| Phase | Scope | External deps | +|-------|-------|--------------| +| 1 | Internal session events + time events + filter + context pack | None | +| 2 | GitHub signals (via Symphony poll) | GitHub API (already integrated) | +| 3 | Generic webhook endpoint | HTTP listener | + +Estimated Phase 1: ~500 lines new code, 6-8 files touched. + +## Addendum (2026-03-21) + +Additions made during Phase 1 implementation that extend the original design: + +- **LLM judgment layer** — Optional lightweight LLM pre-filter (`signal_judgment.rs`) between `ProactiveFilter` and Mita delivery. Uses a cheap model (configured via `judgment_model` in `ProactiveConfig`) to decide whether a signal warrants Mita's attention. Skipped when `judgment_model` is absent. +- **Mita History in context pack** — `[Mita History]` section now populated from tape, showing Mita's recent proactive actions and user reactions for calibration. +- **SessionCompleted changed to idle-based** — `SessionCompleted` now fires after `session_completed_secs` (~10min) of session inactivity, not on turn completion. This avoids false positives from multi-turn conversations. +- **`mita_update_proactive_config` tool** — Mita can hot-update `ProactiveConfig` at runtime (e.g., user says "don't bother me on weekends"). +- **Dynamic available actions** — `[Available Actions]` section in context pack is now built dynamically from Mita's actual tool registry instead of being hardcoded. From 5bb0a79cd289a9f12a0e54cf490ec267f1e9b4ed Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 22 Mar 2026 00:25:08 +0900 Subject: [PATCH 12/12] fix(kernel): proactive config hot-reload and review fixes (#786) - Add ReloadProactiveConfig kernel event so update-proactive-config tool syncs in-memory filter immediately after writing to disk - On startup, prefer runtime proactive.yaml over main config to resolve config source-of-truth conflict - Populate SessionCompleted.summary with session name instead of empty string - Validate session_completed_secs < idle_threshold_secs in ProactiveConfig Closes #786 --- crates/app/src/lib.rs | 21 +++++++++- .../src/tools/mita_update_proactive_config.rs | 5 +++ crates/kernel/src/event.rs | 17 ++++++++ crates/kernel/src/kernel.rs | 41 ++++++++++++++++++- crates/kernel/src/proactive/config.rs | 8 ++++ 5 files changed, 90 insertions(+), 2 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 036a7ea6e..f92ab1a5f 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -350,9 +350,28 @@ pub async fn start_with_options( boot::McpDynamicToolProvider::new(rara.mcp_manager.clone()), )); + // Prefer Mita's own proactive.yaml (updated at runtime by the + // update-proactive-config tool) over the main config file. + let proactive_config = { + let runtime_path = rara_paths::config_dir().join("mita").join("proactive.yaml"); + match std::fs::read_to_string(&runtime_path) { + Ok(contents) => match serde_yaml::from_str(&contents) { + Ok(c) => { + tracing::info!(path = %runtime_path.display(), "loaded proactive config from runtime override"); + Some(c) + } + Err(e) => { + tracing::warn!(error = %e, path = %runtime_path.display(), "invalid runtime proactive config, falling back to main config"); + config.mita.proactive.clone() + } + }, + Err(_) => config.mita.proactive.clone(), + } + }; + let kernel_config = rara_kernel::kernel::KernelConfig { mita_heartbeat_interval: Some(config.mita.heartbeat_interval), - proactive: config.mita.proactive.clone(), + proactive: proactive_config, context_folding: config.context_folding.clone(), ..Default::default() }; diff --git a/crates/app/src/tools/mita_update_proactive_config.rs b/crates/app/src/tools/mita_update_proactive_config.rs index ca1c48685..af4c66fee 100644 --- a/crates/app/src/tools/mita_update_proactive_config.rs +++ b/crates/app/src/tools/mita_update_proactive_config.rs @@ -160,6 +160,11 @@ impl ToolExecute for UpdateProactiveConfigTool { save_config(&config)?; + // Notify kernel to reload the in-memory proactive filter. + let _ = context + .event_queue + .try_push(rara_kernel::event::KernelEventEnvelope::reload_proactive_config()); + push_notification( context, format!( diff --git a/crates/kernel/src/event.rs b/crates/kernel/src/event.rs index 9f707566f..f085b86c5 100644 --- a/crates/kernel/src/event.rs +++ b/crates/kernel/src/event.rs @@ -370,6 +370,13 @@ pub enum KernelEvent { ProactiveSignal(crate::proactive::ProactiveSignal), // === System === + /// Reload proactive filter configuration from disk. + /// + /// Emitted by the `update-proactive-config` tool after writing a new + /// config to `config_dir()/mita/proactive.yaml`. The kernel re-reads + /// the file and reconstructs the in-memory `ProactiveFilter`. + ReloadProactiveConfig, + /// Periodic idle check — transitions Ready sessions to Suspended. IdleCheck, @@ -397,6 +404,7 @@ impl KernelEvent { | Self::MitaDirective { .. } | Self::MitaHeartbeat | Self::ProactiveSignal(_) + | Self::ReloadProactiveConfig | Self::IdleCheck => EventPriority::Low, } } @@ -616,6 +624,14 @@ impl KernelEventEnvelope { } } + /// Create a `ReloadProactiveConfig` event. + pub fn reload_proactive_config() -> Self { + Self { + base: EventBase::from(SessionKey::new()), + kind: KernelEvent::ReloadProactiveConfig, + } + } + /// Create an `IdleCheck` event. pub fn idle_check() -> Self { Self { @@ -701,6 +717,7 @@ impl KernelEventEnvelope { KernelEvent::ProactiveSignal(signal) => { format!("proactive signal: {}", signal.kind_name()) } + KernelEvent::ReloadProactiveConfig => "reload proactive filter config".to_string(), KernelEvent::IdleCheck => "periodic idle check".to_string(), KernelEvent::Shutdown => "shutdown requested".to_string(), } diff --git a/crates/kernel/src/kernel.rs b/crates/kernel/src/kernel.rs index e9256a1c4..9d63d6a35 100644 --- a/crates/kernel/src/kernel.rs +++ b/crates/kernel/src/kernel.rs @@ -725,6 +725,9 @@ impl Kernel { KernelEvent::ProactiveSignal(signal) => { self.handle_proactive_signal(signal).await; } + KernelEvent::ReloadProactiveConfig => { + self.handle_reload_proactive_config(); + } KernelEvent::IdleCheck => { // Periodic idle check — handled by session table reaping. self.process_table @@ -1963,6 +1966,38 @@ impl Kernel { Some((instant, is_morning)) } + /// Reload the proactive filter from `config_dir()/mita/proactive.yaml`. + /// + /// Called when the `update-proactive-config` tool writes a new config + /// to disk. Reconstructs the in-memory `ProactiveFilter` so changes + /// take effect immediately without restarting rara. + fn handle_reload_proactive_config(&self) { + let path = rara_paths::config_dir().join("mita").join("proactive.yaml"); + let config = match std::fs::read_to_string(&path) { + Ok(contents) => { + match serde_yaml::from_str::(&contents) { + Ok(c) => c, + Err(e) => { + error!(error = %e, path = %path.display(), "failed to parse proactive config"); + return; + } + } + } + Err(e) => { + error!(error = %e, path = %path.display(), "failed to read proactive config"); + return; + } + }; + + let new_filter = crate::proactive::ProactiveFilter::new(config); + let mut guard = self + .proactive_filter + .lock() + .unwrap_or_else(|e| e.into_inner()); + *guard = Some(new_filter); + info!("proactive filter reloaded from {}", path.display()); + } + /// Emit `SessionIdle` signals for sessions that have been idle beyond /// the configured threshold. fn emit_idle_signals(&self) { @@ -2004,9 +2039,13 @@ impl Kernel { }; self.try_emit_proactive_signal(signal); } else if idle_duration >= completed_threshold { + let summary = self + .process_table + .with(&stats.session_key, |p| p.manifest.name.clone()) + .unwrap_or_default(); let signal = crate::proactive::ProactiveSignal::SessionCompleted { session_key: stats.session_key.clone(), - summary: String::new(), + summary, }; self.try_emit_proactive_signal(signal); } diff --git a/crates/kernel/src/proactive/config.rs b/crates/kernel/src/proactive/config.rs index 86a0ab81d..f5c3454c2 100644 --- a/crates/kernel/src/proactive/config.rs +++ b/crates/kernel/src/proactive/config.rs @@ -84,6 +84,14 @@ impl ProactiveConfig { if self.parsed_work_start().is_none() || self.parsed_work_end().is_none() { // Warnings already logged by parsed_work_start/end(). } + if self.session_completed_secs >= self.idle_threshold_secs { + warn!( + session_completed_secs = self.session_completed_secs, + idle_threshold_secs = self.idle_threshold_secs, + "proactive config: session_completed_secs must be less than idle_threshold_secs, \ + SessionCompleted signals will never fire" + ); + } if let Some((start, end)) = &self.quiet_hours { if parse_time_str(start).is_none() { warn!(