Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backrun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ spl-memo = "3.0.1"
thiserror = "1.0.40"
tokio = "1"
tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] }
uuid = { version = "1.8.0", features = ["v4"] }
7 changes: 6 additions & 1 deletion backrun/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
See the [cli README](../cli/README.md) for setup instructions.

## Usage

```bash
cargo run --bin jito-backrun-example -- \
--block-engine-url <BLOCK_ENGINE_URL> \
Expand All @@ -12,10 +13,13 @@ cargo run --bin jito-backrun-example -- \
--rpc-url http://{RPC_URL}:8899 \
--tip-program-id <TIP_PROGRAM_ID> \
--backrun-accounts <BACKRUN_ACCOUNTS>
# Note: Don't provide --auth-keypair argument if not planning to use authentication
```

## Example

Backrun transactions that write-lock the [Pyth SOL/USDC account](https://solscan.io/account/H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG):

```bash
RUST_LOG=INFO cargo run --bin jito-backrun-example -- \
--block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \
Expand All @@ -25,4 +29,5 @@ RUST_LOG=INFO cargo run --bin jito-backrun-example -- \
--rpc-url https://api.mainnet-beta.solana.com:8899 \
--tip-program-id T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt \
--backrun-accounts H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG
```
# Note: Don't provide --auth-keypair argument if not planning to use authentication
```
158 changes: 72 additions & 86 deletions backrun/src/event_loops.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use futures_util::StreamExt;
use jito_protos::{
bundle::BundleResult,
searcher::{
mempool_subscription, MempoolSubscription, PendingTxNotification,
SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0,
mempool_subscription, searcher_service_client::SearcherServiceClient, MempoolSubscription,
PendingTxNotification, SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0,
},
};
use jito_searcher_client::get_searcher_client;
use log::info;
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
Expand All @@ -21,11 +20,13 @@ use solana_sdk::{
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Keypair,
};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use tokio::{sync::mpsc::Sender, time::sleep};
use tonic::Streaming;
use tonic::{
codegen::{Body, Bytes, StdError},
Streaming,
};

// slot update subscription loop that attempts to maintain a connection to an RPC server
pub async fn slot_subscribe_loop(pubsub_addr: String, slot_sender: Sender<Slot>) {
Expand Down Expand Up @@ -144,13 +145,18 @@ pub async fn block_subscribe_loop(
}

// attempts to maintain connection to searcher service and stream pending transaction notifications over a channel
pub async fn pending_tx_loop(
block_engine_url: String,
auth_keypair: Arc<Keypair>,
pub async fn pending_tx_loop<T>(
mut searcher_client: SearcherServiceClient<T>,
pending_tx_sender: Sender<PendingTxNotification>,
backrun_pubkeys: Vec<Pubkey>,
) {
let mut num_searcher_connection_errors: usize = 0;
) where
T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + 'static + Clone,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: std::marker::Send,
{
let _num_searcher_connection_errors: usize = 0;
let mut num_pending_tx_sub_errors: usize = 0;
let mut num_pending_tx_stream_errors: usize = 0;
let mut num_pending_tx_stream_disconnects: usize = 0;
Expand All @@ -160,103 +166,83 @@ pub async fn pending_tx_loop(
loop {
sleep(Duration::from_secs(1)).await;

match get_searcher_client(&block_engine_url, &auth_keypair).await {
Ok(mut searcher_client) => {
match searcher_client
.subscribe_mempool(MempoolSubscription {
regions: vec![],
msg: Some(mempool_subscription::Msg::WlaV0Sub(
WriteLockedAccountSubscriptionV0 {
accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(),
},
)),
})
.await
{
Ok(pending_tx_stream_response) => {
let mut pending_tx_stream = pending_tx_stream_response.into_inner();
while let Some(maybe_notification) = pending_tx_stream.next().await {
match maybe_notification {
Ok(notification) => {
if pending_tx_sender.send(notification).await.is_err() {
datapoint_error!(
"pending_tx_send_error",
("errors", 1, i64)
);
return;
}
}
Err(e) => {
num_pending_tx_stream_errors += 1;
datapoint_error!(
"searcher_pending_tx_stream_error",
("errors", num_pending_tx_stream_errors, i64),
("error_str", e.to_string(), String)
);
break;
}
match searcher_client
.subscribe_mempool(MempoolSubscription {
regions: vec![],
msg: Some(mempool_subscription::Msg::WlaV0Sub(
WriteLockedAccountSubscriptionV0 {
accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(),
},
)),
})
.await
{
Ok(pending_tx_stream_response) => {
let mut pending_tx_stream = pending_tx_stream_response.into_inner();
while let Some(maybe_notification) = pending_tx_stream.next().await {
match maybe_notification {
Ok(notification) => {
if pending_tx_sender.send(notification).await.is_err() {
datapoint_error!("pending_tx_send_error", ("errors", 1, i64));
return;
}
}
num_pending_tx_stream_disconnects += 1;
datapoint_error!(
"searcher_pending_tx_stream_disconnect",
("errors", num_pending_tx_stream_disconnects, i64),
);
}
Err(e) => {
num_pending_tx_sub_errors += 1;
datapoint_error!(
"searcher_pending_tx_sub_error",
("errors", num_pending_tx_sub_errors, i64),
("error_str", e.to_string(), String)
);
Err(e) => {
num_pending_tx_stream_errors += 1;
datapoint_error!(
"searcher_pending_tx_stream_error",
("errors", num_pending_tx_stream_errors, i64),
("error_str", e.to_string(), String)
);
break;
}
}
}
num_pending_tx_stream_disconnects += 1;
datapoint_error!(
"searcher_pending_tx_stream_disconnect",
("errors", num_pending_tx_stream_disconnects, i64),
);
}
Err(e) => {
num_searcher_connection_errors += 1;
num_pending_tx_sub_errors += 1;
datapoint_error!(
"searcher_connection_error",
("errors", num_searcher_connection_errors, i64),
"searcher_pending_tx_sub_error",
("errors", num_pending_tx_sub_errors, i64),
("error_str", e.to_string(), String)
);
}
}
}
}

pub async fn bundle_results_loop(
block_engine_url: String,
auth_keypair: Arc<Keypair>,
pub async fn bundle_results_loop<T>(
mut searcher_client: SearcherServiceClient<T>,
bundle_results_sender: Sender<BundleResult>,
) {
let mut connection_errors: usize = 0;
) where
T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + 'static + Clone,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: std::marker::Send,
{
let _connection_errors: usize = 0;
let mut response_errors: usize = 0;

loop {
sleep(Duration::from_millis(1000)).await;
match get_searcher_client(&block_engine_url, &auth_keypair).await {
Ok(mut c) => match c
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
{
Ok(resp) => {
consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await;
}
Err(e) => {
response_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", response_errors, i64),
("msg", e.to_string(), String)
);
}
},
match searcher_client
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
{
Ok(resp) => {
consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await;
}
Err(e) => {
connection_errors += 1;
response_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", connection_errors, i64),
("errors", response_errors, i64),
("msg", e.to_string(), String)
);
}
Expand Down
Loading