Skip to content
Merged
240 changes: 146 additions & 94 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ alloy = { version = "1", features = ["full"] }
alloy-primitives = { version = "0.8.0", features = ["serde"] }
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3.31"
futures-util = { version = "0.3.31", features = ["sink"] }
once_cell = "1.19"
reqwest = { version = "=0.12.23", features = ["json", "rustls-tls"] }
rmp-serde = "1.3.0"
rust_decimal = { version = "1.35", features = ["serde", "serde-float"] }
rust_decimal = { version = "1.35", features = ["serde", "serde-float", "serde-str", "serde-with-str"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["preserve_order"] }
thiserror = "1.0"
tokio = { version = "1.38", features = ["full"] }
tokio-tungstenite = { version = "0.28.0", features = ["native-tls"] }
tracing = "0.1"
tracing-subscriber = "0.3.22"
url = "2.5"
Expand All @@ -46,7 +49,7 @@ missing_docs = "allow"

[lints.clippy]
unwrap_used = "deny"
expect_used = "deny"

panic = "deny"
unreachable = "deny"
arithmetic_side_effects = "deny"
Expand All @@ -66,6 +69,7 @@ cast_possible_truncation = "deny"
cast_precision_loss = "deny"
as_conversions = "warn"

expect_used = "allow"
map_unwrap_or = "allow"
min_ident_chars = "allow"
question_mark_used = "allow"
Expand All @@ -86,6 +90,8 @@ must_use_candidate = "allow"
uninlined_format_args = "allow"
similar_names = "allow"
redundant_closure_for_method_calls = "allow"
inconsistent_struct_constructor = "allow"
match_same_arms = "allow"

pedantic = { level = "warn", priority = -1 }
restriction = { level = "allow", priority = -1 }
Expand Down Expand Up @@ -151,6 +157,10 @@ path = "examples/advanced_order.rs"
name = "twap_order"
path = "examples/twap_order.rs"

[[example]]
name = "subscriptions"
path = "examples/subscriptions.rs"

[profile.release]
codegen-units = 1
lto = true
Expand Down
2 changes: 1 addition & 1 deletion examples/advanced_order.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(clippy::all)]
#![allow(clippy::too_many_lines)]
use rhyperliquid::{
example_helpers::load_signer,
init_tracing::init_tracing,
Expand Down
119 changes: 119 additions & 0 deletions examples/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#![allow(clippy::too_many_lines)]
use rhyperliquid::{
example_helpers::{testnet_client, user},
init_tracing::init_tracing,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_tracing();

let client = testnet_client()?;
let mut subs = client.subscriptions().await?;
let user = user();

subs.subscribe_all_mids(None).await?;
subs.subscribe_candle("BTC", "5m".to_string()).await?;
subs.subscribe_l2_book("BTC", None, None).await?;
subs.subscribe_trades("BTC").await?;
subs.subscribe_notifications(user.clone()).await?;
subs.subscribe_webdata3(user.clone()).await?;
subs.subscribe_twap_states(user.clone()).await?;
subs.subscribe_clearinghouse_state(user.clone()).await?;
subs.subscribe_open_orders(user.clone()).await?;
subs.subscribe_user_events(user.clone()).await?;
subs.subscribe_user_fills(user.clone()).await?;
subs.subscribe_user_funding(user.clone()).await?;
subs.subscribe_user_non_funding_ledger_updates(user.clone())
.await?;
subs.subscribe_active_asset_ctx("BTC").await?;
subs.subscribe_active_asset_data(user.clone(), "BTC")
.await?;
subs.subscribe_user_twap_slice_fills(user.clone()).await?;
subs.subscribe_user_twap_history(user.clone()).await?;
subs.subscribe_bbo("BTC").await?;

// Match and receive subscription messages
while let Ok(msg) = subs.events.recv().await {
match msg {
rhyperliquid::types::ws::SubscriptionResponse::Error(e) => {
tracing::info!("Error: {:?}", e);
}
rhyperliquid::types::ws::SubscriptionResponse::SubscriptionResponse(
subscription_confirmation,
) => {
tracing::info!("SubscriptionResponse: {:?}", subscription_confirmation);
}
rhyperliquid::types::ws::SubscriptionResponse::AllMids(ws_all_mids) => {
tracing::info!("AllMids: {:?}", ws_all_mids);
}
rhyperliquid::types::ws::SubscriptionResponse::Candle(ws_candle) => {
tracing::info!("Candle: {:?}", ws_candle);
}
rhyperliquid::types::ws::SubscriptionResponse::Trades(ws_trade) => {
tracing::info!("Trades: {:?}", ws_trade);
}
rhyperliquid::types::ws::SubscriptionResponse::L2Book(ws_book) => {
tracing::info!("L2Book: {:?}", ws_book);
}
rhyperliquid::types::ws::SubscriptionResponse::Notification(ws_notification) => {
tracing::info!("Notification: {:?}", ws_notification);
}
rhyperliquid::types::ws::SubscriptionResponse::WebData3(ws_web_data3) => {
tracing::info!("WebData3: {:?}", ws_web_data3);
}
rhyperliquid::types::ws::SubscriptionResponse::TwapStates(ws_twap_states) => {
tracing::info!("TwapStates: {:?}", ws_twap_states);
}
rhyperliquid::types::ws::SubscriptionResponse::OpenOrders(ws_open_orders) => {
tracing::info!("OpenOrders: {:?}", ws_open_orders);
}
rhyperliquid::types::ws::SubscriptionResponse::UserEvents(ws_user_event) => {
tracing::info!("UserEvents: {:?}", ws_user_event);
}
rhyperliquid::types::ws::SubscriptionResponse::UserNonFundingLedgerUpdates(
ws_user_non_funding_ledger_update,
) => {
tracing::info!(
"UserNonFundingLedgerUpdate: {:?}",
ws_user_non_funding_ledger_update
);
}
rhyperliquid::types::ws::SubscriptionResponse::ActiveAssetCtx(ws_asset_ctx) => {
tracing::info!("ActiveAssetCtx: {:?}", ws_asset_ctx);
}
rhyperliquid::types::ws::SubscriptionResponse::ActiveAssetData(
ws_active_asset_data,
) => {
tracing::info!("ActiveAssetData: {:?}", ws_active_asset_data);
}
rhyperliquid::types::ws::SubscriptionResponse::UserTwapSliceFills(
ws_user_twap_slice_fills,
) => {
tracing::info!("UserTwapSliceFills: {:?}", ws_user_twap_slice_fills);
}
rhyperliquid::types::ws::SubscriptionResponse::UserTwapHistory(
ws_user_twap_history,
) => {
tracing::info!("UserTwapHistory: {:?}", ws_user_twap_history);
}
rhyperliquid::types::ws::SubscriptionResponse::Bbo(ws_bbo) => {
tracing::info!("Bbo: {:?}", ws_bbo);
}
rhyperliquid::types::ws::SubscriptionResponse::Pong => tracing::info!("Pong"),
rhyperliquid::types::ws::SubscriptionResponse::ClearinghouseState(
ws_clearinghouse_state,
) => {
tracing::info!("ClearinghouseState: {:?}", ws_clearinghouse_state);
}
rhyperliquid::types::ws::SubscriptionResponse::UserFills(ws_user_fills) => {
tracing::info!("User Fills: {:?}", ws_user_fills);
}
rhyperliquid::types::ws::SubscriptionResponse::UserFundings(ws_user_fundings) => {
tracing::info!("User Fundings: {:?}", ws_user_fundings);
}
}
}

Ok(())
}
3 changes: 3 additions & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ pub mod exchange;
pub mod info;
pub mod request_util;
pub mod response;
mod subscription;

pub use subscription::{StreamMessage, SubscriptionClient, SubscriptionConfig};
4 changes: 4 additions & 0 deletions src/api/subscription/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod sender;
mod ws;

pub use ws::{StreamMessage, SubscriptionClient, SubscriptionConfig};
70 changes: 70 additions & 0 deletions src/api/subscription/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::types::ws::{
WsActiveAssetCtx, WsActiveAssetData, WsAllMids, WsBbo, WsBook, WsCandle, WsClearinghouseState,
WsNotification, WsOpenOrders, WsTrade, WsTwapStates, WsUserEvent, WsUserFills, WsUserFundings,
WsUserNonFundingLedgerUpdate, WsUserTwapHistory, WsUserTwapSliceFills, WsWebData3,
};
use tokio::sync::broadcast::Sender;

/// A mapping of subscription identifiers to sender channels and subscription
/// parameters.
#[derive(Clone, Default)]
pub struct StreamSenders {
pub(crate) all_mids: (Option<Sender<WsAllMids>>, Option<String>),
pub(crate) candle: (Option<Sender<WsCandle>>, Option<String>, Option<String>),
pub(crate) trades: (
Option<Sender<WsTrade>>,
Option<String>,
Option<u32>,
Option<u32>,
),
pub(crate) l2book: (
Option<Sender<WsBook>>,
Option<String>,
Option<u32>,
Option<u32>,
),
pub(crate) notifications: (Option<Sender<WsNotification>>, Option<String>),
pub(crate) webdata3: (Option<Sender<WsWebData3>>, Option<String>),
pub(crate) twap_states: (Option<Sender<WsTwapStates>>, Option<String>),
pub(crate) clearinghouse_state: (Option<Sender<WsClearinghouseState>>, Option<String>),
pub(crate) open_orders: (Option<Sender<WsOpenOrders>>, Option<String>),
pub(crate) user_events: (Option<Sender<WsUserEvent>>, Option<String>),
pub(crate) user_fills: (Option<Sender<WsUserFills>>, Option<String>),
pub(crate) user_funding: (Option<Sender<WsUserFundings>>, Option<String>),
pub(crate) user_non_funding_ledger_updates:
(Option<Sender<WsUserNonFundingLedgerUpdate>>, Option<String>),
pub(crate) active_asset_ctx: (Option<Sender<WsActiveAssetCtx>>, Option<String>),
pub(crate) active_asset_data: (
Option<Sender<WsActiveAssetData>>,
Option<String>,
Option<String>,
),
pub(crate) user_twap_slice_fills: (Option<Sender<WsUserTwapSliceFills>>, Option<String>),
pub(crate) user_twap_history: (Option<Sender<WsUserTwapHistory>>, Option<String>),
pub(crate) bbo: (Option<Sender<WsBbo>>, Option<String>),
}

impl StreamSenders {
pub fn new() -> Self {
Self {
all_mids: (None, None),
candle: (None, None, None),
trades: (None, None, None, None),
l2book: (None, None, None, None),
notifications: (None, None),
webdata3: (None, None),
twap_states: (None, None),
clearinghouse_state: (None, None),
open_orders: (None, None),
user_events: (None, None),
user_fills: (None, None),
user_funding: (None, None),
user_non_funding_ledger_updates: (None, None),
active_asset_ctx: (None, None),
active_asset_data: (None, None, None),
user_twap_slice_fills: (None, None),
user_twap_history: (None, None),
bbo: (None, None),
}
}
}
Loading