Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/apps/desktop/src/api/btw_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! BTW (side question) API
//!
//! Desktop adapter for the core side-question service:
//! - Reads current session context (no new dialog turn, no persistence writes)
//! - Reads current session context without mutating the parent session
//! - Streams answer via `btw://...` events
//! - Supports cancellation by request id

Expand All @@ -14,8 +14,10 @@ use crate::api::app_state::AppState;

use bitfun_core::agentic::coordination::ConversationCoordinator;
use bitfun_core::agentic::side_question::{
SideQuestionService, SideQuestionStreamEvent, SideQuestionStreamRequest,
SideQuestionPersistTarget, SideQuestionService, SideQuestionStreamEvent,
SideQuestionStreamRequest,
};
use std::path::PathBuf;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -44,6 +46,10 @@ pub struct BtwAskStreamRequest {
pub model_id: Option<String>,
/// Limit how many context messages are included (from the end).
pub max_context_messages: Option<usize>,
pub child_session_id: Option<String>,
pub workspace_path: Option<String>,
pub parent_dialog_turn_id: Option<String>,
pub parent_turn_index: Option<usize>,
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -135,6 +141,20 @@ pub async fn btw_ask_stream(
question: request.question.clone(),
model_id: request.model_id.clone(),
max_context_messages: request.max_context_messages,
persist_target: match (&request.child_session_id, &request.workspace_path) {
(Some(child_session_id), Some(workspace_path))
if !child_session_id.trim().is_empty() && !workspace_path.trim().is_empty() =>
{
Some(SideQuestionPersistTarget {
child_session_id: child_session_id.clone(),
workspace_path: PathBuf::from(workspace_path),
parent_session_id: request.session_id.clone(),
parent_dialog_turn_id: request.parent_dialog_turn_id.clone(),
parent_turn_index: request.parent_turn_index,
})
}
_ => None,
},
})
.await
.map_err(|e| e.to_string())?;
Expand Down
34 changes: 34 additions & 0 deletions src/apps/desktop/src/api/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,40 @@ pub async fn get_recent_workspaces(
.collect())
}

#[tauri::command]
pub async fn cleanup_invalid_workspaces(
state: State<'_, AppState>,
app: tauri::AppHandle,
) -> Result<usize, String> {
match state.workspace_service.cleanup_invalid_workspaces().await {
Ok(removed_count) => {
if let Some(workspace_info) = state.workspace_service.get_current_workspace().await {
apply_active_workspace_context(&state, &app, &workspace_info).await;
} else {
clear_active_workspace_context(&state, &app).await;
}

if let Err(e) = state
.workspace_identity_watch_service
.sync_watched_workspaces()
.await
{
warn!(
"Failed to sync workspace identity watchers after workspace cleanup: {}",
e
);
}

info!("Invalid workspaces cleaned up: removed_count={}", removed_count);
Ok(removed_count)
}
Err(e) => {
error!("Failed to cleanup invalid workspaces: {}", e);
Err(format!("Failed to cleanup invalid workspaces: {}", e))
}
}
}

