Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
name: Rust
name: WCLI

on:
push:
branches: [ "main" ]
branches:
- main
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always
branches:
- main

jobs:
build:

lint:
name: Format & Clippy Check
runs-on: ubuntu-latest
env:
CARGO_NET_GIT_FETCH_WITH_CLI: true

steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
- name: Checkout code
uses: actions/checkout@v4

- name: Install Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: rustfmt, clippy
override: true

- name: Check formatting
run: cargo fmt --all -- --check

- name: Run clippy
run: cargo clippy --all-targets --all-features -- -D warnings
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ edition = "2024"

[dependencies]
clap = { version = "4.5.54", features = ["derive"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
tokio = { version = "1.49.0", features = ["full"] }
xconn = { git = "https://github.com/xconnio/xconn-rust", version = "0.1.0" }
wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" }
xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" }
15 changes: 13 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
build:
cargo build
lint:
cargo fmt -- --check
cargo clippy --all-targets --all-features -- -D warnings

format:
cargo fmt --

run:
cargo run

build:
cargo build

build-release:
cargo build --release

76 changes: 76 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "wcli")]
#[command(about = "WAMP Command Line Interface", long_about = None)]
pub struct Cli {
/// The URL of the router to connect to
#[arg(long, default_value = "ws://localhost:8080/ws", global = true)]
pub url: String,

/// The realm to join
#[arg(long, default_value = "realm1", global = true)]
pub realm: String,

/// Authentication ID
#[arg(long, global = true)]
pub authid: Option<String>,

/// Authentication role
#[arg(long, global = true)]
pub authrole: Option<String>,

/// Secret for ticket/wampcra authentication
#[arg(long, global = true)]
pub secret: Option<String>,

/// Path to private key file for cryptosign
#[arg(long, global = true)]
pub private_key: Option<String>,

/// Ticket for ticket authentication
#[arg(long, global = true)]
pub ticket: Option<String>,

/// Serializer to use (json, msgpack, cbor)
#[arg(long, default_value = "json", global = true)]
pub serializer: String,

#[command(subcommand)]
pub command: Commands,
}

#[derive(Subcommand)]
pub enum Commands {
/// Call a procedure
Call {
/// Procedure to call
procedure: String,

/// Positional arguments for the call
/// To enforce value is always a string, send value in quotes e.g. "'1'" or '"true"'
#[arg()]
args: Vec<String>,

/// Number of times to repeat the call per session
#[arg(long, default_value_t = 1)]
repeat: u32,

/// Number of parallel sessions to create
#[arg(long, default_value_t = 1)]
parallel: u32,

/// Maximum number of concurrent sessions
#[arg(long, default_value_t = 1)]
concurrency: usize,
},
/// Register a procedure
Register {
/// Procedure to register
procedure: String,
},
/// Subscribe to a topic
Subscribe,
/// Publish to a topic
Publish,
}
109 changes: 109 additions & 0 deletions src/commands/call.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use crate::config::{CallConfig, ConnectionConfig};
use crate::utils::{CommandOutput, ParsedArg, parse_arg, wamp_value_to_serde};
use std::sync::Arc;
use tokio::sync::Semaphore;
use xconn::sync::CallRequest;

/// Builds a CallRequest from the procedure name and parsed arguments.
fn build_call_request(procedure: &str, args: &[String]) -> CallRequest {
let mut request = CallRequest::new(procedure);
for arg in args {
request = match parse_arg(arg) {
ParsedArg::Integer(v) => request.arg(v),
ParsedArg::Float(v) => request.arg(v),
ParsedArg::Boolean(v) => request.arg(v),
ParsedArg::String(v) => request.arg(v),
};
}
request
}

/// Executes calls for a single session: connects, runs repeated calls, and disconnects.
async fn run_session(
conn_config: Arc<ConnectionConfig>,
call_config: Arc<CallConfig>,
session_id: u32,
) {
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
return;
}
};

for iteration in 0..call_config.repeat {
let request = build_call_request(&call_config.procedure, &call_config.args);

match session.call(request).await {
Ok(result) => {
let output = CommandOutput {
args: result
.args
.as_ref()
.map(|a: &Vec<xconn::sync::Value>| {
a.iter().map(wamp_value_to_serde).collect()
})
.unwrap_or_default(),
kwargs: result
.kwargs
.as_ref()
.map(
|kw: &std::collections::HashMap<String, xconn::sync::Value>| {
kw.iter()
.map(|(k, v)| (k.clone(), wamp_value_to_serde(v)))
.collect()
},
)
.unwrap_or_default(),
};
match serde_json::to_string_pretty(&output) {
Ok(json) => println!("{}", json),
Err(e) => eprintln!(
"Session {} Iteration {} Error serializing result: {}",
session_id, iteration, e
),
}
}
Err(e) => eprintln!(
"Session {} Iteration {} Call Error: {}",
session_id, iteration, e
),
}
}

if let Err(e) = session.leave().await {
eprintln!("Session {} Error leaving: {}", session_id, e);
}
}

pub async fn handle(
conn_config: ConnectionConfig,
call_config: CallConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let semaphore = Arc::new(Semaphore::new(call_config.concurrency));
let conn_config = Arc::new(conn_config);
let call_config = Arc::new(call_config);

let mut handles = Vec::with_capacity(call_config.parallel as usize);

for session_id in 0..call_config.parallel {
let permit = semaphore.clone().acquire_owned().await.unwrap();

let conn_config = conn_config.clone();
let call_config = call_config.clone();

let handle = tokio::spawn(async move {
let _permit = permit;
run_session(conn_config, call_config, session_id).await;
});

handles.push(handle);
}

for handle in handles {
let _ = handle.await;
}

Ok(())
}
4 changes: 4 additions & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod call;
pub mod publish;
pub mod register;
pub mod subscribe;
6 changes: 6 additions & 0 deletions src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use xconn::async_::session::Session;

pub async fn handle(_session: &Session) -> Result<(), Box<dyn std::error::Error>> {
println!("Subcommand 'publish' executed");
Ok(())
}
37 changes: 37 additions & 0 deletions src/commands/register.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::utils::{CommandOutput, wamp_async_value_to_serde};
use tokio::signal;
use xconn::async_::session::Session;
use xconn::async_::{Invocation, RegisterRequest, Yield};

async fn registration_handler(inv: Invocation) -> Yield {
let output = CommandOutput {
args: inv.args.iter().map(wamp_async_value_to_serde).collect(),
kwargs: inv
.kwargs
.iter()
.map(|(k, v): (_, _)| (k.clone(), wamp_async_value_to_serde(v)))
.collect(),
};

match serde_json::to_string_pretty(&output) {
Ok(json) => println!("{}", json),
Err(e) => println!("Error serializing invocation: {}", e),
}

Yield::new(inv.args, inv.kwargs)
}

pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box<dyn std::error::Error>> {
let register_request = RegisterRequest::new(procedure, registration_handler);

match session.register(register_request).await {
Ok(reg) => println!("Registered procedure {}: {:?}", procedure, reg),
Err(e) => println!("Error registering procedure: {}", e),
}

println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
println!("Exiting...");

Ok(())
}
6 changes: 6 additions & 0 deletions src/commands/subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use xconn::async_::session::Session;

pub async fn handle(_session: &Session) -> Result<(), Box<dyn std::error::Error>> {
println!("Subcommand 'subscribe' executed");
Ok(())
}
Loading
Loading