Skip to content
Closed
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
testing = ["kernel/testing"]
kthread_test_only = ["kernel/kthread_test_only"] # Run only kthread tests and exit
kthread_stress_test = ["kernel/kthread_stress_test"] # Run kthread stress test (100+ kthreads) and exit
workqueue_test_only = ["kernel/workqueue_test_only"] # Run only workqueue tests and exit
test_divide_by_zero = ["kernel/test_divide_by_zero"]
test_invalid_opcode = ["kernel/test_invalid_opcode"]
test_page_fault = ["kernel/test_page_fault"]
Expand Down
1 change: 1 addition & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ test = false
testing = []
kthread_test_only = ["testing"] # Run only kthread tests and exit
kthread_stress_test = ["testing"] # Run kthread stress test (100+ kthreads) and exit
workqueue_test_only = ["testing"] # Run only workqueue tests and exit
test_divide_by_zero = []
test_invalid_opcode = []
test_page_fault = []
Expand Down
237 changes: 231 additions & 6 deletions kernel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,31 @@ extern "C" fn kernel_main_on_kernel_stack(arg: *mut core::ffi::c_void) -> ! {
process::init();
log::info!("Process management initialized");

// Initialize workqueue subsystem (depends on kthread infrastructure)
task::workqueue::init_workqueue();

// Test kthread lifecycle BEFORE creating userspace processes
// (must be done early so scheduler doesn't preempt to userspace)
#[cfg(feature = "testing")]
test_kthread_lifecycle();
#[cfg(feature = "testing")]
test_kthread_join();
#[cfg(feature = "testing")]
test_workqueue();

// In workqueue_test_only mode, exit immediately after workqueue tests
#[cfg(feature = "workqueue_test_only")]
{
log::info!("=== WORKQUEUE_TEST_ONLY: All workqueue tests passed ===");
log::info!("WORKQUEUE_TEST_ONLY_COMPLETE");
// Exit QEMU with success code
unsafe {
use x86_64::instructions::port::Port;
let mut port = Port::new(0xf4);
port.write(0x00u32); // This causes QEMU to exit
}
loop { x86_64::instructions::hlt(); }
}

// In kthread_test_only mode, exit immediately after join test
#[cfg(feature = "kthread_test_only")]
Expand Down Expand Up @@ -516,20 +535,20 @@ extern "C" fn kernel_main_on_kernel_stack(arg: *mut core::ffi::c_void) -> ! {
loop { x86_64::instructions::hlt(); }
}

#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))]
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))]
test_kthread_exit_code();
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))]
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))]
test_kthread_park_unpark();
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))]
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))]
test_kthread_double_stop();
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))]
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))]
test_kthread_should_stop_non_kthread();
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test")))]
#[cfg(all(feature = "testing", not(feature = "kthread_test_only"), not(feature = "kthread_stress_test"), not(feature = "workqueue_test_only")))]
test_kthread_stop_after_exit();

// Continue with the rest of kernel initialization...
// (This will include creating user processes, enabling interrupts, etc.)
#[cfg(not(feature = "kthread_stress_test"))]
#[cfg(not(any(feature = "kthread_stress_test", feature = "workqueue_test_only")))]
kernel_main_continue();
}

Expand Down Expand Up @@ -1746,6 +1765,212 @@ fn test_kthread_stop_after_exit() {
log::info!("=== KTHREAD STOP AFTER EXIT TEST: Completed ===");
}

/// Test workqueue functionality
/// This validates the Linux-style work queue implementation:
/// 1. Basic work execution via system workqueue
/// 2. Multiple work items execute in order
/// 3. Flush waits for all pending work
#[cfg(feature = "testing")]
fn test_workqueue() {
use alloc::sync::Arc;
use crate::task::workqueue::{flush_system_workqueue, schedule_work, schedule_work_fn, Work, Workqueue, WorkqueueFlags};
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};

static EXEC_COUNT: AtomicU32 = AtomicU32::new(0);
static EXEC_ORDER: [AtomicU32; 3] = [
AtomicU32::new(0),
AtomicU32::new(0),
AtomicU32::new(0),
];

