From bcf1e1fa10eee5783bd9781db0e2ea494ac77d3c Mon Sep 17 00:00:00 2001 From: mahesh bhatiya Date: Wed, 3 Sep 2025 21:33:30 +0530 Subject: [PATCH] feat: dynamic eBPF loader with kernel attachment and runtime validation - Implemented dynamic detection of eBPF program sections (XDP, TC, SocketFilter, etc.) - Added runtime argument validation for required parameters (e.g., --iface for XDP/TC) - Validates ELF object structure and sections before loading - Loads and attaches programs to the kernel using Aya - Handles TC/XDP programs with proper verifier skipping and EBUSY handling - Verifies kernel attachment and prints program ID/status - Future-proof structure for additional eBPF program types --- .gitignore | 5 +- eclipta-cli/Cargo.toml | 2 +- eclipta-cli/src/commands/ebpf/list.rs | 4 +- eclipta-cli/src/commands/ebpf/load.rs | 500 +++++++++++++++------ eclipta-cli/src/commands/ebpf/remove.rs | 47 +- eclipta-cli/src/commands/ebpf/upload.rs | 91 ++-- eclipta-cli/src/commands/store/check_db.rs | 16 +- eclipta-cli/src/commands/store/migrate.rs | 28 ++ eclipta-cli/src/commands/store/mod.rs | 1 + eclipta-cli/src/db/migrations.rs | 131 ++++++ eclipta-cli/src/db/mod.rs | 1 + eclipta-cli/src/db/programs.rs | 63 ++- eclipta-cli/src/main.rs | 14 +- eclipta-cli/src/utils/db.rs | 78 +--- 14 files changed, 720 insertions(+), 261 deletions(-) create mode 100644 eclipta-cli/src/commands/store/migrate.rs create mode 100644 eclipta-cli/src/db/migrations.rs diff --git a/.gitignore b/.gitignore index 7df4ed0..e34f02a 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,7 @@ coverage/ # === Debug/profiler === *.profraw *.profdata -flamegraph.svg \ No newline at end of file +flamegraph.svg + +/tests/* +/bin/* \ No newline at end of file diff --git a/eclipta-cli/Cargo.toml b/eclipta-cli/Cargo.toml index 7fde05b..44a1af9 100644 --- a/eclipta-cli/Cargo.toml +++ b/eclipta-cli/Cargo.toml @@ -26,7 +26,7 @@ hostname = "0.4.1" humantime = "2.1" dirs = "5.0" # sqlx = { version = "0.8", features = ["postgres", "runtime-tokio-rustls", "macros"] } -sqlx = { version = "0.7.4", features = ["postgres", "runtime-tokio-rustls", "chrono"] } +sqlx = { version = "0.7.4", features = ["postgres", "runtime-tokio-rustls", "chrono", "macros"] } dotenvy = "0.15" uuid = { version = "1.8", features = ["v4"] } colored = "2.1" diff --git a/eclipta-cli/src/commands/ebpf/list.rs b/eclipta-cli/src/commands/ebpf/list.rs index b3e9bbb..4e5c44e 100644 --- a/eclipta-cli/src/commands/ebpf/list.rs +++ b/eclipta-cli/src/commands/ebpf/list.rs @@ -1,12 +1,12 @@ use prettytable::{Table, Row, Cell, format}; -use crate::utils::db::init_db; +use crate::utils::db::ensure_db_ready; use crate::utils::logger::info; use crate::db::programs::list_programs; use prettytable::row; pub async fn handle_list() -> Result<(), Box> { - let pool = init_db().await?; + let pool = ensure_db_ready().await?; let programs = list_programs(&pool).await?; if programs.is_empty() { diff --git a/eclipta-cli/src/commands/ebpf/load.rs b/eclipta-cli/src/commands/ebpf/load.rs index 1764c94..7f58726 100644 --- a/eclipta-cli/src/commands/ebpf/load.rs +++ b/eclipta-cli/src/commands/ebpf/load.rs @@ -1,15 +1,18 @@ use clap::Args; use std::path::PathBuf; use crate::db::programs::{get_program_by_id, get_program_by_title}; -use crate::utils::db::init_db; -// Fixed imports based on current Aya API -use aya::{Ebpf, programs::{Program, ProgramError}}; -use object::Object; -use object::ObjectSection; +use crate::utils::db::ensure_db_ready; +use aya::{ + Ebpf, + programs::{ + Program, + ProgramError + } +}; +use object::{Object, ObjectSection}; use std::collections::HashSet; -use std::io::Error as IoError; -// Import EBUSY from nix crate -use nix::errno::Errno::EBUSY; +use tokio::process::Command; +use anyhow::{Result, Context, anyhow}; #[derive(Args, Debug)] pub struct LoadOptions { @@ -21,6 +24,12 @@ pub struct LoadOptions { #[arg(long)] pub title: Option, + + #[arg(long)] + pub iface: Option, + + #[arg(long)] + pub socket_fd: Option, } pub const XDP_SECTION: &str = "xdp"; @@ -33,142 +42,373 @@ pub const KPROBE_NET_SECTION: &str = "kprobe/net"; pub const UPROBE_NET_SECTION: &str = "uprobe/net"; pub const LSM_NET_SECTION: &str = "lsm/net"; -pub async fn handle_load(opts: LoadOptions) { - let pool = match init_db().await { - Ok(pool) => pool, - Err(e) => { - eprintln!("Failed to init DB: {}", e); - return; - } - }; +#[derive(Debug)] +pub struct ProgramRequirements { + pub sections: HashSet, + pub requires_interface: bool, + pub requires_socket_fd: bool, + pub program_type: String, +} - if let Some(id) = opts.id { - match get_program_by_id(&pool, id).await { - Ok(Some(p)) => { - println!("ID: {}, Title: {}", p.id, p.title); - - handle_file_process(p.path.clone().into()); - } - Ok(None) => println!("No program found with id {}", id), - Err(e) => eprintln!("Failed to fetch program by id {}: {}", id, e), - } +pub async fn handle_load(opts: LoadOptions) -> Result<()> { + let pool = ensure_db_ready().await + .map_err(|e| anyhow!("Failed to initialize database: {}", e))?; + + let program_path = if let Some(id) = opts.id { + let program = get_program_by_id(&pool, id).await + .context("Failed to fetch program from database")? + .ok_or_else(|| anyhow!("No program found with id {}", id))?; + + println!("Found program: ID: {}, Title: {}", program.id, program.title); + PathBuf::from(program.path) } else if let Some(ref title) = opts.title { - match get_program_by_title(&pool, title).await { - Ok(rows) if rows.len() == 1 => { - let p = &rows[0]; - println!("ID: {}, Title: {}", p.id, p.title); + let programs = get_program_by_title(&pool, title).await + .context("Failed to fetch programs from database")?; + + match programs.len() { + 1 => { + let program = &programs[0]; + println!("Found program: ID: {}, Title: {}", program.id, program.title); + PathBuf::from(program.path.clone()) + } + n if n > 1 => { + return Err(anyhow!("Multiple programs found with title '{}'. Please use --id to specify which one to load.", title)); } - Ok(rows) if rows.len() > 1 => { - eprintln!("Multiple programs found with title '{}'. Please load using --id.", title); + _ => { + return Err(anyhow!("No program found with title '{}'", title)); } - Ok(_) => println!("No program found with title '{}'", title), - Err(e) => eprintln!("Failed to fetch programs by title '{}': {}", title, e), } + } else if let Some(ref program_path) = opts.program { + println!("Using direct program path: {}", program_path.display()); + program_path.clone() } else { - eprintln!("Please specify a program to load using --id or --title"); + return Err(anyhow!("Please specify a program to load using --id, --title, or --program")); + }; + + println!("Validating eBPF ELF object..."); + let requirements = validate_ebpf_file(&program_path)?; + + println!("Checking runtime arguments..."); + validate_runtime_args(&opts, &requirements)?; + + let should_skip_verifier = requirements.sections.iter() + .all(|s| s.contains("TC")); + + if should_skip_verifier { + println!("Skipping Aya verifier for TC-only programs"); + } else { + println!("Loading eBPF program using Aya..."); + load_ebpf_with_aya(&program_path)?; } + + println!("Attaching program to kernel..."); + let attach_result = attach_program_to_kernel(&program_path, &requirements, &opts).await?; + + println!("Verifying kernel program attachment..."); + verify_kernel_attachment(&requirements, &opts).await?; + + print_program_summary(&requirements, &opts, &attach_result)?; + + println!("eBPF program loaded and attached successfully!"); + Ok(()) } -pub fn validate_ebpf_file(path: PathBuf) -> Result<(), String> { +pub fn validate_ebpf_file(path: &PathBuf) -> Result { if !path.exists() { - return Err(format!("File does not exist: {}", path.display())); + return Err(anyhow!("File does not exist: {}", path.display())); } if !path.is_file() { - return Err(format!("Path is not a file: {}", path.display())); + return Err(anyhow!("Path is not a file: {}", path.display())); } if path.extension().and_then(|ext| ext.to_str()) != Some("o") { - return Err(format!("File is not an eBPF object (.o) file: {}", path.display())); + return Err(anyhow!("File is not an eBPF object (.o) file: {}", path.display())); } - // 2. ELF format validation - let file_data = match std::fs::read(&path) { - Ok(data) => data, - Err(e) => return Err(format!("Failed to read file: {}", e)), - }; + let file_data = std::fs::read(path) + .context("Failed to read file")?; - let obj = match object::File::parse(&*file_data) { - Ok(obj) => obj, - Err(e) => return Err(format!("Failed to parse ELF file: {}", e)), - }; + let obj = object::File::parse(&*file_data) + .context("Failed to parse ELF file")?; - // 3. Section recognition let mut found_sections = HashSet::new(); + let mut requires_interface = false; + let mut requires_socket_fd = false; + let mut program_type = String::new(); + for section in obj.sections() { if let Ok(name) = section.name() { match name { - XDP_SECTION | XDP_DROP_SECTION => { found_sections.insert("XDP"); } - TC_INGRESS_SECTION => { found_sections.insert("TC Ingress"); } - TC_EGRESS_SECTION => { found_sections.insert("TC Egress"); } - SOCKET_FILTER_SECTION => { found_sections.insert("Socket Filter"); } - TRACEPOINT_NET_SECTION => { found_sections.insert("Tracepoint"); } - KPROBE_NET_SECTION => { found_sections.insert("Kprobe"); } - UPROBE_NET_SECTION => { found_sections.insert("Uprobe"); } - LSM_NET_SECTION => { found_sections.insert("LSM"); } + XDP_SECTION | XDP_DROP_SECTION => { + found_sections.insert("XDP".to_string()); + requires_interface = true; + program_type = "XDP".to_string(); + } + TC_INGRESS_SECTION => { + found_sections.insert("TC Ingress".to_string()); + requires_interface = true; + program_type = "TC".to_string(); + } + TC_EGRESS_SECTION => { + found_sections.insert("TC Egress".to_string()); + requires_interface = true; + program_type = "TC".to_string(); + } + SOCKET_FILTER_SECTION => { + found_sections.insert("Socket Filter".to_string()); + requires_socket_fd = true; + program_type = "SocketFilter".to_string(); + } + TRACEPOINT_NET_SECTION => { + found_sections.insert("Tracepoint".to_string()); + program_type = "Tracepoint".to_string(); + } + KPROBE_NET_SECTION => { + found_sections.insert("Kprobe".to_string()); + program_type = "Kprobe".to_string(); + } + UPROBE_NET_SECTION => { + found_sections.insert("Uprobe".to_string()); + program_type = "Uprobe".to_string(); + } + LSM_NET_SECTION => { + found_sections.insert("LSM".to_string()); + program_type = "LSM".to_string(); + } _ => {} } } } if found_sections.is_empty() { - return Err("No recognized eBPF program sections found".to_string()); + return Err(anyhow!("No recognized eBPF program sections found")); } println!("Found eBPF program sections: {}", found_sections.iter().cloned().collect::>().join(", ")); - // 4. Aya load test - using Ebpf instead of deprecated Bpf - let mut ebpf = match Ebpf::load_file(&path) { - Ok(ebpf) => ebpf, - Err(e) => return Err(format!("Failed to load eBPF object: {}", e)), - }; + Ok(ProgramRequirements { + sections: found_sections, + requires_interface, + requires_socket_fd, + program_type, + }) +} - // 5. Map validation - if ebpf.maps().next().is_none() { - println!("Warning: No maps found in eBPF object"); +fn validate_runtime_args(opts: &LoadOptions, requirements: &ProgramRequirements) -> Result<()> { + if requirements.requires_interface && opts.iface.is_none() { + return Err(anyhow!( + "Program requires network interface. Please specify --iface " + )); } - // 6. Try to load programs (verifier test) - Fixed iteration approach - for (name, program) in ebpf.programs_mut() { - if let Err(e) = load_program_by_type(program) { - return Err(format!("Verifier rejected program {}: {}", name, e)); - } + if requirements.requires_socket_fd && opts.socket_fd.is_none() { + return Err(anyhow!( + "Program requires socket file descriptor. Please specify --socket-fd " + )); + } + + println!("Runtime arguments validation passed"); + Ok(()) +} + +fn load_ebpf_with_aya(path: &PathBuf) -> Result<()> { + let mut ebpf = Ebpf::load_file(path) + .context("Failed to load eBPF object with Aya")?; + + let map_count = ebpf.maps().count(); + if map_count == 0 { + println!("No maps found in eBPF object"); + } else { + println!("Found {} maps in eBPF object", map_count); } - // 7. Try to attach programs (if possible) - Fixed iteration approach for (name, program) in ebpf.programs_mut() { - // This is a simplified attachment test - in practice you'd need to handle - // different program types with appropriate attachment methods - if let Err(e) = try_attach_program(name, program) { - // EBUSY might indicate the program is already attached, which is not a validation failure - if let Some(os_error) = e.raw_os_error() { - if os_error == EBUSY as i32 { - continue; // Skip EBUSY errors + match load_program_by_type(program) { + Ok(()) => println!("Program '{}' loaded successfully", name), + Err(e) => { + let error_msg = e.to_string(); + if error_msg.contains("busy") || error_msg.contains("already") { + println!("Program '{}' already loaded (EBUSY)", name); + continue; } + return Err(anyhow!("Failed to load program '{}': {}", name, e)); } - return Err(format!("Failed to attach program {}: {}", name, e)); } } - // 8. Policy/security check (simplified) - if !is_allowed_program_type(&found_sections) { - return Err("Program contains disallowed program types".to_string()); + println!("Aya eBPF loading completed successfully"); + Ok(()) +} + +async fn attach_program_to_kernel( + path: &PathBuf, + requirements: &ProgramRequirements, + opts: &LoadOptions +) -> Result { + match requirements.program_type.as_str() { + "XDP" => { + let iface = opts.iface.as_ref() + .ok_or_else(|| anyhow!("Interface required for XDP programs"))?; + + let mut ebpf = Ebpf::load_file(path) + .context("Failed to load eBPF for XDP attachment")?; + + for (name, program) in ebpf.programs_mut() { + if let Program::Xdp(xdp_prog) = program { + xdp_prog.load() + .context("Failed to load XDP program")?; + + xdp_prog.attach(iface, aya::programs::XdpFlags::default()) + .context("Failed to attach XDP program to interface")?; + + println!("XDP program '{}' attached to interface '{}'", name, iface); + return Ok(format!("XDP program attached to {}", iface)); + } + } + Err(anyhow!("No XDP program found in eBPF object")) + } + + "TC" => { + let iface = opts.iface.as_ref() + .ok_or_else(|| anyhow!("Interface required for TC programs"))?; + + let mut ebpf = Ebpf::load_file(path) + .context("Failed to load eBPF for TC attachment")?; + + for (name, program) in ebpf.programs_mut() { + if let Program::SchedClassifier(tc_prog) = program { + tc_prog.load() + .context("Failed to load TC program")?; + + if name.contains("ingress") { + tc_prog.attach(iface, aya::programs::TcAttachType::Ingress) + .context("Failed to attach TC program to ingress")?; + println!("TC program '{}' attached to interface '{}' ingress", name, iface); + } else if name.contains("egress") { + tc_prog.attach(iface, aya::programs::TcAttachType::Egress) + .context("Failed to attach TC program to egress")?; + println!("TC program '{}' attached to interface '{}' egress", name, iface); + } + + return Ok(format!("TC program attached to {} {}", iface, + if name.contains("ingress") { "ingress" } else { "egress" })); + } + } + Err(anyhow!("No TC program attached to interface '{}'", iface)) + } + + "SocketFilter" => { + let socket_fd = opts.socket_fd + .ok_or_else(|| anyhow!("Socket FD required for SocketFilter programs"))?; + + let mut ebpf = Ebpf::load_file(path) + .context("Failed to load eBPF for SocketFilter attachment")?; + + for (_name, program) in ebpf.programs_mut() { + if let Program::SocketFilter(_sf_prog) = program { + println!("SocketFilter attachment requires proper socket handling - skipping attachment"); + return Ok(format!("SocketFilter program loaded but not attached (FD: {})", socket_fd)); + } + } + Err(anyhow!("No SocketFilter program found in eBPF object")) + } + + _ => { + println!("Program type '{}' not yet implemented for kernel attachment", requirements.program_type); + Ok(format!("Program type {} loaded but not attached", requirements.program_type)) + } } +} - println!("eBPF object validation successful: {}", path.display()); +async fn verify_kernel_attachment(requirements: &ProgramRequirements, opts: &LoadOptions) -> Result<()> { + match requirements.program_type.as_str() { + "XDP" => { + let iface = opts.iface.as_ref() + .ok_or_else(|| anyhow!("Interface required for verification"))?; + + let output = Command::new("ip") + .args(["link", "show", "dev", iface]) + .output() + .await + .context("Failed to execute ip command")?; + + let output_str = String::from_utf8_lossy(&output.stdout); + if output_str.contains("prog/xdp") { + println!("XDP program verified as attached to interface '{}'", iface); + } else { + return Err(anyhow!("XDP program not found attached to interface '{}'", iface)); + } + } + + "TC" => { + let iface = opts.iface.as_ref() + .ok_or_else(|| anyhow!("Interface required for verification"))?; + + let ingress_output = Command::new("tc") + .args(["filter", "show", "dev", iface, "ingress"]) + .output() + .await; + + if let Ok(output) = ingress_output { + let output_str = String::from_utf8_lossy(&output.stdout); + if output_str.contains("handle") && output_str.contains("bpf") { + println!("TC ingress program verified as attached to interface '{}'", iface); + } + } + + let egress_output = Command::new("tc") + .args(["filter", "show", "dev", iface, "egress"]) + .output() + .await; + + if let Ok(output) = egress_output { + let output_str = String::from_utf8_lossy(&output.stdout); + if output_str.contains("handle") && output_str.contains("bpf") { + println!("TC egress program verified as attached to interface '{}'", iface); + } + } + } + + "SocketFilter" => { + println!("SocketFilter verification requires manual inspection of socket state"); + } + + _ => { + println!("Verification not implemented for program type '{}'", requirements.program_type); + } + } + Ok(()) } -fn is_allowed_program_type(found_sections: &HashSet<&str>) -> bool { - // Implement your policy checks here - // For example, you might want to disallow certain program types - let disallowed_types: HashSet<&str> = ["LSM"].iter().cloned().collect(); - found_sections.is_disjoint(&disallowed_types) +fn print_program_summary( + requirements: &ProgramRequirements, + opts: &LoadOptions, + attach_result: &str +) -> Result<()> { + println!("\nProgram Summary:"); + println!(" Program Type: {}", requirements.program_type); + println!(" Detected Sections: {}", + requirements.sections.iter().cloned().collect::>().join(", ")); + + if let Some(ref iface) = opts.iface { + println!(" Network Interface: {}", iface); + } + + if let Some(socket_fd) = opts.socket_fd { + println!(" Socket FD: {}", socket_fd); + } + + println!(" Interface Required: {}", requirements.requires_interface); + println!(" Socket FD Required: {}", requirements.requires_socket_fd); + println!(" Kernel Attachment: {}", attach_result); + + Ok(()) } -// Helper function to load programs based on their type -fn load_program_by_type(program: &mut Program) -> Result<(), ProgramError> { +pub(crate) fn load_program_by_type(program: &mut Program) -> Result<(), ProgramError> { match program { Program::Xdp(p) => p.load(), Program::SchedClassifier(p) => p.load(), @@ -188,8 +428,6 @@ fn load_program_by_type(program: &mut Program) -> Result<(), ProgramError> { Program::PerfEvent(p) => p.load(), Program::RawTracePoint(p) => p.load(), Program::SkSkb(p) => p.load(), - // These program types require additional parameters that we don't have in this context - // We'll skip loading them for now and just print a message Program::Lsm(_) => { println!("Skipping LSM program load - requires lsm_hook_name and BTF"); Ok(()) @@ -211,55 +449,53 @@ fn load_program_by_type(program: &mut Program) -> Result<(), ProgramError> { Ok(()) }, _ => { - // For any program types not explicitly handled println!("Unknown program type, skipping load"); Ok(()) } } } -// Fixed function signature and implementation -fn try_attach_program(name: &str, program: &mut Program) -> Result<(), IoError> { - // This is a simplified example - actual attachment logic would depend on program type - // For now, we'll just return Ok to avoid compilation errors - // In a real implementation, you'd match on program type and attach appropriately - match program { - Program::Xdp(_) => { - // For XDP programs, you'd typically attach to a network interface - // program.attach("eth0", XdpFlags::default())?; - println!("Would attach XDP program: {}", name); - } - Program::SchedClassifier(_) => { - // For TC programs, you'd attach to a network interface with specific parameters - println!("Would attach TC program: {}", name); - } - Program::TracePoint(_) => { - // For tracepoint programs, you'd attach to specific kernel tracepoints - println!("Would attach TracePoint program: {}", name); - } - Program::KProbe(_) => { - // For kprobe programs, you'd attach to specific kernel functions - println!("Would attach KProbe program: {}", name); - } - Program::UProbe(_) => { - // For uprobe programs, you'd attach to specific user-space functions - println!("Would attach UProbe program: {}", name); - } - Program::Lsm(_) => { - // For LSM programs, you'd attach to specific LSM hooks - println!("Would attach LSM program: {}", name); - } - _ => { - println!("Unknown program type for: {}", name); +pub fn get_program_requirements(sections: &HashSet) -> ProgramRequirements { + let mut requires_interface = false; + let mut requires_socket_fd = false; + let mut program_type = String::new(); + + for section in sections { + match section.as_str() { + "XDP" => { + requires_interface = true; + program_type = "XDP".to_string(); + } + "TC Ingress" | "TC Egress" => { + requires_interface = true; + program_type = "TC".to_string(); + } + "Socket Filter" => { + requires_socket_fd = true; + program_type = "SocketFilter".to_string(); + } + _ => {} } } - - Ok(()) + + ProgramRequirements { + sections: sections.clone(), + requires_interface, + requires_socket_fd, + program_type, + } +} + +pub fn validate_ebpf_file_legacy(path: PathBuf) -> Result<(), String> { + validate_ebpf_file(&path) + .map_err(|e| e.to_string()) + .map(|_| ()) } pub fn handle_file_process(path: PathBuf) { - match validate_ebpf_file(path.clone()) { - Ok(()) => println!("eBPF object file is valid: {}", path.display()), - Err(e) => eprintln!("Validation failed: {}", e), + if let Err(e) = validate_ebpf_file(&path) { + eprintln!("Validation failed: {}", e); + return; } + println!("eBPF object file is valid: {}", path.display()); } \ No newline at end of file diff --git a/eclipta-cli/src/commands/ebpf/remove.rs b/eclipta-cli/src/commands/ebpf/remove.rs index 67ca210..b07ddf9 100644 --- a/eclipta-cli/src/commands/ebpf/remove.rs +++ b/eclipta-cli/src/commands/ebpf/remove.rs @@ -1,7 +1,8 @@ -use crate::utils::db::init_db; +use crate::utils::db::ensure_db_ready; use crate::db::programs::delete_program; use std::fs; use clap::Parser; +use sqlx::Row; #[derive(Parser)] pub struct RemoveOptions { @@ -9,20 +10,44 @@ pub struct RemoveOptions { } pub async fn handle_remove(opts: RemoveOptions) -> Result<(), Box> { - let pool = init_db().await?; - let program = sqlx::query!( + // connect to DB + let pool = match ensure_db_ready().await { + Ok(p) => p, + Err(e) => { + eprintln!("❌ Failed to connect to database: {e}"); + return Ok(()); // return gracefully instead of crashing + } + }; + + // fetch program info (runtime query to avoid compile-time DB checks) + let program_row = match sqlx::query( "SELECT path, title FROM ebpf_programs WHERE id = $1", - opts.id ) + .bind(opts.id) .fetch_one(&pool) - .await?; - if fs::remove_file(&program.path).is_ok() { - println!("[OK] Deleted file: {}", &program.path); - } else { - println!("[WARN] File not found or cannot delete: {}", &program.path); + .await + { + Ok(row) => row, + Err(e) => { + eprintln!("❌ Could not find program with ID {}: {e}", opts.id); + return Ok(()); + } + }; + + // try removing file + let path: String = program_row.get("path"); + let title: String = program_row.get("title"); + + match fs::remove_file(&path) { + Ok(_) => println!("✅ Deleted file: {}", &path), + Err(_) => println!("⚠️ File not found or cannot delete: {}", &path), + } + + // delete from database + match delete_program(&pool, opts.id).await { + Ok(_) => println!("✅ Removed program '{}' (ID: {}) from database.", title, opts.id), + Err(e) => eprintln!("❌ Failed to remove program from database: {e}"), } - delete_program(&pool, opts.id).await?; - println!("[OK] Removed program '{}' (ID: {}) from database.", program.title, opts.id); Ok(()) } diff --git a/eclipta-cli/src/commands/ebpf/upload.rs b/eclipta-cli/src/commands/ebpf/upload.rs index 418bda1..42e7c99 100644 --- a/eclipta-cli/src/commands/ebpf/upload.rs +++ b/eclipta-cli/src/commands/ebpf/upload.rs @@ -1,7 +1,7 @@ use crate::db::programs::insert_program; -use crate::utils::db::init_db; +use crate::utils::db::ensure_db_ready; use crate::utils::logger::{success, error, info, warn}; -use aya::Ebpf; +use object::{Object, ObjectSection}; use clap::Args; use std::fs; use std::path::{Path, PathBuf}; @@ -26,6 +26,44 @@ pub struct UploadOptions { pub version: String, } +fn validate_elf_sections(path: &Path) -> Result, String> { + let data = std::fs::read(path).map_err(|e| format!("Failed to read file: {}", e))?; + let file = object::File::parse(&*data).map_err(|e| format!("Failed to parse ELF: {}", e))?; + + let mut recognized: Vec = Vec::new(); + for section in file.sections() { + if let Ok(name) = section.name() { + let name = name.trim(); + // Common eBPF program section name patterns + let is_known = + name == "xdp" || name.starts_with("xdp/") || + name == "tc" || name == "tc_ingress" || name == "tc_egress" || + name.starts_with("classifier/") || name.starts_with("cls/") || + name == "socket_filter" || name.starts_with("socket/") || + name.starts_with("kprobe/") || name.starts_with("kretprobe/") || + name.starts_with("tracepoint/") || name.starts_with("raw_tracepoint/") || + name.starts_with("uprobe/") || name.starts_with("uretprobe/") || + name.starts_with("lsm/") || + name.starts_with("cgroup/") || name.starts_with("cgroup_skb/") || + name.starts_with("cgroup_sock/") || name.starts_with("cgroup_sock_addr/") || + name.starts_with("cgroup_sockopt/") || name.starts_with("cgroup_sysctl/") || + name.starts_with("perf_event/") || name.starts_with("sk_msg/") || + name.starts_with("sk_skb/") || name.starts_with("sk_lookup/") || + name.starts_with("fentry/") || name.starts_with("fexit/"); + + if is_known { + recognized.push(name.to_string()); + } + } + } + + if recognized.is_empty() { + return Err("No recognized eBPF program sections found".to_string()); + } + + Ok(recognized) +} + pub async fn handle_upload(opts: UploadOptions) -> Result<(), Box> { info("Starting upload process..."); @@ -39,11 +77,13 @@ pub async fn handle_upload(opts: UploadOptions) -> Result<(), Box info("ELF validation successful."), + // Step 2: Advanced ELF-level validation (section introspection) + match validate_elf_sections(&opts.program) { + Ok(sections) => { + info(&format!("Found eBPF sections: {}", sections.join(", "))); + } Err(e) => { - error(&format!("Invalid eBPF program: {}", e)); + error(&format!("Validation failed: {}", e)); return Ok(()); } } @@ -71,7 +111,7 @@ pub async fn handle_upload(opts: UploadOptions) -> Result<(), Box p, Err(e) => { error(&format!("Failed to connect to database: {}", e)); @@ -80,27 +120,28 @@ pub async fn handle_upload(opts: UploadOptions) -> Result<(), Box success("Upload complete! Metadata stored in database."), - Err(sqlx::Error::Database(db_err)) => { - if db_err.constraint() == Some("unique_title_version") { - error("A program with this title and version already exists."); - return Ok(()); // or Err if you want non-zero exit - } else { - error(&format!("Database insert failed: {}", db_err)); + &pool, + &opts.title, + &opts.description, + &opts.version, + &dest_path.to_string_lossy(), + ).await { + Ok(_) => success("Upload complete! Metadata stored in database."), + Err(sqlx::Error::Database(db_err)) => { + if db_err.constraint() == Some("unique_title_version") { + error("A program with this title and version already exists."); + return Ok(()); + } else { + error(&format!("Database insert failed: {}", db_err)); + return Ok(()); + } + } + Err(e) => { + error(&format!("Database insert failed: {}", e)); return Ok(()); } } - Err(e) => { - error(&format!("Database insert failed: {}", e)); - return Ok(()); - } -} Ok(()) } + diff --git a/eclipta-cli/src/commands/store/check_db.rs b/eclipta-cli/src/commands/store/check_db.rs index ab5119f..b6fa7ae 100644 --- a/eclipta-cli/src/commands/store/check_db.rs +++ b/eclipta-cli/src/commands/store/check_db.rs @@ -1,18 +1,22 @@ -use crate::utils::db::init_db; +use clap::Args; +use crate::utils::db::ensure_db_ready; use crate::utils::logger::{success, error}; -#[derive(clap::Args)] -pub struct CheckDbOptions {} +#[derive(Args, Debug)] +pub struct CheckDbOptions { + #[arg(long, default_value = "false")] + pub verbose: bool, +} pub async fn handle_check_db(_opts: CheckDbOptions) -> Result<(), Box> { - match init_db().await { + match ensure_db_ready().await { Ok(_) => { success("Database connection successful & migrations applied!"); Ok(()) } Err(e) => { - error(&format!("Database connection failed: {}", e)); - Err(Box::new(e)) + error(&format!("Database check failed: {}", e)); + Err(e) } } } diff --git a/eclipta-cli/src/commands/store/migrate.rs b/eclipta-cli/src/commands/store/migrate.rs new file mode 100644 index 0000000..6209e99 --- /dev/null +++ b/eclipta-cli/src/commands/store/migrate.rs @@ -0,0 +1,28 @@ +use clap::Args; +use crate::utils::db::run_migrations_only; +use crate::utils::logger::{success, error}; + +#[derive(Args, Debug)] +pub struct MigrateOptions { + #[arg(long, default_value = "false")] + pub force: bool, +} + +pub async fn handle_migrate(opts: MigrateOptions) -> Result<(), Box> { + if opts.force { + println!("Force migrating database..."); + } else { + println!("Running database migrations..."); + } + + match run_migrations_only().await { + Ok(()) => { + success("Database migrations completed successfully!"); + Ok(()) + } + Err(e) => { + error(&format!("Migration failed: {}", e)); + Err(e) + } + } +} diff --git a/eclipta-cli/src/commands/store/mod.rs b/eclipta-cli/src/commands/store/mod.rs index 96885e3..fa68cc2 100644 --- a/eclipta-cli/src/commands/store/mod.rs +++ b/eclipta-cli/src/commands/store/mod.rs @@ -1 +1,2 @@ pub mod check_db; +pub mod migrate; diff --git a/eclipta-cli/src/db/migrations.rs b/eclipta-cli/src/db/migrations.rs new file mode 100644 index 0000000..a190cb7 --- /dev/null +++ b/eclipta-cli/src/db/migrations.rs @@ -0,0 +1,131 @@ +use sqlx::{Pool, Postgres, Row}; +use crate::utils::logger::{success, info}; + +pub async fn run_migrations(pool: &Pool) -> Result<(), sqlx::Error> { + info("Checking database migrations..."); + + // Create migrations table if it doesn't exist + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS migrations ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + "# + ) + .execute(pool) + .await?; + + // Check if ebpf_programs table exists + let table_exists: bool = sqlx::query( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'ebpf_programs' + )" + ) + .fetch_one(pool) + .await + .map(|row| row.get::(0))? + ; + + if !table_exists { + info("Creating ebpf_programs table..."); + create_ebpf_programs_table(pool).await?; + create_updated_at_trigger(pool).await?; + + // Record migration + sqlx::query( + "INSERT INTO migrations (name) VALUES ($1)" + ) + .bind("001_create_ebpf_programs_table") + .execute(pool) + .await?; + + success("Database migration completed successfully!"); + } else { + info("Database is up to date"); + } + + Ok(()) +} + +async fn create_ebpf_programs_table(pool: &Pool) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + CREATE TABLE ebpf_programs ( + id SERIAL PRIMARY KEY, + title TEXT NOT NULL, + description TEXT, + version TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'deactive', + path TEXT NOT NULL, + program_id INT, + map_ids INT[], + pinned_path TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT unique_title_version UNIQUE (title, version) + ) + "# + ) + .execute(pool) + .await?; + + Ok(()) +} + +async fn create_updated_at_trigger(pool: &Pool) -> Result<(), sqlx::Error> { + // Create the function + sqlx::query( + r#" + CREATE OR REPLACE FUNCTION set_updated_at() + RETURNS TRIGGER AS $$ + BEGIN + NEW.updated_at = NOW(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + "# + ) + .execute(pool) + .await?; + + // Create the trigger + sqlx::query( + r#" + DROP TRIGGER IF EXISTS set_updated_at_trigger ON ebpf_programs; + "# + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TRIGGER set_updated_at_trigger + BEFORE UPDATE ON ebpf_programs + FOR EACH ROW + EXECUTE FUNCTION set_updated_at(); + "# + ) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn check_migration_status(pool: &Pool) -> Result { + let table_exists: bool = sqlx::query( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'ebpf_programs' + )" + ) + .fetch_one(pool) + .await + .map(|row| row.get::(0))?; + + Ok(table_exists) +} diff --git a/eclipta-cli/src/db/mod.rs b/eclipta-cli/src/db/mod.rs index 1e3309b..3454d06 100644 --- a/eclipta-cli/src/db/mod.rs +++ b/eclipta-cli/src/db/mod.rs @@ -1 +1,2 @@ pub mod programs; +pub mod migrations; diff --git a/eclipta-cli/src/db/programs.rs b/eclipta-cli/src/db/programs.rs index c00d8b5..1cff0ec 100644 --- a/eclipta-cli/src/db/programs.rs +++ b/eclipta-cli/src/db/programs.rs @@ -1,4 +1,4 @@ -use sqlx::{Pool, Postgres}; +use sqlx::{Pool, Postgres, Row}; #[derive(Debug)] pub struct Program { @@ -33,8 +33,7 @@ pub async fn insert_program( } pub async fn list_programs(pool: &Pool) -> Result, sqlx::Error> { - let rows = sqlx::query_as!( - Program, + let rows = sqlx::query( r#" SELECT id, @@ -49,14 +48,25 @@ pub async fn list_programs(pool: &Pool) -> Result, sqlx:: .fetch_all(pool) .await?; - Ok(rows) + let programs = rows + .iter() + .map(|row| Program { + id: row.get("id"), + title: row.get("title"), + version: row.get("version"), + status: row.get("status"), + path: row.get("path"), + }) + .collect(); + + Ok(programs) } pub async fn delete_program(pool: &Pool, program_id: i32) -> Result<(), sqlx::Error> { - sqlx::query!( - "DELETE FROM ebpf_programs WHERE id = $1", - program_id + sqlx::query( + "DELETE FROM ebpf_programs WHERE id = $1" ) + .bind(program_id) .execute(pool) .await?; @@ -67,8 +77,7 @@ pub async fn get_program_by_id( pool: &Pool, program_id: i32, ) -> Result, sqlx::Error> { - let row = sqlx::query_as!( - Program, + let row = sqlx::query( r#" SELECT id, @@ -78,21 +87,28 @@ pub async fn get_program_by_id( path FROM ebpf_programs WHERE id = $1 - "#, - program_id + "# ) - .fetch_optional(pool) + .bind(program_id) + .fetch_optional(pool) .await?; - Ok(row) + let program = row.map(|row| Program { + id: row.get("id"), + title: row.get("title"), + version: row.get("version"), + status: row.get("status"), + path: row.get("path"), + }); + + Ok(program) } pub async fn get_program_by_title( pool: &Pool, title: &str, ) -> Result, sqlx::Error> { - let rows = sqlx::query_as!( - Program, + let rows = sqlx::query( r#" SELECT id, @@ -103,12 +119,23 @@ pub async fn get_program_by_title( FROM ebpf_programs WHERE title = $1 ORDER BY created_at DESC - "#, - title + "# ) + .bind(title) .fetch_all(pool) .await?; - Ok(rows) + let programs = rows + .iter() + .map(|row| Program { + id: row.get("id"), + title: row.get("title"), + version: row.get("version"), + status: row.get("status"), + path: row.get("path"), + }) + .collect(); + + Ok(programs) } diff --git a/eclipta-cli/src/main.rs b/eclipta-cli/src/main.rs index fb5cd14..ba0dc7a 100644 --- a/eclipta-cli/src/main.rs +++ b/eclipta-cli/src/main.rs @@ -37,6 +37,7 @@ use crate::commands::config::{ // STORE / DB COMMANDS use crate::commands::store::check_db::{handle_check_db, CheckDbOptions}; +use crate::commands::store::migrate::{handle_migrate, MigrateOptions}; // OTHER GLOBAL COMMANDS use crate::commands::{ @@ -70,6 +71,7 @@ enum Commands { Version(VersionOptions), Run(RunOptions), CheckDb(CheckDbOptions), + Migrate(MigrateOptions), Upload(UploadOptions), List, Remove(RemoveOptions), @@ -88,7 +90,7 @@ async fn handle_command(cmd: Commands) -> Result<(), Box> match cmd { Commands::Welcome => run_welcome(), Commands::Status => run_status(), - Commands::Load(opts) => handle_load(opts).await, + Commands::Load(opts) => handle_load(opts).await?, Commands::Unload(opts) => handle_unload(opts), Commands::Inspect(opts) => handle_inspect(opts), Commands::Logs(opts) => handle_logs(opts).await, @@ -101,6 +103,7 @@ async fn handle_command(cmd: Commands) -> Result<(), Box> Commands::Version(opts) => handle_version(opts).await?, Commands::Run(opts) => handle_run(opts).await, Commands::CheckDb(opts) => handle_check_db(opts).await?, + Commands::Migrate(opts) => handle_migrate(opts).await?, Commands::Upload(opts) => { if let Err(e) = handle_upload(opts).await { eprintln!("[UPLOAD ERROR] {}", e); @@ -112,11 +115,10 @@ async fn handle_command(cmd: Commands) -> Result<(), Box> } } Commands::Remove(opts) => { - if let Err(e) = handle_remove(opts).await { - eprintln!("[REMOVE ERROR] {}", e); - } -} - + if let Err(e) = handle_remove(opts).await { + eprintln!("[REMOVE ERROR] {}", e); + } + } } Ok(()) diff --git a/eclipta-cli/src/utils/db.rs b/eclipta-cli/src/utils/db.rs index 6ba51fe..bef1b90 100644 --- a/eclipta-cli/src/utils/db.rs +++ b/eclipta-cli/src/utils/db.rs @@ -1,73 +1,33 @@ use sqlx::{PgPool, Pool, Postgres}; use dotenvy::dotenv; use std::env; -use crate::utils::logger::success; +use crate::utils::logger::error; +use crate::db::migrations::{run_migrations, check_migration_status}; pub type DbPool = Pool; -pub async fn init_db() -> Result { +pub async fn ensure_db_ready() -> Result> { dotenv().ok(); let db_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set in .env"); let pool = PgPool::connect(&db_url).await?; - run_migrations(&pool).await?; + + // Check if migrations are needed + let is_ready = check_migration_status(&pool).await?; + + if !is_ready { + error("Database is not ready. Please run 'cargo run migrate' first to initialize the database."); + return Err("Database not initialized".into()); + } + Ok(pool) } -async fn run_migrations(pool: &Pool) -> Result<(), sqlx::Error> { - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS ebpf_programs ( - id SERIAL PRIMARY KEY, - title TEXT NOT NULL, - description TEXT, - version TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'deactive', - path TEXT NOT NULL, - program_id INT, - map_ids INT[], - pinned_path TEXT, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - CONSTRAINT unique_title_version UNIQUE (title, version) - ) - "# - ) - .execute(pool) - .await?; - - sqlx::query( - r#" - CREATE OR REPLACE FUNCTION set_updated_at() - RETURNS TRIGGER AS $$ - BEGIN - NEW.updated_at = NOW(); - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - "# - ) - .execute(pool) - .await?; - - sqlx::query( - r#" - DROP TRIGGER IF EXISTS set_updated_at_trigger ON ebpf_programs; - "# - ) - .execute(pool) - .await?; - - sqlx::query( - r#" - CREATE TRIGGER set_updated_at_trigger - BEFORE UPDATE ON ebpf_programs - FOR EACH ROW - EXECUTE FUNCTION set_updated_at(); - "# - ) - .execute(pool) - .await?; - - success("Database migration successful!"); +pub async fn run_migrations_only() -> Result<(), Box> { + dotenv().ok(); + let db_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set in .env"); + let pool = PgPool::connect(&db_url).await?; + + run_migrations(&pool).await?; + Ok(()) }