diff --git a/Cargo.lock b/Cargo.lock index 8c722c039..c5b2486a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1823,6 +1823,7 @@ version = "3.7.0" dependencies = [ "borsh", "near-sdk", + "serde_json", ] [[package]] diff --git a/crates/chain-gateway-test-contract/Cargo.toml b/crates/chain-gateway-test-contract/Cargo.toml index b0e09a9cc..7eae4d33e 100644 --- a/crates/chain-gateway-test-contract/Cargo.toml +++ b/crates/chain-gateway-test-contract/Cargo.toml @@ -26,6 +26,7 @@ crate-type = ["cdylib", "lib"] [dependencies] borsh = { workspace = true } near-sdk = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/crates/chain-gateway-test-contract/res/chain_gateway_test_contract.wasm b/crates/chain-gateway-test-contract/res/chain_gateway_test_contract.wasm index 91d624b0f..fa0c1e632 100644 Binary files a/crates/chain-gateway-test-contract/res/chain_gateway_test_contract.wasm and b/crates/chain-gateway-test-contract/res/chain_gateway_test_contract.wasm differ diff --git a/crates/chain-gateway-test-contract/src/lib.rs b/crates/chain-gateway-test-contract/src/lib.rs index 7d29717b9..648b1d1a1 100644 --- a/crates/chain-gateway-test-contract/src/lib.rs +++ b/crates/chain-gateway-test-contract/src/lib.rs @@ -1,12 +1,55 @@ -use near_sdk::{env::log_str, near}; +use near_sdk::{ + Gas, NearToken, Promise, + env::{self, log_str}, + near, +}; pub fn compiled_wasm() -> &'static [u8] { include_bytes!("../res/chain_gateway_test_contract.wasm") } pub const DEFAULT_VALUE: &str = "hello from test"; + +// public view method pub const VIEW_METHOD: &str = "view_value"; -pub const WRITE_METHOD: &str = "set_value"; + +// public set method +pub const SET_VALUE: &str = "set_value"; +pub const SET_VALUE_TGAS: u64 = FIVE_TGAS; + +// teragas as u64. We don't use near_sdk::Gas on purpose, such that the near indexer can re-use +// these constants without depending on near_sdk. 
+pub const FIVE_TGAS: u64 = 5; + +// methods that spawn promises +pub const SET_VALUE_IN_PROMISE: &str = "set_value_in_promise"; +pub const SET_VALUE_IN_PROMISE_TGAS: u64 = SET_VALUE_TGAS + FIVE_TGAS; + +pub const SPAWN_PROMISE_WITH_CALLBACK: &str = "spawn_promise_with_callback"; +pub const SPAWN_PROMISE_WITH_CALLBACK_TGAS: u64 = + SET_VALUE_IN_PROMISE_TGAS + SET_VALUE_TGAS + FIVE_TGAS; + +// private method for setting value +pub const PRIVATE_SET: &str = "private_set"; +pub const PRIVATE_SET_ARGS_TGAS: u64 = 5; + +#[near(serializers=[json])] +pub struct PrivateSetArgs { + pub value: String, + pub succeeds: bool, +} + +#[near(serializers=[json])] +pub struct SetValueInPromiseArgs { + pub value: String, + pub return_error: bool, +} + +#[near(serializers=[json])] +pub struct SetValueWithMarker { + pub successfully_spawn_promise: bool, + pub end_marker: String, +} #[derive(Debug)] #[near(contract_state)] @@ -32,4 +75,66 @@ impl Contract { log_str(&format!("Setting value to: {value}")); self.stored_value = value; } + + /// Spawns a cross-contract promise to [`private_set`](Self::private_set), returning an error + /// if `return_error` is set. Used by integration tests to verify + /// `ExecutorFunctionCallSuccessWithPromise` event tracking. 
+ #[handle_result] + pub fn set_value_in_promise(&mut self, args: SetValueInPromiseArgs) -> Result { + if args.return_error { + Err("computer says no".to_string()) + } else { + let private_set_args = PrivateSetArgs { + value: args.value, + succeeds: true, + }; + Ok(Promise::new(env::current_account_id()).function_call( + PRIVATE_SET.to_string(), + serde_json::to_vec(&serde_json::json!({"args": private_set_args})).unwrap(), + NearToken::from_near(0), + Gas::from_tgas(PRIVATE_SET_ARGS_TGAS), + )) + } + } + + /// Chains two promises: first calls [`set_value_in_promise`](Self::set_value_in_promise) + /// (which may fail based on `successfully_spawn_promise`), then a callback that writes + /// `end_marker` via [`private_set`](Self::private_set). The callback acts as a + /// synchronization marker so tests can poll [`view_value`](Self::view_value) to know the + /// full chain completed. + pub fn spawn_promise_with_callback(args: SetValueWithMarker) -> Promise { + let set_value_args = SetValueInPromiseArgs { + value: "doesn't matter".to_string(), + return_error: !args.successfully_spawn_promise, + }; + let promise = Promise::new(env::current_account_id()).function_call( + SET_VALUE_IN_PROMISE.to_string(), + serde_json::to_vec(&serde_json::json!({"args": set_value_args})).unwrap(), + NearToken::from_near(0), + Gas::from_tgas(SET_VALUE_IN_PROMISE_TGAS), + ); + let private_set_args = PrivateSetArgs { + value: args.end_marker, + succeeds: true, + }; + let callback = Promise::new(env::current_account_id()).function_call( + PRIVATE_SET.to_string(), + serde_json::to_vec(&serde_json::json!({"args": private_set_args})).unwrap(), + NearToken::from_near(0), + Gas::from_tgas(PRIVATE_SET_ARGS_TGAS), + ); + promise.then(callback) + } + + /// Can only be called by the contract itself (via a promise). 
+ #[private] + #[handle_result] + pub fn private_set(&mut self, args: PrivateSetArgs) -> Result<(), String> { + if args.succeeds { + self.set_value(args.value); + Ok(()) + } else { + Err("intentional error for testing".to_string()) + } + } } diff --git a/crates/chain-gateway/src/chain_gateway.rs b/crates/chain-gateway/src/chain_gateway.rs index b5d71f7ea..f67090161 100644 --- a/crates/chain-gateway/src/chain_gateway.rs +++ b/crates/chain-gateway/src/chain_gateway.rs @@ -2,10 +2,15 @@ use std::path::Path; use near_account_id::AccountId; use near_async::ActorSystem; +use near_indexer::StreamerMessage; use near_indexer::near_primitives::transaction::SignedTransaction; use nearcore::NearConfig; +use tokio::sync::mpsc::Receiver; use crate::errors::{ChainGatewayError, NearClientError, NearRpcError, NearViewClientError}; +use crate::event_subscriber; +use crate::event_subscriber::block_events::BlockUpdate; +use crate::event_subscriber::subscriber::BlockEventSubscriber; use crate::near_internals_wrapper::{ NearClientActorHandle, NearRpcActorHandle, NearViewClientActorHandle, }; @@ -92,41 +97,64 @@ impl NodeHandle { } impl ChainGateway { - /// Spawns a near node with `config`. + /// Spawns a near node with `indexer_config`. /// The [`NodeHandle`] can be used to shut down the actor system for the node and liveness checks. /// The node dies if [`NodeHandle`] is dropped. + /// Returns a stream for BlockUpdates if BlockEventSubscriber is not None. 
pub async fn start( - config: near_indexer::IndexerConfig, - ) -> Result<(ChainGateway, NodeHandle), ChainGatewayError> { - let near_config: NearConfig = - config - .load_near_config() - .map_err(|err| ChainGatewayError::FailureLoadingConfig { - msg: err.to_string(), - })?; - let home_dir = config.home_dir; + indexer_config: near_indexer::IndexerConfig, + subscriber: Option, + ) -> Result< + ( + ChainGateway, + NodeHandle, + Option>, + ), + ChainGatewayError, + > { + let near_config: NearConfig = indexer_config.load_near_config().map_err(|err| { + ChainGatewayError::FailureLoadingConfig { + msg: err.to_string(), + } + })?; + + let home_dir = indexer_config.home_dir.clone(); + let streamer_setup = subscriber.map(|subscriber| StreamerSetup { + subscriber, + indexer_config, + near_config: near_config.clone(), + }); + let (ready_sender, ready_receiver) = tokio::sync::oneshot::channel(); let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel(); let thread_handle = std::thread::spawn(move || { - // blocking method, resumes once `shutdown_sender` sends the shutdown signal - run_node(ready_sender, near_config, &home_dir, shutdown_receiver) + run_node( + ready_sender, + near_config, + &home_dir, + shutdown_receiver, + streamer_setup, + ) }); - let chain_gateway = ready_receiver.await.expect("startup thread died")?; + let (chain_gateway, stream) = ready_receiver.await.expect("startup thread died")?; let node_handle = NodeHandle { thread_handle, shutdown_sender: Some(shutdown_sender), }; - Ok((chain_gateway, node_handle)) + Ok((chain_gateway, node_handle, stream)) } } +type RunNodeResult = Result<(ChainGateway, Option>), ChainGatewayError>; + fn run_node( - ready_sender: tokio::sync::oneshot::Sender>, + ready_sender: tokio::sync::oneshot::Sender, near_config: nearcore::NearConfig, home_dir: &Path, shutdown_receiver: tokio::sync::oneshot::Receiver<()>, + streamer_setup: Option, ) { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -146,15 
+174,43 @@ fn run_node( } }; + let indexer_and_params = streamer_setup.map(|s| { + let indexer = + near_indexer::Indexer::from_near_node(s.indexer_config, s.near_config, &near_node); + (indexer, s.subscriber) + }); + let view_client = NearViewClientActorHandle::new(near_node.view_client); let client = NearClientActorHandle::new(near_node.client); let rpc_handler = NearRpcActorHandle::new(near_node.rpc_handler); - let _ = ready_sender.send(Ok(ChainGateway { - view_client, - client, - rpc_handler, - })); + let stream = if let Some((indexer, streamer_config)) = indexer_and_params { + let raw_stream: Receiver = indexer.streamer(); + match event_subscriber::streamer::start( + streamer_config, + raw_stream, + view_client.clone(), + ) + .await + { + Ok(rx) => Some(rx), + Err(err) => { + let _ = ready_sender.send(Err(err)); + return; + } + } + } else { + None + }; + + let _ = ready_sender.send(Ok(( + ChainGateway { + view_client, + client, + rpc_handler, + }, + stream, + ))); match shutdown_receiver.await { Ok(()) => { @@ -168,3 +224,11 @@ fn run_node( actor_system.stop(); }); } + +/// Parameters for optionally starting the block-event streaming pipeline +/// alongside the nearcore node. 
+struct StreamerSetup { + subscriber: BlockEventSubscriber, + indexer_config: near_indexer::IndexerConfig, + near_config: NearConfig, +} diff --git a/crates/chain-gateway/src/errors.rs b/crates/chain-gateway/src/errors.rs index e86463196..604b944c5 100644 --- a/crates/chain-gateway/src/errors.rs +++ b/crates/chain-gateway/src/errors.rs @@ -120,4 +120,15 @@ pub enum ChainGatewayError { #[error("view client error while {op}: {message}")] ViewError { op: ChainGatewayOp, message: String }, + + #[error( + "block event channel has been full for too long, the receiver must drain messages otherwise we risk out of memory" + )] + BlockEventBufferFull, + + #[error("near indexer dropped")] + BlockEventIndexerDropped, + + #[error("block event stream has been closed or dropped by the receiver")] + BlockEventReceiverDropped, } diff --git a/crates/chain-gateway/src/event_subscriber.rs b/crates/chain-gateway/src/event_subscriber.rs new file mode 100644 index 000000000..18077ee1b --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber.rs @@ -0,0 +1,4 @@ +pub mod block_events; +mod stats; +pub(super) mod streamer; +pub mod subscriber; diff --git a/crates/chain-gateway/src/event_subscriber/block_events.rs b/crates/chain-gateway/src/event_subscriber/block_events.rs new file mode 100644 index 000000000..f979fc2b7 --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/block_events.rs @@ -0,0 +1,65 @@ +use derive_more::{Deref, From}; +use near_account_id::AccountId; +use near_indexer_primitives::CryptoHash; + +use crate::types::BlockHeight; + +/// The BlockUpdate returned by the Chain indexer. +/// Similar to [`ChainBlockUpdate`](../../node/src/indexer/handler.rs) in the `mpc-node` crate. 
+#[derive(Debug)] +pub struct BlockUpdate { + pub context: BlockContext, + pub events: Vec, +} + +/// Context for a single block +#[derive(Debug)] +pub struct BlockContext { + pub hash: CryptoHash, + pub height: BlockHeight, + pub prev_hash: CryptoHash, + pub last_final_block: CryptoHash, + pub block_entropy: [u8; 32], + pub block_timestamp_nanosec: u64, +} + +#[derive(Debug)] +pub struct MatchedEvent { + /// this is needed such that the caller can identify the block event + pub id: BlockEventId, + /// any data associated with that event + pub event_data: EventData, +} + +/// An identifier for a block event +#[derive(Debug, Deref, From, Clone, Copy, PartialEq, Eq)] +pub struct BlockEventId(pub u64); + +/// Event data, matching a filter [`super::subscriber::BlockEventFilter`] +#[derive(Debug, PartialEq)] +pub enum EventData { + ExecutorFunctionCallSuccessWithPromise(ExecutorFunctionCallSuccessWithPromiseData), + ReceiverFunctionCall(ReceiverFunctionCallData), +} + +/// Event data for a receipt matching a [`super::subscriber::BlockEventFilter::ExecutorFunctionCallSuccessWithPromise`] +#[derive(Debug, PartialEq)] +pub struct ExecutorFunctionCallSuccessWithPromiseData { + /// the receipt_id of the receipt this event came from + pub receipt_id: CryptoHash, + /// predecessor_id who signed the transaction + pub predecessor_id: AccountId, + /// the receipt that will hold the outcome of this receipt + pub next_receipt_id: CryptoHash, + /// raw bytes used for function call + pub args_raw: Vec, +} + +/// Event data for a receipt matching a [`super::subscriber::BlockEventFilter::ReceiverFunctionCall`] +#[derive(Debug, PartialEq)] +pub struct ReceiverFunctionCallData { + /// the receipt id for the matched transaction + pub receipt_id: CryptoHash, + /// whether the execution outcome was successful + pub is_success: bool, +} diff --git a/crates/chain-gateway/src/event_subscriber/stats.rs b/crates/chain-gateway/src/event_subscriber/stats.rs new file mode 100644 index 
000000000..3839eae48 --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/stats.rs @@ -0,0 +1,58 @@ +use crate::{primitives::FetchLatestFinalBlockInfo, types::BlockHeight}; + +#[derive(Debug, Clone)] +pub struct IndexerStats { + pub blocks_processed_count: u64, + pub last_processed_block_height: BlockHeight, +} + +impl IndexerStats { + pub(crate) fn new() -> Self { + IndexerStats { + blocks_processed_count: 0, + last_processed_block_height: 0.into(), + } + } +} + +/// Periodically logs indexer progress stats. +/// Based on [`indexer_logger`](../../node/src/indexer/stats.rs) in the `mpc-node` crate, +/// but uses a `watch` channel instead of a `Mutex` to read stats, since blocks are no longer +/// processed by multiple threads. +pub async fn indexer_logger( + stats_rx: tokio::sync::watch::Receiver, + info_fetcher: impl FetchLatestFinalBlockInfo, +) { + let interval_secs = 10; + let mut prev_blocks_processed_count: u64 = 0; + + loop { + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; + let stats_copy = stats_rx.borrow().clone(); + + let block_processing_speed: f64 = (stats_copy + .blocks_processed_count + .saturating_sub(prev_blocks_processed_count) + as f64) + / (interval_secs as f64); + + let blocks_behind = match info_fetcher.fetch_latest_final_block_info().await { + Ok(block_info) => { + let tip: u64 = block_info.observed_at.into(); + let processed: u64 = stats_copy.last_processed_block_height.into(); + format!("{}", tip.saturating_sub(processed)) + } + Err(_) => "∞".to_string(), + }; + + tracing::info!( + target: "chain gateway", + "# {} | Blocks done: {}. 
Bps {:.2} b/s, blocks remaining: {}", + stats_copy.last_processed_block_height, + stats_copy.blocks_processed_count, + block_processing_speed, + blocks_behind, + ); + prev_blocks_processed_count = stats_copy.blocks_processed_count; + } +} diff --git a/crates/chain-gateway/src/event_subscriber/streamer.rs b/crates/chain-gateway/src/event_subscriber/streamer.rs new file mode 100644 index 000000000..90376defe --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/streamer.rs @@ -0,0 +1,45 @@ +mod block_processor; +mod config; + +use block_processor::listen_blocks; +use config::StreamerConfig; +use near_indexer::StreamerMessage; + +use crate::{errors::ChainGatewayError, primitives::FetchLatestFinalBlockInfo}; + +use super::{ + block_events::BlockUpdate, + stats::{IndexerStats, indexer_logger}, + subscriber::BlockEventSubscriber, +}; + +pub(crate) async fn start( + block_event_subscriber: BlockEventSubscriber, + stream: tokio::sync::mpsc::Receiver, + info_fetcher: impl FetchLatestFinalBlockInfo, +) -> Result, ChainGatewayError> { + let StreamerConfig { + buffer_size, + block_events, + backpressure_timeout, + } = block_event_subscriber.into(); + let (stats_tx, stats_rx) = tokio::sync::watch::channel(IndexerStats::new()); + let (block_tx, block_rx) = tokio::sync::mpsc::channel(buffer_size); + + tokio::spawn(async move { + if let Err(err) = listen_blocks( + stream, + block_events, + stats_tx, + block_tx, + backpressure_timeout, + ) + .await + { + tracing::error!(target: "chain gateway", "block event listener stopped: {err}"); + } + }); + tokio::spawn(indexer_logger(stats_rx, info_fetcher)); + + Ok(block_rx) +} diff --git a/crates/chain-gateway/src/event_subscriber/streamer/block_processor.rs b/crates/chain-gateway/src/event_subscriber/streamer/block_processor.rs new file mode 100644 index 000000000..5a8651601 --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/streamer/block_processor.rs @@ -0,0 +1,183 @@ +use std::time::Duration; + +use
near_indexer::IndexerExecutionOutcomeWithReceipt; +use near_indexer_primitives::{ + types::FunctionArgs, + views::{ActionView, ExecutionStatusView, ReceiptEnumView, ReceiptView}, +}; + +use crate::{ + errors::ChainGatewayError, + event_subscriber::{ + block_events::{ + BlockContext, BlockUpdate, EventData, ExecutorFunctionCallSuccessWithPromiseData, + MatchedEvent, ReceiverFunctionCallData, + }, + stats::IndexerStats, + }, +}; + +use super::config::{BlockEventIdsByContractIds, BlockEvents}; + +pub(super) async fn listen_blocks( + mut stream: tokio::sync::mpsc::Receiver, + block_events: BlockEvents, + stats_tx: tokio::sync::watch::Sender, + block_update_sender: tokio::sync::mpsc::Sender, + backpressure_timeout: Duration, +) -> Result<(), ChainGatewayError> { + let mut blocks_processed_count: u64 = 0; + // Note: the mpc-node indexer (handler.rs) uses `buffer_unordered` for concurrent + // block processing. We deliberately use sequential processing here because: + // 1. `process_block` is synchronous — there is no async work to overlap. + // 2. `buffer_unordered` does not preserve ordering, yet consumers expect + // block updates in block-height order. + // 3. There is no performance gain in concurrent processing here, especially if we use + // bounded channels. + loop { + let streamer_message = stream + .recv() + .await + .ok_or(ChainGatewayError::BlockEventIndexerDropped)?; + let block_height = streamer_message.block.header.height; + // TODO(#2626): we can ignore blocks that are older than a specific block height. This + // requires some care on the node side, which is why we will only do so after we integrated + // the chain-gateway struct with the node. + let block_update = process_block(streamer_message, &block_events); + // Only send if we have something the consumer is interested in. 
+ if !block_update.events.is_empty() { + tokio::time::timeout(backpressure_timeout, block_update_sender.send(block_update)) + .await + .map_err(|_| ChainGatewayError::BlockEventBufferFull)? + .map_err(|_| ChainGatewayError::BlockEventReceiverDropped)?; + } + blocks_processed_count = blocks_processed_count.saturating_add(1); + stats_tx.send_modify(|s| { + s.blocks_processed_count = blocks_processed_count; + s.last_processed_block_height = block_height.into(); + }); + } +} + +fn filter_executor_function_calls( + res: &mut Vec, + executor_filters: &BlockEventIdsByContractIds, + outcome: &IndexerExecutionOutcomeWithReceipt, +) { + let execution_outcome = &outcome.execution_outcome; + let ExecutionStatusView::SuccessReceiptId(next_receipt_id) = execution_outcome.outcome.status + else { + return; + }; + let receipt = outcome.receipt.clone(); + let executor_id = &execution_outcome.outcome.executor_id; + let Some(filter_methods_for_executor) = executor_filters.filter_methods_for(executor_id) else { + return; + }; + let Some((args, contract_method_name)) = try_extract_function_call_args(&receipt) else { + return; + }; + let Some(filter_ids) = filter_methods_for_executor.filter_ids_for(contract_method_name) else { + return; + }; + for filter_id in filter_ids { + res.push(MatchedEvent { + id: *filter_id, + event_data: EventData::ExecutorFunctionCallSuccessWithPromise( + ExecutorFunctionCallSuccessWithPromiseData { + receipt_id: receipt.receipt_id, + predecessor_id: receipt.predecessor_id.clone(), + next_receipt_id, + args_raw: args.to_vec(), + }, + ), + }); + } +} + +fn filter_receipt_function_calls( + res: &mut Vec, + receiver_filters: &BlockEventIdsByContractIds, + outcome: &IndexerExecutionOutcomeWithReceipt, +) { + let receipt = &outcome.receipt; + let Some(methods_filter) = receiver_filters.filter_methods_for(&receipt.receiver_id) else { + return; + }; + + let Some((_, contract_method_name)) = try_extract_function_call_args(receipt) else { + return; + }; + let 
Some(filter_ids) = methods_filter.filter_ids_for(contract_method_name) else { + return; + }; + + let is_success = matches!( + outcome.execution_outcome.outcome.status, + ExecutionStatusView::SuccessValue(_) | ExecutionStatusView::SuccessReceiptId(_) + ); + + for filter_id in filter_ids { + res.push(MatchedEvent { + id: *filter_id, + event_data: EventData::ReceiverFunctionCall(ReceiverFunctionCallData { + receipt_id: receipt.receipt_id, + is_success, + }), + }); + } +} + +fn process_block( + streamer_message: near_indexer_primitives::StreamerMessage, + block_events: &BlockEvents, +) -> BlockUpdate { + let mut filtered_events = vec![]; + for shard in streamer_message.shards { + for outcome in &shard.receipt_execution_outcomes { + filter_executor_function_calls( + &mut filtered_events, + &block_events.executor_filters, + outcome, + ); + filter_receipt_function_calls( + &mut filtered_events, + &block_events.receipt_receiver_filters, + outcome, + ); + } + } + let context = BlockContext { + hash: streamer_message.block.header.hash, + height: streamer_message.block.header.height.into(), + prev_hash: streamer_message.block.header.prev_hash, + block_entropy: streamer_message.block.header.random_value.into(), + block_timestamp_nanosec: streamer_message.block.header.timestamp_nanosec, + last_final_block: streamer_message.block.header.last_final_block, + }; + BlockUpdate { + context, + events: filtered_events, + } +} + +fn try_extract_function_call_args(receipt: &ReceiptView) -> Option<(&FunctionArgs, &String)> { + let ReceiptEnumView::Action { ref actions, .. } = receipt.receipt else { + return None; + }; + if actions.len() != 1 { + return None; + } + let ActionView::FunctionCall { + ref method_name, + ref args, + .. 
+ } = actions[0] + else { + return None; + }; + + tracing::debug!(target: "mpc", "found `{}` function call", method_name); + + Some((args, method_name)) +} diff --git a/crates/chain-gateway/src/event_subscriber/streamer/config.rs b/crates/chain-gateway/src/event_subscriber/streamer/config.rs new file mode 100644 index 000000000..810dc987a --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/streamer/config.rs @@ -0,0 +1,99 @@ +use std::{collections::BTreeMap, time::Duration}; + +use near_account_id::AccountId; + +use crate::event_subscriber::{ + block_events::BlockEventId, + subscriber::{BlockEventFilter, BlockEventSubscriber}, +}; + +pub(super) struct StreamerConfig { + pub(super) block_events: BlockEvents, + pub(super) buffer_size: usize, + pub(super) backpressure_timeout: Duration, +} + +// helper struct for efficient access +pub(super) struct BlockEvents { + pub(super) executor_filters: BlockEventIdsByContractIds, + pub(super) receipt_receiver_filters: BlockEventIdsByContractIds, +} + +// helper struct for efficient access +pub(super) struct BlockEventIdsByContractIds(BTreeMap); + +// helper struct for efficient access +pub(super) struct BlockEventIdsByMethodNames(BTreeMap>); + +impl BlockEventIdsByMethodNames { + pub(crate) fn filter_ids_for(&self, method_name: &str) -> Option<&Vec> { + self.0.get(method_name) + } +} + +impl BlockEventIdsByContractIds { + pub(crate) fn filter_methods_for( + &self, + contract: &AccountId, + ) -> Option<&BlockEventIdsByMethodNames> { + self.0.get(contract) + } +} + +impl From for StreamerConfig { + fn from(value: BlockEventSubscriber) -> Self { + let mut executor_filters: BTreeMap>> = + BTreeMap::new(); + let mut receipt_receiver_filters: BTreeMap>> = + BTreeMap::new(); + for (id, filter) in value.subscriptions { + match filter { + BlockEventFilter::ExecutorFunctionCallSuccessWithPromise { + transaction_outcome_executor_id, + method_name, + } => { + executor_filters + .entry(transaction_outcome_executor_id) + 
.or_default() + .entry(method_name) + .or_default() + .push(id); + } + BlockEventFilter::ReceiverFunctionCall { + receipt_receiver_id, + method_name, + } => { + receipt_receiver_filters + .entry(receipt_receiver_id) + .or_default() + .entry(method_name) + .or_default() + .push(id); + } + } + } + + let block_events = BlockEvents { + executor_filters: BlockEventIdsByContractIds( + executor_filters + .into_iter() + .map(|(k, v)| (k, BlockEventIdsByMethodNames(v))) + .collect(), + ), + receipt_receiver_filters: BlockEventIdsByContractIds( + receipt_receiver_filters + .into_iter() + .map(|(k, v)| (k, BlockEventIdsByMethodNames(v))) + .collect(), + ), + }; + + let buffer_size = value.buffer_size; + let backpressure_timeout = value.backpressure_timeout; + StreamerConfig { + block_events, + buffer_size, + backpressure_timeout, + } + } +} diff --git a/crates/chain-gateway/src/event_subscriber/subscriber.rs b/crates/chain-gateway/src/event_subscriber/subscriber.rs new file mode 100644 index 000000000..84135d839 --- /dev/null +++ b/crates/chain-gateway/src/event_subscriber/subscriber.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use near_account_id::AccountId; + +use super::block_events::BlockEventId; + +const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct BlockEventSubscriber { + pub(super) subscriptions: Vec<(BlockEventId, BlockEventFilter)>, + next_id: BlockEventId, + pub(super) buffer_size: usize, + pub(super) backpressure_timeout: Duration, +} + +impl BlockEventSubscriber { + pub fn new(buffer_size: usize) -> Self { + BlockEventSubscriber { + subscriptions: vec![], + next_id: 0.into(), + buffer_size, + backpressure_timeout: DEFAULT_SEND_TIMEOUT, + } + } + + pub fn with_backpressure_timeout(mut self, timeout: Duration) -> Self { + self.backpressure_timeout = timeout; + self + } + + /// Add a filter and get a unique identifier for it. + /// Can be called multiple times before build(). 
+ /// The identifier can be used to match a return value to the given filter. + pub fn subscribe(&mut self, filter: BlockEventFilter) -> BlockEventId { + let filter_id = self.next_id; + self.subscriptions.push((filter_id, filter)); + self.next_id = filter_id.overflowing_add(1).0.into(); + filter_id + } +} + +/// Filters, can be extended if necessary +pub enum BlockEventFilter { + /// Filters for executions of method `method_name` on `transaction_outcome_executor_id` + /// that spawn a promise (execution status == `SuccessReceiptId`). + /// + /// Calls to `transaction_outcome_executor_id.method_name`, that do not spawn a promise will be + /// ignored. + /// + /// If a transaction matches this filter, then [`super::block_events::ExecutorFunctionCallSuccessWithPromiseData`] will be extracted + /// and placed in the [`super::block_events::BlockUpdate`] + /// + /// When to use: + /// Use this for tracking calls across blocks. The MPC node uses this to filter out signature + /// requests and keep track of the yield index for resolving the request. + ExecutorFunctionCallSuccessWithPromise { + transaction_outcome_executor_id: AccountId, + method_name: String, + }, + + /// Filters for calls to `receipt_receiver_id.method_name`, regardless if they spawn a + /// promise, have been successful or not. + /// If a transaction matches this filter, then [`super::block_events::ReceiverFunctionCallData`] will be extracted + /// and placed in the [`super::block_events::BlockUpdate`]. + /// + /// When to use: + /// Use this if one just wants to track calls to a specific method on a specific contract + /// without the additional data of [`super::block_events::ExecutorFunctionCallSuccessWithPromiseData`]. + /// The MPC node uses this to track calls to private contract methods. 
+ ReceiverFunctionCall { + receipt_receiver_id: AccountId, + method_name: String, + }, +} diff --git a/crates/chain-gateway/src/lib.rs b/crates/chain-gateway/src/lib.rs index 04b5af996..f50cded2a 100644 --- a/crates/chain-gateway/src/lib.rs +++ b/crates/chain-gateway/src/lib.rs @@ -1,5 +1,6 @@ pub mod chain_gateway; pub mod errors; +pub mod event_subscriber; mod near_internals_wrapper; pub mod primitives; pub mod state_viewer; diff --git a/crates/chain-gateway/src/state_viewer/subscription.rs b/crates/chain-gateway/src/state_viewer/subscription.rs index 59a2cbeca..6660e0e10 100644 --- a/crates/chain-gateway/src/state_viewer/subscription.rs +++ b/crates/chain-gateway/src/state_viewer/subscription.rs @@ -26,6 +26,8 @@ impl WatchContractState for ContractMethodSubscription where Res: DeserializeOwned + Send + Clone, { + /// The constructor marks the initial value as seen, so + /// `changed().await` will not fire until a genuinely new value arrives. async fn changed(&mut self) -> Result<(), ChainGatewayError> { self.inner .last_observed diff --git a/crates/chain-gateway/src/types.rs b/crates/chain-gateway/src/types.rs index 089b6c736..2a1eb1074 100644 --- a/crates/chain-gateway/src/types.rs +++ b/crates/chain-gateway/src/types.rs @@ -1,4 +1,4 @@ -use derive_more::{From, Into}; +use derive_more::{Display, From, Into}; use near_indexer_primitives::CryptoHash; use serde::{Deserialize, Serialize, de::DeserializeOwned}; @@ -9,7 +9,7 @@ use crate::errors::ChainGatewayError; pub struct NoArgs {} #[derive( - Into, From, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug, + Into, From, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug, Display, )] pub struct BlockHeight(u64); diff --git a/crates/chain-gateway/tests/common.rs b/crates/chain-gateway/tests/common.rs index 5f2988980..748a1b801 100644 --- a/crates/chain-gateway/tests/common.rs +++ b/crates/chain-gateway/tests/common.rs @@ -1,3 +1,3 @@ -mod contract; +pub(super) mod 
accounts; pub(super) mod localnet; pub(super) mod node; diff --git a/crates/chain-gateway/tests/common/accounts.rs b/crates/chain-gateway/tests/common/accounts.rs new file mode 100644 index 000000000..9414c29f6 --- /dev/null +++ b/crates/chain-gateway/tests/common/accounts.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use chain_gateway::transaction_sender::TransactionSigner; +use ed25519_dalek::{SigningKey, VerifyingKey}; + +fn public_key_str(signing_key: &SigningKey) -> String { + let verifying_key: VerifyingKey = signing_key.verifying_key(); + let verifying_key_vec: Vec = verifying_key.as_bytes().to_vec(); + let near_pk: near_sdk::PublicKey = + near_sdk::PublicKey::from_parts(near_sdk::CurveType::ED25519, verifying_key_vec).unwrap(); + String::from(&near_pk) +} + +#[derive(Clone)] +pub struct Contract { + pub account_id: near_account_id::AccountId, + pub signing_key: SigningKey, +} + +impl Contract { + pub fn public_key_str(&self) -> String { + public_key_str(&self.signing_key) + } +} + +pub(super) fn compiled_test_contract_wasm() -> &'static [u8] { + chain_gateway_test_contract::compiled_wasm() +} + +#[derive(Clone)] +pub struct TestAccount { + pub account_id: near_account_id::AccountId, + pub signing_key: SigningKey, + pub signer: Arc, +} + +impl TestAccount { + pub fn new(account_id: near_account_id::AccountId, signing_key: SigningKey) -> Self { + let signer = Arc::new(TransactionSigner::from_key( + account_id.clone(), + signing_key.clone(), + )); + Self { + account_id, + signing_key, + signer, + } + } + pub fn public_key_str(&self) -> String { + public_key_str(&self.signing_key) + } +} + +pub(super) fn test_contract(account_id: near_account_id::AccountId) -> Contract { + let signing_key = SigningKey::from_bytes(&[1u8; 32]); + Contract { + account_id, + signing_key, + } +} diff --git a/crates/chain-gateway/tests/common/contract.rs b/crates/chain-gateway/tests/common/contract.rs deleted file mode 100644 index aefe8b2d1..000000000 --- 
a/crates/chain-gateway/tests/common/contract.rs +++ /dev/null @@ -1,32 +0,0 @@ -use ed25519_dalek::{SigningKey, VerifyingKey}; - -const TEST_CONTRACT_ACCOUNT: &str = "test-contract.near"; - -#[derive(Clone)] -pub struct Contract { - pub account_id: near_account_id::AccountId, - pub signing_key: SigningKey, -} - -impl Contract { - pub fn public_key_str(&self) -> String { - let verifying_key: VerifyingKey = self.signing_key.verifying_key(); - let verifying_key_vec: Vec = verifying_key.as_bytes().to_vec(); - let near_pk: near_sdk::PublicKey = - near_sdk::PublicKey::from_parts(near_sdk::CurveType::ED25519, verifying_key_vec) - .unwrap(); - String::from(&near_pk) - } -} - -pub(super) fn test_contract() -> Contract { - let signing_key = SigningKey::from_bytes(&[1u8; 32]); - Contract { - account_id: TEST_CONTRACT_ACCOUNT.parse().unwrap(), - signing_key, - } -} - -pub(super) fn compiled_test_contract_wasm() -> &'static [u8] { - chain_gateway_test_contract::compiled_wasm() -} diff --git a/crates/chain-gateway/tests/common/localnet.rs b/crates/chain-gateway/tests/common/localnet.rs index d70ba2a07..1a3b504ea 100644 --- a/crates/chain-gateway/tests/common/localnet.rs +++ b/crates/chain-gateway/tests/common/localnet.rs @@ -1,10 +1,13 @@ use std::time::{Duration, Instant}; +use chain_gateway::event_subscriber::block_events::BlockUpdate; +use chain_gateway::event_subscriber::subscriber::BlockEventSubscriber; use chain_gateway::state_viewer::ViewMethod; use chain_gateway::types::{NoArgs, ObservedState}; use chain_gateway_test_contract::VIEW_METHOD; +use ed25519_dalek::SigningKey; -use super::contract::{Contract, compiled_test_contract_wasm, test_contract}; +use super::accounts::{Contract, TestAccount, compiled_test_contract_wasm, test_contract}; use super::node::{LocalNode, LocalNodeBuilder}; pub struct Localnet { @@ -14,23 +17,70 @@ pub struct Localnet { } impl Localnet { - /// Two-node setup for sender tests. 
+ /// Takes the block update receiver from the observer, panics if already taken. + pub fn take_block_update_receiver(&mut self) -> tokio::sync::mpsc::Receiver { + self.observer + .block_update_receiver + .take() + .expect("block_update_receiver already taken") + } + + pub async fn new(contract_id: near_account_id::AccountId) -> Self { + LocalnetBuilder::new(contract_id).build().await + } +} + +pub struct LocalnetBuilder { + contract_id: near_account_id::AccountId, + test_account: Option, + event_subscriber: Option, +} + +impl LocalnetBuilder { + pub fn new(contract_id: near_account_id::AccountId) -> Self { + LocalnetBuilder { + contract_id, + event_subscriber: None, + test_account: None, + } + } + + pub fn with_event_subscriber(mut self, subscriber: BlockEventSubscriber) -> Self { + self.event_subscriber = Some(subscriber); + self + } + + pub fn with_test_account( + mut self, + test_account_id: near_account_id::AccountId, + ) -> (Self, TestAccount) { + let signing_key = SigningKey::from_bytes(&[3u8; 32]); + let test_account = TestAccount::new(test_account_id, signing_key); + self.test_account = Some(test_account.clone()); + (self, test_account) + } + + /// Build and start the two-node localnet. /// /// The observer is a non-validator node that syncs from the validator. /// Genesis is copied from the validator to ensure both nodes share the same chain. - pub async fn new() -> Self { + pub async fn build(self) -> Localnet { let validator_home = make_test_home_dir("validator.near"); let observer_home = make_test_home_dir("observer.near"); - let contract = test_contract(); + let contract = test_contract(self.contract_id); - // start a validator node (this is what the MPC node calls the "near indexer node") - let validator = LocalNodeBuilder::new("validator", validator_home) + // Start a validator node (this is what the MPC node calls the "near indexer node"). 
+ let mut validator = LocalNodeBuilder::new("validator", validator_home) .with_consensus_min_peers(1) - .with_contract(contract.clone(), compiled_test_contract_wasm()) - .start() - .await; + .with_contract(contract.clone(), compiled_test_contract_wasm()); + + if let Some(test_account) = self.test_account { + validator = validator.with_account(test_account); + } + + let validator = validator.start(None).await; - // start an observer node (non-validator, just like what the MPC node would be running) + // Start an observer node (non-validator, just like what the MPC node would be running). // Copy genesis from validator so both nodes share the same chain // (indexer_init_configs embeds genesis_time = Utc::now(), so independent // genesis generation produces different genesis hashes). @@ -42,7 +92,7 @@ impl Localnet { .with_genesis_from(&validator) .without_validator_key() .with_boot_nodes(&boot_node) - .start() + .start(self.event_subscriber) .await; let localnet = Localnet { @@ -107,7 +157,7 @@ impl Localnet { } } -/// Returns a temp directory +/// Returns a temp directory. /// The returned `TempDir` is automatically deleted when dropped. 
fn make_test_home_dir(account_id: &str) -> tempfile::TempDir { tempfile::Builder::new() diff --git a/crates/chain-gateway/tests/common/node.rs b/crates/chain-gateway/tests/common/node.rs index d60048c15..351af2417 100644 --- a/crates/chain-gateway/tests/common/node.rs +++ b/crates/chain-gateway/tests/common/node.rs @@ -1,16 +1,20 @@ use base64::Engine; -use chain_gateway::{ChainGateway, NodeHandle}; +use chain_gateway::{ + ChainGateway, NodeHandle, + event_subscriber::{block_events::BlockUpdate, subscriber::BlockEventSubscriber}, +}; use near_indexer::near_primitives::types::Finality; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use tempfile::TempDir; -use super::contract::Contract; +use super::accounts::{Contract, TestAccount}; pub struct LocalNode { pub home_dir: TempDir, pub ports: PortsConfig, pub chain_gateway: ChainGateway, pub node_handle: NodeHandle, + pub block_update_receiver: Option>, } #[derive(Clone)] @@ -25,7 +29,7 @@ pub(crate) struct LocalNodeBuilder { } impl LocalNodeBuilder { - pub(crate) async fn start(self) -> LocalNode { + pub(crate) async fn start(self, streamer_config: Option) -> LocalNode { let indexer_config = near_indexer::IndexerConfig { home_dir: self.home_dir.path().to_path_buf(), sync_mode: near_indexer::SyncModeEnum::LatestSynced, @@ -34,8 +38,8 @@ impl LocalNodeBuilder { validate_genesis: true, }; - let (chain_gateway, node_handle) = - chain_gateway::chain_gateway::ChainGateway::start(indexer_config) + let (chain_gateway, node_handle, block_update_receiver) = + chain_gateway::chain_gateway::ChainGateway::start(indexer_config, streamer_config) .await .expect("chain_gateway::start should succeed"); @@ -46,9 +50,9 @@ impl LocalNodeBuilder { ports: ports.unwrap(), chain_gateway, node_handle, + block_update_receiver, } } - pub(crate) fn new(node_name: &str, home_dir: TempDir) -> Self { let account_id = format!("{}.near", node_name); near_indexer::indexer_init_configs( @@ -138,66 +142,26 @@ impl LocalNodeBuilder { /// and 
AccessKey (FullAccess). This embeds the contract directly in genesis so /// we don't need to deploy via transaction. pub(crate) fn with_contract(self, contract: Contract, wasm: &[u8]) -> Self { - let genesis_path = self.home_dir.path().join("genesis.json"); - let genesis_text = std::fs::read_to_string(&genesis_path).expect("read genesis.json"); - let mut genesis: serde_json::Value = - serde_json::from_str(&genesis_text).expect("parse genesis.json"); - let existing_total_supply: u128 = genesis - .get("total_supply") - .unwrap() - .as_str() - .expect("total supply should be a string") - .parse() - .expect("total spply should parse as u128"); - let contract_amount: u128 = 10000000000000000000000000; - let total_supply = existing_total_supply + contract_amount; - *genesis.get_mut("total_supply").unwrap() = serde_json::json!(total_supply.to_string()); - - let code_hash = near_indexer::near_primitives::hash::hash(wasm).to_string(); - let code_base64 = base64::engine::general_purpose::STANDARD.encode(wasm); - - let records = genesis - .get_mut("records") - .expect("genesis should have records") - .as_array_mut() - .expect("records should be an array"); - - // Account record - records.push(serde_json::json!({ - "Account": { - "account_id": contract.account_id, - "account": { - "amount": contract_amount.to_string(), - "locked": "0", - "code_hash": code_hash.to_string(), - "storage_usage": 0, - "version": "V1" - } - } - })); - - // Contract record (code field uses base64 encoding, matching StateRecord serde) - records.push(serde_json::json!({ - "Contract": { - "account_id": contract.account_id, - "code": code_base64 - } - })); - - // AccessKey record - records.push(serde_json::json!({ - "AccessKey": { - "account_id": contract.account_id, - "public_key": contract.public_key_str(), - "access_key": { - "nonce": 0, - "permission": "FullAccess" - } - } - })); + inject_genesis_account( + &self.home_dir.path().join("genesis.json"), + &contract.account_id, + 
&contract.public_key_str(), + Some(wasm), + ); + self + } - let updated = serde_json::to_string_pretty(&genesis).expect("serialize genesis.json"); - std::fs::write(&genesis_path, updated).expect("write genesis.json"); + /// Inject a plain account (no contract code) into genesis.json before the node starts. + /// + /// Adds Account and AccessKey records. Use this for user accounts that only need + /// to sign transactions, not deploy code. + pub(crate) fn with_account(self, account: TestAccount) -> Self { + inject_genesis_account( + &self.home_dir.path().join("genesis.json"), + &account.account_id, + &account.public_key_str(), + None, + ); self } @@ -242,3 +206,81 @@ impl PortsConfig { } } } + +/// Shared helper for injecting an account into genesis.json. +/// +/// When `wasm` is `Some`, the account is treated as a contract: the code hash is +/// computed from the WASM bytes and a `Contract` record is added. +/// When `wasm` is `None`, the account is a plain user account with default code hash. 
+fn inject_genesis_account( + genesis_path: &Path, + account_id: &near_account_id::AccountId, + public_key_str: &str, + wasm: Option<&[u8]>, +) { + let genesis_text = std::fs::read_to_string(genesis_path).expect("read genesis.json"); + let mut genesis: serde_json::Value = + serde_json::from_str(&genesis_text).expect("parse genesis.json"); + + let existing_total_supply: u128 = genesis + .get("total_supply") + .unwrap() + .as_str() + .expect("total supply should be a string") + .parse() + .expect("total supply should parse as u128"); + let amount: u128 = 10000000000000000000000000; + let total_supply = existing_total_supply + amount; + *genesis.get_mut("total_supply").unwrap() = serde_json::json!(total_supply.to_string()); + + let code_hash = match wasm { + Some(bytes) => near_indexer::near_primitives::hash::hash(bytes).to_string(), + None => "11111111111111111111111111111111".to_string(), + }; + + let records = genesis + .get_mut("records") + .expect("genesis should have records") + .as_array_mut() + .expect("records should be an array"); + + // Account record + records.push(serde_json::json!({ + "Account": { + "account_id": account_id, + "account": { + "amount": amount.to_string(), + "locked": "0", + "code_hash": code_hash, + "storage_usage": 0, + "version": "V1" + } + } + })); + + // Contract record (only for contract accounts) + if let Some(bytes) = wasm { + let code_base64 = base64::engine::general_purpose::STANDARD.encode(bytes); + records.push(serde_json::json!({ + "Contract": { + "account_id": account_id, + "code": code_base64 + } + })); + } + + // AccessKey record + records.push(serde_json::json!({ + "AccessKey": { + "account_id": account_id, + "public_key": public_key_str, + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + })); + + let updated = serde_json::to_string_pretty(&genesis).expect("serialize genesis.json"); + std::fs::write(genesis_path, updated).expect("write genesis.json"); +} diff --git 
a/crates/chain-gateway/tests/event_subscriber_integration.rs b/crates/chain-gateway/tests/event_subscriber_integration.rs new file mode 100644 index 000000000..64039c2c9 --- /dev/null +++ b/crates/chain-gateway/tests/event_subscriber_integration.rs @@ -0,0 +1,425 @@ +use std::time::Duration; + +use chain_gateway::{ + Gas, + event_subscriber::{ + block_events::{ + BlockEventId, BlockUpdate, EventData, ExecutorFunctionCallSuccessWithPromiseData, + ReceiverFunctionCallData, + }, + subscriber::{BlockEventFilter, BlockEventSubscriber}, + }, + state_viewer::{SubscribeToContractMethod, WatchContractState}, + transaction_sender::{SubmitFunctionCall, TransactionSigner}, +}; +use chain_gateway_test_contract as test_contract; +use common::localnet::Localnet; +use rstest::rstest; + +use crate::common::{accounts::TestAccount, localnet::LocalnetBuilder}; + +use super::common; + +const EVENT_TIMEOUT: Duration = Duration::from_secs(10); +struct ExecutorFunctionCallTest { + test_account: TestAccount, + contract_id: near_account_id::AccountId, + localnet: Localnet, + receiver: tokio::sync::mpsc::Receiver, + set_value_in_promise_event_id: BlockEventId, +} + +async fn setup_executor_function_call_filter() -> ExecutorFunctionCallTest { + let contract_id: near_account_id::AccountId = "test-contract.near".parse().unwrap(); + let mut subscriber = BlockEventSubscriber::new(1); + let set_value_in_promise_event_id = + subscriber.subscribe(BlockEventFilter::ExecutorFunctionCallSuccessWithPromise { + transaction_outcome_executor_id: contract_id.clone(), + method_name: test_contract::SET_VALUE_IN_PROMISE.to_string(), + }); + + let localnet = LocalnetBuilder::new(contract_id.clone()); + let (localnet, test_account) = + localnet.with_test_account("test-subscriber-sender.near".parse().unwrap()); + let mut localnet = localnet.with_event_subscriber(subscriber).build().await; + let receiver = localnet.take_block_update_receiver(); + ExecutorFunctionCallTest { + test_account, + contract_id, + 
localnet, + receiver, + set_value_in_promise_event_id, + } +} + +/// Spins up a two-node localnet where the observer is started with a +/// `BlockEventSubscriber` filtering for executor function calls. +/// Ensures happy path: successful calls are tracked. +#[tokio::test] +async fn test_event_subscriber_executor_function_call_success_success_calls_are_tracked() { + // Given: A subscription for tracking executions on contract_id.[`SET_VALUE_IN_PROMISE`] + let ExecutorFunctionCallTest { + test_account, + contract_id, + localnet, + mut receiver, + set_value_in_promise_event_id, + } = setup_executor_function_call_filter().await; + let observer_gw = &localnet.observer.chain_gateway; + + // When: A transaction returning a promise succeeds + let args = test_contract::SetValueInPromiseArgs { + value: "succeeded".to_string(), + return_error: false, + }; + let args = serde_json::to_vec(&serde_json::json!({ "args": args })).unwrap(); + + observer_gw + .submit_function_call_tx( + &test_account.signer, + contract_id, + test_contract::SET_VALUE_IN_PROMISE.to_string(), + args.clone(), + Gas::from_teragas(test_contract::SET_VALUE_IN_PROMISE_TGAS), + ) + .await + .unwrap(); + + // Then: expect a matching block update + let BlockUpdate { events, .. } = tokio::time::timeout(EVENT_TIMEOUT, receiver.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 1); + + let matched = events + .iter() + .find(|e| e.id == set_value_in_promise_event_id) + .expect("expected executor event"); + + let EventData::ExecutorFunctionCallSuccessWithPromise( + ExecutorFunctionCallSuccessWithPromiseData { + ref predecessor_id, + ref args_raw, + .. 
+ }, + ) = matched.event_data + else { + panic!("expected ExecutorFunctionCallSuccessWithPromise"); + }; + + assert_eq!( + *predecessor_id, test_account.account_id, + "predecessor_id should match user account" + ); + assert_eq!(*args_raw, args, "args must match"); + + localnet.shutdown().await; +} + +/// Ensures failure path: if spawning the promise fails, no executor event is logged. +#[tokio::test] +async fn test_event_subscriber_executor_function_call_success_failure_calls_are_ignored() { + // Given: A subscription for tracking executions on contract_id.[`SET_VALUE_IN_PROMISE`] + let ExecutorFunctionCallTest { + test_account, + contract_id, + localnet, + mut receiver, + set_value_in_promise_event_id: _, + } = setup_executor_function_call_filter().await; + let observer_gw = &localnet.observer.chain_gateway; + + // When: + // A transaction calls contract.SET_VALUE_IN_PROMISE but the spawned promise fails. + // Add a backmarker to not wait indefinitely or be subject to race conditions. + let end_marker: &str = "if you read this, you can be sure that the spawned promise has failed"; + let args = test_contract::SetValueWithMarker { + successfully_spawn_promise: false, + end_marker: end_marker.to_string(), + }; + let args = serde_json::to_vec(&serde_json::json!({ "args": args })).unwrap(); + + observer_gw + .submit_function_call_tx( + &test_account.signer, + contract_id.clone(), + test_contract::SPAWN_PROMISE_WITH_CALLBACK.to_string(), + args.clone(), + Gas::from_teragas(test_contract::SPAWN_PROMISE_WITH_CALLBACK_TGAS), + ) + .await + .unwrap(); + let mut watch_value = observer_gw + .subscribe_to_contract_method::(contract_id, test_contract::VIEW_METHOD) + .await; + + loop { + if watch_value + .latest() + .expect("we don't expect an error") + .value + == end_marker + { + break; + } + tokio::time::timeout(EVENT_TIMEOUT, watch_value.changed()) + .await + .unwrap() + .unwrap(); + } + + drop(watch_value); + + assert!( + receiver.is_empty(), + "expected no executor events 
for a failed call, found: {:?}", + receiver.recv().await.unwrap() + ); + + localnet.shutdown().await; +} + +struct ReceiverFunctionCallTest { + test_account: TestAccount, + contract_id: near_account_id::AccountId, + contract_signer: TransactionSigner, + localnet: Localnet, + receiver: tokio::sync::mpsc::Receiver, + private_set_event_id: BlockEventId, +} + +async fn setup_receiver_function_call_filter() -> ReceiverFunctionCallTest { + let contract_id: near_account_id::AccountId = "test-contract.near".parse().unwrap(); + let mut subscriber = BlockEventSubscriber::new(1); + let private_set_event_id = subscriber.subscribe(BlockEventFilter::ReceiverFunctionCall { + receipt_receiver_id: contract_id.clone(), + method_name: test_contract::PRIVATE_SET.to_string(), + }); + + let localnet = LocalnetBuilder::new(contract_id.clone()); + let (localnet, test_account) = + localnet.with_test_account("test-subscriber-sender.near".parse().unwrap()); + let mut localnet = localnet.with_event_subscriber(subscriber).build().await; + let contract_signer = + TransactionSigner::from_key(contract_id.clone(), localnet.contract.signing_key.clone()); + let receiver = localnet.take_block_update_receiver(); + ReceiverFunctionCallTest { + test_account, + contract_id, + contract_signer, + localnet, + receiver, + private_set_event_id, + } +} + +/// Ensures `ReceiverFunctionCall` registers for private methods that return success. 
+#[tokio::test] +#[rstest] +#[case::successful_calls_will_be_logged(true)] +#[case::failed_calls_will_be_logged(false)] +async fn test_event_subscriber_receiver(#[case] expect_success: bool) { + // Given: A subscription for tracking calls to the private contract_id.PRIVATE_SET + let ReceiverFunctionCallTest { + test_account: _, + contract_id, + contract_signer, + localnet, + mut receiver, + private_set_event_id, + } = setup_receiver_function_call_filter().await; + let observer_gw = &localnet.observer.chain_gateway; + + // When: the contract calls itself: + let args = test_contract::PrivateSetArgs { + value: "maybe it works, maybe it doesn't".to_string(), + succeeds: expect_success, + }; + let args = serde_json::to_vec(&serde_json::json!({ "args": args })).unwrap(); + + observer_gw + .submit_function_call_tx( + &contract_signer, + contract_id, + test_contract::PRIVATE_SET.to_string(), + args.clone(), + Gas::from_teragas(test_contract::PRIVATE_SET_ARGS_TGAS), + ) + .await + .unwrap(); + + // Then: expect a matching block update + let BlockUpdate { events, .. } = tokio::time::timeout(EVENT_TIMEOUT, receiver.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 1); + + let matched = events + .iter() + .find(|e| e.id == private_set_event_id) + .expect("expected executor event"); + + let EventData::ReceiverFunctionCall(ReceiverFunctionCallData { is_success, .. }) = + matched.event_data + else { + panic!("expected ReceiverFunctionCall"); + }; + assert_eq!(is_success, expect_success); + + localnet.shutdown().await; +} + +/// Ensures `ReceiverFunctionCall` for private methods called by non-contract +/// are registered but have an error (NEAR rejects: predecessor != contract). 
+#[tokio::test] +async fn test_event_subscriber_receiver_error_if_non_private_call() { + // Given: A subscription for tracking calls to the private contract_id.PRIVATE_SET + let ReceiverFunctionCallTest { + test_account, + contract_id, + contract_signer: _, + localnet, + mut receiver, + private_set_event_id, + } = setup_receiver_function_call_filter().await; + let observer_gw = &localnet.observer.chain_gateway; + + // When: other than the contract calls it: + let args = test_contract::PrivateSetArgs { + value: "this will fail".to_string(), + succeeds: true, + }; + let args = serde_json::to_vec(&serde_json::json!({ "args": args })).unwrap(); + + observer_gw + .submit_function_call_tx( + &test_account.signer, + contract_id, + test_contract::PRIVATE_SET.to_string(), + args.clone(), + Gas::from_teragas(test_contract::PRIVATE_SET_ARGS_TGAS), + ) + .await + .unwrap(); + + // Then: expect a matching block update + let BlockUpdate { events, .. } = tokio::time::timeout(EVENT_TIMEOUT, receiver.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(events.len(), 1); + + let matched = events + .iter() + .find(|e| e.id == private_set_event_id) + .expect("expected executor event"); + + let EventData::ReceiverFunctionCall(ReceiverFunctionCallData { is_success, .. }) = + matched.event_data + else { + panic!("expected ReceiverFunctionCall"); + }; + assert!(!is_success); + + localnet.shutdown().await; +} + +/// Verifies that the send-timeout circuit breaker works: when the consumer does not read from the +/// channel and the buffer is full, `listen_blocks` exits with `BlockEventBufferFull` and the +/// receiver channel closes. 
+#[tokio::test] +async fn test_event_subscriber_backpressure_buffer_full_closes_channel() { + // Given: subscribing to three events that fire in three different blocks + let contract_id: near_account_id::AccountId = + "test-backpressure-handling.near".parse().unwrap(); + let mut subscriber = + BlockEventSubscriber::new(1).with_backpressure_timeout(Duration::from_nanos(1)); + let _ = subscriber.subscribe(BlockEventFilter::ExecutorFunctionCallSuccessWithPromise { + transaction_outcome_executor_id: contract_id.clone(), + method_name: test_contract::SPAWN_PROMISE_WITH_CALLBACK.to_string(), + }); + let _ = subscriber.subscribe(BlockEventFilter::ExecutorFunctionCallSuccessWithPromise { + transaction_outcome_executor_id: contract_id.clone(), + method_name: test_contract::SET_VALUE_IN_PROMISE.to_string(), + }); + let _ = subscriber.subscribe(BlockEventFilter::ReceiverFunctionCall { + receipt_receiver_id: contract_id.clone(), + method_name: test_contract::PRIVATE_SET.to_string(), + }); + let localnet = LocalnetBuilder::new(contract_id.clone()); + let (localnet, test_account) = + localnet.with_test_account("test-subscriber-sender.near".parse().unwrap()); + let mut localnet = localnet.with_event_subscriber(subscriber).build().await; + let mut receiver = localnet.take_block_update_receiver(); + + const MARKER: &str = "race condition avoided"; + + // When: We call the method, leading to the promise chain + let args = test_contract::SetValueWithMarker { + successfully_spawn_promise: true, + end_marker: MARKER.to_string(), + }; + let args = serde_json::to_vec(&serde_json::json!({ "args": args })).unwrap(); + + let observer_gw = &localnet.observer.chain_gateway; + + observer_gw + .submit_function_call_tx( + &test_account.signer, + contract_id.clone(), + test_contract::SPAWN_PROMISE_WITH_CALLBACK.to_string(), + args.clone(), + Gas::from_teragas(test_contract::SPAWN_PROMISE_WITH_CALLBACK_TGAS), + ) + .await + .unwrap(); + + // wait for change to take effect + let mut watch_value = 
observer_gw + .subscribe_to_contract_method::(contract_id, test_contract::VIEW_METHOD) + .await; + + loop { + if watch_value + .latest() + .expect("we don't expect an error") + .value + == *MARKER + { + break; + } + tokio::time::timeout(EVENT_TIMEOUT, watch_value.changed()) + .await + .unwrap() + .unwrap(); + } + + drop(watch_value); + + // Then: expect the sender to drop the channel and the streamer to close + let mut received_blocks = 0u32; + let closed = tokio::time::timeout(Duration::from_secs(30), async { + // drain the only event that squeezed through before the timeout. + while receiver.recv().await.is_some() { + received_blocks += 1; + } + }) + .await; + + assert!( + closed.is_ok(), + "receiver channel should have closed (listen_blocks exited with BlockEventBufferFull)" + ); + + assert_eq!( + received_blocks, 1, + "buffer size was one, we only expect one block update before the stream closes" + ); + + localnet.shutdown().await; +} diff --git a/crates/chain-gateway/tests/sender_integration.rs b/crates/chain-gateway/tests/sender_integration.rs index 416aa7797..99b9e7dbc 100644 --- a/crates/chain-gateway/tests/sender_integration.rs +++ b/crates/chain-gateway/tests/sender_integration.rs @@ -1,13 +1,11 @@ -use std::sync::Arc; use std::time::{Duration, Instant}; -use chain_gateway::state_viewer::ViewMethod; -use chain_gateway::transaction_sender::{SubmitFunctionCall, TransactionSigner}; +use chain_gateway::Gas; use chain_gateway::types::NoArgs; -use chain_gateway_test_contract::{DEFAULT_VALUE, VIEW_METHOD}; -use common::localnet::Localnet; +use chain_gateway::{state_viewer::ViewMethod, transaction_sender::SubmitFunctionCall}; +use chain_gateway_test_contract as test_contract; -use super::common; +use crate::common::localnet::LocalnetBuilder; /// This integration test uses the `ChainGateway` struct to spin up two neard nodes /// for a localnet. 
One of the nodes is an observer node (what the MPC node would be running), @@ -20,39 +18,34 @@ use super::common; /// sign and route the transaction. #[tokio::test] async fn test_submit_set_value_and_read_back() { - let localnet = Localnet::new().await; - let observer = &localnet.observer; - let contract = &localnet.contract; - let contract_id = contract.account_id.clone(); + let contract_id: near_account_id::AccountId = "test-contract-sender.near".parse().unwrap(); + let localnet = LocalnetBuilder::new(contract_id.clone()); + let (localnet, user) = localnet.with_test_account("dummy_user.near".parse().unwrap()); + let signer = user.signer; + let localnet = localnet.build().await; + let observer_gw = &localnet.observer.chain_gateway; // Verify initial state: get_value should return DEFAULT_VALUE - let initial: chain_gateway::types::ObservedState = observer - .chain_gateway - .view_method(contract_id.clone(), VIEW_METHOD, &NoArgs {}) + let initial: chain_gateway::types::ObservedState = observer_gw + .view_method(contract_id.clone(), test_contract::VIEW_METHOD, &NoArgs {}) .await .expect("initial view call should succeed"); - assert_eq!(initial.value, DEFAULT_VALUE); + assert_eq!(initial.value, test_contract::DEFAULT_VALUE); - // Submit set_value transaction via the observer + // Submit set_value transaction via the observer, using a separate user account let new_value = "updated by sender test"; - let args = serde_json::json!({ "value": new_value }); - let signer = Arc::new(TransactionSigner::from_key( - contract_id.clone(), - contract.signing_key.clone(), - )); - observer - .chain_gateway + observer_gw .submit_function_call_tx( &signer, contract_id.clone(), - "set_value".to_string(), - serde_json::to_vec(&args).unwrap(), - near_indexer_primitives::types::Gas::from_teragas(30), + test_contract::SET_VALUE.to_string(), + serde_json::to_vec(&serde_json::json!({ "value": new_value })).unwrap(), + Gas::from_teragas(30), ) .await - .expect("submit_function_call_tx should 
succeed"); + .unwrap(); // Poll get_value until state reflects the new value. let deadline = Instant::now() + Duration::from_secs(30); @@ -60,9 +53,8 @@ async fn test_submit_set_value_and_read_back() { loop { localnet.assert_nodes_alive(); - let result: chain_gateway::types::ObservedState = observer - .chain_gateway - .view_method(contract_id.clone(), VIEW_METHOD, &NoArgs {}) + let result: chain_gateway::types::ObservedState = observer_gw + .view_method(contract_id.clone(), test_contract::VIEW_METHOD, &NoArgs {}) .await .expect("view call should succeed"); diff --git a/crates/chain-gateway/tests/state_viewer_integration.rs b/crates/chain-gateway/tests/state_viewer_integration.rs index 863dc3e33..036e2609e 100644 --- a/crates/chain-gateway/tests/state_viewer_integration.rs +++ b/crates/chain-gateway/tests/state_viewer_integration.rs @@ -4,37 +4,35 @@ use chain_gateway::state_viewer::WatchContractState; use chain_gateway::state_viewer::{SubscribeToContractMethod, ViewMethod}; use chain_gateway::types::NoArgs; use chain_gateway::types::ObservedState; -use chain_gateway_test_contract::{DEFAULT_VALUE, VIEW_METHOD}; +use chain_gateway_test_contract as test_contract; use crate::common::localnet::Localnet; /// Checks if viewing a valid contract method succeeds #[tokio::test] async fn test_view_method_contract_state() { - let localnet = Localnet::new().await; - let contract_account_id = localnet.contract.account_id.clone(); + let contract_id: near_account_id::AccountId = "test-contract-view.near".parse().unwrap(); + let localnet = Localnet::new(contract_id.clone()).await; + let observer_gw = &localnet.observer.chain_gateway; - let value: ObservedState = localnet - .observer - .chain_gateway - .view_method(contract_account_id, VIEW_METHOD, &NoArgs {}) + let value: ObservedState = observer_gw + .view_method(contract_id, test_contract::VIEW_METHOD, &NoArgs {}) .await .expect("view call should succeed"); - assert_eq!(value.value, DEFAULT_VALUE); + assert_eq!(value.value, 
test_contract::DEFAULT_VALUE); localnet.shutdown().await; } /// Checks if viewing an invalid contract method fails #[tokio::test] async fn test_view_method_nonexistent_method_returns_error() { - let localnet = Localnet::new().await; - let contract_account_id = localnet.contract.account_id.clone(); + let contract_id: near_account_id::AccountId = "test-contract-view-error.near".parse().unwrap(); + let localnet = Localnet::new(contract_id.clone()).await; + let observer_gw = &localnet.observer.chain_gateway; - let result = localnet - .observer - .chain_gateway - .view_method::(contract_account_id, "nonexistent", &NoArgs {}) + let result = observer_gw + .view_method::(contract_id, "nonexistent", &NoArgs {}) .await; let err = result.expect_err("calling a nonexistent method should fail"); @@ -45,18 +43,17 @@ async fn test_view_method_nonexistent_method_returns_error() { /// Checks if subscribing to the state succeeds #[tokio::test] async fn test_subscription_receives_initial_value() { - let localnet = Localnet::new().await; - let contract_account_id = localnet.contract.account_id.clone(); + let contract_id: near_account_id::AccountId = "test-contract-subscribe.near".parse().unwrap(); + let localnet = Localnet::new(contract_id.clone()).await; + let observer_gw = &localnet.observer.chain_gateway; { - let mut sub = localnet - .observer - .chain_gateway - .subscribe_to_contract_method::(contract_account_id, VIEW_METHOD) + let mut sub = observer_gw + .subscribe_to_contract_method::(contract_id, test_contract::VIEW_METHOD) .await; let res = sub.latest().expect("subscription latest should succeed"); - assert_eq!(res.value, DEFAULT_VALUE); + assert_eq!(res.value, test_contract::DEFAULT_VALUE); } localnet.shutdown().await; } diff --git a/crates/chain-gateway/tests/test.rs b/crates/chain-gateway/tests/test.rs index bd61b59b9..548a87e40 100644 --- a/crates/chain-gateway/tests/test.rs +++ b/crates/chain-gateway/tests/test.rs @@ -1,3 +1,5 @@ mod common; +mod 
event_subscriber_integration;
 mod sender_integration;
 mod state_viewer_integration;
