Skip to content

Latest commit

 

History

History
189 lines (174 loc) · 6.64 KB

File metadata and controls

189 lines (174 loc) · 6.64 KB

功能说明

  • 读取grpc
  • 订阅多个合约
  • 解析数据

功能代码

use {
    clap::{Parser},
    futures::{sink::SinkExt, stream::StreamExt},
    log::info,
    std::env,
    std::{
        collections::HashMap
    },
    tokio::time::{interval, Duration},
    tonic::transport::channel::ClientTlsConfig,
    yellowstone_grpc_client::GeyserGrpcClient,
    yellowstone_grpc_proto::prelude::{
        subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
        SubscribeRequestPing, SubscribeUpdatePong,SubscribeRequestFilterAccounts,
        SubscribeUpdateSlot, SubscribeUpdateAccountInfo
    },
    serde_json::{json, Value},
    solana_sdk::pubkey::Pubkey,
    bs58,
    hex,
};

#[derive(Debug, Clone, Parser)]
#[clap(author, version, about)]
struct Args {
    /// Service endpoint
    #[clap(short, long, default_value_t = String::from("https://solana-yellowstone-grpc.publicnode.com"))]
    endpoint: String,

    #[clap(long)]
    x_token: Option<String>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env::set_var(
        env_logger::DEFAULT_FILTER_ENV,
        env::var_os(env_logger::DEFAULT_FILTER_ENV).unwrap_or_else(|| "info".into()),
    );
    env_logger::init();

    let args = Args::parse();

    let mut client = GeyserGrpcClient::build_from_shared(args.endpoint)?
        .x_token(args.x_token)?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;
    let (mut subscribe_tx, mut stream) = client.subscribe().await?;

    type AccountFilterMap = HashMap<String, SubscribeRequestFilterAccounts>;
    let mut accounts: AccountFilterMap = HashMap::new();

    let pump = "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA".to_string();
    let raydium = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8".to_string();
    accounts.insert(
        "client".to_string(),
        SubscribeRequestFilterAccounts {
            account: vec![],
            owner: vec![pump.clone(),raydium],
            filters: vec![],
            nonempty_txn_signature: None,
        },
    );

    futures::try_join!(
        async move {
            subscribe_tx
                .send(SubscribeRequest {
                    accounts,
                    commitment: Some(CommitmentLevel::Processed as i32),
                    ..Default::default()
                })
                .await?;

            let mut timer = interval(Duration::from_secs(3));
            let mut id = 0;
            loop {
                timer.tick().await;
                id += 1;
                subscribe_tx
                    .send(SubscribeRequest {
                        ping: Some(SubscribeRequestPing { id }),
                        ..Default::default()
                    })
                    .await?;
            }
            #[allow(unreachable_code)]
            Ok::<(), anyhow::Error>(())
        },
        async move {
            while let Some(message) = stream.next().await {
                match message?.update_oneof.expect("valid message") {
                    UpdateOneof::Slot(SubscribeUpdateSlot { slot, .. }) => {
                        info!("slot received: {slot}");
                    }
                    UpdateOneof::Ping(_msg) => {
                        info!("ping received");
                    }
                    UpdateOneof::Pong(SubscribeUpdatePong { id }) => {
                        info!("pong received: id#{id}");
                    }
                    UpdateOneof::Account(_msg) => {
                        let account = _msg.account.ok_or(anyhow::anyhow!("no account in the message"))?;
                        info!("account received");
                        let ammkey = Pubkey::try_from(account.pubkey).map_err(|_| anyhow::anyhow!("invalid account pubkey"))?.to_string();
                        info!("ammkey {}", ammkey);
                        let owner = Pubkey::try_from(account.owner).map_err(|_| anyhow::anyhow!("invalid account owner"))?.to_string();
                        info!("owner {}", owner);

                        if owner == pump {
                            // 反序列化PumpLayout结构体
                            #[derive(Debug)]
                            struct PumpLayout {
                                discriminator: u64,
                                pool_bump: u8,
                                index: u16,
                                creator: [u8; 32],
                                base_mint: [u8; 32],
                                quote_mint: [u8; 32],
                                lp_mint: [u8; 32],
                                base_vault: [u8; 32],
                                quote_vault: [u8; 32],
                            }
                            
                            let pump_data: PumpLayout = unsafe {
                                std::ptr::read(account.data.as_ptr() as *const _)
                            };
                            
                            info!("PumpLayout data:");
                            info!("  discriminator: {}", pump_data.discriminator);
                            info!("  pool_bump: {}", pump_data.pool_bump);
                            info!("  index: {}", pump_data.index);
                            info!("  creator: {}", bs58::encode(pump_data.creator).into_string());
                            info!("  base_mint: {}", bs58::encode(pump_data.base_mint).into_string());
                            info!("  quote_mint: {}", bs58::encode(pump_data.quote_mint).into_string());
                            info!("  lp_mint: {}", bs58::encode(pump_data.lp_mint).into_string());
                            info!("  base_vault: {}", bs58::encode(pump_data.base_vault).into_string());
                            info!("  quote_vault: {}", bs58::encode(pump_data.quote_vault).into_string());
                        }
                    }
                    msg => anyhow::bail!("received unexpected message: {msg:?}"),
                }
            }
            Ok::<(), anyhow::Error>(())
        }
    )?;

    Ok(())
}
[package]
name = "mevbot-ws-rust"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.32.0", features = ["full"] }
tokio-tungstenite = "0.20.1"
futures-util = "0.3.28"
futures = "0.3.28"
serde = { version = "1.0", features = ["derive"] }
env_logger = "0.10"
anyhow = "1.0.62"
backoff = "0.4.0"
bincode = "1.3.3"
borsh = "0.9.1"
bs58 = "0.5.1"
chrono = "0.4.26"
clap = { version = "4.0", features = ["derive"] }
hex = "0.4.3"
indicatif = "0.17.9"
log = "0.4.17"
maplit = "1.0.2"
serde_json = "1.0.86"
solana-sdk = "~2.2.1"
solana-transaction-status = "~2.2.1"
tonic = "0.12.1"
yellowstone-grpc-client = "6.0.0"
yellowstone-grpc-proto = "6.0.0"