// Reset counters
EXEC_COUNT.store(0, Ordering::SeqCst);
for order in &EXEC_ORDER {
order.store(0, Ordering::SeqCst);
}

log::info!("=== WORKQUEUE TEST: Starting workqueue test ===");

// Enable interrupts so worker thread can run
x86_64::instructions::interrupts::enable();

// Test 1: Basic execution
log::info!("WORKQUEUE_TEST: Testing basic execution...");
let work1 = schedule_work_fn(
|| {
EXEC_COUNT.fetch_add(1, Ordering::SeqCst);
log::info!("WORKQUEUE_TEST: work1 executed");
},
"test_work1",
);

// Wait for work1 to complete
work1.wait();
let count = EXEC_COUNT.load(Ordering::SeqCst);
assert_eq!(count, 1, "work1 should have executed once");
log::info!("WORKQUEUE_TEST: basic execution passed");

// Test 2: Multiple work items
log::info!("WORKQUEUE_TEST: Testing multiple work items...");
let work2 = schedule_work_fn(
|| {
let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst);
EXEC_ORDER[0].store(order, Ordering::SeqCst);
log::info!("WORKQUEUE_TEST: work2 executed (order={})", order);
},
"test_work2",
);

let work3 = schedule_work_fn(
|| {
let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst);
EXEC_ORDER[1].store(order, Ordering::SeqCst);
log::info!("WORKQUEUE_TEST: work3 executed (order={})", order);
},
"test_work3",
);

let work4 = schedule_work_fn(
|| {
let order = EXEC_COUNT.fetch_add(1, Ordering::SeqCst);
EXEC_ORDER[2].store(order, Ordering::SeqCst);
log::info!("WORKQUEUE_TEST: work4 executed (order={})", order);
},
"test_work4",
);

// Wait for all work items
work2.wait();
work3.wait();
work4.wait();

let final_count = EXEC_COUNT.load(Ordering::SeqCst);
assert_eq!(final_count, 4, "all 4 work items should have executed");

// Verify execution order (work2 < work3 < work4)
let order2 = EXEC_ORDER[0].load(Ordering::SeqCst);
let order3 = EXEC_ORDER[1].load(Ordering::SeqCst);
let order4 = EXEC_ORDER[2].load(Ordering::SeqCst);
assert!(order2 < order3, "work2 should execute before work3");
assert!(order3 < order4, "work3 should execute before work4");
log::info!("WORKQUEUE_TEST: multiple work items passed");

// Test 3: Flush functionality
log::info!("WORKQUEUE_TEST: Testing flush...");
static FLUSH_WORK_DONE: AtomicU32 = AtomicU32::new(0);
FLUSH_WORK_DONE.store(0, Ordering::SeqCst);

let _flush_work = schedule_work_fn(
|| {
FLUSH_WORK_DONE.fetch_add(1, Ordering::SeqCst);
log::info!("WORKQUEUE_TEST: flush_work executed");
},
"flush_work",
);

// Flush should wait for the work to complete
flush_system_workqueue();

let flush_done = FLUSH_WORK_DONE.load(Ordering::SeqCst);
assert_eq!(flush_done, 1, "flush should have waited for work to complete");
log::info!("WORKQUEUE_TEST: flush completed");

// Test 4: Re-queue rejection test
log::info!("WORKQUEUE_TEST: Testing re-queue rejection...");
static REQUEUE_BLOCK: AtomicBool = AtomicBool::new(false);
REQUEUE_BLOCK.store(false, Ordering::SeqCst);
let requeue_work = schedule_work_fn(
|| {
// Use spin_loop instead of HLT - HLT waits for interrupts which is slow in CI
while !REQUEUE_BLOCK.load(Ordering::Acquire) {
core::hint::spin_loop();
}
},
"requeue_work",
);
let requeue_work_clone = Arc::clone(&requeue_work);
let requeue_accepted = schedule_work(requeue_work_clone);
assert!(
!requeue_accepted,
"re-queue should be rejected while work is pending"
);
REQUEUE_BLOCK.store(true, Ordering::Release);
requeue_work.wait();
log::info!("WORKQUEUE_TEST: re-queue rejection passed");

