diff --git a/Scripts/check-cortexflow-components.sh b/Scripts/check-cortexflow-components.sh deleted file mode 100755 index 01232cbe..00000000 --- a/Scripts/check-cortexflow-components.sh +++ /dev/null @@ -1,21 +0,0 @@ -echo "Welcome to CortexFlow tools" -echo "Checking CortexFlow components" - -echo "Checking if CortexFlow namespace exists..." -if kubectl get namespace cortexflow >/dev/null 2>&1; then - echo "✅ Namespace 'cortexflow' exists." - - sleep 1 - echo "Checking pods..." - kubectl get pods -n cortexflow - - echo - - sleep 1 - echo "Checking services..." - kubectl get svc -n cortexflow - echo -else - echo "❌ Namespace 'cortexflow' does not exist." - exit 1 -fi diff --git a/Scripts/check-dev-requisites.sh b/Scripts/check-dev-requisites.sh deleted file mode 100755 index c775754c..00000000 --- a/Scripts/check-dev-requisites.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash - -echo "Welcome to the CortexFlow tools" -echo "Checking pre-requisites for developers" -echo - -echo "Checking Docker installation..." -if which docker >/dev/null 2>&1; then - echo "✅ Docker is installed." -else - echo "❌ Docker is NOT installed." -fi -sleep 1 - -echo -echo "Checking Minikube installation..." -if which minikube >/dev/null 2>&1; then - echo "✅ Minikube is installed." -else - echo "❌ Minikube is NOT installed." -fi -sleep 1 - -echo - -echo "Checking Node.js installation..." -if which node >/dev/null 2>&1; then - echo "✅ Node.js is installed." -else - echo "Node.js is NOT installed." -fi -sleep 1 - -echo - -echo "Checking npm installation..." -if which npm >/dev/null 2>&1; then - echo "✅ npm is installed." -else - echo "❌ npm is NOT installed." -fi diff --git a/Scripts/install-debugging-tools.sh b/Scripts/install-debugging-tools.sh deleted file mode 100755 index 9e3ed017..00000000 --- a/Scripts/install-debugging-tools.sh +++ /dev/null @@ -1,45 +0,0 @@ -if ! kubectl exec -n cortexflow $1 -c $2 -- which netstat >/dev/null 2>&1; then - echo "🔨 installing netstat" - kubectl exec -n cortexflow $1 -c $2 -- apt update - kubectl exec -n cortexflow $1 -c $2 -- apt install -y net-tools -else - echo "✅ Netstat is installed." -fi - -sleep 1.5 - -if ! kubectl exec -n cortexflow $1 -c $2 -- which nc >/dev/null 2>&1; then - echo "🔨 installing netcat" - kubectl exec -n cortexflow $1 -c $2 -- apt install -y netcat-traditional -else - echo "✅ Netcat is installed." -fi - -sleep 1.5 - -if ! kubectl exec -n cortexflow $1 -c $2 -- which curl >/dev/null 2>&1; then - echo "🔨 installing curl" - kubectl exec -n cortexflow $1 -c $2 -- apt install -y curl -else - echo "✅ Curl is installed." -fi - -sleep 1.5 - -if ! kubectl exec -n cortexflow $1 -c $2 -- which nslookup >/dev/null 2>&1; then - echo "🔨 installing dnsutils" - kubectl exec -n cortexflow $1 -c $2 -- apt install -y dnsutils -else - echo "✅ Nslookup is installed." -fi - -sleep 1.5 - -if ! kubectl exec -n cortexflow $1 -c $2 -- which tcpdump >/dev/null 2>&1; then - echo "🔨 installing tcpdump" - kubectl exec -n cortexflow $1 -c $2 -- apt install -y tcpdump -else - echo "✅ tcpdump is installed." -fi - -sleep 1.5 diff --git a/Scripts/test-connections.sh b/Scripts/test-connections.sh deleted file mode 100755 index 95dcc946..00000000 --- a/Scripts/test-connections.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash - -proxy_pod_name=$(kubectl get pods -n cortexflow --no-headers -o custom-columns=":metadata.name" | grep cortexflow-proxy) -proxy_ip=$(kubectl get -o template service/proxy-service -n cortexflow --template='{{.spec.clusterIP}}') -proxy_udp_port=5053 -proxy_tcp_port=5054 -proxy_metrics_port=9090 -proxy_container=$(kubectl get pod $proxy_pod_name -n cortexflow -o jsonpath='{.spec.containers[*].name}') - -echo "🧑🏻‍🔬 Checking cortexflow proxy inside the proxy pod: $proxy_pod_name" - -sleep 1.5 -echo "🔨 checking env variables" -kubectl exec -n cortexflow $proxy_pod_name -- env - -sleep 1.5 - -./install-debugging-tools.sh $proxy_pod_name $proxy_container -echo -./test-proxy-ports.sh $proxy_pod_name $proxy_metrics_port -echo -sleep 1.5 -echo "🔨 Sending a test package with netcat from proxy pod -> proxy pod" -kubectl exec -n cortexflow $proxy_pod_name -- sh -c echo b"Hi CortexFlow" | nc -u -w5 -v 127.0.0.1 $proxy_udp_port - -echo -sleep 1.5 -echo "🔨 Testing the DNS resolution manually with nslookup" -kubectl exec -n cortexflow $proxy_pod_name -- nslookup proxy-service.cortexflow.svc.cluster.local - -sleep 1.5 -echo -./test-proxy-endpoints.sh $proxy_pod_name -echo -echo -echo "🧑🏻‍🔬 Testing outside the proxy pod using a test pod" -echo "🔨 Testing using a temporary test pod and nslookup" -kubectl run -it --rm --image=busybox test-pod --restart=Never -n cortexflow -- nslookup proxy-service.cortexflow.svc.cluster.local - -echo -sleep 1.5 -echo "🔨 Sending a test message using netcat and a temporary test pod" -kubectl run -it --rm --image=busybox test-pod --restart=Never -n cortexflow -- sh -c "echo -n Hi CortexFlow | nc -u -w 3 -v $proxy_ip $proxy_udp_port" - -echo -sleep 1.5 -echo "🔨 Testing the tcp port" -echo "🔨 Sending a test message using netcat and a temporary test pod " -kubectl run -it --rm --image=busybox test-pod --restart=Never -n cortexflow -- sh -c "echo -n Hi TCP | nc -w 3 -v $proxy_ip $proxy_tcp_port" diff --git a/Scripts/test-proxy-endpoints.sh b/Scripts/test-proxy-endpoints.sh deleted file mode 100755 index c89e52ef..00000000 --- a/Scripts/test-proxy-endpoints.sh +++ /dev/null @@ -1,45 +0,0 @@ -echo "🔨 Testing curl command" -response=$(kubectl exec -n cortexflow $1 -- curl -s -o /dev/null -w "%{http_code}" http://localhost:9090/) -if [ "$response" -eq 200 ]; then - echo "✅ Server is working" - echo " Checking / endpoint" - kubectl exec -n cortexflow $1 -- curl -v http://localhost:9090/ -else - echo "❌ Error in http response ERROR: $response. Service does not exists or is not exposed" -fi - -echo -sleep 1.5 -echo "🔨 Testing /health endpoint" -response=$(kubectl exec -n cortexflow $1 -- curl -s -o /dev/null -w "%{http_code}" http://localhost:9090/health) -if [ "$response" -eq 200 ]; then - echo "✅ Server is working" - echo " Checking /health endpoint" - kubectl exec -n cortexflow $1 -- curl -v http://localhost:9090/health -else - echo "❌ Error in http response ERROR: $response. Service does not exists or is not exposed" -fi - -echo -sleep 1.5 -echo "🔨 Testing /metrics endpoint" -response=$(kubectl exec -n cortexflow $1 -- curl -s -o /dev/null -w "%{http_code}" http://localhost:9090/metrics) -if [ "$response" -eq 200 ]; then - echo "✅ Server is working" - echo " Checking /metrics endpoint" - kubectl exec -n cortexflow $1 -- curl -v http://localhost:9090/metrics -else - echo "❌ Error in http response ERROR: $response. Service does not exists or is not exposed" -fi - -echo -sleep 1.5 -echo "🔨 Testing /status endpoint" -response=$(kubectl exec -n cortexflow $1 -- curl -s -o /dev/null -w "%{http_code}" http://localhost:9090/status) -if [ "$response" -eq 200 ]; then - echo "✅ Server is working" - echo " Checking /status endpoint" - kubectl exec -n cortexflow $1 -- curl -v http://localhost:9090/status -else - echo "❌ Error in http response ERROR: $response. Service does not exists or is not exposed" -fi diff --git a/Scripts/test-proxy-ports.sh b/Scripts/test-proxy-ports.sh deleted file mode 100755 index 33d658dd..00000000 --- a/Scripts/test-proxy-ports.sh +++ /dev/null @@ -1,18 +0,0 @@ -echo "🔨 Testing network connections" -kubectl exec -n cortexflow $1 -- netstat -tulnp | grep $2 - -sleep 1.5 - -echo -echo "🔨 testing if the process is in execution" -kubectl exec -n cortexflow $1 -- ps aux | grep cortexflow-proxy - -sleep 1.5 -echo -echo "🔨 testing using netcat" -kubectl exec -n cortexflow $1 -- nc -zv proxy-service.cortexflow.svc.cluster.local $2 - -sleep 1.5 -echo -echo "🔨 Checking if the proxy is listening in the 5053 port" -kubectl exec -n cortexflow $1 -- netstat -ulnp diff --git a/Scripts/test-sidecar-advanced-tcp.sh b/Scripts/test-sidecar-advanced-tcp.sh deleted file mode 100755 index ec3fce4c..00000000 --- a/Scripts/test-sidecar-advanced-tcp.sh +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/sh - -./install-debugging-tools.sh test-proxy proxy-sidecar -./install-debugging-tools.sh test-proxy2 proxy-sidecar -./install-debugging-tools.sh test-proxy3 proxy-sidecar -./install-debugging-tools.sh test-proxy4 proxy-sidecar - -# start the tcp listener -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Starting TCP listener on port 5054..." - nohup sh -c "nc -l -p 5054" >/dev/null 2>&1 & -' - -kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Starting TCP listener on port 5054..." - nohup sh -c "nc -l -p 5054" >/dev/null 2>&1 & -' - - -test_proxy_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -w1 test-proxy2 5054 - ' - done -} - -test_proxy2_to_proxy() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -w1 test-proxy 5054 - ' - done -} - -test_proxy3_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - kubectl exec test-proxy3 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -w1 test-proxy2 5054 - ' - done -} - -test_proxy4_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - kubectl exec test-proxy4 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -w1 test-proxy2 5054 - ' - done -} - -# execute the functions in background -test_proxy_to_proxy2 & -test_proxy2_to_proxy & -test_proxy3_to_proxy2 & -test_proxy4_to_proxy2 & - - -sleep 300 - -# stop the listeners -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c 'pkill nc' -kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c 'pkill nc' diff --git a/Scripts/test-sidecar-advanced-udp.sh b/Scripts/test-sidecar-advanced-udp.sh deleted file mode 100755 index d9c52a85..00000000 --- a/Scripts/test-sidecar-advanced-udp.sh +++ /dev/null @@ -1,70 +0,0 @@ -#!/bin/sh -./install-debugging-tools.sh test-proxy proxy-sidecar -./install-debugging-tools.sh test-proxy2 proxy-sidecar -./install-debugging-tools.sh test-proxy3 proxy-sidecar -./install-debugging-tools.sh test-proxy4 proxy-sidecar - -# start the udp listener -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Starting UDP listener on port 5053..." - nohup nc -lu 5053 >/dev/null 2>&1 & -' - -kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Starting UDP listener on port 5053..." - nohup nc -lu 5053 >/dev/null 2>&1 & -' - - -test_proxy_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - echo "Sending UDP packet from test-proxy to test-proxy2..." - kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -u -w1 test-proxy2 5053 - ' - done -} - -test_proxy2_to_proxy() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - echo "Sending UDP packet from test-proxy2 to test-proxy..." - kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -u -w1 test-proxy 5053 - ' - done -} - -test_proxy3_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - echo "Sending UDP packet from test-proxy3 to test-proxy2..." - kubectl exec test-proxy3 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -u -w1 test-proxy2 5053 - ' - done -} - -test_proxy4_to_proxy2() { - for i in $(seq 1 300); do - sleep $((RANDOM % 5 + 1)) - echo "Sending UDP packet from test-proxy4 to test-proxy2..." - kubectl exec test-proxy4 -c proxy-sidecar -n cortexflow -- sh -c ' - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -u -w1 test-proxy2 5053 - ' - done -} - -# execute the functions in background -(test_proxy_to_proxy2 &) & -(test_proxy2_to_proxy &) & -(test_proxy3_to_proxy2 &) & -(test_proxy4_to_proxy2 &) & - - -sleep 300 - -# stop the listeners -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c 'pkill nc || kill $(pgrep nc)' -kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c 'pkill nc || kill $(pgrep nc)' diff --git a/Scripts/test-sidecar-proxy.sh b/Scripts/test-sidecar-proxy.sh deleted file mode 100755 index fcce42de..00000000 --- a/Scripts/test-sidecar-proxy.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash - -echo "Testing Sidecar proxy injection " - -sleep 1 -echo "Checking pods" -kubectl get pods -o wide -n cortexflow -echo -echo "Checking if the sidecar proxy is present" -kubectl get pods -n cortexflow -o json | jq '.items[].spec.containers[].name' - -echo -sleep 1 -echo "Checking open ports in test-proxy" -kubectl get pods test-proxy -o jsonpath='{.spec.containers[*].ports}' -n cortexflow -echo -kubectl get pods test-proxy2 -o jsonpath='{.spec.containers[*].ports}' -n cortexflow - -echo -echo -echo "Installing debugging tools in test-proxy: (PROXY-SIDECAR container)" -sleep 3 -./install-debugging-tools.sh test-proxy proxy-sidecar -echo -echo -echo "Installing debugging tools in test-proxy2: (PROXY-SIDECAR container)" -sleep 3 -./install-debugging-tools.sh test-proxy2 proxy-sidecar - -echo -echo -echo "Checking network connections in test-proxy pod " -kubectl exec -it test-proxy -c proxy-sidecar -n cortexflow -- netstat -tulnp -echo -echo "Checking network connections in test-proxy2 pod" -kubectl exec -it test-proxy2 -c proxy-sidecar -n cortexflow -- netstat -tulnp - - -echo -sleep 2 -echo "TEST 1: Checking if test-proxy can communicate with test-proxy2" -kubectl exec -it test-proxy -c proxy-sidecar -n cortexflow -- nc -zv test-proxy2.cortexflow.svc.cluster.local 5054 -echo - -echo - -echo "TEST 2: Checking if test-proxy can communicate with test-proxy2 (TCP)" - -# 2. Send the message from test-proxy to test-proxy2 -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Test: Incoming Message ⏳" - printf "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJwYXlsb2FkIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}\n" | nc -w3 test-proxy2 5054 && echo "✅ Test completed" -' - -echo -sleep 2 -echo -echo "TEST 2: Sending a message from test-proxy to test-proxy2 (UDP)" - -#Start the UDP listener on test-proxy2 (MUST be before sending the message) -kubectl exec test-proxy2 -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Starting UDP listener on port 5053..." - nohup sh -c "nc -lu -p 5053 > /tmp/received_message.log" >/dev/null 2>&1 & - sleep 2 # Wait for the listener to start -' - -#2. Send the message from test-proxy to test-proxy2 -kubectl exec test-proxy -c proxy-sidecar -n cortexflow -- sh -c ' - echo "Test: Incoming Message ⏳" - echo "{\"service\":\"test-proxy2.cortexflow\",\"direction\":\"Incoming\",\"payload\":\"eyJtZXNzYWdlIjogIkhlbGxvIGZyb20gcHJveHktc2lkZWNhciJ9\"}" | nc -u -w3 test-proxy2 5053 && echo "✅ Test completed" -' diff --git a/cli/src/monitoring.rs b/cli/src/monitoring.rs index 72a94b87..eefae1c7 100644 --- a/cli/src/monitoring.rs +++ b/cli/src/monitoring.rs @@ -10,7 +10,7 @@ use tonic_reflection::pb::v1::server_reflection_response::MessageResponse; use agent_api::client::{connect_to_client, connect_to_server_reflection}; use agent_api::requests::{ get_all_features, send_active_connection_request, send_dropped_packets_request, - send_latency_metrics_request, send_tracked_veth_request, + send_latency_metrics_request, send_tracked_veth_request, send_veth_tracked_hashmap_req, }; use crate::errors::CliError; @@ -304,25 +304,24 @@ pub async fn monitor_tracked_veth() -> Result<(), CliError> { "Connecting to cortexflow Client".white() ); match connect_to_client().await { - Ok(client) => match send_tracked_veth_request(client).await { + Ok(client) => match send_veth_tracked_hashmap_req(client).await { Ok(response) => { let veth_response = response.into_inner(); - if veth_response.tot_monitored_veth == 0 { - println!("{} {} ", "=====>".blue().bold(), "No tracked veth found"); - Ok(()) - } else { - println!( - "{} {} {} {} ", - "=====>".blue().bold(), - "Found:", - &veth_response.tot_monitored_veth, - "tracked veth" - ); - for veth in veth_response.veth_names.iter() { - println!("{} {}", "=====>".blue().bold(), &veth); - } - Ok(()) + // if veth_response.tot_monitored_veth == 0 { + // println!("{} {} ", "=====>".blue().bold(), "No tracked veth found"); + // Ok(()) + // } else { + // println!( + // "{} {} {} {} ", + // "=====>".blue().bold(), + // "Found:", + // &veth_response.tot_monitored_veth, + // "tracked veth" + // ); + for veth in veth_response.veths.iter() { + println!("{} {:?}", "=====>".blue().bold(), &veth); } + Ok(()) } Err(e) => { return Err(CliError::AgentError( diff --git a/core/api/Cargo.toml b/core/api/Cargo.toml index a422fd74..0070430d 100644 --- a/core/api/Cargo.toml +++ b/core/api/Cargo.toml @@ -32,7 +32,8 @@ aya = "0.13.1" cortexbrain-common = { path = "../common", features = [ "map-handlers", "network-structs", - "buffer-reader" + "buffer-reader", + "monitoring-structs" ] } tonic-reflection = "0.14.0" tonic-build = "0.14.0" diff --git a/core/api/protos/agent.proto b/core/api/protos/agent.proto index 9bfc6e4a..e2b1500a 100644 --- a/core/api/protos/agent.proto +++ b/core/api/protos/agent.proto @@ -84,7 +84,13 @@ message VethEvent{ uint32 pid = 6; // Process ID } -//declare agent api +message VethHashMapResponse{ // returns tracked veth from the tracked_veth hashmap + string status = 1; + map veths = 2; +} + +// Agent Service + service Agent{ // active connections endpoint rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse); @@ -102,10 +108,15 @@ service Agent{ // dropped packets endpoint rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse); + // TODO: can i combine this 2 endpoints? // active veth info endpoint rpc GetTrackedVeth(google.protobuf.Empty) returns (VethResponse); + // get tracked veth from blocklist + rpc GetTrackedVethFromHashMap(google.protobuf.Empty) returns (VethHashMapResponse); } +// Blocklist + message AddIpToBlocklistRequest{ optional string ip = 1 ; } diff --git a/core/api/src/agent.rs b/core/api/src/agent.rs index cb93ddd2..8d004b90 100644 --- a/core/api/src/agent.rs +++ b/core/api/src/agent.rs @@ -151,6 +151,17 @@ pub struct VethEvent { #[prost(uint32, tag = "6")] pub pid: u32, } +/// returns tracked veth from the tracked_veth hashmap +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VethHashMapResponse { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + #[prost(map = "string, string", tag = "2")] + pub veths: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, +} #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct AddIpToBlocklistRequest { #[prost(string, optional, tag = "1")] @@ -192,7 +203,6 @@ pub mod agent_client { )] use tonic::codegen::*; use tonic::codegen::http::Uri; - /// declare agent api #[derive(Debug, Clone)] pub struct AgentClient { inner: tonic::client::Grpc, @@ -422,6 +432,7 @@ pub mod agent_client { .insert(GrpcMethod::new("agent.Agent", "GetDroppedPacketsMetrics")); self.inner.unary(req, path, codec).await } + /// TODO: can i combine this 2 endpoints? /// active veth info endpoint pub async fn get_tracked_veth( &mut self, @@ -444,6 +455,31 @@ pub mod agent_client { .insert(GrpcMethod::new("agent.Agent", "GetTrackedVeth")); self.inner.unary(req, path, codec).await } + /// get tracked veth from blocklist + pub async fn get_tracked_veth_from_hash_map( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetTrackedVethFromHashMap", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetTrackedVethFromHashMap")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -506,13 +542,21 @@ pub mod agent_server { tonic::Response, tonic::Status, >; + /// TODO: can i combine this 2 endpoints? /// active veth info endpoint async fn get_tracked_veth( &self, request: tonic::Request<()>, ) -> std::result::Result, tonic::Status>; + /// get tracked veth from blocklist + async fn get_tracked_veth_from_hash_map( + &self, + request: tonic::Request<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } - /// declare agent api #[derive(Debug)] pub struct AgentServer { inner: Arc, @@ -885,6 +929,50 @@ pub mod agent_server { }; Box::pin(fut) } + "/agent.Agent/GetTrackedVethFromHashMap" => { + #[allow(non_camel_case_types)] + struct GetTrackedVethFromHashMapSvc(pub Arc); + impl tonic::server::UnaryService<()> + for GetTrackedVethFromHashMapSvc { + type Response = super::VethHashMapResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_tracked_veth_from_hash_map( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetTrackedVethFromHashMapSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/core/api/src/api.rs b/core/api/src/api.rs index 79b9df34..ba25101b 100644 --- a/core/api/src/api.rs +++ b/core/api/src/api.rs @@ -1,14 +1,19 @@ use anyhow::Context; +use anyhow::anyhow; +use aya::maps::perf::PerfEventArrayBuffer; use chrono::Local; +use cortexbrain_common::buffer_type::IpProtocols; +use cortexbrain_common::buffer_type::NetworkMetrics; +use cortexbrain_common::buffer_type::PacketLog; +use cortexbrain_common::buffer_type::TimeStampMetrics; use cortexbrain_common::formatters::{format_ipv4, format_ipv6}; use cortexbrain_common::map_handlers::load_perf_event_array_from_mapdata; -use prost::bytes::BytesMut; use std::str::FromStr; use std::sync::Mutex; use tonic::{Request, Response, Status}; use tracing::info; -use aya::{maps::MapData, util::online_cpus}; +use aya::maps::MapData; use std::result::Result::Ok; use tonic::async_trait; @@ -22,106 +27,35 @@ use crate::agent::{ LatencyMetricsResponse, VethEvent, }; -use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics}; use cortexbrain_common::buffer_type::VethLog; // * contains agent api configuration use crate::agent::{ ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse, RequestActiveConnections, - RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethResponse, agent_server::Agent, + RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, VethHashMapResponse, VethResponse, + agent_server::Agent, }; use crate::constants::PIN_BLOCKLIST_MAP_PATH; use crate::helpers::comm_to_string; use aya::maps::Map; -use cortexbrain_common::buffer_type::IpProtocols; use std::net::Ipv4Addr; use tracing::warn; +use cortexbrain_common::buffer_type::BufferSize; +use cortexbrain_common::buffer_type::fill_buffers; + pub struct AgentApi { //* event_rx is an istance of a mpsc receiver. //* is used to receive the data from the transmitter (tx) active_connection_event_rx: Mutex, Status>>>, - active_connection_event_tx: mpsc::Sender, Status>>, + pub(crate) active_connection_event_tx: mpsc::Sender, Status>>, latency_metrics_rx: Mutex, Status>>>, - latency_metrics_tx: mpsc::Sender, Status>>, + pub(crate) latency_metrics_tx: mpsc::Sender, Status>>, dropped_packet_metrics_rx: Mutex, Status>>>, - dropped_packet_metrics_tx: mpsc::Sender, Status>>, + pub(crate) dropped_packet_metrics_tx: mpsc::Sender, Status>>, tracked_veth_rx: Mutex, Status>>>, - tracked_veth_tx: mpsc::Sender, Status>>, -} - -//* Event sender trait. Takes an event from a map and send that to the mpsc channel -//* using the send_map function -#[async_trait] -pub trait EventSender: Send + Sync + 'static { - async fn send_active_connection_event(&self, event: Vec); - async fn send_active_connection_event_map( - &self, - map: Vec, - tx: mpsc::Sender, Status>>, - ) { - let status = Status::new(tonic::Code::Ok, "success"); - let event = Ok(map); - - let _ = tx.send(event).await; - } - - async fn send_latency_metrics_event(&self, event: Vec); - async fn send_latency_metrics_event_map( - &self, - map: Vec, - tx: mpsc::Sender, Status>>, - ) { - let status = Status::new(tonic::Code::Ok, "success"); - let event = Ok(map); - let _ = tx.send(event).await; - } - - async fn send_dropped_packet_metrics_event(&self, event: Vec); - async fn send_dropped_packet_metrics_event_map( - &self, - map: Vec, - tx: mpsc::Sender, Status>>, - ) { - let status = Status::new(tonic::Code::Ok, "success"); - let event = Ok(map); - let _ = tx.send(event).await; - } - - async fn send_tracked_veth_event(&self, event: Vec); - async fn send_tracked_veth_event_map( - &self, - map: Vec, - tx: mpsc::Sender, Status>>, - ) { - let status = Status::new(tonic::Code::Ok, "success"); - let event = Ok(map); - let _ = tx.send(event).await; - } -} - -// send event function. takes an HashMap and send that using mpsc event_tx -#[async_trait] -impl EventSender for AgentApi { - async fn send_active_connection_event(&self, event: Vec) { - self.send_active_connection_event_map(event, self.active_connection_event_tx.clone()) - .await; - } - - async fn send_latency_metrics_event(&self, event: Vec) { - self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone()) - .await; - } - - async fn send_dropped_packet_metrics_event(&self, event: Vec) { - self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone()) - .await; - } - async fn send_tracked_veth_event(&self, event: Vec) { - self.send_tracked_veth_event_map(event, self.tracked_veth_tx.clone()) - .await; - } + pub(crate) tracked_veth_tx: mpsc::Sender, Status>>, } //initialize a default trait for AgentApi. Loads a name and a bpf istance. @@ -133,13 +67,13 @@ impl Default for AgentApi { // // TODO: in the future will be better to not use .unwrap() - let mut active_connection_events_array = + let active_connection_events_array = load_perf_event_array_from_mapdata("/sys/fs/bpf/maps/events_map").unwrap(); - let mut network_metrics_events_array = + let network_metrics_events_array = load_perf_event_array_from_mapdata("/sys/fs/bpf/trace_maps/net_metrics").unwrap(); - let mut time_stamp_events_array = + let time_stamp_events_array = load_perf_event_array_from_mapdata("/sys/fs/bpf/trace_maps/time_stamp_events").unwrap(); - let mut tracked_veth_events_array = + let tracked_veth_events_array = load_perf_event_array_from_mapdata("/sys/fs/bpf/maps/veth_identity_map").unwrap(); // @@ -151,6 +85,7 @@ impl Default for AgentApi { let (drop_tx, drop_rx) = mpsc::channel(2048); let (veth_tx, tracked_veth_rx) = mpsc::channel(1024); + // init the API to send the events from the agent to the CLI let api = AgentApi { active_connection_event_rx: conn_rx.into(), active_connection_event_tx: conn_tx.clone(), @@ -162,35 +97,45 @@ impl Default for AgentApi { tracked_veth_tx: veth_tx.clone(), }; + // init map manager + //let map_manager = map_manager(maps)? + + // init the buffers + let mut net_events_buffers = BufferSize::TcpEvents.set_buffer(); + let mut net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer(); + let mut ts_metrics_buffers = BufferSize::TimeMetricsEvents.set_buffer(); + let mut veth_metrics_buffers = BufferSize::VethEvents.set_buffer(); + + // init the Vec of Buffers + + let mut net_events_vec_buffer = Vec::>::new(); + let mut net_metrics_vec_buffer = Vec::>::new(); + let mut ts_events_vec_buffer = Vec::>::new(); + let mut veth_events_vec_buffer = Vec::>::new(); + + // fill the Vec of Buffers + + net_events_vec_buffer = fill_buffers(net_events_vec_buffer, active_connection_events_array); + net_metrics_vec_buffer = fill_buffers(net_metrics_vec_buffer, network_metrics_events_array); + + ts_events_vec_buffer = fill_buffers(ts_events_vec_buffer, time_stamp_events_array); + + veth_events_vec_buffer = fill_buffers(veth_events_vec_buffer, tracked_veth_events_array); + // For network metrics //spawn an event readers task::spawn(async move { - let mut net_events_buffer = Vec::new(); - //scan the cpus to read the data - - for cpu_id in online_cpus() - .map_err(|e| anyhow::anyhow!("Error {:?}", e)) - .unwrap() - { - let buf = active_connection_events_array - .open(cpu_id, None) - .expect("Error during the creation of net_events_buf structure"); - - let buffers = vec![BytesMut::with_capacity(4096); 8]; - net_events_buffer.push((buf, buffers)); - } - info!("Starting event listener"); //send the data through a mpsc channel loop { - for (buf, buffers) in net_events_buffer.iter_mut() { - match buf.read_events(buffers) { + for buf in net_events_vec_buffer.iter_mut() { + match buf.read_events(&mut net_events_buffers) { Ok(events) => { //read the events, this function is similar to the one used in identity/helpers.rs/display_events if events.read > 0 { for i in 0..events.read { - let data = &buffers[i]; + let data = &net_events_buffers[i]; if data.len() >= std::mem::size_of::() { let pl: PacketLog = unsafe { std::ptr::read(data.as_ptr() as *const _) }; @@ -251,32 +196,17 @@ impl Default for AgentApi { }); task::spawn(async move { - let mut net_metrics_buffer = Vec::new(); - - //scan the cpus to read the data - for cpu_id in online_cpus() - .map_err(|e| anyhow::anyhow!("Error {:?}", e)) - .unwrap() - { - let buf = network_metrics_events_array - .open(cpu_id, None) - .expect("Error during the creation of net_metrics_buf structure"); - - let buffers = vec![BytesMut::with_capacity(4096); 8]; - net_metrics_buffer.push((buf, buffers)); - } - info!("Starting network metrics listener"); //send the data through a mpsc channel loop { - for (buf, buffers) in net_metrics_buffer.iter_mut() { - match buf.read_events(buffers) { + for buf in net_metrics_vec_buffer.iter_mut() { + match buf.read_events(&mut net_metrics_buffers) { Ok(events) => { //read the events, this function is similar to the one used in identity/helpers.rs/display_events if events.read > 0 { for i in 0..events.read { - let data = &buffers[i]; + let data = &net_metrics_buffers[i]; if data.len() >= std::mem::size_of::() { let nm: NetworkMetrics = unsafe { std::ptr::read(data.as_ptr() as *const _) }; @@ -333,34 +263,22 @@ impl Default for AgentApi { }); task::spawn(async move { - let mut ts_events_buffer = Vec::new(); - //scan the cpus to read the data - for cpu_id in online_cpus() - .map_err(|e| anyhow::anyhow!("Error {:?}", e)) - .unwrap() - { - let buf = time_stamp_events_array - .open(cpu_id, None) - .expect("Error during the creation of time stamp events buf structure"); - - let buffers = vec![BytesMut::with_capacity(4096); 8]; - ts_events_buffer.push((buf, buffers)); - } - info!("Starting time stamp events listener"); //send the data through a mpsc channel loop { - for (buf, buffers) in ts_events_buffer.iter_mut() { - match buf.read_events(buffers) { + for buf in ts_events_vec_buffer.iter_mut() { + match buf.read_events(&mut ts_metrics_buffers) { Ok(events) => { //read the events, this function is similar to the one used in identity/helpers.rs/display_events if events.read > 0 { for i in 0..events.read { - let data = &buffers[i]; + let data = &ts_metrics_buffers[i]; if data.len() >= std::mem::size_of::() { let tsm: TimeStampMetrics = unsafe { std::ptr::read(data.as_ptr() as *const _) }; + let saddr_v6 = tsm.saddr_v6; + let daddr_v6 = tsm.daddr_v6; let latency_metric = LatencyMetric { delta_us: tsm.delta_us, timestamp_us: tsm.ts_us, @@ -371,8 +289,8 @@ impl Default for AgentApi { address_family: tsm.af as u32, src_address_v4: format_ipv4(tsm.saddr_v4), dst_address_v4: format_ipv4(tsm.daddr_v4), - src_address_v6: format_ipv6(&tsm.saddr_v6), - dst_address_v6: format_ipv6(&tsm.daddr_v6), + src_address_v6: format_ipv6(&saddr_v6), + dst_address_v6: format_ipv6(&daddr_v6), }; info!( "Latency Metric - tgid: {}, process_name: {}, delta_us: {}, timestamp_us: {}, local_port: {}, remote_port: {}, address_family: {}, src_address_v4: {}, dst_address_v4: {}, src_address_v6: {}, dst_address_v6: {}", @@ -409,34 +327,19 @@ impl Default for AgentApi { } }); - // TODO: this part needs a better implementation task::spawn(async move { - let mut veth_events_buffer = Vec::new(); - //scan the cpus to read the data - for cpu_id in online_cpus() - .map_err(|e| anyhow::anyhow!("Error {:?}", e)) - .unwrap() - { - let buf = tracked_veth_events_array - .open(cpu_id, None) - .expect("Error during the creation of time stamp events buf structure"); - - let buffers = vec![BytesMut::with_capacity(4096); 8]; - veth_events_buffer.push((buf, buffers)); - } - info!("Starting time stamp events listener"); //send the data through a mpsc channel loop { - for (buf, buffers) in veth_events_buffer.iter_mut() { - match buf.read_events(buffers) { + for buf in veth_events_vec_buffer.iter_mut() { + match buf.read_events(&mut veth_metrics_buffers) { Ok(events) => { //read the events, this function is similar to the one used in identity/helpers.rs/display_events if events.read > 0 { for i in 0..events.read { info!("Found veth events {}", events.read); - let data = &buffers[i]; + let data = &veth_metrics_buffers[i]; if data.len() >= std::mem::size_of::() { let veth: VethLog = unsafe { std::ptr::read(data.as_ptr() as *const _) }; @@ -508,7 +411,7 @@ impl Agent for AgentApi { request: Request, ) -> Result, Status> { //read request - let req = request.into_inner(); + let _req = request.into_inner(); //create the hashmap to process events from the mpsc channel queue let mut aggregated_events: Vec = Vec::new(); @@ -555,22 +458,25 @@ impl Agent for AgentApi { } else { // add ip to the blocklist // log blocklist event - let datetime = Local::now().to_string(); + let _datetime = Local::now().to_string(); let ip = req.ip.unwrap(); //convert ip from string to [u8;4] type and insert into the bpf map let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets(); //TODO: convert datetime in a kernel compatible format - blocklist_map.insert(u8_4_ip, u8_4_ip, 0); + blocklist_map + .insert(u8_4_ip, u8_4_ip, 0) + .map_err(|e| anyhow!("Cannot insert address in the blocklist. Reason: {}", e)) + .unwrap(); info!("CURRENT BLOCKLIST: {:?}", blocklist_map); } - let path = std::env::var(PIN_BLOCKLIST_MAP_PATH) + let _path = std::env::var(PIN_BLOCKLIST_MAP_PATH) .context("Blocklist map path not found!") .unwrap(); //convert the maps with a buffer to match the protobuffer types let mut converted_blocklist_map: HashMap = HashMap::new(); for item in blocklist_map.iter() { - let (k, v) = item.unwrap(); + let (k, _v) = item.unwrap(); // convert keys and values from [u8;4] to String let key = Ipv4Addr::from(k).to_string(); let value = Ipv4Addr::from(k).to_string(); @@ -586,7 +492,7 @@ impl Agent for AgentApi { async fn check_blocklist( &self, - request: Request<()>, + _request: Request<()>, ) -> Result, Status> { info!("Returning blocklist hashmap"); //open blocklist map @@ -601,7 +507,7 @@ impl Agent for AgentApi { let mut converted_blocklist_map: HashMap = HashMap::new(); for item in blocklist_map.iter() { - let (k, v) = item.unwrap(); + let (k, _v) = item.unwrap(); // convert keys and values from [u8;4] to String let key = Ipv4Addr::from(k).to_string(); let value = Ipv4Addr::from(k).to_string(); @@ -628,7 +534,7 @@ impl Agent for AgentApi { //remove the address let ip_to_remove = req.ip; let u8_4_ip_to_remove = Ipv4Addr::from_str(&ip_to_remove).unwrap().octets(); - blocklist_map.remove(&u8_4_ip_to_remove); + let _ = blocklist_map.remove(&u8_4_ip_to_remove); //convert the maps with a buffer to match the protobuffer types let mut converted_blocklist_map: HashMap = HashMap::new(); @@ -651,7 +557,7 @@ impl Agent for AgentApi { request: Request<()>, ) -> Result, Status> { // Extract the request parameters - let req = request.into_inner(); + let _req = request.into_inner(); info!("Getting latency metrics"); // Here you would typically query your data source for the latency metrics @@ -714,7 +620,7 @@ impl Agent for AgentApi { request: Request<()>, ) -> Result, Status> { // Extract the request parameters - let req = request.into_inner(); + let _req = request.into_inner(); info!("Getting dropped packets metrics"); let mut aggregated_dropped_packet_metrics: Vec = Vec::new(); @@ -749,7 +655,7 @@ impl Agent for AgentApi { &self, request: Request<()>, ) -> Result, Status> { - let req = request.into_inner(); + let _req = request.into_inner(); info!("Getting tracked veth metrics"); let mut tracked_veth = Vec::::new(); let mut tot_veth = 0 as i32; @@ -774,4 +680,33 @@ impl Agent for AgentApi { Ok(Response::new(response)) } + + async fn get_tracked_veth_from_hash_map( + &self, + _request: Request<()>, + ) -> Result, Status> { + info!("Returning veth hashmap"); + //open blocklist map + let mapdata = MapData::from_pin("/sys/fs/bpf/maps/tracked_veth") + .expect("cannot open tracked_veth Mapdata"); + let tracked_veth_mapdata = Map::HashMap(mapdata); //load mapdata + + let tracked_veth_map: ayaHashMap = + ayaHashMap::try_from(tracked_veth_mapdata).unwrap(); + + //convert the maps with a buffer to match the protobuffer types + + let mut converted_tracked_veth_map: HashMap = HashMap::new(); + for item in tracked_veth_map.iter() { + let (k, v) = item.unwrap(); + // convert keys and values from [u8;4] to String + let key = String::from_utf8(k.to_vec()).unwrap(); + let value = String::from_utf8(v.to_vec()).unwrap(); + converted_tracked_veth_map.insert(key, value); + } + Ok(Response::new(VethHashMapResponse { + status: "success".to_string(), + veths: converted_tracked_veth_map, + })) + } } diff --git a/core/api/src/batcher.rs b/core/api/src/batcher.rs index 6e984d53..12d92784 100644 --- a/core/api/src/batcher.rs +++ b/core/api/src/batcher.rs @@ -1,22 +1,87 @@ // This module is experimental and may be subject to major changes. -use crate::agent::{ConnectionEvent, DroppedPacketMetric, LatencyMetric}; +// Do not use any of these functions +// FIXME: this module will be deprecated in the next version probably -pub enum MetricsBatcher { - LatencyMetrics, - DroppedPacketsMetrics, -} -pub enum EventBatcher {} -impl MetricsBatcher { - pub async fn send_batched_metrics() { - todo!(); +use tokio::sync::mpsc; +use tonic::{Status, async_trait}; + +use crate::{ + agent::{ConnectionEvent, DroppedPacketMetric, LatencyMetric, VethEvent}, + api::AgentApi, +}; + +// Event sender trait. Takes an event from a map and send that to the mpsc channel +// using the send_map function +#[async_trait] +pub trait EventSender: Send + Sync + 'static { + async fn send_active_connection_event(&self, event: Vec); + async fn send_active_connection_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + + let _ = tx.send(event).await; + } + + async fn send_latency_metrics_event(&self, event: Vec); + async fn send_latency_metrics_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + let _ = tx.send(event).await; + } + + async fn send_dropped_packet_metrics_event(&self, event: Vec); + async fn send_dropped_packet_metrics_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + let _ = tx.send(event).await; + } + + async fn send_tracked_veth_event(&self, event: Vec); + async fn send_tracked_veth_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + let _ = tx.send(event).await; } } -impl EventBatcher { - pub async fn send_batched_logs() { - todo!(); +// send event function. takes an HashMap and send that using mpsc event_tx +#[async_trait] +impl EventSender for AgentApi { + async fn send_active_connection_event(&self, event: Vec) { + self.send_active_connection_event_map(event, self.active_connection_event_tx.clone()) + .await; + } + + async fn send_latency_metrics_event(&self, event: Vec) { + self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone()) + .await; + } + + async fn send_dropped_packet_metrics_event(&self, event: Vec) { + self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone()) + .await; + } + async fn send_tracked_veth_event(&self, event: Vec) { + self.send_tracked_veth_event_map(event, self.tracked_veth_tx.clone()) + .await; } } diff --git a/core/api/src/lib.rs b/core/api/src/lib.rs index cf2c0c98..e0939202 100644 --- a/core/api/src/lib.rs +++ b/core/api/src/lib.rs @@ -2,7 +2,6 @@ pub mod api; pub mod agent; pub mod client; pub mod requests; -pub mod structs; pub mod constants; pub mod helpers; pub mod batcher; diff --git a/core/api/src/main.rs b/core/api/src/main.rs index 30fe5506..87478f54 100644 --- a/core/api/src/main.rs +++ b/core/api/src/main.rs @@ -6,7 +6,6 @@ mod agent; mod api; mod constants; mod helpers; -mod structs; mod agent_proto { use tonic::include_file_descriptor_set; diff --git a/core/api/src/requests.rs b/core/api/src/requests.rs index 06a40302..7c9f447d 100644 --- a/core/api/src/requests.rs +++ b/core/api/src/requests.rs @@ -14,6 +14,7 @@ use crate::agent::LatencyMetricsResponse; use crate::agent::RequestActiveConnections; use crate::agent::RmIpFromBlocklistRequest; use crate::agent::RmIpFromBlocklistResponse; +use crate::agent::VethHashMapResponse; use crate::agent::VethResponse; use crate::agent::agent_client::AgentClient; @@ -100,3 +101,12 @@ pub async fn send_tracked_veth_request( let response = client.get_tracked_veth(request).await?; Ok(response) } + +#[cfg(feature = "client")] +pub async fn send_veth_tracked_hashmap_req( + mut client: AgentClient, +) -> Result, Error> { + let request = Request::new(()); + let response = client.get_tracked_veth_from_hash_map(request).await?; + Ok(response) +} diff --git a/core/api/src/structs.rs b/core/api/src/structs.rs deleted file mode 100644 index 97a40175..00000000 --- a/core/api/src/structs.rs +++ /dev/null @@ -1,48 +0,0 @@ -use bytemuck_derive::Zeroable; -use crate::constants::TASK_COMM_LEN; - - -#[repr(C)] -#[derive(Clone, Copy, Zeroable)] -pub struct PacketLog { - pub proto: u8, - pub src_ip: u32, - pub src_port: u16, - pub dst_ip: u32, - pub dst_port: u16, - pub pid: u32, -} -unsafe impl aya::Pod for PacketLog {} - -#[repr(C, packed)] -#[derive(Clone, Copy, Zeroable)] -pub struct NetworkMetrics { - pub tgid: u32, - pub comm: [u8; TASK_COMM_LEN], - pub ts_us: u64, - pub sk_err: i32, - pub sk_err_soft: i32, - pub sk_backlog_len: i32, - pub sk_write_memory_queued: i32, - pub sk_receive_buffer_size: i32, - pub sk_ack_backlog: u32, - pub sk_drops: i32, -} -unsafe impl aya::Pod for NetworkMetrics {} - -#[repr(C)] -#[derive(Clone, Copy, Zeroable)] -pub struct TimeStampMetrics { - pub delta_us: u64, - pub ts_us: u64, - pub tgid: u32, - pub comm: [u8; TASK_COMM_LEN], - pub lport: u16, - pub dport_be: u16, - pub af: u16, - pub saddr_v4: u32, - pub daddr_v4: u32, - pub saddr_v6: [u32; 4], - pub daddr_v6: [u32; 4], -} -unsafe impl aya::Pod for TimeStampMetrics {} diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index b8e840d0..ee50e2b3 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -24,10 +24,12 @@ opentelemetry-otlp = { version = "0.31.0", features = ["logs", "grpc-tonic"] } bytemuck = "1.25.0" bytes = "1.11.0" bytemuck_derive = "1.10.2" +tokio = "1.49.0" [features] map-handlers = [] program-handlers = [] network-structs = [] +monitoring-structs = [] buffer-reader = [] experimental = [] diff --git a/core/common/src/buffer_type.rs b/core/common/src/buffer_type.rs index 9fc78285..f9626983 100644 --- a/core/common/src/buffer_type.rs +++ b/core/common/src/buffer_type.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "buffer-reader")] +use aya::maps::{MapData, PerfEventArray}; +use aya::{maps::perf::PerfEventArrayBuffer, util::online_cpus}; use bytemuck_derive::Zeroable; use bytes::BytesMut; use std::net::Ipv4Addr; @@ -54,19 +57,21 @@ unsafe impl aya::Pod for PacketLog {} #[cfg(feature = "network-structs")] #[repr(C, packed)] -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Zeroable)] pub struct VethLog { pub name: [u8; 16], // 16 bytes: veth interface name pub state: u64, // 8 bytes: state variable (unsigned long in kernel) - pub dev_addr: [u8; 6], // 32 bytes: device address + pub dev_addr: [u8; 6], // 6 bytes: device address pub event_type: u8, // 1 byte: 1 for veth creation, 2 for veth destruction pub netns: u32, // 4 bytes: network namespace inode number pub pid: u32, // 4 bytes: PID that triggered the event } +#[cfg(feature = "network-structs")] +unsafe impl aya::Pod for VethLog {} #[cfg(feature = "network-structs")] #[repr(C)] -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Zeroable)] pub struct TcpPacketRegistry { pub proto: u8, pub src_ip: u32, @@ -77,6 +82,47 @@ pub struct TcpPacketRegistry { pub command: [u8; 16], pub cgroup_id: u64, } +#[cfg(feature = "network-structs")] +unsafe impl aya::Pod for TcpPacketRegistry {} + +#[cfg(feature = "monitoring-structs")] +pub const TASK_COMM_LEN: usize = 16; // linux/sched.h +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct NetworkMetrics { + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub ts_us: u64, + pub sk_err: i32, // Offset 284 + pub sk_err_soft: i32, // Offset 600 + pub sk_backlog_len: i32, // Offset 196 + pub sk_write_memory_queued: i32, // Offset 376 + pub sk_receive_buffer_size: i32, // Offset 244 + pub sk_ack_backlog: u32, // Offset 604 + pub sk_drops: i32, // Offset 136 +} +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for NetworkMetrics {} + +#[cfg(feature = "monitoring-structs")] +#[repr(C, packed)] +#[derive(Clone, Copy, Zeroable)] +pub struct TimeStampMetrics { + pub delta_us: u64, + pub ts_us: u64, + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub lport: u16, + pub dport_be: u16, + pub af: u16, + pub saddr_v4: u32, + pub daddr_v4: u32, + pub saddr_v6: [u32; 4], + pub daddr_v6: [u32; 4], +} +#[cfg(feature = "monitoring-structs")] +unsafe impl aya::Pod for TimeStampMetrics {} // docs: // This function perform a byte swap from little-endian to big-endian @@ -95,15 +141,23 @@ pub fn reverse_be_addr(addr: u32) -> Ipv4Addr { // enum BuffersType #[cfg(feature = "buffer-reader")] pub enum BufferType { + #[cfg(feature = "network-structs")] PacketLog, + #[cfg(feature = "network-structs")] TcpPacketRegistry, + #[cfg(feature = "network-structs")] VethLog, + #[cfg(feature = "monitoring-structs")] + NetworkMetrics, + #[cfg(feature = "monitoring-structs")] + TimeStampMetrics, } // IDEA: this is an experimental implementation to centralize buffer reading logic // TODO: add variant for cortexflow API exporter #[cfg(feature = "buffer-reader")] impl BufferType { + #[cfg(feature = "network-structs")] pub async fn read_packet_log(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { for i in offset..tot_events { let vec_bytes = &buffers[i as usize]; @@ -147,6 +201,7 @@ impl BufferType { } } } + #[cfg(feature = "network-structs")] pub async fn read_tcp_registry_log(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { for i in offset..tot_events { let vec_bytes = &buffers[i as usize]; @@ -204,11 +259,8 @@ impl BufferType { } } } - pub async fn read_and_handle_veth_log( - buffers: &mut [BytesMut], - tot_events: i32, - offset: i32, - ) { + #[cfg(feature = "network-structs")] + pub async fn read_and_handle_veth_log(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { for i in offset..tot_events { let vec_bytes = &buffers[i as usize]; if vec_bytes.len() < std::mem::size_of::() { @@ -289,4 +341,244 @@ impl BufferType { } } } + #[cfg(feature = "monitoring-structs")] + pub async fn read_network_metrics(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::() { + error!( + "Corrupted Network Metrics data. Raw data: {}. Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::() { + let net_metrics: NetworkMetrics = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + let tgid = net_metrics.tgid; + let comm = String::from_utf8_lossy(&net_metrics.comm); + let ts_us = net_metrics.ts_us; + let sk_drop_count = net_metrics.sk_drops; + let sk_err = net_metrics.sk_err; + let sk_err_soft = net_metrics.sk_err_soft; + let sk_backlog_len = net_metrics.sk_backlog_len; + let sk_write_memory_queued = net_metrics.sk_write_memory_queued; + let sk_ack_backlog = net_metrics.sk_ack_backlog; + let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size; + + info!( + "tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}", + tgid, + comm, + ts_us, + sk_drop_count, + sk_err, + sk_err_soft, + sk_backlog_len, + sk_write_memory_queued, + sk_ack_backlog, + sk_receive_buffer_size + ); + } + } + } + #[cfg(feature = "monitoring-structs")] + pub async fn read_timestamp_metrics(buffers: &mut [BytesMut], tot_events: i32, offset: i32) { + for i in offset..tot_events { + let vec_bytes = &buffers[i as usize]; + if vec_bytes.len() < std::mem::size_of::() { + error!( + "Corrupted Network Metrics data. Raw data: {}. Readed {} bytes expected {} bytes", + vec_bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .join(" "), + vec_bytes.len(), + std::mem::size_of::() + ); + continue; + } + if vec_bytes.len() >= std::mem::size_of::() { + let time_stamp_event: TimeStampMetrics = + unsafe { std::ptr::read_unaligned(vec_bytes.as_ptr() as *const _) }; + let delta_us = time_stamp_event.delta_us; + let ts_us = time_stamp_event.ts_us; + let tgid = time_stamp_event.tgid; + let comm = String::from_utf8_lossy(&time_stamp_event.comm); + let lport = time_stamp_event.lport; + let dport_be = time_stamp_event.dport_be; + let af = time_stamp_event.af; + info!( + "TimeStampEvent - delta_us: {}, ts_us: {}, tgid: {}, comm: {}, lport: {}, dport_be: {}, af: {}", + delta_us, ts_us, tgid, comm, lport, dport_be, af + ); + } + } + } +} + +// docs: read buffer function: +// template function that take a mut perf_event_array_buffer of type T and a mutable buffer of Vec +#[cfg(feature = "buffer-reader")] +pub async fn read_perf_buffer>( + mut array_buffers: Vec>, + mut buffers: Vec, + buffer_type: BufferType, +) { + // loop over the buffers + loop { + for buf in array_buffers.iter_mut() { + match buf.read_events(&mut buffers) { + Ok(events) => { + // triggered if some events are lost + if events.lost > 0 { + tracing::debug!("Lost events: {} ", events.lost); + } + // triggered if some events are readed + if events.read > 0 { + tracing::debug!("Readed events: {}", events.read); + let offset = 0; + let tot_events = events.read as i32; + + //read the events in the buffer + match buffer_type { + #[cfg(feature = "network-structs")] + BufferType::PacketLog => { + BufferType::read_packet_log(&mut buffers, tot_events, offset).await + } + #[cfg(feature = "network-structs")] + BufferType::TcpPacketRegistry => { + BufferType::read_tcp_registry_log(&mut buffers, tot_events, offset) + .await + } + #[cfg(feature = "network-structs")] + BufferType::VethLog => { + BufferType::read_and_handle_veth_log( + &mut buffers, + tot_events, + offset, + ) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::NetworkMetrics => { + BufferType::read_network_metrics(&mut buffers, tot_events, offset) + .await + } + #[cfg(feature = "monitoring-structs")] + BufferType::TimeStampMetrics => { + BufferType::read_timestamp_metrics(&mut buffers, tot_events, offset) + .await + } + } + } + } + Err(e) => { + error!("Cannot read events from buffer. Reason: {} ", e); + } + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; // small sleep + } +} + +#[cfg(feature = "buffer-reader")] +pub enum BufferSize { + #[cfg(feature = "network-structs")] + ClassifierNetEvents, + #[cfg(feature = "network-structs")] + VethEvents, + #[cfg(feature = "network-structs")] + TcpEvents, + #[cfg(feature = "monitoring-structs")] + NetworkMetricsEvents, + #[cfg(feature = "monitoring-structs")] + TimeMetricsEvents, +} +#[cfg(feature = "buffer-reader")] +impl BufferSize { + pub fn get_size(&self) -> usize { + match self { + #[cfg(feature = "network-structs")] + BufferSize::ClassifierNetEvents => std::mem::size_of::(), + #[cfg(feature = "network-structs")] + BufferSize::VethEvents => std::mem::size_of::(), + #[cfg(feature = "network-structs")] + BufferSize::TcpEvents => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::NetworkMetricsEvents => std::mem::size_of::(), + #[cfg(feature = "monitoring-structs")] + BufferSize::TimeMetricsEvents => std::mem::size_of::(), + } + } + pub fn set_buffer(&self) -> Vec { + // iter returns and iterator of cpu ids, + // we need only the total number of cpus to set the buffer size so we use .len() to get + // the count of total cpus and then we allocate a buffer for each cpu with a capacity + // based on the structure size * a factor to have a bigger buffer to avoid overflows and lost events + + // Old buffers where 1024 bytes long. Now we set different buffer size based on + // the frequence of the events. + // ClassifierNetEvents are triggered by the TC classifier program, events has high frequency + // VethEvents are triggered by the creation and deletion of veth interfaces, events has small frequency compared to classifier events + // TcpEvents are triggered by TCP events and connections. Events has similar frequency to ClassifierNetEvents. + + let tot_cpu = online_cpus().iter().len(); // total number of cpus + + // TODO: finish to do all the calculations for the buffer sizes + match self { + #[cfg(feature = "network-structs")] + BufferSize::ClassifierNetEvents => { + let capacity = self.get_size() * 200; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "network-structs")] + BufferSize::VethEvents => { + let capacity = self.get_size() * 100; // Allocates 4Kb of memory for the buffers + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "network-structs")] + BufferSize::TcpEvents => { + let capacity = self.get_size() * 200; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::NetworkMetricsEvents => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + #[cfg(feature = "monitoring-structs")] + BufferSize::TimeMetricsEvents => { + let capacity = self.get_size() * 1024; + return vec![BytesMut::with_capacity(capacity); tot_cpu]; + } + } + } +} + +#[cfg(feature = "buffer-reader")] +pub fn fill_buffers( + //buf: PerfEventArrayBuffer, + mut vec_of_buffers: Vec>, + //buffers: Vec, + mut events_array: PerfEventArray, +) -> Vec> { + for cpu_id in online_cpus() + .map_err(|e| anyhow::anyhow!("Error {:?}", e)) + .unwrap() + { + let buf = events_array + .open(cpu_id, None) + .expect("Error during the creation of net_events_buf structure"); + + vec_of_buffers.push(buf); + } + vec_of_buffers } diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index d88c1db5..d7e48b0d 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,5 +1,8 @@ -#[cfg(feature = "buffer-reader")] -#[cfg(feature = "network-structs")] +#[cfg(any( + feature = "buffer-reader", + feature = "network-structs", + feature = "monitoring-structs" +))] pub mod buffer_type; pub mod constants; pub mod formatters; diff --git a/core/common/src/map_handlers.rs b/core/common/src/map_handlers.rs index 19d4e204..b246b701 100644 --- a/core/common/src/map_handlers.rs +++ b/core/common/src/map_handlers.rs @@ -121,8 +121,11 @@ pub async fn populate_blocklist() -> Result<(), Error> { .filter(|s| !s.is_empty()) .collect(); //String parsing from "x y" to ["x","y"] - info!("Inserting addresses: {:?}", addresses); - for item in addresses { + if addresses.is_empty() { + warn!("No addresses found in the blocklist. Skipping load"); + } + for item in &addresses { + info!("Inserting addresses: {:?}", &item); let addr = Ipv4Addr::from_str(&item)?.octets(); let _ = blocklist_map.insert(addr, addr, 0); } @@ -138,6 +141,7 @@ pub async fn populate_blocklist() -> Result<(), Error> { } #[cfg(feature = "map-handlers")] +// TODO: modify this to accept also HashMap types pub fn load_perf_event_array_from_mapdata( path: &'static str, ) -> Result, Error> { @@ -154,3 +158,57 @@ pub fn load_perf_event_array_from_mapdata( })?; Ok(perf_event_array) } + +#[cfg(feature = "map-handlers")] +pub fn map_manager( + maps: BpfMapsData, +) -> Result< + std::collections::HashMap< + String, + ( + aya::maps::PerfEventArray, + Vec>, + ), + >, + Error, +> { + use aya::maps::PerfEventArray; + use aya::maps::{MapData, perf::PerfEventArrayBuffer}; + use tracing::debug; + + let mut map_manager = std::collections::HashMap::< + String, // this will store the bpf map name + (PerfEventArray, Vec>), // this will manage the BPF_MAP_TYPE_PERF_EVENT_ARRAY and its buffer + >::new(); + + // map_manager creates an hashmap that contains: + // MAP NAME as String (KEY) + // + // VALUES (tuple) + // a PERF_EVENT_ARRAY + // a vector of PERF_EVENT_ARRAY_BUFFER + // + // the map manager helps the event listener to specifically call a map by its pinned name + // e.g. veth_identity_map and returns the associated PERF_EVENT_ARRAY and PERF_EVENT_ARRAY_BUFFERS (1 per CPU) + // also the map manager helps to write a more complete debug context by linking map names with arrays and buffers. + // actually i cannot return the extact information using only the Aya library + + // create the PerfEventArrays and the buffers from the BpfMapsData Objects + for (map, name) in maps + .bpf_obj_map + .into_iter() + .zip(maps.bpf_obj_names.into_iter()) + // zip two iterators at the same time for map object and map names + { + debug!("Debugging map type:{:?} for map name {:?}", map, &name); + info!("Creating PerfEventArray for map name {:?}", &name); + + // save the map in a registry if is a PerfEventArray to access them by name + if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) { + map_manager.insert(name.clone(), (perf_event_array, Vec::new())); + } else { + warn!("Map {:?} is not a PerfEventArray, skipping load", &name); + } + } + Ok(map_manager) +} diff --git a/core/common/src/program_handlers.rs b/core/common/src/program_handlers.rs index 42cd3baf..347be51f 100644 --- a/core/common/src/program_handlers.rs +++ b/core/common/src/program_handlers.rs @@ -13,32 +13,38 @@ pub fn load_program( .lock() .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; - // Load and attach the eBPF programs + // Load and attach the eBPF program let program: &mut KProbe = bpf_new .program_mut(program_name) .ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))? .try_into() .map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?; + // STEP 1: load program + program .load() .map_err(|e| anyhow::anyhow!("Cannot load program: {}. Error: {}", &program_name, e))?; + // STEP 2: Attach the loaded program to kernel symbol match program.attach(kernel_symbol, 0) { - Ok(_) => info!("{} program attached successfully", kernel_symbol), + Ok(_) => info!( + "{} program attached successfully to kernel symbol {}", + &program_name, &kernel_symbol + ), Err(e) => { - error!("Error attaching {} program {:?}", kernel_symbol, e); + error!( + "Error attaching {} program to kernel symbol {}. Reason: {:?}", + &program_name, &kernel_symbol, e + ); return Err(anyhow::anyhow!( - "Failed to attach {}: {:?}", - kernel_symbol, + "Failed to attach program {} to kernel symbol {}. Reason {:?}", + &program_name, + &kernel_symbol, e )); } }; - info!( - "eBPF program {} loaded and attached successfully", - program_name - ); Ok(()) } diff --git a/core/src/components/conntracker/src/data_structures.rs b/core/src/components/conntracker/src/data_structures.rs index f4c50479..c55cd3f4 100644 --- a/core/src/components/conntracker/src/data_structures.rs +++ b/core/src/components/conntracker/src/data_structures.rs @@ -47,7 +47,7 @@ pub struct ConnArray { // pid: kernel process ID // -#[repr(C,packed)] +#[repr(C, packed)] #[derive(Clone, Copy)] pub struct VethLog { pub name: [u8; 16], // 16 bytes: veth interface name @@ -94,9 +94,13 @@ pub static mut CONNTRACKER: LruPerCpuHashMap = pub static mut VETH_EVENTS: PerfEventArray = PerfEventArray::new(0); #[map(name = "Blocklist", pinning = "by_name")] -pub static mut BLOCKLIST: HashMap<[u8; 4], [u8; 4]> = - HashMap::<[u8; 4], [u8; 4]>::with_max_entries(1024, 0); +pub static mut BLOCKLIST: HashMap<[u8; 4], [u8; 4]> = HashMap::with_max_entries(1024, 0); //here i need to pass an address like this: [135,171,168,192] #[map(name = "TcpPacketRegistry", pinning = "by_name")] pub static mut PACKET_REGISTRY: PerfEventArray = PerfEventArray::new(0); + +#[map(name = "tracked_veth", pinning = "by_name")] +// This map takes a registry of tracked veth interfaces +// The maximum number of characters is 16 of type u8 +pub static mut TRACKED_VETH: HashMap<[u8; 16], [u8; 8]> = HashMap::with_max_entries(1024, 0); diff --git a/core/src/components/conntracker/src/main.rs b/core/src/components/conntracker/src/main.rs index e723e4b4..8438838f 100644 --- a/core/src/components/conntracker/src/main.rs +++ b/core/src/components/conntracker/src/main.rs @@ -32,6 +32,10 @@ use crate::tc::try_identity_classifier; use crate::tcp_analyzer::try_tcp_analyzer; use crate::veth_tracer::try_veth_tracer; +// TODO: add function to track +// 1. kprobe:tcp_enter_memory_pressure +// 2. kprobe:tcp_create_openreq_child (https://elixir.bootlin.com/linux/v6.18.6/source/net/ipv4/tcp_ipv4.c#L1776) [function: *tcp_v4_syn_recv_sock] + // docs: // // virtual ethernet (veth) interface tracer: diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index bd76a29d..50414bfd 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -1,14 +1,13 @@ -use aya::maps::perf::PerfEventArrayBuffer; -use cortexbrain_common::buffer_type::BufferType; use nix::net::if_::if_nameindex; use std::result::Result::Ok; -use tracing::{error, info}; +use tracing::info; // docs: // This function checks if the given interface name is in the list of ignored interfaces // Takes a interface name (iface) as &str and returns true if the interface should be ignored // Typically we want to ignore eth0,docker0,tunl0,lo interfaces because they are not relevant for the internal monitoring // +#[inline(always)] pub fn ignore_iface(iface: &str) -> bool { let ignored_interfaces = ["eth0", "docker0", "tunl0", "lo"]; ignored_interfaces.contains(&iface) @@ -18,6 +17,7 @@ pub fn ignore_iface(iface: &str) -> bool { // This function retrieves the list of veth interfaces on the system, filtering out ignored interfaces with // the ignore_iface function. // +#[inline(always)] pub fn get_veth_channels() -> Vec { //filter interfaces and save the output in the let mut interfaces: Vec = Vec::new(); @@ -36,58 +36,6 @@ pub fn get_veth_channels() -> Vec { interfaces } -// docs: read buffer function: -// template function that take a mut perf_event_array_buffer of type T and a mutable buffer of Vec - -pub async fn read_perf_buffer>( - mut array_buffers: Vec>, - mut buffers: Vec, - buffer_type: BufferType, -) { - // loop over the buffers - loop { - for buf in array_buffers.iter_mut() { - match buf.read_events(&mut buffers) { - Ok(events) => { - // triggered if some events are lost - if events.lost > 0 { - tracing::debug!("Lost events: {} ", events.lost); - } - // triggered if some events are readed - if events.read > 0 { - tracing::debug!("Readed events: {}", events.read); - let offset = 0; - let tot_events = events.read as i32; - - //read the events in the buffer - match buffer_type { - BufferType::PacketLog => { - BufferType::read_packet_log(&mut buffers, tot_events, offset).await - } - BufferType::TcpPacketRegistry => { - BufferType::read_tcp_registry_log(&mut buffers, tot_events, offset) - .await - } - BufferType::VethLog => { - BufferType::read_and_handle_veth_log( - &mut buffers, - tot_events, - offset, - ) - .await - } - } - } - } - Err(e) => { - error!("Cannot read events from buffer. Reason: {} ", e); - } - } - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; // small sleep - } -} - #[cfg(test)] mod tests { use cortexbrain_common::buffer_type::VethLog; diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 598b964e..8d13e223 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -11,36 +11,35 @@ mod helpers; mod service_discovery; -use crate::helpers::{get_veth_channels, read_perf_buffer}; +use crate::helpers::get_veth_channels; use aya::{ Ebpf, - maps::{ - MapData, - perf::{PerfEventArray, PerfEventArrayBuffer}, - }, - programs::{SchedClassifier, TcAttachType, tc::SchedClassifierLinkId}, + maps::{Map, MapData}, + programs::{SchedClassifier, TcAttachType}, util::online_cpus, }; #[cfg(feature = "experimental")] use crate::helpers::scan_cgroup_cronjob; -use bytes::BytesMut; -use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; -use cortexbrain_common::program_handlers::load_program; -use cortexbrain_common::{buffer_type::BufferType, map_handlers::BpfMapsData}; +use cortexbrain_common::{ + buffer_type::{BufferSize, BufferType, read_perf_buffer}, + constants, logger, + map_handlers::BpfMapsData, + map_handlers::{init_bpf_maps, map_manager, map_pinner, populate_blocklist}, + program_handlers::load_program, +}; use std::{ convert::TryInto, path::Path, sync::{Arc, Mutex}, }; -use anyhow::{Context, Ok}; -use cortexbrain_common::{constants, logger}; -use tokio::{fs, signal}; -use tracing::{debug, error, info, warn}; +use anyhow::{Context, Ok, anyhow}; -use std::collections::HashMap; +//use std::collections::HashMap; +use tokio::{fs, signal}; +use tracing::{error, info}; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { @@ -51,7 +50,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("fetching data"); // To Store link_ids they can be used to detach tc - let link_ids = Arc::new(Mutex::new(HashMap::::new())); + //let mut link_ids = HashMap::::new(); //init conntracker data path let bpf_path = @@ -69,6 +68,7 @@ async fn main() -> Result<(), anyhow::Error> { "veth_identity_map".to_string(), "TcpPacketRegistry".to_string(), "Blocklist".to_string(), + "tracked_veth".to_string(), ]; match init_bpf_maps(bpf.clone(), map_data) { std::result::Result::Ok(bpf_maps) => { @@ -92,8 +92,8 @@ async fn main() -> Result<(), anyhow::Error> { } { - init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()).await.context( - "An error occured during the execution of attach_bpf_program function" + init_tc_classifier(bpf.clone(), interfaces).await.context( + "An error occured during the execution of attach_bpf_program function", )?; } { @@ -122,10 +122,10 @@ async fn main() -> Result<(), anyhow::Error> { } //attach the tc classifier program to a vector of interfaces +// TODO: consider to create a load schedule classifier in the common functions async fn init_tc_classifier( bpf: Arc>, ifaces: Vec, - link_ids: Arc>>, ) -> Result<(), anyhow::Error> { //this funtion initialize the tc classifier program info!("Loading programs"); @@ -140,10 +140,33 @@ async fn init_tc_classifier( .try_into() .context("Failed to init SchedClassifier program")?; + // load classifier program + program .load() .context("Failed to load identity_classifier program")?; + // attach program only to desired interfaces. We can skip the dock0,tunl0,lo and eth0 interface + // we also save the interfaces to a BPF_HASH_MAP to easily monitor the interfaces using the agent + + // decleare link_ids HashMap which is a shared hashmap between kernel and userspace + // Link_ids hashmap has type of HashMap<[u8; 16], [u8; 8]>. The key is the program name and the value is the state + + // at this point the pinning is already successfull so we can invoque the maps from the pin + + let link_ids_mapdata = MapData::from_pin("/sys/fs/bpf/maps/tracked_veth") + .map_err(|e| anyhow!("Cannot return link_ids_mapdata. Reason: {}", e))?; + + let link_ids_map = Map::HashMap(link_ids_mapdata); + + let mut link_ids: aya::maps::HashMap = + aya::maps::HashMap::try_from(link_ids_map).map_err(|e| { + anyhow!( + "Cannot create link_ids HashMap from link_ids_map. Reason:{}", + e + ) + })?; + for interface in ifaces { match program.attach(&interface, TcAttachType::Ingress) { std::result::Result::Ok(link_id) => { @@ -151,10 +174,34 @@ async fn init_tc_classifier( "Program 'identity_classifier' attached to interface {}", interface ); - let mut map = link_ids - .lock() - .map_err(|e| anyhow::anyhow!("Cannot get value from lock. Reason: {}", e))?; - map.insert(interface.clone(), link_id); + let interface_bytes = interface.as_bytes(); + + let mut if_bytes = [0u8; 16]; + + // to set the len compare the interface_bytes.len() with the if_bytes.len() [16] and take the minimum + // if we have interface_bytes.len() < than 16 we set the len + let len = interface_bytes.len().min(if_bytes.len()); + + // now we can copy the bytes from the slice into the if_bytes variable + if_bytes[..len].copy_from_slice(&interface_bytes[..len]); + + // we compute the same process for the state_bytes + let mut state_bytes = [0u8; 8]; + let state = b"attached"; // prints "attached" as [u8;8] sequence of bytes + let state_len = state.len().min(state_bytes.len()); + state_bytes[..state_len].copy_from_slice(&state[..state_len]); + + match link_ids.insert(if_bytes, state_bytes, 0) { + std::result::Result::Ok(_) => { + info!("Veth interface {} added into map", &interface); + } + Err(e) => { + error!( + "Cannot add Veth interface {} into map. Reason: {}", + &interface, e + ); + } + } } Err(e) => error!( "Error attaching program to interface {}: {:?}", @@ -203,35 +250,17 @@ async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> { //TODO: try to change from PerfEventArray to a RingBuffer data structure - let mut map_manager = - HashMap::, Vec>)>::new(); - - // create the PerfEventArrays and the buffers from the BpfMapsData Objects - for (map, name) in bpf_maps - .bpf_obj_map - .into_iter() - .zip(bpf_maps.bpf_obj_names.into_iter()) - // zip two iterators at the same time for map and mapnames - { - debug!("Debugging map type:{:?} for map name {:?}", map, &name); - info!("Creating PerfEventArray for map name {:?}", &name); - - // save the map in a registry if is a PerfEventArray to access them by name - if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) { - map_manager.insert(name.clone(), (perf_event_array, Vec::new())); - - // perf_event_arrays.push(perf_event_array); // this is step 1 - // let perf_event_array_buffer = Vec::new(); - // event_buffers.push(perf_event_array_buffer); //this is step 2 - } else { - warn!("Map {:?} is not a PerfEventArray, skipping load", &name); - } - } + let mut maps = map_manager(bpf_maps)?; // fill the input buffers with data from the PerfEventArrays for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - for (name, (perf_evt_array, perf_evt_array_buffer)) in map_manager.iter_mut() { - let buf = perf_evt_array.open(cpu_id, None)?; + for (name, (perf_evt_array, perf_evt_array_buffer)) in maps.iter_mut() { + let buf = perf_evt_array.open(cpu_id, None).map_err(|e| { + anyhow!( + "Cannot create perf_event_array buffer from perf_event_array. Reason: {}", + e + ) + })?; info!( "Buffer created for map {:?} on cpu_id {:?}. Buffer size: {}", name, @@ -245,23 +274,20 @@ async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> { info!("Listening for events..."); // i need to use remove to move the values from the Map Manager to the the async tasks - let (perf_veth_array, perf_veth_buffers) = map_manager + let (perf_veth_array, perf_veth_buffers) = maps .remove("veth_identity_map") .expect("Cannot create perf_veth buffer"); - let (perf_net_events_array, perf_net_events_buffers) = map_manager + let (perf_net_events_array, perf_net_events_buffers) = maps .remove("events_map") .expect("Cannot create perf_net_events buffer"); - let (tcp_registry_array, tcp_registry_buffers) = map_manager + let (tcp_registry_array, tcp_registry_buffers) = maps .remove("TcpPacketRegistry") .expect("Cannot create tcp_registry buffer"); // init output buffers - let veth_buffers = vec![BytesMut::with_capacity(10 * 1024); online_cpus().iter().len()]; - let events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - let tcp_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - - // init veth link ids - //let veth_link_ids = link_ids; + let veth_buffers = BufferSize::VethEvents.set_buffer(); + let events_buffers = BufferSize::ClassifierNetEvents.set_buffer(); + let tcp_buffers = BufferSize::TcpEvents.set_buffer(); // spawn async tasks let veth_events_displayer = tokio::spawn(async move { diff --git a/core/src/components/metrics/Cargo.toml b/core/src/components/metrics/Cargo.toml index 0e88d8c0..c8dcb5b7 100644 --- a/core/src/components/metrics/Cargo.toml +++ b/core/src/components/metrics/Cargo.toml @@ -20,8 +20,11 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } libc = "0.2.172" bytemuck = "1.23.0" -cortexbrain-common = { path = "../../../common", features = [ +cortexbrain-common = { path = "../../../common/", features = [ "map-handlers", "program-handlers", + "buffer-reader", + "monitoring-structs", + "network-structs" ] } nix = { version = "0.30.1", features = ["net"] } diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index a67b6074..843f45d9 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -1,185 +1,68 @@ -use aya::{ - maps::{Map, MapData, PerfEventArray, perf::PerfEventArrayBuffer}, - util::online_cpus, -}; - -use bytes::BytesMut; -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, +use anyhow::anyhow; +use aya::util::online_cpus; +use cortexbrain_common::map_handlers::map_manager; +use cortexbrain_common::{ + buffer_type::{BufferSize, BufferType, read_perf_buffer}, + map_handlers::BpfMapsData, }; use tokio::signal; +use tracing::{error, info}; -use tracing::{debug, error, info, warn}; - -use crate::structs::NetworkMetrics; -use crate::structs::TimeStampMetrics; - -pub async fn display_metrics_map( - mut perf_buffers: Vec>, - running: Arc, // Changed to Arc - mut buffers: Vec, -) { - info!("Starting metrics event listener..."); - while running.load(Ordering::SeqCst) { - for buf in perf_buffers.iter_mut() { - match buf.read_events(&mut buffers) { - std::result::Result::Ok(events) => { - if events.read > 0 { - info!("Read {} metric events", events.read); - } - for i in 0..events.read { - let data = &buffers[i]; - if data.len() >= std::mem::size_of::() { - let net_metrics: NetworkMetrics = - unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) }; - let tgid = net_metrics.tgid; - let comm = String::from_utf8_lossy(&net_metrics.comm); - let ts_us = net_metrics.ts_us; - let sk_drop_count = net_metrics.sk_drops; - let sk_err = net_metrics.sk_err; - let sk_err_soft = net_metrics.sk_err_soft; - let sk_backlog_len = net_metrics.sk_backlog_len; - let sk_write_memory_queued = net_metrics.sk_write_memory_queued; - let sk_ack_backlog = net_metrics.sk_ack_backlog; - let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size; - info!( - "tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}", - tgid, - comm, - ts_us, - sk_drop_count, - sk_err, - sk_err_soft, - sk_backlog_len, - sk_write_memory_queued, - sk_ack_backlog, - sk_receive_buffer_size - ); - } else { - info!( - "Received data too small: {} bytes, expected: {}", - data.len(), - std::mem::size_of::() - ); - } - } - } - Err(e) => { - error!("Error reading events: {:?}", e); - } - } - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - info!("Metrics event listener stopped"); -} - -pub async fn display_time_stamp_events_map( - mut perf_buffers: Vec>, - running: Arc, // Changed to Arc - mut buffers: Vec, -) { - info!("Starting timestamp event listener..."); - while running.load(Ordering::SeqCst) { - for buf in perf_buffers.iter_mut() { - match buf.read_events(&mut buffers) { - std::result::Result::Ok(events) => { - if events.read > 0 { - info!("Read {} timestamp events", events.read); - } - for i in 0..events.read { - let data = &buffers[i]; - if data.len() >= std::mem::size_of::() { - let time_stamp_event: TimeStampMetrics = - unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) }; - let delta_us = time_stamp_event.delta_us; - let ts_us = time_stamp_event.ts_us; - let tgid = time_stamp_event.tgid; - let comm = String::from_utf8_lossy(&time_stamp_event.comm); - let lport = time_stamp_event.lport; - let dport_be = time_stamp_event.dport_be; - let af = time_stamp_event.af; - info!( - "TimeStampEvent - delta_us: {}, ts_us: {}, tgid: {}, comm: {}, lport: {}, dport_be: {}, af: {}", - delta_us, ts_us, tgid, comm, lport, dport_be, af - ); - } else { - info!("Received timestamp data too small: {} bytes", data.len()); - } - } - } - Err(e) => { - error!("Error reading timestamp events: {:?}", e); - } - } - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - info!("Timestamp event listener stopped"); -} - -pub async fn event_listener(bpf_maps: Vec) -> Result<(), anyhow::Error> { +pub async fn event_listener(bpf_maps: BpfMapsData) -> Result<(), anyhow::Error> { info!("Getting CPU count..."); - let mut perf_event_arrays = Vec::new(); // contains a vector of PerfEventArrays - let mut event_buffers = Vec::new(); // contains a vector of buffers - - info!("Creating perf buffers..."); - for map in bpf_maps { - debug!("Debugging map type:{:?}", map); - if let std::result::Result::Ok(perf_event_array) = PerfEventArray::try_from(map) { - perf_event_arrays.push(perf_event_array); // this is step 1 - let perf_event_array_buffer = Vec::new(); - event_buffers.push(perf_event_array_buffer); //this is step 2 - } else { - warn!("Map is not a PerfEventArray, skipping load"); - } - } + let mut maps = map_manager(bpf_maps)?; let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?; - //info!("CPU count: {}", cpu_count); - for (perf_evt_array, perf_evt_array_buffer) in - perf_event_arrays.iter_mut().zip(event_buffers.iter_mut()) - { - for cpu_id in &cpu_count { - let single_buffer = perf_evt_array.open(*cpu_id, None)?; - perf_evt_array_buffer.push(single_buffer); + for cpu_id in cpu_count { + for (name, (perf_event_array, perf_event_buffer)) in maps.iter_mut() { + let buf = perf_event_array.open(cpu_id, None).map_err(|e| { + anyhow!( + "Cannot create perf_event_array buffer from perf_event_array. Reason: {}", + e + ) + })?; + info!( + "Buffer created for map {:?} on cpu_id {:?}. Buffer size: {}", + name, + cpu_id, + std::mem::size_of_val(&buf) + ); + perf_event_buffer.push(buf); } } - //info!("Opening perf buffers for {} CPUs...", cpu_count); info!("Perf buffers created successfully"); - let mut event_buffers = event_buffers.into_iter(); - - let time_stamp_events_perf_buffer = event_buffers.next().expect(""); - let net_perf_buffer = event_buffers.next().expect(""); - // Create shared running flags - let net_metrics_running = Arc::new(AtomicBool::new(true)); - let time_stamp_events_running = Arc::new(AtomicBool::new(true)); + let (time_stamp_events_array, time_stamp_events_perf_buffer) = maps + .remove("time_stamp_events") + .expect("Cannot create time_stamp_events_buffer"); + let (net_perf_array, net_perf_buffer) = maps + .remove("net_metrics") + .expect("Cannot create net_perf_buffer"); // Create proper sized buffers - let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()]; - let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()]; - - // Clone for the signal handler - let net_metrics_running_signal = net_metrics_running.clone(); - let time_stamp_events_running_signal = time_stamp_events_running.clone(); + let net_metrics_buffers = BufferSize::NetworkMetricsEvents.set_buffer(); + let time_stamp_events_buffers = BufferSize::TimeMetricsEvents.set_buffer(); info!("Starting event listener tasks..."); let metrics_map_displayer = tokio::spawn(async move { - display_metrics_map(net_perf_buffer, net_metrics_running, net_metrics_buffers).await; + read_perf_buffer( + net_perf_buffer, + net_metrics_buffers, + BufferType::NetworkMetrics, + ) + .await; }); let time_stamp_events_displayer = tokio::spawn(async move { - display_time_stamp_events_map( + read_perf_buffer( time_stamp_events_perf_buffer, - time_stamp_events_running, time_stamp_events_buffers, + BufferType::TimeStampMetrics, ) - .await + .await; }); info!("Event listeners started, entering main loop..."); @@ -199,9 +82,6 @@ pub async fn event_listener(bpf_maps: Vec) -> Result<(), anyhow::Error> { _ = signal::ctrl_c() => { info!("Ctrl-C received, shutting down..."); - // Stop the event loops - net_metrics_running_signal.store(false, std::sync::atomic::Ordering::SeqCst); - time_stamp_events_running_signal.store(false, std::sync::atomic::Ordering::SeqCst); } } diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index e8677fb9..e5558eb4 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Ok}; use aya::Ebpf; -use cortexbrain_common::{constants, logger}; use std::{ env, fs, path::Path, @@ -11,15 +10,17 @@ use tracing::{error, info}; mod helpers; use crate::helpers::event_listener; -use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner}; -use cortexbrain_common::program_handlers::load_program; - -mod structs; +use cortexbrain_common::{ + constants, + logger::otlp_logger_init, + map_handlers::{init_bpf_maps, map_pinner}, + program_handlers::load_program, +}; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { //init tracing subscriber - logger::init_default_logger(); + let otlp_provider = otlp_logger_init("metrics-service".to_string()); info!("Starting metrics service..."); info!("fetching data"); @@ -78,6 +79,7 @@ async fn main() -> Result<(), anyhow::Error> { } Err(e) => { error!("Error initializing BPF maps: {:?}", e); + let _ = otlp_provider.shutdown(); return Err(e); } } diff --git a/core/src/components/metrics/src/structs.rs b/core/src/components/metrics/src/structs.rs deleted file mode 100644 index dc63ace3..00000000 --- a/core/src/components/metrics/src/structs.rs +++ /dev/null @@ -1,33 +0,0 @@ - -pub const TASK_COMM_LEN: usize = 16; // linux/sched.h - -#[repr(C, packed)] -#[derive(Clone, Copy)] -pub struct NetworkMetrics { - pub tgid: u32, - pub comm: [u8; TASK_COMM_LEN], - pub ts_us: u64, - pub sk_err: i32, // Offset 284 - pub sk_err_soft: i32, // Offset 600 - pub sk_backlog_len: i32, // Offset 196 - pub sk_write_memory_queued: i32, // Offset 376 - pub sk_receive_buffer_size: i32, // Offset 244 - pub sk_ack_backlog: u32, // Offset 604 - pub sk_drops: i32, // Offset 136 -} - -#[repr(C)] -#[derive(Clone, Copy)] -pub struct TimeStampMetrics { - pub delta_us: u64, - pub ts_us: u64, - pub tgid: u32, - pub comm: [u8; TASK_COMM_LEN], - pub lport: u16, - pub dport_be: u16, - pub af: u16, - pub saddr_v4: u32, - pub daddr_v4: u32, - pub saddr_v6: [u32; 4], - pub daddr_v6: [u32; 4], -} \ No newline at end of file diff --git a/core/src/components/metrics_tracer/src/data_structures.rs b/core/src/components/metrics_tracer/src/data_structures.rs index f6d7afed..e9866a83 100644 --- a/core/src/components/metrics_tracer/src/data_structures.rs +++ b/core/src/components/metrics_tracer/src/data_structures.rs @@ -2,7 +2,7 @@ use aya_ebpf::{macros::map, maps::{LruPerCpuHashMap, HashMap, PerfEventArray}}; pub const TASK_COMM_LEN: usize = 16; - +#[repr(C,packed)] pub struct NetworkMetrics { pub tgid: u32, pub comm: [u8; TASK_COMM_LEN], @@ -16,7 +16,7 @@ pub struct NetworkMetrics { pub sk_drops: i32, // Offset 136 } -#[repr(C)] +#[repr(C,packed)] #[derive(Copy, Clone)] pub struct TimeStampStartInfo { pub comm: [u8; TASK_COMM_LEN], @@ -25,7 +25,7 @@ pub struct TimeStampStartInfo { } // Event we send to userspace when latency is computed -#[repr(C)] +#[repr(C,packed)] #[derive(Copy, Clone)] pub struct TimeStampEvent { pub delta_us: u64,