Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 59 additions & 19 deletions src/eth.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{Message, Request as KiRequest};
use alloy::rpc::json_rpc::ErrorPayload;
pub use alloy::rpc::types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
pub use alloy::rpc::types::pubsub::Params;
pub use alloy::rpc::types::{
request::{TransactionInput, TransactionRequest},
Block, BlockId, BlockNumberOrTag, FeeHistory, Filter, FilterBlockOption, Log, Transaction,
Expand All @@ -10,19 +9,57 @@ pub use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, TxHash, U128,
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};

/// Subscription kind. Pulled directly from alloy (https://github.com/alloy-rs/alloy).
/// Why? Because alloy is not yet 1.0 and the types in this interface must be stable.
/// If alloy SubscriptionKind changes, we can implement a transition function in runtime
/// for this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionKind {
/// New block headers subscription.
///
/// Fires a notification each time a new header is appended to the chain, including chain
/// reorganizations. In case of a chain reorganization the subscription will emit all new
/// headers for the new chain. Therefore the subscription can emit multiple headers on the same
/// height.
NewHeads,
/// Logs subscription.
///
/// Returns logs that are included in new imported blocks and match the given filter criteria.
/// In case of a chain reorganization previous sent logs that are on the old chain will be
/// resent with the removed property set to true. Logs from transactions that ended up in the
/// new chain are emitted. Therefore, a subscription can emit logs for the same transaction
/// multiple times.
Logs,
/// New Pending Transactions subscription.
///
/// Returns the hash or full tx for all transactions that are added to the pending state and
/// are signed with a key that is available in the node. When a transaction that was
/// previously part of the canonical chain isn't part of the new canonical chain after a
/// reorganization its again emitted.
NewPendingTransactions,
/// Node syncing status subscription.
///
/// Indicates when the node starts or stops synchronizing. The result can either be a boolean
/// indicating that the synchronization has started (true), finished (false) or an object with
/// various progress indicators.
Syncing,
}

/// The Action and Request type that can be made to eth:distro:sys. Any process with messaging
/// capabilities can send this action to the eth provider.
///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
/// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults
/// Logs come in as JSON value which can be parsed to [`alloy::rpc::types::eth::pubsub::SubscriptionResult`]
SubscribeLogs {
sub_id: u64,
chain_id: u64,
kind: SubscriptionKind,
params: Params,
params: serde_json::Value,
},
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
Expand All @@ -41,36 +78,38 @@ pub enum EthAction {
pub type EthSubResult = Result<EthSub, EthSubError>;

/// Incoming type for successful subscription updates.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EthSub {
pub id: u64,
pub result: SubscriptionResult,
/// can be parsed to [`alloy::rpc::types::eth::pubsub::SubscriptionResult`]
pub result: serde_json::Value,
}

/// If your subscription is closed unexpectedly, you will receive this.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EthSubError {
pub id: u64,
pub error: String,
}

/// The [`crate::Response`] type which a process will get from requesting with an [`EthAction`] will be
/// of this type, serialized and deserialized using `serde_json::to_vec`
/// and `serde_json::from_slice`.
/// The [`crate::Response`] body type which a process will get from requesting
/// with an [`EthAction`] will be of this type, serialized and deserialized
/// using [`serde_json::to_vec`] and [`serde_json::from_slice`].
///
/// In the case of an [`EthAction::SubscribeLogs`] request, the response will indicate if
/// the subscription was successfully created or not.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthResponse {
Ok,
Response { value: serde_json::Value },
Response(serde_json::Value),
Err(EthError),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthError {
/// RPC provider returned an error
RpcError(ErrorPayload),
/// RPC provider returned an error.
/// Can be parsed to [`alloy::rpc::json_rpc::ErrorPayload`]
RpcError(serde_json::Value),
/// provider module cannot parse message
MalformedRequest,
/// No RPC provider for the chain
Expand All @@ -91,7 +130,7 @@ pub enum EthError {

/// The action type used for configuring eth:distro:sys. Only processes which have the "root"
/// [`crate::Capability`] from eth:distro:sys can successfully send this action.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthConfigAction {
/// Add a new provider to the list of providers.
AddProvider(ProviderConfig),
Expand Down Expand Up @@ -122,7 +161,7 @@ pub enum EthConfigAction {
}

/// Response type from an [`EthConfigAction`] request.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthConfigResponse {
Ok,
/// Response from a GetProviders request.
Expand Down Expand Up @@ -213,7 +252,7 @@ impl Provider {

match resp {
Message::Response { body, .. } => match serde_json::from_slice::<EthResponse>(&body) {
Ok(EthResponse::Response { value }) => {
Ok(EthResponse::Response(value)) => {
serde_json::from_value::<T>(value).map_err(|_| EthError::RpcMalformedResponse)
}
Ok(EthResponse::Err(e)) => Err(e),
Expand Down Expand Up @@ -596,7 +635,8 @@ impl Provider {
sub_id,
chain_id: self.chain_id,
kind: SubscriptionKind::Logs,
params: Params::Logs(Box::new(filter)),
params: serde_json::to_value(Params::Logs(Box::new(filter)))
.map_err(|_| EthError::InvalidParams)?,
};

let Ok(body) = serde_json::to_vec(&action) else {
Expand Down
Loading