Skip to content
8 changes: 8 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ path = "src/routeguide/server.rs"
name = "routeguide-client"
path = "src/routeguide/client.rs"

[[bin]]
name = "routeguide-rdma-server"
path = "src/routeguide_rdma/server.rs"

[[bin]]
name = "routeguide-rdma-client"
path = "src/routeguide_rdma/client.rs"

[[bin]]
name = "authentication-client"
path = "src/authentication/client.rs"
Expand Down
18 changes: 18 additions & 0 deletions examples/src/json-codec/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ impl<T: serde::Serialize> Encoder for JsonEncoder<T> {
fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
serde_json::to_writer(buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
}

fn encode_into_slice(
&mut self,
_item: Self::Item,
_buf: &mut [u8],
) -> Result<usize, Self::Error> {
unimplemented!()
}
}

#[derive(Debug)]
Expand All @@ -48,6 +56,16 @@ impl<U: serde::de::DeserializeOwned> Decoder for JsonDecoder<U> {
serde_json::from_reader(buf.reader()).map_err(|e| Status::internal(e.to_string()))?;
Ok(Some(item))
}

fn decode_from_slice(&mut self, buf: &[u8]) -> Result<Option<Self::Item>, Self::Error> {
if !buf.has_remaining() {
return Ok(None);
}

let item: Self::Item =
serde_json::from_reader(buf.reader()).map_err(|e| Status::internal(e.to_string()))?;
Ok(Some(item))
}
}

/// A [`Codec`] that implements `application/grpc+json` via the serde library.
Expand Down
126 changes: 126 additions & 0 deletions examples/src/routeguide_rdma/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::error::Error;
use std::time::Duration;

use futures::stream;
use rand::rngs::ThreadRng;
use rand::Rng;
use tokio::time;
use tonic::transport::RdmaChannel;
use tonic::Request;

use routeguide::route_guide_client::RouteGuideClient;
use routeguide::{Point, Rectangle, RouteNote};

pub mod routeguide {
tonic::include_proto!("routeguide");
}

async fn print_features(client: &mut RouteGuideClient<RdmaChannel>) -> Result<(), Box<dyn Error>> {
let rectangle = Rectangle {
lo: Some(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: Some(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
};

let mut stream = client
.list_features_rdma(Request::new(rectangle))
.await?
.into_inner();

while let Some(feature) = stream.message().await? {
println!("NOTE = {:?}", feature);
}

Ok(())
}

async fn run_record_route(client: &mut RouteGuideClient<RdmaChannel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::thread_rng();
let point_count: i32 = rng.gen_range(2..100);

let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}

println!("Traversing {} points", points.len());
let request = Request::new(stream::iter(points));

match client.record_route_rdma(request).await {
Ok(response) => println!("SUMMARY: {:?}", response.into_inner()),
Err(e) => println!("something went wrong: {:?}", e),
}

Ok(())
}

async fn run_route_chat(client: &mut RouteGuideClient<RdmaChannel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();

let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));

loop {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = RouteNote {
location: Some(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {:?}", elapsed),
};

yield note;
}
};

let response = client.route_chat_rdma(Request::new(outbound)).await?;
let mut inbound = response.into_inner();

while let Some(note) = inbound.message().await? {
println!("NOTE = {:?}", note);
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RouteGuideClient::connect_rdma("[::1]:10001").await?;
println!("rdma conn established!");

println!("*** SIMPLE RPC ***");
let response = client
.get_feature_rdma(Request::new(Point {
latitude: 409_146_138,
longitude: -746_188_906,
}))
.await?;
println!("RESPONSE = {:?}", response);

println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;

println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;

println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;

Ok(())
}

fn random_point(rng: &mut ThreadRng) -> Point {
let latitude = (rng.gen_range(0..180) - 90) * 10_000_000;
let longitude = (rng.gen_range(0..360) - 180) * 10_000_000;
Point {
latitude,
longitude,
}
}
33 changes: 33 additions & 0 deletions examples/src/routeguide_rdma/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use serde::Deserialize;
use std::fs::File;

#[derive(Debug, Deserialize)]
struct Feature {
location: Location,
name: String,
}

#[derive(Debug, Deserialize)]
struct Location {
latitude: i32,
longitude: i32,
}

#[allow(dead_code)]
pub fn load() -> Vec<crate::routeguide::Feature> {
let file = File::open("examples/data/route_guide_db.json").expect("failed to open data file");

let decoded: Vec<Feature> =
serde_json::from_reader(&file).expect("failed to deserialize features");

decoded
.into_iter()
.map(|feature| crate::routeguide::Feature {
name: feature.name,
location: Some(crate::routeguide::Point {
longitude: feature.location.longitude,
latitude: feature.location.latitude,
}),
})
.collect()
}
Loading