From f62f5ca5add2ae5f42b8ff044e6b09500f6ebcd5 Mon Sep 17 00:00:00 2001 From: ebin-mathews Date: Fri, 10 May 2024 00:16:47 -0400 Subject: [PATCH] Make grpc authentication optional --- Cargo.lock | 10 +++ backrun/Cargo.toml | 1 + backrun/README.md | 7 +- backrun/src/event_loops.rs | 158 +++++++++++++++++-------------------- backrun/src/main.rs | 112 ++++++++++++++++++++------ cli/README.md | 4 + cli/src/main.rs | 57 ++++++++++--- searcher_client/src/lib.rs | 38 +++++++-- 8 files changed, 255 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce44318..6efbec9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1959,6 +1959,7 @@ dependencies = [ "thiserror", "tokio", "tonic", + "uuid", ] [[package]] @@ -4965,6 +4966,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "getrandom 0.2.12", +] + [[package]] name = "vec_map" version = "0.8.2" diff --git a/backrun/Cargo.toml b/backrun/Cargo.toml index 3c7235a..03dae03 100644 --- a/backrun/Cargo.toml +++ b/backrun/Cargo.toml @@ -21,3 +21,4 @@ spl-memo = "3.0.1" thiserror = "1.0.40" tokio = "1" tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] } +uuid = { version = "1.8.0", features = ["v4"] } diff --git a/backrun/README.md b/backrun/README.md index 1e5b80a..e09cfc3 100644 --- a/backrun/README.md +++ b/backrun/README.md @@ -3,6 +3,7 @@ See the [cli README](../cli/README.md) for setup instructions. ## Usage + ```bash cargo run --bin jito-backrun-example -- \ --block-engine-url \ @@ -12,10 +13,13 @@ cargo run --bin jito-backrun-example -- \ --rpc-url http://{RPC_URL}:8899 \ --tip-program-id \ --backrun-accounts + # Note: Don't provide --auth-keypair argument if not planning to use authentication ``` ## Example + Backrun transactions that write-lock the [Pyth SOL/USDC account](https://solscan.io/account/H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG): + ```bash RUST_LOG=INFO cargo run --bin jito-backrun-example -- \ --block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \ @@ -25,4 +29,5 @@ RUST_LOG=INFO cargo run --bin jito-backrun-example -- \ --rpc-url https://api.mainnet-beta.solana.com:8899 \ --tip-program-id T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt \ --backrun-accounts H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG -``` \ No newline at end of file +# Note: Don't provide --auth-keypair argument if not planning to use authentication +``` diff --git a/backrun/src/event_loops.rs b/backrun/src/event_loops.rs index 9901385..e057613 100644 --- a/backrun/src/event_loops.rs +++ b/backrun/src/event_loops.rs @@ -1,14 +1,13 @@ -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use futures_util::StreamExt; use jito_protos::{ bundle::BundleResult, searcher::{ - mempool_subscription, MempoolSubscription, PendingTxNotification, - SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0, + mempool_subscription, searcher_service_client::SearcherServiceClient, MempoolSubscription, + PendingTxNotification, SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0, }, }; -use jito_searcher_client::get_searcher_client; use log::info; use solana_client::{ nonblocking::pubsub_client::PubsubClient, @@ -21,11 +20,13 @@ use solana_sdk::{ clock::Slot, commitment_config::{CommitmentConfig, CommitmentLevel}, pubkey::Pubkey, - signature::Keypair, }; use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use tokio::{sync::mpsc::Sender, time::sleep}; -use tonic::Streaming; +use tonic::{ + codegen::{Body, Bytes, StdError}, + Streaming, +}; // slot update subscription loop that attempts to maintain a connection to an RPC server pub async fn slot_subscribe_loop(pubsub_addr: String, slot_sender: Sender) { @@ -144,13 +145,18 @@ pub async fn block_subscribe_loop( } // attempts to maintain connection to searcher service and stream pending transaction notifications over a channel -pub async fn pending_tx_loop( - block_engine_url: String, - auth_keypair: Arc, +pub async fn pending_tx_loop( + mut searcher_client: SearcherServiceClient, pending_tx_sender: Sender, backrun_pubkeys: Vec, -) { - let mut num_searcher_connection_errors: usize = 0; +) where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ + let _num_searcher_connection_errors: usize = 0; let mut num_pending_tx_sub_errors: usize = 0; let mut num_pending_tx_stream_errors: usize = 0; let mut num_pending_tx_stream_disconnects: usize = 0; @@ -160,64 +166,49 @@ pub async fn pending_tx_loop( loop { sleep(Duration::from_secs(1)).await; - match get_searcher_client(&block_engine_url, &auth_keypair).await { - Ok(mut searcher_client) => { - match searcher_client - .subscribe_mempool(MempoolSubscription { - regions: vec![], - msg: Some(mempool_subscription::Msg::WlaV0Sub( - WriteLockedAccountSubscriptionV0 { - accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(), - }, - )), - }) - .await - { - Ok(pending_tx_stream_response) => { - let mut pending_tx_stream = pending_tx_stream_response.into_inner(); - while let Some(maybe_notification) = pending_tx_stream.next().await { - match maybe_notification { - Ok(notification) => { - if pending_tx_sender.send(notification).await.is_err() { - datapoint_error!( - "pending_tx_send_error", - ("errors", 1, i64) - ); - return; - } - } - Err(e) => { - num_pending_tx_stream_errors += 1; - datapoint_error!( - "searcher_pending_tx_stream_error", - ("errors", num_pending_tx_stream_errors, i64), - ("error_str", e.to_string(), String) - ); - break; - } + match searcher_client + .subscribe_mempool(MempoolSubscription { + regions: vec![], + msg: Some(mempool_subscription::Msg::WlaV0Sub( + WriteLockedAccountSubscriptionV0 { + accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(), + }, + )), + }) + .await + { + Ok(pending_tx_stream_response) => { + let mut pending_tx_stream = pending_tx_stream_response.into_inner(); + while let Some(maybe_notification) = pending_tx_stream.next().await { + match maybe_notification { + Ok(notification) => { + if pending_tx_sender.send(notification).await.is_err() { + datapoint_error!("pending_tx_send_error", ("errors", 1, i64)); + return; } } - num_pending_tx_stream_disconnects += 1; - datapoint_error!( - "searcher_pending_tx_stream_disconnect", - ("errors", num_pending_tx_stream_disconnects, i64), - ); - } - Err(e) => { - num_pending_tx_sub_errors += 1; - datapoint_error!( - "searcher_pending_tx_sub_error", - ("errors", num_pending_tx_sub_errors, i64), - ("error_str", e.to_string(), String) - ); + Err(e) => { + num_pending_tx_stream_errors += 1; + datapoint_error!( + "searcher_pending_tx_stream_error", + ("errors", num_pending_tx_stream_errors, i64), + ("error_str", e.to_string(), String) + ); + break; + } } } + num_pending_tx_stream_disconnects += 1; + datapoint_error!( + "searcher_pending_tx_stream_disconnect", + ("errors", num_pending_tx_stream_disconnects, i64), + ); } Err(e) => { - num_searcher_connection_errors += 1; + num_pending_tx_sub_errors += 1; datapoint_error!( - "searcher_connection_error", - ("errors", num_searcher_connection_errors, i64), + "searcher_pending_tx_sub_error", + ("errors", num_pending_tx_sub_errors, i64), ("error_str", e.to_string(), String) ); } @@ -225,38 +216,33 @@ pub async fn pending_tx_loop( } } -pub async fn bundle_results_loop( - block_engine_url: String, - auth_keypair: Arc, +pub async fn bundle_results_loop( + mut searcher_client: SearcherServiceClient, bundle_results_sender: Sender, -) { - let mut connection_errors: usize = 0; +) where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ + let _connection_errors: usize = 0; let mut response_errors: usize = 0; loop { sleep(Duration::from_millis(1000)).await; - match get_searcher_client(&block_engine_url, &auth_keypair).await { - Ok(mut c) => match c - .subscribe_bundle_results(SubscribeBundleResultsRequest {}) - .await - { - Ok(resp) => { - consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await; - } - Err(e) => { - response_errors += 1; - datapoint_error!( - "searcher_bundle_results_error", - ("errors", response_errors, i64), - ("msg", e.to_string(), String) - ); - } - }, + match searcher_client + .subscribe_bundle_results(SubscribeBundleResultsRequest {}) + .await + { + Ok(resp) => { + consume_bundle_results_stream(resp.into_inner(), &bundle_results_sender).await; + } Err(e) => { - connection_errors += 1; + response_errors += 1; datapoint_error!( "searcher_bundle_results_error", - ("errors", connection_errors, i64), + ("errors", response_errors, i64), ("msg", e.to_string(), String) ); } diff --git a/backrun/src/main.rs b/backrun/src/main.rs index 9729baf..c1ae258 100644 --- a/backrun/src/main.rs +++ b/backrun/src/main.rs @@ -21,7 +21,7 @@ use jito_protos::{ }, }; use jito_searcher_client::{ - get_searcher_client, send_bundle_no_wait, token_authenticator::ClientInterceptor, + get_searcher_client_auth, get_searcher_client_no_auth, send_bundle_no_wait, BlockEngineConnectionError, }; use log::*; @@ -49,7 +49,10 @@ use tokio::{ sync::mpsc::{channel, Receiver}, time::interval, }; -use tonic::{codegen::InterceptedService, transport::Channel, Response, Status}; +use tonic::{ + codegen::{Body, Bytes, StdError}, + Response, Status, +}; use crate::event_loops::{ block_subscribe_loop, bundle_results_loop, pending_tx_loop, slot_subscribe_loop, @@ -74,7 +77,7 @@ struct Args { /// Path to keypair file used to authenticate with the Jito Block Engine /// See: https://jito-labs.gitbook.io/mev/searcher-resources/getting-started#block-engine-api-key #[arg(long, env)] - auth_keypair: PathBuf, + auth_keypair: Option, /// RPC Websocket URL. /// See: https://solana.com/docs/rpc/websocket @@ -177,10 +180,17 @@ fn build_bundles( .collect() } -async fn send_bundles( - searcher_client: &mut SearcherServiceClient>, +async fn send_bundles( + searcher_client: &mut SearcherServiceClient, bundles: &[BundledTransactions], -) -> Result, Status>>> { +) -> Result, Status>>> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ let mut futs = Vec::with_capacity(bundles.len()); for b in bundles { let mut searcher_client = searcher_client.clone(); @@ -215,13 +225,20 @@ fn generate_tip_accounts(tip_program_pubkey: &Pubkey) -> Vec { ] } -async fn maintenance_tick( - searcher_client: &mut SearcherServiceClient>, +async fn maintenance_tick( + searcher_client: &mut SearcherServiceClient, rpc_client: &RpcClient, leader_schedule: &mut HashMap>, blockhash: &mut Hash, regions: Vec, -) -> Result<()> { +) -> Result<()> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ *blockhash = rpc_client .get_latest_blockhash_with_commitment(CommitmentConfig { commitment: CommitmentLevel::Confirmed, @@ -495,9 +512,8 @@ fn print_block_stats( } #[allow(clippy::too_many_arguments)] -async fn run_searcher_loop( - block_engine_url: String, - auth_keypair: Arc, +async fn run_searcher_loop( + mut searcher_client: SearcherServiceClient, keypair: &Keypair, rpc_url: String, regions: Vec, @@ -507,13 +523,18 @@ async fn run_searcher_loop( mut block_receiver: Receiver>, mut bundle_results_receiver: Receiver, mut pending_tx_receiver: Receiver, -) -> Result<()> { +) -> Result<()> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ let mut leader_schedule: HashMap> = HashMap::new(); let mut block_stats: HashMap = HashMap::new(); let mut block_signatures: HashMap> = HashMap::new(); - let mut searcher_client = get_searcher_client(&block_engine_url, &auth_keypair).await?; - let mut rng = thread_rng(); let tip_accounts = generate_tip_accounts(&tip_program_pubkey); @@ -591,11 +612,55 @@ fn main() -> Result<()> { let args: Args = Args::parse(); let payer_keypair = Arc::new(read_keypair_file(&args.payer_keypair).expect("parse kp file")); - let auth_keypair = Arc::new(read_keypair_file(&args.auth_keypair).expect("parse kp file")); - - set_host_id(auth_keypair.pubkey().to_string()); + let auth_keypair = args + .auth_keypair + .as_ref() + .map(|path| Arc::new(read_keypair_file(path).expect("parse kp file"))); + + set_host_id( + auth_keypair + .as_ref() + .map(|kp| kp.pubkey().to_string()) + .unwrap_or(uuid::Uuid::new_v4().to_string()), + ); let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + + match auth_keypair { + Some(auth_keypair) => { + let searcher_client_auth = runtime.block_on( + get_searcher_client_auth( + args.block_engine_url.as_str(), + &auth_keypair, + )) + .expect("Failed to get searcher client with auth. Note: If you don't pass in the auth keypair, we can attempt to connect to the no auth endpoint"); + start_searcher_loop(runtime, searcher_client_auth, &payer_keypair, args) + } + None => { + let searcher_client_no_auth = runtime.block_on( + get_searcher_client_no_auth( + args.block_engine_url.as_str(), + )) + .expect("Failed to get searcher client with auth. Note: If you don't pass in the auth keypair, we can attempt to connect to the no auth endpoint"); + start_searcher_loop(runtime, searcher_client_no_auth, &payer_keypair, args) + } + } +} + +#[allow(clippy::too_many_arguments)] +fn start_searcher_loop( + runtime: tokio::runtime::Runtime, + searcher_client: SearcherServiceClient, + payer_keypair: &Keypair, + args: Args, +) -> Result<()> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ runtime.block_on(async move { let (slot_sender, slot_receiver) = channel(100); let (block_sender, block_receiver) = channel(100); @@ -605,24 +670,21 @@ fn main() -> Result<()> { tokio::spawn(slot_subscribe_loop(args.pubsub_url.clone(), slot_sender)); tokio::spawn(block_subscribe_loop(args.pubsub_url.clone(), block_sender)); tokio::spawn(pending_tx_loop( - args.block_engine_url.clone(), - auth_keypair.clone(), + searcher_client.clone(), pending_tx_sender, args.backrun_accounts, )); if args.subscribe_bundle_results { tokio::spawn(bundle_results_loop( - args.block_engine_url.clone(), - auth_keypair.clone(), + searcher_client.clone(), bundle_results_sender, )); } let result = run_searcher_loop( - args.block_engine_url, - auth_keypair, - &payer_keypair, + searcher_client, + payer_keypair, args.rpc_url, args.regions, args.message, diff --git a/cli/README.md b/cli/README.md index a95cdec..da82ff5 100644 --- a/cli/README.md +++ b/cli/README.md @@ -9,6 +9,7 @@ The following program exposes functionality in the Block Engine's searcher API. - Ensure the rust compiler is installed. - Sending a bundle requires an RPC server and a keypair with funds to pay for tip + transaction fees. - For cross region functionality, add the `--regions REGION1,REGION2,etc` arg. [More details](https://jito-labs.gitbook.io/mev/searcher-services/recommendations#cross-region) + ## Building ```bash @@ -45,6 +46,7 @@ cargo run --bin jito-searcher-cli -- \ --block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \ --keypair-path auth.json \ connected-leaders + # Note: Don't provide --keypair-path argument if not planning to use authentication ``` Example output: @@ -64,6 +66,7 @@ cargo run --bin jito-searcher-cli -- \ --block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \ --keypair-path auth.json \ tip-accounts + # Note: Don't provide --keypair-path argument if not planning to use authentication ``` Example output: @@ -88,6 +91,7 @@ cargo run --bin jito-searcher-cli -- \ --lamports 100000 \ --tip-account 96gYZGLnJYVFmbjzopPSU6QiEV5fGqZNyN9nmNhvrZU5 \ --rpc-url "https://mainnet.rpc.jito.wtf/?access-token=" + # Note: Don't provide --keypair-path argument if not planning to use authentication ``` Example output: diff --git a/cli/src/main.rs b/cli/src/main.rs index 262007c..a6d792d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -6,14 +6,14 @@ use futures_util::StreamExt; use jito_protos::{ convert::versioned_tx_from_packet, searcher::{ - mempool_subscription, searcher_service_client::SearcherServiceClient, - ConnectedLeadersRegionedRequest, GetTipAccountsRequest, MempoolSubscription, - NextScheduledLeaderRequest, PendingTxNotification, ProgramSubscriptionV0, - SubscribeBundleResultsRequest, WriteLockedAccountSubscriptionV0, + searcher_service_client::SearcherServiceClient, ConnectedLeadersRegionedRequest, + GetTipAccountsRequest, NextScheduledLeaderRequest, PendingTxNotification, + SubscribeBundleResultsRequest, }, }; use jito_searcher_client::{ - get_searcher_client, send_bundle_with_confirmation, token_authenticator::ClientInterceptor, + get_searcher_client_auth, get_searcher_client_no_auth, send_bundle_with_confirmation, + token_authenticator::ClientInterceptor, }; use log::info; use solana_client::nonblocking::rpc_client::RpcClient; @@ -26,7 +26,11 @@ use solana_sdk::{ }; use spl_memo::build_memo; use tokio::time::{sleep, timeout}; -use tonic::{codegen::InterceptedService, transport::Channel, Streaming}; +use tonic::{ + codegen::{Body, Bytes, InterceptedService, StdError}, + transport::Channel, + Streaming, +}; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -39,7 +43,7 @@ struct Args { /// Path to keypair file used to authenticate with the Jito Block Engine /// See: https://jito-labs.gitbook.io/mev/searcher-resources/getting-started#block-engine-api-key #[arg(long, env)] - keypair_path: PathBuf, + keypair_path: Option, /// Comma-separated list of regions to request cross-region data from. /// If no region specified, then default to the currently connected block engine's region. @@ -119,11 +123,40 @@ async fn main() { .format_timestamp(Some(TimestampPrecision::Micros)) .init(); - let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("reads keypair at path")); - let mut client = get_searcher_client(&args.block_engine_url, &keypair) - .await - .expect("connects to searcher client"); + let keypair = args + .keypair_path + .as_ref() + .map(|path| Arc::new(read_keypair_file(path).expect("parse kp file"))); + + match keypair { + Some(auth_keypair) => { + let searcher_client_auth = + get_searcher_client_auth( + args.block_engine_url.as_str(), + &auth_keypair, + ).await + .expect("Failed to get searcher client with auth. Note: If you don't pass in the auth keypair, we can attempt to connect to the no auth endpoint"); + process_commands(args, searcher_client_auth).await + } + None => { + let searcher_client_no_auth = + get_searcher_client_no_auth( + args.block_engine_url.as_str(), + ).await + .expect("Failed to get searcher client with auth. Note: If you don't pass in the auth keypair, we can attempt to connect to the no auth endpoint"); + process_commands(args, searcher_client_no_auth).await + } + } +} +async fn process_commands(args: Args, mut client: SearcherServiceClient) +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ match args.command { Commands::NextScheduledLeader => { let next_leader = client @@ -277,7 +310,7 @@ async fn main() { } } -async fn print_packet_stream( +pub async fn print_packet_stream( client: &mut SearcherServiceClient>, mut pending_transactions: Streaming, regions: Vec, diff --git a/searcher_client/src/lib.rs b/searcher_client/src/lib.rs index deee4f9..dc3abb9 100644 --- a/searcher_client/src/lib.rs +++ b/searcher_client/src/lib.rs @@ -26,7 +26,7 @@ use solana_sdk::{ use thiserror::Error; use tokio::time::timeout; use tonic::{ - codegen::InterceptedService, + codegen::{Body, Bytes, InterceptedService, StdError}, transport, transport::{Channel, Endpoint}, Response, Status, Streaming, @@ -58,7 +58,7 @@ pub enum BundleRejectionError { pub type BlockEngineConnectionResult = Result; -pub async fn get_searcher_client( +pub async fn get_searcher_client_auth( block_engine_url: &str, auth_keypair: &Arc, ) -> BlockEngineConnectionResult< @@ -78,6 +78,14 @@ pub async fn get_searcher_client( Ok(searcher_client) } +pub async fn get_searcher_client_no_auth( + block_engine_url: &str, +) -> BlockEngineConnectionResult> { + let searcher_channel = create_grpc_channel(block_engine_url).await?; + let searcher_client = SearcherServiceClient::new(searcher_channel); + Ok(searcher_client) +} + pub async fn create_grpc_channel(url: &str) -> BlockEngineConnectionResult { let mut endpoint = Endpoint::from_shared(url.to_string()).expect("invalid url"); if url.starts_with("https") { @@ -86,12 +94,19 @@ pub async fn create_grpc_channel(url: &str) -> BlockEngineConnectionResult( transactions: &[VersionedTransaction], rpc_client: &RpcClient, - searcher_client: &mut SearcherServiceClient>, + searcher_client: &mut SearcherServiceClient, bundle_results_subscription: &mut Streaming, -) -> Result<(), Box> { +) -> Result<(), Box> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ let bundle_signatures: Vec = transactions.iter().map(|tx| tx.signatures[0]).collect(); @@ -175,10 +190,17 @@ pub async fn send_bundle_with_confirmation( Ok(()) } -pub async fn send_bundle_no_wait( +pub async fn send_bundle_no_wait( transactions: &[VersionedTransaction], - searcher_client: &mut SearcherServiceClient>, -) -> Result, Status> { + searcher_client: &mut SearcherServiceClient, +) -> Result, Status> +where + T: tonic::client::GrpcService + Send + 'static + Clone, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + >::Future: std::marker::Send, +{ // convert them to packets + send over let packets: Vec<_> = transactions .iter()