diff --git a/Cargo.toml b/Cargo.toml index ace41ea..1f50db8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" testing = ["kernel/testing"] kthread_test_only = ["kernel/kthread_test_only"] # Run only kthread tests and exit kthread_stress_test = ["kernel/kthread_stress_test"] # Run kthread stress test (100+ kthreads) and exit +workqueue_test_only = ["kernel/workqueue_test_only"] # Run only workqueue tests and exit test_divide_by_zero = ["kernel/test_divide_by_zero"] test_invalid_opcode = ["kernel/test_invalid_opcode"] test_page_fault = ["kernel/test_page_fault"] diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index c58909c..ca074c1 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -20,6 +20,7 @@ test = false testing = [] kthread_test_only = ["testing"] # Run only kthread tests and exit kthread_stress_test = ["testing"] # Run kthread stress test (100+ kthreads) and exit +workqueue_test_only = ["testing"] # Run only workqueue tests and exit test_divide_by_zero = [] test_invalid_opcode = [] test_page_fault = [] diff --git a/kernel/src/main.rs b/kernel/src/main.rs index 743349a..b854edc 100644 --- a/kernel/src/main.rs +++ b/kernel/src/main.rs @@ -478,12 +478,31 @@ extern "C" fn kernel_main_on_kernel_stack(arg: *mut core::ffi::c_void) -> ! { process::init(); log::info!("Process management initialized"); + // Initialize workqueue subsystem (depends on kthread infrastructure) + task::workqueue::init_workqueue(); + // Test kthread lifecycle BEFORE creating userspace processes // (must be done early so scheduler doesn't preempt to userspace) #[cfg(feature = "testing")] test_kthread_lifecycle(); #[cfg(feature = "testing")] test_kthread_join(); + #[cfg(feature = "testing")] + test_workqueue(); + + // In workqueue_test_only mode, exit immediately after workqueue tests + #[cfg(feature = "workqueue_test_only")] + { + log::info!("=== WORKQUEUE_TEST_ONLY: All workqueue tests passed ==="); + log::info!("WORKQUEUE_TEST_ONLY_COMPLETE"); + // Exit QEMU with success code + unsafe { + use x86_64::instructions::port::Port; + let mut port = Port::new(0xf4); + port.write(0x00u32); // This causes QEMU to exit + } + loop { x86_64::instructions::hlt(); } + } // In kthread_test_only mode, exit immediately after join test #[cfg(feature = "kthread_test_only")] @@ -516,20 +535,20 @@ extern "C" fn kernel_main_on_kernel_stack(arg: *mut core::ffi::c_void) -> ! { loop { x86_64::instructions::hlt(); } } - #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))] + #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))] test_kthread_exit_code(); - #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))] + #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))] test_kthread_park_unpark(); - #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))] + #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))] test_kthread_double_stop(); - #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))] + #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))] test_kthread_should_stop_non_kthread(); - #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))] + #[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))] test_kthread_stop_after_exit(); // Continue with the rest of kernel initialization... // (This will include creating user processes, enabling interrupts, etc.) - #[cfg(not(feature = "kthread_stress_test"))] + #[cfg(not(any(feature = "kthread_stress_test", feature = "workqueue_test_only")))] kernel_main_continue(); } @@ -1746,6 +1765,212 @@ fn test_kthread_stop_after_exit() { log::info!("=== KTHREAD STOP AFTER EXIT TEST: Completed ==="); } +/// Test workqueue functionality +/// This validates the Linux-style work queue implementation: +/// 1. Basic work execution via system workqueue +/// 2. Multiple work items execute in order +/// 3. Flush waits for all pending work +#[cfg(feature = "testing")] +fn test_workqueue() { + use alloc::sync::Arc; + use crate::task::workqueue::{flush_system_workqueue, schedule_work, schedule_work_fn, Work, Workqueue, WorkqueueFlags}; + use core::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + + static EXEC_COUNT: AtomicU32 = AtomicU32::new(0); + static EXEC_ORDER: [AtomicU32; 3] = [ + AtomicU32::new(0), + AtomicU32::new(0), + AtomicU32::new(0), + ]; + + // Reset counters + EXEC_COUNT.store(0, Ordering::SeqCst); + for order in &EXEC_ORDER { + order.store(0, Ordering::SeqCst); + } + + log::info!("=== WORKQUEUE TEST: Starting workqueue test ==="); + + // Enable interrupts so worker thread can run + x86_64::instructions::interrupts::enable(); + + // Test 1: Basic execution + log::info!("WORKQUEUE_TEST: Testing basic execution..."); + let work1 = schedule_work_fn( + || { + EXEC_COUNT.fetch_add(1, Ordering::SeqCst); + log::info!("WORKQUEUE_TEST: work1 executed"); + }, + "test_work1", + ); + + // Wait for work1 to complete + work1.wait(); + let count = EXEC_COUNT.load(Ordering::SeqCst); + assert_eq!(count, 1, "work1 should have executed once"); + log::info!("WORKQUEUE_TEST: basic execution passed"); + + // Test 2: Multiple work items + log::info!("WORKQUEUE_TEST: Testing multiple work items..."); + let work2 = schedule_work_fn( + || { + let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst); + EXEC_ORDER[0].store(order, Ordering::SeqCst); + log::info!("WORKQUEUE_TEST: work2 executed (order={})", order); + }, + "test_work2", + ); + + let work3 = schedule_work_fn( + || { + let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst); + EXEC_ORDER[1].store(order, Ordering::SeqCst); + log::info!("WORKQUEUE_TEST: work3 executed (order={})", order); + }, + "test_work3", + ); + + let work4 = schedule_work_fn( + || { + let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst); + EXEC_ORDER[2].store(order, Ordering::SeqCst); + log::info!("WORKQUEUE_TEST: work4 executed (order={})", order); + }, + "test_work4", + ); + + // Wait for all work items + work2.wait(); + work3.wait(); + work4.wait(); + + let final_count = EXEC_COUNT.load(Ordering::SeqCst); + assert_eq!(final_count, 4, "all 4 work items should have executed"); + + // Verify execution order (work2 < work3 < work4) + let order2 = EXEC_ORDER[0].load(Ordering::SeqCst); + let order3 = EXEC_ORDER[1].load(Ordering::SeqCst); + let order4 = EXEC_ORDER[2].load(Ordering::SeqCst); + assert!(order2 < order3, "work2 should execute before work3"); + assert!(order3 < order4, "work3 should execute before work4"); + log::info!("WORKQUEUE_TEST: multiple work items passed"); + + // Test 3: Flush functionality + log::info!("WORKQUEUE_TEST: Testing flush..."); + static FLUSH_WORK_DONE: AtomicU32 = AtomicU32::new(0); + FLUSH_WORK_DONE.store(0, Ordering::SeqCst); + + let _flush_work = schedule_work_fn( + || { + FLUSH_WORK_DONE.fetch_add(1, Ordering::SeqCst); + log::info!("WORKQUEUE_TEST: flush_work executed"); + }, + "flush_work", + ); + + // Flush should wait for the work to complete + flush_system_workqueue(); + + let flush_done = FLUSH_WORK_DONE.load(Ordering::SeqCst); + assert_eq!(flush_done, 1, "flush should have waited for work to complete"); + log::info!("WORKQUEUE_TEST: flush completed"); + + // Test 4: Re-queue rejection test + log::info!("WORKQUEUE_TEST: Testing re-queue rejection..."); + static REQUEUE_BLOCK: AtomicBool = AtomicBool::new(false); + REQUEUE_BLOCK.store(false, Ordering::SeqCst); + let requeue_work = schedule_work_fn( + || { + // Use spin_loop instead of HLT - HLT waits for interrupts which is slow in CI + while !REQUEUE_BLOCK.load(Ordering::Acquire) { + core::hint::spin_loop(); + } + }, + "requeue_work", + ); + let requeue_work_clone = Arc::clone(&requeue_work); + let requeue_accepted = schedule_work(requeue_work_clone); + assert!( + !requeue_accepted, + "re-queue should be rejected while work is pending" + ); + REQUEUE_BLOCK.store(true, Ordering::Release); + requeue_work.wait(); + log::info!("WORKQUEUE_TEST: re-queue rejection passed"); + + // Test 5: Multi-item flush test + log::info!("WORKQUEUE_TEST: Testing multi-item flush..."); + static MULTI_FLUSH_COUNT: AtomicU32 = AtomicU32::new(0); + MULTI_FLUSH_COUNT.store(0, Ordering::SeqCst); + for _ in 0..6 { + let _work = schedule_work_fn( + || { + MULTI_FLUSH_COUNT.fetch_add(1, Ordering::SeqCst); + }, + "multi_flush_work", + ); + } + flush_system_workqueue(); + let multi_flush_done = MULTI_FLUSH_COUNT.load(Ordering::SeqCst); + assert_eq!( + multi_flush_done, 6, + "multi-item flush should execute all work items" + ); + log::info!("WORKQUEUE_TEST: multi-item flush passed"); + + // Test 6: Shutdown test + log::info!("WORKQUEUE_TEST: Testing workqueue shutdown..."); + static SHUTDOWN_WORK_DONE: AtomicBool = AtomicBool::new(false); + SHUTDOWN_WORK_DONE.store(false, Ordering::SeqCst); + let wq = Workqueue::new("test_shutdown_wq", WorkqueueFlags::default()); + let shutdown_work = Work::new( + || { + SHUTDOWN_WORK_DONE.store(true, Ordering::SeqCst); + }, + "shutdown_work", + ); + let shutdown_queued = wq.queue(Arc::clone(&shutdown_work)); + assert!(shutdown_queued, "shutdown work should be queued"); + // Wait for work to complete before destroying - critical for CI where worker + // thread may not be scheduled immediately after ensure_worker() spawns it + shutdown_work.wait(); + wq.destroy(); + let shutdown_done = SHUTDOWN_WORK_DONE.load(Ordering::SeqCst); + assert!( + shutdown_done, + "workqueue destroy should complete pending work" + ); + log::info!("WORKQUEUE_TEST: shutdown test passed"); + + // Test 7: Error path test + log::info!("WORKQUEUE_TEST: Testing error path re-queue..."); + static ERROR_PATH_BLOCK: AtomicBool = AtomicBool::new(false); + ERROR_PATH_BLOCK.store(false, Ordering::SeqCst); + let error_work = Work::new( + || { + // Use spin_loop instead of HLT - HLT waits for interrupts which is slow in CI + while !ERROR_PATH_BLOCK.load(Ordering::Acquire) { + core::hint::spin_loop(); + } + }, + "error_path_work", + ); + let first_schedule = schedule_work(Arc::clone(&error_work)); + assert!(first_schedule, "schedule_work should accept idle work"); + let second_schedule = schedule_work(Arc::clone(&error_work)); + assert!( + !second_schedule, + "schedule_work should reject re-queue while work is pending" + ); + ERROR_PATH_BLOCK.store(true, Ordering::Release); + error_work.wait(); + log::info!("WORKQUEUE_TEST: error path test passed"); + + x86_64::instructions::interrupts::disable(); + log::info!("WORKQUEUE_TEST: all tests passed"); + log::info!("=== WORKQUEUE TEST: Completed ==="); +} + /// Stress test for kthreads - creates 100+ kthreads and rapidly starts/stops them. /// This tests: /// 1. The race condition fix in kthread_park() (checking should_stop after setting parked) diff --git a/kernel/src/task/mod.rs b/kernel/src/task/mod.rs index 43fb29f..287e44e 100644 --- a/kernel/src/task/mod.rs +++ b/kernel/src/task/mod.rs @@ -14,6 +14,7 @@ pub mod process_task; pub mod scheduler; pub mod spawn; pub mod thread; +pub mod workqueue; // Re-export kthread public API for kernel-wide use // These are intentionally available but may not be called yet @@ -23,6 +24,13 @@ pub use kthread::{ kthread_unpark, KthreadError, KthreadHandle, }; +// Re-export workqueue public API for kernel-wide use +#[allow(unused_imports)] +pub use workqueue::{ + flush_system_workqueue, init_workqueue, schedule_work, schedule_work_fn, Work, Workqueue, + WorkqueueFlags, +}; + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct TaskId(u64); diff --git a/kernel/src/task/workqueue.rs b/kernel/src/task/workqueue.rs new file mode 100644 index 0000000..060eaa9 --- /dev/null +++ b/kernel/src/task/workqueue.rs @@ -0,0 +1,400 @@ +//! Linux-style work queues for deferred execution. +//! +//! Work queues allow kernel code to schedule work to run in process context +//! (i.e., in a kernel thread that can sleep), rather than interrupt context. +//! +//! # Architecture +//! +//! - `Work`: A unit of deferred work containing a closure +//! - `Workqueue`: Manages a queue of work items and a worker thread +//! - System workqueue: A global default workqueue for general use +//! +//! # Example +//! +//! ```rust,ignore +//! use kernel::task::workqueue::{schedule_work_fn, Work}; +//! +//! // Schedule work on the system workqueue +//! let work = schedule_work_fn(|| { +//! log::info!("Deferred work executing!"); +//! }, "example_work"); +//! +//! // Optionally wait for completion +//! work.wait(); +//! ``` + +use alloc::boxed::Box; +use alloc::collections::VecDeque; +use alloc::sync::Arc; +use core::cell::UnsafeCell; +use core::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; + +use spin::Mutex; + +use super::kthread::{kthread_park, kthread_run, kthread_should_stop, kthread_stop, kthread_unpark, KthreadHandle}; +use super::scheduler; +use super::thread::ThreadState; + +/// Work states +const WORK_IDLE: u8 = 0; +const WORK_PENDING: u8 = 1; +const WORK_RUNNING: u8 = 2; + +/// Sentinel value indicating no waiter. +/// We use u64::MAX instead of 0 because TID 0 is valid (it's the idle/boot thread). +const NO_WAITER: u64 = u64::MAX; + +/// A unit of deferred work. +/// +/// Work items are created with a closure that will be executed by a worker thread. +/// The work can be queued to a workqueue and waited on for completion. +pub struct Work { + /// The function to execute (wrapped in Option for take() semantics) + func: UnsafeCell>>, + /// Current state: Idle -> Pending -> Running -> Idle + state: AtomicU8, + /// Set to true after func returns + completed: AtomicBool, + /// Thread ID waiting for completion (NO_WAITER = no waiter) + waiter: AtomicU64, + /// Debug name for this work item + name: &'static str, +} + +// SAFETY: Work is Send because: +// - func is only accessed by the worker thread (via take()) +// - All other fields are atomic or immutable +unsafe impl Send for Work {} +// SAFETY: Work is Sync because: +// - func access is serialized (queued once, executed once) +// - All other fields are atomic or immutable +unsafe impl Sync for Work {} + +impl Work { + /// Create a new work item with the given function and debug name. + pub fn new(func: F, name: &'static str) -> Arc + where + F: FnOnce() + Send + 'static, + { + Arc::new(Work { + func: UnsafeCell::new(Some(Box::new(func))), + state: AtomicU8::new(WORK_IDLE), + completed: AtomicBool::new(false), + waiter: AtomicU64::new(NO_WAITER), + name, + }) + } + + /// Check if this work item has completed execution. + #[allow(dead_code)] // Part of public API for callers to poll completion status + pub fn is_completed(&self) -> bool { + self.completed.load(Ordering::Acquire) + } + + /// Wait for this work item to complete (blocking). + /// + /// If the work is already complete, returns immediately. + /// Otherwise, blocks until the worker thread signals completion. + /// + /// This uses a proper block/unblock pattern like kthread_park() - the thread + /// is marked Blocked and removed from the ready queue, then execute()'s + /// unblock() call will wake it. This avoids relying on slow timer interrupts. + pub fn wait(&self) { + // Fast path: already completed + if self.completed.load(Ordering::Acquire) { + return; + } + + // Register ourselves as waiter + // Note: TID 0 is valid (it's the idle/boot thread), so we must use Option properly + let tid = match scheduler::current_thread_id() { + Some(id) => id, + None => { + // No valid thread ID (early boot before scheduler init) - fall back to spin loop + while !self.completed.load(Ordering::Acquire) { + core::hint::spin_loop(); + } + return; + } + }; + self.waiter.store(tid, Ordering::Release); + + // Check again after registering (handles race with completion) + if self.completed.load(Ordering::Acquire) { + self.waiter.store(NO_WAITER, Ordering::Release); + return; + } + + // Wait for completion using proper block/unblock pattern (like kthread_park) + while !self.completed.load(Ordering::Acquire) { + // CRITICAL: Disable interrupts while updating scheduler state to prevent + // race where execute() completes between our check and blocking + x86_64::instructions::interrupts::without_interrupts(|| { + // Re-check completed under interrupt disable to handle race with execute() + if self.completed.load(Ordering::Acquire) { + return; // Already done, don't block + } + + // Mark thread as Blocked and remove from ready queue + // This allows execute()'s unblock() call to actually wake us + scheduler::with_scheduler(|sched| { + if let Some(thread) = sched.current_thread_mut() { + thread.state = ThreadState::Blocked; + } + sched.remove_from_ready_queue(tid); + }); + }); + + // Check again after critical section - execute() might have completed + if self.completed.load(Ordering::Acquire) { + break; + } + + // Yield and wait for unblock from execute() + scheduler::yield_current(); + x86_64::instructions::hlt(); + } + + // Clear waiter + self.waiter.store(NO_WAITER, Ordering::Release); + } + + /// Get the debug name of this work item. + #[allow(dead_code)] // Part of public API for debugging and logging + pub fn name(&self) -> &'static str { + self.name + } + + /// Transition from Idle to Pending. Returns false if not Idle. + fn try_set_pending(&self) -> bool { + self.state + .compare_exchange(WORK_IDLE, WORK_PENDING, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + } + + /// Execute this work item (called by worker thread). + fn execute(&self) { + // Transition to Running + self.state.store(WORK_RUNNING, Ordering::Release); + + // Take and execute the function + // SAFETY: Only the worker thread calls execute(), and only once + let func = unsafe { (*self.func.get()).take() }; + if let Some(func) = func { + func(); + } + + // Mark complete and transition back to Idle + self.state.store(WORK_IDLE, Ordering::Release); + self.completed.store(true, Ordering::Release); + + // Wake waiter if any (NO_WAITER means no one is waiting) + let waiter = self.waiter.load(Ordering::Acquire); + if waiter != NO_WAITER { + scheduler::with_scheduler(|sched| { + sched.unblock(waiter); + }); + } + } +} + +/// Flags for workqueue creation (reserved for future use). +#[derive(Default)] +pub struct WorkqueueFlags { + // Future: max_workers, priority, cpu_affinity, etc. +} + +/// A workqueue manages a queue of work items and a worker thread. +pub struct Workqueue { + /// Queue of pending work items + queue: Mutex>>, + /// Worker thread handle (created on first queue) + worker: Mutex>, + /// Shutdown flag - signals worker to exit + shutdown: AtomicBool, + /// Debug name for this workqueue + name: &'static str, +} + +impl Workqueue { + /// Create a new workqueue with the given name. + pub fn new(name: &'static str, _flags: WorkqueueFlags) -> Arc { + Arc::new(Workqueue { + queue: Mutex::new(VecDeque::new()), + worker: Mutex::new(None), + shutdown: AtomicBool::new(false), + name, + }) + } + + /// Queue work for execution. Returns false if work is already pending. + /// + /// The work item must be in the Idle state to be queued. + pub fn queue(self: &Arc, work: Arc) -> bool { + // Reject if already pending + if !work.try_set_pending() { + log::warn!( + "workqueue({}): work '{}' already pending, rejecting", + self.name, + work.name + ); + return false; + } + + // Add to queue + self.queue.lock().push_back(work); + + // Ensure worker thread exists and wake it + self.ensure_worker(); + + true + } + + /// Wait for all pending work to complete (flush the queue). + pub fn flush(&self) { + // Create a sentinel work item + let sentinel = Work::new(|| {}, "flush_sentinel"); + + // Queue and wait for sentinel - all work before it will be complete + if sentinel.try_set_pending() { + self.queue.lock().push_back(Arc::clone(&sentinel)); + self.wake_worker(); + sentinel.wait(); + } + } + + /// Destroy this workqueue, stopping the worker thread. + /// + /// All pending work will be completed before destruction. + pub fn destroy(&self) { + // Signal shutdown + self.shutdown.store(true, Ordering::Release); + + // Stop worker thread + let worker = self.worker.lock().take(); + if let Some(handle) = worker { + // Wake worker so it sees shutdown + kthread_unpark(&handle); + // Signal stop + let _ = kthread_stop(&handle); + } + } + + /// Ensure worker thread exists, creating it if needed. + fn ensure_worker(self: &Arc) { + let mut worker_guard = self.worker.lock(); + if worker_guard.is_none() { + let wq = Arc::clone(self); + let thread_name = self.name; + match kthread_run( + move || { + worker_thread_fn(wq); + }, + thread_name, + ) { + Ok(handle) => { + log::info!("KWORKER_SPAWN: {} started", thread_name); + *worker_guard = Some(handle); + } + Err(e) => { + log::error!("workqueue({}): failed to create worker: {:?}", self.name, e); + } + } + } else { + // Worker exists, just wake it + if let Some(ref handle) = *worker_guard { + kthread_unpark(handle); + } + } + } + + /// Wake the worker thread (if it exists). + fn wake_worker(&self) { + if let Some(ref handle) = *self.worker.lock() { + kthread_unpark(handle); + } + } +} + +impl Drop for Workqueue { + fn drop(&mut self) { + self.destroy(); + } +} + +/// Worker thread main function. +fn worker_thread_fn(wq: Arc) { + // Enable interrupts for preemption + x86_64::instructions::interrupts::enable(); + + log::debug!("workqueue({}): worker thread started", wq.name); + + while !wq.shutdown.load(Ordering::Acquire) && !kthread_should_stop() { + // Try to get work from queue + let work = wq.queue.lock().pop_front(); + + match work { + Some(work) => { + log::debug!("workqueue({}): executing work '{}'", wq.name, work.name); + work.execute(); + } + None => { + // No work available, park until woken + kthread_park(); + } + } + } + + log::debug!("workqueue({}): worker thread exiting", wq.name); +} + +// ============================================================================= +// System Workqueue (Global Default) +// ============================================================================= + +/// Global system workqueue for general use. +static SYSTEM_WQ: Mutex>> = Mutex::new(None); + +/// Initialize the workqueue subsystem. +/// +/// Creates the system workqueue. Must be called during boot after kthread +/// infrastructure is ready. +pub fn init_workqueue() { + let wq = Workqueue::new("kworker/0", WorkqueueFlags::default()); + *SYSTEM_WQ.lock() = Some(wq); + log::info!("WORKQUEUE_INIT: workqueue system initialized"); +} + +/// Schedule work on the system workqueue. +/// +/// Returns true if work was queued, false if already pending. +pub fn schedule_work(work: Arc) -> bool { + if let Some(ref wq) = *SYSTEM_WQ.lock() { + wq.queue(work) + } else { + log::error!("schedule_work: system workqueue not initialized"); + false + } +} + +/// Create and schedule a work item on the system workqueue. +/// +/// Convenience function that creates a Work item and queues it in one step. +/// Returns the Work handle for waiting on completion. +pub fn schedule_work_fn(func: F, name: &'static str) -> Arc +where + F: FnOnce() + Send + 'static, +{ + let work = Work::new(func, name); + if !schedule_work(Arc::clone(&work)) { + log::warn!("schedule_work_fn: failed to queue work '{}'", name); + } + work +} + +/// Flush the system workqueue, waiting for all pending work to complete. +pub fn flush_system_workqueue() { + if let Some(ref wq) = *SYSTEM_WQ.lock() { + wq.flush(); + } +} diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 5a5674e..2f276cb 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -1705,6 +1705,68 @@ fn get_boot_stages() -> Vec { failure_meaning: "Kthread not unparked - kthread_park() blocked forever or kthread_unpark() not working", check_hint: "Check kthread_park() and kthread_unpark() in kernel/src/task/kthread.rs", }, + // === Work Queues === + // Tests the Linux-style work queue infrastructure for deferred execution + BootStage { + name: "Workqueue system initialized", + marker: "WORKQUEUE_INIT: workqueue system initialized", + failure_meaning: "Work queue initialization failed - system workqueue not created", + check_hint: "Check kernel/src/task/workqueue.rs:init_workqueue()", + }, + BootStage { + name: "Kworker thread started", + marker: "KWORKER_SPAWN: kworker/0 started", + failure_meaning: "Worker thread spawn failed - kthread_run() or worker_thread_fn() broken", + check_hint: "Check kernel/src/task/workqueue.rs:ensure_worker() and worker_thread_fn()", + }, + BootStage { + name: "Workqueue basic execution passed", + marker: "WORKQUEUE_TEST: basic execution passed", + failure_meaning: "Basic work execution failed - work not queued or worker not executing", + check_hint: "Check Work::execute() and Workqueue::queue() in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue multiple items passed", + marker: "WORKQUEUE_TEST: multiple work items passed", + failure_meaning: "Multiple work items test failed - work not executed in order or not all executed", + check_hint: "Check worker_thread_fn() loop and queue ordering in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue flush completed", + marker: "WORKQUEUE_TEST: flush completed", + failure_meaning: "Flush test failed - flush_system_workqueue() did not wait for pending work", + check_hint: "Check Workqueue::flush() sentinel pattern in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue all tests passed", + marker: "WORKQUEUE_TEST: all tests passed", + failure_meaning: "One or more workqueue tests failed", + check_hint: "Check test_workqueue() in kernel/src/main.rs for specific assertion failures", + }, + BootStage { + name: "Workqueue re-queue rejection passed", + marker: "WORKQUEUE_TEST: re-queue rejection passed", + failure_meaning: "Re-queue rejection test failed - schedule_work allowed already-pending work", + check_hint: "Check Work::try_set_pending() and Workqueue::queue() in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue multi-item flush passed", + marker: "WORKQUEUE_TEST: multi-item flush passed", + failure_meaning: "Multi-item flush test failed - flush did not wait for all queued work", + check_hint: "Check Workqueue::flush() and flush_system_workqueue() in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue shutdown test passed", + marker: "WORKQUEUE_TEST: shutdown test passed", + failure_meaning: "Workqueue shutdown test failed - destroy did not complete pending work", + check_hint: "Check Workqueue::destroy() and worker_thread_fn() in kernel/src/task/workqueue.rs", + }, + BootStage { + name: "Workqueue error path test passed", + marker: "WORKQUEUE_TEST: error path test passed", + failure_meaning: "Workqueue error path test failed - schedule_work accepted re-queue", + check_hint: "Check Work::try_set_pending() and Workqueue::queue() in kernel/src/task/workqueue.rs", + }, // === Graphics syscalls === // Tests that the FbInfo syscall (410) works correctly BootStage {