+mod view_subscription;
diff --git a/crates/chain-gateway/tests/view_subscription.rs b/crates/chain-gateway/tests/view_subscription.rs
new file mode 100644
index 000000000..43ceb272c
--- /dev/null
+++ b/crates/chain-gateway/tests/view_subscription.rs
@@ -0,0 +1,53 @@
+use std::time::Duration;
+
+use chain_gateway::{
+    Gas,
+    state_viewer::{SubscribeToContractMethod, WatchContractState},
+    transaction_sender::SubmitFunctionCall,
+};
+use chain_gateway_test_contract::{DEFAULT_VALUE, SET_VALUE, VIEW_METHOD};
+
+use crate::common::localnet::LocalnetBuilder;
+
+/// Checks if subscribing to the state succeeds
+#[tokio::test]
+async fn test_subscription() {
+    let contract_id: near_account_id::AccountId =
+        "test-contract-subscription.near".parse().unwrap();
+    let localnet = LocalnetBuilder::new(contract_id.clone());
+    let (localnet, user) = localnet.with_test_account("dummy_user.near".parse().unwrap());
+    let signer = user.signer;
+    let localnet = localnet.build().await;
+    let observer_gw = &localnet.observer.chain_gateway;
+
+    let mut sub = observer_gw
+        .subscribe_to_contract_method::<chain_gateway::types::ObservedState>(contract_id.clone(), VIEW_METHOD)
+        .await;
+
+    let res = sub.latest().expect("subscription latest should succeed");
+    assert_eq!(res.value, DEFAULT_VALUE);
+
+    // Submit set_value transaction via the observer, using a separate user account
+    let new_value = "updated by sender test";
+
+    observer_gw
+        .submit_function_call_tx(
+            &signer,
+            contract_id.clone(),
+            SET_VALUE.to_string(),
+            serde_json::to_vec(&serde_json::json!({ "value": new_value })).unwrap(),
+            Gas::from_teragas(30),
+        )
+        .await
+        .unwrap();
+
+    tokio::time::timeout(Duration::from_secs(30), sub.changed())
+        .await
+        .expect("expect subscription to fire on change")
+        .expect("expect changed to succeed");
+    let result = sub.latest().unwrap();
+    assert_eq!(result.value, new_value);
+
+    drop(sub);
+    localnet.shutdown().await;
+}
diff --git 
a/docs/chain-gateway-design.md b/docs/chain-gateway-design.md
index 65536d985..0a843671b 100644
--- a/docs/chain-gateway-design.md
+++ b/docs/chain-gateway-design.md
@@ -502,26 +502,20 @@ If we want this interface to be re-usable in other parts of our code, we can cre
 ```rust
 impl BlockEventSubscriber {
-    pub fn new(subscription_replay: SubscriptionReplay) -> Self;
-
-    /// Configure queue size between producer and consumer.
-    /// we can define overflow behavior later, by default we could just stop producing (neard indexer will consume unlimited amount of memory).
-    pub fn buffer_size(&mut self, n: usize) -> Self;
+    /// Create a new subscriber with the given channel buffer size.
+    pub fn new(buffer_size: usize) -> Self;

     /// Add a subscription and get a unique identifier for it.
-    /// Can be called multiple times before build().
-    /// the identifier can be used to match a return value to the given subscription id.
-    pub fn add_subscription(&mut self, filter: SubscriptionFilter) -> SubscriptionId;
-
-    /// Finalise and start streaming.
-    pub async fn start(&mut self) -> Result<tokio::sync::mpsc::Receiver<BlockUpdate>, BuilderError>;
+    /// Can be called multiple times before passing the subscriber to `ChainGateway::start()`.
+    /// The identifier can be used to match returned events to the given subscription.
+    pub fn subscribe(&mut self, filter: BlockEventFilter) -> BlockEventId;
 }

-/// an identifier for a subscription
-pub struct SubscriptionId(pub u64);
+/// An identifier for a subscription, returned by `subscribe()`.
+pub struct BlockEventId(pub u64);

-/// Filter - can be easily extended later
-pub enum SubscriptionFilter {
+/// Filter — can be easily extended later.
+pub enum BlockEventFilter {
     /// Filter for events where a receipt outcome was executed by `transaction_outcome_executor_id` and called `method_name`.
     ExecutorFunctionCall {
         transaction_outcome_executor_id: AccountId,
@@ -533,37 +527,38 @@ pub enum SubscriptionFilter {
         method_name: String,
     },
 }
+```

