From 130cc83647c515f6fa9c629a4270521022a788a8 Mon Sep 17 00:00:00 2001 From: Eivind Alexander Bergem Date: Thu, 21 Apr 2022 14:22:49 +0200 Subject: [PATCH] Big rewrite - Added `TaskHandle` as a non-generic reference to `Task`. - Use array to initialize executor with tasks, avoiding macro. - Wrap `Executor` and `Scheduler` in handlers requiring pinned references. This way we don't need unsafe when calling from the handler. - Removed async timer task to avoid having N + 1 tasks. - Moved timer wake logic into interrupt handler. - Use UnsafeCell for interior mutability in priority queue - Added special handling when popping root from heap of size 1 to avoid swapping item with itself. - Use `UnsafeCell` for timer counter. - Use UnsafeCell and raw pointers in executor to keep miri happy. - Added macro to create static executor. - Cleaned up memory leaks in test code - Moved macro tests to integration test - Removed dependency on atomig - Disable std feature for crossbeam-utils - Removed dependency on log - Removed timer from executor - Used fugit instead of embedded time - Replaced raw pointer with pinned pointer in `ExecutorHandle` - Added systick tick-less timer - Use build script to allow memory.x in crate root - Added std timer - Added std example - Task takes future instead of function - Moved type alias to inne module to make compiler happy - Moved priority queue into hyperloop crate and fixed loom test - Added `nightly` and `macros` features - Added changelog - Added license - Automated readme --- .github/workflows/changelog.yml | 17 + .github/workflows/rust.yml | 80 +- CHANGELOG.md | 10 + Cargo.toml | 4 +- LICENSE | 373 ++++++ README.md | 82 +- examples/std/Cargo.toml | 7 + examples/std/src/main.rs | 37 + examples/stm32f103/.cargo/config.toml | 8 + examples/stm32f103/Cargo.toml | 14 + examples/stm32f103/Embed.toml | 5 + examples/stm32f103/build.rs | 43 + examples/stm32f103/memory.x | 6 + examples/stm32f103/src/main.rs | 105 ++ hyperloop-macros/Cargo.toml | 1 + hyperloop-macros/src/lib.rs | 87 +- hyperloop-priority-queue/Cargo.toml | 16 - hyperloop/Cargo.toml | 35 +- hyperloop/README.tpl | 44 + hyperloop/src/common.rs | 26 - hyperloop/src/executor.rs | 373 +++++- .../src/executor/priority_queue.rs | 190 +-- hyperloop/src/interrupt.rs | 152 --- hyperloop/src/lib.rs | 61 +- hyperloop/src/notify.rs | 132 -- hyperloop/src/task.rs | 244 ++-- hyperloop/src/timer.rs | 1103 +++++++++++------ hyperloop/src/timer/cortex_m.rs | 115 ++ hyperloop/src/timer/list.rs | 335 +++++ hyperloop/src/timer/std.rs | 50 + hyperloop/tests/tests.rs | 31 + rust-toolchain.toml | 2 +- 32 files changed, 2704 insertions(+), 1084 deletions(-) create mode 100644 .github/workflows/changelog.yml create mode 100644 CHANGELOG.md create mode 100644 LICENSE create mode 100644 examples/std/Cargo.toml create mode 100644 examples/std/src/main.rs create mode 100644 examples/stm32f103/.cargo/config.toml create mode 100644 examples/stm32f103/Cargo.toml create mode 100644 examples/stm32f103/Embed.toml create mode 100644 examples/stm32f103/build.rs create mode 100644 examples/stm32f103/memory.x create mode 100644 examples/stm32f103/src/main.rs delete mode 100644 hyperloop-priority-queue/Cargo.toml create mode 100644 hyperloop/README.tpl rename hyperloop-priority-queue/src/lib.rs => hyperloop/src/executor/priority_queue.rs (77%) delete mode 100644 hyperloop/src/interrupt.rs delete mode 100644 hyperloop/src/notify.rs create mode 100644 hyperloop/src/timer/cortex_m.rs create mode 100644 hyperloop/src/timer/list.rs create mode 100644 hyperloop/src/timer/std.rs create mode 100644 hyperloop/tests/tests.rs diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml new file mode 100644 index 0000000..83915d3 --- /dev/null +++ b/.github/workflows/changelog.yml @@ -0,0 +1,17 @@ +name: "Pull Request Workflow" +on: + pull_request: + # The specific activity types are listed here to include "labeled" and "unlabeled" + # (which are not included by default for the "pull_request" trigger). + # This is needed to allow skipping enforcement of the changelog in PRs with specific labels, + # as defined in the (optional) "skipLabels" property. + types: [opened, synchronize, reopened, ready_for_review, labeled, unlabeled] + +jobs: + # Enforces the update of a changelog file on every pull request + changelog: + runs-on: ubuntu-latest + steps: + # The checkout step is needed since the enforcer relies on local git commands + - uses: actions/checkout@v2 + - uses: dangoslen/changelog-enforcer@v2 diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d086cfb..49c8a0c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,17 +10,87 @@ env: CARGO_TERM_COLOR: always jobs: - build: + readme: + runs-on: ubuntu-latest + + defaults: + run: + working-directory: ./hyperloop + + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/install@v0.1 + with: + crate: cargo-readme + version: latest + use-tool-cache: true + - name: Readme + run: cargo readme > ../README.md && git diff --exit-code + format: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Format run: cargo fmt --all -- --check + + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Build + run: cd hyperloop && cargo build --verbose + + build-std-example: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 - name: Build - run: cargo build --verbose + run: cd examples/std && cargo build --verbose + + build-stm32f03-example: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Install toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + target: thumbv7m-none-eabi + + - name: Build + run: cd examples/stm32f103 && cargo build --verbose + + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 - name: Run tests - run: cargo test --verbose + run: cd hyperloop && cargo test --verbose --all-features + + loom: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 - name: Loom tests - run: RUSTFLAGS="--cfg loom" cargo test --release tests_loom + run: cd hyperloop && RUSTFLAGS="--cfg loom" cargo test --release tests_loom + + miri: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: nightly + override: true + components: miri + - name: Miri tests + run: cd hyperloop && cargo miri test --verbose --all-features diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d9c0000 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +- First release diff --git a/Cargo.toml b/Cargo.toml index ca0c782..5d6668e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [workspace] +resolver = "2" members = [ + "examples/std", + "examples/stm32f103", "hyperloop", "hyperloop-macros", - "hyperloop-priority-queue", ] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ee6256c --- /dev/null +++ b/LICENSE @@ -0,0 +1,373 @@ +Mozilla Public License Version 2.0 +================================== + +1. Definitions +-------------- + +1.1. "Contributor" + means each individual or legal entity that creates, contributes to + the creation of, or owns Covered Software. + +1.2. "Contributor Version" + means the combination of the Contributions of others (if any) used + by a Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + means Source Code Form to which the initial Contributor has attached + the notice in Exhibit A, the Executable Form of such Source Code + Form, and Modifications of such Source Code Form, in each case + including portions thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + (a) that the initial Contributor has attached the notice described + in Exhibit B to the Covered Software; or + + (b) that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the + terms of a Secondary License. + +1.6. "Executable Form" + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + means a work that combines Covered Software with other material, in + a separate file or files, that is not Covered Software. + +1.8. "License" + means this document. + +1.9. "Licensable" + means having the right to grant, to the maximum extent possible, + whether at the time of the initial grant or subsequently, any and + all of the rights conveyed by this License. + +1.10. "Modifications" + means any of the following: + + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + + (b) any new file in Source Code Form that contains any Covered + Software. + +1.11. "Patent Claims" of a Contributor + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the + License, by the making, using, selling, offering for sale, having + made, import, or transfer of either its Contributions or its + Contributor Version. + +1.12. "Secondary License" + means either the GNU General Public License, Version 2.0, the GNU + Lesser General Public License, Version 2.1, the GNU Affero General + Public License, Version 3.0, or any later versions of those + licenses. + +1.13. "Source Code Form" + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that + controls, is controlled by, or is under common control with You. For + purposes of this definition, "control" means (a) the power, direct + or indirect, to cause the direction or management of such entity, + whether by contract or otherwise, or (b) ownership of more than + fifty percent (50%) of the outstanding shares or beneficial + ownership of such entity. + +2. License Grants and Conditions +-------------------------------- + +2.1. Grants + +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license: + +(a) under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + +(b) under Patent Claims of such Contributor to make, use, sell, offer + for sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + +The licenses granted in Section 2.1 with respect to any Contribution +become effective for each Contribution on the date the Contributor first +distributes such Contribution. + +2.3. Limitations on Grant Scope + +The licenses granted in this Section 2 are the only rights granted under +this License. No additional rights or licenses will be implied from the +distribution or licensing of Covered Software under this License. +Notwithstanding Section 2.1(b) above, no patent license is granted by a +Contributor: + +(a) for any code that a Contributor has removed from Covered Software; + or + +(b) for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + +(c) under Patent Claims infringed by Covered Software in the absence of + its Contributions. + +This License does not grant any rights in the trademarks, service marks, +or logos of any Contributor (except as may be necessary to comply with +the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + +No Contributor makes additional grants as a result of Your choice to +distribute the Covered Software under a subsequent version of this +License (see Section 10.2) or under the terms of a Secondary License (if +permitted under the terms of Section 3.3). + +2.5. Representation + +Each Contributor represents that the Contributor believes its +Contributions are its original creation(s) or it has sufficient rights +to grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + +This License is not intended to limit any rights You have under +applicable copyright doctrines of fair use, fair dealing, or other +equivalents. + +2.7. Conditions + +Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted +in Section 2.1. + +3. Responsibilities +------------------- + +3.1. Distribution of Source Form + +All distribution of Covered Software in Source Code Form, including any +Modifications that You create or to which You contribute, must be under +the terms of this License. You must inform recipients that the Source +Code Form of the Covered Software is governed by the terms of this +License, and how they can obtain a copy of this License. You may not +attempt to alter or restrict the recipients' rights in the Source Code +Form. + +3.2. Distribution of Executable Form + +If You distribute Covered Software in Executable Form then: + +(a) such Covered Software must also be made available in Source Code + Form, as described in Section 3.1, and You must inform recipients of + the Executable Form how they can obtain a copy of such Source Code + Form by reasonable means in a timely manner, at a charge no more + than the cost of distribution to the recipient; and + +(b) You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter + the recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + +You may create and distribute a Larger Work under terms of Your choice, +provided that You also comply with the requirements of this License for +the Covered Software. If the Larger Work is a combination of Covered +Software with a work governed by one or more Secondary Licenses, and the +Covered Software is not Incompatible With Secondary Licenses, this +License permits You to additionally distribute such Covered Software +under the terms of such Secondary License(s), so that the recipient of +the Larger Work may, at their option, further distribute the Covered +Software under the terms of either this License or such Secondary +License(s). + +3.4. Notices + +You may not remove or alter the substance of any license notices +(including copyright notices, patent notices, disclaimers of warranty, +or limitations of liability) contained within the Source Code Form of +the Covered Software, except that You may alter any license notices to +the extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + +You may choose to offer, and to charge a fee for, warranty, support, +indemnity or liability obligations to one or more recipients of Covered +Software. However, You may do so only on Your own behalf, and not on +behalf of any Contributor. You must make it absolutely clear that any +such warranty, support, indemnity, or liability obligation is offered by +You alone, and You hereby agree to indemnify every Contributor for any +liability incurred by such Contributor as a result of warranty, support, +indemnity or liability terms You offer. You may include additional +disclaimers of warranty and limitations of liability specific to any +jurisdiction. + +4. Inability to Comply Due to Statute or Regulation +--------------------------------------------------- + +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Software due to +statute, judicial order, or regulation then You must: (a) comply with +the terms of this License to the maximum extent possible; and (b) +describe the limitations and the code they affect. Such description must +be placed in a text file included with all distributions of the Covered +Software under this License. Except to the extent prohibited by statute +or regulation, such description must be sufficiently detailed for a +recipient of ordinary skill to be able to understand it. + +5. Termination +-------------- + +5.1. The rights granted under this License will terminate automatically +if You fail to comply with any of its terms. However, if You become +compliant, then the rights granted under this License from a particular +Contributor are reinstated (a) provisionally, unless and until such +Contributor explicitly and finally terminates Your grants, and (b) on an +ongoing basis, if such Contributor fails to notify You of the +non-compliance by some reasonable means prior to 60 days after You have +come back into compliance. Moreover, Your grants from a particular +Contributor are reinstated on an ongoing basis if such Contributor +notifies You of the non-compliance by some reasonable means, this is the +first time You have received notice of non-compliance with this License +from such Contributor, and You become compliant prior to 30 days after +Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent +infringement claim (excluding declaratory judgment actions, +counter-claims, and cross-claims) alleging that a Contributor Version +directly or indirectly infringes any patent, then the rights granted to +You by any and all Contributors for the Covered Software under Section +2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all +end user license agreements (excluding distributors and resellers) which +have been validly granted by You or Your distributors under this License +prior to termination shall survive termination. + +************************************************************************ +* * +* 6. Disclaimer of Warranty * +* ------------------------- * +* * +* Covered Software is provided under this License on an "as is" * +* basis, without warranty of any kind, either expressed, implied, or * +* statutory, including, without limitation, warranties that the * +* Covered Software is free of defects, merchantable, fit for a * +* particular purpose or non-infringing. The entire risk as to the * +* quality and performance of the Covered Software is with You. * +* Should any Covered Software prove defective in any respect, You * +* (not any Contributor) assume the cost of any necessary servicing, * +* repair, or correction. This disclaimer of warranty constitutes an * +* essential part of this License. No use of any Covered Software is * +* authorized under this License except under this disclaimer. * +* * +************************************************************************ + +************************************************************************ +* * +* 7. Limitation of Liability * +* -------------------------- * +* * +* Under no circumstances and under no legal theory, whether tort * +* (including negligence), contract, or otherwise, shall any * +* Contributor, or anyone who distributes Covered Software as * +* permitted above, be liable to You for any direct, indirect, * +* special, incidental, or consequential damages of any character * +* including, without limitation, damages for lost profits, loss of * +* goodwill, work stoppage, computer failure or malfunction, or any * +* and all other commercial damages or losses, even if such party * +* shall have been informed of the possibility of such damages. This * +* limitation of liability shall not apply to liability for death or * +* personal injury resulting from such party's negligence to the * +* extent applicable law prohibits such limitation. Some * +* jurisdictions do not allow the exclusion or limitation of * +* incidental or consequential damages, so this exclusion and * +* limitation may not apply to You. * +* * +************************************************************************ + +8. Litigation +------------- + +Any litigation relating to this License may be brought only in the +courts of a jurisdiction where the defendant maintains its principal +place of business and such litigation shall be governed by laws of that +jurisdiction, without reference to its conflict-of-law provisions. +Nothing in this Section shall prevent a party's ability to bring +cross-claims or counter-claims. + +9. Miscellaneous +---------------- + +This License represents the complete agreement concerning the subject +matter hereof. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent +necessary to make it enforceable. Any law or regulation which provides +that the language of a contract shall be construed against the drafter +shall not be used to construe this License against a Contributor. + +10. Versions of the License +--------------------------- + +10.1. New Versions + +Mozilla Foundation is the license steward. Except as provided in Section +10.3, no one other than the license steward has the right to modify or +publish new versions of this License. Each version will be given a +distinguishing version number. + +10.2. Effect of New Versions + +You may distribute the Covered Software under the terms of the version +of the License under which You originally received the Covered Software, +or under the terms of any subsequent version published by the license +steward. + +10.3. Modified Versions + +If you create software not governed by this License, and you want to +create a new license for such software, you may create and use a +modified version of this License if you rename the license and remove +any references to the name of the license steward (except to note that +such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary +Licenses + +If You choose to distribute Source Code Form that is Incompatible With +Secondary Licenses under the terms of this version of the License, the +notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice +------------------------------------------- + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at https://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular +file, then You may include the notice in a location (such as a LICENSE +file in a relevant directory) where a recipient would be likely to look +for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice +--------------------------------------------------------- + + This Source Code Form is "Incompatible With Secondary Licenses", as + defined by the Mozilla Public License, v. 2.0. diff --git a/README.md b/README.md index d08d43a..7e94f2c 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,86 @@ -# Hyperloop +![Pipeline](https://github.com/rustne-kretser/hyperloop/actions/workflows/rust.yml/badge.svg) +[![Crates.io](https://img.shields.io/crates/v/hyperloop.svg)](https://crates.io/crates/hyperloop) +[![API reference](https://docs.rs/hyperloop/badge.svg)](https://docs.rs/hyperloop/) -Hyperloop is a priority based async runtime targeting embedded systems written in Rust. +# Hyperloop – the new superloop -## Project aims -- Provide a lean async runtime suitable for resource constrained microcontrollers. +Hyperloop is a priority based async runtime targetting embedded +systems. -- No heap allocations. +Features: +- Static allocation +- Priority based scheduling of async tasks +- No critical sections, uses atomics for concurrent access +- Unsafe code tested with miri, concurrency tested with loom -- No critical sections, using atomics for all synchronization, thus - being multicore friendly. +## Example +```rust +/// Simple task that blinks toggles LED every second +#[task(priority = 1)] +async fn blinky(timer: Timer, mut led: OutputPin) { + // Use periodic for even 1 second period + let mut periodic = timer.periodic(Duration::secs(1)); + loop { + // Toggle led and wait till next period + led.toggle(); + periodic.wait().await; + } +} -- Being a fully fledged alternative to stackful RTOS's with priority based - scheduling, but with stackless asynchronous tasks. +fn main() { + let timer_queue = [...]; + let led = [...]; + + // Create executor with blinky task + let mut executor = static_executor!(blinky(timer_queue.get_timer(), led).unwrap()); + + // Start the timer + timer_queue.start_timer(); + + loop { + timer_queue.wake_tasks(); + executor.poll_tasks(); + + timer_queue.wait_for_alarm(); + } +} +``` + +See the `examples/` directory for complete examples. + +For more details, see [docs](https://docs.rs/hyperloop/). ## Minimum supported Rust version (MSRV) -Hyperloop requires nightly for the time being, due to dependence on unstable features. +Requires nightly for macros. + +## Usage + +Add this to your Cargo.toml: + +```toml +[dependencies] +hyperloop = "0.1.0" +``` + +## License + +MPL-2.0 ## FAQ ### Why async? Can't we just use plain old stackful tasks? -Async tasks have a minimal memory footprint and are a good fit for memory constrained microcontrollers. Tasks should be cheap and you should be able to another task without having to worry much about memory usage. Stackful tasks need a lot of memory for the stack, especially if you need string formatting for logging. Stackful tasks do allow for preemption, but it comes at a high price. +Async tasks have a minimal memory footprint and are a good fit for +memory constrained microcontrollers. Tasks should be cheap and you +should be able to another task without having to worry much about +memory usage. Stackful tasks need a lot of memory for the stack, +especially if you need string formatting for logging. Stackful tasks +do allow for preemption, but it comes at a high price. ### How does Hyperloop differ from [Embassy](https://github.com/embassy-rs/embassy) -Embassy provides not only an executor, but a whole embedded async ecosystem. Hyperloop is much more limited in scope. The main difference between the Embassy exeutor and Hyperloop is that Hyperloop uses priority based scheduling. +Embassy provides not only an executor, but a whole embedded async +ecosystem. Hyperloop is much more limited in scope. The main +difference between the Embassy exeutor and Hyperloop is that Hyperloop +uses priority based scheduling. diff --git a/examples/std/Cargo.toml b/examples/std/Cargo.toml new file mode 100644 index 0000000..bb25f3e --- /dev/null +++ b/examples/std/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "std" +version = "0.1.0" +edition = "2021" + +[dependencies] +hyperloop = {path = "../../hyperloop", features = ["std"]} diff --git a/examples/std/src/main.rs b/examples/std/src/main.rs new file mode 100644 index 0000000..da15d7e --- /dev/null +++ b/examples/std/src/main.rs @@ -0,0 +1,37 @@ +#![feature(type_alias_impl_trait)] + +use std::{pin::pin, time::Duration}; + +use hyperloop::{ + executor::Executor, + task, + timer::{std::StdTimer, TimerQueue}, +}; + +type Timer = hyperloop::timer::Timer; + +#[task(priority = 1)] +async fn hello(timer: Timer) { + let mut periodic = timer.periodic(Duration::from_secs(1)); + let start = timer.now(); + + loop { + println!("Hello at {:?}", start.elapsed()); + periodic.wait().await; + } +} + +fn main() { + let timer_queue = pin!(TimerQueue::new(StdTimer::new())); + let mut executor = pin!(Executor::new([ + hello(timer_queue.as_ref().get_timer()).unwrap() + ])); + let mut handle = executor.as_mut().get_handle(); + + loop { + timer_queue.wake_tasks(); + handle.poll_tasks(); + + timer_queue.wait_for_alarm(); + } +} diff --git a/examples/stm32f103/.cargo/config.toml b/examples/stm32f103/.cargo/config.toml new file mode 100644 index 0000000..51a5deb --- /dev/null +++ b/examples/stm32f103/.cargo/config.toml @@ -0,0 +1,8 @@ +[target.'cfg(all(target_arch = "arm", target_os = "none"))'] +#runner = "probe-run --chip nRF52840_xxAA" +rustflags = [ + "-C", "link-arg=-Tdefmt.x", +] + +[build] +target = "thumbv7m-none-eabi" diff --git a/examples/stm32f103/Cargo.toml b/examples/stm32f103/Cargo.toml new file mode 100644 index 0000000..1f4dbee --- /dev/null +++ b/examples/stm32f103/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "stm32f103" +version = "0.1.0" +edition = "2021" + +[dependencies] +cortex-m = { version = "0.7.7", features = ["critical-section-single-core"] } +cortex-m-rt = "0.7.3" +defmt = "0.3.8" +defmt-rtt = "0.4.1" +fugit = { version = "0.3.7", features = ["defmt"] } +hyperloop = {path = "../../hyperloop"} +panic-probe = { version = "0.3.2", features = ["defmt", "print-defmt"] } +stm32f1xx-hal = { version = "0.10.0", features = ["rt", "stm32f103"] } diff --git a/examples/stm32f103/Embed.toml b/examples/stm32f103/Embed.toml new file mode 100644 index 0000000..61985e3 --- /dev/null +++ b/examples/stm32f103/Embed.toml @@ -0,0 +1,5 @@ +[default.rtt] +enabled = true +channels = [ + { up = 0, name = "defmt_rtt", format = "Defmt" }, +] diff --git a/examples/stm32f103/build.rs b/examples/stm32f103/build.rs new file mode 100644 index 0000000..2a51f4e --- /dev/null +++ b/examples/stm32f103/build.rs @@ -0,0 +1,43 @@ +//! This build script copies the `memory.x` file from the crate root into +//! a directory where the linker can always find it at build time. +//! For many projects this is optional, as the linker always searches the +//! project root directory -- wherever `Cargo.toml` is. However, if you +//! are using a workspace or have a more complicated build setup, this +//! build script becomes required. Additionally, by requesting that +//! Cargo re-run the build script whenever `memory.x` is changed, +//! updating `memory.x` ensures a rebuild of the application with the +//! new memory settings. +//! +//! The build script also sets the linker flags to tell it which link script to use. + +use std::env; +use std::fs::File; +use std::io::Write; +use std::path::PathBuf; + +fn main() { + // Put `memory.x` in our output directory and ensure it's + // on the linker search path. + let out = &PathBuf::from(env::var_os("OUT_DIR").unwrap()); + File::create(out.join("memory.x")) + .unwrap() + .write_all(include_bytes!("memory.x")) + .unwrap(); + println!("cargo:rustc-link-search={}", out.display()); + + // By default, Cargo will re-run a build script whenever + // any file in the project changes. By specifying `memory.x` + // here, we ensure the build script is only re-run when + // `memory.x` is changed. + println!("cargo:rerun-if-changed=memory.x"); + + // Specify linker arguments. + + // `--nmagic` is required if memory section addresses are not aligned to 0x10000, + // for example the FLASH and RAM sections in your `memory.x`. + // See https://github.com/rust-embedded/cortex-m-quickstart/pull/95 + println!("cargo:rustc-link-arg=--nmagic"); + + // Set the linker script to the one provided by cortex-m-rt. + println!("cargo:rustc-link-arg=-Tlink.x"); +} diff --git a/examples/stm32f103/memory.x b/examples/stm32f103/memory.x new file mode 100644 index 0000000..71f245d --- /dev/null +++ b/examples/stm32f103/memory.x @@ -0,0 +1,6 @@ +/* Linker script for the STM32F103C8T6 */ +MEMORY +{ + FLASH : ORIGIN = 0x08000000, LENGTH = 64K + RAM : ORIGIN = 0x20000000, LENGTH = 20K +} diff --git a/examples/stm32f103/src/main.rs b/examples/stm32f103/src/main.rs new file mode 100644 index 0000000..a347b49 --- /dev/null +++ b/examples/stm32f103/src/main.rs @@ -0,0 +1,105 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use core::{cell::Cell, pin::Pin}; + +use panic_probe as _; + +use cortex_m::interrupt::Mutex; +use cortex_m_rt::{entry, exception}; +use hyperloop::{ + static_executor, task, + timer::cortex_m::{Duration, SysTickTimer}, + timer::TimerQueue, +}; +use stm32f1xx_hal::{ + device::Peripherals, + gpio::{ErasedPin, Output}, + prelude::*, +}; + +use defmt_rtt as _; + +const SYSCLK_FREQ_HZ: u32 = 8_000_000; + +type Timer = hyperloop::timer::Timer>; + +static G_TIMER_QUEUE: Mutex>>>>> = + Mutex::new(Cell::new(None)); + +/// Systick exception handler +#[exception] +fn SysTick() { + static mut TIMER_QUEUE: Option>>> = None; + + let timer_queue = TIMER_QUEUE.get_or_insert_with(|| { + cortex_m::interrupt::free(|cs| G_TIMER_QUEUE.borrow(cs).take().unwrap()) + }); + + timer_queue.interrupt_handler(); +} + +/// Simple task that blinks toggles LED every second +#[task(priority = 1)] +async fn blinky(timer: Timer, mut led: ErasedPin) { + // Use periodic for even 1 second period + let mut periodic = timer.periodic(Duration::secs(1)); + + loop { + defmt::println!("Time: {}", timer.now().duration_since_epoch().to_micros()); + + // Toggle led and wait till next period + led.toggle(); + periodic.wait().await; + } +} + +#[entry] +fn main() -> ! { + let mut cp = cortex_m::Peripherals::take().unwrap(); + let dp = Peripherals::take().unwrap(); + + // Configure PC13 pin to blink LED + let mut gpioc = dp.GPIOC.split(); + let mut led = gpioc.pc13.into_push_pull_output(&mut gpioc.crh).erase(); + led.set_high(); // Turn off + + // Set up systick timer + let timer_queue = { + static mut TIMER_QUEUE: Option>> = None; + + unsafe { + TIMER_QUEUE = Some(TimerQueue::new(SysTickTimer::new( + &mut cp.DCB, + cp.DWT, + cp.SYST, + SYSCLK_FREQ_HZ, + ))); + } + + let timer_queue = + unsafe { Pin::new_unchecked(TIMER_QUEUE.as_ref().unwrap_or_else(|| unreachable!())) }; + + cortex_m::interrupt::free(|cs| { + G_TIMER_QUEUE.borrow(cs).replace(Some(timer_queue)); + }); + + timer_queue + }; + + // Create executor with blinky task + let mut executor = static_executor!(blinky(timer_queue.get_timer(), led).unwrap()); + + loop { + timer_queue.wake_tasks(); + executor.poll_tasks(); + + // Commented out `wait_for_alarm()` because `wfi` prevents + // debugger from connecting, see + // https://github.com/probe-rs/probe-rs/issues/350#issuecomment-740550519 + // for a fix + + // timer_queue.wait_for_alarm(); + } +} diff --git a/hyperloop-macros/Cargo.toml b/hyperloop-macros/Cargo.toml index 84589a0..bf89ba5 100644 --- a/hyperloop-macros/Cargo.toml +++ b/hyperloop-macros/Cargo.toml @@ -2,6 +2,7 @@ name = "hyperloop-macros" version = "0.1.0" edition = "2021" +license = "MPL-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/hyperloop-macros/src/lib.rs b/hyperloop-macros/src/lib.rs index d2ef941..fdefe84 100644 --- a/hyperloop-macros/src/lib.rs +++ b/hyperloop-macros/src/lib.rs @@ -2,15 +2,14 @@ #![feature(box_into_inner)] use darling::FromMeta; -use proc_macro::{self, TokenStream}; +use proc_macro::TokenStream; use quote::{format_ident, quote}; use syn::{ parse::Parse, - parse_quote, punctuated::{Pair, Punctuated}, spanned::Spanned, token::Comma, - FnArg, Ident, Pat, Stmt, Token, + Expr, FnArg, Ident, Pat, Token, }; #[derive(Debug, FromMeta)] @@ -18,7 +17,20 @@ struct TaskArgs { priority: u8, } -// This macro has liberally borrowed from embassy +/// Attribute macro for creating statically allocated async task +/// +/// # Example +/// ``` +/// #[task(priority = 1)] +/// async fn task() { +/// loop { +/// // Do stuff in here +/// } +/// } +/// +/// let task_handle = task().unwrap(); +/// ``` +/// #[proc_macro_attribute] pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { let macro_args = syn::parse_macro_input!(args as syn::AttributeArgs); @@ -69,22 +81,25 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { let result = quote! { #(#attrs)* - #visibility fn #name(#args) -> Option<&'static mut crate::task::Task<#future_type>> { - type F = #future_type; + #visibility fn #name(#args) -> Option<::hyperloop::task::TaskHandle> { + mod inner { + use super::*; - fn wrapper(#args) -> impl FnOnce() -> F { - move || { + pub type F = #future_type; + + pub fn wrapper(#args) -> F { #task_fn task(#arg_values) } } - static mut TASK: Option> = None; + static mut TASK: Option<::hyperloop::task::Task> = None; unsafe { - if let None = TASK { - TASK = Some(Task::new(wrapper(#arg_values), #priority)); - Some(TASK.as_mut().unwrap()) + if TASK.is_none() { + TASK = Some(::hyperloop::task::Task::new(inner::wrapper(#arg_values), + #priority)); + Some(::core::pin::Pin::static_mut(TASK.as_mut().unwrap()).get_handle()) } else { None } @@ -95,60 +110,42 @@ pub fn task(args: TokenStream, item: TokenStream) -> TokenStream { } struct Args { - args: Punctuated, + args: Punctuated, } impl Parse for Args { fn parse(input: syn::parse::ParseStream) -> syn::Result { - match Punctuated::::parse_terminated(&input) { + match Punctuated::::parse_terminated(input) { Ok(args) => Ok(Self { args }), Err(err) => Err(err), } } } -struct Statements { - data: Vec, -} - -impl quote::ToTokens for Statements { - fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { - for stmt in self.data.iter() { - stmt.to_tokens(tokens); - } - } -} - +/// Macro for creating a static allocator +/// +/// Returns an [`ExecutorHandle`] +/// +/// # Example +/// ```ignore +/// let mut executor = static_executor!(some_task(), another_task()); +/// ``` #[proc_macro] -pub fn executor_from_tasks(tokens: TokenStream) -> TokenStream { +pub fn static_executor(tokens: TokenStream) -> TokenStream { let args = syn::parse_macro_input!(tokens as Args).args; let n_tasks = args.len(); - let tasks = Statements { - data: args - .pairs() - .map(|pair| { - let task = pair.into_value(); - let stmt: Stmt = parse_quote!( - #task.add_to_executor(executor.get_sender()).unwrap(); - ); - stmt - }) - .collect(), - }; - let result = quote! { { - static mut EXECUTOR: Option> = None; + static mut EXECUTOR: Option<::hyperloop::executor::Executor<#n_tasks>> = None; + let args = [#args]; let executor = unsafe { - EXECUTOR.get_or_insert(Executor::new()) + EXECUTOR.get_or_insert(::hyperloop::executor::Executor::new(args)) }; - #tasks - - executor + ::core::pin::Pin::static_mut(executor).get_handle() } }; diff --git a/hyperloop-priority-queue/Cargo.toml b/hyperloop-priority-queue/Cargo.toml deleted file mode 100644 index b52f775..0000000 --- a/hyperloop-priority-queue/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "hyperloop-priority-queue" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -atomig = {version = "0.3.2", features = ["derive"]} -crossbeam-utils = "0.8.5" - -[target.'cfg(loom)'.dependencies] -loom = "0.5.4" - -[dev-dependencies.crossbeam-queue] -version = "0.3" diff --git a/hyperloop/Cargo.toml b/hyperloop/Cargo.toml index 05e0477..e009ac7 100644 --- a/hyperloop/Cargo.toml +++ b/hyperloop/Cargo.toml @@ -2,22 +2,35 @@ name = "hyperloop" version = "0.1.0" authors = ["Eivind Alexander Bergem "] -edition = "2018" +license = "MPL-2.0" +edition = "2021" [dependencies] -futures = {version = "0.3.15", default-features = false} -embedded-time = "0.12.0" -atomig = {version = "0.3.2", features = ["derive"]} -hyperloop-priority-queue = {path = "../hyperloop-priority-queue"} +futures = {version = "0.3.30", default-features = false} +pinned-aliasable = "0.1.3" +fugit = { version = "0.3.7", features = ["defmt"], optional = true} +cortex-m = {version = "0.7.7", optional = true} +defmt = "0.3.8" +crossbeam-utils = { version = "0.8.20", default-features = false, features = ["loom"] } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["cortex-m", "nightly", "macros"] +cortex-m = ["dep:cortex-m", "fugit"] +std = [] +nightly = [] +macros = ["hyperloop-macros"] + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } -[dependencies.log] -version = "0.4" -default-features = false +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies.hyperloop-macros] path = "../hyperloop-macros" +optional = true + +[dev-dependencies] +crossbeam-queue = "0.3.11" -[dev-dependencies.crossbeam-queue] -version = "0.3" +[target.'cfg(loom)'.dependencies] +loom = "0.7" diff --git a/hyperloop/README.tpl b/hyperloop/README.tpl new file mode 100644 index 0000000..ba8974d --- /dev/null +++ b/hyperloop/README.tpl @@ -0,0 +1,44 @@ +![Pipeline](https://github.com/rustne-kretser/hyperloop/actions/workflows/rust.yml/badge.svg) +[![Crates.io](https://img.shields.io/crates/v/hyperloop.svg)](https://crates.io/crates/hyperloop) +[![API reference](https://docs.rs/hyperloop/badge.svg)](https://docs.rs/hyperloop/) + +# Hyperloop – the new superloop + +{{readme}} + +For more details, see [docs](https://docs.rs/hyperloop/). + +## Minimum supported Rust version (MSRV) + +Requires nightly for macros. + +## Usage + +Add this to your Cargo.toml: + +```toml +[dependencies] +{{crate}} = "{{version}}" +``` + +## License + +{{license}} + +## FAQ + +### Why async? Can't we just use plain old stackful tasks? + +Async tasks have a minimal memory footprint and are a good fit for +memory constrained microcontrollers. Tasks should be cheap and you +should be able to another task without having to worry much about +memory usage. Stackful tasks need a lot of memory for the stack, +especially if you need string formatting for logging. Stackful tasks +do allow for preemption, but it comes at a high price. + +### How does Hyperloop differ from [Embassy](https://github.com/embassy-rs/embassy) + +Embassy provides not only an executor, but a whole embedded async +ecosystem. Hyperloop is much more limited in scope. The main +difference between the Embassy exeutor and Hyperloop is that Hyperloop +uses priority based scheduling. diff --git a/hyperloop/src/common.rs b/hyperloop/src/common.rs index 52ab779..7eb205d 100644 --- a/hyperloop/src/common.rs +++ b/hyperloop/src/common.rs @@ -21,30 +21,4 @@ pub mod tests { self.woke.store(true, Ordering::Relaxed); } } - - use log::{Level, Metadata, Record}; - - struct SimpleLogger; - - use log::LevelFilter; - - static LOGGER: SimpleLogger = SimpleLogger; - - impl log::Log for SimpleLogger { - fn enabled(&self, metadata: &Metadata) -> bool { - metadata.level() <= Level::Info - } - - fn log(&self, record: &Record) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } - - fn flush(&self) {} - } - - pub fn log_init() { - let _ = log::set_logger(&LOGGER).map(|()| log::set_max_level(LevelFilter::Info)); - } } diff --git a/hyperloop/src/executor.rs b/hyperloop/src/executor.rs index 095d500..46c6693 100644 --- a/hyperloop/src/executor.rs +++ b/hyperloop/src/executor.rs @@ -1,25 +1,50 @@ +//! Async executor +//! +//! # Example +//!``` +//! # use core::sync::atomic::{AtomicBool, Ordering}; +//! # use core::pin::pin; +//! # use hyperloop::task::Task; +//! # use hyperloop::executor::Executor; +//! async fn task_fn(flag: &AtomicBool) { +//! flag.store(true, Ordering::Relaxed); +//! } +//! +//! static FLAG: AtomicBool = AtomicBool::new(false); +//! +//! let mut task = pin!(Task::new(task_fn(&FLAG), 1)); +//! let mut executor = pin!(Executor::new([task.as_mut().get_handle()])); +//! let mut handle = executor.as_mut().get_handle(); +//! +//! assert!(!FLAG.load(Ordering::Relaxed)); +//! +//! handle.poll_tasks(); +//! assert!(FLAG.load(Ordering::Relaxed)); +//! ``` + +use core::cell::UnsafeCell; use core::cmp::Ordering; +use core::pin::Pin; +use core::sync::atomic::{self, AtomicBool}; +use core::task::{Poll, RawWaker, RawWakerVTable, Waker}; + +use priority_queue::{Max, PriorityQueue, PrioritySender}; -use crate::priority_queue::{Max, PriorityQueue, PrioritySender}; +use crate::task::{Priority, TaskHandle}; -use crate::task::PollTask; +mod priority_queue; -pub(crate) type Priority = u8; +type TaskId = u16; -#[derive(Debug, Clone, Copy)] -pub struct Ticket { - task: *const dyn PollTask, +struct Ticket { + task: TaskId, priority: Priority, } impl Ticket { - pub(crate) fn new(task: *const dyn PollTask, priority: Priority) -> Self { + pub(crate) fn new(task: TaskId, priority: Priority) -> Self { Self { task, priority } } - - unsafe fn get_task(&self) -> &dyn PollTask { - &*self.task - } } impl PartialEq for Ticket { @@ -42,76 +67,282 @@ impl Ord for Ticket { } } -pub(crate) type TaskSender = PrioritySender; +type TaskSender = PrioritySender; + +const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); + +unsafe fn clone(ptr: *const ()) -> RawWaker { + RawWaker::new(ptr, &VTABLE) +} + +unsafe fn wake(ptr: *const ()) { + let task = &*(ptr as *const ExecutorTask); + task.wake(); +} + +unsafe fn drop(_ptr: *const ()) {} + +struct ExecutorTask { + task: TaskHandle, + task_id: TaskId, + priority: Priority, + sender: Option, + pending_wake: AtomicBool, +} + +impl ExecutorTask { + fn new( + task: TaskHandle, + task_id: TaskId, + priority: Priority, + sender: Option, + ) -> Self { + Self { + task, + task_id, + priority, + sender, + pending_wake: AtomicBool::new(false), + } + } + + fn set_sender(&mut self, sender: TaskSender) { + self.sender = Some(sender); + } + + fn get_waker(task: *const ExecutorTask) -> Waker { + let ptr: *const () = task.cast(); + let vtable = &VTABLE; + + unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) } + } + + fn send_ticket(&self, ticket: Ticket) -> Result<(), ()> { + let sender = unsafe { self.sender.as_ref().unwrap_unchecked() }; + + sender.send(ticket).map_err(|_| ()) + } + + fn wake(&self) { + if self + .pending_wake + .compare_exchange( + false, + true, + atomic::Ordering::Acquire, + atomic::Ordering::Acquire, + ) + .is_ok() + { + let ticket = Ticket::new(self.task_id, self.priority); + + self.send_ticket(ticket).unwrap_or_else(|_| unreachable!()); + } + } + + fn clear_pending_wake_flag(&self) { + let _ = self.pending_wake.compare_exchange( + true, + false, + atomic::Ordering::Acquire, + atomic::Ordering::Acquire, + ); + } + + fn poll(&mut self, waker: Waker) -> Poll<()> { + self.task.poll(waker) + } +} +/// Async executor +/// +/// # Example +///``` +/// # use core::sync::atomic::{AtomicBool, Ordering}; +/// # use core::pin::pin; +/// # use hyperloop::task::Task; +/// # use hyperloop::executor::Executor; +/// async fn task_fn(flag: &AtomicBool) { +/// flag.store(true, Ordering::Relaxed); +/// } +/// +/// static FLAG: AtomicBool = AtomicBool::new(false); +/// +/// let mut task = pin!(Task::new(task_fn(&FLAG), 1)); +/// let mut executor = pin!(Executor::new([task.as_mut().get_handle()])); +/// let mut handle = executor.as_mut().get_handle(); +/// +/// assert!(!FLAG.load(Ordering::Relaxed)); +/// +/// handle.poll_tasks(); +/// assert!(FLAG.load(Ordering::Relaxed)); +/// ``` pub struct Executor { + tasks: [UnsafeCell; N], queue: PriorityQueue, } impl Executor { - pub fn new() -> Self { + /// Create new executor with tasks + pub fn new(tasks: [TaskHandle; N]) -> Self { + let mut i = 0; + let tasks = tasks.map(|task| { + let priority = task.priority; + let task = UnsafeCell::new(ExecutorTask::new(task, i, priority, None)); + i += 1; + task + }); + Self { + tasks, queue: PriorityQueue::new(), } } + fn get_task(&mut self, task_id: TaskId) -> *mut ExecutorTask { + self.tasks[task_id as usize].get() + } + + fn init(self: Pin<&mut Self>) { + let this = unsafe { self.get_unchecked_mut() }; + + for i in 0..N { + let sender = unsafe { this.queue.get_sender() }; + let task = unsafe { &mut *this.get_task(i as u16) }; + + task.set_sender(sender); + task.wake(); + } + } + + fn poll_task(&mut self, task_id: TaskId) { + let task = self.get_task(task_id); + let waker = ExecutorTask::get_waker(task); + let task = unsafe { &mut *task }; + + task.clear_pending_wake_flag(); + + let _ = task.poll(waker); + } + /// Poll all tasks in the queue - /// - /// # Safety - /// - /// This function is unsafe. The caller must guarantee that the - /// executor is never dropped or moved. The wakers contain raw - /// pointers to the tasks stored in the executor. The pointers can - /// be dereferenced at any time and will be dangling if the - /// exeutor is moved or dropped. - pub unsafe fn poll_tasks(&mut self) { - while let Some(ticket) = self.queue.pop() { - let _ = ticket.get_task().poll(); + fn poll_tasks(self: Pin<&mut Self>) { + let this = unsafe { self.get_unchecked_mut() }; + + while let Some(ticket) = this.queue.pop() { + this.poll_task(ticket.task); } } - pub fn get_sender(&self) -> TaskSender { - self.queue.get_sender() + /// Initialize and return handle to pinned `Executor` + pub fn get_handle(mut self: Pin<&mut Self>) -> ExecutorHandle { + self.as_mut().init(); + + ExecutorHandle { executor: self } + } +} + +/// Handle to pinned [`Executor`] +/// +/// Use this to ensure pinned and initalized [`Executor`]. Use +/// [`Executor::get_handle`] to create handle. +pub struct ExecutorHandle<'a, const N: usize> { + executor: Pin<&'a mut Executor>, +} + +impl<'a, const N: usize> ExecutorHandle<'a, N> { + /// Poll all tasks in the queue + pub fn poll_tasks(&mut self) { + self.executor.as_mut().poll_tasks() } } #[cfg(test)] mod tests { + use core::task::Context; use crossbeam_queue::ArrayQueue; - use hyperloop_macros::{executor_from_tasks, task}; + use futures::{task::AtomicWaker, Future}; use std::sync::Arc; use super::*; use crate::task::Task; + #[derive(Debug)] + pub struct Notification { + ready: AtomicBool, + waker: AtomicWaker, + } + + impl Notification { + pub const fn new() -> Self { + Self { + ready: AtomicBool::new(false), + waker: AtomicWaker::new(), + } + } + + pub fn notify(&self) { + self.ready.store(true, atomic::Ordering::Relaxed); + self.waker.wake(); + } + + pub fn wait(&self) -> NotificationFuture<'_> { + self.ready.store(false, atomic::Ordering::Relaxed); + NotificationFuture::new(&self) + } + } + + pub struct NotificationFuture<'a> { + notification: &'a Notification, + } + + impl<'a> NotificationFuture<'a> { + fn new(shared: &'a Notification) -> Self { + Self { + notification: shared, + } + } + } + + impl<'a> Future for NotificationFuture<'a> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.notification.ready.load(atomic::Ordering::Relaxed) { + Poll::Ready(()) + } else { + self.notification.waker.register(cx.waker()); + Poll::Pending + } + } + } + #[test] fn test_executor() { - let mut executor = Executor::<10>::new(); let queue = Arc::new(ArrayQueue::new(10)); - let test_future = |queue, value| { - move || { - async fn future(queue: Arc>, value: u32) { - queue.push(value).unwrap(); - } - - future(queue, value) + let test_future = move |queue, value| { + async fn future(queue: Arc>, value: u32) { + queue.push(value).unwrap(); } + + future(queue, value) }; - let task1 = Task::new(test_future(queue.clone(), 1), 1); - let task2 = Task::new(test_future(queue.clone(), 2), 3); - let task3 = Task::new(test_future(queue.clone(), 3), 2); - let task4 = Task::new(test_future(queue.clone(), 4), 4); + let mut task1 = Task::new(test_future(queue.clone(), 1), 1); + let mut task2 = Task::new(test_future(queue.clone(), 2), 3); + let mut task3 = Task::new(test_future(queue.clone(), 3), 2); + let mut task4 = Task::new(test_future(queue.clone(), 4), 4); - task1.add_to_executor(executor.get_sender()).unwrap(); - task2.add_to_executor(executor.get_sender()).unwrap(); - task3.add_to_executor(executor.get_sender()).unwrap(); - task4.add_to_executor(executor.get_sender()).unwrap(); + let mut executor = Executor::new([ + unsafe { Pin::new_unchecked(&mut task1) }.get_handle(), + unsafe { Pin::new_unchecked(&mut task2) }.get_handle(), + unsafe { Pin::new_unchecked(&mut task3) }.get_handle(), + unsafe { Pin::new_unchecked(&mut task4) }.get_handle(), + ]); + let mut handle = unsafe { Pin::new_unchecked(&mut executor).get_handle() }; - unsafe { - executor.poll_tasks(); - } + handle.poll_tasks(); assert_eq!(queue.pop().unwrap(), 4); assert_eq!(queue.pop().unwrap(), 2); @@ -120,29 +351,45 @@ mod tests { } #[test] - fn macros() { - #[task(priority = 1)] - async fn test_task1(queue: Arc>) { - queue.push(1).unwrap(); - } + fn test_pending_wake() { + let queue = Arc::new(ArrayQueue::new(10)); + let notify = Arc::new(Notification::new()); - #[task(priority = 2)] - async fn test_task2(queue: Arc>) { - queue.push(2).unwrap(); - } + let test_future = |queue, notify| { + async fn future(queue: Arc>, notify: Arc) { + for i in 0..10 { + queue.push(i).unwrap(); + notify.wait().await; + } + } - let queue = Arc::new(ArrayQueue::new(10)); + future(queue, notify) + }; - let task1 = test_task1(queue.clone()).unwrap(); - let task2 = test_task2(queue.clone()).unwrap(); + let mut task = Task::new(test_future(queue.clone(), notify.clone()), 1); - let executor = executor_from_tasks!(task1, task2); + let mut executor = Executor::new([unsafe { Pin::new_unchecked(&mut task).get_handle() }]); + let mut handle = unsafe { Pin::new_unchecked(&mut executor).get_handle() }; - unsafe { - executor.poll_tasks(); - } + handle.poll_tasks(); + + assert_eq!(queue.pop().unwrap(), 0); + assert!(queue.pop().is_none()); + + notify.notify(); + + handle.poll_tasks(); - assert_eq!(queue.pop().unwrap(), 2); assert_eq!(queue.pop().unwrap(), 1); + assert!(queue.pop().is_none()); + + handle.poll_tasks(); + assert!(queue.pop().is_none()); + + notify.notify(); + handle.poll_tasks(); + + assert_eq!(queue.pop().unwrap(), 2); + assert!(queue.pop().is_none()); } } diff --git a/hyperloop-priority-queue/src/lib.rs b/hyperloop/src/executor/priority_queue.rs similarity index 77% rename from hyperloop-priority-queue/src/lib.rs rename to hyperloop/src/executor/priority_queue.rs index 6f81c94..53de6f9 100644 --- a/hyperloop-priority-queue/src/lib.rs +++ b/hyperloop/src/executor/priority_queue.rs @@ -1,14 +1,16 @@ -#![no_std] - -use core::{marker::PhantomData, ops::Deref, sync::atomic::Ordering}; +use core::{cell::UnsafeCell, marker::PhantomData, mem, pin::Pin, sync::atomic::Ordering}; #[cfg(not(loom))] use core::sync::atomic::AtomicUsize; +#[cfg(test)] +use core::ops::Deref; + #[cfg(loom)] use loom::sync::atomic::AtomicUsize; use crossbeam_utils::Backoff; +use pinned_aliasable::Aliasable; pub enum HeapKind { Max, @@ -83,10 +85,8 @@ where } else { return Some(right); } - } else { - if let Some(left) = left { - return Some(left); - } + } else if let Some(left) = left { + return Some(left); } None @@ -102,9 +102,10 @@ where } fn item(&self) -> &T { - self.heap.slots[self.pos].as_ref().unwrap() + unsafe { self.heap.slot_mut(self.pos).as_ref().unwrap() } } + #[allow(clippy::mut_from_ref)] unsafe fn slot_mut(&self) -> &mut Option { self.heap.slot_mut(self.pos) } @@ -113,10 +114,7 @@ where let slot = unsafe { self.slot_mut() }; let other_slot = unsafe { other.slot_mut() }; - let item = slot.take(); - *slot = other_slot.take(); - *other_slot = item; - + mem::swap(slot, other_slot); other } @@ -187,7 +185,7 @@ impl AtomicStackPosition { fn compare_exchange(&self, current: usize, new: usize) -> Result { self.atomic - .compare_exchange_weak(current, new, Ordering::Release, Ordering::Relaxed) + .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Relaxed) } } @@ -195,36 +193,44 @@ pub struct PrioritySender where T: 'static, { - slots: &'static [Option], - available: &'static AtomicUsize, - stack_pos: &'static AtomicStackPosition, + slots: *const [UnsafeCell>], + available: *const AtomicUsize, + stack_pos: *const AtomicStackPosition, } impl Clone for PrioritySender { fn clone(&self) -> Self { Self { - slots: self.slots.clone(), - available: self.available.clone(), - stack_pos: self.stack_pos.clone(), + slots: self.slots, + available: self.available, + stack_pos: self.stack_pos, } } } +unsafe impl Send for PrioritySender {} +unsafe impl Sync for PrioritySender {} + impl PrioritySender { + #[allow(clippy::mut_from_ref)] unsafe fn slot_mut(&self, index: usize) -> &mut Option { - &mut *((&self.slots[index] as *const Option) as *mut Option) + let slots = &*self.slots; + + &mut *slots[index].get() } fn stack_push(&self, item: T) -> Result<(), T> { + let stack_pos = unsafe { &*self.stack_pos }; + loop { - let current = self.stack_pos.load(); + let current = stack_pos.load(); if current.pos() > 0 { let new = current.reserved(); - if let Ok(_) = self - .stack_pos + if stack_pos .compare_exchange(current.value(), new.value()) + .is_ok() { let slot = unsafe { self.slot_mut(new.pos()) }; *slot = Some(item); @@ -236,10 +242,10 @@ impl PrioritySender { } loop { - let old = self.stack_pos.load(); + let old = stack_pos.load(); let new = old.pushed(); - if let Ok(_) = self.stack_pos.compare_exchange(old.value(), new.value()) { + if stack_pos.compare_exchange(old.value(), new.value()).is_ok() { break; } } @@ -248,16 +254,21 @@ impl PrioritySender { } pub fn send(&self, item: T) -> Result<(), T> { + let available = unsafe { &*self.available }; + loop { - let available = self.available.load(Ordering::Acquire); - - if available > 0 { - if let Ok(_) = self.available.compare_exchange( - available, - available - 1, - Ordering::Release, - Ordering::Relaxed, - ) { + let n_available = available.load(Ordering::Acquire); + + if n_available > 0 { + if available + .compare_exchange( + n_available, + n_available - 1, + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { break; } } else { @@ -269,6 +280,7 @@ impl PrioritySender { } } +#[cfg(test)] pub struct PeekMut<'a, T, K, const N: usize> where T: PartialOrd, @@ -277,6 +289,7 @@ where queue: &'a mut PriorityQueue, } +#[cfg(test)] impl<'a, T, K, const N: usize> PeekMut<'a, T, K, N> where T: PartialOrd + 'static, @@ -287,15 +300,16 @@ where } } +#[cfg(test)] impl<'a, T, K, const N: usize> Deref for PeekMut<'a, T, K, N> where - T: PartialOrd, - K: Kind, + T: PartialOrd + 'static, + K: Kind + 'static, { type Target = T; fn deref(&self) -> &Self::Target { - self.queue.slots[0].as_ref().unwrap() + unsafe { self.queue.slot_mut(0).as_ref().unwrap() } } } @@ -304,7 +318,7 @@ where T: PartialOrd, K: Kind, { - slots: [Option; N], + slots: Aliasable<[UnsafeCell>; N]>, available: AtomicUsize, stack_pos: AtomicStackPosition, heap_size: usize, @@ -318,7 +332,7 @@ where { pub fn new() -> Self { Self { - slots: [(); N].map(|_| None), + slots: Aliasable::new([(); N].map(|_| UnsafeCell::new(None))), available: AtomicUsize::new(N), stack_pos: AtomicStackPosition::new(N), heap_size: 0, @@ -326,18 +340,21 @@ where } } - pub fn get_sender(&self) -> PrioritySender { - let queue: &'static Self = unsafe { &*(self as *const Self) }; + pub unsafe fn get_sender(&self) -> PrioritySender { + let queue: &'static Self = &*(self as *const Self); + let slots = Aliasable::get(Pin::new_unchecked(&self.slots)) as *const _; PrioritySender { - slots: &queue.slots, + slots, available: &queue.available, stack_pos: &queue.stack_pos, } } + #[allow(clippy::mut_from_ref)] unsafe fn slot_mut(&self, index: usize) -> &mut Option { - &mut *((&self.slots[index] as *const Option) as *mut Option) + let slots = Aliasable::get(Pin::new_unchecked(&self.slots)); + &mut *slots[index].get() } fn get_node(&self, index: usize) -> Node { @@ -364,15 +381,18 @@ where break Err(()); } else { let new = current.popped(); - let item = self.slots[current.pos()].take(); + let item = unsafe { self.slot_mut(current.pos()).take() }; - if let Ok(_) = self + if self .stack_pos .compare_exchange(current.value(), new.value()) + .is_ok() { break Ok(item); } else { - self.slots[current.pos()] = item; + unsafe { + *self.slot_mut(current.pos()) = item; + } } } } @@ -407,7 +427,9 @@ where let index = self.heap_size; if index < N { - self.slots[index] = Some(item); + unsafe { + *self.slot_mut(index) = Some(item); + } self.heap_size += 1; @@ -431,17 +453,23 @@ where } fn take_root(&mut self) -> Option { - if let Some(item) = self.slots[0].take() { - { - let root = self.get_root(); - let last = self.get_last(); - last.swap(root); - } - self.heap_size -= 1; + match self.heap_size.cmp(&1) { + core::cmp::Ordering::Greater => { + let item = unsafe { self.slot_mut(0).take() }.unwrap(); + { + let root = self.get_root(); + let last = self.get_last(); + last.swap(root); + } + self.heap_size -= 1; - Some(item) - } else { - None + Some(item) + } + core::cmp::Ordering::Equal => { + self.heap_size -= 1; + Some(unsafe { self.slot_mut(0).take() }.unwrap()) + } + core::cmp::Ordering::Less => None, } } @@ -473,12 +501,16 @@ where loop { let available = self.available.load(Ordering::Acquire); - if let Ok(_) = self.available.compare_exchange( - available, - available + 1, - Ordering::Release, - Ordering::Relaxed, - ) { + if self + .available + .compare_exchange( + available, + available + 1, + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { break Some(item); } } @@ -487,7 +519,8 @@ where } } - pub fn peek_mut<'a>(&'a mut self) -> Option> { + #[cfg(test)] + pub fn peek_mut(&mut self) -> Option> { self.sort(); if self.heap_size > 0 { @@ -498,12 +531,20 @@ where } } +impl Default for PriorityQueue +where + T: PartialOrd + 'static, + K: Kind + 'static, +{ + fn default() -> Self { + Self::new() + } +} + #[cfg(not(loom))] #[cfg(test)] mod tests { - use std::thread; - - use std::vec::Vec; + use std::{thread, vec::Vec}; use super::*; @@ -540,7 +581,7 @@ mod tests { #[test] fn stack() { let mut heap: PriorityQueue = PriorityQueue::new(); - let sender = heap.get_sender(); + let sender = unsafe { heap.get_sender() }; for i in 0..10 { sender.stack_push(i).unwrap(); @@ -568,7 +609,7 @@ mod tests { #[test] fn channel() { let mut queue: PriorityQueue = PriorityQueue::new(); - let sender = queue.get_sender(); + let sender = unsafe { queue.get_sender() }; for i in 0..10 { sender.send(i).unwrap(); @@ -608,6 +649,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn channel_thread() { const N: usize = 1000; let mut queue: PriorityQueue = PriorityQueue::new(); @@ -619,13 +661,13 @@ mod tests { let n_items = n_threads * n_items_per_thread; for i in 0..n_threads { - let sender = queue.get_sender(); + let sender = unsafe { queue.get_sender() }; let handler = thread::spawn(move || { for j in 0..n_items_per_thread { loop { let item = i * n_items_per_thread + j; - if let Ok(_) = sender.send(item) { + if sender.send(item).is_ok() { break; } @@ -659,10 +701,6 @@ mod tests { } } -#[cfg(test)] -#[macro_use] -extern crate std; - #[cfg(test)] #[cfg(loom)] mod tests_loom { @@ -683,7 +721,7 @@ mod tests_loom { let handles: Vec<_> = (0..n_threads) .map(|i| { - let sender = queue.get_sender(); + let sender = unsafe { queue.get_sender() }; thread::spawn(move || { sender.stack_push(i).unwrap(); }) diff --git a/hyperloop/src/interrupt.rs b/hyperloop/src/interrupt.rs deleted file mode 100644 index 6468c15..0000000 --- a/hyperloop/src/interrupt.rs +++ /dev/null @@ -1,152 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, -}; - -use core::future::Future; - -unsafe fn clone(ptr: *const ()) -> RawWaker { - RawWaker::new(ptr, &VTABLE) -} - -unsafe fn wake(_ptr: *const ()) {} - -unsafe fn drop(_ptr: *const ()) {} - -const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); - -#[derive(Debug)] -struct YieldFuture { - done: bool, -} - -impl YieldFuture { - fn new() -> Self { - Self { done: false } - } -} - -impl Future for YieldFuture { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - if !self.done { - self.done = true; - Poll::Pending - } else { - Poll::Ready(()) - } - } -} - -pub struct Interrupt -where - F: Future + 'static, -{ - future: F, -} - -impl Interrupt -where - F: Future + 'static, -{ - pub fn new(future_fn: impl FnOnce() -> F) -> Self { - Self { - future: future_fn(), - } - } - - fn get_waker(&self) -> Waker { - unsafe { Waker::from_raw(RawWaker::new((self as *const Self).cast(), &VTABLE)) } - } - - pub fn poll(&'static mut self) { - let waker = self.get_waker(); - let mut cx = Context::from_waker(&waker); - - let future = unsafe { Pin::new_unchecked(&mut self.future) }; - - let _ = future.poll(&mut cx); - } -} - -pub fn yield_now() -> impl Future { - YieldFuture::new() -} - -#[cfg(test)] -mod tests { - use core::task::Waker; - - use crossbeam_queue::ArrayQueue; - use std::{boxed::Box, sync::Arc}; - - use crate::common::tests::MockWaker; - - use super::*; - - #[test] - fn test_yield_now() { - let mut future = Box::pin(yield_now()); - let mockwaker = Arc::new(MockWaker::new()); - let waker: Waker = mockwaker.into(); - let mut cx = Context::from_waker(&waker); - - assert_eq!(future.as_mut().poll(&mut cx), Poll::Pending); - assert_eq!(future.as_mut().poll(&mut cx), Poll::Ready(())); - assert_eq!(future.as_mut().poll(&mut cx), Poll::Ready(())); - } - - #[test] - fn test_interrupt() { - static mut QUEUE: Option> = None; - - fn handler() { - async fn async_handler() { - let queue = unsafe { QUEUE.as_ref().unwrap() }; - - for i in 0.. { - queue.push(i).unwrap(); - yield_now().await; - } - } - - type F = impl Future + 'static; - - static mut INTERRUPT: Option> = None; - - fn get_future() -> impl FnOnce() -> F { - || async_handler() - } - - let interrupt = unsafe { INTERRUPT.get_or_insert(Interrupt::new(get_future())) }; - - interrupt.poll(); - } - - unsafe { - QUEUE = Some(ArrayQueue::new(10)); - }; - - let queue = unsafe { QUEUE.as_ref().unwrap() }; - - handler(); - - assert_eq!(queue.pop(), Some(0)); - assert_eq!(queue.pop(), None); - - handler(); - assert_eq!(queue.pop(), Some(1)); - assert_eq!(queue.pop(), None); - - handler(); - handler(); - handler(); - handler(); - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), Some(4)); - assert_eq!(queue.pop(), Some(5)); - assert_eq!(queue.pop(), None); - } -} diff --git a/hyperloop/src/lib.rs b/hyperloop/src/lib.rs index 0d1ab5a..273be48 100644 --- a/hyperloop/src/lib.rs +++ b/hyperloop/src/lib.rs @@ -1,19 +1,56 @@ -#![no_std] -#![feature(const_fn_trait_bound)] -#![feature(type_alias_impl_trait)] -#![feature(once_cell)] +//! Hyperloop is a priority based async runtime targetting embedded +//! systems. +//! +//! Features: +//! - Static allocation +//! - Priority based scheduling of async tasks +//! - Fixed capacity executor +//! - No critical sections, uses atomics for concurrent access +//! - Unsafe code tested with miri, concurrency tested with loom +//! +//! # Example +//! ```ignore +//! /// Simple task that blinks toggles LED every second +//! #[task(priority = 1)] +//! async fn blinky(timer: Timer, mut led: OutputPin) { +//! // Use periodic for even 1 second period +//! let mut periodic = timer.periodic(Duration::secs(1)); +//! loop { +//! // Toggle led and wait till next period +//! led.toggle(); +//! periodic.wait().await; +//! } +//! } +//! +//! fn main() { +//! let timer_queue = [...]; +//! let led = [...]; +//! +//! // Create executor with blinky task +//! let mut executor = static_executor!(blinky(timer_queue.get_timer(), led).unwrap()); +//! +//! // Start the timer +//! timer_queue.start_timer(); +//! +//! loop { +//! timer_queue.wake_tasks(); +//! executor.poll_tasks(); +//! +//! timer_queue.wait_for_alarm(); +//! } +//! } +//! ``` +//! +//! See the `examples/` directory for complete examples. + +#![cfg_attr(not(any(test, feature = "std")), no_std)] +#![cfg_attr(feature = "nightly", feature(doc_cfg))] mod common; pub mod executor; -pub mod interrupt; -pub mod notify; pub mod task; pub mod timer; -mod priority_queue { - pub(crate) use hyperloop_priority_queue::*; -} - -#[macro_use] -extern crate std; +#[cfg(feature = "macros")] +pub use hyperloop_macros::*; diff --git a/hyperloop/src/notify.rs b/hyperloop/src/notify.rs deleted file mode 100644 index 98575bd..0000000 --- a/hyperloop/src/notify.rs +++ /dev/null @@ -1,132 +0,0 @@ -use core::{ - pin::Pin, - sync::atomic::{AtomicBool, Ordering}, - task::{Context, Poll}, -}; - -use futures::task::AtomicWaker; - -use core::future::Future; - -#[derive(Debug)] -pub struct Notification { - ready: AtomicBool, - waker: AtomicWaker, -} - -impl Notification { - pub const fn new() -> Self { - Self { - ready: AtomicBool::new(false), - waker: AtomicWaker::new(), - } - } - - pub fn notify(&self) { - self.ready.store(true, Ordering::Relaxed); - self.waker.wake(); - } - - pub fn wait(&'static self) -> NotificationFuture { - self.ready.store(false, Ordering::Relaxed); - NotificationFuture::new(&self) - } -} - -pub struct NotificationFuture { - notification: &'static Notification, -} - -impl NotificationFuture { - fn new(shared: &'static Notification) -> Self { - Self { - notification: shared, - } - } -} - -impl Future for NotificationFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.notification.ready.load(Ordering::Relaxed) { - Poll::Ready(()) - } else { - self.notification.waker.register(cx.waker()); - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use crossbeam_queue::ArrayQueue; - use std::boxed::Box; - use std::sync::Arc; - - use crate::{executor::Executor, task::Task}; - - use super::*; - - #[test] - fn notify() { - let notification = Box::leak(Box::new(Notification::new())); - - let mut executor = Executor::<10>::new(); - let queue = Arc::new(ArrayQueue::new(10)); - - let wait = |receiver, queue| { - move || { - async fn future(notification: &'static Notification, queue: Arc>) { - queue.push(1).unwrap(); - notification.wait().await; - queue.push(2).unwrap(); - notification.wait().await; - queue.push(3).unwrap(); - } - future(receiver, queue) - } - }; - - let task1 = Task::new(wait(notification, queue.clone()), 1); - - task1.add_to_executor(executor.get_sender()).unwrap(); - - unsafe { - executor.poll_tasks(); - } - - assert_eq!(queue.pop(), Some(1)); - assert_eq!(queue.pop(), None); - - unsafe { - executor.poll_tasks(); - } - - assert_eq!(queue.pop(), None); - - notification.notify(); - - unsafe { - executor.poll_tasks(); - } - - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), None); - - unsafe { - executor.poll_tasks(); - } - - assert_eq!(queue.pop(), None); - - notification.notify(); - - unsafe { - executor.poll_tasks(); - } - - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), None); - } -} diff --git a/hyperloop/src/task.rs b/hyperloop/src/task.rs index 7217c7e..a1f68a7 100644 --- a/hyperloop/src/task.rs +++ b/hyperloop/src/task.rs @@ -1,216 +1,148 @@ +//! Async task with priority use core::{ - lazy::OnceCell, pin::Pin, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + task::{Context, Poll, Waker}, }; -use atomig::{Atom, Atomic, Ordering}; use futures::Future; -use crate::executor::{Priority, TaskSender, Ticket}; - -unsafe fn clone + 'static>(ptr: *const ()) -> RawWaker { - let task = &*(ptr as *const Task); - - RawWaker::new(ptr, &task.vtable) -} - -unsafe fn wake + 'static>(ptr: *const ()) { - let task = &*(ptr as *const Task); - task.wake(); -} - -unsafe fn drop(_ptr: *const ()) {} - -pub(crate) trait PollTask { - unsafe fn poll(&self) -> Poll<()>; -} - -#[repr(u8)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Atom)] -enum TaskState { - NotQueued, - Queued, -} - +/// Task priority +pub type Priority = u8; + +/// Async task with priority +/// +/// # Example +///``` +/// use hyperloop::task::Task; +/// +/// async fn task_fn() { +/// } +/// +/// let task = Task::new(task_fn(), 1); +/// ``` pub struct Task where F: Future + 'static, { future: F, priority: Priority, - sender: OnceCell, - vtable: RawWakerVTable, - state: Atomic, } impl Task where F: Future + 'static, { - pub fn new(future_fn: impl FnOnce() -> F, priority: Priority) -> Self { - Self { - future: future_fn(), - priority, - sender: OnceCell::new(), - vtable: RawWakerVTable::new(clone::, wake::, wake::, drop), - state: Atomic::new(TaskState::NotQueued), - } - } - - fn update_state(&self, old: TaskState, new: TaskState) -> bool { - if let Ok(_) = self - .state - .compare_exchange(old, new, Ordering::Relaxed, Ordering::Relaxed) - { - true - } else { - false - } + /// Create new task from future with given priority + pub fn new(future: F, priority: Priority) -> Self { + Self { future, priority } } - #[cfg(test)] - fn get_state(&self) -> TaskState { - self.state.load(Ordering::Relaxed) + /// Return handle to pinned task + pub fn get_handle(self: Pin<&mut Self>) -> TaskHandle { + TaskHandle::new(self) } +} - unsafe fn as_static(&self) -> &'static Self { - &*(self as *const Self) - } +/// Type erased handle for `Task` +pub struct TaskHandle { + future: *mut (), + poll: fn(*mut (), &mut Context<'_>) -> Poll<()>, + pub priority: Priority, +} - unsafe fn as_mut(&self) -> &mut Self { - &mut *((self as *const Self) as *mut Self) +impl TaskHandle { + fn new>(task: Pin<&mut Task>) -> Self { + unsafe { + let task = Pin::get_unchecked_mut(task); + + Self { + future: &mut task.future as *mut F as *mut (), + poll: core::mem::transmute::< + fn(Pin<&mut F>, &mut Context<'_>) -> Poll, + fn(*mut (), &mut Context<'_>) -> Poll<()>, + >(F::poll), + priority: task.priority, + } + } } - unsafe fn get_waker(&self) -> Waker { - let ptr: *const () = (self as *const Task).cast(); - let vtable = &self.as_static().vtable; + /// Poll task with given waker + pub fn poll(&mut self, waker: Waker) -> Poll<()> { + let mut cx = Context::from_waker(&waker); - Waker::from_raw(RawWaker::new(ptr, vtable)) - } + let poll = self.poll; - pub fn wake(&self) { - self.schedule().unwrap(); + poll(self.future, &mut cx) } +} - pub fn add_to_executor(&self, sender: TaskSender) -> Result<(), ()> { - self.set_sender(sender)?; - self.schedule() - } +#[derive(Debug)] +struct YieldFuture { + done: bool, +} - fn set_sender(&self, sender: TaskSender) -> Result<(), ()> { - match self.sender.set(sender) { - Ok(_) => Ok(()), - Err(_) => Err(()), - } +impl YieldFuture { + fn new() -> Self { + Self { done: false } } +} - fn send_ticket(&self, ticket: Ticket) -> Result<(), ()> { - if let Some(sender) = self.sender.get() { - if let Ok(_) = sender.send(ticket) { - return Ok(()); - } - } - - Err(()) - } +impl Future for YieldFuture { + type Output = (); - fn schedule(&self) -> Result<(), ()> { - if self.update_state(TaskState::NotQueued, TaskState::Queued) { - let ticket = Ticket::new(self as *const Self, self.priority); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if !self.done { + self.done = true; + cx.waker().wake_by_ref(); - match self.send_ticket(ticket) { - Ok(_) => Ok(()), - Err(_) => { - assert!(self.update_state(TaskState::Queued, TaskState::NotQueued)); - Err(()) - } - } + Poll::Pending } else { - Ok(()) + Poll::Ready(()) } } } -impl PollTask for Task -where - F: Future + 'static, -{ - unsafe fn poll(&self) -> Poll<()> { - let waker = self.get_waker(); - let mut cx = Context::from_waker(&waker); - let future = Pin::new_unchecked(&mut self.as_mut().future); - - assert!(self.update_state(TaskState::Queued, TaskState::NotQueued)); - let result = future.poll(&mut cx); - - result - } +/// Yield task +pub fn yield_now() -> impl Future { + YieldFuture::new() } #[cfg(test)] mod tests { - use crate::{ - interrupt::yield_now, - priority_queue::{Max, PriorityQueue}, - }; + use crossbeam_queue::ArrayQueue; + use std::sync::Arc; + + use crate::common::tests::MockWaker; use super::*; #[test] fn task() { - let mut queue: PriorityQueue = PriorityQueue::new(); - - let test_future = || { - || { - async fn future() { - loop { - yield_now().await - } - } + let queue = Arc::new(ArrayQueue::new(10)); - future() + let test_future = |queue| { + async fn future(queue: Arc>) { + for i in 0.. { + queue.push(i).unwrap(); + yield_now().await; + } } - }; - let task = Task::new(test_future(), 1); - - task.set_sender(queue.get_sender()).unwrap(); - - assert_eq!(task.get_state(), TaskState::NotQueued); - - task.schedule().unwrap(); - - assert_eq!(task.get_state(), TaskState::Queued); - - assert!(queue.pop().is_some()); - assert!(queue.pop().is_none()); - - task.schedule().unwrap(); - - assert!(queue.pop().is_none()); - - unsafe { - assert_eq!(task.poll(), Poll::Pending); - } - - assert_eq!(task.get_state(), TaskState::NotQueued); + future(queue) + }; - task.wake(); - task.wake(); - task.wake(); + let mut task = Task::new(test_future(queue.clone()), 1); + let mut handle = unsafe { Pin::new_unchecked(&mut task).get_handle() }; + let waker: Waker = Arc::new(MockWaker::new()).into(); - assert_eq!(task.get_state(), TaskState::Queued); + assert_eq!(handle.poll(waker.clone()), Poll::Pending); - assert!(queue.pop().is_some()); + assert_eq!(queue.pop().unwrap(), 0); assert!(queue.pop().is_none()); - task.wake(); - task.wake(); - task.wake(); - - assert_eq!(task.get_state(), TaskState::Queued); + assert_eq!(handle.poll(waker.clone()), Poll::Pending); + assert_eq!(queue.pop().unwrap(), 1); assert!(queue.pop().is_none()); } } diff --git a/hyperloop/src/timer.rs b/hyperloop/src/timer.rs index ec9c8fb..0f93af1 100644 --- a/hyperloop/src/timer.rs +++ b/hyperloop/src/timer.rs @@ -1,185 +1,335 @@ +//! Timer for delaying tasks and getting timestamp +//! +//! [`TimerQueue`] requires a hardware timer that implements +//! [`HardwareTimer`]. Two reference implementations are provided: +//! - [`cortex_m::SysTickTimer`] - A systick-based timer for Cortex-M systems +//! - [`std::StdTimer`] - A standard library based timer for testing in software + use core::{ + cell::UnsafeCell, + ops::{Add, AddAssign}, pin::Pin, task::{Context, Poll, Waker}, }; -use embedded_time::{ - duration::{Duration, Milliseconds}, - fixed_point::FixedPoint, - rate::{Hertz, Rate}, -}; - use core::future::Future; -use futures::{task::AtomicWaker, Stream, StreamExt}; -use log::error; +use pinned_aliasable::Aliasable; + +use self::list::{List, Node}; + +mod list; + +#[cfg(feature = "cortex-m")] +#[doc(cfg(feature = "cortex-m"))] +pub mod cortex_m; + +#[cfg(feature = "std")] +#[doc(cfg(feature = "std"))] +pub mod std; + +/// Trait for interfacing with hardware timers. +/// +/// Should provide monotonic time and the ability to set alarms. Check +/// out `systick::SysTickTimer` for a reference implementation. +pub trait HardwareTimer { + type Instant: Ord + + Eq + + Copy + + AddAssign + + Add; + type Duration: Copy; -use crate::priority_queue::{Min, PeekMut, PriorityQueue, PrioritySender}; + /// Start timer + fn start(&self); -type Tick = u64; + /// Return current time + fn now(&self) -> Self::Instant; -pub struct TickCounter { - count: Tick, - waker: AtomicWaker, + /// Set alarm expiring at given instant. If None is provided, the + /// alarm should be set to the maximum delay + fn set_alarm(&self, expires: Option); + + /// Block until alarm expires + fn wait_for_alarm(&self); +} + +/// Timer queue for delayed tasks +pub struct TimerQueue +where + HW: HardwareTimer, +{ + hw: UnsafeCell, + queue: UnsafeCell>>, } -impl TickCounter { - pub const fn new() -> Self { +unsafe impl Sync for TimerQueue where HW: HardwareTimer {} +unsafe impl Send for TimerQueue where HW: HardwareTimer {} + +impl TimerQueue +where + HW: HardwareTimer, +{ + /// Create new timer queue + pub fn new(hw: HW) -> Self { + hw.start(); + Self { - count: 0, - waker: AtomicWaker::new(), + hw: UnsafeCell::new(hw), + queue: UnsafeCell::new(List::new()), } } - /// Increment tick count - /// - /// # Safety - /// - /// Updating the tick value is not atomic on 32-bit systems, so it - /// would be possible to get an invalid reading if reading during - /// a write. For this reason, this function should only be called - /// from a high priority interrupt handler. - pub unsafe fn increment(&mut self) { - self.count += 1; + /// Return next exired waker, if any + fn next_waker(&self) -> Option { + let queue = unsafe { &mut *self.queue.get() }; + let hw = unsafe { &*self.hw.get() }; + + if let Some(peek) = queue.peek_mut() { + let ticket = peek.get(); + + if hw.now() > ticket.expires { + let waker = ticket.waker.clone(); + unsafe { peek.pop() }; + return Some(waker); + } + } + + None + } + + /// Get time of next expiring waker + fn next_expiration(&self) -> Option { + let queue = unsafe { &mut *self.queue.get() }; + + if let Some(peek) = queue.peek_mut() { + let ticket = peek.get(); + + Some(ticket.expires) + } else { + None + } } - pub fn wake(&self) { - self.waker.wake(); + /// Set alarm at given instant. If none provided, sets the alarm + /// to the maximum supported delay + fn set_alarm(&self, expires: Option) { + self.get_hw_timer().set_alarm(expires); } - pub unsafe fn tick(&mut self) { - self.increment(); - self.wake(); + /// Block until alarm expires + pub fn wait_for_alarm(&self) { + self.get_hw_timer().wait_for_alarm() } - pub fn get_token(&self) -> TickCounterToken { - TickCounterToken { - counter: unsafe { &*(self as *const Self) }, + /// Wake expired tasks, if any + pub fn wake_tasks(&self) { + while let Some(waker) = self.next_waker() { + waker.wake(); } } -} -#[derive(Clone)] -pub struct TickCounterToken { - counter: &'static TickCounter, -} + /// Return timer for use in tasks + pub fn get_timer(self: Pin<&Self>) -> Timer { + let hw = self.hw.get(); + let sender = self.queue.get(); + + Timer::new(hw, sender) + } -impl TickCounterToken { - pub fn register_waker(&self, waker: &Waker) { - self.counter.waker.register(waker); + /// Run interrupt handler + pub fn interrupt_handler(&self) { + self.set_alarm(self.next_expiration()); } - pub fn get_count(&self) -> Tick { - self.counter.count + /// Return reference to hardware timer + pub fn get_hw_timer(&self) -> &HW { + unsafe { &*self.hw.get() } + } + + #[cfg(test)] + #[allow(clippy::mut_from_ref)] + fn get_hw_timer_mut(&self) -> &mut HW { + unsafe { &mut *self.hw.get() } } } -#[derive(Debug, Clone)] -pub struct Ticket { - expires: Tick, +/// Ticket used in timer queue +/// +/// Contains expiration time and a waker +#[derive(Clone)] +struct Ticket { + expires: I, waker: Waker, } -impl Ticket { - fn new(expires: Tick, waker: Waker) -> Self { +impl Ticket { + fn new(expires: I, waker: Waker) -> Self { Self { expires, waker } } } -impl PartialEq for Ticket { +impl PartialEq for Ticket +where + I: Eq, +{ fn eq(&self, other: &Self) -> bool { self.expires == other.expires } } -impl Eq for Ticket {} +impl Eq for Ticket where I: Eq + Ord {} -impl PartialOrd for Ticket { +impl PartialOrd for Ticket +where + I: Ord, +{ fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for Ticket { +impl Ord for Ticket +where + I: Ord, +{ fn cmp(&self, other: &Self) -> core::cmp::Ordering { self.expires.cmp(&other.expires) } } -struct DelayFuture { - sender: PrioritySender, - counter: TickCounterToken, - expires: Tick, - started: bool, +/// Future for delaying a task until a given expiration time +struct DelayFuture +where + HW: HardwareTimer, +{ + queue: *mut List>, + hw: *const HW, + expires: HW::Instant, + node: Option>>>>, } -impl DelayFuture { - fn new(sender: PrioritySender, counter: TickCounterToken, expires: Tick) -> Self { +impl Drop for DelayFuture +where + C: HardwareTimer, +{ + fn drop(&mut self) { + // Make sure to unlink node if dropped before it's popped from + // the queue + if let Some(node) = &self.node { + let node = unsafe { Pin::new_unchecked(node) }; + let node = node.get().get(); + + unsafe { Node::unlink(node) }; + } + } +} + +impl DelayFuture +where + HW: HardwareTimer, +{ + fn new(queue: *mut List>, hw: *const HW, expires: HW::Instant) -> Self { Self { - sender, - counter, + queue, + hw, expires, - started: false, + node: None, } } + + unsafe fn get_queue_and_node( + &mut self, + cx: &mut Context<'_>, + ) -> ( + &mut List>, + &mut Node>, + ) { + let queue = unsafe { &mut *self.queue }; + self.node = Some(Aliasable::new(UnsafeCell::new(Node::new(Ticket::new( + self.expires, + cx.waker().clone(), + ))))); + + let node = &mut *Aliasable::get(Pin::new_unchecked(self.node.as_ref().unwrap())).get(); + + (queue, node) + } } -impl Future for DelayFuture { +impl Future for DelayFuture +where + HW: HardwareTimer, +{ type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if !self.started { - if let Err(_) = self - .sender - .send(Ticket::new(self.expires, cx.waker().clone())) - { - error!("failed to send ticket"); - } - self.started = true; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = unsafe { Pin::get_unchecked_mut(self) }; + let hw = unsafe { &*this.hw }; + + // Check if future has been added to the queue yet + if this.node.is_none() { + // Add node to queue and set alarm. Always return pending + // on first await, even if time has already expired + let (queue, node) = unsafe { this.get_queue_and_node(cx) }; + unsafe { queue.insert(node) }; + + hw.set_alarm(Some(this.expires)); Poll::Pending } else { // Delay until tick count is greater than the // expiration. This ensures that we wait for no less than // the specified duration, and possibly one tick longer // than desired. - if self.counter.get_count() > self.expires { - Poll::Ready(()) - } else { + + if this.expires > hw.now() { + hw.set_alarm(Some(this.expires)); Poll::Pending + } else { + Poll::Ready(()) } } } } -pub struct TimeoutFuture +#[derive(Eq, PartialEq, Debug)] +pub struct TimeoutError {} + +/// Future that wraps around another future, cancelling if awaiting +/// doesn't finish before the expiration time. Uses `DelayFuture` +/// under the hood. +struct TimeoutFuture where F: Future, + HW: HardwareTimer, { future: F, - delay: DelayFuture, + delay: DelayFuture, } -impl TimeoutFuture +impl TimeoutFuture where F: Future, + HW: HardwareTimer, { fn new( future: F, - sender: PrioritySender, - counter: TickCounterToken, - expires: Tick, + sender: *mut List>, + hw: *const HW, + expires: HW::Instant, ) -> Self { Self { future, - delay: DelayFuture::new(sender, counter, expires), + delay: DelayFuture::new(sender, hw, expires), } } } -impl Future for TimeoutFuture +impl Future for TimeoutFuture where F: Future, + HW: HardwareTimer, { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (future, delay) = unsafe { @@ -190,462 +340,663 @@ where ) }; + // Poll the inner future first, then if it not ready, check + // to see if the delay has expired if let Poll::Ready(ret) = future.poll(cx) { Poll::Ready(Ok(ret)) + } else if let Poll::Ready(()) = delay.poll(cx) { + // If the delay has expired, return a timeout error + Poll::Ready(Err(TimeoutError {})) } else { - if let Poll::Ready(()) = delay.poll(cx) { - Poll::Ready(Err(())) - } else { - Poll::Pending - } + Poll::Pending } } } -#[derive(Clone)] -pub struct Timer { - rate: Hertz, - counter: TickCounterToken, - sender: PrioritySender, +/// Periodic delay maintaing a constant frequency. +/// +/// Create with [`Timer::periodic`]. +pub struct Periodic +where + HW: HardwareTimer, +{ + queue: *mut List>, + hw: *const HW, + period: HW::Duration, + previous: HW::Instant, } -impl Timer { - pub fn new(rate: Hertz, counter: TickCounterToken, sender: PrioritySender) -> Self { +impl Periodic +where + HW: HardwareTimer, +{ + fn new(queue: *mut List>, hw: *const HW, period: HW::Duration) -> Self { + let previous = unsafe { (*hw).now() }; + Self { - rate, - counter, - sender, + queue, + hw, + period, + previous, } } - fn get_rate(&self) -> Hertz { - self.rate - } + /// Wait till next period + pub fn wait(&mut self) -> impl Future { + self.previous += self.period; - fn get_count(&self) -> Tick { - self.counter.get_count() + DelayFuture::new(self.queue, self.hw, self.previous) } +} - fn delay_to_ticks>(&self, duration: D) -> Tick { - let ms: Milliseconds = duration.into(); - let rate = self.get_rate().to_duration::().unwrap(); +/// Handle for timer +#[derive(Clone)] +pub struct Timer +where + HW: HardwareTimer, +{ + hw: *const HW, + sender: *mut List>, +} - assert!(ms.integer() == 0 || ms >= rate); +impl Timer +where + HW: HardwareTimer, +{ + fn new(hw: *const HW, sender: *mut List>) -> Self { + Self { hw, sender } + } - let ticks: Tick = (ms.integer() / rate.integer()).into(); + /// Get expiration time from duration + fn expiration(&self, duration: HW::Duration) -> HW::Instant { + (unsafe { (*self.hw).now() }) + duration + } - self.get_count() + ticks + /// Delay for duration + /// + /// # Example + /// ``` + /// # use hyperloop::timer::TimerQueue; + /// # use hyperloop::timer::std::StdTimer; + /// # use std::time::Duration; + /// # use std::pin::pin; + /// # let mut queue = pin!(TimerQueue::new(StdTimer::new())); + /// # let timer = queue.as_ref().get_timer(); + /// timer.delay(Duration::from_millis(1)); + /// ``` + pub fn delay(&self, duration: HW::Duration) -> impl Future { + self.delay_until(self.expiration(duration)) } - pub fn delay(&self, duration: Milliseconds) -> impl Future { - DelayFuture::new( - self.sender.clone(), - self.counter.clone(), - self.delay_to_ticks(duration), - ) + /// Delay until deadline + /// + /// # Example + /// ``` + /// # use hyperloop::timer::TimerQueue; + /// # use hyperloop::timer::std::StdTimer; + /// # use std::time::Duration; + /// # use std::pin::pin; + /// # let mut queue = pin!(TimerQueue::new(StdTimer::new())); + /// # let timer = queue.as_ref().get_timer(); + /// timer.delay_until(timer.now() + Duration::from_millis(10)); + /// ``` + pub fn delay_until(&self, deadline: HW::Instant) -> impl Future { + DelayFuture::new(self.sender, self.hw, deadline) } - pub fn timeout(&self, future: F, duration: Milliseconds) -> TimeoutFuture { - TimeoutFuture::new( - future, - self.sender.clone(), - self.counter.clone(), - self.delay_to_ticks(duration), - ) + /// Drop future if not ready by expiration + /// + /// # Example + /// ``` + /// # use hyperloop::timer::TimerQueue; + /// # use hyperloop::timer::std::StdTimer; + /// # use hyperloop::task::yield_now; + /// # use std::time::Duration; + /// # use std::pin::pin; + /// # let mut queue = pin!(TimerQueue::new(StdTimer::new())); + /// # let timer = queue.as_ref().get_timer(); + /// async fn never_ends() { + /// loop { + /// yield_now().await; + /// } + /// } + /// + /// timer.timeout(never_ends(), Duration::from_millis(100)); + /// ``` + pub fn timeout( + &self, + future: F, + duration: HW::Duration, + ) -> impl Future> { + TimeoutFuture::new(future, self.sender, self.hw, self.expiration(duration)) } -} -struct TimerFuture { - counter: TickCounterToken, - expires: Option, -} + /// Consistent periodic delay without drift + /// + /// # Example + /// ``` + /// # use hyperloop::timer::{Timer, HardwareTimer}; + /// # async fn periodic_task(timer: Timer, period: HW::Duration) { + /// let mut periodic = timer.periodic(period); + /// + /// loop { + /// // Do some periodic work here + /// + /// // Wait until next period + /// periodic.wait().await; + /// } + /// # } + /// ``` + + pub fn periodic(&self, duration: HW::Duration) -> Periodic { + Periodic::new(self.sender, self.hw, duration) + } -impl TimerFuture { - fn new(counter: TickCounterToken) -> Self { - Self { - counter, - expires: None, - } + /// Return current time + pub fn now(&self) -> HW::Instant { + unsafe { (*self.hw).now() } } } -impl Stream for TimerFuture { - type Item = (); +#[cfg(test)] +mod tests { + use ::std::pin::pin; + use core::sync::atomic::{AtomicBool, Ordering}; + use core::time::Duration; + + use crate::common::tests::MockWaker; + use crate::executor::Executor; + use crate::task::Task; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(expires) = self.expires { - if self.counter.get_count() >= expires { - self.expires = None; - return Poll::Ready(Some(())); + use super::*; + + use ::std::sync::Arc; + use crossbeam_queue::ArrayQueue; + + const TICK: Duration = Duration::from_millis(1); + + #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] + struct MockInstant { + instant: Duration, + } + + impl MockInstant { + fn from_ticks(ticks: u32) -> Self { + Self { + instant: TICK * ticks, } - } else { - self.expires = Some(self.counter.get_count() + 1_u64); } - self.counter.register_waker(cx.waker()); - Poll::Pending + fn ticks(&self) -> u128 { + self.instant.as_millis() + } } -} -pub struct Scheduler { - rate: Hertz, - counter: TickCounterToken, - queue: PriorityQueue, -} + impl Add for MockInstant { + type Output = Self; -impl Scheduler { - pub fn new(rate: Hertz, counter: TickCounterToken) -> Self { - Self { - rate, - counter, - queue: PriorityQueue::new(), + fn add(self, rhs: Duration) -> Self::Output { + let instant = self.instant + rhs; + + Self { instant } + } + } + + impl AddAssign for MockInstant { + fn add_assign(&mut self, rhs: Duration) { + self.instant += rhs; } } - pub fn get_timer(&self) -> Timer { - Timer::new(self.rate, self.counter.clone(), self.queue.get_sender()) + #[derive(Clone)] + struct MockHardwareTimer { + time: Duration, } - fn next_waker(&mut self) -> Option { - if let Some(ticket) = self.queue.peek_mut().as_mut() { - if self.counter.get_count() > ticket.expires { - return Some(PeekMut::pop(ticket).waker); + impl MockHardwareTimer { + fn new() -> Self { + Self { + time: Duration::ZERO, } } - None - } + fn add(&mut self, duration: Duration) { + self.time = self.time.checked_add(duration).unwrap(); + } - pub async fn task(&mut self) { - let mut timer = TimerFuture::new(self.counter.clone()); + fn tick(&mut self) { + self.add(TICK); + } - loop { - if let Some(waker) = self.next_waker() { - waker.wake(); - } else { - timer.next().await.unwrap() - } + fn ticks(&self) -> u128 { + self.time.as_millis() } } -} -#[cfg(test)] -mod tests { - use core::sync::atomic::Ordering; + impl HardwareTimer for MockHardwareTimer { + type Instant = MockInstant; + type Duration = Duration; - use crate::common::tests::{log_init, MockWaker}; - use crate::executor::Executor; - use crate::task::Task; + fn now(&self) -> Self::Instant { + MockInstant { instant: self.time } + } - use super::*; + fn set_alarm(&self, _duration: Option) {} - use core::future::Future; - use crossbeam_queue::ArrayQueue; - use embedded_time::duration::Extensions as Ext; - use embedded_time::rate::Extensions; - use std::{boxed::Box, sync::Arc}; + fn start(&self) {} + + fn wait_for_alarm(&self) {} + } #[test] fn state() { - let counter = Box::leak(Box::new(TickCounter::new())); - let token = counter.get_token(); + let queue = TimerQueue::new(MockHardwareTimer::new()); - assert_eq!(token.get_count(), 0); + assert_eq!(unsafe { (*queue.hw.get()).ticks() }, 0); + assert_eq!(unsafe { (*queue.hw.get()).ticks() }, 0); - unsafe { counter.increment() }; - assert_eq!(token.get_count(), 1); + unsafe { + (*queue.hw.get()).tick(); + } + + assert_eq!(unsafe { (*queue.hw.get()).ticks() }, 1); + + unsafe { + (*queue.hw.get()).tick(); + } + + assert_eq!(unsafe { (*queue.hw.get()).ticks() }, 2); + } + + #[test] + fn delay() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); let mockwaker = Arc::new(MockWaker::new()); let waker: Waker = mockwaker.clone().into(); + let mut cx = Context::from_waker(&waker); - token.register_waker(&waker); - counter.wake(); + let mut future1 = DelayFuture::new(timer.sender, timer.hw, MockInstant::from_ticks(1)); - assert_eq!(mockwaker.woke.load(Ordering::Relaxed), true); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); - mockwaker.woke.store(false, Ordering::Relaxed); - token.register_waker(&waker); + assert!(queue.next_waker().is_none()); unsafe { - counter.tick(); + (*queue.hw.get()).tick(); } - assert_eq!(token.get_count(), 2); - assert_eq!(mockwaker.woke.load(Ordering::Relaxed), true); + unsafe { + (*queue.hw.get()).tick(); + } + + let waker = queue.next_waker().unwrap(); + + waker.wake(); + assert!(mockwaker.woke.load(Ordering::Relaxed)); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Ready(()) + ); } #[test] - fn delay() { - let counter = Box::leak(Box::new(TickCounter::new())); - let token = counter.get_token(); - let scheduler: &'static mut Scheduler<10> = - Box::leak(Box::new(Scheduler::new(1000.Hz(), token.clone()))); - let sender = scheduler.queue.get_sender(); + fn delay2() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); + let mockwaker = Arc::new(MockWaker::new()); let waker: Waker = mockwaker.clone().into(); let mut cx = Context::from_waker(&waker); - let mut future = DelayFuture::new(sender.clone(), token.clone(), 1); + let mut future1 = DelayFuture::new(timer.sender, timer.hw, MockInstant::from_ticks(1)); - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Pending); - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Pending); - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Pending); - assert_eq!(future.started, true); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); unsafe { - counter.tick(); - counter.tick(); + (*queue.hw.get()).tick(); + } + unsafe { + (*queue.hw.get()).tick(); } - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Ready(())); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Ready(()) + ); - let mut future = DelayFuture::new(sender.clone(), token.clone(), 20); + let mut future2 = DelayFuture::new(timer.sender, timer.hw, MockInstant::from_ticks(20)); - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Pending); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future2).poll(&mut cx) }, + Poll::Pending + ); - let mut future = DelayFuture::new(sender.clone(), token.clone(), 15); + let mut future3 = DelayFuture::new(timer.sender, timer.hw, MockInstant::from_ticks(15)); - assert_eq!(Pin::new(&mut future).poll(&mut cx), Poll::Pending); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future3).poll(&mut cx) }, + Poll::Pending + ); - if let Some(ticket) = scheduler.queue.pop() { - assert_eq!(ticket.expires, 1); - ticket.waker.wake(); - assert_eq!(mockwaker.woke.load(Ordering::Relaxed), true) + if let Some(ticket) = unsafe { (*queue.queue.get()).pop() } { + let ticket = unsafe { &*ticket }; + assert_eq!(ticket.expires.ticks(), 1); + ticket.waker.wake_by_ref(); + assert!(mockwaker.woke.load(Ordering::Relaxed)) } - if let Some(ticket) = scheduler.queue.pop() { - assert_eq!(ticket.expires, 15); + if let Some(ticket) = unsafe { (*queue.queue.get()).pop() } { + let ticket = unsafe { &*ticket }; + assert_eq!(ticket.expires.ticks(), 15); } - if let Some(ticket) = scheduler.queue.pop() { - assert_eq!(ticket.expires, 20); + if let Some(ticket) = unsafe { (*queue.queue.get()).pop() } { + let ticket = unsafe { &*ticket }; + assert_eq!(ticket.expires.ticks(), 20); } } #[test] - fn timer() { - let counter = Box::leak(Box::new(TickCounter::new())); - let token = counter.get_token(); - let scheduler: &'static mut Scheduler<10> = - Box::leak(Box::new(Scheduler::new(1000.Hz(), token.clone()))); - let timer = scheduler.get_timer(); - let mut executor = Box::new(Executor::<10>::new()); - let queue = Arc::new(ArrayQueue::new(10)); + fn delay_until() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); - log_init(); + let mockwaker = Arc::new(MockWaker::new()); + let waker: Waker = mockwaker.clone().into(); + let mut cx = Context::from_waker(&waker); - let test_future = |queue, timer| { - move || { - async fn future(queue: Arc>, timer: Timer) { - queue.push(1).unwrap(); + unsafe { + (*queue.hw.get()).tick(); + } - timer.delay(0.milliseconds()).await; + let mut future1 = timer.delay_until(MockInstant::from_ticks(0)); - queue.push(2).unwrap(); + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); - timer.delay(1.milliseconds()).await; + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Ready(()) + ); - queue.push(3).unwrap(); + let mut future1 = timer.delay_until(MockInstant::from_ticks(2)); - timer.delay(1.milliseconds()).await; + for _ in 0..3 { + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Pending + ); + } - queue.push(4).unwrap(); + unsafe { + (*queue.hw.get()).tick(); + (*queue.hw.get()).tick(); + } + + assert_eq!( + unsafe { Pin::new_unchecked(&mut future1).poll(&mut cx) }, + Poll::Ready(()) + ); + } + + #[test] + fn timer() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); + + let test_future = |queue, timer| { + async fn future(queue: Arc>, timer: Timer) { + queue.push(1).unwrap(); - timer.delay(1.milliseconds()).await; + timer.delay(Duration::ZERO).await; - queue.push(5).unwrap(); + queue.push(2).unwrap(); - timer.delay(10.milliseconds()).await; + timer.delay(TICK).await; - queue.push(6).unwrap(); - } + queue.push(3).unwrap(); - future(queue, timer) + timer.delay(TICK).await; + + queue.push(4).unwrap(); + + timer.delay(TICK).await; + + queue.push(5).unwrap(); + + timer.delay(TICK * 10).await; + + queue.push(6).unwrap(); } + + future(queue, timer) }; - let task1 = Task::new(move || scheduler.task(), 1); - let task2 = Task::new(test_future(queue.clone(), timer.clone()), 1); + let array_queue = Arc::new(ArrayQueue::new(10)); - task1.add_to_executor(executor.get_sender()).unwrap(); - task2.add_to_executor(executor.get_sender()).unwrap(); + let mut task = Task::new(test_future(array_queue.clone(), timer.clone()), 1); - unsafe { - executor.poll_tasks(); - } + let mut executor = Executor::new([unsafe { Pin::new_unchecked(&mut task).get_handle() }]); + let mut executor_handle = unsafe { Pin::new_unchecked(&mut executor).get_handle() }; - assert_eq!(queue.pop(), Some(1)); - assert_eq!(queue.pop(), None); + executor_handle.poll_tasks(); + + assert_eq!(array_queue.pop(), Some(1)); + assert_eq!(array_queue.pop(), None); unsafe { - counter.tick(); - } - unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), Some(2)); + assert_eq!(array_queue.pop(), None); - counter.wake(); - unsafe { - executor.poll_tasks(); - } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), None); unsafe { - counter.tick(); - } - unsafe { - counter.tick(); + (*queue.hw.get()).tick(); } unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), Some(3)); + assert_eq!(array_queue.pop(), None); unsafe { - counter.tick(); + (*queue.hw.get()).tick(); } unsafe { - counter.tick(); - } - unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), Some(4)); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), Some(4)); + assert_eq!(array_queue.pop(), None); unsafe { - counter.tick(); - } - unsafe { - counter.tick(); + (*queue.hw.get()).tick(); } unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), Some(5)); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), Some(5)); + assert_eq!(array_queue.pop(), None); for _ in 0..10 { unsafe { - counter.tick(); - } - unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), None); } unsafe { - counter.tick(); - } - unsafe { - executor.poll_tasks(); + (*queue.hw.get()).tick(); } + queue.wake_tasks(); + executor_handle.poll_tasks(); - assert_eq!(queue.pop(), Some(6)); - assert_eq!(queue.pop(), None); + assert_eq!(array_queue.pop(), Some(6)); + assert_eq!(array_queue.pop(), None); } #[test] - fn timeout() { - let counter = Box::leak(Box::new(TickCounter::new())); - let token = counter.get_token(); - let scheduler: &'static mut Scheduler<10> = - Box::leak(Box::new(Scheduler::new(1000.Hz(), token.clone()))); - let timer = scheduler.get_timer(); - let mut executor = Executor::<10>::new(); - let queue = Arc::new(ArrayQueue::new(10)); - - log_init(); - - let waiting_future = |queue, timer| { - move || { - async fn slow_future(timer: Timer) { - timer.delay(1000.milliseconds()).await; - } - - async fn future(queue: Arc>, timer: Timer) { - queue.push(1).unwrap(); - - assert_eq!( - timer - .timeout(slow_future(timer.clone()), 100.milliseconds()) - .await, - Err(()) - ); - queue.push(2).unwrap(); - - assert_eq!( - timer - .timeout(slow_future(timer.clone()), 1001.milliseconds()) - .await, - Ok(()) - ); - queue.push(3).unwrap(); - } - future(queue, timer) - } - }; + fn nested() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); + let flag = Arc::new(AtomicBool::new(false)); + + async fn inner(timer: Timer) { + timer.delay(Duration::from_millis(100)).await; + } - let task1 = Task::new(move || scheduler.task(), 1); - let task2 = Task::new(waiting_future(queue.clone(), timer), 1); + async fn outer(timer: Timer, flag: Arc) { + assert_eq!( + timer + .timeout(inner(timer.clone()), Duration::from_millis(50)) + .await, + Err(TimeoutError {}) + ); - task1.add_to_executor(executor.get_sender()).unwrap(); - task2.add_to_executor(executor.get_sender()).unwrap(); + assert_eq!( + timer + .timeout(inner(timer.clone()), Duration::from_millis(200)) + .await, + Ok(()) + ); - unsafe { - executor.poll_tasks(); + flag.store(true, Ordering::Relaxed); } - assert_eq!(queue.pop(), Some(1)); - assert_eq!(queue.pop(), None); + let mut task = Task::new(outer(timer.clone(), flag.clone()), 1); - for _ in 0..101 { - unsafe { - counter.increment(); - } - } + let mut executor = Executor::new([unsafe { Pin::new_unchecked(&mut task).get_handle() }]); + let mut handle = unsafe { Pin::new_unchecked(&mut executor).get_handle() }; - counter.wake(); + handle.poll_tasks(); unsafe { - executor.poll_tasks(); + (*queue.hw.get()).add(Duration::from_millis(51)); } - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), None); + queue.wake_tasks(); + handle.poll_tasks(); - for _ in 0..1000 { - unsafe { - counter.increment(); - } - counter.wake(); + unsafe { + (*queue.hw.get()).add(Duration::from_millis(101)); + } - unsafe { - executor.poll_tasks(); - } + queue.wake_tasks(); + handle.poll_tasks(); - assert_eq!(queue.pop(), None); - } + assert!(flag.load(Ordering::Relaxed)); unsafe { - counter.increment(); + (*queue.hw.get()).add(Duration::from_millis(50)); } - counter.wake(); - unsafe { - executor.poll_tasks(); + assert!(queue.next_waker().is_none()); + } + + #[test] + fn periodic() { + let queue = pin!(TimerQueue::new(MockHardwareTimer::new())); + let timer = queue.as_ref().get_timer(); + let array_queue = Arc::new(ArrayQueue::new(10)); + + async fn task(timer: Timer, queue: Arc>) { + let mut periodic = timer.periodic(Duration::from_millis(100)); + + for i in 0..4 { + queue.push(i).unwrap(); + periodic.wait().await; + } } - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), None); + let mut task = Task::new(task(timer, array_queue.clone()), 1); + + let mut executor = Executor::new([unsafe { Pin::new_unchecked(&mut task).get_handle() }]); + let mut handle = unsafe { Pin::new_unchecked(&mut executor).get_handle() }; + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), Some(0)); + assert_eq!(array_queue.pop(), None); + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), None); + + queue.get_hw_timer_mut().add(Duration::from_millis(50)); + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), None); + + queue.get_hw_timer_mut().add(Duration::from_millis(51)); + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), Some(1)); + assert_eq!(array_queue.pop(), None); + + queue.get_hw_timer_mut().add(Duration::from_millis(150)); + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), Some(2)); + assert_eq!(array_queue.pop(), None); + + queue.get_hw_timer_mut().add(Duration::from_millis(50)); + + queue.wake_tasks(); + handle.poll_tasks(); + + assert_eq!(array_queue.pop(), Some(3)); + assert_eq!(array_queue.pop(), None); } } diff --git a/hyperloop/src/timer/cortex_m.rs b/hyperloop/src/timer/cortex_m.rs new file mode 100644 index 0000000..e24443c --- /dev/null +++ b/hyperloop/src/timer/cortex_m.rs @@ -0,0 +1,115 @@ +//! Monotonic timer based on SysTick and cycle counter +//! +//! Based on + +use core::cell::{Cell, RefCell}; + +use cortex_m::{ + interrupt::Mutex, + peripheral::{syst::SystClkSource, DCB, DWT, SYST}, +}; + +use crate::timer::HardwareTimer; + +pub type Instant = fugit::Instant; +pub type Duration = fugit::Duration; + +/// Monotonic timer based on SysTick and cycle counter +pub struct SysTickTimer { + ticks: Mutex>, + systick: Mutex>, +} + +impl SysTickTimer { + pub fn new(dcb: &mut DCB, mut dwt: DWT, mut systick: SYST, sysclk: u32) -> Self { + assert!(TIMER_HZ == sysclk); + + dcb.enable_trace(); + DWT::unlock(); + assert!(DWT::has_cycle_counter()); + + dwt.enable_cycle_counter(); + dwt.set_cycle_count(0); + + systick.set_clock_source(SystClkSource::Core); + + Self { + ticks: Mutex::new(Cell::new(0)), + systick: Mutex::new(RefCell::new(systick)), + } + } +} + +impl HardwareTimer for SysTickTimer { + type Instant = Instant; + type Duration = Duration; + + fn start(&self) { + self.set_alarm(None); + + cortex_m::interrupt::free(|cs| { + let mut systick = self.systick.borrow(cs).borrow_mut(); + + systick.set_clock_source(SystClkSource::Core); + + systick.enable_counter(); + systick.enable_interrupt(); + }); + } + + fn now(&self) -> Self::Instant { + let old = cortex_m::interrupt::free(|cs| self.ticks.borrow(cs).get()); + + let mut high = (old >> 32) as u32; + let low = old as u32; + let now = DWT::cycle_count(); + + // Detect CYCCNT overflow + if now < low { + high = high.wrapping_add(1); + } + + let new = cortex_m::interrupt::free(|cs| { + let ticks = self.ticks.borrow(cs); + + ticks.replace(((high as u64) << 32) | (now as u64)); + ticks.get() + }); + + Self::Instant::from_ticks(new) + } + + fn set_alarm(&self, expiration: Option) { + let max = 0xff_ffff; + + let ticks = expiration + // Get duration between now and expiration, set to + // zero if already expired + .and_then(|expiration| { + expiration + .checked_duration_since(self.now()) + .or_else(|| Some(Self::Duration::from_ticks(0))) + }) + // Try to convert duration to ticks + .and_then(|duration| duration.ticks().try_into().ok()) + // If no expiration provided, or duration to long to fit + // ticks in u32, set ticks to max value + .unwrap_or(max) + // If duration is zero, set to 1 because 0 has special + // meaning in systick reload register + .max(1) + // Make sure ticks is not bigger than max + .min(max); + + cortex_m::interrupt::free(|cs| { + let mut systick = self.systick.borrow(cs).borrow_mut(); + + systick.set_reload(ticks); + systick.clear_current(); + }); + } + + fn wait_for_alarm(&self) { + cortex_m::asm::wfi(); + } +} diff --git a/hyperloop/src/timer/list.rs b/hyperloop/src/timer/list.rs new file mode 100644 index 0000000..fa2e6cc --- /dev/null +++ b/hyperloop/src/timer/list.rs @@ -0,0 +1,335 @@ +use core::{cell::UnsafeCell, marker::PhantomData, pin::Pin}; + +use pinned_aliasable::Aliasable; + +pub struct PeekMut<'a, T> { + node: *mut Node, + _marker: PhantomData<&'a ()>, +} + +impl<'a, T> PeekMut<'a, T> +where + T: Ord, +{ + pub fn get(&self) -> &T { + let node = unsafe { &*self.node }; + node.get_item() + } + + pub unsafe fn pop(self) -> *const T { + Node::unlink(self.node); + self.get() + } +} + +struct Sentinel { + next: ForwardLink, +} + +impl Sentinel { + fn new() -> Self { + Self { + next: ForwardLink::Empty, + } + } +} + +pub struct List { + sentinel: Aliasable>>, +} + +impl List { + pub fn new() -> Self { + Self { + sentinel: Aliasable::new(UnsafeCell::new(Sentinel::new())), + } + } + + #[allow(clippy::mut_from_ref)] + unsafe fn get_sentinel(&self) -> &mut Sentinel { + unsafe { &mut *Aliasable::get(Pin::new_unchecked(&self.sentinel)).get() } + } + + pub unsafe fn insert(&mut self, new_node: *mut Node) { + let mut link = self.get_sentinel().next.clone(); + let mut prev: Option<*mut Node> = None; // link.get_node().unwrap(); + + loop { + let backlink = if let Some(node) = prev { + BackwardLink::Node(node) + } else { + BackwardLink::List(self) + }; + + match link { + ForwardLink::Empty => { + (*new_node).link(backlink, ForwardLink::Empty); + break; + } + ForwardLink::Node(node) => { + if (*node).get_item() > (*new_node).get_item() { + (*new_node).link(backlink, ForwardLink::Node(node)); + + break; + } else { + prev = Some(node); + link = (*node).next.clone(); + } + } + } + } + } + + pub fn peek_mut(&mut self) -> Option> { + let head = unsafe { self.get_sentinel().next.clone() }; + + match head { + ForwardLink::Empty => None, + ForwardLink::Node(node) => Some(PeekMut { + node, + _marker: PhantomData, + }), + } + } + + #[cfg(test)] + pub unsafe fn pop(&mut self) -> Option<*const T> { + self.peek_mut().map(|peek_mut| peek_mut.pop()) + } + + #[cfg(test)] + fn drain(&mut self) -> Vec + where + T: Copy, + { + let mut v = Vec::new(); + + while let Some(item) = unsafe { self.pop() } { + let item = unsafe { &*item }; + v.push(*item); + } + + v + } +} + +impl Default for List { + fn default() -> Self { + Self::new() + } +} + +enum ForwardLink { + Empty, + Node(*mut Node), +} + +impl Clone for ForwardLink { + fn clone(&self) -> Self { + match self { + Self::Empty => Self::Empty, + Self::Node(arg0) => Self::Node(*arg0), + } + } +} + +enum BackwardLink { + Empty, + Node(*mut Node), + List(*mut List), +} + +impl Clone for BackwardLink { + fn clone(&self) -> Self { + match self { + Self::Empty => Self::Empty, + Self::Node(arg0) => Self::Node(*arg0), + Self::List(arg0) => Self::List(*arg0), + } + } +} + +pub struct Node { + item: T, + next: ForwardLink, + prev: BackwardLink, +} + +impl Node +where + T: Ord, +{ + pub fn new(item: T) -> Self { + Self { + item, + next: ForwardLink::Empty, + prev: BackwardLink::Empty, + } + } + + unsafe fn link(&mut self, prev: BackwardLink, next: ForwardLink) { + self.prev = prev; + self.next = next; + + match self.prev { + BackwardLink::Node(node) => { + let node = unsafe { &mut *node }; + node.next = ForwardLink::Node(self); + } + BackwardLink::List(list) => { + let list = unsafe { &mut *list }; + let sentinel = unsafe { list.get_sentinel() }; + sentinel.next = ForwardLink::Node(self); + } + BackwardLink::Empty => unreachable!(), + } + + match self.next { + ForwardLink::Empty => (), + ForwardLink::Node(node) => { + let node = unsafe { &mut *node }; + node.prev = BackwardLink::Node(self); + } + } + } + + fn get_item(&self) -> &T { + &self.item + } + + pub unsafe fn unlink(node: *mut Self) { + let node = unsafe { &mut *node }; + + match node.prev { + BackwardLink::Empty => (), + BackwardLink::Node(prev) => { + let prev = unsafe { &mut *prev }; + prev.next = node.next.clone(); + } + BackwardLink::List(list) => { + let list = unsafe { &mut *list }; + let sentinel = unsafe { list.get_sentinel() }; + sentinel.next = node.next.clone(); + } + } + + match node.next { + ForwardLink::Empty => (), + ForwardLink::Node(next) => { + let next = unsafe { &mut *next }; + next.prev = node.prev.clone(); + } + } + + node.prev = BackwardLink::Empty; + node.next = ForwardLink::Empty; + } +} + +#[cfg(test)] +mod tests { + use core::cell::UnsafeCell; + + use super::*; + + #[test] + fn linked_list() { + let mut root = List::new(); + let list = &mut root; + + let items = (0..10) + .map(|i| UnsafeCell::new(Node::new(i))) + .collect::>(); + + for item in &items { + let node = item; + unsafe { list.insert(node.get()) }; + } + + let mut v = Vec::new(); + + while let Some(item) = unsafe { list.pop() } { + let item = unsafe { &*item }; + v.push(*item); + } + + assert_eq!(v, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[test] + fn reversed() { + let mut root = List::new(); + let list = &mut root; + + let items = (0..10) + .rev() + .map(|i| UnsafeCell::new(Node::new(i))) + .collect::>(); + + for item in &items { + let node = item; + unsafe { list.insert(node.get()) }; + } + + let mut v = Vec::new(); + + while let Some(item) = unsafe { list.pop() } { + let item = unsafe { &*item }; + v.push(*item); + } + + assert_eq!(v, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[test] + fn order() { + let mut root = List::new(); + let list = &mut root; + + let items = [1, 20, 15] + .iter() + .copied() + .map(|i| UnsafeCell::new(Node::new(i))) + .collect::>(); + + for item in &items { + let node = item; + unsafe { list.insert(node.get()) }; + } + + let mut v = Vec::new(); + + while let Some(item) = unsafe { list.pop() } { + let item = unsafe { &*item }; + v.push(*item); + } + + assert_eq!(v, [1, 15, 20]); + } + + #[test] + fn remove() { + let root = UnsafeCell::new(List::new()); + + for i in 0..3 { + let nodes = [1, 2, 3] + .iter() + .copied() + .map(|i| UnsafeCell::new(Node::new(i))) + .collect::>(); + + for node in &nodes { + let list = unsafe { &mut *root.get() }; + unsafe { list.insert(node.get()) }; + } + + unsafe { Node::unlink(nodes[i].get()) }; + + let mut expected = vec![1, 2, 3]; + expected.remove(i); + + let list = unsafe { &mut *root.get() }; + assert_eq!(list.drain(), expected); + } + } +} diff --git a/hyperloop/src/timer/std.rs b/hyperloop/src/timer/std.rs new file mode 100644 index 0000000..5bc37ee --- /dev/null +++ b/hyperloop/src/timer/std.rs @@ -0,0 +1,50 @@ +//! Standard library based [`HardwareTimer`] +//! +//! Implementation of [`HardwareTimer`] using Rust standard +//! library. This is mostly useful for testing code in pure software. + +use std::{sync::Mutex, time::Instant}; + +use crate::timer::HardwareTimer; + +/// Standard library based [`HardwareTimer`] +pub struct StdTimer { + alarm: Mutex>, +} + +impl StdTimer { + pub fn new() -> Self { + Self { + alarm: Mutex::new(None), + } + } +} + +impl Default for StdTimer { + fn default() -> Self { + Self::new() + } +} + +impl HardwareTimer for StdTimer { + type Instant = std::time::Instant; + type Duration = std::time::Duration; + + fn start(&self) {} + + fn now(&self) -> Self::Instant { + Self::Instant::now() + } + + fn set_alarm(&self, expires: Option) { + *self.alarm.lock().unwrap() = expires; + } + + fn wait_for_alarm(&self) { + if let Some(alarm) = self.alarm.lock().unwrap().as_ref() { + if let Some(delay) = alarm.checked_duration_since(Instant::now()) { + std::thread::sleep(delay); + } + } + } +} diff --git a/hyperloop/tests/tests.rs b/hyperloop/tests/tests.rs new file mode 100644 index 0000000..b09b4d5 --- /dev/null +++ b/hyperloop/tests/tests.rs @@ -0,0 +1,31 @@ +#![feature(type_alias_impl_trait)] + +use crossbeam_queue::ArrayQueue; +use hyperloop::{static_executor, task}; + +use std::sync::Arc; + +#[test] +fn macros() { + #[task(priority = 1)] + async fn test_task1(queue: Arc>) { + queue.push(1).unwrap(); + } + + #[task(priority = 2)] + async fn test_task2(queue: Arc>) { + queue.push(2).unwrap(); + } + + let queue = Arc::new(ArrayQueue::new(10)); + + let task1 = test_task1(queue.clone()).unwrap(); + let task2 = test_task2(queue.clone()).unwrap(); + + let mut executor = static_executor!(task1, task2); + + executor.poll_tasks(); + + assert_eq!(queue.pop().unwrap(), 2); + assert_eq!(queue.pop().unwrap(), 1); +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8315f48..e9d4cf2 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "nightly-2021-10-16" +channel = "nightly" components = [ "rustfmt", "clippy" ]