Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyperloop-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub fn executor_from_tasks(tokens: TokenStream) -> TokenStream {
static mut EXECUTOR: Option<Executor<#n_tasks>> = None;

let executor = unsafe {
EXECUTOR.get_or_insert(Executor::new())
EXECUTOR.get_or_insert(Executor::new()).get_ref()
};

#tasks
Expand Down
18 changes: 12 additions & 6 deletions hyperloop-priority-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,14 @@ where
}
}

pub fn get_sender(&self) -> PrioritySender<T> {
let queue: &'static Self = unsafe { &*(self as *const Self) };
/// Return sender to queue
///
/// # Safety
///
/// Sender contains a static reference to the queue, so the queue
/// should be static for the sender to be safe.
pub unsafe fn get_sender(&self) -> PrioritySender<T> {
let queue: &'static Self = &*(self as *const Self);

PrioritySender {
slots: &queue.slots,
Expand Down Expand Up @@ -540,7 +546,7 @@ mod tests {
#[test]
fn stack() {
let mut heap: PriorityQueue<u32, Min, 10> = PriorityQueue::new();
let sender = heap.get_sender();
let sender = unsafe { heap.get_sender() };

for i in 0..10 {
sender.stack_push(i).unwrap();
Expand Down Expand Up @@ -568,7 +574,7 @@ mod tests {
#[test]
fn channel() {
let mut queue: PriorityQueue<u32, Min, 10> = PriorityQueue::new();
let sender = queue.get_sender();
let sender = unsafe { queue.get_sender() };

for i in 0..10 {
sender.send(i).unwrap();
Expand Down Expand Up @@ -619,7 +625,7 @@ mod tests {
let n_items = n_threads * n_items_per_thread;

for i in 0..n_threads {
let sender = queue.get_sender();
let sender = unsafe { queue.get_sender() };
let handler = thread::spawn(move || {
for j in 0..n_items_per_thread {
loop {
Expand Down Expand Up @@ -683,7 +689,7 @@ mod tests_loom {

let handles: Vec<_> = (0..n_threads)
.map(|i| {
let sender = queue.get_sender();
let sender = unsafe { queue.get_sender() };
thread::spawn(move || {
sender.stack_push(i).unwrap();
})
Expand Down
64 changes: 30 additions & 34 deletions hyperloop/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,54 @@ impl<const N: usize> Executor<N> {
/// pointers to the tasks stored in the executor. The pointers can
/// be dereferenced at any time and will be dangling if the
/// exeutor is moved or dropped.
pub unsafe fn poll_tasks(&mut self) {
unsafe fn poll_tasks(&mut self) {
while let Some(ticket) = self.queue.pop() {
let _ = ticket.get_task().poll();
}
}

pub fn get_sender(&self) -> TaskSender {
unsafe fn get_sender(&self) -> TaskSender {
self.queue.get_sender()
}

pub fn get_ref(&'static mut self) -> ExecutorRef<N> {
ExecutorRef::new(self)
}
}

/// Wrapper around Executor to allow safe polling
pub struct ExecutorRef<const N: usize> {
executor: &'static mut Executor<N>,
}

impl<const N: usize> ExecutorRef<N> {
fn new(executor: &'static mut Executor<N>) -> Self {
Self { executor }
}

pub fn poll_tasks(&mut self) {
unsafe {
self.executor.poll_tasks();
}
}

pub fn get_sender(&self) -> TaskSender {
unsafe { self.executor.get_sender() }
}
}

#[cfg(test)]
mod tests {
use crossbeam_queue::ArrayQueue;
use hyperloop_macros::{executor_from_tasks, task};
use std::boxed::Box;
use std::sync::Arc;

use super::*;
use crate::task::Task;

#[test]
fn test_executor() {
let mut executor = Executor::<10>::new();
let mut executor = Box::leak(Box::new(Executor::<10>::new())).get_ref();
let queue = Arc::new(ArrayQueue::new(10));

let test_future = |queue, value| {
Expand All @@ -109,40 +134,11 @@ mod tests {
task3.add_to_executor(executor.get_sender()).unwrap();
task4.add_to_executor(executor.get_sender()).unwrap();

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop().unwrap(), 4);
assert_eq!(queue.pop().unwrap(), 2);
assert_eq!(queue.pop().unwrap(), 3);
assert_eq!(queue.pop().unwrap(), 1);
}

#[test]
fn macros() {
#[task(priority = 1)]
async fn test_task1(queue: Arc<ArrayQueue<u32>>) {
queue.push(1).unwrap();
}

#[task(priority = 2)]
async fn test_task2(queue: Arc<ArrayQueue<u32>>) {
queue.push(2).unwrap();
}

let queue = Arc::new(ArrayQueue::new(10));

let task1 = test_task1(queue.clone()).unwrap();
let task2 = test_task2(queue.clone()).unwrap();

let executor = executor_from_tasks!(task1, task2);

unsafe {
executor.poll_tasks();
}

assert_eq!(queue.pop().unwrap(), 2);
assert_eq!(queue.pop().unwrap(), 1);
}
}
34 changes: 34 additions & 0 deletions hyperloop/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,37 @@ mod priority_queue {

#[macro_use]
extern crate std;

#[cfg(test)]
mod tests {
use crate::executor::Executor;
use crate::task::Task;
use crossbeam_queue::ArrayQueue;
use hyperloop_macros::{executor_from_tasks, task};
use std::sync::Arc;

#[test]
fn macros() {
#[task(priority = 1)]
async fn test_task1(queue: Arc<ArrayQueue<u32>>) {
queue.push(1).unwrap();
}

#[task(priority = 2)]
async fn test_task2(queue: Arc<ArrayQueue<u32>>) {
queue.push(2).unwrap();
}

let queue = Arc::new(ArrayQueue::new(10));

let task1 = test_task1(queue.clone()).unwrap();
let task2 = test_task2(queue.clone()).unwrap();

let mut executor = executor_from_tasks!(task1, task2);

executor.poll_tasks();

assert_eq!(queue.pop().unwrap(), 2);
assert_eq!(queue.pop().unwrap(), 1);
}
}
23 changes: 6 additions & 17 deletions hyperloop/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod tests {
fn notify() {
let notification = Box::leak(Box::new(Notification::new()));

let mut executor = Executor::<10>::new();
let mut executor = Box::leak(Box::new(Executor::<10>::new())).get_ref();
let queue = Arc::new(ArrayQueue::new(10));

let wait = |receiver, queue| {
Expand All @@ -92,39 +92,28 @@ mod tests {

task1.add_to_executor(executor.get_sender()).unwrap();

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), None);

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop(), None);

notification.notify();

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), None);

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop(), None);

notification.notify();

unsafe {
executor.poll_tasks();
}
executor.poll_tasks();

assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), None);
Expand Down
7 changes: 5 additions & 2 deletions hyperloop/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ where

#[cfg(test)]
mod tests {
use std::boxed::Box;

use crate::{
interrupt::yield_now,
priority_queue::{Max, PriorityQueue},
Expand All @@ -159,7 +161,8 @@ mod tests {

#[test]
fn task() {
let mut queue: PriorityQueue<Ticket, Max, 1> = PriorityQueue::new();
let queue: &'static mut PriorityQueue<Ticket, Max, 1> =
Box::leak(Box::new(PriorityQueue::new()));

let test_future = || {
|| {
Expand All @@ -175,7 +178,7 @@ mod tests {

let task = Task::new(test_future(), 1);

task.set_sender(queue.get_sender()).unwrap();
task.set_sender(unsafe { queue.get_sender() }).unwrap();

assert_eq!(task.get_state(), TaskState::NotQueued);

Expand Down
Loading