Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/chain-gateway-test-contract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ crate-type = ["cdylib", "lib"]
[dependencies]
borsh = { workspace = true }
near-sdk = { workspace = true }
serde_json = { workspace = true }

[lints]
workspace = true
Binary file not shown.
109 changes: 107 additions & 2 deletions crates/chain-gateway-test-contract/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,55 @@
use near_sdk::{env::log_str, near};
use near_sdk::{
Gas, NearToken, Promise,
env::{self, log_str},
near,
};

pub fn compiled_wasm() -> &'static [u8] {
include_bytes!("../res/chain_gateway_test_contract.wasm")
}

pub const DEFAULT_VALUE: &str = "hello from test";

// public view method
pub const VIEW_METHOD: &str = "view_value";
pub const WRITE_METHOD: &str = "set_value";

// public set method
pub const SET_VALUE: &str = "set_value";
pub const SET_VALUE_TGAS: u64 = FIVE_TGAS;

// teragas as u64. We don't use near_sdk::Gas on purpose, such that the near indexer can re-use
// these constants without depending on near_sdk.
pub const FIVE_TGAS: u64 = 5;

// methods that spawn promises
pub const SET_VALUE_IN_PROMISE: &str = "set_value_in_promise";
pub const SET_VALUE_IN_PROMISE_TGAS: u64 = SET_VALUE_TGAS + FIVE_TGAS;

pub const SPAWN_PROMISE_WITH_CALLBACK: &str = "spawn_promise_with_callback";
pub const SPAWN_PROMISE_WITH_CALLBACK_TGAS: u64 =
SET_VALUE_IN_PROMISE_TGAS + SET_VALUE_TGAS + FIVE_TGAS;

// private method for setting value
pub const PRIVATE_SET: &str = "private_set";
pub const PRIVATE_SET_ARGS_TGAS: u64 = 5;

#[near(serializers=[json])]
pub struct PrivateSetArgs {
pub value: String,
pub succeeds: bool,
}

#[near(serializers=[json])]
pub struct SetValueInPromiseArgs {
pub value: String,
pub return_error: bool,
}

#[near(serializers=[json])]
pub struct SetValueWithMarker {
pub successfully_spawn_promise: bool,
pub end_marker: String,
}

#[derive(Debug)]
#[near(contract_state)]
Expand All @@ -32,4 +75,66 @@ impl Contract {
log_str(&format!("Setting value to: {value}"));
self.stored_value = value;
}

/// Spawns a cross-contract promise to [`private_set`](Self::private_set), returning an error
/// if `return_error` is set. Used by integration tests to verify
/// `ExecutorFunctionCallSuccessWithPromise` event tracking.
#[handle_result]
pub fn set_value_in_promise(&mut self, args: SetValueInPromiseArgs) -> Result<Promise, String> {
if args.return_error {
Err("computer says no".to_string())
} else {
let private_set_args = PrivateSetArgs {
value: args.value,
succeeds: true,
};
Ok(Promise::new(env::current_account_id()).function_call(
PRIVATE_SET.to_string(),
serde_json::to_vec(&serde_json::json!({"args": private_set_args})).unwrap(),
NearToken::from_near(0),
Gas::from_tgas(PRIVATE_SET_ARGS_TGAS),
))
}
}

/// Chains two promises: first calls [`set_value_in_promise`](Self::set_value_in_promise)
/// (which may fail based on `successfully_spawn_promise`), then a callback that writes
/// `end_marker` via [`private_set`](Self::private_set). The callback acts as a
/// synchronization marker so tests can poll [`view_value`](Self::view_value) to know the
/// full chain completed.
pub fn spawn_promise_with_callback(args: SetValueWithMarker) -> Promise {
let set_value_args = SetValueInPromiseArgs {
value: "doesn't matter".to_string(),
return_error: !args.successfully_spawn_promise,
};
let promise = Promise::new(env::current_account_id()).function_call(
SET_VALUE_IN_PROMISE.to_string(),
serde_json::to_vec(&serde_json::json!({"args": set_value_args})).unwrap(),
NearToken::from_near(0),
Gas::from_tgas(SET_VALUE_IN_PROMISE_TGAS),
);
let private_set_args = PrivateSetArgs {
value: args.end_marker,
succeeds: true,
};
let callback = Promise::new(env::current_account_id()).function_call(
PRIVATE_SET.to_string(),
serde_json::to_vec(&serde_json::json!({"args": private_set_args})).unwrap(),
NearToken::from_near(0),
Gas::from_tgas(PRIVATE_SET_ARGS_TGAS),
);
promise.then(callback)
}

