diff --git a/filter/envoy/envoy-wasm-filters/src/lib.rs b/filter/envoy/envoy-wasm-filters/src/lib.rs index dfb2486..3b052c3 100644 --- a/filter/envoy/envoy-wasm-filters/src/lib.rs +++ b/filter/envoy/envoy-wasm-filters/src/lib.rs @@ -33,15 +33,14 @@ struct APIEvent { struct Metadata { context_id: u32, timestamp: u64, - istio_version: String, + receiver_name: String, + receiver_version: String, mesh_id: String, node_name: String, } #[derive(Serialize, Default, Clone)] struct Workload { - name: String, - namespace: String, ip: String, port: u16, } @@ -103,7 +102,7 @@ impl RootContext for Plugin { impl HttpContext for Plugin { fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { - let (src_ip, src_port) = get_url_and_port( + let (mut src_ip, src_port) = get_url_and_port( String::from_utf8( self.get_property(vec!["source", "address"]) .unwrap_or_default(), @@ -113,13 +112,22 @@ impl HttpContext for Plugin { let req_headers = self.get_http_request_headers(); let mut headers: HashMap = HashMap::with_capacity(req_headers.len()); - for header in req_headers { + for (key, value) in req_headers { // Don't include Envoy's pseudo headers // https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#id13 - if !header.0.starts_with("x-envoy") { - headers.insert(header.0, header.1); + if !key.starts_with("x-envoy") { + headers.insert(key.clone(), value.clone()); + } + if key.starts_with("x-forwarded-for") { + // Use the real-client IP if available + src_ip = value + .split(",") + .map(|s| s.to_string()) + .collect::>()[0] + .clone() } } + headers.insert( "query".to_string(), String::from_utf8( @@ -154,17 +162,6 @@ impl HttpContext for Plugin { self.api_event.source.ip = src_ip; self.api_event.source.port = src_port; - self.api_event.source.name = String::from_utf8( - self.get_property(vec!["node", "metadata", "NAME"]) - .unwrap_or_default(), - ) - .unwrap_or_default(); - self.api_event.source.namespace = String::from_utf8( - self.get_property(vec!["node", "metadata", "NAMESPACE"]) - .unwrap_or_default(), - ) - .unwrap_or_default(); - Action::Continue } @@ -203,7 +200,6 @@ impl HttpContext for Plugin { self.api_event.response.headers = headers; self.api_event.destination.ip = dest_ip; self.api_event.destination.port = dest_port; - find_and_update_dest_namespace(self); Action::Continue } @@ -221,27 +217,6 @@ impl HttpContext for Plugin { } } -fn find_and_update_dest_namespace(obj: &mut Plugin) { - let dest_ns = String::from_utf8( - obj.get_property(vec![ - "upstream_host_metadata", - "filter_metadata", - "istio", - "workload", - ]) - .unwrap_or_default(), - ) - .unwrap_or_default(); - - // e.g., filterserver;sentryflow;filterserver;;Kubernetes - if !dest_ns.is_empty() { - let parts: Vec<&str> = dest_ns.split(";").collect(); - if parts.len() == 5 || parts.len() == 4 { - obj.api_event.destination.namespace = parts[1].to_string(); - } - } -} - fn dispatch_http_call_to_upstream(obj: &mut Plugin) { update_metadata(obj); let telemetry_json = serde_json::to_string(&obj.api_event).unwrap_or_default(); @@ -281,11 +256,17 @@ fn update_metadata(obj: &mut Plugin) { .unwrap_or_default(), ) .unwrap_or_default(); - obj.api_event.metadata.istio_version = String::from_utf8( + + let istio_version: String = String::from_utf8( obj.get_property(vec!["node", "metadata", "ISTIO_VERSION"]) .unwrap_or_default(), ) .unwrap_or_default(); + + if !istio_version.is_empty() { + obj.api_event.metadata.receiver_version = istio_version; + obj.api_event.metadata.receiver_name = "Istio".to_string(); + } } fn get_url_and_port(address: String) -> (String, u16) {