#[tauri::command]
pub async fn get_opened_workspaces(
state: State<'_, AppState>,
Expand Down
1 change: 1 addition & 0 deletions src/apps/desktop/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ pub async fn run() {
subscribe_config_updates,
get_model_configs,
get_recent_workspaces,
cleanup_invalid_workspaces,
get_opened_workspaces,
open_workspace,
create_assistant_workspace,
Expand Down
26 changes: 26 additions & 0 deletions src/crates/core/src/agentic/coordination/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,32 @@ Update the persona files and delete BOOTSTRAP.md as soon as bootstrap is complet
&self.session_manager
}

/// Persist a completed `/btw` side-question turn into an existing child session.
pub async fn persist_btw_turn(
&self,
workspace_path: &Path,
child_session_id: &str,
request_id: &str,
question: &str,
full_text: &str,
parent_session_id: &str,
parent_dialog_turn_id: Option<&str>,
parent_turn_index: Option<usize>,
) -> BitFunResult<()> {
self.session_manager
.persist_btw_turn(
workspace_path,
child_session_id,
request_id,
question,
full_text,
parent_session_id,
parent_dialog_turn_id,
parent_turn_index,
)
.await
}

/// Set global coordinator (called during initialization)
///
/// Skips if global coordinator already exists
Expand Down
80 changes: 64 additions & 16 deletions src/crates/core/src/agentic/persistence/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const JSON_WRITE_MAX_RETRIES: usize = 5;
const JSON_WRITE_RETRY_BASE_DELAY_MS: u64 = 30;

static JSON_FILE_WRITE_LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();
static SESSION_INDEX_LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();

#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredSessionMetadataFile {
Expand Down Expand Up @@ -285,6 +286,16 @@ impl PersistenceManager {
.clone()
}

async fn get_session_index_lock(&self, workspace_path: &Path) -> Arc<Mutex<()>> {
let index_path = self.index_path(workspace_path);
let registry = SESSION_INDEX_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
let mut registry_guard = registry.lock().await;
registry_guard
.entry(index_path)
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}

fn build_temp_json_path(path: &Path, attempt: usize) -> BitFunResult<PathBuf> {
let parent = path.parent().ok_or_else(|| {
BitFunError::io(format!(
Expand Down Expand Up @@ -468,7 +479,7 @@ impl PersistenceManager {
}
}

async fn rebuild_index(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
async fn rebuild_index_locked(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
let sessions_root = self.ensure_project_sessions_dir(workspace_path).await?;
let mut metadata_list = Vec::new();
let mut entries = fs::read_dir(&sessions_root)
Expand Down Expand Up @@ -515,7 +526,7 @@ impl PersistenceManager {
Ok(metadata_list)
}

async fn upsert_index_entry(
async fn upsert_index_entry_locked(
&self,
workspace_path: &Path,
metadata: &SessionMetadata,
Expand Down Expand Up @@ -548,7 +559,7 @@ impl PersistenceManager {
self.write_json_atomic(&index_path, &index).await
}

async fn remove_index_entry(
async fn remove_index_entry_locked(
&self,
workspace_path: &Path,
session_id: &str,
Expand All @@ -568,6 +579,32 @@ impl PersistenceManager {
self.write_json_atomic(&index_path, &index).await
}

async fn rebuild_index(&self, workspace_path: &Path) -> BitFunResult<Vec<SessionMetadata>> {
let lock = self.get_session_index_lock(workspace_path).await;
let _guard = lock.lock().await;
self.rebuild_index_locked(workspace_path).await
}

async fn upsert_index_entry(
&self,
workspace_path: &Path,
metadata: &SessionMetadata,
) -> BitFunResult<()> {
let lock = self.get_session_index_lock(workspace_path).await;
let _guard = lock.lock().await;
self.upsert_index_entry_locked(workspace_path, metadata).await
}

async fn remove_index_entry(
&self,
workspace_path: &Path,
session_id: &str,
) -> BitFunResult<()> {
let lock = self.get_session_index_lock(workspace_path).await;
let _guard = lock.lock().await;
self.remove_index_entry_locked(workspace_path, session_id).await
}

pub async fn list_session_metadata(
&self,
workspace_path: &Path,
Expand All @@ -576,15 +613,28 @@ impl PersistenceManager {
return Ok(Vec::new());
}

let lock = self.get_session_index_lock(workspace_path).await;
let _guard = lock.lock().await;
let index_path = self.index_path(workspace_path);
if let Some(index) = self
.read_json_optional::<StoredSessionIndex>(&index_path)
.await?
{
let has_stale_entry = index
.sessions
.iter()
.any(|metadata| !self.metadata_path(workspace_path, &metadata.session_id).exists());
if has_stale_entry {
warn!(
"Session index contains stale entries, rebuilding: {}",
index_path.display()
);
return self.rebuild_index_locked(workspace_path).await;
}
return Ok(index.sessions);
}

self.rebuild_index(workspace_path).await
self.rebuild_index_locked(workspace_path).await
}

pub async fn save_session_metadata(
Expand Down Expand Up @@ -944,6 +994,16 @@ impl PersistenceManager {
workspace_path: &Path,
turn: &DialogTurnData,
) -> BitFunResult<()> {
let mut metadata = self
.load_session_metadata(workspace_path, &turn.session_id)
.await?
.ok_or_else(|| {
BitFunError::NotFound(format!(
"Session metadata not found: {}",
turn.session_id
))
})?;

self.ensure_turns_dir(workspace_path, &turn.session_id)
.await?;

Expand All @@ -957,18 +1017,6 @@ impl PersistenceManager {
)
.await?;

let mut metadata = self
.load_session_metadata(workspace_path, &turn.session_id)
.await?
.unwrap_or_else(|| {
SessionMetadata::new(
turn.session_id.clone(),
"New Session".to_string(),
"agentic".to_string(),
"default".to_string(),
)
});

let turns = self
.load_session_turns(workspace_path, &turn.session_id)
.await?;
Expand Down
90 changes: 90 additions & 0 deletions src/crates/core/src/agentic/session/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::service::snapshot::ensure_snapshot_manager_for_workspace;
use crate::util::errors::{BitFunError, BitFunResult};
use dashmap::DashMap;
use log::{debug, error, info, warn};
use serde_json::json;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -914,6 +915,95 @@ impl SessionManager {
Ok(())
}

/// Persist a completed `/btw` side-question turn into an existing child session.
pub async fn persist_btw_turn(
&self,
workspace_path: &Path,
child_session_id: &str,
request_id: &str,
question: &str,
full_text: &str,
parent_session_id: &str,
parent_dialog_turn_id: Option<&str>,
parent_turn_index: Option<usize>,
) -> BitFunResult<()> {
let session = self
.sessions
.get(child_session_id)
.ok_or_else(|| BitFunError::NotFound(format!("Session not found: {}", child_session_id)))?;

let turn_id = format!("btw-turn-{}", request_id);
let user_message_id = format!("btw-user-{}", request_id);
let round_id = format!("btw-round-{}", request_id);
let text_id = format!("btw-text-{}", request_id);
let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;

let mut turn = DialogTurnData::new(
turn_id.clone(),
0,
child_session_id.to_string(),
UserMessageData {
id: user_message_id,
content: question.to_string(),
timestamp: now,
metadata: Some(json!({
"kind": "btw",
"parentSessionId": parent_session_id,
"parentRequestId": request_id,
"parentDialogTurnId": parent_dialog_turn_id,
"parentTurnIndex": parent_turn_index,
})),
},
);
turn.timestamp = now;
turn.start_time = now;
turn.end_time = Some(now);
turn.duration_ms = Some(0);
turn.status = TurnStatus::Completed;
turn.model_rounds = vec![ModelRoundData {
id: round_id,
turn_id: turn_id.clone(),
round_index: 0,
timestamp: now,
text_items: vec![TextItemData {
id: text_id,
content: full_text.to_string(),
is_streaming: false,
timestamp: now,
is_markdown: true,
order_index: None,
is_subagent_item: None,
parent_task_tool_id: None,
subagent_session_id: None,
status: Some("completed".to_string()),
}],
tool_items: vec![],
thinking_items: vec![],
start_time: now,
end_time: Some(now),
status: "completed".to_string(),
}];

drop(session);

self.persistence_manager
.save_dialog_turn(workspace_path, &turn)
.await?;

if let Some(mut session) = self.sessions.get_mut(child_session_id) {
if !session.dialog_turn_ids.iter().any(|existing| existing == &turn_id) {
session.dialog_turn_ids.push(turn_id);
}
session.updated_at = SystemTime::now();
session.last_activity_at = SystemTime::now();
}

Ok(())
}

// ============ Helper Methods ============

/// Get session's message history (complete)
Expand Down
Loading
Loading