From bf4108b73dc279f0bff55d98f877e1f2d240de64 Mon Sep 17 00:00:00 2001 From: mahesh bhatiya Date: Thu, 10 Jul 2025 22:15:28 +0530 Subject: [PATCH] feat(ebpf): add connect_count probe for tracing outbound IPv4 connections Implements a Rust-based eBPF probe using aya to trace IPv4 connect() syscalls via the `sys_enter_connect` tracepoint. The probe records: - PID of the process initiating the connection - Destination IP and port - Incremental connection count per PID-IP-port tuple Key features: - Safe user-space pointer dereferencing using `bpf_probe_read_user` - Compact map key: [pid(4) | ip(4) | port(2) | padding(2)] - Two eBPF maps: - `connect_attempts`: count of per-connection attempts - `total_triggers`: global increment counter - Handles invalid addrlen, pointer, and non-IPv4 traffic gracefully - Logs each event via `bpf_printk!` for debugging This version is stable and verified via `trace_pipe`. --- core/connect.go | 107 +++++++++++++- core/ssh.go | 11 +- ebpf-programs/connect_count/Cargo.toml | 17 +++ .../connect_count/rust-toolchain.toml | 4 + ebpf-programs/connect_count/src/lib.rs | 131 ++++++++++++++++++ ebpf-programs/udp_monitor/src/lib.rs | 2 +- 6 files changed, 260 insertions(+), 12 deletions(-) create mode 100644 ebpf-programs/connect_count/Cargo.toml create mode 100644 ebpf-programs/connect_count/rust-toolchain.toml create mode 100644 ebpf-programs/connect_count/src/lib.rs diff --git a/core/connect.go b/core/connect.go index 2938de6..e22373a 100644 --- a/core/connect.go +++ b/core/connect.go @@ -1,8 +1,113 @@ package core import ( + "bytes" + "fmt" + "log" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" ) +type connectKey struct { + PID uint32 + IP uint32 + Port uint16 + Pad uint16 // for 4-byte alignment +} + func RunConnectMonitor() { - runMonitor("bin/connect_count.o", "count_connect", "connect_attempts", "tracepoint", "sys_enter_connect") + const ( + objPath = "bin/connect_count.o" + progName = "count_connect" + mapName = "connect_attempts" + tpCat = "syscalls" + tpEvent = "sys_enter_connect" + ) + + fmt.Printf("Starting monitor: %s → tracepoint:%s:%s\n", progName, tpCat, tpEvent) + + spec, err := ebpf.LoadCollectionSpec(objPath) + if err != nil { + log.Fatalf("Failed to load eBPF spec: %v", err) + } + + coll, err := ebpf.NewCollection(spec) + if err != nil { + log.Fatalf("Failed to create eBPF collection: %v", err) + } + defer coll.Close() + + prog := coll.Programs[progName] + if prog == nil { + log.Fatalf("Program %s not found in object file", progName) + } + defer prog.Close() + + // Correct: Tracepoint, not Kprobe + tp, err := link.Tracepoint(tpCat, tpEvent, prog, nil) + if err != nil { + log.Fatalf("Failed to attach tracepoint: %v", err) + } + defer tp.Close() + + m := coll.Maps[mapName] + if m == nil { + log.Fatalf("Map %s not found", mapName) + } + defer m.Close() + + fmt.Println("eBPF tracepoint attached. Monitoring connections...") + + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + // Handle Ctrl+C + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + + for { + select { + case <-ticker.C: + var key connectKey + var value uint32 + iter := m.Iterate() + total := uint32(0) + + fmt.Println("[connect_attempts] PID@IP:PORT → Count") + for iter.Next(&key, &value) { + fmt.Printf(" %s (PID %d) → %s:%d → %d\n", + resolveProcessName(int(key.PID)), + key.PID, + FormatIPv4(key.IP), + key.Port, + value, + ) + total += value + } + + if err := iter.Err(); err != nil { + log.Printf("Map iteration error: %v", err) + } + fmt.Printf("[connect_attempts] Total Attempts: %d\n\n", total) + + case <-sig: + fmt.Println("Connection monitor stopped.") + return + } + } +} + +func resolveProcessName(pid int) string { + path := "/proc/" + strconv.Itoa(pid) + "/comm" + data, err := os.ReadFile(path) + if err != nil { + return "unknown" + } + return string(bytes.TrimSpace(data)) } diff --git a/core/ssh.go b/core/ssh.go index 78e8f31..6ef3508 100644 --- a/core/ssh.go +++ b/core/ssh.go @@ -1,10 +1,8 @@ package core import ( - "encoding/binary" "fmt" "log" - "net" "os" "os/signal" "syscall" @@ -77,7 +75,7 @@ defer kprobe.Close() fmt.Println("[ssh_attempts] PID@IP -> Count") for iter.Next(&key, &value) { - ipStr := formatIPv4(key.IP) + ipStr := FormatIPv4(key.IP) fmt.Printf(" %d@%s -> %d\n", key.PID, ipStr, value) total += value } @@ -93,10 +91,3 @@ defer kprobe.Close() } } } - -// Converts uint32 IP to dotted IPv4 string -func formatIPv4(ip uint32) string { - ipBytes := make([]byte, 4) - binary.LittleEndian.PutUint32(ipBytes, ip) - return net.IP(ipBytes).String() -} diff --git a/ebpf-programs/connect_count/Cargo.toml b/ebpf-programs/connect_count/Cargo.toml new file mode 100644 index 0000000..8ba861f --- /dev/null +++ b/ebpf-programs/connect_count/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "connect_count" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["staticlib"] + +[dependencies] +aya-ebpf = { version = "0.1.1", default-features = false } + +[profile.release] +opt-level = "z" +lto = true +panic = "abort" +codegen-units = 1 +strip = "debuginfo" diff --git a/ebpf-programs/connect_count/rust-toolchain.toml b/ebpf-programs/connect_count/rust-toolchain.toml new file mode 100644 index 0000000..78474db --- /dev/null +++ b/ebpf-programs/connect_count/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "nightly" +components = ["rust-src"] +targets = ["bpfel-unknown-none"] diff --git a/ebpf-programs/connect_count/src/lib.rs b/ebpf-programs/connect_count/src/lib.rs new file mode 100644 index 0000000..2770f31 --- /dev/null +++ b/ebpf-programs/connect_count/src/lib.rs @@ -0,0 +1,131 @@ +#![no_std] +#![no_main] +#![allow(non_snake_case)] +#![allow(unused)] +#![allow(static_mut_refs)] + +use aya_ebpf::{ + macros::{map, tracepoint}, + maps::HashMap, + programs::TracePointContext, + helpers::{bpf_get_current_pid_tgid, bpf_printk, bpf_probe_read_user}, + EbpfContext, + cty::c_long, +}; + +#[map(name = "connect_attempts")] +static mut CONNECT_ATTEMPTS: HashMap<[u8; 12], u32> = HashMap::<[u8; 12], u32>::with_max_entries(1024, 0); + +#[map(name = "total_triggers")] +static mut TOTAL_TRIGGERS: HashMap = HashMap::::with_max_entries(1, 0); + +#[tracepoint(name = "sys_enter_connect", category = "syscalls")] +pub fn count_connect(ctx: TracePointContext) -> u32 { + match try_count_connect(ctx) { + Ok(_) => 0, + Err(e) => { + unsafe { bpf_printk!(b"Error: %d\0", e as u32); } + 1 + } + } +} + +fn try_count_connect(ctx: TracePointContext) -> Result<(), c_long> { + // Get PID from bpf helper + let pid_tgid = bpf_get_current_pid_tgid(); + let pid = (pid_tgid >> 32) as u32; + + // Increment total_triggers[0] + unsafe { + TOTAL_TRIGGERS + .insert(&0, &1, 0) + .map(|_| ()) + .or_else(|_| { + TOTAL_TRIGGERS + .get_ptr_mut(&0) + .map(|count| *count += 1) + .ok_or(0) + })?; + } + + // Define argument structure for connect syscall + #[repr(C)] + struct ConnectArgs { + fd: u64, + sockaddr_ptr: u64, + addrlen: u64, + } + + // Read syscall arguments + let args = unsafe { ctx.read_at::(16).map_err(|e| e as c_long)? }; + let sockaddr_ptr = args.sockaddr_ptr; + let addrlen = args.addrlen as i32; + + if sockaddr_ptr == 0 || sockaddr_ptr > 0x7fff_ffff_ffff { + unsafe { bpf_printk!(b"Invalid sockaddr_ptr: %lx\0", sockaddr_ptr); } + return Err(-1); + } + + if addrlen < 16 { + unsafe { bpf_printk!(b"addrlen too small: %d\0", addrlen); } + return Err(-2); + } + + // Read sa_family + let sa_family: u16 = unsafe { + bpf_probe_read_user::(sockaddr_ptr as *const u16).map_err(|e| e as c_long)? + }; + + // Only support IPv4 (AF_INET == 2) + if sa_family != 2 { + return Ok(()); + } + + // Read sockaddr_in structure + #[repr(C)] + struct SockAddrIn { + sin_family: u16, + sin_port: u16, + sin_addr: u32, + sin_zero: [u8; 8], + } + + let sockaddr: SockAddrIn = unsafe { + bpf_probe_read_user::(sockaddr_ptr as *const SockAddrIn).map_err(|e| e as c_long)? + }; + + let port = sockaddr.sin_port.to_be(); + let ip = sockaddr.sin_addr.to_be(); + + // Create key: [pid(4) | ip(4) | port(2) | padding(2)] + let mut key = [0u8; 12]; + key[0..4].copy_from_slice(&pid.to_ne_bytes()); + key[4..8].copy_from_slice(&ip.to_ne_bytes()); + key[8..10].copy_from_slice(&port.to_ne_bytes()); + + unsafe { + CONNECT_ATTEMPTS + .insert(&key, &1, 0) + .map(|_| ()) + .or_else(|_| { + CONNECT_ATTEMPTS + .get_ptr_mut(&key) + .map(|count| *count += 1) + .ok_or(0) + })?; + + bpf_printk!(b"pid=%d ip=%x port=%d\0", pid, ip, port as u32); + } + + Ok(()) +} + +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! { + unsafe { bpf_printk!(b"PANIC!\0"); } + loop {} +} + +#[no_mangle] +#[link_section = "license"] +pub static LICENSE: [u8; 4] = *b"GPL\0"; \ No newline at end of file diff --git a/ebpf-programs/udp_monitor/src/lib.rs b/ebpf-programs/udp_monitor/src/lib.rs index b73f81b..427183b 100644 --- a/ebpf-programs/udp_monitor/src/lib.rs +++ b/ebpf-programs/udp_monitor/src/lib.rs @@ -4,7 +4,7 @@ #![allow(unused_unsafe)] use aya_ebpf::{ - helpers::{bpf_get_current_pid_tgid, bpf_probe_read}, + helpers::{bpf_get_current_pid_tgid}, macros::{kprobe, map}, maps::HashMap, programs::ProbeContext,