/// Can only be called by the contract itself (via a promise).
#[private]
#[handle_result]
pub fn private_set(&mut self, args: PrivateSetArgs) -> Result<(), String> {
if args.succeeds {
self.set_value(args.value);
Ok(())
} else {
Err("intentional error for testing".to_string())
}
}
}
104 changes: 84 additions & 20 deletions crates/chain-gateway/src/chain_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ use std::path::Path;

use near_account_id::AccountId;
use near_async::ActorSystem;
use near_indexer::StreamerMessage;
use near_indexer::near_primitives::transaction::SignedTransaction;
use nearcore::NearConfig;
use tokio::sync::mpsc::Receiver;

use crate::errors::{ChainGatewayError, NearClientError, NearRpcError, NearViewClientError};
use crate::event_subscriber;
use crate::event_subscriber::block_events::BlockUpdate;
use crate::event_subscriber::subscriber::BlockEventSubscriber;
use crate::near_internals_wrapper::{
NearClientActorHandle, NearRpcActorHandle, NearViewClientActorHandle,
};
Expand Down Expand Up @@ -92,41 +97,64 @@ impl NodeHandle {
}

impl ChainGateway {
/// Spawns a near node with `config`.
/// Spawns a near node with `indexer_config`.
/// The [`NodeHandle`] can be used to shut down the actor system for the node and liveness checks.
/// The node dies if [`NodeHandle`] is dropped.
/// Returns a stream for BlockUpdates if BlockEventSubscriber is not None.
pub async fn start(
config: near_indexer::IndexerConfig,
) -> Result<(ChainGateway, NodeHandle), ChainGatewayError> {
let near_config: NearConfig =
config
.load_near_config()
.map_err(|err| ChainGatewayError::FailureLoadingConfig {
msg: err.to_string(),
})?;
let home_dir = config.home_dir;
indexer_config: near_indexer::IndexerConfig,
subscriber: Option<BlockEventSubscriber>,
) -> Result<
(
ChainGateway,
NodeHandle,
Option<tokio::sync::mpsc::Receiver<BlockUpdate>>,
),
ChainGatewayError,
> {
let near_config: NearConfig = indexer_config.load_near_config().map_err(|err| {
ChainGatewayError::FailureLoadingConfig {
msg: err.to_string(),
}
})?;

let home_dir = indexer_config.home_dir.clone();
let streamer_setup = subscriber.map(|subscriber| StreamerSetup {
subscriber,
indexer_config,
near_config: near_config.clone(),
});

let (ready_sender, ready_receiver) = tokio::sync::oneshot::channel();
let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel();

let thread_handle = std::thread::spawn(move || {
// blocking method, resumes once `shutdown_sender` sends the shutdown signal
run_node(ready_sender, near_config, &home_dir, shutdown_receiver)
run_node(
ready_sender,
near_config,
&home_dir,
shutdown_receiver,
streamer_setup,
)
});

let chain_gateway = ready_receiver.await.expect("startup thread died")?;
let (chain_gateway, stream) = ready_receiver.await.expect("startup thread died")?;
let node_handle = NodeHandle {
thread_handle,
shutdown_sender: Some(shutdown_sender),
};
Ok((chain_gateway, node_handle))
Ok((chain_gateway, node_handle, stream))
}
}

type RunNodeResult = Result<(ChainGateway, Option<Receiver<BlockUpdate>>), ChainGatewayError>;

fn run_node(
ready_sender: tokio::sync::oneshot::Sender<Result<ChainGateway, ChainGatewayError>>,
ready_sender: tokio::sync::oneshot::Sender<RunNodeResult>,
near_config: nearcore::NearConfig,
home_dir: &Path,
shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
streamer_setup: Option<StreamerSetup>,
) {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand All @@ -146,15 +174,43 @@ fn run_node(
}
};

let indexer_and_params = streamer_setup.map(|s| {
let indexer =
near_indexer::Indexer::from_near_node(s.indexer_config, s.near_config, &near_node);
(indexer, s.subscriber)
});