-/// we want to offer the possibility to re-play blocks if necessary (c.f. [#236](https://github.com/near/mpc/issues/236))
-pub enum SubscriptionReplay {
-    /// no replay, start once indexer has caught up to the current block height
-    None,
-    /// Start at a specific height
-    BlockHeight(u64),
-}
+> **Note:** Block replay from a specific height (`SubscriptionReplay`) is planned but not yet implemented. Replay leverages the NEAR indexer's `sync_from_block_height` config option and comes essentially for free in the chain-gateway implementation — we just need to expose it through the `BlockEventSubscriber` API. See [#236](https://github.com/near/mpc/issues/236).
+
+The subscriber is passed to `ChainGateway::start()`, which returns an `Option<tokio::sync::mpsc::Receiver<BlockUpdate>>`:
+```rust
+let (chain_gateway, node_handle, block_update_receiver) =
+    ChainGateway::start(indexer_config, Some(subscriber)).await?;
 ```

 Example usage:
 ```rust
+let mut subscriber = BlockEventSubscriber::new(100);
-let mut subscriber = BlockEventSubscriber::new(SubscriptionReplay::None);
-
-let signature_requests_id = subscriber.add_subscription(
-    SubscriptionFilter::ExecutorFunctionCall {
+let signature_requests_id = subscriber.subscribe(
+    BlockEventFilter::ExecutorFunctionCall {
         transaction_outcome_executor_id: "v1.signer".parse()?,
         method_name: "sign".to_string(),
     }
 );

-let ckd_request_id = subscriber.add_subscription(
-    SubscriptionFilter::ExecutorFunctionCall {
+let ckd_request_id = subscriber.subscribe(
+    BlockEventFilter::ExecutorFunctionCall {
         transaction_outcome_executor_id: "v1.signer".parse()?,
         method_name: "request_app_private_key".to_string(),
     }
 );

-let mut block_stream_receiver : tokio::sync::mpsc::Receiver<BlockUpdate> = subscriber.start().await?;
+let (chain_gateway, node_handle, block_update_receiver) =
+    ChainGateway::start(indexer_config, Some(subscriber)).await?;
+let mut 
block_stream_receiver = block_update_receiver.unwrap();

 while let Some(update) = block_stream_receiver.recv().await {
     for matched in update.events {
@@ -574,21 +569,20 @@ while let Some(update) = block_stream_receiver.recv().await {
         }
     }
 }
-
 ```

 Specific types (c.f. [Appendix](#current-block-update) and `indexer/handler.rs` for justification).

 ```rust
 /// The BlockUpdate returned by the Chain indexer. Similar to the current `BlockUpdate`
 pub struct BlockUpdate {
-    pub ctx: BlockContext,
+    pub context: BlockContext,
     pub events: Vec<MatchedEvent>,
 }

 /// Context for a single block
 pub struct BlockContext {
     pub hash: CryptoHash,
-    pub height: u64,
+    pub height: BlockHeight,
     pub prev_hash: CryptoHash,
     pub last_final_block: CryptoHash,
     pub block_entropy: [u8; 32],
@@ -596,34 +590,34 @@ pub struct BlockContext {
 }

 pub struct MatchedEvent {
-    /// this is needed such that the caller can identify the filter
-    pub id: SubscriptionId,
-    /// any data associated with that event
+    /// Identifies which subscription matched this event.
+    pub id: BlockEventId,
+    /// Data associated with the event.
     pub event_data: EventData,
 }

-/// this can be extended if required
+/// This can be extended if required.
 pub enum EventData {
-    ExecutorFunctionCall(ExecutorFunctionCallEventData),
+    ExecutorFunctionCall(ExecutorFunctionCallEventWithSuccessReceiptId),
     ReceiverFunctionCall(ReceiverFunctionCallEventData),
 }

 /// This event is associated to a transaction that matched a specific (transaction_outcome_executor_id: AccountId, method_name: String) pattern.
-struct ExecutorFunctionCallEventData {
+struct ExecutorFunctionCallEventWithSuccessReceiptId {
     /// the receipt_id of the receipt this event came from
     receipt_id: CryptoHash,
     /// predecessor_id who signed the transaction
-    predecessor_id : AccountId,
+    predecessor_id: AccountId,
     /// the receipt that will hold the outcome of this receipt
     next_receipt_id: CryptoHash,
-    /// raw bytes used for function call. Could probably also be a String.
+    /// raw bytes used for function call
     args_raw: Vec<u8>,
 }

-/// This event is associated to a transaction that matched a specific SubscriptionFilter
+/// This event is associated to a transaction that matched a specific BlockEventFilter.
 struct ReceiverFunctionCallEventData {
-    // the receipt id for the matched transaction
-    receipt_id: CrpytoHash,
+    /// the receipt id for the matched transaction
+    receipt_id: CryptoHash,
 }
 ```