diff --git a/.omp/rules/asterisk.md b/.omp/rules/asterisk.md index 40eaf78..ab3facb 100644 --- a/.omp/rules/asterisk.md +++ b/.omp/rules/asterisk.md @@ -36,3 +36,9 @@ globs: - REST endpoints use Basic Auth on every request; credentials are not session-based. - Resources: channels, bridges, endpoints, device states, mailboxes, sounds, recordings, playbacks. - Stasis application model: channels enter stasis via dialplan `Stasis(appname)`, controlled via REST. + + +## Testing + +- No inline tests (`#[cfg(test)]`) in any protocol crate. All tests live in `tests/` (`asterisk-rs-tests` crate). +- Mock servers for each protocol are in `tests/src/mock/` (MockAmiServer, MockAriServer, MockAgiClient). \ No newline at end of file diff --git a/.omp/rules/rust.md b/.omp/rules/rust.md index 52acc8f..c443eb8 100644 --- a/.omp/rules/rust.md +++ b/.omp/rules/rust.md @@ -37,6 +37,12 @@ globs: - Each protocol crate (ami, agi, ari) is independently usable. - `asterisk-rs` is the umbrella re-export. It adds no logic, only pub use. +## Testing + +- No `#[cfg(test)]` or inline test modules in production crates. All tests live in the external `tests/` crate (`asterisk-rs-tests`). +- Unit, mock integration, and live integration tests are separate binaries in `tests/`. +- Run tests with `cargo test -p asterisk-rs-tests`, never with per-crate `cargo test -p asterisk-rs-ami`. + ## Build ```bash diff --git a/AGENTS.md b/AGENTS.md index 661dbda..c94d094 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -339,9 +339,15 @@ Located in each crate's `examples/` directory (not workspace root): |---------|-------|-------------| | `ami_originate.rs` | asterisk-rs-ami | Builder, OriginateAction, response handling | | `ami_events.rs` | asterisk-rs-ami | Event subscription loop | +| `ami_call_tracker.rs` | asterisk-rs-ami | CallTracker, CompletedCall records, background task | | `agi_server.rs` | asterisk-rs-agi | AgiHandler impl, channel operations | +| `agi_ivr.rs` | asterisk-rs-agi | IVR menu, DTMF, AstDB, variables, branching | | `ari_stasis_app.rs` | asterisk-rs-ari | Stasis event loop, ChannelHandle | | `ari_bridge.rs` | asterisk-rs-ari | Bridge creation, channel origination | +| `ari_recording.rs` | asterisk-rs-ari | Channel recording and playback | +| `ari_pending.rs` | asterisk-rs-ari | Race-free PendingChannel origination | +| `ari_websocket_transport.rs` | asterisk-rs-ari | Unified WebSocket transport mode | +| `pbx_dial.rs` | asterisk-rs | Pbx high-level dial, wait_for_answer, CompletedCall | All examples require a running Asterisk instance and use `tracing_subscriber` (dev-dependency). diff --git a/Cargo.lock b/Cargo.lock index 9e505f0..d6c3cb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,8 @@ dependencies = [ "asterisk-rs-core", "thiserror 2.0.18", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/README.md b/README.md index 454f610..59944c8 100644 --- a/README.md +++ b/README.md @@ -13,13 +13,15 @@ channels, bridges, queues, and recordings across all three Asterisk interfaces. - **AGI** -- run dialplan logic from your Rust service. FastAGI server with typed async commands. - **ARI** -- full call control via REST + WebSocket. Resource handles, typed events with metadata. -## Example +## Quick Example ```rust,ignore use asterisk_rs::ami::{AmiClient, AmiEvent}; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + let client = AmiClient::builder() .host("10.0.0.1") .credentials("admin", "secret") @@ -33,7 +35,7 @@ async fn main() -> Result<(), Box> { while let Some(event) = hangups.recv().await { if let AmiEvent::Hangup { channel, cause, cause_txt, .. } = event { - println!("{channel} hung up: {cause} ({cause_txt})"); + tracing::info!(%channel, %cause, %cause_txt, "channel hung up"); } } @@ -43,26 +45,56 @@ async fn main() -> Result<(), Box> { ## Install +Use the umbrella crate to pull in whichever protocols you need: + +```toml +[dependencies] +asterisk-rs = "0.1" +``` + +Or add individual protocol crates directly: + ```toml [dependencies] -asterisk-rs = "0.2" +asterisk-rs-ami = "0.4" # AMI only +asterisk-rs-agi = "0.2" # AGI only +asterisk-rs-ari = "0.4" # ARI only ``` -Or pick individual protocols: +## Feature Selection + +The umbrella crate enables all protocols by default. To select only what you need: ```toml [dependencies] -asterisk-rs-ami = "0.2" # AMI only -asterisk-rs-agi = "0.1" # AGI only -asterisk-rs-ari = "0.2" # ARI only +asterisk-rs = { version = "0.1", default-features = false, features = ["ami"] } +# or: features = ["agi"] +# or: features = ["ari"] +# or: features = ["ami", "ari"] ``` +Available features: `ami`, `agi`, `ari`. The `pbx` abstraction requires `ami`. + +## Protocols + +| Protocol | Default Port | Transport | Use Case | +|----------|-------------|-----------|----------| +| [AMI](https://docs.rs/asterisk-rs-ami) | 5038 | TCP | Monitoring, call control, system management | +| [AGI](https://docs.rs/asterisk-rs-agi) | 4573 | TCP | Dialplan logic, IVR, call routing | +| [ARI](https://docs.rs/asterisk-rs-ari) | 8088 | HTTP + WS | Stasis applications, full media control | + ## Capabilities -- Typed actions, events, and commands for the full Asterisk 23 protocol surface +- Typed actions, events, and commands for the full Asterisk protocol surface - Filtered event subscriptions -- receive only what you need - Event-collecting actions -- `send_collecting()` gathers multi-event responses (Status, QueueStatus, etc.) - Automatic reconnection with exponential backoff, jitter, and re-authentication +- **Call tracker** -- correlates AMI events into `CompletedCall` records (channel, duration, cause, full event log) +- **PBX abstraction** -- `Pbx::dial()` wraps originate + OriginateResponse correlation into one async call +- **Pending resources** -- ARI `PendingChannel`/`PendingBridge` pre-subscribe before REST to eliminate event races +- **Transport modes** -- ARI supports HTTP (request/response) or WebSocket (bidirectional streaming) +- **Outbound WebSocket server** -- `AriServer` accepts Asterisk 22+ outbound WS connections +- **Media channel** -- low-level audio I/O over WebSocket for external media applications - Resource handles for ARI (ChannelHandle, BridgeHandle, PlaybackHandle, RecordingHandle) - Domain types for hangup causes, channel states, device states, dial statuses, and more - ARI event metadata (application, timestamp, asterisk_id) on every event @@ -71,17 +103,171 @@ asterisk-rs-ari = "0.2" # ARI only - `#[non_exhaustive]` enums -- new variants won't break your code - Structured logging via `tracing` -## Protocols +## More Examples -| Protocol | Default Port | Transport | Use Case | -|----------|-------------|-----------|----------| -| [AMI](https://docs.rs/asterisk-rs-ami) | 5038 | TCP | Monitoring, call control, system management | -| [AGI](https://docs.rs/asterisk-rs-agi) | 4573 | TCP | Dialplan logic, IVR, call routing | -| [ARI](https://docs.rs/asterisk-rs-ari) | 8088 | HTTP + WS | Stasis applications, full media control | +### AMI: call tracker + +```rust,ignore +use asterisk_rs::ami::AmiClient; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let client = AmiClient::builder() + .host("127.0.0.1") + .credentials("admin", "secret") + .build() + .await?; + + let (tracker, mut rx) = client.call_tracker(); + + while let Some(call) = rx.recv().await { + tracing::info!( + channel = %call.channel, + duration = ?call.duration, + cause = %call.cause_txt, + "call completed" + ); + } + + tracker.shutdown(); + Ok(()) +} +``` + +### AGI: IVR handler + +```rust,ignore +use asterisk_rs::agi::{AgiChannel, AgiHandler, AgiRequest, AgiServer}; + +struct IvrHandler; + +impl AgiHandler for IvrHandler { + async fn handle(&self, _request: AgiRequest, mut channel: AgiChannel) + -> asterisk_rs::agi::error::Result<()> + { + channel.answer().await?; + channel.stream_file("welcome", "#").await?; + let response = channel.get_data("press-ext", 5000, 4).await?; + tracing::info!(digits = response.result, "caller input"); + channel.hangup(None).await?; + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let (server, _shutdown) = AgiServer::builder() + .bind("0.0.0.0:4573") + .handler(IvrHandler) + .max_connections(100) + .build() + .await?; + + server.run().await?; + Ok(()) +} +``` + +### ARI: pending channel + +```rust,ignore +use asterisk_rs::ari::config::AriConfigBuilder; +use asterisk_rs::ari::{AriClient, AriEvent, PendingChannel, TransportMode}; +use asterisk_rs::ari::resources::channel::OriginateParams; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let config = AriConfigBuilder::new("my-app") + .host("127.0.0.1") + .port(8088) + .username("asterisk") + .password("asterisk") + .build()?; + + let client = AriClient::connect(config).await?; + + // pre-subscribe before originate so no events are missed + let pending = client.channel(); + let params = OriginateParams { + endpoint: "PJSIP/100".into(), + app: Some("my-app".into()), + ..Default::default() + }; + let (handle, mut events) = pending.originate(params).await?; + + while let Some(msg) = events.recv().await { + match msg.event { + AriEvent::StasisStart { .. } => { + handle.answer().await?; + handle.play("sound:hello-world").await?; + handle.hangup(None).await?; + } + AriEvent::ChannelDestroyed { cause_txt, .. } => { + tracing::info!(%cause_txt, "channel destroyed"); + break; + } + _ => {} + } + } + + Ok(()) +} +``` + +### PBX: dial and wait + +```rust,ignore +use asterisk_rs::ami::AmiClient; +use asterisk_rs::pbx::{DialOptions, Pbx}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let client = AmiClient::builder() + .host("127.0.0.1") + .credentials("admin", "secret") + .build() + .await?; + + let mut pbx = Pbx::new(client); + + let call = pbx.dial( + "PJSIP/100", + "200", + Some( + DialOptions::new() + .caller_id("Rust PBX <100>") + .timeout_ms(30000), + ), + ).await?; + + call.wait_for_answer(Duration::from_secs(30)).await?; + tracing::info!("call answered"); + + call.hangup().await?; + + if let Some(completed) = pbx.next_completed_call().await { + tracing::info!(duration = ?completed.duration, cause = %completed.cause_txt, "call record"); + } + + Ok(()) +} +``` ## Documentation - [API Reference (docs.rs)](https://docs.rs/asterisk-rs) +- [AMI crate docs](https://docs.rs/asterisk-rs-ami) +- [AGI crate docs](https://docs.rs/asterisk-rs-agi) +- [ARI crate docs](https://docs.rs/asterisk-rs-ari) - [User Guide](https://deadcode-walker.github.io/asterisk-rs/) ## MSRV diff --git a/crates/asterisk-rs-agi/README.md b/crates/asterisk-rs-agi/README.md index a7e2210..07faceb 100644 --- a/crates/asterisk-rs-agi/README.md +++ b/crates/asterisk-rs-agi/README.md @@ -6,7 +6,7 @@ Async Rust FastAGI server for the Asterisk Gateway Interface. Answer calls, collect DTMF, play prompts, query databases, and control call flow. -## Example +## Quick Start ```rust,ignore use asterisk_rs_agi::{AgiServer, AgiHandler, AgiRequest, AgiChannel}; @@ -40,11 +40,83 @@ async fn main() -> Result<(), Box> { } ``` +## Available Commands + +### Call control + +| Method | Description | +|---|---| +| `answer()` | Answer the channel | +| `hangup(channel)` | Hang up a channel (pass `None` for current) | +| `channel_status(channel)` | Query channel state | +| `set_autohangup(secs)` | Schedule automatic hangup after N seconds | + +### Audio + +| Method | Description | +|---|---| +| `stream_file(file, escape_digits)` | Play a file; returns digit pressed or empty | +| `control_stream_file(file, escape, fwd_ms, rew_key, pause_key)` | Play with skip/pause controls | +| `get_option(file, escape_digits, timeout_ms)` | Play and wait for a single DTMF digit | +| `record_file(file, format, escape, timeout_ms, beep, silence, max_duration)` | Record audio | +| `set_music(on, class)` | Enable or disable music-on-hold | + +### DTMF and speech + +| Method | Description | +|---|---| +| `get_data(prompt, timeout_ms, max_digits)` | Collect a DTMF string | +| `wait_for_digit(timeout_ms)` | Wait for a single keypress | +| `say_digits(digits, escape_digits)` | Speak digits individually | +| `say_number(number, escape_digits)` | Speak a number as a cardinal | +| `say_alpha(text, escape_digits)` | Spell out characters | +| `say_phonetic(text, escape_digits)` | Spell using the NATO phonetic alphabet | +| `say_date(epoch_secs, escape_digits)` | Speak a date | +| `say_time(epoch_secs, escape_digits)` | Speak a time | +| `say_datetime(epoch_secs, escape_digits, format, tz)` | Speak a formatted date/time | + +### Variables + +| Method | Description | +|---|---| +| `set_variable(name, value)` | Set a channel variable | +| `get_variable(name)` | Get a channel variable | +| `get_full_variable(expression)` | Evaluate a dialplan expression (e.g. `${CHANNEL}`) | + +### Database + +| Method | Description | +|---|---| +| `database_get(family, key)` | Fetch a value from AstDB | +| `database_put(family, key, value)` | Store a value in AstDB | +| `database_del(family, key)` | Delete a key from AstDB | +| `database_deltree(family, keytree)` | Delete a key subtree from AstDB | + +### Dialplan + +| Method | Description | +|---|---| +| `exec(application, options)` | Run a dialplan application | +| `gosub(context, extension, priority)` | Jump to a dialplan subroutine | +| `set_context(context)` | Change the active dialplan context | +| `set_extension(extension)` | Change the active extension | +| `set_priority(priority)` | Change the active priority | + +### Other + +| Method | Description | +|---|---| +| `verbose(message, level)` | Write a message to the Asterisk verbose log | +| `set_callerid(callerid)` | Override the caller ID string | +| `noop()` | No-op; useful for keepalive or testing | + ## Capabilities -- Every AGI command with typed async methods +- 60+ async commands covering the full AGI specification - Handler trait using native async fn (RPITIT, no macro needed) -- Request environment parsing from Asterisk +- Request environment parsing (caller ID, channel, context, extension, and more) +- Automatic channel hangup detection +- Every AGI command with typed async methods - Configurable concurrency limits via semaphore - Graceful shutdown via `ShutdownHandle` - Argument quoting and escaping for special characters diff --git a/crates/asterisk-rs-agi/examples/agi_ivr.rs b/crates/asterisk-rs-agi/examples/agi_ivr.rs new file mode 100644 index 0000000..d1dd3d5 --- /dev/null +++ b/crates/asterisk-rs-agi/examples/agi_ivr.rs @@ -0,0 +1,164 @@ +//! FastAGI IVR (interactive voice response) menu example. +//! +//! Demonstrates a realistic phone menu backed by AstDB, with DTMF collection, +//! variable management, and branching logic. +//! +//! # Dialplan entry +//! +//! ```text +//! exten => s,1,AGI(agi://127.0.0.1:4573) +//! ``` +//! +//! # Run +//! +//! ```text +//! cargo run --example agi_ivr -p asterisk-rs-agi +//! ``` + +use asterisk_rs_agi::{AgiChannel, AgiHandler, AgiRequest, AgiServer}; + +// maximum times to offer the main menu before hanging up +const MAX_RETRIES: u32 = 3; + +// AstDB family used for account lookups +const DB_FAMILY: &str = "accounts"; + +struct IvrHandler; + +impl AgiHandler for IvrHandler { + async fn handle( + &self, + request: AgiRequest, + mut channel: AgiChannel, + ) -> asterisk_rs_agi::error::Result<()> { + let caller = request.caller_id().unwrap_or("unknown"); + tracing::info!(caller, "IVR session started"); + + channel.answer().await?; + + // stash the caller ID so dialplan extensions can read it back + channel.set_variable("IVR_CALLER", caller).await?; + + // verify the variable round-trips (useful during development) + let var_resp = channel.get_variable("IVR_CALLER").await?; + tracing::info!(result = var_resp.result, "IVR_CALLER variable confirmed"); + + // seed AstDB with a demo account balance if not already present + let balance_resp = channel.database_get(DB_FAMILY, "balance").await?; + if balance_resp.result == 0 { + // result == 0 means key not found; write an initial value + channel.database_put(DB_FAMILY, "balance", "1500").await?; + } + + let mut retries = 0u32; + loop { + if retries >= MAX_RETRIES { + tracing::info!("max retries reached, hanging up"); + channel.stream_file("goodbye", "#").await?; + break; + } + + // play the main menu prompt and collect one digit (5 s timeout) + channel.stream_file("main-menu", "#").await?; + let dtmf = channel.get_data("press-1-or-2", 5000, 1).await?; + + // result == -1 means timeout with no digit; result == 0 can mean + // empty input depending on Asterisk version — treat both as retry + if dtmf.result <= 0 { + tracing::info!("no digit received, retrying"); + channel.stream_file("please-try-again", "#").await?; + retries += 1; + continue; + } + + match dtmf.result { + 1 => { + // option 1 — account balance + if let Err(e) = handle_account_info(&mut channel).await { + tracing::error!(error = %e, "account info branch failed"); + } + break; + } + 2 => { + // option 2 — transfer to agent queue + if let Err(e) = handle_transfer(&mut channel).await { + tracing::error!(error = %e, "transfer branch failed"); + } + break; + } + digit => { + tracing::info!(digit, "unrecognised menu choice"); + channel.stream_file("option-invalid", "#").await?; + retries += 1; + } + } + } + + channel.hangup(None).await?; + Ok(()) + } +} + +// reads the account balance from AstDB and reads it back to the caller +async fn handle_account_info(channel: &mut AgiChannel) -> asterisk_rs_agi::error::Result<()> { + channel.stream_file("your-account-balance-is", "#").await?; + + let resp = channel.database_get(DB_FAMILY, "balance").await?; + if resp.result == 1 { + // result == 1 means key found; data holds the value as a string + let balance_str = resp.data.as_deref().unwrap_or("0"); + // parse defensively; fall back to 0 on corrupt data + let balance: i64 = balance_str.parse().unwrap_or(0); + tracing::info!(balance, "reading balance to caller"); + channel.say_number(balance, "#").await?; + } else { + tracing::info!("no balance record found in AstDB"); + channel + .stream_file("information-not-available", "#") + .await?; + } + + channel.stream_file("thank-you", "#").await?; + Ok(()) +} + +// sets a destination variable and transfers the caller via Dial +async fn handle_transfer(channel: &mut AgiChannel) -> asterisk_rs_agi::error::Result<()> { + tracing::info!("transferring caller to agent queue"); + channel.stream_file("please-hold", "#").await?; + + // store transfer destination as a channel variable for dialplan visibility + channel + .set_variable("IVR_TRANSFER_DEST", "Local/agents@queues") + .await?; + + // exec Dial directly from AGI — hands control back to Asterisk after return + channel.exec("Dial", "Local/agents@queues,30,tT").await?; + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let (server, shutdown_handle) = AgiServer::builder() + .bind("0.0.0.0:4573") + .handler(IvrHandler) + .max_connections(50) + .build() + .await?; + + tracing::info!("FastAGI IVR listening on 0.0.0.0:4573"); + + // drive the shutdown handle from a ctrl-c signal so the server exits cleanly + tokio::spawn(async move { + if let Ok(()) = tokio::signal::ctrl_c().await { + tracing::info!("shutdown signal received"); + shutdown_handle.shutdown(); + } + }); + + server.run().await?; + Ok(()) +} diff --git a/crates/asterisk-rs-ami/README.md b/crates/asterisk-rs-ami/README.md index 2a06bcc..70e4606 100644 --- a/crates/asterisk-rs-ami/README.md +++ b/crates/asterisk-rs-ami/README.md @@ -6,7 +6,7 @@ Async Rust client for the Asterisk Manager Interface (AMI). Monitor calls, originate channels, manage queues, and react to real-time events over TCP. -## Example +## Quick Start ```rust,ignore use asterisk_rs_ami::{AmiClient, AmiEvent}; @@ -38,16 +38,97 @@ async fn main() -> Result<(), Box> { } ``` +## Call Tracker + +`call_tracker()` correlates `DialBegin`, `DialEnd`, `Hangup`, and bridge events +into a single `CompletedCall` record per channel pair. The record includes +start/end timestamps, duration, hangup cause, and the ordered event list. + +```rust,ignore +use asterisk_rs_ami::AmiClient; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let client = AmiClient::builder() + .host("10.0.0.1") + .credentials("admin", "secret") + .build() + .await?; + + let (tracker, mut rx) = client.call_tracker(); + + tokio::spawn(async move { + while let Some(call) = rx.recv().await { + tracing::info!( + channel = %call.channel, + duration = ?call.duration, + cause = %call.cause_txt, + "call completed" + ); + } + }); + + // keep the tracker alive for as long as you need it; + // dropping it stops event correlation + tokio::signal::ctrl_c().await?; + tracker.shutdown(); + + Ok(()) +} +``` + +## Builder Options + +| Option | Default | Description | +|---|---|---| +| `host(h)` | `"127.0.0.1"` | AMI host | +| `port(p)` | `5038` | AMI TCP port | +| `credentials(u, p)` | required | AMI login | +| `timeout(d)` | 30 s | per-action response timeout | +| `ping_interval(d)` | disabled | keep-alive `Ping` cadence; set to detect dead connections | +| `reconnect(policy)` | exponential backoff | `ReconnectPolicy::exponential(min, max)` or `::none()` | +| `event_capacity(n)` | 1024 | broadcast channel depth; drop events when full if subscribers are slow | + +```rust,ignore +use asterisk_rs_ami::AmiClient; +use asterisk_rs_core::config::ReconnectPolicy; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = AmiClient::builder() + .host("10.0.0.1") + .port(5038) + .credentials("admin", "secret") + .timeout(Duration::from_secs(10)) + .ping_interval(Duration::from_secs(20)) + .reconnect(ReconnectPolicy::exponential( + Duration::from_secs(1), + Duration::from_secs(60), + )) + .event_capacity(2048) + .build() + .await?; + + Ok(()) +} +``` + ## Capabilities - Typed events and actions covering the full Asterisk 23 AMI surface - Filtered subscriptions -- receive only events you care about - Event-collecting actions -- `send_collecting()` gathers multi-event responses +- Call tracking with `CallTracker` -- correlates events into `CompletedCall` records - MD5 challenge-response and plaintext authentication - Automatic reconnection with re-authentication on every reconnect - Command output capture for `Response: Follows` responses - Channel variable extraction -- `ChanVariable(name)` headers parsed into a dedicated map -- Keep-alive ping loop -- configurable periodic heartbeat to detect dead connections +- Keep-alive pings with configurable interval -- detects dead connections without sending traffic +- Connection state monitoring via `client.connection_state()` - Domain types for hangup causes, channel states, device states, and more - `#[non_exhaustive]` enums -- new variants won't break your code - Configurable timeouts, backoff, ping interval, and event buffer size diff --git a/crates/asterisk-rs-ami/examples/ami_call_tracker.rs b/crates/asterisk-rs-ami/examples/ami_call_tracker.rs new file mode 100644 index 0000000..8c9103c --- /dev/null +++ b/crates/asterisk-rs-ami/examples/ami_call_tracker.rs @@ -0,0 +1,62 @@ +//! Example: track completed calls via AMI. +//! +//! Connects to Asterisk AMI, starts a CallTracker, and logs each +//! CompletedCall record (channel, unique_id, duration, cause) as calls +//! finish. A separate task handles incoming records so the main thread +//! remains free for other work. +//! +//! Usage: cargo run --example ami_call_tracker + +use asterisk_rs_ami::AmiClient; +use std::time::Duration; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let client = AmiClient::builder() + .host("127.0.0.1") + .port(5038) + .credentials("admin", "secret") + .timeout(Duration::from_secs(10)) + .ping_interval(Duration::from_secs(20)) + .build() + .await?; + + info!("connected, starting call tracker"); + + // call_tracker() subscribes internally — no events are missed between + // subscription and tracker start + let (tracker, mut rx) = client.call_tracker(); + + // process completed calls in a background task so the main thread can + // do other work (originate calls, run dialplan checks, etc.) + let collector = tokio::spawn(async move { + while let Some(call) = rx.recv().await { + info!( + channel = %call.channel, + unique_id = %call.unique_id, + linked_id = %call.linked_id, + duration_ms = call.duration.as_millis(), + cause = call.cause, + cause_txt = %call.cause_txt, + events = call.events.len(), + "call completed", + ); + } + info!("call tracker receiver closed"); + }); + + // wait for ctrl-c, then shut down cleanly + tokio::signal::ctrl_c().await?; + info!("shutting down"); + + // shutdown signals the background task inside CallTracker to stop, + // which closes the sender side of the channel, unblocking the collector + tracker.shutdown(); + collector.await?; + + client.disconnect().await?; + Ok(()) +} diff --git a/crates/asterisk-rs-ari/README.md b/crates/asterisk-rs-ari/README.md index bcefe8d..126faaa 100644 --- a/crates/asterisk-rs-ari/README.md +++ b/crates/asterisk-rs-ari/README.md @@ -6,16 +6,19 @@ Async Rust client for the Asterisk REST Interface (ARI). Build Stasis applications with full call control over REST + WebSocket. -## Example +## Quick Start ```rust,ignore -use asterisk_rs_ari::{AriClient, AriConfig}; +use asterisk_rs_ari::{AriClient}; +use asterisk_rs_ari::config::AriConfigBuilder; use asterisk_rs_ari::event::AriEvent; use asterisk_rs_ari::resources::channel::ChannelHandle; #[tokio::main] async fn main() -> Result<(), Box> { - let config = AriConfig::builder("my-app") + tracing_subscriber::fmt::init(); + + let config = AriConfigBuilder::new("my-app") .host("10.0.0.1") .username("asterisk") .password("secret") @@ -36,12 +39,127 @@ async fn main() -> Result<(), Box> { } ``` +## Transport Modes + +ARI supports two transport modes, selected at config time. + +**HTTP** (default) — standard REST + WebSocket for events. Works with all +supported Asterisk versions. + +**WebSocket** — REST calls are tunnelled over the same WebSocket connection as +events. Requires Asterisk 22+ with the unified WebSocket ARI transport enabled. + +```rust,ignore +use asterisk_rs_ari::config::{AriConfigBuilder, TransportMode}; + +let config = AriConfigBuilder::new("my-app") + .host("127.0.0.1") + .username("asterisk") + .password("asterisk") + .transport(TransportMode::WebSocket) + .build()?; +``` + +## Race-Free Resource Creation + +Subscribing to a channel's events *after* originating it creates a window where +early events (including `StasisStart`) can be missed. `PendingChannel` eliminates +that race by registering the event subscription before issuing the REST call. + +```rust,ignore +use asterisk_rs_ari::resources::channel::OriginateParams; + +let pending = client.channel(); +let params = OriginateParams { + endpoint: "PJSIP/100".into(), + app: Some("my-app".into()), + ..Default::default() +}; +let (handle, mut events) = pending.originate(params).await?; +// events are buffered from creation — StasisStart is guaranteed captured + +while let Some(msg) = events.recv().await { + // handle per-channel events +} +``` + +The same pattern applies to bridges (`client.bridge()`) and playbacks +(`client.playback()`). + +## Resource Handles + +Handles wrap a resource ID and a cloned client reference, exposing typed +methods without requiring you to construct REST paths manually. + +**ChannelHandle** +- `answer()`, `hangup(reason)` +- `play(media_uri)`, `record(name, format, ...)` +- `mute(direction)`, `unmute(direction)` +- `hold()`, `unhold()` +- `send_dtmf(digit, ...)` +- `get_variable(name)`, `set_variable(name, value)` + +**BridgeHandle** +- `add_channel(channel_id)`, `remove_channel(channel_id)` +- `play(media_uri)` +- `start_moh(class)`, `stop_moh()` +- `destroy()` + +**PlaybackHandle** +- `pause()`, `unpause()`, `restart()`, `stop()` + +**RecordingHandle** +- `stop()`, `pause()`, `unpause()`, `mute()`, `unmute()`, `get()` + +## Outbound WebSocket Server + +For Asterisk 22+ deployments, ARI can connect outbound to your application +instead of the reverse. `AriServer` listens for those incoming Asterisk +connections. + +```rust,ignore +use asterisk_rs_ari::server::AriServer; + +let (server, shutdown) = AriServer::builder() + .bind("0.0.0.0:8765") + .build().await?; + +server.run(|session| async move { + let mut events = session.subscribe(); + while let Some(msg) = events.recv().await { + tracing::info!(event = ?msg.event, "received"); + } +}).await?; +``` + +## Media Channel + +`MediaChannel` provides raw audio exchange via `chan_websocket`. Use it to +stream audio directly to/from an Asterisk channel without a separate media +server. + +```rust,ignore +use asterisk_rs_ari::media::MediaChannel; + +let media = MediaChannel::connect("ws://asterisk:8088/ws").await?; +media.answer().await?; + +while let Some(audio) = media.recv_audio().await? { + // process or echo audio bytes + media.send_audio(audio).await?; +} +``` + ## Capabilities - REST client and WebSocket listener covering the full Asterisk 23 ARI surface - Typed events with metadata (application, timestamp, asterisk_id) on every event - Filtered subscriptions -- receive only events you care about +- Race-free resource creation with PendingChannel, PendingBridge, PendingPlayback - Resource handles for channels, bridges, playbacks, recordings +- Dual transport modes: HTTP and unified WebSocket (Asterisk 22+) +- Outbound WebSocket server for Asterisk 22+ outbound connections +- Media channel for raw audio exchange via chan_websocket - System management -- modules, logging, config, global variables - URL-safe query encoding for user-provided values - HTTP connect and request timeouts diff --git a/crates/asterisk-rs-ari/examples/ari_pending.rs b/crates/asterisk-rs-ari/examples/ari_pending.rs new file mode 100644 index 0000000..465d517 --- /dev/null +++ b/crates/asterisk-rs-ari/examples/ari_pending.rs @@ -0,0 +1,117 @@ +//! Example: race-free channel origination via PendingChannel. +//! +//! The naive originate-then-subscribe pattern has a race: Asterisk fires +//! StasisStart immediately after the channel is created, potentially before +//! the caller has a chance to subscribe. The PendingChannel factory fixes this +//! by subscribing to channel events *before* sending the originate request. +//! Any events that arrive between the REST call returning and the caller calling +//! `events.recv()` are buffered in the subscription channel — none are lost. +//! +//! Flow: +//! 1. `client.channel()` — allocates a pre-generated channel ID, installs a +//! filtered subscription keyed on that ID (synchronous, no I/O). +//! 2. `pending.originate(params).await?` — sets `channel_id` on the params, +//! sends the REST request, and returns `(ChannelHandle, FilteredSubscription)`. +//! 3. The caller drives the channel via the handle and reads events from the +//! subscription without any risk of missing the first events. +//! +//! Usage: cargo run --example ari_pending + +use asterisk_rs_ari::config::AriConfigBuilder; +use asterisk_rs_ari::event::AriEvent; +use asterisk_rs_ari::resources::channel::OriginateParams; +use asterisk_rs_ari::AriClient; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let config = AriConfigBuilder::new("pending-demo") + .host("127.0.0.1") + .port(8088) + .username("asterisk") + .password("asterisk") + .build()?; + + let client = AriClient::connect(config).await?; + tracing::info!("connected to ARI"); + + // step 1: allocate the pending channel — this installs the event filter + // synchronously before any network traffic goes out + let pending = client.channel(); + let channel_id = pending.id().to_owned(); + tracing::info!(%channel_id, "pending channel created, event filter active"); + + // step 2: originate — channel_id is set automatically from the pending + // reservation; the event subscription was already live before this call + let params = OriginateParams { + endpoint: "PJSIP/100".into(), + app: Some("pending-demo".into()), + caller_id: Some("\"Pending Demo\" <0000000000>".into()), + timeout: Some(30), + ..Default::default() + }; + + let (handle, mut events) = pending.originate(params).await?; + tracing::info!(%channel_id, "originate sent, waiting for events"); + + // step 3: drive the call by processing the pre-subscribed event stream. + // events that arrived between originate completing and this loop starting + // are already buffered — recv() delivers them in order. + while let Some(msg) = events.recv().await { + match msg.event { + AriEvent::StasisStart { channel, args, .. } => { + tracing::info!( + channel_id = %channel.id, + ?args, + "stasis start — answering" + ); + + // answer and play a greeting; errors are logged and the loop + // exits so the example terminates cleanly + if let Err(e) = handle.answer().await { + tracing::error!(error = %e, "answer failed"); + break; + } + + if let Err(e) = handle.play("sound:hello-world").await { + tracing::error!(error = %e, "play failed"); + } + + if let Err(e) = handle.hangup(None).await { + tracing::error!(error = %e, "hangup failed"); + } + } + + AriEvent::ChannelStateChange { channel } => { + tracing::info!( + channel_id = %channel.id, + state = %channel.state, + "channel state changed" + ); + } + + AriEvent::StasisEnd { channel } => { + tracing::info!(channel_id = %channel.id, "stasis end, exiting"); + // channel left the app — we're done + break; + } + + AriEvent::ChannelDestroyed { + channel, cause_txt, .. + } => { + tracing::info!( + channel_id = %channel.id, + cause = %cause_txt, + "channel destroyed" + ); + break; + } + + _ => {} + } + } + + tracing::info!("example complete"); + Ok(()) +} diff --git a/crates/asterisk-rs-ari/examples/ari_recording.rs b/crates/asterisk-rs-ari/examples/ari_recording.rs new file mode 100644 index 0000000..7ec9bb2 --- /dev/null +++ b/crates/asterisk-rs-ari/examples/ari_recording.rs @@ -0,0 +1,115 @@ +//! Example: ARI channel recording and playback. +//! +//! Connects to ARI, waits for a channel to enter the Stasis app, records it, +//! then plays the recording back before hanging up. +//! +//! Usage: cargo run --example ari_recording + +use asterisk_rs_ari::config::AriConfigBuilder; +use asterisk_rs_ari::event::{AriEvent, AriMessage}; +use asterisk_rs_ari::resources::channel::ChannelHandle; +use asterisk_rs_ari::AriClient; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let config = AriConfigBuilder::new("recording-demo") + .host("127.0.0.1") + .port(8088) + .username("asterisk") + .password("asterisk") + .build()?; + + let client = AriClient::connect(config).await?; + tracing::info!("connected to ARI"); + + let mut events = client.subscribe(); + + while let Some(msg) = events.recv().await { + match msg.event { + AriEvent::StasisStart { channel, .. } => { + tracing::info!(channel_id = %channel.id, "stasis start"); + + let handle = ChannelHandle::new(channel.id.clone(), client.clone()); + // subscribe before spawning so recording events aren't missed + // between spawn and the first recv inside the task + let chan_events = client.subscribe_filtered({ + let id = channel.id.clone(); + move |m: &AriMessage| match &m.event { + AriEvent::RecordingStarted { recording } => { + recording.target_uri.contains(&*id) + } + AriEvent::RecordingFinished { recording } => { + recording.target_uri.contains(&*id) + } + _ => false, + } + }); + + tokio::spawn(async move { + if let Err(e) = handle_call(handle, chan_events).await { + tracing::error!(error = %e, "call handling failed"); + } + }); + } + AriEvent::StasisEnd { channel, .. } => { + tracing::info!(channel_id = %channel.id, "stasis end"); + } + _ => {} + } + } + + Ok(()) +} + +async fn handle_call( + channel: ChannelHandle, + mut events: asterisk_rs_core::event::FilteredSubscription, +) -> asterisk_rs_ari::error::Result<()> { + channel.answer().await?; + tracing::info!(channel_id = %channel.id(), "answered"); + + // name must be unique per call — reusing a name requires if_exists="overwrite" + // in the request body, which the current record() helper does not expose; + // embedding the channel id makes it naturally unique + let recording_name = format!("recording-{}", channel.id()); + let _live = channel.record(&recording_name, "wav").await?; + tracing::info!(%recording_name, "recording initiated"); + + // wait for Asterisk to confirm the recording has started + while let Some(msg) = events.recv().await { + if let AriEvent::RecordingStarted { recording } = msg.event { + tracing::info!( + name = %recording.name, + state = %recording.state, + "recording started" + ); + break; + } + } + + // RecordingFinished arrives when max_duration elapses, silence is detected, + // or the caller hangs up; a real app would pass terminate_on / max_silence + // as additional JSON fields to the record request + while let Some(msg) = events.recv().await { + if let AriEvent::RecordingFinished { recording } = msg.event { + tracing::info!( + name = %recording.name, + format = %recording.format, + state = %recording.state, + "recording finished" + ); + break; + } + } + + // play back the recorded audio using the "recording:" URI scheme + channel.play(&format!("recording:{recording_name}")).await?; + tracing::info!(%recording_name, "playback started"); + + channel.hangup(None).await?; + tracing::info!(channel_id = %channel.id(), "channel hung up"); + + Ok(()) +} diff --git a/crates/asterisk-rs-ari/examples/ari_websocket_transport.rs b/crates/asterisk-rs-ari/examples/ari_websocket_transport.rs new file mode 100644 index 0000000..912bccb --- /dev/null +++ b/crates/asterisk-rs-ari/examples/ari_websocket_transport.rs @@ -0,0 +1,105 @@ +//! Example: ARI with WebSocket transport mode. +//! +//! By default the ARI client uses two separate connections: HTTP for REST calls +//! and a WebSocket for events. WebSocket transport mode collapses both onto a +//! single persistent WebSocket connection — REST requests are sent as frames +//! over that same socket and responses are correlated back by request ID. +//! +//! When to use it: +//! - Asterisk 20.14.0+ / 21.9.0+ / 22.4.0+ (earlier releases lack the +//! required protocol support) +//! - Environments where opening two outbound connections to Asterisk is +//! inconvenient (strict firewall rules, NAT traversal, load balancers that +//! do not preserve HTTP session affinity) +//! +//! The application code is identical to HTTP mode — the transport difference +//! is entirely in the config builder. This example lists active channels, pings +//! Asterisk, then enters an event loop handling StasisStart/StasisEnd. +//! +//! Usage: cargo run --example ari_websocket_transport + +use asterisk_rs_ari::config::{AriConfigBuilder, TransportMode}; +use asterisk_rs_ari::event::{AriEvent, Channel}; +use asterisk_rs_ari::resources::asterisk::{self, AsteriskPing}; +use asterisk_rs_ari::AriClient; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + // single WebSocket carries both REST and event traffic + let config = AriConfigBuilder::new("ws-demo") + .host("127.0.0.1") + .port(8088) + .username("asterisk") + .password("asterisk") + .transport(TransportMode::WebSocket) + .build()?; + + let client = AriClient::connect(config).await?; + tracing::info!("connected via WebSocket transport"); + + // REST calls go over the same WebSocket — no second HTTP connection + let ping: AsteriskPing = asterisk::ping(&client).await?; + tracing::info!( + asterisk_id = %ping.asterisk_id, + timestamp = %ping.timestamp, + "asterisk ping ok" + ); + + let channels: Vec = client.get("/channels").await?; + tracing::info!(count = channels.len(), "active channels"); + for ch in &channels { + tracing::info!( + id = %ch.id, + name = %ch.name, + state = %ch.state, + "channel" + ); + } + + // event subscription works identically to HTTP mode + let mut events = client.subscribe(); + tracing::info!("entering event loop — ctrl-c to stop"); + + loop { + tokio::select! { + Some(msg) = events.recv() => { + match msg.event { + AriEvent::StasisStart { channel, args, .. } => { + tracing::info!( + channel_id = %channel.id, + channel_name = %channel.name, + ?args, + "stasis start" + ); + } + AriEvent::StasisEnd { channel, .. } => { + tracing::info!(channel_id = %channel.id, "stasis end"); + } + AriEvent::ChannelStateChange { channel, .. } => { + tracing::info!( + channel_id = %channel.id, + state = %channel.state, + "channel state change" + ); + } + AriEvent::ChannelDestroyed { channel, cause_txt, .. } => { + tracing::info!( + channel_id = %channel.id, + cause = %cause_txt, + "channel destroyed" + ); + } + _ => {} + } + } + _ = tokio::signal::ctrl_c() => { + tracing::info!("shutting down"); + break; + } + } + } + + Ok(()) +} diff --git a/crates/asterisk-rs-core/README.md b/crates/asterisk-rs-core/README.md index 330bb9e..96c1e87 100644 --- a/crates/asterisk-rs-core/README.md +++ b/crates/asterisk-rs-core/README.md @@ -3,13 +3,60 @@ [![crates.io](https://img.shields.io/crates/v/asterisk-rs-core)](https://crates.io/crates/asterisk-rs-core) [![docs.rs](https://img.shields.io/docsrs/asterisk-rs-core)](https://docs.rs/asterisk-rs-core) -Shared foundation for the asterisk-rs ecosystem. +Shared foundation for the asterisk-rs ecosystem. The AMI, AGI, and ARI crates +all depend on this crate for common types; it contains no protocol-specific logic. -Provides error types, event bus, reconnection policy, credentials, and -typed domain constants (hangup causes, channel states, device states, etc.) -used across the AMI, AGI, and ARI protocol crates. +## What it provides -This crate is a dependency of the protocol crates. You don't need to depend -on it directly unless you're building custom protocol integrations. +### Errors -Part of [asterisk-rs](https://github.com/deadcode-walker/asterisk-rs). MSRV 1.83. MIT/Apache-2.0. +- `Error` — top-level error enum covering all failure modes +- `ConnectionError` — TCP connect failures, TLS errors, socket closed +- `AuthError` — login rejected, missing credentials, challenge failures +- `TimeoutError` — read/write/login timeouts +- `ProtocolError` — malformed frames, unexpected packet structure + +### Event bus + +- `Event` — marker trait required by the bus: `Clone + Send + Sync + Debug + 'static` +- `EventBus` — broadcast hub; protocol crates publish into it internally +- `EventSubscription` — unbounded receiver; all events since subscribe +- `FilteredSubscription` — like `EventSubscription` but with a predicate applied before delivery + +### Reconnection + +- `ReconnectPolicy` — three modes: + - `ReconnectPolicy::exponential(initial, max)` — doubles delay each attempt, jitter applied to prevent thundering herd (default) + - `ReconnectPolicy::fixed(interval)` — constant retry delay + - `ReconnectPolicy::none()` — fail on first disconnect +- `ConnectionState` — observable state machine: `Disconnected → Connecting → Connected → Reconnecting` + +### Credentials + +- `Credentials` — username/secret pair; `Debug` impl redacts the secret so it never appears in logs or panic output + +### Domain constants + +Strongly-typed enums parsed from Asterisk protocol strings: + +| Type | Examples | +|---|---| +| `HangupCause` | `Normal`, `Busy`, `NoAnswer`, `Congestion`, `NoRouteDestination`, … | +| `ChannelState` | `Down`, `Rsrvd`, `OffHook`, `Dialing`, `Ring`, `Up`, … | +| `DeviceState` | `Unknown`, `NotInUse`, `InUse`, `Busy`, `Unavailable`, … | +| `DialStatus` | `Answer`, `Busy`, `NoAnswer`, `Cancel`, `Congestion`, … | +| `CdrDisposition` | `Answered`, `NoAnswer`, `Busy`, `Failed` | +| `PeerStatus` | `Registered`, `Unregistered`, `Reachable`, `Unreachable`, … | +| `QueueStrategy` | `RingAll`, `LeastRecent`, `FewestCalls`, `RoundRobin`, … | +| `ExtensionState` | `NotInUse`, `InUse`, `Busy`, `Unavailable`, … | +| `AgiStatus` | `Success`, `Failure`, `NotPermitted` | + +## Usage + +You do not need to add this crate as a direct dependency unless you are building +a custom protocol integration. Add `asterisk-rs-ami`, `asterisk-rs-agi`, or +`asterisk-rs-ari` instead; they re-export the types callers need. + +--- + +Part of [asterisk-rs](https://github.com/deadcode-walker/asterisk-rs). MSRV 1.83. MIT OR Apache-2.0. diff --git a/crates/asterisk-rs/Cargo.toml b/crates/asterisk-rs/Cargo.toml index 4a0ff47..211af02 100644 --- a/crates/asterisk-rs/Cargo.toml +++ b/crates/asterisk-rs/Cargo.toml @@ -27,3 +27,8 @@ default = ["ami", "agi", "ari"] ami = ["dep:asterisk-rs-ami"] agi = ["dep:asterisk-rs-agi"] ari = ["dep:asterisk-rs-ari"] + +[dev-dependencies] +tracing.workspace = true +tracing-subscriber.workspace = true +tokio = { workspace = true, features = ["full"] } diff --git a/crates/asterisk-rs/examples/pbx_dial.rs b/crates/asterisk-rs/examples/pbx_dial.rs new file mode 100644 index 0000000..bc75a92 --- /dev/null +++ b/crates/asterisk-rs/examples/pbx_dial.rs @@ -0,0 +1,102 @@ +//! Example: dial a call via the high-level Pbx abstraction. +//! +//! Connects to Asterisk AMI, originates a call from PJSIP/100 to extension +//! 200, waits for the far end to answer, then hangs up and prints the +//! completed-call record (CDR-equivalent). +//! +//! Prerequisites: +//! - Asterisk running with AMI enabled on 127.0.0.1:5038 +//! - AMI user "admin" with secret "secret" and read/write all +//! - PJSIP endpoint 100 registered and dialplan routing extension 200 +//! +//! Usage: cargo run -p asterisk-rs --example pbx_dial + +use std::time::Duration; + +use asterisk_rs::ami::AmiClient; +use asterisk_rs::pbx::{DialOptions, Pbx, PbxError}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let client = AmiClient::builder() + .host("127.0.0.1") + .port(5038) + .credentials("admin", "secret") + .timeout(Duration::from_secs(10)) + .event_capacity(2048) + .build() + .await?; + + tracing::info!("connected to AMI"); + + let mut pbx = Pbx::new(client); + + let options = DialOptions::new() + .caller_id("Rust PBX <100>") + .timeout_ms(30_000); + + tracing::info!(from = "PJSIP/100", to = "200", "dialing"); + + let call = match pbx.dial("PJSIP/100", "200", Some(options)).await { + Ok(c) => c, + Err(PbxError::CallFailed { cause, cause_txt }) => { + tracing::error!(cause, cause_txt, "call failed before answer"); + pbx.shutdown(); + return Ok(()); + } + Err(PbxError::Timeout) => { + tracing::error!("originate timed out"); + pbx.shutdown(); + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + + tracing::info!( + channel = %call.channel, + unique_id = %call.unique_id, + "call originated, waiting for answer" + ); + + match call.wait_for_answer(Duration::from_secs(30)).await { + Ok(()) => { + tracing::info!(channel = %call.channel, "call answered"); + } + Err(PbxError::CallFailed { cause, cause_txt }) => { + tracing::error!(cause, cause_txt, "remote end rejected the call"); + pbx.shutdown(); + return Ok(()); + } + Err(PbxError::Timeout) => { + tracing::error!("no answer within 30 seconds, hanging up"); + call.hangup().await?; + pbx.shutdown(); + return Ok(()); + } + Err(e) => return Err(e.into()), + } + + // call is up — do work here, then hang up + call.hangup().await?; + tracing::info!(channel = %call.channel, "hung up"); + + // drain one completed-call record; the tracker correlates all AMI events + // that occurred during the channel's lifetime into a single struct + if let Some(completed) = pbx.next_completed_call().await { + tracing::info!( + channel = %completed.channel, + unique_id = %completed.unique_id, + linked_id = %completed.linked_id, + duration_secs = completed.duration.as_secs(), + cause = completed.cause, + cause_txt = %completed.cause_txt, + event_count = completed.events.len(), + "call record" + ); + } + + pbx.shutdown(); + Ok(()) +}