Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 8 additions & 24 deletions hyperloop-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use proc_macro::{self, TokenStream};
use quote::{format_ident, quote};
use syn::{
parse::Parse,
parse_quote,
punctuated::{Pair, Punctuated},
spanned::Spanned,
token::Comma,
FnArg, Ident, Pat, Stmt, Token,
Expr, FnArg, Ident, Pat, Stmt, Token,
};

#[derive(Debug, FromMeta)]
Expand Down Expand Up @@ -69,7 +68,7 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {

let result = quote! {
#(#attrs)*
#visibility fn #name(#args) -> Option<&'static mut crate::task::Task<#future_type>> {
#visibility fn #name(#args) -> Option<crate::task::TaskHandle> {
type F = #future_type;

fn wrapper(#args) -> impl FnOnce() -> F {
Expand All @@ -84,7 +83,7 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
unsafe {
if let None = TASK {
TASK = Some(Task::new(wrapper(#arg_values), #priority));
Some(TASK.as_mut().unwrap())
Some(TASK.as_mut().unwrap().get_handle())
} else {
None
}
Expand All @@ -95,12 +94,12 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream {
}

struct Args {
args: Punctuated<Ident, Token![,]>,
args: Punctuated<Expr, Token![,]>,
}

impl Parse for Args {
fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
match Punctuated::<Ident, Token![,]>::parse_terminated(&input) {
match Punctuated::<Expr, Token![,]>::parse_terminated(&input) {
Ok(args) => Ok(Self { args }),
Err(err) => Err(err),
}
Expand All @@ -120,35 +119,20 @@ impl quote::ToTokens for Statements {
}

#[proc_macro]
pub fn executor_from_tasks(tokens: TokenStream) -> TokenStream {
pub fn static_executor(tokens: TokenStream) -> TokenStream {
let args = syn::parse_macro_input!(tokens as Args).args;

let n_tasks = args.len();

let tasks = Statements {
data: args
.pairs()
.map(|pair| {
let task = pair.into_value();
let stmt: Stmt = parse_quote!(
#task.add_to_executor(executor.get_sender()).unwrap();
);
stmt
})
.collect(),
};

let result = quote! {
{
static mut EXECUTOR: Option<Executor<#n_tasks>> = None;

let executor = unsafe {
EXECUTOR.get_or_insert(Executor::new())
EXECUTOR.get_or_insert(Executor::new([#args]))
};

#tasks

executor
executor.get_handle()
}
};

Expand Down
90 changes: 49 additions & 41 deletions hyperloop-priority-queue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![no_std]

use core::{marker::PhantomData, ops::Deref, sync::atomic::Ordering};
use core::{cell::UnsafeCell, marker::PhantomData, mem, ops::Deref, sync::atomic::Ordering};

#[cfg(not(loom))]
use core::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -102,7 +102,7 @@ where
}

fn item(&self) -> &T {
self.heap.slots[self.pos].as_ref().unwrap()
unsafe { self.heap.slot_mut(self.pos).as_ref().unwrap() }
}

unsafe fn slot_mut(&self) -> &mut Option<T> {
Expand All @@ -113,10 +113,7 @@ where
let slot = unsafe { self.slot_mut() };
let other_slot = unsafe { other.slot_mut() };

let item = slot.take();
*slot = other_slot.take();
*other_slot = item;

mem::swap(slot, other_slot);
other
}

Expand Down Expand Up @@ -187,17 +184,17 @@ impl AtomicStackPosition {

fn compare_exchange(&self, current: usize, new: usize) -> Result<usize, usize> {
self.atomic
.compare_exchange_weak(current, new, Ordering::Release, Ordering::Relaxed)
.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Relaxed)
}
}

pub struct PrioritySender<T>
where
T: 'static,
{
slots: &'static [Option<T>],
available: &'static AtomicUsize,
stack_pos: &'static AtomicStackPosition,
slots: *const [UnsafeCell<Option<T>>],
available: *const AtomicUsize,
stack_pos: *const AtomicStackPosition,
}

impl<T> Clone for PrioritySender<T> {
Expand All @@ -210,22 +207,24 @@ impl<T> Clone for PrioritySender<T> {
}
}

unsafe impl<T> Send for PrioritySender<T> {}
unsafe impl<T> Sync for PrioritySender<T> {}

impl<T> PrioritySender<T> {
unsafe fn slot_mut(&self, index: usize) -> &mut Option<T> {
&mut *((&self.slots[index] as *const Option<T>) as *mut Option<T>)
&mut *(*self.slots)[index].get()
}

fn stack_push(&self, item: T) -> Result<(), T> {
let stack_pos = unsafe { &*self.stack_pos };

loop {
let current = self.stack_pos.load();
let current = stack_pos.load();

if current.pos() > 0 {
let new = current.reserved();

if let Ok(_) = self
.stack_pos
.compare_exchange(current.value(), new.value())
{
if let Ok(_) = stack_pos.compare_exchange(current.value(), new.value()) {
let slot = unsafe { self.slot_mut(new.pos()) };
*slot = Some(item);
break;
Expand All @@ -236,10 +235,10 @@ impl<T> PrioritySender<T> {
}

loop {
let old = self.stack_pos.load();
let old = stack_pos.load();
let new = old.pushed();

if let Ok(_) = self.stack_pos.compare_exchange(old.value(), new.value()) {
if let Ok(_) = stack_pos.compare_exchange(old.value(), new.value()) {
break;
}
}
Expand All @@ -248,13 +247,15 @@ impl<T> PrioritySender<T> {
}

pub fn send(&self, item: T) -> Result<(), T> {
let available = unsafe { &*self.available };

loop {
let available = self.available.load(Ordering::Acquire);
let n_available = available.load(Ordering::Acquire);

if available > 0 {
if let Ok(_) = self.available.compare_exchange(
available,
available - 1,
if n_available > 0 {
if let Ok(_) = available.compare_exchange(
n_available,
n_available - 1,
Ordering::Release,
Ordering::Relaxed,
) {
Expand Down Expand Up @@ -289,13 +290,13 @@ where

impl<'a, T, K, const N: usize> Deref for PeekMut<'a, T, K, N>
where
T: PartialOrd,
K: Kind,
T: PartialOrd + 'static,
K: Kind + 'static,
{
type Target = T;

fn deref(&self) -> &Self::Target {
self.queue.slots[0].as_ref().unwrap()
unsafe { self.queue.slot_mut(0).as_ref().unwrap() }
}
}

Expand All @@ -304,7 +305,7 @@ where
T: PartialOrd,
K: Kind,
{
slots: [Option<T>; N],
slots: [UnsafeCell<Option<T>>; N],
available: AtomicUsize,
stack_pos: AtomicStackPosition,
heap_size: usize,
Expand All @@ -318,16 +319,16 @@ where
{
pub fn new() -> Self {
Self {
slots: [(); N].map(|_| None),
slots: [(); N].map(|_| UnsafeCell::new(None)),
available: AtomicUsize::new(N),
stack_pos: AtomicStackPosition::new(N),
heap_size: 0,
_phantom: PhantomData,
}
}

pub fn get_sender(&self) -> PrioritySender<T> {
let queue: &'static Self = unsafe { &*(self as *const Self) };
pub unsafe fn get_sender(&self) -> PrioritySender<T> {
let queue: &'static Self = &*(self as *const Self);

PrioritySender {
slots: &queue.slots,
Expand All @@ -337,7 +338,7 @@ where
}

unsafe fn slot_mut(&self, index: usize) -> &mut Option<T> {
&mut *((&self.slots[index] as *const Option<T>) as *mut Option<T>)
&mut *self.slots[index].get()
}

fn get_node(&self, index: usize) -> Node<T, K, N> {
Expand All @@ -364,15 +365,17 @@ where
break Err(());
} else {
let new = current.popped();
let item = self.slots[current.pos()].take();
let item = unsafe { self.slot_mut(current.pos()).take() };

if let Ok(_) = self
.stack_pos
.compare_exchange(current.value(), new.value())
{
break Ok(item);
} else {
self.slots[current.pos()] = item;
unsafe {
*self.slot_mut(current.pos()) = item;
}
}
}
}
Expand Down Expand Up @@ -407,7 +410,9 @@ where
let index = self.heap_size;

if index < N {
self.slots[index] = Some(item);
unsafe {
*self.slot_mut(index) = Some(item);
}

self.heap_size += 1;

Expand All @@ -431,7 +436,8 @@ where
}

fn take_root(&mut self) -> Option<T> {
if let Some(item) = self.slots[0].take() {
if self.heap_size > 1 {
let item = unsafe { self.slot_mut(0).take() }.unwrap();
{
let root = self.get_root();
let last = self.get_last();
Expand All @@ -440,6 +446,9 @@ where
self.heap_size -= 1;

Some(item)
} else if self.heap_size == 1 {
self.heap_size -= 1;
Some(unsafe { self.slot_mut(0).take() }.unwrap())
} else {
None
}
Expand Down Expand Up @@ -501,9 +510,7 @@ where
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
use std::thread;

use std::vec::Vec;
use std::{thread, vec::Vec};

use super::*;

Expand Down Expand Up @@ -540,7 +547,7 @@ mod tests {
#[test]
fn stack() {
let mut heap: PriorityQueue<u32, Min, 10> = PriorityQueue::new();
let sender = heap.get_sender();
let sender = unsafe { heap.get_sender() };

for i in 0..10 {
sender.stack_push(i).unwrap();
Expand Down Expand Up @@ -568,7 +575,7 @@ mod tests {
#[test]
fn channel() {
let mut queue: PriorityQueue<u32, Min, 10> = PriorityQueue::new();
let sender = queue.get_sender();
let sender = unsafe { queue.get_sender() };

for i in 0..10 {
sender.send(i).unwrap();
Expand Down Expand Up @@ -608,6 +615,7 @@ mod tests {
}

#[test]
#[cfg_attr(miri, ignore)]
fn channel_thread() {
const N: usize = 1000;
let mut queue: PriorityQueue<u128, Min, N> = PriorityQueue::new();
Expand All @@ -619,7 +627,7 @@ mod tests {
let n_items = n_threads * n_items_per_thread;

for i in 0..n_threads {
let sender = queue.get_sender();
let sender = unsafe { queue.get_sender() };
let handler = thread::spawn(move || {
for j in 0..n_items_per_thread {
loop {
Expand Down
2 changes: 1 addition & 1 deletion hyperloop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "hyperloop"
version = "0.1.0"
authors = ["Eivind Alexander Bergem <eivind.bergem@gmail.com>"]
edition = "2018"
edition = "2021"

[dependencies]
futures = {version = "0.3.15", default-features = false}
Expand Down
Loading