Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository = "https://github.com/hyperware-ai/process_lib"
license = "Apache-2.0"

[features]
hyperapp = ["dep:futures-util", "dep:uuid", "logging"]
logging = ["dep:color-eyre", "dep:tracing", "dep:tracing-error", "dep:tracing-subscriber"]
hyperwallet = []
simulation-mode = []
Expand All @@ -29,19 +30,21 @@ alloy = { version = "0.8.1", features = [
anyhow = "1.0"
base64 = "0.22.1"
bincode = "1.3.3"
color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true }
hex = "0.4.3"
http = "1.0.0"
mime_guess = "2.0"
rand = "0.8"
regex = "1.11.1"
rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.120"
sha3 = "0.10.8"
thiserror = "1.0"
url = "2.4.1"
wit-bindgen = "0.42.1"

futures-util = { version = "0.3", optional = true }
uuid = { version = "1.0", features = ["v4"], optional = true }

color-eyre = { version = "0.6", features = ["capture-spantrace"], optional = true }
tracing = { version = "0.1", optional = true }
tracing-error = { version = "0.2", optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "std"], optional = true }
url = "2.4.1"
wit-bindgen = "0.42.1"
120 changes: 119 additions & 1 deletion src/http/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
pub use super::server::{HttpResponse, WsMessageType};
use crate::{get_blob, LazyLoadBlob as KiBlob, Message, Request as KiRequest};
#[cfg(not(feature = "hyperapp"))]
use crate::Message;
use crate::{get_blob, LazyLoadBlob as KiBlob, Request as KiRequest};
use http::Method;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use thiserror::Error;

#[cfg(feature = "hyperapp")]
use crate::hyperapp;

/// [`crate::Request`] type sent to the `http-client:distro:sys` service in order to open a
/// WebSocket connection, send a WebSocket message on an existing connection, or
/// send an HTTP request.
Expand Down Expand Up @@ -131,6 +136,7 @@ pub fn send_request(
/// Make an HTTP request using http-client and await its response.
///
/// Returns HTTP response from the `http` crate if successful, with the body type as bytes.
#[cfg(not(feature = "hyperapp"))]
pub fn send_request_await_response(
method: Method,
url: url::Url,
Expand Down Expand Up @@ -190,6 +196,64 @@ pub fn send_request_await_response(
.unwrap())
}

/// Make an HTTP request using http-client and await its response.
///
/// Returns HTTP response from the `http` crate if successful, with the body type as bytes.
#[cfg(feature = "hyperapp")]
pub async fn send_request_await_response(
method: Method,
url: url::Url,
headers: Option<HashMap<String, String>>,
timeout: u64,
body: Vec<u8>,
) -> std::result::Result<http::Response<Vec<u8>>, HttpClientError> {
let request = KiRequest::to(("our", "http-client", "distro", "sys"))
.body(
serde_json::to_vec(&HttpClientAction::Http(OutgoingHttpRequest {
method: method.to_string(),
version: None,
url: url.to_string(),
headers: headers.unwrap_or_default(),
}))
.map_err(|_| HttpClientError::MalformedRequest)?,
)
.blob_bytes(body)
.expects_response(timeout);

let resp_result =
hyperapp::send::<std::result::Result<HttpClientResponse, HttpClientError>>(request)
.await
.map_err(|_| {
HttpClientError::ExecuteRequestFailed("http-client timed out".to_string())
})?;

let resp = match resp_result {
Ok(HttpClientResponse::Http(resp)) => resp,
Ok(HttpClientResponse::WebSocketAck) => {
return Err(HttpClientError::ExecuteRequestFailed(
"http-client gave unexpected response".to_string(),
))
}
Err(e) => return Err(e),
};
let mut http_response = http::Response::builder()
.status(http::StatusCode::from_u16(resp.status).unwrap_or_default());
let headers = http_response.headers_mut().unwrap();
for (key, value) in &resp.headers {
let Ok(key) = http::header::HeaderName::from_str(key) else {
continue;
};
let Ok(value) = http::header::HeaderValue::from_str(value) else {
continue;
};
headers.insert(key, value);
}
Ok(http_response
.body(get_blob().unwrap_or_default().bytes)
.unwrap())
}

#[cfg(not(feature = "hyperapp"))]
pub fn open_ws_connection(
url: String,
headers: Option<HashMap<String, String>>,
Expand Down Expand Up @@ -231,7 +295,37 @@ pub fn send_ws_client_push(channel_id: u32, message_type: WsMessageType, blob: K
.unwrap()
}

#[cfg(feature = "hyperapp")]
pub async fn open_ws_connection(
url: String,
headers: Option<HashMap<String, String>>,
channel_id: u32,
) -> std::result::Result<(), HttpClientError> {
let request = KiRequest::to(("our", "http-client", "distro", "sys"))
.body(
serde_json::to_vec(&HttpClientAction::WebSocketOpen {
url: url.clone(),
headers: headers.unwrap_or(HashMap::new()),
channel_id,
})
.unwrap(),
)
.expects_response(5);

let resp_result =
hyperapp::send::<std::result::Result<HttpClientResponse, HttpClientError>>(request)
.await
.map_err(|_| HttpClientError::WsOpenFailed { url: url.clone() })?;

match resp_result {
Ok(HttpClientResponse::WebSocketAck) => Ok(()),
Err(e) => Err(e),
_ => Err(HttpClientError::WsOpenFailed { url }),
}
}

/// Close a WebSocket connection.
#[cfg(not(feature = "hyperapp"))]
pub fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClientError> {
let Ok(Ok(Message::Response { body, .. })) =
KiRequest::to(("our", "http-client", "distro", "sys"))
Expand All @@ -251,3 +345,27 @@ pub fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClien
_ => Err(HttpClientError::WsCloseFailed { channel_id }),
}
}

/// Close a WebSocket connection.
#[cfg(feature = "hyperapp")]
pub async fn close_ws_connection(channel_id: u32) -> std::result::Result<(), HttpClientError> {
let request = KiRequest::to(("our", "http-client", "distro", "sys"))
.body(
serde_json::json!(HttpClientAction::WebSocketClose { channel_id })
.to_string()
.as_bytes()
.to_vec(),
)
.expects_response(5);

let resp_result =
hyperapp::send::<std::result::Result<HttpClientResponse, HttpClientError>>(request)
.await
.map_err(|_| HttpClientError::WsCloseFailed { channel_id })?;

match resp_result {
Ok(HttpClientResponse::WebSocketAck) => Ok(()),
Err(e) => Err(e),
_ => Err(HttpClientError::WsCloseFailed { channel_id }),
}
}
Loading