From 73b789231050cb9987b9cee4424789b1e6588b5d Mon Sep 17 00:00:00 2001 From: poinT92 Date: Sun, 24 Aug 2025 19:17:34 +0200 Subject: [PATCH 1/5] Remove unused dependencies from CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removed serde_yaml: not used in codebase - Removed tokio: not directly used in CLI code - Removed tokio-stream: not used in CLI This cleanup reduces build times and dependency bloat as part of issue #57. πŸ€– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cli/Cargo.lock | 28 ---------------------------- cli/Cargo.toml | 3 --- 2 files changed, 31 deletions(-) diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 47db442..4da951a 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -379,9 +379,6 @@ dependencies = [ "prost", "prost-types", "serde", - "serde_yaml", - "tokio", - "tokio-stream", "tonic", "tonic-reflection", "tracing", @@ -1181,12 +1178,6 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" -[[package]] -name = "ryu" -version = "1.0.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" - [[package]] name = "scopeguard" version = "1.2.0" @@ -1213,19 +1204,6 @@ dependencies = [ "syn", ] -[[package]] -name = "serde_yaml" -version = "0.9.34+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" -dependencies = [ - "indexmap", - "itoa", - "ryu", - "serde", - "unsafe-libyaml", -] - [[package]] name = "sharded-slab" version = "0.1.7" @@ -1598,12 +1576,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" -[[package]] -name = "unsafe-libyaml" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" - [[package]] name = "utf8parse" version = "0.2.2" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 069b9e6..691e94f 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -14,14 +14,11 @@ clap = { version = "4.5.38", features = ["derive"] } colored = "3.0.0" directories = "6.0.0" serde = { version = "1.0.219", features = ["derive"] } -serde_yaml = "0.9.34" tracing = "0.1.41" -tokio = "1.47.0" anyhow = "1.0.98" api = { path = "../core/api" } tonic = "0.14.1" tonic-reflection = "0.14.1" -tokio-stream = "0.1.17" prost-types = "0.14.1" prost = "0.14.1" From a29fe5c0a71dabd3549ee5fbbb2e0683113ac613 Mon Sep 17 00:00:00 2001 From: poinT92 Date: Sun, 24 Aug 2025 19:20:53 +0200 Subject: [PATCH 2/5] Remove unused dependencies from CLI Removed serde_yaml: not used in codebase Removed tokio: not directly used in CLI code Removed tokio-stream: not used in CLI This cleanup reduces build times and dependency bloat as part of issue #57. --- .gitignore | 3 + core/Cargo.toml | 4 - core/src/components/loadbalancer/Cargo.toml | 26 - core/src/components/loadbalancer/Dockerfile | 47 -- core/src/components/loadbalancer/build-lb.sh | 28 - .../components/loadbalancer/src/discovery.rs | 591 ------------------ .../loadbalancer/src/loadbalancer.rs | 170 ----- core/src/components/loadbalancer/src/main.rs | 134 ---- .../components/loadbalancer/src/messaging.rs | 233 ------- .../components/loadbalancer/src/metrics.rs | 25 - core/src/components/loadbalancer/src/mod.rs | 5 - .../loadbalancer/src/shared_struct.rs | 41 -- core/src/components/maps/Cargo.toml | 36 -- core/src/components/maps/build.rs | 17 - core/src/components/maps/src/lib.rs | 6 - core/src/components/maps/src/map.rs | 100 --- core/src/components/proxy/Cargo.toml | 46 -- core/src/components/proxy/src/main.rs | 42 -- core/src/components/proxy/src/mod.rs | 5 - core/src/components/proxy/src/proxy.rs | 263 -------- core/src/components/xdp/Cargo.toml | 37 -- core/src/components/xdp/build.rs | 17 - core/src/components/xdp/build.sh | 2 - core/src/components/xdp/requirements.txt | 4 - core/src/components/xdp/src/filter.rs | 129 ---- core/src/components/xdp/src/lib.rs | 6 - core/src/components/xdp/src/mod.rs | 5 - 27 files changed, 3 insertions(+), 2019 deletions(-) delete mode 100644 core/src/components/loadbalancer/Cargo.toml delete mode 100644 core/src/components/loadbalancer/Dockerfile delete mode 100755 core/src/components/loadbalancer/build-lb.sh delete mode 100644 core/src/components/loadbalancer/src/discovery.rs delete mode 100644 core/src/components/loadbalancer/src/loadbalancer.rs delete mode 100644 core/src/components/loadbalancer/src/main.rs delete mode 100644 core/src/components/loadbalancer/src/messaging.rs delete mode 100644 core/src/components/loadbalancer/src/metrics.rs delete mode 100644 core/src/components/loadbalancer/src/mod.rs delete mode 100644 core/src/components/loadbalancer/src/shared_struct.rs delete mode 100644 core/src/components/maps/Cargo.toml delete mode 100644 core/src/components/maps/build.rs delete mode 100644 core/src/components/maps/src/lib.rs delete mode 100644 core/src/components/maps/src/map.rs delete mode 100644 core/src/components/proxy/Cargo.toml delete mode 100644 core/src/components/proxy/src/main.rs delete mode 100644 core/src/components/proxy/src/mod.rs delete mode 100644 core/src/components/proxy/src/proxy.rs delete mode 100644 core/src/components/xdp/Cargo.toml delete mode 100644 core/src/components/xdp/build.rs delete mode 100755 core/src/components/xdp/build.sh delete mode 100644 core/src/components/xdp/requirements.txt delete mode 100644 core/src/components/xdp/src/filter.rs delete mode 100644 core/src/components/xdp/src/lib.rs delete mode 100644 core/src/components/xdp/src/mod.rs diff --git a/.gitignore b/.gitignore index 26701db..ce3fb08 100644 --- a/.gitignore +++ b/.gitignore @@ -175,3 +175,6 @@ client-deployment.yaml dns-deployment.yaml proxy-injector.yaml core/src/components/conntracker/src/bindings.rs + +# Claude AI assistant working files +CLAUDE.md diff --git a/core/Cargo.toml b/core/Cargo.toml index 088c2e9..f1b50db 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,11 +1,7 @@ [workspace] resolver = "3" members = [ - #"src/components/loadbalancer", "api", - #"src/components/proxy", - #"src/components/xdp", - #"src/components/maps", "src/components/conntracker", "src/components/identity", "src/components/metrics_tracer", diff --git a/core/src/components/loadbalancer/Cargo.toml b/core/src/components/loadbalancer/Cargo.toml deleted file mode 100644 index b328953..0000000 --- a/core/src/components/loadbalancer/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "loadbalancer" -version = "0.1.0" -edition = "2021" - -[dependencies] -anyhow = "1.0.98" -aya = "0.13.1" -aya-log = "0.2.1" -log = "0.4.27" -tokio = { version = "1.44.2", features = ["full"] } -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -bytemuck = {version ="1.23.0",features = ["derive"]} -serde = "1.0.219" -serde_json = "1.0.140" -prometheus = "0.14.0" -lazy_static = "1.5.0" -kube = "1.1.0" -base64 = "0.22.1" -k8s-openapi = { version = "0.25.0", features = ["v1_32"] } -warp = "0.3.7" -kube-runtime = "1.1.0" - -[dependencies.shared] -path = "../../shared" diff --git a/core/src/components/loadbalancer/Dockerfile b/core/src/components/loadbalancer/Dockerfile deleted file mode 100644 index fdddcfb..0000000 --- a/core/src/components/loadbalancer/Dockerfile +++ /dev/null @@ -1,47 +0,0 @@ -# Phase 1: Build image -FROM rust:1.85 AS builder - -# Set working directory -WORKDIR /usr/src/app - -# Copy the shared library in the correct location -WORKDIR /usr/src/shared -COPY .shared/Cargo.toml . -COPY .shared/src ./src - -# Copying the XDP filter binaries -WORKDIR /usr/src/app/loadbalancer - -# Then create the loadbalancer project structure -WORKDIR /usr/src/app/loadbalancer -COPY Cargo.toml . -COPY src ./src - -# Ensure Cargo recognizes the shared dependency -RUN cargo fetch - -# Build the project -RUN cargo build --release - -# Phase 2: Create final image -FROM ubuntu:24.04 - -# Install runtime dependencies -RUN apt-get update && apt-get install -y \ - ca-certificates \ - && rm -rf /var/lib/apt/lists/* - -# Create directory for the loadbalancer -WORKDIR /usr/src/cortexbrain-loadbalancer - -# Copy the binary from builder -COPY --from=builder /usr/src/app/loadbalancer/target/release/loadbalancer /usr/local/bin/cortexflow-loadbalancer - -# Copy config file -COPY xdp-filter /usr/src/cortexbrain-loadbalancer/xdp-filter - -# Set config path environment variable -ENV BPF_PATH="/usr/src/cortexbrain-loadbalancer/xdp-filter" - -# Set the loadbalancer execution command -CMD ["cortexflow-loadbalancer"] diff --git a/core/src/components/loadbalancer/build-lb.sh b/core/src/components/loadbalancer/build-lb.sh deleted file mode 100755 index f31c4c5..0000000 --- a/core/src/components/loadbalancer/build-lb.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash - -# Create temporary shared directory -mkdir -p .shared - -# Copy shared files -echo "Copying shared files" -cp -r ../../shared/src .shared/ -cp -r ../../shared/Cargo.toml .shared/ -cp -r ../../client/config.yaml config.yaml - -# Building XDP filter files -echo "Building the xdp filter files" -pushd ../xdp -./build.sh -popd - -echo "Copying xdp-filter binaries" -cp -r ../../../target/bpfel-unknown-none/release/xdp-filter xdp-filter - -# Run docker build -docker build -t loadbalancer:0.0.1 . - -# Cleanup -echo "Cleaning building files" -rm -rf .shared -rm -rf config.yaml -rm -rf xdp-filter diff --git a/core/src/components/loadbalancer/src/discovery.rs b/core/src/components/loadbalancer/src/discovery.rs deleted file mode 100644 index e2d6c47..0000000 --- a/core/src/components/loadbalancer/src/discovery.rs +++ /dev/null @@ -1,591 +0,0 @@ -/* -πŸš€ Update to ebpf: Using BPF maps to store values πŸš€ - -To store the service discovery data we need to do things in the kernel space -we can use bpf maps in particular this map: - - BPF_MAP_TYPE_HASH - doc: https://docs.kernel.org/bpf/map_hash.html - - DEFAULT MAP STRUCT: - #include - #include - - struct key { - __u32 srcip; - }; - - struct value { - __u64 packets; - __u64 bytes; - }; - - struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __uint(max_entries, 32); - __type(key, struct key); - __type(value, struct value); - } packet_stats SEC(".maps"); - - -*/ - -/* -The new algorithm can be described as this: - - 1. pod A need to know the ip of pod B to perform operations - 2. discovery service check if the pod B ip is in the cache - 2a. ip is in the cache!---> return ip address - 2b. ip is not in the cache---> need to go to step 3 - - 3. Service discovery search in ETCD the address of the pod B - 4. Service discovery store the address in the BPF map to use it in the kernel - 5. Pod A now can obtain Pod B ip address - - - -*/ - - -use crate::messaging; -use crate::messaging::MexType; -use crate::metrics::{DNS_REQUEST, DNS_RESPONSE_TIME}; -use crate::shared_struct::{SVCKey, SVCValue}; -use crate::shared_struct; -use anyhow::Error ; -use std::result::Result::Ok; -use aya::maps::{HashMap as UserSpaceMap, MapData}; -use aya::Ebpf; -use k8s_openapi::api::core::v1::{Pod, Service}; -use kube::api::ListParams; -use kube::{Client, api::Api}; -use serde_json::{json, to_vec}; -use std::collections::BTreeMap; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::time::Duration; -use std::time::Instant; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpStream, UdpSocket}; -use tokio::time::timeout; -use tracing::{debug, error, info, warn}; - - -/* service discovery structure: - uses a dns_server-->kube-dns - service_cache: speed up the discovery process - -*/ -pub struct ServiceDiscovery<'a> { - kube_client: Client, - service_cache: &'a mut UserSpaceMap<&'a mut MapData,SVCKey,SVCValue>, -} -/* - Doc: - Here i'm using a lifetime <'a> - Lifetimes in Rust are used to ensure that references (&) are - always valid and do not point to β€œexpired” or deallocated memory. - - Ensure that the code cannot use dangling pointers during the execution - -*/ - -/* User space implementation */ - -impl<'a> ServiceDiscovery<'a> { - pub async fn new(mut service_map: &'a mut UserSpaceMap<&'a mut MapData,SVCKey,SVCValue> ) -> Result { - let kube_client = Client::try_default().await?; - Ok(ServiceDiscovery { - kube_client, - service_cache: service_map, - }) - } - - /* - Destination resolver: - Args: service_name, namespace - Return: service endpoint - - - */ - pub async fn resolve_service_destination( - &mut self, - service_name: &str, - namespace: &str, - port: i32, - ) -> Option { - // 1. Check the cache and return the service ip if found - if let Some(cached_service) =self.get_service(service_name){ - info!("Service {:?} found in cache ",service_name); - return Some(cached_service) - } - else{ - let services: Api = Api::namespaced(self.kube_client.clone(), namespace); - let pods: Api = Api::namespaced(self.kube_client.clone(), namespace); - - debug!( - "Fetching service {} from namespace {}", - service_name, namespace - ); - - // Fetch the service endpoint from the kubernetes api - self.fetch_service_endpoint_from_kubeapi(service_name, services, pods, namespace, port) - .await - } - } - - /* - Resolver function: - Args: service_name, namespace - Return: service address or None - */ - - async fn resolve_service_address( - &mut self, - service_name: &str, - namespace: &str, - port: i32, - ) -> Option { - match self - .resolve_service_destination(service_name, namespace, port) - .await - { - Some(service_address) => { - debug!( - "Resolved service address for {}: {}", - service_name, service_address - ); - Some(service_address) - } - None => { - error!( - "Service {} not found in namespace {}", - service_name, namespace - ); - None - } - } - } - - /* - fetch service endpoint from the KUBERNETES-API - Args: service name, service_api, namespace - Return: service endpoint - */ - - async fn fetch_service_endpoint_from_kubeapi( - &mut self, - service_name: &str, - service_api: Api, - pod_api: Api, - namespace: &str, - communication_port: i32, //can be udp or tcp port - ) -> Option { - // retrieve the service - let service = match service_api.get(service_name).await { - Ok(service) => service, - Err(e) => { - error!( - "Service {} not found in namespace {}: {:?}", - service_name, namespace, e - ); - return None; - } - }; - - // return the service selector - let selector_map = match service - .spec - .as_ref() - .and_then(|spec| spec.selector.as_ref()) - { - Some(selector) => selector, - None => { - error!("No selector found for service {}", service_name); - return None; - } - }; - - // Convert the selector to a string format - let selector = self.labels_to_selector(selector_map); - - // find the pods that match the selector - let pods = match pod_api.list(&ListParams::default().labels(&selector)).await { - Ok(pods) => pods, - Err(e) => { - error!( - "Failed to fetch pods for service {} in namespace {}: {:?}", - service_name, namespace, e - ); - return None; - } - }; - - // Select the first pod available - // TODO: more advanced load balancing techniques needed here - if let Some(pod) = pods.items.first() { - if let Some(pod_ip) = pod - .status - .as_ref() - .and_then(|status| status.pod_ip.as_ref()) - { - info!("Pod IP for service {}: {}", service_name, pod_ip); - let endpoint = format!("{}:{}", pod_ip, communication_port); - info!( - "Resolved endpoint for service {}: {}", - service_name, endpoint - ); - // add to service cache - - let key = SVCKey { - service_name: shared_struct::str_to_u8_64(&service_name), - }; - let value = SVCValue{ - ip: shared_struct::str_to_u8_4(&pod_ip), - port: communication_port as u32 - }; - - self.service_cache - .insert(key, value,u64::min_value()); - return Some(endpoint); - } else { - error!( - "No Pod IP defined for pod {}", - pod.metadata.name.as_deref().unwrap_or("unknown") - ); - } - } else { - error!("No pods found for service {}", service_name); - } - - None - } - - //return the selector from the labels - fn labels_to_selector(&self, labels: &BTreeMap) -> String { - labels - .iter() - .map(|(key, value)| format!("{}={}", key, value)) - .collect::>() - .join(",") - } - - //directly register a service in the cache - pub fn register_service(&mut self, service_id: String, endpoint: String,port: u32) { - let key = SVCKey{ - service_name:shared_struct::str_to_u8_64(&service_id) - }; - let value = SVCValue{ - ip: shared_struct::str_to_u8_4(&endpoint), - port - }; - self.service_cache.insert(key, value,u64::min_value()); - } - - //directly get a service from the cache - pub fn get_service(&self, service_id: &str) -> Option { - - let key= SVCKey{ - service_name:shared_struct::str_to_u8_64(&service_id) - }; - - //match pattern - match self.service_cache.get(&key,0) { - Ok(service) => { - let svc_ip = String::from_utf8_lossy(&service.ip) - .trim_end_matches(char::from(0)) - .to_string(); - Some(svc_ip) - }, - //return an error in case the key is not found - Err(aya::maps::MapError::KeyNotFound) => { - error!("Service not found in cache!"); - return None - } - //return an error in case of any other error type expect "KeyNotFound" - Err(e)=>{ - error!("An error occured {}",e); - return None - } - } - } - - //TCP RESPONSE - //TODO: replace this logic in the kernel space - pub async fn send_tcp_request( - &mut self, - service_name: &str, - namespace: &str, - payload: &[u8], - port: i32, - ) -> Option> { - // Resolves the address of the service - let target_service = match self.resolve_service_destination(service_name, namespace, port).await { - Some(addr) => addr, - None => { - error!("Service {} not found in namespace {}", service_name, namespace); - return None; - } - }; - - // Convert the address in a socket address - let target_addr: SocketAddr = match target_service.parse() { - Ok(addr) => addr, - Err(e) => { - error!("Invalid target address: {}", e); - return None; - } - }; - - // TCP connection to the service - let start_time = Instant::now(); - let duration = start_time.elapsed().as_secs_f64(); - - - - match TcpStream::connect(target_addr).await { - Ok(mut stream) => { - info!("Connected to service at {}", target_addr); - - // Create the json message - info!("Message waiting to be forwarded:{:?}",&payload); - - let response = json!({ - "message": String::from_utf8_lossy(payload) - }); - - let msg_forwarded_serialized = match to_vec(&response) { - Ok(data) => data, - Err(e) => { - error!("Failed to serialize request: {}", e); - return None; - } - }; - let response_message = messaging::create_message( - &service_name, - MexType::Outcoming, - &msg_forwarded_serialized, - ); - - // send the message - if let Err(e) = stream.write_all(&response_message).await { - error!("Failed to send request to {}: {}", target_addr, e); - return None; - } - info!("Request sent to {}", target_addr); - - let client_addr = match stream.peer_addr() { - Ok(addr) => addr, - Err(e) => { - error!("Cannot return client address: {}", e); - return None; - } - }; - - let mut buffer = vec![0u8; 1024]; - - // wait for the response with a timeout timer - match timeout(Duration::from_secs(10), stream.read(&mut buffer)).await { - Ok(Ok(len)) => { - let response_data = buffer[..len].to_vec(); - info!("Received response from {} ({} bytes)", client_addr, len); - DNS_REQUEST.with_label_values(&[&(client_addr.to_string()+"_tcp")]).inc(); - // Register the metric when len =0 - DNS_RESPONSE_TIME.with_label_values(&["service_discovery_tcp"]).observe(duration); - - if len > 0 { - DNS_REQUEST.with_label_values(&[&(client_addr.to_string() + "_tcp")]).inc(); - Some(response_data) - } else { - warn!("Empty response received from {}", client_addr); - None - } - } - Ok(Err(e))=> { - error!("Error: {}",e); - None - } - Err(e) => { - error!("TCP response timed out: {}", e); - - // Register the metric when timeout - DNS_RESPONSE_TIME.with_label_values(&["service_discovery_tcp"]).observe(duration); - - None - } - } - } - Err(e) => { - error!("Failed to connect to {}: {}", target_addr, e); - None - } - } - } - //UDP RESPONSE - //TODO: replace this logic in the kernel space - pub async fn wait_for_udp_response( - &mut self, - service_name: &str, - namespace: &str, - payload: &[u8], - port: i32, - client_addr: SocketAddr, - ) -> Option> { - // Resolve the service name - let target_service = match self - .resolve_service_destination(service_name, namespace, port) - .await - { - Some(addr) => addr, - None => { - error!( - "Service {} not found in namespace {}", - service_name, namespace - ); - return None; // return None if service not found - } - }; - - // Parse target_service into SocketAddr if it's a string - let target_addr = match target_service.to_socket_addrs() { - Ok(mut addrs) => match addrs.next() { - Some(addr) => addr, - None => { - error!("Could not resolve address for {}", target_service); - return None; - } - }, - Err(e) => { - error!("Failed to parse socket address: {}", e); - return None; - } - }; - - // initialize the udp socket - // bind to a random port - let socket = match UdpSocket::bind("0.0.0.0:0").await { - Ok(socket) => socket, - Err(e) => { - error!("Failed to bind UDP socket: {}", e); - return None; - } - }; - - // Allow the socket to receive from any address, not just the target - // This is important for UDP where responses might come from different ports - if let Err(e) = socket.set_broadcast(true) { - error!("Failed to set socket to broadcast mode: {}", e); - return None; - } - - // Sends the payload to the destination service - let response = json!({ - "message": String::from_utf8_lossy(payload) - }); - - let serialized_response = match to_vec(&response) { - Ok(bytes) => bytes, - Err(e) => { - error!("cannot serialize udp response: {}", e); - return None; - } - }; - - let response_message = messaging::create_message( - &service_name, - MexType::Outcoming, - &serialized_response, - ); - - info!( - "([{}]->[{}])sending response (outcoming) message to : {}", - target_addr, client_addr, client_addr - ); - - if let Err(e) = socket.send_to(&response_message, &client_addr).await { - error!( - "Error sending UDP response to target service {}: {}", - target_addr, e - ); - return None; - }else { - info!( - "UDP response successfully sent to {} from {} with message {:?} ", - client_addr,target_addr ,response_message - ); - } - - - - let start_time = Instant::now(); - let client_ip = client_addr.ip(); - - // Set up timeout for receiving the response - match timeout( - Duration::from_secs(10), // 10 second timeout - async { - // Get the response from the destination service - // We're willing to accept a response from any address associated with the target service - let mut buffer = [0u8; 1024]; - loop { - match socket.recv_from(&mut buffer).await { - Ok((len, addr)) => { - // Check if this is a response from our target (any port) - //TODO: is this part safe? - if addr.ip() == client_ip { - DNS_REQUEST.with_label_values(&[&(addr.to_string()+"_udp")]).inc(); - if len == 0 { - warn!( - "Received null UDP response from {} at address {}", - client_addr, addr - ); - return None; - } else { - info!( - "Received UDP response from {}({} bytes) at address:{}", - client_addr, len, addr - ); - - let response_data = buffer[..len].to_vec(); - return Some(response_data); - } - } else { - // Message from another address, ignore and keep waiting - info!("Received message from unexpected address: {}, continuing to wait", addr); - continue; - } - } - Err(e) => { - error!("Error receiving UDP response: {}", e); - return None; - } - } - } - } - ).await { - Ok(Some(response_data)) => { - let duration = start_time.elapsed().as_secs_f64(); - DNS_RESPONSE_TIME - .with_label_values(&["service_discovery_udp"]) - .observe(duration); - - // Forward the response to the original client - info!("Forwarding the response to the client: {client_addr}"); - if let Err(e) = socket.send_to(&response_data, client_addr).await { - error!("Error sending UDP response to client: {}", e); - } else { - info!("Response forwarded to client {}", client_addr); - } - - Some(response_data) - } - Ok(None) => { - warn!("None UDP response from {}", target_addr); - None - } - Err(e) => { - error!("UDP response timed out with error: {}", e); - None - } - } - } -} diff --git a/core/src/components/loadbalancer/src/loadbalancer.rs b/core/src/components/loadbalancer/src/loadbalancer.rs deleted file mode 100644 index fef95b0..0000000 --- a/core/src/components/loadbalancer/src/loadbalancer.rs +++ /dev/null @@ -1,170 +0,0 @@ -/* Implementation */ -//TODO: implement loadbalancer function - -use crate::discovery::ServiceDiscovery; -use crate::messaging; -use crate::messaging::MexType; -use crate::messaging::{ - ignore_message_with_no_service, produce_incoming_message, produce_unknown_message, - produce_unknown_message_udp, send_fail_ack_message, send_outcoming_message, - send_outcoming_message_udp, send_success_ack_message, -}; -use anyhow::{Error, Result}; -use aya::Ebpf; -use anyhow::Context; -use tokio::signal; -use tokio::fs; -use aya_log::EbpfLogger; -use aya::programs::{Xdp, XdpFlags}; -use prometheus::{Encoder, TextEncoder}; -use std::sync::{Arc}; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; -use tokio::net::UdpSocket; -use tokio::net::{TcpListener, TcpStream}; -use tracing::{debug, warn,error, info}; -use warp::Filter; -use shared::apiconfig::EdgeProxyConfig; -use std::path::Path; -use aya::maps::{HashMap as UserSpaceMap, MapData}; -use crate::shared_struct::BackendPorts; -use tokio::sync::RwLock; - - -const BPF_PATH : &str = "BPF_PATH"; - -pub struct Loadbalancer<'a> { - proxy_config: Arc, - service_discovery: Arc>>, - backends: Arc>>, -} - -impl<'a> Loadbalancer<'a>{ - - pub async fn new(proxycfg: EdgeProxyConfig,cache_map: ServiceDiscovery<'a>,backends_list: UserSpaceMap -) -> Result { - Ok(Loadbalancer { - proxy_config: Arc::new(proxycfg), - service_discovery: Arc::new(cache_map.into()), - backends: Arc::new(backends_list.into()), - }) - } - - pub async fn run(&self) -> Result<(), Error> { - let bpf_path= std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?; - let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?; - let mut bpf = aya::Ebpf::load(&data).context("failed to load data from file")?; - EbpfLogger::init(&mut bpf).context("Cannot initialize ebpf logger"); - - //extract the bpf program "xdp-hello" from builded binaries - info!("loading xdp program"); - let program: &mut Xdp = bpf.program_mut("xdp_hello").unwrap().try_into()?; - program.load().context("Failed to laod XDP program")?; //load the program - - info!("Starting program"); - program - .attach("eth0", XdpFlags::default()) - .context("Failed to attach XDP program with default flags to interface eth0")?; - info!("Cortexflow Intelligent Loadbalancer is running"); - - //waiting for signint (ctrl-c) to shutdown the program - info!("Waiting for Ctrl-C..."); - - // Start udpsocket - let socket = UdpSocket::bind("0.0.0.0:5053").await?; - info!("Udp socket bound to {}", socket.local_addr()?); - - let metrics_route = warp::path!("metrics").map(|| { - let mut buffer = Vec::new(); - let encoder = TextEncoder::new(); - let metrics_families = prometheus::gather(); - encoder.encode(&metrics_families, &mut buffer).unwrap(); - warp::reply::with_header( - String::from_utf8(buffer).unwrap(), - "Content-Type", - "text/plain; charset=utf-8", - ) - }); - - tokio::spawn(async move { - warp::serve(metrics_route).run(([0, 0, 0, 0], 9090)).await; - }); - - let socket = Arc::new(socket); - let socket_clone = socket.clone(); - - let mut buffer = [0u8; 512]; - loop { - match socket_clone.recv_from(&mut buffer).await { - Ok((len, addr)) => { - let query = &buffer[..len]; - info!("Received {} bytes from sender: {}", len, addr); - - let response = self - .handle_udp_connection(query, &socket_clone, addr, 5053) - .await; - - if let Err(e) = socket_clone.send_to(&response, addr).await { - error!("Error sending response: {:?}", e); - } - } - Err(e) => { - error!("Error receiving packet: {}", e); - } - } - } - } - - - pub async fn handle_udp_connection( - &self, - query: &[u8], - socket: &UdpSocket, - sender_addr: std::net::SocketAddr, - port: i32, - ) -> Vec { - // Extract service name, direction, and payload - let (direction, service_name, payload) = - match messaging::extract_service_name_and_payload(query) { - Some((direction, name, payload)) if !name.is_empty() => (direction, name, payload), - _ => { - error!("Invalid UDP request format"); - return Vec::new(); // Return an empty response - } - }; - - let namespace = "cortexflow"; - - match direction { - MexType::Incoming => { - info!( - "([{}]->[{}]): Processing incoming UDP message from service: {}", - sender_addr, service_name, sender_addr - ); - - // Use service discovery to resolve the request and forward response to client - if let Some(response) = self - .service_discovery - .write() - .await - .wait_for_udp_response(&service_name, namespace, &payload, port, sender_addr) - .await - { - if let Err(e) = socket.send_to(&response, sender_addr).await { - error!( - "([{}]->[{}]):Error sending UDP response : {}", - service_name, sender_addr, e - ); - } - response - } else { - Vec::new() // Return empty if no response received - } - } - MexType::Outcoming => { - send_outcoming_message_udp(socket, service_name, sender_addr).await - } - _ => produce_unknown_message_udp(socket, service_name, sender_addr).await, - } - } -} diff --git a/core/src/components/loadbalancer/src/main.rs b/core/src/components/loadbalancer/src/main.rs deleted file mode 100644 index 668f213..0000000 --- a/core/src/components/loadbalancer/src/main.rs +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Contains the Load balancer (CortexFlow Agent) user space code implementation. - * The implementation leverages the power of bpf programs to interact with the internet interface - * to distribute load accross multiple backends. - * The program leverages bpf maps to enable the communication between Kernel Space and User Space - - * //TODO: Update the code to use the discovered services from the cortexflow identity service - */ - -/* - * Annotations - - let kubeconfig_path = PathBuf::from("/home/cortexflow/.kube/config"); -*/ - /* annotations for permissions: - sudo chmod 644 /home//.kube/config - sudo chown : /home//.kube/config - - sudo mkdir -p /root/.kube - sudo cp /home//.kube/config /root/.kube/config -*/ - -mod shared_struct; -mod discovery; - -mod messaging; -mod metrics; -mod loadbalancer; - -use std::error; - -use anyhow::Context; -use aya::programs::{Xdp, XdpFlags}; -use tokio::fs; -use tokio::signal; -use aya_log::EbpfLogger; -use tracing::{error,info,warn}; -use std::path::Path; -use std::path::PathBuf; - -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::EnvFilter; - -use aya::maps::{HashMap as UserSpaceMap, MapData}; -use crate::shared_struct::{SVCKey,SVCValue,BackendPorts}; -use crate::loadbalancer::Loadbalancer; -use crate::discovery::ServiceDiscovery; -use shared::{apiconfig::EdgeProxyConfig, default_api_config::ConfigType}; -use k8s_openapi::api::core::v1::ConfigMap; -use kube::{Client,Config, api::Api}; -use kube::config::Kubeconfig; - - -unsafe impl aya::Pod for shared_struct::SVCKey {} -unsafe impl aya::Pod for shared_struct::SVCValue {} -unsafe impl aya::Pod for shared_struct::BackendPorts {} - -const BPF_PATH : &str = "BPF_PATH"; - - -//main program -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - - // * init tracing subscriber - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .with_target(false) - .with_level(true) - .with_span_events(FmtSpan::NONE) - .without_time() - .with_file(false) - .pretty() - .with_env_filter(EnvFilter::new("info")) - .with_line_number(false) - .init(); - - - - - // * loading the pre-built binaries--> reason: linux kernel does not accept non compiled code. only accepts bytecode - - info!("loading data"); - let bpf_path= std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?; - let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?; - info!("loading bpf data"); - let mut bpf = aya::Ebpf::load(&data).context("failed to load data from file")?; - info!("loading maps data"); - let mut maps_owned = bpf.take_map("services").context("failed to take services map")?; - info!("loading bpf backends map"); - let backend_map = bpf.take_map("Backend_ports").context("failed to take backends map")?; - - - if Path::new("/sys/fs/bpf/services").exists(){ - warn!("map already pinned,skipping process"); - } - else{ - maps_owned.pin("/sys/fs/bpf/services").context("failed to pin map")?; - } - - info!("loading service map in user space"); - let mut service_map = UserSpaceMap::<&mut MapData, SVCKey, SVCValue>::try_from(&mut maps_owned)?; - info!("loading backends in user space"); - let mut backends = UserSpaceMap::::try_from(backend_map)?; - - let mut ports = [0;4]; - ports[0] = 9876; - ports[1] = 9877; - - let backend_ports = BackendPorts{ - ports:ports, - index:0, - }; - backends.insert(5053,backend_ports,0); - - - //declare service discovery - info!("Initializing service discovery"); - let service_discovery=ServiceDiscovery::new(&mut service_map).await?; - - info!("connecting to client"); - let client = Client::try_default().await?; - - info!("reading kubernetes configmap"); - let configmap: Api = Api::namespaced(client.clone(), "cortexflow"); - - info!("Loading Loadbalancer configuration from configmap"); - let lbcfg = EdgeProxyConfig::load_from_configmap(configmap, ConfigType::Default).await?; - info!("Initializing Loadbalancer"); - let loadbalancer = Loadbalancer::new(lbcfg,service_discovery,backends).await?; - loadbalancer.run().await?; - - Ok(()) -} diff --git a/core/src/components/loadbalancer/src/messaging.rs b/core/src/components/loadbalancer/src/messaging.rs deleted file mode 100644 index a615638..0000000 --- a/core/src/components/loadbalancer/src/messaging.rs +++ /dev/null @@ -1,233 +0,0 @@ -/* Contains all the functions used to communicate between services */ -use base64::{Engine as _, engine::general_purpose::STANDARD}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use tokio::net::UdpSocket; -use tokio::{io::AsyncWriteExt, net::TcpStream}; -use tracing::{error, info, warn}; - -/* -Extract the service name and the payload from this format: - - .: - -Messages structure: - payload - direction - service - -Message Type: - Incoming - Outcoming - Unknown - -*/ -/* - Messagging logic: - A sends an "Incoming" message to B. - B receives the message and processes it: - - If the service is valid, B tries to get a response from service_discovery. - - If it finds a response, B sends the "Outcoming" message with the correct payload. - - If it does not find a response, it logs an error. - - A receives an "Outcoming" message with the service response payload. - - If B receives an "Outcoming" message, it responds with {"status": "received"}. - -*/ - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub enum MexType { - Incoming, - Outcoming, - Unknown, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Message { - payload: String, //TODO: consider using type Option so the payload cannot be present and the functions does not return errors - service: String, - direction: MexType, -} - -pub fn extract_service_name_and_payload( - msg_encrypted: &[u8], -) -> Option<(MexType, String, Vec)> { - // Convert the bytes in a UTF-8 String - let message = match std::str::from_utf8(msg_encrypted) { - Ok(msg) => { - info!("{:?}", msg); - msg - } - Err(e) => { - error!("Invalid byte sequence: {}", e); - return None; - // return none for invalid byte sequence - //TODO: add checks if the message is not a JSON - } - }; - decode_json_message(message) -} - -// Parse the json message -fn decode_json_message(message: &str) -> Option<(MexType, String, Vec)> { - match serde_json::from_str::(message) { - Ok(service_message) => { - // Extract service name - let service_name = service_message - .service - .split('.') - .next() - .unwrap_or("") - .to_string(); - // Decode base64 payload - match STANDARD.decode(&service_message.payload) { - Ok(decoded_payload) => { - info!("decoded payload:{:?}", decoded_payload); - Some((service_message.direction, service_name, decoded_payload)) - } - Err(e) => { - error!("Invalid Payload: {}", e); - None - } - } - } - Err(e) => { - error!("Cannot decode JSON message: {:?}", e); - None - } - } -} - -// Create JSON message -pub fn create_message(service: &str, direction: MexType, payload: &[u8]) -> Vec { - let message = Message { - direction, - payload: STANDARD.encode(payload), - service: service.to_string(), - }; - match serde_json::to_string(&message) { - Ok(json) => json.into_bytes(), - Err(e) => { - error!("Cannot serialize the message: {}", e); - Vec::new() // Empty vector in case of error - } - } -} -//tcp connection method -//TODO: debug tcp connection -pub async fn send_outcoming_message(stream: &mut TcpStream, service_name: String) { - info!("Producing outcoming message"); - info!( - "([{}]->[{:?}]): Receiving outgoing message from: {}", - service_name, - stream.peer_addr(), - service_name - ); - - // Send a response back - let response_json = json!({ "status": "received" }).to_string(); - if let Err(e) = stream.write_all(response_json.as_bytes()).await { - error!("Error sending JSON response to {}: {}", service_name, e); - } -} -//udp connection method -pub async fn send_outcoming_message_udp( - socket: &UdpSocket, - service_name: String, - addr: std::net::SocketAddr, -) -> Vec { - info!( - "([{}]->[{}]):Receiving outgoing message from: {}", - service_name, addr, service_name - ); - - // Send a response back - let response_json = json!({ "status": "received" }).to_string(); - if let Err(e) = socket.send_to(&response_json.as_bytes(), addr).await { - error!( - "([{}]->[{}]):Error sending JSON response to {}: {}", - addr, service_name, service_name, e - ); - } - response_json.as_bytes().to_vec() -} -//tcp connection method -//TODO: debug this method -pub async fn produce_unknown_message(stream: &mut TcpStream, service_name: String) { - warn!("Producing message from unknown direction"); - warn!( - "Receiving message with unknown direction from {}", - service_name - ); - warn!("Ignoring the message with unknown direction"); - - // Send a response back - let response_json = json!({ "status": "received" }).to_string(); - if let Err(e) = stream.write_all(response_json.as_bytes()).await { - error!("Error sending JSON response to {}: {}", service_name, e); - } -} -//udp connection method -pub async fn produce_unknown_message_udp( - socket: &UdpSocket, - service_name: String, - addr: std::net::SocketAddr, -) -> Vec { - warn!( - "Receiving message with unknown direction from {}", - service_name - ); - warn!("Ignoring the message with unknown direction"); - - // Send a response back - let response_json = json!({ "status": "received" }).to_string(); - if let Err(e) = socket.send_to(&response_json.as_bytes(), addr).await { - error!("Error sending JSON response to {}: {}", service_name, e); - } - response_json.as_bytes().to_vec() -} - -//tcp connection method -//TODO: debug this method -pub async fn produce_incoming_message(stream: &mut TcpStream, service_name: String) { - info!("Producing Incoming response message"); - // return a status response - let response_json = json!({"status":"received"}).to_string(); - info!( - "Sending TCP response back to {} with content {}", - service_name, response_json - ); - let response_message = - create_message(&service_name, MexType::Outcoming, response_json.as_bytes()); - - if let Err(e) = stream.write_all(&response_message).await { - error!("Error sending {:?} to {}", response_message, service_name); - error!("Error: {}", e); - } -} -//tcp connection method -//TODO: debug this method -pub async fn send_success_ack_message(stream: &mut TcpStream) { - // ACK message - let ack_message = b"Message Received"; - if let Err(e) = stream.write_all(ack_message).await { - error!("Error sending TCP acknowledgment: {}", e); - } -} -//tcp connection method -//TODO: debug this method -pub async fn send_fail_ack_message(stream: &mut TcpStream) { - // ACK message - let ack_message = b"Delivery failed"; - if let Err(e) = stream.write_all(ack_message).await { - error!("Error sending TCP acknowledgment: {}", e); - } -} -//tcp connnection method -//TODO: debug this method -pub fn ignore_message_with_no_service(direction: MexType, payload: &[u8]) { - info!( - "Ignoring message with direction {:?}: {:?}", - direction, payload - ); -} diff --git a/core/src/components/loadbalancer/src/metrics.rs b/core/src/components/loadbalancer/src/metrics.rs deleted file mode 100644 index 09bd189..0000000 --- a/core/src/components/loadbalancer/src/metrics.rs +++ /dev/null @@ -1,25 +0,0 @@ -use lazy_static::lazy_static; -use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec}; - - -lazy_static!{ - /* One of the main benefits of using lazy_static is the ability to store thread-safe global variables. - Because lazy static values are initialized in a thread-safe manner, they can be safely accessed - from multiple threads without the need for additional synchronization. - This can be especially useful in cases where you want to avoid the overhead of locking and unlocking shared resources. - - lazy static documentation: https://blog.logrocket.com/rust-lazy-static-pattern/#how-lazy-static-works - */ - - pub static ref DNS_REQUEST: IntCounterVec = register_int_counter_vec!( - "total_dns_requests", - "Total_DNS_Requests", - &["client_ip"] - ).unwrap(); - - pub static ref DNS_RESPONSE_TIME: HistogramVec = register_histogram_vec!( - "dns_response_time", - "DNS_response_time", - &["server"] - ).unwrap(); -} diff --git a/core/src/components/loadbalancer/src/mod.rs b/core/src/components/loadbalancer/src/mod.rs deleted file mode 100644 index 7e3fd1d..0000000 --- a/core/src/components/loadbalancer/src/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod loadbalancer; -pub mod shared_struct; -pub mod discovery; -pub mod metrics; -pub mod messaging; \ No newline at end of file diff --git a/core/src/components/loadbalancer/src/shared_struct.rs b/core/src/components/loadbalancer/src/shared_struct.rs deleted file mode 100644 index b34f2e8..0000000 --- a/core/src/components/loadbalancer/src/shared_struct.rs +++ /dev/null @@ -1,41 +0,0 @@ -#![no_std] - -use bytemuck::{Pod, Zeroable}; - -//ref: /maps/map.rs/SVCKey -#[repr(C)] -#[derive(Clone, Copy, Pod, Zeroable,Debug)] -pub struct SVCKey { - pub service_name: [u8; 64], -} - -//ref: /maps/map.rs/SVCValue -#[repr(C)] -#[derive(Clone, Copy, Pod, Zeroable,Debug)] -pub struct SVCValue { - pub ip: [u8; 4], - pub port: u32, -} -//ref: /map/maps.rs/BackendPorts -#[repr(C)] -#[derive(Clone,Debug,Pod,Zeroable,Copy)] -pub struct BackendPorts{ - pub ports: [u16;4], - pub index: usize -} - -//ref: /map/map.rs/str_to_u8_64 -pub fn str_to_u8_64(s: &str) -> [u8; 64] { - let mut buf = [0u8; 64]; - let bytes = s.as_bytes(); - let len = bytes.len().min(64); - buf[..len].copy_from_slice(&bytes[..len]); - buf -} -pub fn str_to_u8_4(s: &str) -> [u8; 4] { - let mut buf = [0u8; 4]; - let bytes = s.as_bytes(); - let len = bytes.len().min(4); - buf[..len].copy_from_slice(&bytes[..len]); - buf -} \ No newline at end of file diff --git a/core/src/components/maps/Cargo.toml b/core/src/components/maps/Cargo.toml deleted file mode 100644 index 1b8206e..0000000 --- a/core/src/components/maps/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -[package] -name = "maps" -version = "0.1.0" -edition = "2021" - -[lib] -name = "maps" -path = "src/lib.rs" - -[dependencies] -aya-ebpf = { git = "https://github.com/aya-rs/aya" } -aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } -bytemuck = {version ="1.23.0",features = ["derive"]} -network-types = "0.0.8" - -[build-dependencies] -which = { version = "8.0.0", default-features = false } - -[[bin]] -name = "xdp-map" -path = "src/map.rs" - -[profile.dev] -panic = "abort" - -[profile.release] -panic = "abort" - -[target.bpfel-unknown-none] -linker = "bpf-linker" -rustflags = [ - "-C", "panic=abort", - "-C", "target-feature=+alu32", - "-C", "link-args=-znotext", -] - diff --git a/core/src/components/maps/build.rs b/core/src/components/maps/build.rs deleted file mode 100644 index f83c317..0000000 --- a/core/src/components/maps/build.rs +++ /dev/null @@ -1,17 +0,0 @@ -use which::which; - -/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be -/// better expressed by [artifact-dependencies][bindeps] but issues such as -/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being. -/// -/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the -/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to -/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case -/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH} -/// which would likely mean far too much cache invalidation. -/// -/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies -fn main() { - let bpf_linker = which("bpf-linker").unwrap(); - println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap()); -} diff --git a/core/src/components/maps/src/lib.rs b/core/src/components/maps/src/lib.rs deleted file mode 100644 index 844d23a..0000000 --- a/core/src/components/maps/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -#![no_std] -#![no_main] - -pub mod map; - - diff --git a/core/src/components/maps/src/map.rs b/core/src/components/maps/src/map.rs deleted file mode 100644 index 21f1b39..0000000 --- a/core/src/components/maps/src/map.rs +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Init a bpf map to save the user space pod data - * to use in the kernel space and user space -*/ -#![no_std] -#![no_main] - -use aya_ebpf::maps::PerfEventArray; -use aya_ebpf::{macros::map, maps::HashMap as KernelSpaceMap}; //aya_ebpf is the kernel space libary -use bytemuck::{Pod, Zeroable}; - -/* unsafe impl Zeroable for SVCKey {} //implemente zeroable -unsafe impl Zeroable for SVCValue {} */ - -#[repr(C)] -/* - * match the C fields alignment. tells the compiler that the rappresentation - * must follow the C rules. disable the rust compiler realignment - * - *In this case Rust struct and C are unitary equivalent -*/ -#[derive(Clone, Copy, Pod, Zeroable, Debug)] -pub struct SVCKey { - pub service_name: [u8; 64], -} - -#[repr(C)] //match the C fields alignment -#[derive(Clone, Copy, Pod, Zeroable, Debug)] -pub struct SVCValue { - pub ip: [u8; 4], - pub port: u32, -} - -#[repr(C)] -#[derive(Clone, Debug, Pod, Zeroable, Copy)] -pub struct BackendPorts { - pub ports: [u16; 4], - pub index: usize, -} - -//enable Pod (Plain of data) data type -/* unsafe impl Pod for SVCKey {} -unsafe impl Pod for SVCValue {} */ -/* - * Doc: - * POD (Plain Old Data) types are marked with the trait, indicating that they can be - * duplicated simply by copying their memory representation. - * - * This trait allows the Rust compiler to efficiently handle data creating bit-for-bit copies - * without invoking user-defined methods - * POD types do not involve pointers or complex data structures, they are easier to manage in terms of - * memory allocation and deallocation - -*/ - -/* - * Maps -*/ - -#[map(name = "services")] -/* - * connect the map name "SERVICES" to the HasMap in the BPF bytecode - * init a BPF_MAP_HASH_TYPE to store the resolved service values -*/ -pub static mut SERVICES: KernelSpaceMap = - KernelSpaceMap::with_max_entries(1024, 0); - -#[map(name = "Backend_ports")] -/* - * connect the map name "BACKEND_PORTS" to the HasMap in the BPF bytecode - * init a BPF_MAP_HASH_TYPE to store the resolved service values -*/ -pub static mut BACKEND_PORTS: KernelSpaceMap = - KernelSpaceMap::with_max_entries(10, 0); - -/*Aux Functions */ - -//perform &str types to &[u8;64] -pub fn str_to_u8_64(s: &str) -> [u8; 64] { - let mut buf = [0u8; 64]; - let bytes = s.as_bytes(); - let len = bytes.len().min(64); - buf[..len].copy_from_slice(&bytes[..len]); - buf -} -pub fn u32_to_u8_4(s: u32) -> [u8; 4] { - //32 bit ---> 4 bytes - let mut buf = [0u8; 4]; - let bytes = s.to_le_bytes(); // this produce [u8,4] - buf[..4].copy_from_slice(&bytes); // Only copy the first 4 bytes - buf -} - -pub fn u32_to_u8_64(s: u32) -> [u8; 64] { - //32 bit ---> 4 bytes - let mut buf = [0u8; 64]; - let bytes = s.to_le_bytes(); //this produce [u8,4] - buf[..4].copy_from_slice(&bytes); // Only copy the first 4 bytes - buf -} diff --git a/core/src/components/proxy/Cargo.toml b/core/src/components/proxy/Cargo.toml deleted file mode 100644 index 296e6d9..0000000 --- a/core/src/components/proxy/Cargo.toml +++ /dev/null @@ -1,46 +0,0 @@ -[package] -name = "proxy" -version = "0.1.0" -edition = "2024" - -[dependencies] -anyhow = "1.0.95" -base64 = "0.22.1" -dashmap = "6.1.0" -hyper = "1.6.0" -k8s-openapi = "0.25.0" -kube = "1.1.0" -kube-runtime = "1.1.0" -lazy_static = "1.5.0" -prometheus = "0.14.0" -serde = "1.0.219" -serde_json = "1.0.140" -tokio = "1.43.0" -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } -warp = "0.3.7" -time = "=0.3.41" -aya-ebpf = { git = "https://github.com/aya-rs/aya" } -aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } -bytemuck = "1.22.0" -aya = "0.13.1" -once_cell = { version = "1.20.3" } - -[dependencies.shared] -path = "../../shared" - -[build-dependencies] -which = { version = "8.0.0", default-features = false } - -[[bin]] -name = "proxy" -path = "src/main.rs" - -[profile.dev] -panic = "abort" - -[profile.release] -panic = "abort" - -[patch.crates-io] -once_cell = "1.20.3" diff --git a/core/src/components/proxy/src/main.rs b/core/src/components/proxy/src/main.rs deleted file mode 100644 index 30ed7db..0000000 --- a/core/src/components/proxy/src/main.rs +++ /dev/null @@ -1,42 +0,0 @@ -//TODO: basic proxy functionalities -//TODO: add integration with prometheus logging system -//TODO: add load balancer between dns servers - -mod discovery; -mod map; -mod messaging; -mod metrics; -mod proxy; - -use k8s_openapi::api::core::v1::ConfigMap; -use kube::{Client, api::Api}; -use proxy::Proxy; -use shared::{apiconfig::EdgeProxyConfig, default_api_config::ConfigType}; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::fmt::format::FmtSpan; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .with_target(false) - .with_level(true) - .with_span_events(FmtSpan::NONE) - .without_time() - .with_file(false) - .pretty() - .with_env_filter(EnvFilter::new("info")) - .with_line_number(false) - .init(); - - let client = Client::try_default().await?; - let configmap: Api = Api::namespaced(client.clone(), "cortexflow"); - - let proxycfg = EdgeProxyConfig::load_from_configmap(configmap, ConfigType::Default).await?; - let proxy = Proxy::new(proxycfg).await?; - - proxy.start().await?; - proxy.get_info().await; - - Ok(()) -} diff --git a/core/src/components/proxy/src/mod.rs b/core/src/components/proxy/src/mod.rs deleted file mode 100644 index 6f11795..0000000 --- a/core/src/components/proxy/src/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod discovery; -pub mod map; -pub mod messaging; -pub mod metrics; -pub mod proxy; diff --git a/core/src/components/proxy/src/proxy.rs b/core/src/components/proxy/src/proxy.rs deleted file mode 100644 index 0d7f317..0000000 --- a/core/src/components/proxy/src/proxy.rs +++ /dev/null @@ -1,263 +0,0 @@ -/* -Cortexflow proxy is the main proxy in cortexbrain. Features: -- Caching βœ…//TODO: refer to bug (line 67) -- UDP/TCP traffic βœ… -- Automatic prometheus metrics export βœ… -- Load balancing ❌ -- Service discovery βœ… -*/ -use crate::discovery::ServiceDiscovery; -use crate::messaging; -use crate::messaging::MexType; -use crate::messaging::{ - ignore_message_with_no_service, produce_incoming_message, produce_unknown_message, - produce_unknown_message_udp, send_fail_ack_message, send_outcoming_message, - send_outcoming_message_udp, send_success_ack_message, -}; -use anyhow::{Error, Result}; -use aya::Ebpf; -use prometheus::{Encoder, TextEncoder}; -use shared::apiconfig::EdgeProxyConfig; -use std::sync::Arc; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; -use tokio::net::UdpSocket; -use tokio::net::{TcpListener, TcpStream}; -use tracing::{debug, error, info}; -use warp::Filter; -#[derive(Clone)] -pub struct Proxy { - proxy_config: Arc, - service_discovery: Arc, -} - -impl Proxy { - pub async fn new(proxycfg: EdgeProxyConfig) -> Result { - let service_discovery = ServiceDiscovery::new(&mut Ebpf).await?; - Ok(Proxy { - proxy_config: Arc::new(proxycfg), - service_discovery: Arc::new(service_discovery), - }) - } - - pub async fn get_info(&self) { - info!("Enable: {:?}", self.proxy_config.enable); - info!("Listen interface: {:?}", self.proxy_config.listen_interface); - } - - pub async fn start(&self) -> Result<(), Error> { - if !self.proxy_config.enable { - error!("Proxy is not running"); - return Ok(()); - } - self.run().await - } - - //TODO: a code refactoring needed here - pub async fn run(&self) -> Result<(), Error> { - debug!("Cortexflow Proxy is running"); - - // Start udpsocket - let socket = UdpSocket::bind("0.0.0.0:5053").await?; - debug!("Socket bound to {}", socket.local_addr()?); - - // Start tcp_listener - let tcp_listener = TcpListener::bind("0.0.0.0:5054").await?; - debug!("Tcp listener bound to {}", tcp_listener.local_addr()?); - - //TODO:fix caching bug - /* - Bug description: the caching system use the udp resolved endpoint - when a tcp communication is performed - - Solution to do: - implement a system that can recognize and block the use of udp endpoints - while performing a tcp communication - - Additional info: - TCP port: 5054 - UDP port : 5053 - - */ - //start the cache - //let cache = Arc::new(DashMap::new()); - //let cache_clone = cache.clone(); - - let metrics_route = warp::path!("metrics").map(|| { - let mut buffer = Vec::new(); - let encoder = TextEncoder::new(); - let metrics_families = prometheus::gather(); - encoder.encode(&metrics_families, &mut buffer).unwrap(); - warp::reply::with_header( - String::from_utf8(buffer).unwrap(), - "Content-Type", - "text/plain; charset=utf-8", - ) - }); - - tokio::spawn(async move { - warp::serve(metrics_route).run(([0, 0, 0, 0], 9090)).await; - }); - - // Clone all the necessary for the tcp task - let proxy_clone = self.clone(); - - tokio::spawn(async move { - while let Ok((stream, _)) = tcp_listener.accept().await { - //let cache = cache_clone.clone(); - let proxy = proxy_clone.clone(); - - tokio::spawn(async move { - Self::handle_tcp_connection(proxy, stream, 5054).await; - }); - } - }); - - let socket = Arc::new(socket); - let socket_clone = socket.clone(); - - let mut buffer = [0u8; 512]; - loop { - match socket_clone.recv_from(&mut buffer).await { - Ok((len, addr)) => { - let query = &buffer[..len]; - info!("Received {} bytes from sender: {}", len, addr); - - let response = self - .handle_udp_connection(query, &socket_clone, addr, 5053) - .await; - - if let Err(e) = socket_clone.send_to(&response, addr).await { - error!("Error sending response: {:?}", e); - } - } - Err(e) => { - error!("Error receiving packet: {}", e); - } - } - } - } - - pub async fn handle_udp_connection( - &self, - query: &[u8], - socket: &UdpSocket, - sender_addr: std::net::SocketAddr, - port: i32, - ) -> Vec { - // Extract service name, direction, and payload - let (direction, service_name, payload) = - match messaging::extract_service_name_and_payload(query) { - Some((direction, name, payload)) if !name.is_empty() => (direction, name, payload), - _ => { - error!("Invalid UDP request format"); - return Vec::new(); // Return an empty response - } - }; - - let namespace = "cortexflow"; - - match direction { - MexType::Incoming => { - info!( - "([{}]->[{}]): Processing incoming UDP message from service: {}", - sender_addr, service_name, sender_addr - ); - - // Use service discovery to resolve the request and forward response to client - if let Some(response) = self - .service_discovery - .wait_for_udp_response(&service_name, namespace, &payload, port, sender_addr) - .await - { - if let Err(e) = socket.send_to(&response, sender_addr).await { - error!( - "([{}]->[{}]):Error sending UDP response : {}", - service_name, sender_addr, e - ); - } - response - } else { - Vec::new() // Return empty if no response received - } - } - MexType::Outcoming => { - send_outcoming_message_udp(socket, service_name, sender_addr).await - } - _ => produce_unknown_message_udp(socket, service_name, sender_addr).await, - } - } - - // handles the tcp connection - pub async fn handle_tcp_connection(proxy: Self, mut stream: TcpStream, port: i32) { - let sender_addr = stream.peer_addr(); - info!("Debugging sender address: {:?}", sender_addr); - let mut buffer = [0u8; 1024]; - - match stream.read(&mut buffer).await { - Ok(size) if size > 0 => { - let query = &buffer[..size]; - info!("Received query: {:?}", query); - - match messaging::extract_service_name_and_payload(query) { - Some((direction, service_name, payload)) if !service_name.is_empty() => { - let namespace = "cortexflow"; - - match direction { - MexType::Incoming => { - info!( - "([{:?}]->[{}]):Processing request for service: {}", - sender_addr, service_name, service_name - ); - - // Forward the response to the client - if let Some(response) = proxy - .service_discovery - .send_tcp_request(&service_name, namespace, &payload, port) - .await - { - info!( - "([{}]->[{:?}]): Sending response back to client", - service_name, sender_addr - ); - if let Err(e) = stream.write_all(&response).await { - error!("Failed to send response: {}", e); - } - } else { - error!( - "Service {} not found in namespace {}", - service_name, namespace - ); - } - } - MexType::Outcoming => { - info!( - "([{}]->[{:?}]) Processing outgoing message for {}", - service_name, sender_addr, service_name - ); - send_outcoming_message(&mut stream, service_name).await; - } - _ => { - produce_unknown_message(&mut stream, service_name).await; - } - } - - send_success_ack_message(&mut stream).await; - } - _ => { - error!("Invalid or empty service name in request"); - send_fail_ack_message(&mut stream).await; - } - } - } - Ok(_) => { - info!("Received empty message"); - send_success_ack_message(&mut stream).await; - } - Err(e) => { - error!("Error: {}", e); - send_fail_ack_message(&mut stream).await; - } - } - } -} diff --git a/core/src/components/xdp/Cargo.toml b/core/src/components/xdp/Cargo.toml deleted file mode 100644 index 253d6d6..0000000 --- a/core/src/components/xdp/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "xdp" -version = "0.1.0" -edition = "2021" - -[lib] -name = "xdp" -path = "src/lib.rs" - -[dependencies] -maps ={ path = "../maps"} -aya-ebpf = { git = "https://github.com/aya-rs/aya" } -aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } -bytemuck = {version ="1.23.0",features = ["derive"]} -network-types = "0.0.8" - -[build-dependencies] -which = { version = "8.0.0", default-features = false } - -[[bin]] -name = "xdp-filter" -path = "src/filter.rs" - -[profile.dev] -panic = "abort" - -[profile.release] -panic = "abort" - -[target.bpfel-unknown-none] -linker = "bpf-linker" -rustflags = [ - "-C", "panic=abort", - "-C", "target-feature=+alu32", - "-C", "link-args=-znotext", -] - diff --git a/core/src/components/xdp/build.rs b/core/src/components/xdp/build.rs deleted file mode 100644 index f83c317..0000000 --- a/core/src/components/xdp/build.rs +++ /dev/null @@ -1,17 +0,0 @@ -use which::which; - -/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be -/// better expressed by [artifact-dependencies][bindeps] but issues such as -/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being. -/// -/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the -/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to -/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case -/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH} -/// which would likely mean far too much cache invalidation. -/// -/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies -fn main() { - let bpf_linker = which("bpf-linker").unwrap(); - println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap()); -} diff --git a/core/src/components/xdp/build.sh b/core/src/components/xdp/build.sh deleted file mode 100755 index a4376a8..0000000 --- a/core/src/components/xdp/build.sh +++ /dev/null @@ -1,2 +0,0 @@ -echo "πŸš€ Building xdp" -cargo +nightly build -Z build-std=core --target bpfel-unknown-none --release --bin xdp-filter diff --git a/core/src/components/xdp/requirements.txt b/core/src/components/xdp/requirements.txt deleted file mode 100644 index 726852e..0000000 --- a/core/src/components/xdp/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -To correctly build the component you must have bpf-linker installed - -installation: -cargo install --git https://github.com/aya-rs/bpf-linker diff --git a/core/src/components/xdp/src/filter.rs b/core/src/components/xdp/src/filter.rs deleted file mode 100644 index 685ac1b..0000000 --- a/core/src/components/xdp/src/filter.rs +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Contains the code for the kernel xdp manipulation. this code lives in - * the kernel space only and needs to be attached to a "main" program that lives in the user space -*/ - -#![no_std] // * no standard library -#![no_main] // * no main entrypoint - -use aya_ebpf::{bindings::xdp_action, macros::xdp, programs::XdpContext}; -use aya_log_ebpf::{debug, error, info}; - -use core::mem; -use maps::map::{SVCKey, SVCValue, SERVICES}; -use network_types::{ - eth::{EthHdr, EtherType}, - ip::{IpProto, Ipv4Hdr}, - tcp::TcpHdr, - udp::UdpHdr, -}; - -/* -* init xdp program -*/ -#[xdp] -pub fn xdp_hello(ctx: XdpContext) -> u32 { - match unsafe { xdp_firewall(&ctx) } { - Ok(ret) => ret, - Err(_) => xdp_action::XDP_ABORTED, - } -} - -unsafe fn init_xdp(ctx: &XdpContext) -> Result { - info!(ctx, "Received a packet"); - Ok(xdp_action::XDP_PASS) -} - -#[panic_handler] -fn panic(_info: &core::panic::PanicInfo) -> ! { - loop {} -} - -// * getting packet data from raw packets -#[inline(always)] //inline -fn ptr_at(ctx: &XdpContext, offset: usize) -> Result<*const T, ()> { - let start = ctx.data(); - let end = ctx.data_end(); - let len = mem::size_of::(); - - if start + offset + len > end { - return Err(()); - } - Ok((start + offset) as *const T) -} - -//TODO:safe the result of the firewall into a bpf hash map and perform a redirect -/* -* XDP firewall -* Usage: -* 1. Drop packets from the 443 port (only fo development test ) -* 2. Log TCP and UDP traffic -* 3. //TODO: rebuild firewall policy to efficiently filter traffic -* 4. //TODO: use ConnArray declared in conntracker program to discover services and implement policies -*/ -fn xdp_firewall(ctx: &XdpContext) -> Result { - let ethhdr: *const EthHdr = ptr_at(ctx, 0)?; - match unsafe { (*ethhdr).ether_type } { - EtherType::Ipv4 => {} - _ => return Ok(xdp_action::XDP_PASS), - } - - let ipv4hdr: *const Ipv4Hdr = ptr_at(ctx, EthHdr::LEN)?; - let source_addr = u32::from_be_bytes(unsafe { (*ipv4hdr).src_addr }); - - // handle protocols - match unsafe { (*ipv4hdr).proto } { - IpProto::Tcp => { - let tcphdr: *const TcpHdr = ptr_at(ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let port = u16::from_be(unsafe { (*tcphdr).source }); - if port == 443 { - return Ok(xdp_action::XDP_PASS); - } else { - info!( - ctx, - "Received TCP packet from IP: {:i} PORT: {}", source_addr, port - ); - } - } - IpProto::Udp => { - let udphdr: *const UdpHdr = ptr_at(ctx, EthHdr::LEN + Ipv4Hdr::LEN)?; - let port = u16::from_be_bytes(unsafe { (*udphdr).source }); - if port == 443 { - return Ok(xdp_action::XDP_PASS); - } else { - info!( - ctx, - "Received UDP packet from IP: {:i} PORT: {}", source_addr, port - ); - //TODO: saving the packet address here will only store the ip and port - // of the gateway (in this case the minikube node ip)--> i need to save the ip in the user space - debug!( - ctx, - "Inserting key: {:i} and value {:i} into the services bpf map", - source_addr, - port as u32 - ); - let key = SVCKey { - service_name: maps::map::u32_to_u8_64(source_addr), - }; - let value = SVCValue { - ip: maps::map::u32_to_u8_4(source_addr.into()), - port: port as u32, - }; - let res = unsafe { SERVICES.insert(&key, &value, 0) }; - match res { - Ok(_) => { - return Ok(xdp_action::XDP_PASS); - } - Err(_) => { - error!(ctx, "Error inserting element into bpf map"); - return Err(()); - } - } - } - } - _ => return Ok(xdp_action::XDP_DROP), - }; - - Ok(xdp_action::XDP_PASS) -} diff --git a/core/src/components/xdp/src/lib.rs b/core/src/components/xdp/src/lib.rs deleted file mode 100644 index 3950067..0000000 --- a/core/src/components/xdp/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -#![no_std] -#![no_main] - - -pub mod filter; - diff --git a/core/src/components/xdp/src/mod.rs b/core/src/components/xdp/src/mod.rs deleted file mode 100644 index 52f4d03..0000000 --- a/core/src/components/xdp/src/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -#![no_std] -#![no_main] - -pub mod map; -pub mod filter; \ No newline at end of file From ccb9265f129d6d906a1aa9b2115f5cb3585aa7e8 Mon Sep 17 00:00:00 2001 From: poinT92 Date: Sun, 24 Aug 2025 20:06:16 +0200 Subject: [PATCH 3/5] Fix dependency cleanup: restore needed dependencies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restored serde_yaml: used in essential.rs for config handling - Restored tokio: required for async functionality - Removed only tokio-stream: actually unused in CLI code - Updated Cargo.lock with correct dependencies This corrects the previous overly aggressive dependency removal. πŸ€– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cli/Cargo.lock | 180 +++++++++++++++++++++++++++++-------------------- cli/Cargo.toml | 2 + 2 files changed, 110 insertions(+), 72 deletions(-) diff --git a/cli/Cargo.lock b/cli/Cargo.lock index 4da951a..740da77 100644 --- a/cli/Cargo.lock +++ b/cli/Cargo.lock @@ -34,9 +34,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anstream" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" dependencies = [ "anstyle", "anstyle-parse", @@ -64,29 +64,29 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.9" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" [[package]] name = "api" @@ -261,9 +261,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.9.1" +version = "2.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d" [[package]] name = "bytemuck" @@ -293,9 +293,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cfg-if" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" [[package]] name = "cfg_aliases" @@ -305,9 +305,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "clap" -version = "4.5.41" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9" +checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318" dependencies = [ "clap_builder", "clap_derive", @@ -315,9 +315,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.41" +version = "4.5.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d" +checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8" dependencies = [ "anstream", "anstyle", @@ -327,9 +327,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.41" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491" +checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6" dependencies = [ "heck", "proc-macro2", @@ -379,6 +379,8 @@ dependencies = [ "prost", "prost-types", "serde", + "serde_yaml", + "tokio", "tonic", "tonic-reflection", "tracing", @@ -549,9 +551,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.4" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", @@ -612,13 +614,14 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" dependencies = [ + "atomic-waker", "bytes", "futures-channel", - "futures-util", + "futures-core", "h2", "http", "http-body", @@ -626,6 +629,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", @@ -684,9 +688,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", "hashbrown", @@ -694,9 +698,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" dependencies = [ "bitflags", "cfg-if", @@ -732,15 +736,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.174" +version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" +checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" [[package]] name = "libredox" -version = "0.1.4" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638" +checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3" dependencies = [ "bitflags", "libc", @@ -935,9 +939,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "petgraph" @@ -983,9 +987,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "prettyplease" -version = "0.2.36" +version = "0.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", "syn", @@ -993,9 +997,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.95" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" dependencies = [ "unicode-ident", ] @@ -1100,25 +1104,25 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.5.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.16", "libredox", - "thiserror 2.0.12", + "thiserror 2.0.16", ] [[package]] name = "regex" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", + "regex-automata 0.4.10", + "regex-syntax 0.8.6", ] [[package]] @@ -1132,13 +1136,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax 0.8.6", ] [[package]] @@ -1149,15 +1153,15 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" [[package]] name = "rustc-demangle" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" [[package]] name = "rustix" @@ -1174,9 +1178,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.21" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "ryu" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] name = "scopeguard" @@ -1204,6 +1214,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1224,9 +1247,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "smallvec" @@ -1252,9 +1275,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.104" +version = "2.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" dependencies = [ "proc-macro2", "quote", @@ -1269,15 +1292,15 @@ checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" [[package]] name = "tempfile" -version = "3.20.0" +version = "3.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -1291,11 +1314,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.12" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" dependencies = [ - "thiserror-impl 2.0.12", + "thiserror-impl 2.0.16", ] [[package]] @@ -1311,9 +1334,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.12" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" dependencies = [ "proc-macro2", "quote", @@ -1331,9 +1354,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.47.0" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", @@ -1576,6 +1599,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1640,6 +1669,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-sys" version = "0.59.0" @@ -1655,7 +1690,7 @@ version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets 0.53.2", + "windows-targets 0.53.3", ] [[package]] @@ -1676,10 +1711,11 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.2" +version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ + "windows-link", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 691e94f..f00b01d 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -14,7 +14,9 @@ clap = { version = "4.5.38", features = ["derive"] } colored = "3.0.0" directories = "6.0.0" serde = { version = "1.0.219", features = ["derive"] } +serde_yaml = "0.9.34" tracing = "0.1.41" +tokio = "1.47.0" anyhow = "1.0.98" api = { path = "../core/api" } tonic = "0.14.1" From 83ec6a63a4dbc72d592f18e374afcdcf4e21e132 Mon Sep 17 00:00:00 2001 From: poinT92 Date: Sun, 24 Aug 2025 20:12:53 +0200 Subject: [PATCH 4/5] Remove unused network-types dependency from metrics_tracer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - network-types dependency was not used in any source files - Only found in auto-generated bindings.rs as field names, not imports - This completes the thorough dependency cleanup across all components πŸ€– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- core/src/components/metrics_tracer/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/components/metrics_tracer/Cargo.toml b/core/src/components/metrics_tracer/Cargo.toml index 0660036..4816770 100644 --- a/core/src/components/metrics_tracer/Cargo.toml +++ b/core/src/components/metrics_tracer/Cargo.toml @@ -7,7 +7,6 @@ edition = "2024" aya-ebpf = { git = "https://github.com/aya-rs/aya" } aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } bytemuck = {version ="1.23.0",features = ["derive"]} -network-types = "0.0.8" [build-dependencies] which = { version = "7.0.0", default-features = false } From 2d718a3ae16cf77de0a7805288e196a842cd246f Mon Sep 17 00:00:00 2001 From: poinT92 Date: Mon, 25 Aug 2025 11:37:31 +0200 Subject: [PATCH 5/5] Update .gitignore and core/Readme.md based on PR feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused entries from .gitignore as requested - Update core/Readme.md to reflect only existing components - Remove references to non-implemented Loadbalancer, Maps, and XDP components πŸ€– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .gitignore | 3 --- core/Readme.md | 3 --- 2 files changed, 6 deletions(-) diff --git a/.gitignore b/.gitignore index ce3fb08..5c9518e 100644 --- a/.gitignore +++ b/.gitignore @@ -172,9 +172,6 @@ skbuff.rs admission-webhook.yaml certificate-manager.yaml client-deployment.yaml -dns-deployment.yaml -proxy-injector.yaml -core/src/components/conntracker/src/bindings.rs # Claude AI assistant working files CLAUDE.md diff --git a/core/Readme.md b/core/Readme.md index e45a8d2..747dd31 100644 --- a/core/Readme.md +++ b/core/Readme.md @@ -21,7 +21,4 @@ If you're interested or have any questions, feel free to contact: | **Identity** | User Space program that uses Conntracker component and displays active connections in the cluster | - [[92]](https://github.com/CortexFlow/CortexBrain/issues/92)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1) | **Metrics_tracer** | Kernel Space program that collects the main CortexBrain metrics collectors | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/78)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1) | **Metrics** | User Space implementation of the metrics_tracer BPF scripts. The metrics crate also aggregates, stores, and hosts the main data processing functions | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/78)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1) - | **Loadbalancer** | User space component. One of the core functionalities. The load balancer uses XDP, Maps, and Identity to distribute traffic across multiple backends in the cluster using optimized load balancing techniques | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/91)
- [[Core]](https://github.com/CortexFlow/CortexBrain/milestone/1) - | **Maps** | Contains all the BPF maps used in the XDP component | / - | **XDP** | Kernel Space program that operates in the XDP hook to filter traffic and apply access policies in the cluster | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/91)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1) \ No newline at end of file