Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 22 additions & 41 deletions filter/envoy/envoy-wasm-filters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ struct APIEvent {
struct Metadata {
context_id: u32,
timestamp: u64,
istio_version: String,
receiver_name: String,
receiver_version: String,
mesh_id: String,
node_name: String,
}

#[derive(Serialize, Default, Clone)]
struct Workload {
name: String,
namespace: String,
ip: String,
port: u16,
}
Expand Down Expand Up @@ -103,7 +102,7 @@ impl RootContext for Plugin {

impl HttpContext for Plugin {
fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
let (src_ip, src_port) = get_url_and_port(
let (mut src_ip, src_port) = get_url_and_port(
String::from_utf8(
self.get_property(vec!["source", "address"])
.unwrap_or_default(),
Expand All @@ -113,13 +112,22 @@ impl HttpContext for Plugin {

let req_headers = self.get_http_request_headers();
let mut headers: HashMap<String, String> = HashMap::with_capacity(req_headers.len());
for header in req_headers {
for (key, value) in req_headers {
// Don't include Envoy's pseudo headers
// https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#id13
if !header.0.starts_with("x-envoy") {
headers.insert(header.0, header.1);
if !key.starts_with("x-envoy") {
headers.insert(key.clone(), value.clone());
}
if key.starts_with("x-forwarded-for") {
// Use the real-client IP if available
src_ip = value
.split(",")
.map(|s| s.to_string())
.collect::<Vec<String>>()[0]
.clone()
}
}

headers.insert(
"query".to_string(),
String::from_utf8(
Expand Down Expand Up @@ -154,17 +162,6 @@ impl HttpContext for Plugin {

self.api_event.source.ip = src_ip;
self.api_event.source.port = src_port;
self.api_event.source.name = String::from_utf8(
self.get_property(vec!["node", "metadata", "NAME"])
.unwrap_or_default(),
)
.unwrap_or_default();
self.api_event.source.namespace = String::from_utf8(
self.get_property(vec!["node", "metadata", "NAMESPACE"])
.unwrap_or_default(),
)
.unwrap_or_default();

Action::Continue
}

Expand Down Expand Up @@ -203,7 +200,6 @@ impl HttpContext for Plugin {
self.api_event.response.headers = headers;
self.api_event.destination.ip = dest_ip;
self.api_event.destination.port = dest_port;
find_and_update_dest_namespace(self);

Action::Continue
}
Expand All @@ -221,27 +217,6 @@ impl HttpContext for Plugin {
}
}

fn find_and_update_dest_namespace(obj: &mut Plugin) {
let dest_ns = String::from_utf8(
obj.get_property(vec![
"upstream_host_metadata",
"filter_metadata",
"istio",
"workload",
])
.unwrap_or_default(),
)
.unwrap_or_default();

// e.g., filterserver;sentryflow;filterserver;;Kubernetes
if !dest_ns.is_empty() {
let parts: Vec<&str> = dest_ns.split(";").collect();
if parts.len() == 5 || parts.len() == 4 {
obj.api_event.destination.namespace = parts[1].to_string();
}
}
}

fn dispatch_http_call_to_upstream(obj: &mut Plugin) {
update_metadata(obj);
let telemetry_json = serde_json::to_string(&obj.api_event).unwrap_or_default();
Expand Down Expand Up @@ -281,11 +256,17 @@ fn update_metadata(obj: &mut Plugin) {
.unwrap_or_default(),
)
.unwrap_or_default();
obj.api_event.metadata.istio_version = String::from_utf8(

let istio_version: String = String::from_utf8(
obj.get_property(vec!["node", "metadata", "ISTIO_VERSION"])
.unwrap_or_default(),
)
.unwrap_or_default();

if !istio_version.is_empty() {
obj.api_event.metadata.receiver_version = istio_version;
obj.api_event.metadata.receiver_name = "Istio".to_string();
}
}

fn get_url_and_port(address: String) -> (String, u16) {
Expand Down
Loading