From 4199851f0368b5408a877e8b14752df4a1f83281 Mon Sep 17 00:00:00 2001 From: Ismail Akram Date: Sat, 10 Jan 2026 17:38:19 +0500 Subject: [PATCH] Implement call --- .github/workflows/rust.yml | 36 ++++++++---- Cargo.lock | 7 ++- Cargo.toml | 5 +- Makefile | 15 ++++- src/cli.rs | 76 ++++++++++++++++++++++++ src/commands/call.rs | 109 ++++++++++++++++++++++++++++++++++ src/commands/mod.rs | 4 ++ src/commands/publish.rs | 6 ++ src/commands/register.rs | 37 ++++++++++++ src/commands/subscribe.rs | 6 ++ src/config.rs | 116 +++++++++++++++++++++++++++++++++++++ src/main.rs | 72 ++++++++++++----------- src/utils.rs | 77 ++++++++++++++++++++++++ 13 files changed, 515 insertions(+), 51 deletions(-) create mode 100644 src/cli.rs create mode 100644 src/commands/call.rs create mode 100644 src/commands/mod.rs create mode 100644 src/commands/publish.rs create mode 100644 src/commands/register.rs create mode 100644 src/commands/subscribe.rs create mode 100644 src/config.rs create mode 100644 src/utils.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7f05c5f..1c68f46 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,20 +1,34 @@ -name: Rust +name: WCLI on: push: - branches: [ "main" ] + branches: + - main pull_request: - branches: [ "main" ] - -env: - CARGO_TERM_COLOR: always + branches: + - main jobs: - build: - + lint: + name: Format & Clippy Check runs-on: ubuntu-latest + env: + CARGO_NET_GIT_FETCH_WITH_CLI: true steps: - - uses: actions/checkout@v4 - - name: Build - run: cargo build --verbose + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + components: rustfmt, clippy + override: true + + - name: Check formatting + run: cargo fmt --all -- --check + + - name: Run clippy + run: cargo clippy --all-targets --all-features -- -D warnings diff --git a/Cargo.lock b/Cargo.lock index ca4d052..68f07a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1306,7 +1306,7 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "wampproto" version = "0.1.0" -source = "git+https://github.com/xconnio/wampproto-rust.git?rev=535b34522ecf030276ef4ce6be226cf2aefb9a9a#535b34522ecf030276ef4ce6be226cf2aefb9a9a" +source = "git+https://github.com/xconnio/wampproto-rust.git?rev=520130fa02343409578879959748b36f151bbc8d#520130fa02343409578879959748b36f151bbc8d" dependencies = [ "base64", "ed25519-dalek", @@ -1341,7 +1341,10 @@ name = "wcli" version = "0.1.0" dependencies = [ "clap", + "serde", + "serde_json", "tokio", + "wampproto", "xconn", ] @@ -1449,7 +1452,7 @@ checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" [[package]] name = "xconn" version = "0.1.0" -source = "git+https://github.com/xconnio/xconn-rust#45b756f4a76025db865fed63d7d84a642b4550d4" +source = "git+https://github.com/xconnio/xconn-rust.git?rev=484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1#484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" dependencies = [ "async-trait", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 5db701f..56887fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,8 @@ edition = "2024" [dependencies] clap = { version = "4.5.54", features = ["derive"] } +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" tokio = { version = "1.49.0", features = ["full"] } -xconn = { git = "https://github.com/xconnio/xconn-rust", version = "0.1.0" } +wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" } +xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" } diff --git a/Makefile b/Makefile index b8bc7ab..ffb641d 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,16 @@ -build: - cargo build +lint: + cargo fmt -- --check + cargo clippy --all-targets --all-features -- -D warnings + +format: + cargo fmt -- run: cargo run + +build: + cargo build + +build-release: + cargo build --release + diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..5c75ce3 --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,76 @@ +use clap::{Parser, Subcommand}; + +#[derive(Parser)] +#[command(name = "wcli")] +#[command(about = "WAMP Command Line Interface", long_about = None)] +pub struct Cli { + /// The URL of the router to connect to + #[arg(long, default_value = "ws://localhost:8080/ws", global = true)] + pub url: String, + + /// The realm to join + #[arg(long, default_value = "realm1", global = true)] + pub realm: String, + + /// Authentication ID + #[arg(long, global = true)] + pub authid: Option, + + /// Authentication role + #[arg(long, global = true)] + pub authrole: Option, + + /// Secret for ticket/wampcra authentication + #[arg(long, global = true)] + pub secret: Option, + + /// Path to private key file for cryptosign + #[arg(long, global = true)] + pub private_key: Option, + + /// Ticket for ticket authentication + #[arg(long, global = true)] + pub ticket: Option, + + /// Serializer to use (json, msgpack, cbor) + #[arg(long, default_value = "json", global = true)] + pub serializer: String, + + #[command(subcommand)] + pub command: Commands, +} + +#[derive(Subcommand)] +pub enum Commands { + /// Call a procedure + Call { + /// Procedure to call + procedure: String, + + /// Positional arguments for the call + /// To enforce value is always a string, send value in quotes e.g. "'1'" or '"true"' + #[arg()] + args: Vec, + + /// Number of times to repeat the call per session + #[arg(long, default_value_t = 1)] + repeat: u32, + + /// Number of parallel sessions to create + #[arg(long, default_value_t = 1)] + parallel: u32, + + /// Maximum number of concurrent sessions + #[arg(long, default_value_t = 1)] + concurrency: usize, + }, + /// Register a procedure + Register { + /// Procedure to register + procedure: String, + }, + /// Subscribe to a topic + Subscribe, + /// Publish to a topic + Publish, +} diff --git a/src/commands/call.rs b/src/commands/call.rs new file mode 100644 index 0000000..d93e50d --- /dev/null +++ b/src/commands/call.rs @@ -0,0 +1,109 @@ +use crate::config::{CallConfig, ConnectionConfig}; +use crate::utils::{CommandOutput, ParsedArg, parse_arg, wamp_value_to_serde}; +use std::sync::Arc; +use tokio::sync::Semaphore; +use xconn::sync::CallRequest; + +/// Builds a CallRequest from the procedure name and parsed arguments. +fn build_call_request(procedure: &str, args: &[String]) -> CallRequest { + let mut request = CallRequest::new(procedure); + for arg in args { + request = match parse_arg(arg) { + ParsedArg::Integer(v) => request.arg(v), + ParsedArg::Float(v) => request.arg(v), + ParsedArg::Boolean(v) => request.arg(v), + ParsedArg::String(v) => request.arg(v), + }; + } + request +} + +/// Executes calls for a single session: connects, runs repeated calls, and disconnects. +async fn run_session( + conn_config: Arc, + call_config: Arc, + session_id: u32, +) { + let session = match conn_config.connect().await { + Ok(s) => s, + Err(e) => { + eprintln!("Session {} Connection Error: {}", session_id, e); + return; + } + }; + + for iteration in 0..call_config.repeat { + let request = build_call_request(&call_config.procedure, &call_config.args); + + match session.call(request).await { + Ok(result) => { + let output = CommandOutput { + args: result + .args + .as_ref() + .map(|a: &Vec| { + a.iter().map(wamp_value_to_serde).collect() + }) + .unwrap_or_default(), + kwargs: result + .kwargs + .as_ref() + .map( + |kw: &std::collections::HashMap| { + kw.iter() + .map(|(k, v)| (k.clone(), wamp_value_to_serde(v))) + .collect() + }, + ) + .unwrap_or_default(), + }; + match serde_json::to_string_pretty(&output) { + Ok(json) => println!("{}", json), + Err(e) => eprintln!( + "Session {} Iteration {} Error serializing result: {}", + session_id, iteration, e + ), + } + } + Err(e) => eprintln!( + "Session {} Iteration {} Call Error: {}", + session_id, iteration, e + ), + } + } + + if let Err(e) = session.leave().await { + eprintln!("Session {} Error leaving: {}", session_id, e); + } +} + +pub async fn handle( + conn_config: ConnectionConfig, + call_config: CallConfig, +) -> Result<(), Box> { + let semaphore = Arc::new(Semaphore::new(call_config.concurrency)); + let conn_config = Arc::new(conn_config); + let call_config = Arc::new(call_config); + + let mut handles = Vec::with_capacity(call_config.parallel as usize); + + for session_id in 0..call_config.parallel { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + + let conn_config = conn_config.clone(); + let call_config = call_config.clone(); + + let handle = tokio::spawn(async move { + let _permit = permit; + run_session(conn_config, call_config, session_id).await; + }); + + handles.push(handle); + } + + for handle in handles { + let _ = handle.await; + } + + Ok(()) +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs new file mode 100644 index 0000000..36a2c05 --- /dev/null +++ b/src/commands/mod.rs @@ -0,0 +1,4 @@ +pub mod call; +pub mod publish; +pub mod register; +pub mod subscribe; diff --git a/src/commands/publish.rs b/src/commands/publish.rs new file mode 100644 index 0000000..d8ed5cf --- /dev/null +++ b/src/commands/publish.rs @@ -0,0 +1,6 @@ +use xconn::async_::session::Session; + +pub async fn handle(_session: &Session) -> Result<(), Box> { + println!("Subcommand 'publish' executed"); + Ok(()) +} diff --git a/src/commands/register.rs b/src/commands/register.rs new file mode 100644 index 0000000..07e5ad6 --- /dev/null +++ b/src/commands/register.rs @@ -0,0 +1,37 @@ +use crate::utils::{CommandOutput, wamp_async_value_to_serde}; +use tokio::signal; +use xconn::async_::session::Session; +use xconn::async_::{Invocation, RegisterRequest, Yield}; + +async fn registration_handler(inv: Invocation) -> Yield { + let output = CommandOutput { + args: inv.args.iter().map(wamp_async_value_to_serde).collect(), + kwargs: inv + .kwargs + .iter() + .map(|(k, v): (_, _)| (k.clone(), wamp_async_value_to_serde(v))) + .collect(), + }; + + match serde_json::to_string_pretty(&output) { + Ok(json) => println!("{}", json), + Err(e) => println!("Error serializing invocation: {}", e), + } + + Yield::new(inv.args, inv.kwargs) +} + +pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box> { + let register_request = RegisterRequest::new(procedure, registration_handler); + + match session.register(register_request).await { + Ok(reg) => println!("Registered procedure {}: {:?}", procedure, reg), + Err(e) => println!("Error registering procedure: {}", e), + } + + println!("Press Ctrl+C to exit"); + signal::ctrl_c().await?; + println!("Exiting..."); + + Ok(()) +} diff --git a/src/commands/subscribe.rs b/src/commands/subscribe.rs new file mode 100644 index 0000000..817e12d --- /dev/null +++ b/src/commands/subscribe.rs @@ -0,0 +1,6 @@ +use xconn::async_::session::Session; + +pub async fn handle(_session: &Session) -> Result<(), Box> { + println!("Subcommand 'subscribe' executed"); + Ok(()) +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..69e3f64 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,116 @@ +use crate::cli::Cli; +use std::collections::HashMap; +use wampproto::authenticators::anonymous::AnonymousAuthenticator; +use wampproto::authenticators::authenticator::ClientAuthenticator; +use wampproto::authenticators::cryptosign::CryptoSignAuthenticator; +use wampproto::authenticators::ticket::TicketAuthenticator; +use wampproto::authenticators::wampcra::WAMPCRAAuthenticator; +use wampproto::messages::types::Value; +use xconn::async_::client::Client; +use xconn::async_::session::Session; +use xconn::sync::{CBORSerializerSpec, JSONSerializerSpec, MsgPackSerializerSpec, SerializerSpec}; + +/// Global connection and authentication configuration. +#[derive(Debug, Clone)] +pub struct ConnectionConfig { + pub url: String, + pub realm: String, + pub authid: Option, + pub authrole: Option, + pub secret: Option, + pub private_key: Option, + pub ticket: Option, + pub serializer: String, +} + +impl ConnectionConfig { + /// Connects to the router using the configured serializer and authentication method. + pub async fn connect(&self) -> Result> { + let serializer = self.create_serializer()?; + let authenticator = self.create_authenticator()?; + + let client = Client::new(serializer, authenticator); + client + .connect(&self.url, &self.realm) + .await + .map_err(|e| Box::new(e) as Box) + } + + /// Creates the appropriate serializer based on the --serializer option. + fn create_serializer(&self) -> Result, String> { + match self.serializer.to_lowercase().as_str() { + "json" => Ok(Box::new(JSONSerializerSpec)), + "msgpack" => Ok(Box::new(MsgPackSerializerSpec)), + "cbor" => Ok(Box::new(CBORSerializerSpec)), + other => Err(format!( + "Unknown serializer '{}'. Valid options: json, msgpack, cbor", + other + )), + } + } + + /// Creates the appropriate authenticator based on authentication options. + /// Priority: --private-key > --secret > --ticket > anonymous + fn create_authenticator( + &self, + ) -> Result, Box> { + let authid = self.authid.as_deref().unwrap_or(""); + let extra = self.build_auth_extra(); + + // Check for cryptosign (private key) + if let Some(ref private_key) = self.private_key { + let auth = CryptoSignAuthenticator::try_new(authid, private_key, extra) + .map_err(|e| format!("Failed to create CryptoSign authenticator: {}", e))?; + return Ok(Box::new(auth)); + } + + // Check for WAMP-CRA (secret) + if let Some(ref secret) = self.secret { + let auth = WAMPCRAAuthenticator::new(authid, secret, extra); + return Ok(Box::new(auth)); + } + + // Check for ticket authentication + if let Some(ref ticket) = self.ticket { + let auth = TicketAuthenticator::new(authid, ticket, extra); + return Ok(Box::new(auth)); + } + + // Default to anonymous + Ok(Box::new(AnonymousAuthenticator::new(authid, extra))) + } + + /// Builds the authentication extra HashMap, including authrole if specified. + fn build_auth_extra(&self) -> HashMap { + let mut extra = HashMap::new(); + if let Some(ref role) = self.authrole { + extra.insert("authrole".to_string(), Value::Str(role.clone())); + } + extra + } +} + +impl From<&Cli> for ConnectionConfig { + fn from(cli: &Cli) -> Self { + Self { + url: cli.url.clone(), + realm: cli.realm.clone(), + authid: cli.authid.clone(), + authrole: cli.authrole.clone(), + secret: cli.secret.clone(), + private_key: cli.private_key.clone(), + ticket: cli.ticket.clone(), + serializer: cli.serializer.clone(), + } + } +} + +/// Configuration specific to the Call command. +#[derive(Debug, Clone)] +pub struct CallConfig { + pub procedure: String, + pub args: Vec, + pub repeat: u32, + pub parallel: u32, + pub concurrency: usize, +} diff --git a/src/main.rs b/src/main.rs index 980f046..7c93fab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,51 +1,53 @@ -use clap::{Parser, Subcommand}; +mod cli; +mod commands; +mod config; +mod utils; -#[derive(Parser)] -#[command(name = "wcli")] -#[command(about = "WAMP Command Line Interface", long_about = None)] -struct Cli { - /// The URL of the router to connect to - #[arg(long, default_value = "ws://localhost:8080/ws", global = true)] - url: String, - - /// The realm to join - #[arg(long, default_value = "realm1", global = true)] - realm: String, - - #[command(subcommand)] - command: Commands, -} - -#[derive(Subcommand)] -enum Commands { - /// Call a procedure - Call, - /// Register a procedure - Register, - /// Subscribe to a topic - Subscribe, - /// Publish to a topic - Publish, -} +use clap::Parser; +use cli::{Cli, Commands}; +use config::{CallConfig, ConnectionConfig}; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let cli = Cli::parse(); println!("Connecting to {} in realm {}", cli.url, cli.realm); + let conn_config = ConnectionConfig::from(&cli); + match cli.command { - Commands::Call => { - println!("Subcommand 'call' executed"); + Commands::Call { + procedure, + args, + repeat, + parallel, + concurrency, + } => { + let call_config = CallConfig { + procedure, + args, + repeat, + parallel, + concurrency, + }; + commands::call::handle(conn_config, call_config).await?; } - Commands::Register => { - println!("Subcommand 'register' executed"); + Commands::Register { procedure } => { + let session = conn_config.connect().await?; + commands::register::handle(&session, &procedure).await?; + session.leave().await?; } Commands::Subscribe => { - println!("Subcommand 'subscribe' executed"); + let session = conn_config.connect().await?; + commands::subscribe::handle(&session).await?; + session.leave().await?; } Commands::Publish => { - println!("Subcommand 'publish' executed"); + let session = conn_config.connect().await?; + commands::publish::handle(&session).await?; + session.leave().await?; } } + + Ok(()) } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..db3e320 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,77 @@ +use serde::Serialize; +use serde_json::Value as SerdeValue; +use xconn::sync::Value as WampValue; + +#[derive(Debug)] +pub enum ParsedArg { + Integer(i64), + Float(f64), + Boolean(bool), + String(String), +} + +pub fn parse_arg(input: &str) -> ParsedArg { + // Check for quoted strings to enforce string type + if ((input.starts_with('\'') && input.ends_with('\'')) + || (input.starts_with('"') && input.ends_with('"'))) + && input.len() >= 2 + { + return ParsedArg::String(input[1..input.len() - 1].to_string()); + } + + if let Ok(i) = input.parse::() { + return ParsedArg::Integer(i); + } + + if let Ok(f) = input.parse::() { + return ParsedArg::Float(f); + } + + if let Ok(b) = input.parse::() { + return ParsedArg::Boolean(b); + } + + ParsedArg::String(input.to_string()) +} + +#[derive(Serialize)] +pub struct CommandOutput { + pub args: Vec, + pub kwargs: std::collections::HashMap, +} + +pub fn wamp_value_to_serde(v: &WampValue) -> SerdeValue { + match v { + WampValue::Int(i) => SerdeValue::Number((*i).into()), + WampValue::Str(s) => SerdeValue::String(s.clone()), + WampValue::Bool(b) => SerdeValue::Bool(*b), + WampValue::Float(f) => serde_json::json!(f), + WampValue::List(l) => SerdeValue::Array(l.iter().map(wamp_value_to_serde).collect()), + WampValue::Dict(d) => SerdeValue::Object( + d.iter() + .map(|(k, v)| (k.clone(), wamp_value_to_serde(v))) + .collect(), + ), + WampValue::Bytes(_) => SerdeValue::String("".to_string()), + _ => SerdeValue::Null, + } +} + +pub fn wamp_async_value_to_serde(v: &xconn::async_::Value) -> SerdeValue { + match v { + xconn::async_::Value::Int(i) => SerdeValue::Number((*i).into()), + xconn::async_::Value::Str(s) => SerdeValue::String(s.clone()), + xconn::async_::Value::Bool(b) => SerdeValue::Bool(*b), + xconn::async_::Value::Float(f) => serde_json::json!(f), + xconn::async_::Value::List(l) => { + SerdeValue::Array(l.iter().map(wamp_async_value_to_serde).collect()) + } + xconn::async_::Value::Dict(d) => SerdeValue::Object( + d.iter() + .map(|(k, v)| (k.clone(), wamp_async_value_to_serde(v))) + .collect(), + ), + xconn::async_::Value::Bytes(_) => SerdeValue::String("".to_string()), + _ => SerdeValue::Null, + } +}