Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Enhancements

- Expose per-tree RocksDB tuning options ([#1782](https://github.com/0xMiden/node/pull/1782)).
- Added a gRPC server to the NTX builder, configurable via `--ntx-builder.url` / `MIDEN_NODE_NTX_BUILDER_URL` (https://github.com/0xMiden/node/issues/1758).
- Added `GetNoteError` gRPC endpoint to query the latest execution error for network notes (https://github.com/0xMiden/node/issues/1758).
- Added verbose `info!`-level logging to the network transaction builder for transaction execution, note filtering failures, and transaction outcomes ([#1770](https://github.com/0xMiden/node/pull/1770)).
- [BREAKING] Move block proving from Blocker Producer to the Store ([#1579](https://github.com/0xMiden/node/pull/1579)).
- [BREAKING] Updated miden-base dependencies to use `next` branch; renamed `NoteInputs` to `NoteStorage`, `.inputs()` to `.storage()`, and database `inputs` column to `storage` ([#1595](https://github.com/0xMiden/node/pull/1595)).
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 38 additions & 15 deletions bin/node/src/commands/bundled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,44 @@ impl BundledCommand {
.id()
};

// Prepare network transaction builder (bind listener + config before starting RPC,
// so that the ntx-builder URL is available for the RPC proxy).
let mut ntx_builder_url_for_rpc = None;
let ntx_builder_prepared = if should_start_ntx_builder {
let store_ntx_builder_url = Url::parse(&format!("http://{store_ntx_builder_address}"))
.context("Failed to parse URL")?;
let block_producer_url = block_producer_url.clone();
let validator_url = validator_url.clone();

let builder_config = ntx_builder.into_builder_config(
store_ntx_builder_url,
block_producer_url,
validator_url,
&data_directory,
);

// Bind a listener for the ntx-builder gRPC server.
let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
.await
.context("Failed to bind to ntx-builder gRPC endpoint")?;
let ntx_builder_address = ntx_builder_listener
.local_addr()
.context("Failed to retrieve the ntx-builder's gRPC address")?;
ntx_builder_url_for_rpc = Some(
Url::parse(&format!("http://{ntx_builder_address}"))
.context("Failed to parse ntx-builder URL")?,
);

Some((builder_config, ntx_builder_listener))
} else {
None
};

// Start RPC component.
let rpc_id = {
let block_producer_url = block_producer_url.clone();
let validator_url = validator_url.clone();
let ntx_builder_url = ntx_builder_url_for_rpc;
join_set
.spawn(async move {
let store_url = Url::parse(&format!("http://{store_rpc_address}"))
Expand All @@ -259,6 +293,7 @@ impl BundledCommand {
store_url,
block_producer_url: Some(block_producer_url),
validator_url,
ntx_builder_url,
grpc_options,
}
.serve()
Expand All @@ -275,27 +310,15 @@ impl BundledCommand {
(rpc_id, "rpc"),
]);

// Start network transaction builder. The endpoint is available after loading completes.
if should_start_ntx_builder {
let store_ntx_builder_url = Url::parse(&format!("http://{store_ntx_builder_address}"))
.context("Failed to parse URL")?;
let block_producer_url = block_producer_url.clone();
let validator_url = validator_url.clone();

let builder_config = ntx_builder.into_builder_config(
store_ntx_builder_url,
block_producer_url,
validator_url,
&data_directory,
);

// Start network transaction builder.
if let Some((builder_config, ntx_builder_listener)) = ntx_builder_prepared {
let id = join_set
.spawn(async move {
builder_config
.build()
.await
.context("failed to initialize ntx builder")?
.run()
.run(Some(ntx_builder_listener))
.await
.context("failed while serving ntx builder component")
})
Expand Down
1 change: 1 addition & 0 deletions bin/node/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const ENV_NTX_SCRIPT_CACHE_SIZE: &str = "MIDEN_NTX_DATA_STORE_SCRIPT_CACHE_SIZE"
const ENV_VALIDATOR_KEY: &str = "MIDEN_NODE_VALIDATOR_KEY";
const ENV_VALIDATOR_KMS_KEY_ID: &str = "MIDEN_NODE_VALIDATOR_KMS_KEY_ID";
const ENV_NTX_DATA_DIRECTORY: &str = "MIDEN_NODE_NTX_DATA_DIRECTORY";
const ENV_NTX_BUILDER_URL: &str = "MIDEN_NODE_NTX_BUILDER_URL";

const DEFAULT_NTX_TICKER_INTERVAL: Duration = Duration::from_millis(200);
const DEFAULT_NTX_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
Expand Down
15 changes: 14 additions & 1 deletion bin/node/src/commands/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ use miden_node_utils::clap::GrpcOptionsExternal;
use miden_node_utils::grpc::UrlExt;
use url::Url;

use super::{ENV_BLOCK_PRODUCER_URL, ENV_RPC_URL, ENV_STORE_RPC_URL, ENV_VALIDATOR_URL};
use super::{
ENV_BLOCK_PRODUCER_URL,
ENV_NTX_BUILDER_URL,
ENV_RPC_URL,
ENV_STORE_RPC_URL,
ENV_VALIDATOR_URL,
};
use crate::commands::ENV_ENABLE_OTEL;

#[derive(clap::Subcommand)]
Expand All @@ -28,6 +34,11 @@ pub enum RpcCommand {
#[arg(long = "validator.url", env = ENV_VALIDATOR_URL, value_name = "URL")]
validator_url: Url,

/// The network transaction builder's gRPC url. If unset, the `GetNoteError` endpoint
/// will be unavailable.
#[arg(long = "ntx-builder.url", env = ENV_NTX_BUILDER_URL, value_name = "URL")]
ntx_builder_url: Option<Url>,

/// Enables the exporting of traces for OpenTelemetry.
///
/// This can be further configured using environment variables as defined in the official
Expand All @@ -47,6 +58,7 @@ impl RpcCommand {
store_url,
block_producer_url,
validator_url,
ntx_builder_url,
enable_otel: _,
grpc_options,
} = self;
Expand All @@ -61,6 +73,7 @@ impl RpcCommand {
store_url,
block_producer_url,
validator_url,
ntx_builder_url,
grpc_options,
}
.serve()
Expand Down
5 changes: 4 additions & 1 deletion crates/ntx-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ futures = { workspace = true }
libsqlite3-sys = { workspace = true }
miden-node-db = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-proto-build = { features = ["internal"], workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { default-features = true, workspace = true }
miden-remote-prover-client = { features = ["tx-prover"], workspace = true }
miden-standards = { workspace = true }
miden-tx = { default-features = true, workspace = true }
thiserror = { workspace = true }
tokio = { features = ["rt-multi-thread"], workspace = true }
tokio-stream = { workspace = true }
tokio-stream = { features = ["net"], workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

Expand Down
62 changes: 52 additions & 10 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use miden_tx::FailedNote;
use tokio::sync::{Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;

use crate::NoteError;
use crate::chain_state::ChainState;
use crate::clients::{BlockProducerClient, StoreClient};
use crate::db::Db;
use crate::inflight_note::InflightNetworkNote;

// ACTOR REQUESTS
// ================================================================================================
Expand All @@ -37,7 +38,7 @@ pub enum ActorRequest {
/// the oneshot channel, preventing race conditions where the actor could re-select the same
/// notes before the failure is persisted.
NotesFailed {
nullifiers: Vec<Nullifier>,
failed_notes: Vec<(Nullifier, NoteError)>,
block_num: BlockNumber,
ack_tx: tokio::sync::oneshot::Sender<()>,
},
Expand Down Expand Up @@ -423,23 +424,43 @@ impl AccountActor {
);
self.cache_note_scripts(scripts_to_cache).await;
if !failed.is_empty() {
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
let failed_notes = log_failed_notes(failed);
self.mark_notes_failed(&failed_notes, block_num).await;
}
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Transaction execution failed.
Err(err) => {
let error_msg = err.as_report();
tracing::error!(
%account_id,
?note_ids,
err = err.as_report(),
err = %error_msg,
"network transaction failed",
);
self.mode = ActorMode::NoViableNotes;
let nullifiers: Vec<_> = notes.iter().map(InflightNetworkNote::nullifier).collect();
self.mark_notes_failed(&nullifiers, block_num).await;

// For `AllNotesFailed`, use the per-note errors which contain the
// specific reason each note failed (e.g. consumability check details).
let failed_notes: Vec<_> = match err {
execute::NtxError::AllNotesFailed(per_note) => log_failed_notes(per_note),
other => {
let error: NoteError = Arc::new(other);
notes
.iter()
.map(|note| {
tracing::info!(
note.id = %note.to_inner().as_note().id(),
nullifier = %note.nullifier(),
err = %error_msg,
"note failed: transaction execution error",
);
(note.nullifier(), error.clone())
})
.collect()
},
};
self.mark_notes_failed(&failed_notes, block_num).await;
},
}
}
Expand All @@ -461,12 +482,16 @@ impl AccountActor {
/// Sends a request to the coordinator to mark notes as failed and waits for the DB write to
/// complete. This prevents a race condition where the actor could re-select the same notes
/// before the failure counts are updated in the database.
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
async fn mark_notes_failed(
&self,
failed_notes: &[(Nullifier, NoteError)],
block_num: BlockNumber,
) {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
if self
.request_tx
.send(ActorRequest::NotesFailed {
nullifiers: nullifiers.to_vec(),
failed_notes: failed_notes.to_vec(),
block_num,
ack_tx,
})
Expand All @@ -479,3 +504,20 @@ impl AccountActor {
let _ = ack_rx.await;
}
}

/// Logs each failed note and returns a vec of `(nullifier, error)` pairs.
fn log_failed_notes(failed: Vec<FailedNote>) -> Vec<(Nullifier, NoteError)> {
failed
.into_iter()
.map(|f| {
let error_msg = f.error.as_report();
tracing::info!(
note.id = %f.note.id(),
nullifier = %f.note.nullifier(),
err = %error_msg,
"note failed: consumability check",
);
(f.note.nullifier(), Arc::new(f.error) as NoteError)
})
.collect()
}
41 changes: 36 additions & 5 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, mpsc};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::Status;

Expand All @@ -17,6 +19,7 @@ use crate::chain_state::ChainState;
use crate::clients::StoreClient;
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::server::NtxBuilderRpcServer;

// NETWORK TRANSACTION BUILDER
// ================================================================================================
Expand Down Expand Up @@ -86,17 +89,45 @@ impl NetworkTransactionBuilder {

/// Runs the network transaction builder event loop until a fatal error occurs.
///
/// If a `TcpListener` is provided, a gRPC server is also spawned to expose the
/// `GetNoteError` endpoint.
///
/// This method:
/// 1. Spawns a background task to load existing network accounts from the store
/// 2. Runs the main event loop, processing mempool events and managing actors
/// 1. Optionally starts a gRPC server for note error queries
/// 2. Spawns a background task to load existing network accounts from the store
/// 3. Runs the main event loop, processing mempool events and managing actors
///
/// # Errors
///
/// Returns an error if:
/// - The mempool event stream ends unexpectedly
/// - An actor encounters a fatal error
/// - The account loader task fails
pub async fn run(mut self) -> anyhow::Result<()> {
/// - The gRPC server fails
pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
let mut join_set = JoinSet::new();

// Start the gRPC server if a listener is provided.
if let Some(listener) = listener {
let server = NtxBuilderRpcServer::new(self.db.clone());
join_set.spawn(async move {
server.serve(listener).await.context("ntx-builder gRPC server failed")
});
}

join_set.spawn(self.run_event_loop());

// Wait for either the event loop or the gRPC server to complete.
// Any completion is treated as fatal.
if let Some(result) = join_set.join_next().await {
result.context("ntx-builder task panicked")??;
}

Ok(())
}

/// Runs the main event loop.
async fn run_event_loop(mut self) -> anyhow::Result<()> {
// Spawn a background task to load network accounts from the store.
// Accounts are sent through a channel and processed in the main event loop.
let (account_tx, mut account_rx) =
Expand Down Expand Up @@ -245,9 +276,9 @@ impl NetworkTransactionBuilder {
/// Processes a request from an account actor.
async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
match request {
ActorRequest::NotesFailed { nullifiers, block_num, ack_tx } => {
ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
self.db
.notes_failed(nullifiers, block_num)
.notes_failed(failed_notes, block_num)
.await
.context("failed to mark notes as failed")?;
let _ = ack_tx.send(());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ CREATE TABLE notes (
account_id BLOB NOT NULL,
-- Serialized SingleTargetNetworkNote.
note_data BLOB NOT NULL,
-- Note ID bytes.
note_id BLOB,
-- Backoff tracking: number of failed execution attempts.
attempt_count INTEGER NOT NULL DEFAULT 0,
-- Backoff tracking: block number of the last failed attempt. NULL if never attempted.
last_attempt INTEGER,
-- Latest execution error message. NULL if no error recorded.
last_error TEXT,
-- NULL if the note came from a committed block; transaction ID if created by inflight tx.
created_by BLOB,
-- NULL if unconsumed; transaction ID of the consuming inflight tx.
Expand All @@ -60,6 +64,7 @@ CREATE TABLE notes (
CREATE INDEX idx_notes_account ON notes(account_id);
CREATE INDEX idx_notes_created_by ON notes(created_by) WHERE created_by IS NOT NULL;
CREATE INDEX idx_notes_consumed_by ON notes(consumed_by) WHERE consumed_by IS NOT NULL;
CREATE INDEX idx_notes_note_id ON notes(note_id) WHERE note_id IS NOT NULL;

-- Persistent cache of note scripts, keyed by script root hash.
-- Survives restarts so scripts don't need to be re-fetched from the store.
Expand Down
Loading
Loading