Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,539 changes: 2,200 additions & 339 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ members = [
]

[features]
default = ["bin", "nats", "net", "quic", "wasmtime", "web-transport"]
default = ["bin", "nats", "net", "quic", "wasmtime", "web-transport", "zenoh-transport"]

bin = ["bin-bindgen", "bin-wasmtime"]
bin-bindgen = [
Expand All @@ -43,6 +43,7 @@ net = ["wrpc-transport/net"]
quic = ["dep:wrpc-transport-quic"]
wasmtime = ["dep:wrpc-runtime-wasmtime"]
web-transport = ["dep:wrpc-transport-web"]
zenoh-transport = ["dep:zenoh", "dep:wrpc-transport-zenoh", "wrpc-cli/zenoh-transport"]

[[bin]]
name = "wit-bindgen-wrpc"
Expand Down Expand Up @@ -84,6 +85,10 @@ wrpc-transport-nats = { workspace = true, optional = true }
wrpc-transport-quic = { workspace = true, optional = true }
wrpc-transport-web = { workspace = true, optional = true }
wrpc-wasmtime-cli = { workspace = true, optional = true }
wrpc-transport-zenoh = { workspace = true, features = [
"zenoh-1_5_0",
], optional = true }
zenoh = { workspace = true, optional = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -114,8 +119,10 @@ wasmtime-cli-flags = { workspace = true, features = [
"pooling-allocator",
"threads",
] }
wrpc-test = { workspace = true, features = ["nats", "quic", "web-transport"] }
wrpc-test = { workspace = true, features = ["nats", "quic", "web-transport", "zenoh-transport"] }
wrpc-transport = { workspace = true, features = ["net"] }
serial_test = "*"


[workspace.dependencies]
anyhow = { version = "1", default-features = false }
Expand Down Expand Up @@ -182,8 +189,11 @@ wrpc-transport = { version = "0.29", path = "./crates/transport", default-featur
wrpc-transport-nats = { version = "0.31", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.6", path = "./crates/transport-quic", default-features = false }
wrpc-transport-web = { version = "0.3", path = "./crates/transport-web", default-features = false }
wrpc-transport-zenoh = { version = "0.29", path = "./crates/transport-zenoh", default-features = false }
wrpc-wasi-keyvalue = { version = "0.1.1", path = "./crates/wasi-keyvalue", default-features = false }
wrpc-wasi-keyvalue-mem = { version = "0.2", path = "./crates/wasi-keyvalue-mem", default-features = false }
wrpc-wasi-keyvalue-redis = { version = "0.2", path = "./crates/wasi-keyvalue-redis", default-features = false }
wrpc-wasmtime-cli = { version = "0.9", path = "./crates/wasmtime-cli", default-features = false }
wtransport = { version = "0.6.1", default-features = false }
zenoh = {version = "1.5.0", default-features = false}
flume = {version = "0.11.0", default-features = false}
157 changes: 157 additions & 0 deletions ZENOH.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
### Requirements *******************

#### Setup zenohd on NixOS

1. Add zenoh to your configuration.nix file in the environment.systemPackages
```
# List packages installed in system profile. To search, run:
# $ nix search wget
environment.systemPackages = with pkgs; [
...
zenoh
];
```

2. Rebuild NixOS
```
sudo nixos-rebuild switch
```

3. Test zenohd
```
zenohd
```

- Sample output:
2025-12-08T12:27:11.419376Z INFO main ThreadId(01) zenohd: zenohd v1.6.1 built with rustc 1.85.0 (4d91de4e4 2025-02-17)
2025-12-08T12:27:11.423016Z INFO main ThreadId(01) zenohd: Initial conf: {"access_control":{"def ........ ,"qos":{"enabled":true}}}}
2025-12-08T12:27:11.428864Z INFO main ThreadId(01) zenoh::net::runtime: Using ZID: ba0e4a24bdf641c79bfe9ee37eb9bc4
2025-12-08T12:27:11.438761Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::215:5dff:fe00:b079]:7447
2025-12-08T12:27:11.439328Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::3842:4ff:fe75:baf1]:7447
2025-12-08T12:27:11.439331Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::209d:11ff:fe06:8e06]:7447
2025-12-08T12:27:11.439333Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/10.255.255.254:7447
2025-12-08T12:27:11.439334Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/172.21.8.176:7447
2025-12-08T12:27:11.439336Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/172.18.0.1:7447
2025-12-08T12:27:11.439428Z INFO main ThreadId(01) zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446

#### Using [zenoh] transport

We will use the following two Rust wRPC applications using [zenoh] transport:
- [examples/rust/hello-zenoh-client](examples/rust/hello-zenoh-client)
- [examples/rust/hello-zenoh-server](examples/rust/hello-zenoh-server)

1. Run [zenoh]:

- Build the repo, so that zenoh transport is compiled for wrpc-wasmtime (runtime) using
```sh
cargo build --release
```

- Create a zenoh config file at /path/to/config/zenoh_conf.json5
```sh
{
mode: "client",
listen: {
endpoints: ["tcp/0.0.0.0:7447"],
},
}
```
This is a minimal example config.

- Set the config environment variable for zenoh:
> export ZENOH_CONFIG="/path/to/config/zenoh_conf.json5"

- Running as a daemon with the config above in order to run zenoh as a transport (in Client mode):
```sh
zenohd
```

2. Serve Wasm `hello` server via [zenoh]

```sh
./target/release/wrpc-wasmtime zenoh serve ./target/wasm32-wasip2/debug/hello_component_server.wasm
```

- Sample output:
> INFO zenoh::net::runtime: Using ZID: 638a012883d98f769cf4367f8457bc66
> INFO zenoh::net::runtime::orchestrator: Scouting...
> INFO zenoh::net::runtime::orchestrator: Found HelloProto { version: 9, whatami: Router, zid: f3ae70a723144e6675f4c72d30196797, locators: [tcp/[fe80::215:5dff:fe83:cd20]:7447, tcp/[fe80::8d2:a4ff:fed2:f33d]:7447, tcp/[fe80::c431:38ff:fe58:9310]:7447, tcp/10.255.255.254:7447, tcp/172.21.8.176:7447, tcp/172.18.0.1:7447] }
> INFO zenoh::net::runtime::orchestrator: Found HelloProto { version: 9, whatami: Router, zid: f3ae70a723144e6675f4c72d30196797, locators: [tcp/[fe80::215:5dff:fe83:cd20]:7447, tcp/[fe80::8d2:a4ff:fed2:f33d]:7447, tcp/[fe80::c431:38ff:fe58:9310]:7447, tcp/10.255.255.254:7447, tcp/172.21.8.176:7447, tcp/172.18.0.1:7447] }
> INFO wrpc_wasmtime_cli: serving instance function name="hello"

3. Call Wasm `hello` server using a Wasm `hello` client via [zenoh]:

```sh
./target/release/wrpc-wasmtime zenoh run ./target/wasm32-wasip2/debug/hello-component-client.wasm
```

- Sample output:
> INFO zenoh::net::runtime: Using ZID: e4b5dcb134469e2e5773dd88dfb7a8a
> INFO zenoh::net::runtime::orchestrator: Scouting...
> INFO zenoh::net::runtime::orchestrator: Found HelloProto { version: 9, whatami: Router, zid: f3ae70a723144e6675f4c72d30196797, locators: [tcp/[fe80::215:5dff:fe83:cd20]:7447, tcp/[fe80::8d2:a4ff:fed2:f33d]:7447, tcp/[fe80::c431:38ff:fe58:9310]:7447, tcp/10.255.255.254:7447, tcp/172.21.8.176:7447, tcp/172.18.0.1:7447] }
> INFO zenoh::net::runtime::orchestrator: Found HelloProto { version: 9, whatami: Router, zid: f3ae70a723144e6675f4c72d30196797, locators: [tcp/[fe80::215:5dff:fe83:cd20]:7447, tcp/[fe80::8d2:a4ff:fed2:f33d]:7447, tcp/[fe80::c431:38ff:fe58:9310]:7447, tcp/10.255.255.254:7447, tcp/172.21.8.176:7447, tcp/172.18.0.1:7447] }
hello from Rust
> INFO zenoh::api::session: close session zid=e4b5dcb134469e2e5773dd88dfb7a8a

4. Call the Wasm `hello` server using a native wRPC `hello` client via [zenoh]:

```sh
cargo run -p hello-zenoh-client
```

5. Serve native wRPC `hello` server via [zenoh]:

```sh
cargo run -p hello-zenoh-server
```


#### Testing

To test the transport-zenoh package run:

```sh
cargo test zenoh -- --nocapture
```

- Sample output:
> test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 13 filtered out; finished in 6.17s





























6. Call both the native wRPC `hello` server and Wasm `hello` server using native wRPC `hello` client via [zenoh]:

```sh
cargo run -p hello-zenoh-client rust native
```

7. Call native wRPC `hello` server using Wasm `hello` client via [zenoh]:

```sh
wrpc-wasmtime zenoh run --import native ./target/wasm32-wasip2/release/hello-component-client.wasm
4 changes: 3 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ license.workspace = true
repository.workspace = true

[features]
default = ["nats"]
default = ["nats", "zenoh-transport"]
nats = ["async-nats/ring", "dep:async-nats", "dep:tokio", "tokio/sync"]
zenoh-transport = ["dep:zenoh", "dep:tokio", "tokio/sync"]

[dependencies]
anyhow = { workspace = true, features = ["std"] }
Expand All @@ -25,3 +26,4 @@ tracing-subscriber = { workspace = true, features = [
"smallvec",
"tracing-log",
] }
zenoh = { workspace = true, optional = true, features = ["transport_tcp"] }
2 changes: 2 additions & 0 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "nats")]
pub mod nats;
pub mod tracing;
#[cfg(feature = "zenoh-transport")]
pub mod zenoh;
10 changes: 10 additions & 0 deletions crates/cli/src/zenoh.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use zenoh::{Config, Session};

/// Open a regular Zenoh session with configs supplied by an environment variable.
pub async fn connect() -> anyhow::Result<Session> {
let cfg = Config::from_env().expect("Missing environment variable 'ZENOH_CONFIG'");

let session = zenoh::open(cfg)
.await.unwrap();
Ok(session)
}
5 changes: 4 additions & 1 deletion crates/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license.workspace = true
repository.workspace = true

[features]
default = ["nats", "quic", "web-transport"]
default = ["nats", "quic", "web-transport", "zenoh-transport"]
nats = ["dep:async-nats", "async-nats/ring", "wrpc-cli/nats"]
quic = [
"dep:quinn",
Expand All @@ -22,6 +22,7 @@ quic = [
"quinn/rustls",
]
web-transport = ["dep:wtransport", "wtransport/self-signed"]
zenoh-transport = ["dep:zenoh", "dep:wrpc-transport-zenoh", "wrpc-cli/zenoh-transport"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -33,3 +34,5 @@ tokio = { workspace = true, features = ["net", "process", "rt-multi-thread"] }
tracing = { workspace = true }
wrpc-cli = { workspace = true }
wtransport = { workspace = true, features = ["ring"], optional = true }
wrpc-transport-zenoh = { workspace = true, features = [ "zenoh-1_5_0",], optional = true }
zenoh = { workspace = true, optional = true }
51 changes: 51 additions & 0 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::net::Ipv6Addr;

use std::process::ExitStatus;

use std::sync::Arc;
use anyhow::Context;
use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
Expand All @@ -12,6 +13,7 @@ use tokio::process::Command;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::{select, spawn};
use zenoh::{Config};

pub async fn free_port() -> anyhow::Result<u16> {
TcpListener::bind((Ipv6Addr::LOCALHOST, 0))
Expand Down Expand Up @@ -120,6 +122,55 @@ pub async fn start_nats() -> anyhow::Result<(
Ok((port, client, server, stop_tx))
}

#[cfg(feature = "zenoh-transport")]
pub async fn start_zenoh() -> anyhow::Result<(
u16,
wrpc_transport_zenoh::Client,
JoinHandle<anyhow::Result<ExitStatus>>,
oneshot::Sender<()>,
)> {
let port = free_port().await?; // not used in setup -- just return
let (server, stop_tx) =
spawn_server(Command::new("zenohd").args(&[] as &[&str]))
.await
.context("failed to start zenohd server")?;

// connect to zenoh
let cfg = Config::from_env().expect("Missing environment variable 'ZENOH_CONFIG'");

let session = zenoh::open(cfg)
.await
.expect("Failed to open a Zenoh session");

let arc_session = Arc::new(session);

let prefix = Arc::<str>::from("");

let client = wrpc_transport_zenoh::Client::new(arc_session, prefix)
.await
.context("failed to construct transport client")?;

Ok((port, client, server, stop_tx))
}

#[cfg(feature = "zenoh-transport")]
pub async fn with_zenoh<T, Fut>(f: impl FnOnce(u16, wrpc_transport_zenoh::Client) -> Fut) -> anyhow::Result<T>
where
Fut: core::future::Future<Output = anyhow::Result<T>>,
{
let (port, zenoh_client, zenoh_server, stop_tx) = start_zenoh()
.await
.context("failed to start Zenoh server")?;
let res = f(port, zenoh_client).await.context("closure failed")?;

stop_tx.send(()).expect("failed to stop Zenoh server");
zenoh_server
.await
.context("failed to await Zenoh server stop")?
.context("Zenoh server failed to stop")?;
Ok(res)
}

#[cfg(feature = "nats")]
pub async fn with_nats<T, Fut>(f: impl FnOnce(u16, async_nats::Client) -> Fut) -> anyhow::Result<T>
where
Expand Down
33 changes: 33 additions & 0 deletions crates/transport-zenoh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "wrpc-transport-zenoh"
version = "0.29.0"
description = "wRPC Zenoh transport"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[features]
default = ["zenoh-1_5_0"]
hotpath = ["hotpath/hotpath"]
hotpath-alloc = ["hotpath/hotpath-alloc"]

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true, features = ["serde"] }
futures = { workspace = true, features = ["async-await"] }
nuid = { workspace = true }
tokio = { workspace = true, features = ["io-util", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true, features = ["codec", "io"] }
tracing = { workspace = true, features = ["attributes"] }
wasm-tokio = { workspace = true }
wrpc-transport = { workspace = true }
zenoh-1_5_0 = { package = "zenoh", version = "1.5.0", default-features = false, optional = true }
serde_json = { workspace = true }
serde = { workspace = true }
uuid = { workspace = true }
flume = { workspace = true }
hotpath = "0.9"
Loading
Loading