Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 91 additions & 13 deletions backrun/src/event_loops.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use std::{sync::Arc, time::Duration};

use futures_util::StreamExt;
use jito_protos::searcher::{PendingTxNotification, PendingTxSubscriptionRequest};
use jito_protos::{
bundle::BundleResult,
searcher::{
PendingTxNotification, PendingTxSubscriptionRequest, SubscribeBundleResultsRequest,
},
};
use log::info;
use searcher_service_client::get_searcher_client;
use solana_client::nonblocking::pubsub_client::PubsubClient;
use solana_client::rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter};
use solana_client::rpc_response;
use solana_client::rpc_response::{RpcBlockUpdate, SlotUpdate};
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
rpc_response,
rpc_response::{RpcBlockUpdate, SlotUpdate},
};
use solana_metrics::{datapoint_error, datapoint_info};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::{
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Keypair,
};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use tokio::{sync::mpsc::Sender, time::sleep};
use tonic::Streaming;

// slot update subscription loop that attempts to maintain a connection to an RPC server
pub async fn slot_subscribe_loop(pubsub_addr: String, slot_sender: Sender<Slot>) {
Expand Down Expand Up @@ -139,6 +148,75 @@ pub async fn block_subscribe_loop(
}
}

// attempts to maintain connection to searcher service and stream bundle results over a channel
pub async fn bundle_results_loop(
auth_addr: String,
searcher_addr: String,
auth_keypair: Arc<Keypair>,
bundle_results_sender: Sender<BundleResult>,
) {
let mut connection_errors: usize = 0;
let mut response_errors: usize = 0;

loop {
sleep(Duration::from_millis(1000)).await;
match get_searcher_client(&auth_addr, &searcher_addr, &auth_keypair).await {
Ok(mut c) => match c
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
{
Ok(resp) => {
consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await;
}
Err(e) => {
response_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", response_errors, i64),
("msg", e.to_string(), String)
);
}
},
Err(e) => {
connection_errors += 1;
datapoint_error!(
"searcher_bundle_results_error",
("errors", connection_errors, i64),
("msg", e.to_string(), String)
);
}
}
}
}

pub async fn consume_bundle_results_stream(
mut stream: Streaming<BundleResult>,
bundle_results_sender: &Sender<BundleResult>,
) {
while let Some(maybe_msg) = stream.next().await {
match maybe_msg {
Ok(msg) => {
if let Err(e) = bundle_results_sender.send(msg).await {
datapoint_error!(
"searcher_bundle_results_error",
("errors", 1, i64),
("msg", e.to_string(), String)
);
return;
}
}
Err(e) => {
datapoint_error!(
"searcher_bundle_results_error",
("errors", 1, i64),
("msg", e.to_string(), String)
);
return;
}
}
}
}

