diff --git a/.gitignore b/.gitignore index 919687b..26701db 100644 --- a/.gitignore +++ b/.gitignore @@ -173,4 +173,5 @@ admission-webhook.yaml certificate-manager.yaml client-deployment.yaml dns-deployment.yaml -proxy-injector.yaml \ No newline at end of file +proxy-injector.yaml +core/src/components/conntracker/src/bindings.rs diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 3539789..9aea01f 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -2,13 +2,17 @@ name = "cortexflow-cli" version = "0.1.0" edition = "2024" +description = "CortexFlow command line interface made to interact with the CortexBrain core components" +authors = ["Lorenzo Tettamanti", "Pranav Verma", "Lorenzo Bradanini"] +documentation = "https://docs.cortexflow.org" +homepage = "https://docs.cortexflow.org" +repository = "https://github.com/CortexFlow/CortexBrain" +license = "Apache-2.0" [dependencies] clap = { version = "4.5.38", features = ["derive"] } tracing = "0.1.41" -#commented until first release - -#[[bin]] -#name="cfcli" -#path="src/main.rs" \ No newline at end of file +[[bin]] +name = "cfcli" +path = "src/main.rs" diff --git a/core/src/components/conntracker/Cargo.toml b/core/src/components/conntracker/Cargo.toml index f43c4a8..52387cf 100644 --- a/core/src/components/conntracker/Cargo.toml +++ b/core/src/components/conntracker/Cargo.toml @@ -3,6 +3,7 @@ name = "conntracker" version = "0.1.0" edition = "2021" + [dependencies] aya-ebpf = { git = "https://github.com/aya-rs/aya" } aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } diff --git a/core/src/components/conntracker/build-conntracker.sh b/core/src/components/conntracker/build-conntracker.sh index 4d2f456..d72f30d 100755 --- a/core/src/components/conntracker/build-conntracker.sh +++ b/core/src/components/conntracker/build-conntracker.sh @@ -1,2 +1,20 @@ echo "🚀 Building connection tracker" + +bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h + +if [ $? -ne 0 ]; then + echo "Error: Failed to dump BTF from vmlinux. Ensure bpftool is installed and has access to the kernel BTF." + exit 1 +fi + +bindgen vmlinux.h -o src/bindings.rs --use-core --allowlist-type 'sk_buff' + + +if ! command -v bindgen &> /dev/null; then + echo "bindgen not found, installing..." + cargo install bindgen-cli +fi + cargo +nightly build -Z build-std=core --target bpfel-unknown-none --release --bin conntracker + +rm -f vmlinux.h \ No newline at end of file diff --git a/core/src/components/conntracker/build.rs b/core/src/components/conntracker/build.rs index f83c317..c996ce6 100644 --- a/core/src/components/conntracker/build.rs +++ b/core/src/components/conntracker/build.rs @@ -1,3 +1,4 @@ + use which::which; /// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be diff --git a/core/src/components/conntracker/src/main.rs b/core/src/components/conntracker/src/main.rs index f1b6883..7c31a44 100644 --- a/core/src/components/conntracker/src/main.rs +++ b/core/src/components/conntracker/src/main.rs @@ -1,4 +1,4 @@ -/* +/* TODO: this part needs an update * * This file contains the code for the identity service * @@ -16,15 +16,16 @@ #![allow(warnings)] //mod skbuff; +//mod veth_trace; +mod bindings; use bytemuck::{ Pod, Zeroable }; use aya_ebpf::{ bindings::{ TC_ACT_OK, TC_ACT_SHOT }, - macros::{ classifier, map, kprobe, tracepoint }, - maps::PerfEventArray, - maps::LruPerCpuHashMap, - programs::{ TcContext, TracePointContext }, - helpers::{ bpf_probe_read_kernel, bpf_ktime_get_ns }, + helpers::{ bpf_ktime_get_ns, bpf_probe_read_kernel, bpf_probe_read_kernel_str_bytes }, + macros::{ classifier, kprobe, map, tracepoint }, + maps::{ LruPerCpuHashMap, PerfEventArray }, + programs::{ ProbeContext, TcContext, TracePointContext }, }; use aya_ebpf::EbpfContext; //use crate::skbuff::{ sock, sock_common }; @@ -40,6 +41,8 @@ use network_types::{ udp::UdpHdr, }; use core::ptr::addr_of; + +use crate::bindings::net_device; /* * ETHERNET TYPE II FRAME: * Reference: https://it.wikipedia.org/wiki/Frame_Ethernet @@ -109,6 +112,7 @@ pub struct ConnArray { #[map(name = "EventsMap")] static mut EVENTS: PerfEventArray = PerfEventArray::new(0); +//TODO: ConnectionMap needs a rework after implementing issue #105 #[map(name = "ConnectionMap")] pub static mut ACTIVE_CONNECTIONS: LruPerCpuHashMap< u16, @@ -121,6 +125,18 @@ pub static mut CONNTRACKER: LruPerCpuHashMap = LruPerCpuHashMap:: 0 ); +#[repr(C)] +#[derive(Clone, Copy, Zeroable, Debug)] +struct VethLog { + name: [u8; 16], + state: u64, //state var type: long unsigned int + dev_addr: [u32; 8], + event_type: u8, //i choose 1 for veth creation or 2 for veth destruction +} + +#[map(name = "veth_identity_map")] +pub static mut VETH_CREATION_EVENTS: PerfEventArray = PerfEventArray::new(0); + const IPV4_ETHERTYPE: u16 = 0x0800; //IPV4 STACK @@ -148,19 +164,177 @@ const AF_INET6: u16 = 10; //ipv6 const IPPROTO_UDP: u8 = 17; const IPPROTO_TCP: u8 = 6; -//TODO: add kprobe tracing for process ID -//kprobe docs: https://docs.kernel.org/trace/kprobes.html - /* constants */ +//FIXME: this will be deprecated after solving issue #105 const HOST_NETNS_INUM: u32 = 4026531993; const KUBE_POD_CIDR: u32 = 0x0af40000; // 10.244.0.0/16 - /* Helper Functions */ #[inline] unsafe fn is_kube_internal(ip: u32) -> bool { (ip & 0xffff0000) == KUBE_POD_CIDR } +#[kprobe] +pub fn veth_creation_trace(ctx: ProbeContext) -> u32 { + match try_veth_creation_trace(ctx) { + Ok(ret_val) => ret_val, + Err(ret_val) => ret_val.try_into().unwrap_or(1), + } +} +#[kprobe] +pub fn veth_deletion_trace(ctx: ProbeContext) -> u32 { + match try_veth_deletion_trace(ctx) { + Ok(ret_val) => ret_val, + Err(ret_val) => ret_val.try_into().unwrap_or(1), + } +} + + +pub fn try_veth_creation_trace(ctx: ProbeContext) -> Result { + let net_device_pointer: *const net_device = ctx.arg(0).ok_or(1i64)?; + + // first control: i'm, verifying that the pointer is not null + if net_device_pointer.is_null() { + return Err(1); + } + + let name_field_offset = 304; // reading the name field offset + //pahole commands: + //syntax pahole -C + // pahole -C net_device | grep name + + let dev_addr_offset = 1080; + let state_offset = 168; + + let name_pointer = unsafe { (net_device_pointer as *const u8).add(name_field_offset) }; + let dev_addr_pointer = unsafe { (net_device_pointer as *const u8).add(dev_addr_offset) }; + let state_pointer = unsafe { (net_device_pointer as *const u8).add(state_offset) }; + + let mut name_buf = [0u8; 16]; + let mut dev_addr_buf = [0u32; 8]; + + let name_array_ptr = name_pointer as *const [u8; 16]; + let dev_addr_ptr_array = dev_addr_pointer as *const [u32; 8]; + + let name_array = unsafe { + match bpf_probe_read_kernel(name_array_ptr) { + Ok(arr) => arr, + Err(ret) => { + return Err(ret); + } + } + }; + + let state=unsafe { + match bpf_probe_read_kernel(state_pointer) { + Ok(s)=>s, + Err(ret)=>{ + return Err(ret); + } + } + }; + + let dev_addr_array = unsafe { + match bpf_probe_read_kernel(dev_addr_ptr_array) { + Ok(arr) => arr, + Err(ret) => { + return Err(ret); + } + } + }; + + name_buf.copy_from_slice(&name_array); + dev_addr_buf.copy_from_slice(&dev_addr_array); + + let veth_data = VethLog { + name: name_buf, + state: state.into(), + dev_addr: dev_addr_buf, + event_type: 1 + }; + + //send the data to the userspace + unsafe { + VETH_CREATION_EVENTS.output(&ctx, &veth_data, 0); + } + + Ok(0) +} + +pub fn try_veth_deletion_trace(ctx: ProbeContext) -> Result { + let net_device_pointer: *const net_device = ctx.arg(0).ok_or(1i64)?; + + // first control: i'm, verifying that the pointer is not null + if net_device_pointer.is_null() { + return Err(1); + } + + let name_field_offset = 304; // reading the name field offset + //pahole commands: + //syntax pahole -C + // pahole -C net_device | grep name + + let dev_addr_offset = 1080; + let state_offset = 168; + + let name_pointer = unsafe { (net_device_pointer as *const u8).add(name_field_offset) }; + let dev_addr_pointer = unsafe { (net_device_pointer as *const u8).add(dev_addr_offset) }; + let state_pointer = unsafe { (net_device_pointer as *const u8).add(state_offset) }; + + let mut name_buf = [0u8; 16]; + let mut dev_addr_buf = [0u32; 8]; + + let name_array_ptr = name_pointer as *const [u8; 16]; + let dev_addr_ptr_array = dev_addr_pointer as *const [u32; 8]; + + let name_array = unsafe { + match bpf_probe_read_kernel(name_array_ptr) { + Ok(arr) => arr, + Err(ret) => { + return Err(ret); + } + } + }; + + let state=unsafe { + match bpf_probe_read_kernel(state_pointer) { + Ok(s)=>s, + Err(ret)=>{ + return Err(ret); + } + } + }; + + let dev_addr_array = unsafe { + match bpf_probe_read_kernel(dev_addr_ptr_array) { + Ok(arr) => arr, + Err(ret) => { + return Err(ret); + } + } + }; + + name_buf.copy_from_slice(&name_array); + dev_addr_buf.copy_from_slice(&dev_addr_array); + + let veth_data = VethLog { + name: name_buf, + state: state.into(), + dev_addr: dev_addr_buf, + event_type: 2 + }; + + //send the data to the userspace + unsafe { + VETH_CREATION_EVENTS.output(&ctx, &veth_data, 0); + } + + Ok(0) +} + + + + #[classifier] pub fn identity_classifier(ctx: TcContext) -> i32 { match try_identity_classifier(ctx) { @@ -200,9 +374,9 @@ fn try_identity_classifier(ctx: TcContext) -> Result<(), i64> { //not logging internal communication packets //TODO: do not log internal communications such as minikube dashboard packets or kubectl api packets - //FIXME: this part is not working properly because the ip associated in the k8s environment constantly changes every restart + //FIXME: this part is not working properly because the ip associated in the k8s environment constantly changes every restart let ip_to_block = u32::from_be_bytes([90, 120, 244, 10]); // kubernetes-dashboard internal ip - let ip_to_block_2 = u32::from_be_bytes([87, 120, 244, 10]); // cert manager internal ip + let ip_to_block_2 = u32::from_be_bytes([87, 120, 244, 10]); // cert manager internal ip let ip_to_block_3 = u32::from_be_bytes([89, 120, 244, 10]); // kube-system internal ip let ip_to_block_4 = u32::from_be_bytes([88, 120, 244, 10]); // other kuber-system internal ip @@ -262,4 +436,4 @@ fn try_identity_classifier(ctx: TcContext) -> Result<(), i64> { fn panic(_info: &core::panic::PanicInfo) -> ! { loop { } -} +} \ No newline at end of file diff --git a/core/src/components/conntracker/src/mod.rs b/core/src/components/conntracker/src/mod.rs new file mode 100644 index 0000000..4494657 --- /dev/null +++ b/core/src/components/conntracker/src/mod.rs @@ -0,0 +1 @@ +pub mod bindings; \ No newline at end of file diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index 84b4f57..4caacee 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -1,34 +1,63 @@ +use crate::enums::IpProtocols; +use crate::structs::{PacketLog, VethLog}; +use aya::programs::tc::SchedClassifierLinkId; use aya::{ - maps::{ perf::{ PerfEventArray, PerfEventArrayBuffer }, MapData }, - programs::{ SchedClassifier, TcAttachType }, - util::online_cpus, Bpf, + maps::{ + MapData, + perf::{PerfEventArray, PerfEventArrayBuffer}, + }, + programs::{SchedClassifier, TcAttachType}, + util::online_cpus, }; -use crate::structs::PacketLog; use bytes::BytesMut; +use nix::net::if_::if_nameindex; +use std::collections::HashMap; +use std::sync::Mutex; use std::{ + ascii, borrow::BorrowMut, net::Ipv4Addr, string, - sync::{ atomic::{ AtomicBool, Ordering }, Arc }, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, }; -use crate::enums::IpProtocols; -use tracing::{ info, error, warn }; -use nix::net::if_::if_nameindex; +use tracing::{error, event, info, warn}; -use tokio::{ fs, signal }; -use std::path::Path; use anyhow::Context; +use std::path::Path; +use tokio::{fs, signal}; /* * decleare bpf path env variable */ const BPF_PATH: &str = "BPF_PATH"; const IFACE: &str = "IFACE"; +use std::result::Result::Ok as Okk; + +/* + * TryFrom Trait implementation for IpProtocols enum + * This is used to reconstruct the packet protocol based on the + * IPV4 Header Protocol code + */ + +impl TryFrom for IpProtocols { + type Error = (); + fn try_from(proto: u8) -> Result { + match proto { + 1 => Ok(IpProtocols::ICMP), + 6 => Ok(IpProtocols::TCP), + 17 => Ok(IpProtocols::UDP), + _ => Err(()), + } + } +} pub async fn display_events>( mut perf_buffers: Vec>, running: Arc, - mut buffers: Vec + mut buffers: Vec, ) { while running.load(Ordering::SeqCst) { for buf in perf_buffers.iter_mut() { @@ -37,9 +66,8 @@ pub async fn display_events>( for i in 0..events.read { let data = &buffers[i]; if data.len() >= std::mem::size_of::() { - let pl: PacketLog = unsafe { - std::ptr::read(data.as_ptr() as *const _) - }; + let pl: PacketLog = + unsafe { std::ptr::read(data.as_ptr() as *const _) }; let src = Ipv4Addr::from(u32::from_be(pl.src_ip)); let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip)); let src_port = u16::from_be(pl.src_port as u16); @@ -50,16 +78,12 @@ pub async fn display_events>( Ok(proto) => { info!( "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}", - event_id, - proto, - src, - src_port, - dst, - dst_port + event_id, proto, src, src_port, dst, dst_port ); } - Err(_) => - info!("Event Id: {} Protocol: Unknown ({})", event_id, pl.proto), + Err(_) => { + info!("Event Id: {} Protocol: Unknown ({})", event_id, pl.proto) + } }; } else { warn!("Received packet data too small: {} bytes", data.len()); @@ -75,6 +99,72 @@ pub async fn display_events>( } } +pub async fn display_veth_events>( + bpf: Arc>, + mut perf_buffers: Vec>, + running: Arc, + mut buffers: Vec, + mut link_ids: Arc>>, +) { + while running.load(Ordering::SeqCst) { + for buf in perf_buffers.iter_mut() { + match buf.read_events(&mut buffers) { + Ok(events) => { + for i in 0..events.read { + let data = &buffers[i]; + if data.len() >= std::mem::size_of::() { + let vethlog: VethLog = + unsafe { std::ptr::read(data.as_ptr() as *const _) }; + + let name_bytes = vethlog.name; + + let dev_addr_bytes = vethlog.dev_addr.to_vec(); + let name = std::str::from_utf8(&name_bytes); + let state = vethlog.state; + + let dev_addr = dev_addr_bytes; + let mut event_type = String::new(); + match vethlog.event_type { + 1 => { + event_type = "creation".to_string(); + } + 2 => { + event_type = "deletion".to_string(); + } + _ => warn!("unknown event_type"), + } + match name { + Ok(veth_name) => { + info!( + "Triggered action: register_netdevice event_type:{:?} Manipulated veth: {:?} state:{:?} dev_addr:{:?}", + event_type, + veth_name.trim_end_matches("\0").to_string(), + state, + dev_addr + ); + attach_detach_veth(bpf.clone(), vethlog.event_type, veth_name, link_ids.clone()).await; + } + Err(_) => info!("Unknown name or corrupted field"), + } + } else { + warn!("Corrupted data"); + } + } + } + Err(e) => { + error!("Error reading veth events: {:?}", e); + } + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} + +pub fn ignore_iface(iface: &str) -> bool { + let ignored_interfaces = ["eth0", "docker0", "tunl0", "lo"]; + ignored_interfaces.contains(&iface) +} + //filter the interfaces,exclude docker0,eth0,lo interfaces pub fn get_veth_channels() -> Vec { //filter interfaces and save the output in the @@ -83,18 +173,61 @@ pub fn get_veth_channels() -> Vec { if let Ok(ifaces) = if_nameindex() { for iface in &ifaces { let iface_name = iface.name().to_str().unwrap().to_owned(); - if - iface_name != "eth0" && - iface_name != "docker0" && - iface_name != "tunl0" && - iface_name != "lo" + if !ignore_iface(&iface_name) { interfaces.push(iface_name); } else { - info!("skipping interface"); + info!("skipping interface {:?}", iface_name); } } } interfaces } + +async fn attach_detach_veth(bpf: Arc>, event_type: u8, iface: &str, link_ids: Arc>>) -> Result<(), anyhow::Error> { + info!("attach_detach_veth called: event_type={}, iface={}", event_type, iface); + match event_type { + 1 => { + let mut bpf = bpf.lock().unwrap(); + let program: &mut SchedClassifier = bpf + .program_mut("identity_classifier") + .ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))? + .try_into()?; + + let iface = iface.trim_end_matches('\0'); + + if ignore_iface(iface) { + info!("Skipping ignored interface: {}", iface); + return Ok(()); + } + + let mut link_ids = link_ids.lock().unwrap(); + match program.attach(iface, TcAttachType::Ingress) { + Ok(link_id) => { + info!("Program 'identity_classifier' attached to interface {}", iface); + link_ids.insert(iface.to_string(), link_id); + + }, + Err(e) => error!("Error attaching program to interface {}: {:?}", iface, e), + } + } + 2 => { + // INFO: Detaching occurs automatically when veth is deleted by kernel itsel + let mut link_ids = link_ids.lock().unwrap(); + match link_ids.remove(iface) { + Some(_) => { + info!("Successfully detached program from interface {}", iface); + } + None => { + error!("Interface {} not found in link_ids", iface); + return Err(anyhow::anyhow!("Interface {} not found in link_ids", iface)); + } + } + } + _ => { + error!("Unknown event type: {}", event_type); + } + } + Ok(()) +} \ No newline at end of file diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index d4d03e2..b870acd 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -1,62 +1,48 @@ /* * CortexBrain Identity Service + * Open Issues: #105 #107 * Features: * 1. TCP, UDP , ICMP events tracker * 2. Track Connections using a PerfEventArray named ConnArray + * 3. Track veth creation and deletion events * */ #![allow(warnings)] #![allow(unused_mut)] +mod enums; mod helpers; mod structs; -mod enums; use aya::{ - maps::{ perf::{ PerfEventArray, PerfEventArrayBuffer }, MapData }, - programs::{ SchedClassifier, TcAttachType }, - util::online_cpus, - Bpf, + maps::{ + perf::{PerfEventArray, PerfEventArrayBuffer}, Map, MapData + }, programs::{tc::SchedClassifierLinkId, KProbe, SchedClassifier, TcAttachType}, util::online_cpus, Bpf, Ebpf }; -use bytes::BytesMut; -use std::{ convert::TryInto, sync::{ atomic::{ AtomicBool, Ordering }, Arc }, path::Path }; -use crate::helpers::{ display_events, get_veth_channels }; use crate::enums::IpProtocols; +use crate::helpers::{display_events, display_veth_events, get_veth_channels}; +use bytes::BytesMut; +use std::{ + convert::TryInto, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, Arc, Mutex + }, +}; -use tokio::{ signal, fs }; -use anyhow::Context; -use tracing_subscriber::{ fmt::format::FmtSpan, EnvFilter }; -use tracing::{ info, error, warn }; - -/* - * TryFrom Trait implementation for IpProtocols enum - * This is used to reconstruct the packet protocol based on the - * IPV4 Header Protocol code - */ - -impl TryFrom for IpProtocols { - type Error = (); - fn try_from(proto: u8) -> Result { - match proto { - 1 => Ok(IpProtocols::ICMP), - 6 => Ok(IpProtocols::TCP), - 17 => Ok(IpProtocols::UDP), - _ => Err(()), - } - } -} +use anyhow::{Context, Ok}; +use tokio::{fs, signal, sync::broadcast::error}; +use tracing::{error, info, warn}; +use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan}; -/* - * decleare bpf path env variable - */ -const BPF_PATH: &str = "BPF_PATH"; -//const IFACE: &str = "IFACE"; +const BPF_PATH: &str = "BPF_PATH"; //BPF env path +use std::collections::HashMap; +use std::result::Result::Ok as Okk; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { //init tracing subscriber - tracing_subscriber - ::fmt() + tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .with_level(true) @@ -70,74 +56,206 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting identity service..."); info!("fetching data"); + // To Store link_ids they can be used to detach tc + let link_ids = Arc::new(Mutex::new(HashMap::::new())); + //init conntracker data path let bpf_path = std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?; - let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?; + let data = fs::read(Path::new(&bpf_path)) + .await + .context("failed to load file from path")?; //init bpf data - let mut bpf = Bpf::load(&data)?; + let bpf = Arc::new(Mutex::new(Bpf::load(&data)?)); + + //load veth_trace program ref veth_trace.rs + init_veth_tracer(bpf.clone()); + let bpf_maps = init_bpf_maps(bpf.clone()).unwrap(); let interfaces = get_veth_channels(); + info!("Found interfaces: {:?}", interfaces); - attach_bpf_program(&data, interfaces).await?; - let events_map = bpf + init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()) + .context("An error occured during the execution of attach_bpf_program function")?; + + event_listener(bpf_maps, link_ids.clone(), bpf.clone()) + .await + .context("Error initializing event_listener")?; + + Ok(()) +} + +//attach the tc classifier program to a vector of interfaces +pub fn init_tc_classifier( + bpf: Arc>, + ifaces: Vec, + link_ids: Arc>>, +) -> Result<(), anyhow::Error> { + //this funtion initialize the tc classifier program + info!("Loading programs"); + + let mut bpf_new = bpf.lock().unwrap(); + + let program: &mut SchedClassifier = bpf_new + .program_mut("identity_classifier") + .ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))? + .try_into() + .context("Failed to init SchedClassifier program")?; + + program + .load() + .context("Failed to load identity_classifier program")?; + + for interface in ifaces { + match program.attach(&interface, TcAttachType::Ingress) { + Okk(link_id) => { + info!( + "Program 'identity_classifier' attached to interface {}", + interface + ); + let mut map = link_ids.lock().unwrap(); + map.insert(interface.clone(), link_id); + } + Err(e) => error!( + "Error attaching program to interface {}: {:?}", + interface, e + ), + } + } + + Ok(()) +} + +fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { + //this functions init the veth_tracer used to make the InterfacesRegistry + + let mut bpf_new = bpf.lock().unwrap(); + + //creation tracer + let veth_creation_tracer: &mut KProbe = bpf_new + .program_mut("veth_creation_trace") + .ok_or_else(|| anyhow::anyhow!("program 'veth_creation_trace' not found"))? + .try_into()?; + veth_creation_tracer.load()?; + + veth_creation_tracer.attach("register_netdevice", 0)?; + + //deletion tracer + let veth_deletion_tracer: &mut KProbe = bpf_new + .program_mut("veth_deletion_trace") + .ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))? + .try_into()?; + veth_deletion_tracer + .load() + .context("Failed to load deletetion_tracer program")?; + + match veth_deletion_tracer.attach("unregister_netdevice_queue", 0) { + std::result::Result::Ok(_) => info!("veth_deletion_trace program attached successfully"), + Err(e) => error!("Error attaching veth_deletetion_trace program {:?}", e), + } + + Ok(()) +} + +fn init_bpf_maps(bpf: Arc>) -> Result<(Map, Map), anyhow::Error> { + // this function init the bpfs maps used in the main program + /* + index 0: events_map + index 1: veth_map + */ + let mut bpf_new = bpf.lock().unwrap(); + + let events_map = bpf_new .take_map("EventsMap") .ok_or_else(|| anyhow::anyhow!("EventsMap map not found"))?; - info!("loading bpf connections map"); + let veth_map = bpf_new + .take_map("veth_identity_map") + .ok_or_else(|| anyhow::anyhow!("veth_identity_map map not found"))?; + + /* EDIT: this part is paused right now + info!("loading bpf connections map"); - //init connection map - let connections_map_raw = bpf - .take_map("ConnectionMap") - .context("failed to take connections map")?; + //init connection map + let connections_map_raw = bpf + .take_map("ConnectionMap") + .context("failed to take connections map")?; - let connection_tracker_map = bpf - .take_map("ConnectionTrackerMap") - .context("failed to take ConnectionTrackerMap map")?; + let connection_tracker_map = bpf + .take_map("ConnectionTrackerMap") + .context("failed to take ConnectionTrackerMap map")?; + */ + Ok((events_map, veth_map)) +} + +async fn event_listener(bpf_maps: (Map, Map), link_ids: Arc>>, bpf: Arc>) -> Result<(), anyhow::Error> { + // this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications) + /* Doc: + + perf_net_events_array: contains is associated with the network events stored in the events_map (EventsMap) + perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map) + + */ + info!("Preparing perf_buffers and perf_arrays"); // init PerfEventArrays - let mut perf_array: PerfEventArray = PerfEventArray::try_from(events_map)?; + let mut perf_veth_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.1)?; + let mut perf_net_events_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.0)?; /* let mut connections_perf_array = PerCpuHashMap::<&mut MapData,u8,ConnArray>::try_from(connections_map_raw)?; //change with lru hash map*/ //init PerfEventArrays buffers - let mut perf_buffers: Vec> = Vec::new(); + let mut perf_veth_buffer: Vec> = Vec::new(); + let mut perf_net_events_buffer: Vec> = Vec::new(); /* let mut connections_perf_buffers = Vec::new(); */ for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { - let buf: PerfEventArrayBuffer = perf_array.open(cpu_id, None)?; - perf_buffers.push(buf); + let veth_buf: PerfEventArrayBuffer = perf_veth_array.open(cpu_id, None)?; + perf_veth_buffer.push(veth_buf); + } + for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { + let events_buf: PerfEventArrayBuffer = perf_net_events_array.open(cpu_id, None)?; + perf_net_events_buffer.push(events_buf); } info!("Listening for events..."); - let running = Arc::new(AtomicBool::new(true)); - let r = running.clone(); - //waiting for signint (CTRL+C) to stop the main program - tokio::spawn(async move { - signal::ctrl_c().await.unwrap(); - r.store(false, Ordering::SeqCst); - }); + let veth_running = Arc::new(AtomicBool::new(true)); + let net_events_running = Arc::new(AtomicBool::new(true)); - let mut buffers = vec![BytesMut::with_capacity(1024); 10]; + let mut veth_buffers = vec![BytesMut::with_capacity(1024); 10]; + let mut events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; // let mut connections_buffers = vec![BytesMut::with_capacity(1024); 10]; - display_events(perf_buffers, running, buffers).await; - info!("Exiting..."); + let veth_running_signal = veth_running.clone(); + let net_events_running_signal = net_events_running.clone(); + let veth_link_ids = link_ids.clone(); - Ok(()) -} + //display_events(perf_buffers, running, buffers).await; + let veth_events_displayer = tokio::spawn(async move { + display_veth_events(bpf.clone(), perf_veth_buffer, veth_running, veth_buffers, veth_link_ids, ).await; + }); + let net_events_displayer = tokio::spawn(async move { + display_events(perf_net_events_buffer, net_events_running, events_buffers).await; + }); -//attach a program to a vector of interfaces -pub async fn attach_bpf_program(data: &[u8], ifaces: Vec) -> Result<(), anyhow::Error> { - info!("Loading programs"); + tokio::select! { + result = veth_events_displayer=>{ + match result{ + Err(e)=>error!("veth_event_displayer panicked {:?}",e), + std::result::Result::Ok(_) => info!("Found new veth_event"), + } + } - for interface in ifaces.iter() { - let mut bpf = Bpf::load(&data)?; - let program: &mut SchedClassifier = bpf - .program_mut("identity_classifier") - .ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))? - .try_into()?; - program.load()?; + result = net_events_displayer=>{ + match result{ + Err(e)=>error!("net_event_displayer panicked {:?}",e), + std::result::Result::Ok(_) => info!("Found new net_event"), + } + } + _= signal::ctrl_c()=>{ + info!("Triggered Exiting..."); + veth_running_signal.store(false, Ordering::SeqCst); + net_events_running_signal.store(false, Ordering::SeqCst); + } - program.attach(&interface, TcAttachType::Ingress)?; } Ok(()) diff --git a/core/src/components/identity/src/structs.rs b/core/src/components/identity/src/structs.rs index 782a440..1c212ff 100644 --- a/core/src/components/identity/src/structs.rs +++ b/core/src/components/identity/src/structs.rs @@ -26,3 +26,12 @@ pub struct ConnArray { } unsafe impl aya::Pod for ConnArray {} + +#[repr(C)] +#[derive(Clone, Copy)] +pub struct VethLog { + pub name: [u8; 16], + pub state: u64, + pub dev_addr: [u32;8], + pub event_type: u8, +} diff --git a/core/src/testing/identity.yaml b/core/src/testing/identity.yaml index 78b9b68..a8f1956 100644 --- a/core/src/testing/identity.yaml +++ b/core/src/testing/identity.yaml @@ -18,58 +18,79 @@ spec: hostPID: true hostNetwork: true containers: - - name: identity - image: lorenzotettamanti/cortexflow-identity:latest - command: ["/bin/bash", "-c"] - env: - - name: IFACE - value: "cali8240b51b115" - args: - - | - echo "Running on kernel $(uname -r)" - if [ ! -d "/sys/fs/bpf" ]; then - echo "ERROR: BPF filesystem not mounted" - exit 1 - else - echo "Checking ebpf path..." - ls -l /sys/fs/bpf - fi - echo "checking privileges" - ls -ld /sys/fs/bpf + - name: identity + image: lorenzotettamanti/cortexflow-identity:latest + command: ["/bin/bash", "-c"] + args: + - | + echo "Running on kernel $(uname -r)" + if [ ! -d "/sys/fs/bpf" ]; then + echo "ERROR: BPF filesystem not mounted" + exit 1 + else + echo "Checking ebpf path..." + ls -l /sys/fs/bpf + fi + echo "checking privileges" + ls -ld /sys/fs/bpf - echo "Running application..." - exec /usr/local/bin/cortexflow-identity-service || echo "Application exited with code $?" - volumeMounts: + echo "Running application..." + exec /usr/local/bin/cortexflow-identity-service || echo "Application exited with code $?" + volumeMounts: + - name: bpf + mountPath: /sys/fs/bpf + mountPropagation: Bidirectional + readOnly: false + - name: proc + mountPath: /host/proc + readOnly: false + - name: kernel-dev + mountPath: /lib/modules + readOnly: false + securityContext: + privileged: true + allowPrivilegeEscalation: true + capabilities: + add: + - SYS_ADMIN + - NET_ADMIN + - SYS_RESOURCE + - BPF + - SYS_PTRACE + - name: bpftool-control-manager + image: danielpacak/bpftool-runner:latest + command: ["/bin/bash", "-c","sleep infinity"] + volumeMounts: + - name: bpf + mountPath: /sys/fs/bpf + mountPropagation: Bidirectional + readOnly: false + - name: proc + mountPath: /host/proc + readOnly: false + - name: kernel-dev + mountPath: /lib/modules + readOnly: false + securityContext: + privileged: true + allowPrivilegeEscalation: true + capabilities: + add: + - SYS_ADMIN + - NET_ADMIN + - SYS_RESOURCE + - BPF + - SYS_PTRACE + volumes: - name: bpf - mountPath: /sys/fs/bpf - mountPropagation: Bidirectional - readOnly: false + hostPath: + path: /sys/fs/bpf + type: Directory - name: proc - mountPath: /host/proc - readOnly: false + hostPath: + path: /proc + type: Directory - name: kernel-dev - mountPath: /lib/modules - readOnly: false - securityContext: - privileged: true - allowPrivilegeEscalation: true - capabilities: - add: - - SYS_ADMIN - - NET_ADMIN - - SYS_RESOURCE - - BPF - - SYS_PTRACE - volumes: - - name: bpf - hostPath: - path: /sys/fs/bpf - type: Directory - - name: proc - hostPath: - path: /proc - type: Directory - - name: kernel-dev - hostPath: - path: /lib/modules - type: Directory + hostPath: + path: /lib/modules + type: Directory