Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion crates/volt-core/src/grant_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ pub fn resolve_grant(id: &str) -> Result<PathBuf, GrantError> {
/// Revoke a grant, removing it from the store.
/// Returns true if the grant existed and was removed.
pub fn revoke_grant(id: &str) -> bool {
with_store(|store| store.remove(id).is_some())
let removed = with_store(|store| store.remove(id).is_some());
if removed {
crate::plugin_grant_registry::revoke_grant_everywhere(id);
}
removed
}

/// Clear all grants. Intended for testing and app shutdown.
Expand Down Expand Up @@ -130,6 +134,26 @@ mod tests {
assert!(resolve_grant(&id).is_err());
}

#[test]
fn test_revoke_grant_removes_plugin_delegations() {
let _guard = lock_grant_state();
let dir = env::temp_dir();
let id = create_grant(dir).unwrap();

crate::plugin_grant_registry::delegate_grant("acme.search", &id).expect("delegate grant");
assert!(crate::plugin_grant_registry::is_delegated(
"acme.search",
&id
));

assert!(revoke_grant(&id));

assert!(!crate::plugin_grant_registry::is_delegated(
"acme.search",
&id
));
}

#[test]
fn test_create_grant_rejects_nonexistent_path() {
let _guard = lock_grant_state();
Expand Down
84 changes: 51 additions & 33 deletions crates/volt-core/src/plugin_grant_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub struct GrantDelegation {
pub grant_id: String,
pub plugin_id: String,
pub delegated_at: u64,
pub revoked: bool,
}

#[derive(Debug, Error, PartialEq, Eq)]
Expand Down Expand Up @@ -49,55 +48,63 @@ pub fn delegate_grant(plugin_id: &str, grant_id: &str) -> Result<(), PluginGrant

with_store(|store| {
let delegations = store.entry(plugin_id.to_string()).or_default();
match delegations.get_mut(grant_id) {
Some(delegation) if !delegation.revoked => Err(PluginGrantError::AlreadyDelegated),
Some(delegation) => {
delegation.delegated_at = now_ms();
delegation.revoked = false;
Ok(())
}
None => {
delegations.insert(
grant_id.to_string(),
GrantDelegation {
grant_id: grant_id.to_string(),
plugin_id: plugin_id.to_string(),
delegated_at: now_ms(),
revoked: false,
},
);
Ok(())
}
if delegations.contains_key(grant_id) {
Err(PluginGrantError::AlreadyDelegated)
} else {
delegations.insert(
grant_id.to_string(),
GrantDelegation {
grant_id: grant_id.to_string(),
plugin_id: plugin_id.to_string(),
delegated_at: now_ms(),
},
);
Ok(())
}
})
}

pub fn revoke_grant(plugin_id: &str, grant_id: &str) {
with_store(|store| {
if let Some(delegations) = store.get_mut(plugin_id)
&& let Some(delegation) = delegations.get_mut(grant_id)
{
delegation.revoked = true;
if let Some(delegations) = store.get_mut(plugin_id) {
delegations.remove(grant_id);
if delegations.is_empty() {
store.remove(plugin_id);
}
}
});
}

pub fn revoke_all_grants(plugin_id: &str) {
with_store(|store| {
if let Some(delegations) = store.get_mut(plugin_id) {
for delegation in delegations.values_mut() {
delegation.revoked = true;
store.remove(plugin_id);
});
}

pub fn revoke_grant_everywhere(grant_id: &str) -> bool {
with_store(|store| {
let mut removed = false;
let mut empty_plugins = Vec::new();
for (plugin_id, delegations) in store.iter_mut() {
if delegations.remove(grant_id).is_some() {
removed = true;
}
if delegations.is_empty() {
empty_plugins.push(plugin_id.clone());
}
}
});
for plugin_id in empty_plugins {
store.remove(&plugin_id);
}
removed
})
}

pub fn is_delegated(plugin_id: &str, grant_id: &str) -> bool {
with_store(|store| {
store
.get(plugin_id)
.and_then(|delegations| delegations.get(grant_id))
.is_some_and(|delegation| !delegation.revoked)
.is_some_and(|delegations| delegations.contains_key(grant_id))
})
}

Expand All @@ -106,9 +113,8 @@ pub fn list_delegated_grants(plugin_id: &str) -> Vec<String> {
let mut grant_ids = store
.get(plugin_id)
.into_iter()
.flat_map(|delegations| delegations.values())
.filter(|delegation| !delegation.revoked)
.map(|delegation| delegation.grant_id.clone())
.flat_map(|delegations| delegations.keys())
.cloned()
.collect::<Vec<_>>();
grant_ids.sort();
grant_ids
Expand Down Expand Up @@ -193,4 +199,16 @@ mod tests {
assert!(grant_store::resolve_grant(&first).is_ok());
assert!(grant_store::resolve_grant(&second).is_ok());
}

#[test]
fn revoke_grant_everywhere_prunes_empty_plugin_entries() {
let _guard = lock_grant_state();
let grant_id = create_grant();

delegate_grant("acme.search", &grant_id).expect("delegate");
assert!(revoke_grant_everywhere(&grant_id));

assert!(!is_delegated("acme.search", &grant_id));
assert!(list_delegated_grants("acme.search").is_empty());
}
}
6 changes: 5 additions & 1 deletion crates/volt-plugin-host/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use boa_engine::module::{MapModuleLoader, Module};
use boa_engine::object::builtins::JsPromise;
use boa_engine::{JsValue, Source, js_string};
use tokio::runtime::Builder as TokioRuntimeBuilder;
use tokio::task::yield_now;

use crate::config::PluginConfig;
use crate::ipc::{IpcMessage, MessageType};
Expand Down Expand Up @@ -235,7 +236,10 @@ impl PluginEngine {
.map_err(|error| error.to_string())?;

match promise.state() {
PromiseState::Pending => continue,
PromiseState::Pending => {
yield_now().await;
continue;
}
PromiseState::Fulfilled(value) => return Ok(value),
PromiseState::Rejected(error) => return Err(error.display().to_string()),
}
Expand Down
40 changes: 34 additions & 6 deletions crates/volt-plugin-host/src/runtime_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::io::{self, BufReader};

use serde_json::Value;

use crate::config::DelegatedGrant;
use crate::config::PluginConfig;
use crate::config::{DelegatedGrant, HostIpcSettings, PluginConfig};
use crate::ipc::{IpcMessage, MessageType, read_message, write_message};

struct RuntimeState {
plugin_id: String,
manifest: Value,
delegated_grants: Vec<DelegatedGrant>,
max_deferred_messages: usize,
transport: Transport,
}

Expand All @@ -36,11 +36,13 @@ thread_local! {
}

pub fn configure_stdio(config: &PluginConfig) {
let host_ipc_settings = config.host_ipc_settings.clone().unwrap_or_default();
STATE.with(|state| {
*state.borrow_mut() = Some(RuntimeState {
plugin_id: config.plugin_id.clone(),
manifest: config.manifest.clone(),
delegated_grants: config.delegated_grants.clone(),
max_deferred_messages: max_deferred_messages(&host_ipc_settings),
transport: Transport::StdIo {
reader: BufReader::new(std::io::stdin()),
writer: std::io::stdout(),
Expand Down Expand Up @@ -157,7 +159,9 @@ pub fn send_request(method: impl Into<String>, payload: Value) -> Result<Value,
continue;
}

runtime.transport.defer(message);
runtime
.transport
.defer(message, runtime.max_deferred_messages)?;
}
})
}
Expand Down Expand Up @@ -189,11 +193,29 @@ impl Transport {
}
}

fn defer(&mut self, message: IpcMessage) {
fn defer(&mut self, message: IpcMessage, max_deferred_messages: usize) -> Result<(), String> {
match self {
Self::StdIo { deferred, .. } => deferred.push_back(message),
Self::StdIo { deferred, .. } => {
if deferred.len() >= max_deferred_messages {
return Err(format!(
"host deferred message queue exceeded {} messages while waiting for a synchronous response",
max_deferred_messages
));
}
deferred.push_back(message);
Ok(())
}
#[cfg(test)]
Self::Mock { deferred, .. } => deferred.push_back(message),
Self::Mock { deferred, .. } => {
if deferred.len() >= max_deferred_messages {
return Err(format!(
"host deferred message queue exceeded {} messages while waiting for a synchronous response",
max_deferred_messages
));
}
deferred.push_back(message);
Ok(())
}
}
}

Expand Down Expand Up @@ -243,11 +265,13 @@ impl Transport {

#[cfg(test)]
pub(crate) fn configure_mock(config: &PluginConfig, inbound: Vec<IpcMessage>) {
let host_ipc_settings = config.host_ipc_settings.clone().unwrap_or_default();
STATE.with(|state| {
*state.borrow_mut() = Some(RuntimeState {
plugin_id: config.plugin_id.clone(),
manifest: config.manifest.clone(),
delegated_grants: config.delegated_grants.clone(),
max_deferred_messages: max_deferred_messages(&host_ipc_settings),
transport: Transport::Mock {
inbound: inbound.into(),
outbound: Vec::new(),
Expand All @@ -270,5 +294,9 @@ pub(crate) fn take_outbound() -> Vec<IpcMessage> {
})
}

fn max_deferred_messages(settings: &HostIpcSettings) -> usize {
settings.max_queue_depth.max(1) as usize
}

#[cfg(test)]
mod tests;
27 changes: 26 additions & 1 deletion crates/volt-plugin-host/src/runtime_state/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde_json::Value;

use crate::config::PluginConfig;
use crate::config::{HostIpcSettings, PluginConfig};
use crate::ipc::IpcMessage;
use crate::runtime_state::{configure_mock, send_request, take_outbound};

Expand All @@ -16,6 +16,15 @@ fn test_config() -> PluginConfig {
}
}

fn test_config_with_queue_depth(max_queue_depth: u32) -> PluginConfig {
let mut config = test_config();
config.host_ipc_settings = Some(HostIpcSettings {
max_queue_depth,
..HostIpcSettings::default()
});
config
}

#[test]
fn send_request_writes_request_and_reads_response() {
configure_mock(
Expand Down Expand Up @@ -61,3 +70,19 @@ fn send_request_acks_heartbeat_while_waiting() {
assert_eq!(outbound.len(), 2);
assert_eq!(outbound[1].method, "heartbeat-ack");
}

#[test]
fn send_request_rejects_when_deferred_queue_exceeds_limit() {
configure_mock(
&test_config_with_queue_depth(1),
vec![
IpcMessage::signal("event-1", "plugin:event"),
IpcMessage::signal("event-2", "plugin:event"),
],
);

let error = send_request("plugin:fs:exists", serde_json::json!({ "path": "cache" }))
.expect_err("error");

assert!(error.contains("deferred message queue exceeded 1 messages"));
}
12 changes: 10 additions & 2 deletions crates/volt-runner/src/ipc_bridge/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use volt_core::command::{self, AppCommand};
use volt_core::ipc::{IPC_HANDLER_ERROR_CODE, IpcResponse, response_script};

pub(super) fn send_response_to_window(js_window_id: &str, response: IpcResponse) {
let request_id = response.id.clone();
let response_json = match serde_json::to_string(&response) {
Ok(serialized) => serialized,
Err(error) => {
Expand All @@ -18,8 +19,15 @@ pub(super) fn send_response_to_window(js_window_id: &str, response: IpcResponse)
};

let script = response_script(&response_json);
let _ = command::send_command(AppCommand::EvaluateScript {
if let Err(error) = command::send_command(AppCommand::EvaluateScript {
js_id: js_window_id.to_string(),
script,
});
}) {
tracing::debug!(
window_id = %js_window_id,
request_id = %request_id,
error = %error,
"dropping IPC response because the target window is unavailable"
);
}
}
Loading
Loading