let view_client = NearViewClientActorHandle::new(near_node.view_client);
let client = NearClientActorHandle::new(near_node.client);
let rpc_handler = NearRpcActorHandle::new(near_node.rpc_handler);

let _ = ready_sender.send(Ok(ChainGateway {
view_client,
client,
rpc_handler,
}));
let stream = if let Some((indexer, streamer_config)) = indexer_and_params {
let raw_stream: Receiver<StreamerMessage> = indexer.streamer();
match event_subscriber::streamer::start(
streamer_config,
raw_stream,
view_client.clone(),
)
.await
{
Ok(rx) => Some(rx),
Err(err) => {
let _ = ready_sender.send(Err(err));
return;
}
}
} else {
None
};

let _ = ready_sender.send(Ok((
ChainGateway {
view_client,
client,
rpc_handler,
},
stream,
)));

match shutdown_receiver.await {
Ok(()) => {
Expand All @@ -168,3 +224,11 @@ fn run_node(
actor_system.stop();
});
}

/// Parameters for optionally starting the block-event streaming pipeline
/// alongside the nearcore node.
struct StreamerSetup {
subscriber: BlockEventSubscriber,
indexer_config: near_indexer::IndexerConfig,
near_config: NearConfig,
}
11 changes: 11 additions & 0 deletions crates/chain-gateway/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,15 @@ pub enum ChainGatewayError {

#[error("view client error while {op}: {message}")]
ViewError { op: ChainGatewayOp, message: String },

#[error(
"block event channel has been full for too long, the receiver must drain messages otherwise we risk out of memory"
)]
BlockEventBufferFull,

#[error("near indexer dropped")]
BlockEventIndexerDropped,

#[error("block event stream has been closed or dropped by the receiver")]
BlockEventReceiverDropped,
}
4 changes: 4 additions & 0 deletions crates/chain-gateway/src/event_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod block_events;
mod stats;
pub(super) mod streamer;
pub mod subscriber;
65 changes: 65 additions & 0 deletions crates/chain-gateway/src/event_subscriber/block_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use derive_more::{Deref, From};
use near_account_id::AccountId;
use near_indexer_primitives::CryptoHash;

use crate::types::BlockHeight;

/// The BlockUpdate returned by the Chain indexer.
/// Similar to [`ChainBlockUpdate`](../../node/src/indexer/handler.rs) in the `mpc-node` crate.
#[derive(Debug)]
pub struct BlockUpdate {
pub context: BlockContext,
pub events: Vec<MatchedEvent>,
}

/// Context for a single block
#[derive(Debug)]
pub struct BlockContext {
pub hash: CryptoHash,
pub height: BlockHeight,
pub prev_hash: CryptoHash,
pub last_final_block: CryptoHash,
pub block_entropy: [u8; 32],
pub block_timestamp_nanosec: u64,
}

#[derive(Debug)]
pub struct MatchedEvent {
/// this is needed such that the caller can identify the block event
pub id: BlockEventId,
/// any data associated with that event
pub event_data: EventData,
}

/// An identifier for a block event
#[derive(Debug, Deref, From, Clone, Copy, PartialEq, Eq)]
pub struct BlockEventId(pub u64);

/// Event data, matching a filter [`super::subscriber::BlockEventFilter`]
#[derive(Debug, PartialEq)]
pub enum EventData {
ExecutorFunctionCallSuccessWithPromise(ExecutorFunctionCallSuccessWithPromiseData),
ReceiverFunctionCall(ReceiverFunctionCallData),
}

/// Event data for a receipt matching a [`super::subscriber::BlockEventFilter::ExecutorFunctionCallSuccessWithPromise`]
#[derive(Debug, PartialEq)]
pub struct ExecutorFunctionCallSuccessWithPromiseData {
/// the receipt_id of the receipt this event came from
pub receipt_id: CryptoHash,
/// predecessor_id who signed the transaction
pub predecessor_id: AccountId,
/// the receipt that will hold the outcome of this receipt
pub next_receipt_id: CryptoHash,
/// raw bytes used for function call
pub args_raw: Vec<u8>,
}

/// Event data for a receipt matching a [`super::subscriber::BlockEventFilter::ReceiverFunctionCall`]
#[derive(Debug, PartialEq)]
pub struct ReceiverFunctionCallData {
/// the receipt id for the matched transaction
pub receipt_id: CryptoHash,
/// whether the execution outcome was successful
pub is_success: bool,
}
Loading
Loading