// attempts to maintain connection to searcher service and stream pending transaction notifications over a channel
pub async fn pending_tx_loop(
auth_addr: String,
Expand Down
92 changes: 60 additions & 32 deletions backrun/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,58 @@
mod event_loops;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use std::{path::Path, result, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
path::Path,
result,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use crate::event_loops::{block_subscribe_loop, pending_tx_loop, slot_subscribe_loop};
use clap::Parser;
use env_logger::TimestampPrecision;
use histogram::Histogram;
use jito_protos::bundle::Bundle;
use jito_protos::convert::{proto_packet_from_versioned_tx, versioned_tx_from_packet};
use jito_protos::searcher::{
searcher_service_client::SearcherServiceClient, ConnectedLeadersRequest,
NextScheduledLeaderRequest, PendingTxNotification, SendBundleRequest, SendBundleResponse,
use jito_protos::{
bundle::{Bundle, BundleResult},
convert::{proto_packet_from_versioned_tx, versioned_tx_from_packet},
searcher::{
searcher_service_client::SearcherServiceClient, ConnectedLeadersRequest,
NextScheduledLeaderRequest, PendingTxNotification, SendBundleRequest, SendBundleResponse,
},
};
use log::*;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
use searcher_service_client::token_authenticator::ClientInterceptor;
use searcher_service_client::{get_searcher_client, BlockEngineConnectionError};
use solana_client::client_error::ClientError;
use solana_client::nonblocking::pubsub_client::PubsubClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_response;
use solana_client::rpc_response::RpcBlockUpdate;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use searcher_service_client::{
get_searcher_client, token_authenticator::ClientInterceptor, BlockEngineConnectionError,
};
use solana_client::{
client_error::ClientError,
nonblocking::{pubsub_client::PubsubClientError, rpc_client::RpcClient},
rpc_response,
rpc_response::RpcBlockUpdate,
};
use solana_metrics::{datapoint_info, set_host_id};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Signature, Signer};
use solana_sdk::system_instruction::transfer;
use solana_sdk::transaction::Transaction;
use solana_sdk::transaction::VersionedTransaction;
use solana_sdk::{
clock::Slot,
commitment_config::{CommitmentConfig, CommitmentLevel},
hash::Hash,
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
signature::{read_keypair_file, Keypair, Signature, Signer},
system_instruction::transfer,
transaction::{Transaction, VersionedTransaction},
};
use spl_memo::build_memo;
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::{runtime::Builder, time::interval};
use tonic::codegen::InterceptedService;
use tonic::transport::Channel;
use tonic::{Response, Status};
use tokio::{
runtime::Builder,
sync::mpsc::{channel, Receiver},
time::interval,
};
use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};

use crate::event_loops::{
block_subscribe_loop, bundle_results_loop, pending_tx_loop, slot_subscribe_loop,
};

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand Down Expand Up @@ -83,6 +92,10 @@ struct Args {
/// Tip program public key
#[clap(long, env)]
tip_program_id: String,

/// Subscribe and print bundle results.
#[clap(long, env, default_value_t = true)]
subscribe_bundle_results: bool,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -488,6 +501,7 @@ async fn run_searcher_loop(
tip_program_pubkey: Pubkey,
mut slot_receiver: Receiver<Slot>,
mut block_receiver: Receiver<rpc_response::Response<RpcBlockUpdate>>,
mut bundle_results_receiver: Receiver<BundleResult>,
mut pending_tx_receiver: Receiver<PendingTxNotification>,
) -> Result<()> {
let mut leader_schedule: HashMap<Pubkey, HashSet<Slot>> = HashMap::new();
Expand Down Expand Up @@ -519,6 +533,10 @@ async fn run_searcher_loop(
_ = tick.tick() => {
maintenance_tick(&mut searcher_client, &rpc_client, &mut leader_schedule, &mut blockhash).await?;
}
maybe_bundle_result = bundle_results_receiver.recv() => {
let bundle_result: BundleResult = maybe_bundle_result.ok_or(BackrunError::Shutdown)?;
info!("received bundle_result: [bundle_id={:?}, result={:?}]", bundle_result.bundle_id, bundle_result.result);
},
maybe_pending_tx_notification = pending_tx_receiver.recv() => {
// block engine starts forwarding a few slots early, for super high activity accounts
// it might be ideal to wait until the leader slot is up
Expand Down Expand Up @@ -587,10 +605,19 @@ fn main() -> Result<()> {
runtime.block_on(async move {
let (slot_sender, slot_receiver) = channel(100);
let (block_sender, block_receiver) = channel(100);
let (bundle_results_sender, bundle_results_receiver) = channel(100);
let (pending_tx_sender, pending_tx_receiver) = channel(100);

tokio::spawn(slot_subscribe_loop(args.pubsub_url.clone(), slot_sender));
tokio::spawn(block_subscribe_loop(args.pubsub_url.clone(), block_sender));
if args.subscribe_bundle_results {
tokio::spawn(bundle_results_loop(
args.auth_addr.clone(),
args.searcher_addr.clone(),
auth_keypair.clone(),
bundle_results_sender,
));
}
tokio::spawn(pending_tx_loop(
args.auth_addr.clone(),
args.searcher_addr.clone(),
Expand All @@ -609,6 +636,7 @@ fn main() -> Result<()> {
tip_program_pubkey,
slot_receiver,
block_receiver,
bundle_results_receiver,
pending_tx_receiver,
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion jito_protos/protos
22 changes: 14 additions & 8 deletions searcher_service_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use crate::token_authenticator::ClientInterceptor;
use jito_protos::auth::auth_service_client::AuthServiceClient;
use jito_protos::auth::Role;
use jito_protos::searcher::searcher_service_client::SearcherServiceClient;
use solana_sdk::signature::Keypair;
use std::sync::Arc;

use jito_protos::{
auth::{auth_service_client::AuthServiceClient, Role},
searcher::searcher_service_client::SearcherServiceClient,
};
use solana_sdk::signature::Keypair;
use thiserror::Error;
use tonic::codegen::InterceptedService;
use tonic::transport::{Channel, Endpoint};
use tonic::{transport, Status};
use tonic::{
codegen::InterceptedService,
transport,
transport::{Channel, Endpoint},
Status,
};

use crate::token_authenticator::ClientInterceptor;

pub mod token_authenticator;

Expand Down
10 changes: 6 additions & 4 deletions searcher_service_client/src/token_authenticator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::RwLock;
use std::time::SystemTime;
use std::{sync::Arc, time::Duration};
use std::{
sync::{Arc, RwLock},
time::{Duration, SystemTime},
};

use crate::BlockEngineConnectionResult;
use jito_protos::auth::{
auth_service_client::AuthServiceClient, GenerateAuthChallengeRequest,
GenerateAuthTokensRequest, RefreshAccessTokenRequest, Role, Token,
Expand All @@ -13,6 +13,8 @@ use solana_sdk::signature::{Keypair, Signer};
use tokio::{task::JoinHandle, time::sleep};
use tonic::{service::Interceptor, transport::Channel, Request, Status};

use crate::BlockEngineConnectionResult;

const AUTHORIZATION_HEADER: &str = "authorization";
const BEARER: &str = "Bearer ";

Expand Down