// Test 5: Multi-item flush test
log::info!("WORKQUEUE_TEST: Testing multi-item flush...");
static MULTI_FLUSH_COUNT: AtomicU32 = AtomicU32::new(0);
MULTI_FLUSH_COUNT.store(0, Ordering::SeqCst);
for _ in 0..6 {
let _work = schedule_work_fn(
|| {
MULTI_FLUSH_COUNT.fetch_add(1, Ordering::SeqCst);
},
"multi_flush_work",
);
}
flush_system_workqueue();
let multi_flush_done = MULTI_FLUSH_COUNT.load(Ordering::SeqCst);
assert_eq!(
multi_flush_done, 6,
"multi-item flush should execute all work items"
);
log::info!("WORKQUEUE_TEST: multi-item flush passed");

// Test 6: Shutdown test
log::info!("WORKQUEUE_TEST: Testing workqueue shutdown...");
static SHUTDOWN_WORK_DONE: AtomicBool = AtomicBool::new(false);
SHUTDOWN_WORK_DONE.store(false, Ordering::SeqCst);
let wq = Workqueue::new("test_shutdown_wq", WorkqueueFlags::default());
let shutdown_work = Work::new(
|| {
SHUTDOWN_WORK_DONE.store(true, Ordering::SeqCst);
},
"shutdown_work",
);
let shutdown_queued = wq.queue(Arc::clone(&shutdown_work));
assert!(shutdown_queued, "shutdown work should be queued");
// Wait for work to complete before destroying - critical for CI where worker
// thread may not be scheduled immediately after ensure_worker() spawns it
shutdown_work.wait();
wq.destroy();
let shutdown_done = SHUTDOWN_WORK_DONE.load(Ordering::SeqCst);
assert!(
shutdown_done,
"workqueue destroy should complete pending work"
);
log::info!("WORKQUEUE_TEST: shutdown test passed");

// Test 7: Error path test
log::info!("WORKQUEUE_TEST: Testing error path re-queue...");
static ERROR_PATH_BLOCK: AtomicBool = AtomicBool::new(false);
ERROR_PATH_BLOCK.store(false, Ordering::SeqCst);
let error_work = Work::new(
|| {
// Use spin_loop instead of HLT - HLT waits for interrupts which is slow in CI
while !ERROR_PATH_BLOCK.load(Ordering::Acquire) {
core::hint::spin_loop();
}
},
"error_path_work",
);
let first_schedule = schedule_work(Arc::clone(&error_work));
assert!(first_schedule, "schedule_work should accept idle work");
let second_schedule = schedule_work(Arc::clone(&error_work));
assert!(
!second_schedule,
"schedule_work should reject re-queue while work is pending"
);
ERROR_PATH_BLOCK.store(true, Ordering::Release);
error_work.wait();
log::info!("WORKQUEUE_TEST: error path test passed");

x86_64::instructions::interrupts::disable();
log::info!("WORKQUEUE_TEST: all tests passed");
log::info!("=== WORKQUEUE TEST: Completed ===");
}

/// Stress test for kthreads - creates 100+ kthreads and rapidly starts/stops them.
/// This tests:
/// 1. The race condition fix in kthread_park() (checking should_stop after setting parked)
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod process_task;
pub mod scheduler;
pub mod spawn;
pub mod thread;
pub mod workqueue;

// Re-export kthread public API for kernel-wide use
// These are intentionally available but may not be called yet
Expand All @@ -23,6 +24,13 @@ pub use kthread::{
kthread_unpark, KthreadError, KthreadHandle,
};

// Re-export workqueue public API for kernel-wide use
#[allow(unused_imports)]
pub use workqueue::{
flush_system_workqueue, init_workqueue, schedule_work, schedule_work_fn, Work, Workqueue,
WorkqueueFlags,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(u64);

Expand Down
Loading
Loading