diff --git a/README.md b/README.md
index 7b94956..8c16179 100644
--- a/README.md
+++ b/README.md
@@ -20,14 +20,14 @@ You will also be without yeild from (under the hood `await`) so you should think
 ## Breaking Down Python - How AsyncIO works in Python under the hood
 
-Python's AsyncIO is built off the idea of a event loop and generator coroutines allowing functions to be suspended and resumed which is managed by the event loop when a `await` block is hit. This gives us the affect of single threaded concurrency.
+Python's AsyncIO is built on the idea of an event loop and generator coroutines, allowing functions to be suspended and resumed by the event loop whenever an `await` expression is hit. This gives us the effect of single-threaded concurrency.
 
 ### Breaking down async/await
-Since Python 3.5.5 synatic sugar methods of `async` and `await` make our lives alot easier by doing all the heavy lifting for us keeping all the messy internals hidden away.
+Since Python 3.5 the syntactic sugar keywords `async` and `await` have made our lives a lot easier by doing all the heavy lifting for us, keeping all the messy internals hidden away.
 However, this wasn't always a thing. In Python versions older than 3.5.5 we were left with manually decorating our functions and using `yield from` (more on this later), if you've worked with async in Python 2 or old versions of Python 3 this might come easier to you.
 
 #### What is `await` under the hood?
-await is basically a `yield from` wrapper after calling the coroutine and then calling `__await__`, which is all we really need to know for the sake of writing async extensions in Rust, the next question you will probably have is "well... What's yield from?", glad you asked! All we need to know about yield from is that we yield the results from another iterator until it's exhuased (All coroutines are generators so all support being iterated over which will support yield from) however, we cannot use `yield from` in our Rust code so here's what we would do logically in Rust (Implemented in Python).
+await is basically a `yield from` wrapper: the coroutine is called, then its `__await__` method is called, which is all we really need to know for the sake of writing async extensions in Rust. The next question you will probably have is "well... what's `yield from`?" Glad you asked! All we need to know about `yield from` is that it yields the results from another iterator until that iterator is exhausted (all coroutines are generators, so they all support being iterated over, which is what `yield from` relies on). However, we cannot use `yield from` in our Rust code, so here's what we would do logically in Rust, implemented below in Python.
 
 **Example code:**
 ```py
@@ -67,7 +67,7 @@ except StopIteration as result:
 ```
 
 #### What is `async def` under the hood?
-`async def` or `async` key words are alot more complicated under the hood than `await`.
+The `async def` or `async` keywords are a lot more complicated under the hood than `await`.
 Before `async` became a keyword you would have to decorate your function with `@asyncio.coroutine` to wrap your code in a coroutine class, however we have to re-create the coroutine class itself.
 
 **A coroutine remake in Python**
@@ -96,7 +96,8 @@ class MyCoroutineCopy:
         return self
 
     # __iter__ is used just to return a iterator, we dont need this to be self
-    # but for the sake of this we're using the class itself as a iterator.
+    # but for the sake of this we're using the class itself as an iterator.
+    # Note that it is required for compatibility with old-style generator-based coroutines.
     def __iter__(self):
         return self
@@ -125,14 +126,207 @@ my_coroutine returned with: 'foo'
 MyCoroutineCopy returned with: 'foo'
 ```
-## Implementing Rust - IN PROGESS
-Now we've got all the concepts out of the way and under our tool belt we can actually start to recreate this is Rust using PyO3.
+---
+
+### A look behind the scenes
+
+So far, we've only shown you how to make a coroutine, not how to make a coroutine *asynchronous*.
+That is, how to interact with *other* systems in a *non-blocking* way (disk access, network access, other threads: everything that we call I/O). All the code that we've shown so far, even though it uses the `async` and `await` keywords, is synchronous; it runs sequentially.
+
+Here's the proof:
+
+``` python
+# python 3.8
+import asyncio
+
+
+async def level3(i):
+    print(f"task {i} reached the bottom, exiting...")
+
+
+async def level2(i):
+    print(f"task {i} entering level 3")
+    result = await level3(i)
+    print(f"task {i} exiting level 3")
+
+
+async def level1(i):
+    print(f"task {i} entering level 2")
+    result = await level2(i)
+    print(f"task {i} exiting level 2")
+
+
+async def main():
+    return await asyncio.gather(*[level1(i) for i in range(3)])
+
+
+asyncio.run(main())
+```
+
+The above code will always, deterministically, produce the following output. (Try and play with the code a little, to convince yourself.)
+
+```
+task 0 entering level 2
+task 0 entering level 3
+task 0 reached the bottom, exiting...
+task 0 exiting level 3
+task 0 exiting level 2
+task 1 entering level 2
+task 1 entering level 3
+task 1 reached the bottom, exiting...
+task 1 exiting level 3
+task 1 exiting level 2
+task 2 entering level 2
+task 2 entering level 3
+task 2 reached the bottom, exiting...
+task 2 exiting level 3
+task 2 exiting level 2
+```
+
+And this is good! As long as there is nothing to wait for, we just chain the iterators and keep going down the async stack, and your CPU doesn't waste cycles on expensive context switches.
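By contrast, as soon as a genuine suspension point enters the chain, the tasks do interleave. A minimal sketch using `asyncio.sleep(0)` (which suspends the coroutine and yields to the event loop without actually waiting) shows the difference:

```python
# python 3.8
import asyncio

order = []

async def task(i):
    order.append(f"{i} before")
    # A real suspension point: the coroutine yields to the event loop,
    # which is then free to advance the other tasks.
    await asyncio.sleep(0)
    order.append(f"{i} after")

async def main():
    await asyncio.gather(*[task(i) for i in range(2)])

asyncio.run(main())
print(order)  # ['0 before', '1 before', '0 after', '1 after']
```

Both tasks run their "before" half first; only then do the "after" halves follow, because each `await asyncio.sleep(0)` hands control back to the loop.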
+
+So, to sum it up: `async` and `await` are the building blocks of asynchronous computations; they *enable* asynchronicity, but they don't, *by themselves*, make your program magically asynchronous.
+
+For that, we're missing a key element that lies deep in the guts of asyncio, because just like with any magic, there's a trick, and to find it, we have to take a look behind the scenes.
+
+That missing piece of the puzzle is, well, the `Future`.
+Not the Rust one, the [`asyncio.Future`](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future).
+
+In essence, if you think of the async call stack as a tree, the Futures are always the leaves.
+They represent the very moment where you are about to wait for an I/O operation, and you want to tell the async loop "Alright, I can't go any further now, could you please put my parent task on hold?
+**Someone will *call* you *back* when I'm ready to proceed.**"
+
+#### Implementing your own (Python) future
+
+So, to illustrate, let's implement our very own dumb future that will spawn a thread to wake it up after some time.
+
+``` python
+# python 3.8
+import asyncio
+from asyncio.exceptions import InvalidStateError
+import threading
+import time
+
+
+class MyFuture:
+    def __init__(self, result):
+        """ So here is our future. From the user's perspective, it takes `result` in
+        and will spit out the square of that result after 1s when you await it.
+        """
+        self._result = result
+        # The loop is here for asyncio's sanity checks but also to schedule the
+        # resuming of the task once the future is completed
+        self._loop = None
+        # The callbacks are a list of functions to be called when the future completes
+        self._callbacks = []
+        # Now this is an ugly bit as it serves a double purpose:
+        # First, the loop will check that it's present as a flag of whether this
+        # future is advertising itself to be asyncio compatible or not.
+        # Second, its value determines if the Task should be put on the waiting
+        # queue. This is exactly what we want, so we set it to True.
+        self._asyncio_future_blocking = True
+
+    def __await__(self):
+        # Now that we're in an async context, we store the loop
+        self._loop = asyncio.get_running_loop()
+
+        def sleep_and_return(result):
+            """ Here is what's going to run in our thread.
+            We simulate some computations, and then return the result.
+            The task will be woken up on the next cycle of the loop.
+            Note that we're using the thread-safe variant of call_soon.
+            (This is all to avoid deadlocks.)
+            """
+            time.sleep(1)
+            # More about `self._set_result` later
+            self._loop.call_soon_threadsafe(self._set_result, result * result)
+
+        threading.Thread(target=sleep_and_return, args=(self._result,)).start()
+        # We forget our result. It will be the thread's job to set it back.
+        self._result = None
+        # We are the iterator
+        return self
+
+    def __next__(self):
+        # This will be called immediately afterwards, but the thread won't be
+        # finished yet, so...
+        if self._result is None:
+            # ... Here's the trick! We yield... ourself! A future!
+            return self
+        # Asyncio will now check _asyncio_future_blocking... True.
+        # "So, we have a future here, let's check its loop... get_loop()?"
+        raise StopIteration(self._result)
+
+    def get_loop(self):
+        return self._loop
+        # "... Yes, the loop matches, alright, let's give it a callback..."
+
+    def add_done_callback(self, callback, *, context=None):
+        self._callbacks.append((callback, context))
+        # "... Put it on the waiting queue, and forget about it."
+        # Now the loop is free to do something else, or sleep...
+        # Until our thread finishes and calls:
+
+    def _set_result(self, result):
+        """ This is our waker. It will set the result and schedule the
+        callbacks to resume our task.
+        One very important note is that it's a separate function, as it
+        needs to be an atomic operation that happens on the main thread.
+        (In this simple Python example it's actually not required, but I
+        prefer to introduce this 2-step callback technique early on because
+        it will prove necessary in the Rust implementation to sidestep
+        issues with threads.)
+        """
+        self._result = result
+        # This code is taken straight from asyncio's source code
+        for cb, ctx in self._callbacks:
+            self._loop.call_soon(cb, self, context=ctx)
+        self._callbacks[:] = []
+
+    # Now, before resuming, the first thing asyncio does is call `.result()`
+    # Don't ask me why, as the return is just ignored, but it does.
+    # Maybe it's there to mirror `concurrent.futures.Future`, and the source
+    # implementation uses it in the `__await__` function so maybe that's to avoid
+    # code duplication? But then why call it as part of the loop and force it
+    # as part of the Future API? I just don't know.
+    def result(self):
+        if self._result is None:
+            # This is never called; the exception is there to prove to you that the
+            # task will only be resumed once the result is ready once again
+            raise InvalidStateError('result is not ready')
+        return None
+        # Now the task is resumed and __next__ is called a final time to
+        # provide the result.
+
+async def wrapper(i):
+    """ Simulating some depth in the async call stack.
+    (It also lets us make the shortcut of initialising _asyncio_future_blocking
+    to True.)
+    """
+    return await MyFuture(i)
+
+async def main():
+    """ To show you that this is indeed async, let's spawn a thousand of those
+    futures that will all complete after 1 second.
+    """
+    results = await asyncio.gather(*[wrapper(i) for i in range(1000)])
+    print(results)
+
+asyncio.run(main())
+```
+
+---
+
+## Implementing Rust
+Now we've got all the concepts out of the way and under our tool belt, we can actually start to recreate this in Rust using PyO3.
 We'll start by breaking down and recreating our first Python example recreating a `await`:

 ### Re-Creating Python's `await` in Rust
 
 **Setting up boilerplate:**
-This is just our standard bit of setup, if you already have a existing bit of code with this in you can ignore it.
+This is just our standard bit of setup; if you already have an existing bit of code with this in, you can ignore it.
 Though for the purposes of learning it would be a good idea to have a seperate area to mess with before putting it in your actual code.
 
 ```rust
@@ -297,9 +491,10 @@ We're going to use the `#[pyproto]` macro for a couple things:

**Setting up our boilerplate:**
-Like we did with `await` we need to setup some basic boiler plate for the sake of demonstatration.
+Like we did with `await`, we need to set up some basic boilerplate for the sake of demonstration.
 
-Our struct `MyCoroutine` will form the bases for our awaitable, it is important to note now that this will not be identified as a `coroutine` type by Python but as a awaitable type instead. This will mean things like `asyncio.create_task()` wont work but `asyncio.ensure_future()` will.
+Our struct `MyAwaitable` will form the basis for our awaitable. It is important to note now that this will not be identified as a `coroutine` type by Python but as an awaitable type instead.
+This will mean things like `asyncio.create_task()` won't work but `asyncio.ensure_future()` will.
 
 ```rust
 // lets get our basics setup first
 use pyo3::prelude::*;
 
 // Our base Python class that will do the job of Python's
 // coroutine class.
 #[pyclass]
-struct MyCoroutine {}
+struct MyAwaitable {}
 
-// Lets just call our module await_from_rust for simplicity.
+// Let's make it another module since we're now doing the opposite: awaitable_rust
 #[pymodule]
-fn await_from_rust(_py: Python, m: &PyModule) -> PyResult<()> {
-    m.add_class::<MyCoroutine>()?;
+fn awaitable_rust(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add_class::<MyAwaitable>()?;
     Ok(())
 }
 ```
 
 **Making it 'awaitable'**
-Python has a very simple system for making a object awaitable, simply `await` calls `__await__` under the hood, we can recreate this using the `pyproto` macro and the `PyAsyncProtocol`.
+Python has a very simple system for making an object awaitable: `await` simply calls `__await__` under the hood. We can recreate this using the `pyproto` macro and the `PyAsyncProtocol`.
 
 ```rust
 // lets get our basics setup first
 use pyo3::prelude::*;
 
 // Our base Python class that will do the job of Python's
 // coroutine class.
 #[pyclass]
-struct MyCoroutine {}
+struct MyAwaitable {}
 
-// Adding out async protocol, this makes use awaitable.
-// it should be important to note: DO NOT LISTEN TO YOUR LINTER.
+// Adding our async protocol, this makes us awaitable.
+// It should be important to note: DO NOT LISTEN TO YOUR LINTER.
 // Your linter can get very, very confused at this system and will
 // want you to implement things that you do *not* want to implement
 // to 'satisfy' this protocol implementation.
-// even if it highlights in red the bellow implementation will stil compile.
+// Even if it highlights in red, the below implementation will still compile.
 #[pyproto]
-impl PyAsyncProtocol for MyCoroutine {
+impl PyAsyncProtocol for MyAwaitable {
     fn __await__(slf: PyRef<Self>) -> PyRef<Self> {
         slf // We're saying that we are the iterable part of the coroutine.
     }
 }
 
-// Lets just call our module await_from_rust for simplicity.
+// Let's make it another module since we're now doing the opposite: awaitable_rust
 #[pymodule]
-fn await_from_rust(_py: Python, m: &PyModule) -> PyResult<()> {
-    m.add_class::<MyCoroutine>()?;
+fn awaitable_rust(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add_class::<MyAwaitable>()?;
     Ok(())
 }
 ```
 
 *Wow! is it that easy to make a coroutine?* - Sadly not quite, the `slf` reference does not allow us to internally call functions we've defined, in the above example as well we are telling Python that our iterable is itself.
 This will crash if we try to run this on Python now because we're missing the iterator protocol.
-However, this simple setup still carries alot of use. If you have something that just needs to be awaitable and transfer some pre-computed fields to a existing awaitable or PyObject we can just create the object -> call `__await__` and return that. This can make things considerably easier if your Rust coroutines are simply acting as a middle man for some efficent code.
+However, this simple setup still carries a lot of use. If you have something that just needs to be awaitable and transfer some pre-computed fields to an existing awaitable or PyObject, you can just create the object -> call `__await__` and return that. This can make things considerably easier if your Rust coroutines are simply acting as a middleman for some efficient code.
 
-**Making our awaitable a iterable**
+**Making our awaitable an iterable**
 It should be important to note that just because something is awaitable does not make it a coroutine, coroutines are essentially self contained classes that return `self` on both `__await__` and `__iter__` calls and execute the actual code upon the `__next__` call (Please note I am heavily simplifying it to make sense of the following Rust code.)
 
 Just like we did with `__await__` we can use `pyproto` to implement the iterable dunder methods:
@@ -370,26 +565,26 @@ use pyo3::wrap_pyfunction;
 
 // Our base Python class that will do the job of Python's
 // coroutine class.
 #[pyclass]
-struct MyCoroutine {}
+struct MyAwaitable {}
 
-// Adding out async protocol, this makes use awaitable.
+// Adding our async protocol, this makes us awaitable.
 #[pyproto]
-impl PyAsyncProtocol for MyCoroutine {
+impl PyAsyncProtocol for MyAwaitable {
     fn __await__(slf: PyRef<Self>) -> PyRef<Self> {
         slf // We're saying that we are the iterable part of the coroutine.
     }
 }
 
 #[pyproto]
-impl PyIterProtocol for MyCoroutine {
-    // This is a optional function, if you dont want todo anything like returning a existing iterator
-    // dont worry about implementing this.
+impl PyIterProtocol for MyAwaitable {
+    // This is an optional function. If you don't want to do anything like returning
+    // an existing iterator, don't worry about implementing this.
     fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
         slf
     }
 
-    // There are other return types you can give however IterNextOutput is by far the biggest
-    // helper you will get when making coroutines.
+    // There are other return types you can give, however IterNextOutput is by far the biggest
+    // helper you will get when making awaitables.
     fn __next__(_slf: PyRefMut<Self>) -> IterNextOutput<Option<PyObject>, &'static str> {
         IterNextOutput::Return("Ended")
     }
@@ -398,14 +593,14 @@ impl PyIterProtocol for MyCoroutine {
 
 // Exposing our custom awaitable to Python.
 // This will behave like a coroutine.
 #[pyfunction]
-fn my_coroutine() -> MyCoroutine {
-    MyCoroutine {}
+fn my_awaitable() -> MyAwaitable {
+    MyAwaitable {}
 }
 
-// Lets just call our module await_from_rust for simplicity.
+// Let's make it another module since we're now doing the opposite: awaitable_rust
 #[pymodule]
-fn await_from_rust(_py: Python, m: &PyModule) -> PyResult<()> {
-    m.add_class::<MyCoroutine>()?;
+fn awaitable_rust(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add_class::<MyAwaitable>()?;
-    m.add_function(wrap_pyfunction!(my_coroutine, m)?).unwrap();
+    m.add_function(wrap_pyfunction!(my_awaitable, m)?).unwrap();
     Ok(())
 }
@@ -417,12 +612,12 @@ And now we can call our Rust coroutine from Python.
 # python 3.8
 import asyncio
-import await_from_rust
+import awaitable_rust
 
 
 async def main():
     # Everything works as if it was coroutine...
-    result = await await_from_rust.my_coroutine()
+    result = await awaitable_rust.my_awaitable()
-    print(f"my_coroutine returned with: {result!r}")
+    print(f"my_awaitable returned with: {result!r}")
 
     # But note that this won't work:
@@ -434,4 +629,436 @@ asyncio.run(main())
 
 ### Using Rust's futures as Python awaitables
-TODO... ;)
+What if you could turn your Rust `async` functions into awaitable Python objects?
+
+Turns out, Rust's and Python's `async`/`await` models aren't that different and can be made to work together.
+
+#### Comparison of Python's and Rust's `async`/`await` models
+
+[Earlier](#implementing-your-own-python-future) we broke down how futures work in Python, which should hopefully have given us a good enough understanding of the Python `async`/`await` strategy.
+
+Now, if you haven't already, you should read [this chapter](https://rust-lang.github.io/async-book/02_execution/01_chapter.html) of the async Rust book, which does an excellent job at explaining how `Futures` work in Rust.
+
+And armed with this knowledge we should have everything we need to write the glue to make those two work together, but for the sake of this explanation, let's recap:
+
+- In Python (asyncio), `async def`s translate to `coroutine`s.
+These coroutines are just generators that yield from other generators (coroutines) whenever they encounter an `await` statement, until they hit an `asyncio.Future` (a leaf) down the chain.
+Then the whole `Task` (stack of generators/coroutines) is put on the waiting queue, and callbacks are set up on the Future to resume the Task once ready.
+This process repeats until the Task finishes with a result.
+- In Rust, async blocks are compiled down to state machines that advance between each state of waiting when polled, until they reach the final Ready state.
+This is conceptually equivalent to Python's approach:
+async blocks are syntactic sugar for structs that `impl` the `std::future::Future` trait by wrapping other objects that `impl` the trait.
+Somewhere down the line, there will be a leaf Future that actually waits for some I/O and implements the trait manually.
+Along this line is passed a `Waker` that will be used to signal when the whole (compiled) Future is ready to be resumed.
+
+In summary, async blocks are coroutines, wakers are callbacks, futures are futures, and future stacks compiled to state machines are Tasks.
+
+#### Implementing a Python future in Rust
+
+To hammer down what we've learned so far, and as a first step, let's reimplement [our asyncio future](#implementing-your-own-python-future) in Rust.
+I recommend you keep the Python version open on the side to follow along.
+
+```rust
+use {
+    pyo3::{iter::IterNextOutput, prelude::*, types::IntoPyDict, PyAsyncProtocol, PyIterProtocol},
+    std::{thread, time::Duration},
+};
+
+// So this is our class, except it's now implemented in Rust!
+// I have kept the same layout so you can follow the corresponding Python code.
+#[pyclass]
+struct MyFuture {
+    // `result` is now concretized to a u32 for the sake of the example, where
+    // in Python it could have been anything. You can use PyObject if you want.
+    result: Option<u32>,
+    aio_loop: Option<PyObject>,
+    callbacks: Vec<(PyObject, Option<PyObject>)>,
+    #[pyo3(get, set)]
+    _asyncio_future_blocking: bool,
+}
+
+#[pyproto]
+impl PyAsyncProtocol for MyFuture {
+    fn __await__(slf: Py<Self>) -> PyResult<Py<Self>> {
+        // Only locking the GIL for what we need
+        let (result, aio_loop) = Python::with_gil(|py| -> PyResult<_> {
+            let mut slf = slf.try_borrow_mut(py)?;
+            let aio_loop: PyObject = PyModule::import(py, "asyncio")?
+                .call0("get_running_loop")?
+                .into();
+            slf.aio_loop = Some(aio_loop.clone());
+            Ok((slf.result.take(), aio_loop))
+        })?;
+        // We know we have a result thanks to the constructor
+        let result = result.expect("result uninitialised");
+
+        // Now this is not obvious at first, but think of our Python implementation,
+        // when we passed `self._set_result` to `call_soon_threadsafe`.
+        // `self._set_result` is actually an object.
+        // An object that wraps (or closes over) some pointers to call a function.
+        // This is our handcrafted lambda to wake our task.
+        #[pyclass]
+        struct WakerClosure {
+            result: u32,
+            aio_loop: PyObject,
+            future: Py<MyFuture>,
+        }
+
+        let future = slf.clone();
+        thread::spawn(move || {
+            thread::sleep(Duration::from_secs(1));
+            let waker = WakerClosure {
+                result: result * result,
+                aio_loop: aio_loop.clone(),
+                future,
+            };
+            Python::with_gil(|py| aio_loop.call_method1(py, "call_soon_threadsafe", (waker,)))
+                .expect("exception thrown by the event loop (probably closed)");
+        });
+
+        // When the thread is finished, on its next iteration, the loop will
+        // call our waker, which in turn will call the registered callbacks.
+        #[pymethods]
+        impl WakerClosure {
+            #[call]
+            pub fn __call__(slf: PyRef<Self>) -> PyResult<()> {
+                let py = slf.py();
+                // As I touched on earlier, we would have a problem if
+                // the following line ran on the child thread
+                let mut future = slf.future.try_borrow_mut(py)?;
+                future.result = Some(slf.result);
+                // The same code from asyncio, only translated to Rust.
+                let callbacks = std::mem::take(&mut future.callbacks);
+                for (callback, context) in callbacks {
+                    slf.aio_loop.call_method(
+                        py,
+                        "call_soon",
+                        (callback, &future),
+                        Some(vec![("context", context)].into_py_dict(py)),
+                    )?;
+                }
+                Ok(())
+            }
+        }
+
+        // Again, the reason we need this 2-step callback is to avoid complications
+        // which could either lead to deadlocks or errors when we try to access
+        // shared state.
+        // So we keep shared state out of the child thread, and keep its
+        // interaction with the GIL to a bare minimum.
+
+        Ok(slf)
+    }
+}
+
+// The rest is a pretty straight-forward translation.
+
+#[pyproto]
+impl PyIterProtocol for MyFuture {
+    fn __next__(slf: PyRef<Self>) -> IterNextOutput<PyRef<Self>, u32> {
+        match slf.result {
+            None => IterNextOutput::Yield(slf),
+            Some(result) => IterNextOutput::Return(result),
+        }
+    }
+}
+
+#[pymethods]
+impl MyFuture {
+    #[new]
+    fn new(result: u32) -> Self {
+        Self {
+            result: Some(result),
+            aio_loop: None,
+            callbacks: vec![],
+            _asyncio_future_blocking: true,
+        }
+    }
+
+    fn get_loop(&self) -> Option<&PyObject> {
+        self.aio_loop.as_ref()
+    }
+
+    fn add_done_callback(&mut self, callback: PyObject, context: Option<PyObject>) {
+        self.callbacks.push((callback, context));
+    }
+
+    fn result(&self) -> Option<u32> {
+        None
+    }
+}
+
+#[pymodule]
+fn awaitable_rust(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add_class::<MyFuture>()?;
+    Ok(())
+}
+```
+
+And of course our little shim of Python to see it in action.
+
+```python
+# python 3.8
+import asyncio
+import time
+# Bonus! You can try it with a different asyncio-compatible loop
+#import uvloop
+
+import awaitable_rust
+
+async def wrapper(i):
+    return await awaitable_rust.MyFuture(i)
+
+async def main():
+    results = await asyncio.gather(*[wrapper(i) for i in range(1000)])
+    print(results)
+
+
+#uvloop.install()
+asyncio.run(main())
+
+```
+
+#### Bridging the gap
+
+For the next step, let's throw some *Rust* futures in the mix.
+We will turn the thread code into a struct implementing the `std::future::Future` trait and make our *Python* future await it.
+
+For that we will simply be adapting [the `TimerFuture` from the Asynchronous Programming in Rust book](https://rust-lang.github.io/async-book/02_execution/03_wakeups.html#applied-build-a-timer) as it (not so) coincidentally looks very much like what we have been implementing so far.
+
+```rust
+use {
+    futures::{
+        future::{BoxFuture, FutureExt},
+        task::{waker_ref, ArcWake},
+    },
+    pyo3::{
+        iter::IterNextOutput, prelude::*, types::IntoPyDict, wrap_pyfunction, PyAsyncProtocol,
+        PyIterProtocol,
+    },
+    std::{
+        future::Future,
+        pin::Pin,
+        sync::{Arc, Mutex},
+        task::{Context, Poll, Waker},
+        thread,
+        time::Duration,
+    },
+};
+
+// Straight from Asynchronous Programming in Rust, I have removed the comments
+// and commented what I changed.
+pub struct TimerFuture {
+    shared_state: Arc<Mutex<SharedState>>,
+}
+
+struct SharedState {
+    /// This bool is now an option with our result
+    result: Option<u32>,
+
+    waker: Option<Waker>,
+}
+
+impl Future for TimerFuture {
+    type Output = u32; // The Future now returns something
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut shared_state = self.shared_state.lock().unwrap();
+        match shared_state.result {
+            Some(result) => Poll::Ready(result),
+            None => {
+                shared_state.waker = Some(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl TimerFuture {
+    /// The timer is now hardcoded to 1s and we take the result to return instead
+    pub fn new(result: u32) -> Self {
+        let shared_state = Arc::new(Mutex::new(SharedState {
+            result: None,
+            waker: None,
+        }));
+
+        // And here is the gist of it. We're essentially still doing the same thing:
+        // Spawn a thread, wait 1s, square the result, set the result, wake the task
+        let thread_shared_state = shared_state.clone();
+        thread::spawn(move || {
+            thread::sleep(Duration::from_secs(1));
+            let mut shared_state = thread_shared_state.lock().unwrap();
+            shared_state.result = Some(result * result);
+            if let Some(waker) = shared_state.waker.take() {
+                // Note here that we are still locking the shared state when
+                // calling the waker. This will be relevant later.
+                waker.wake()
+            }
+        });
+
+        TimerFuture { shared_state }
+    }
+}
+
+#[pyclass]
+struct MyFuture {
+    // Now we're encapsulating a Rust future
+    future: BoxFuture<'static, u32>,
+    aio_loop: Option<PyObject>,
+    callbacks: Vec<(PyObject, Option<PyObject>)>,
+    #[pyo3(get, set)]
+    _asyncio_future_blocking: bool,
+    // And we keep a reference to the waker that we will be passing to the
+    // Rust future
+    waker: Option<Arc<WakerClosure>>,
+}
+
+impl MyFuture {
+    // The constructor is now internal and takes a Future that must be Send + 'static
+    // in order to be `.boxed()` (Python couldn't possibly pass us a Future anyway)
+    fn new(future: impl Future<Output = u32> + Send + 'static) -> Self {
+        Self {
+            future: future.boxed(),
+            aio_loop: None,
+            callbacks: vec![],
+            _asyncio_future_blocking: true,
+            waker: None,
+        }
+    }
+}
+
+#[pyproto]
+impl PyAsyncProtocol for MyFuture {
+    fn __await__(slf: Py<Self>) -> PyResult<Py<Self>> {
+        let py_future = slf.clone();
+        Python::with_gil(|py| -> PyResult<_> {
+            let mut slf = slf.try_borrow_mut(py)?;
+            let aio_loop: PyObject = PyModule::import(py, "asyncio")?
+                .call0("get_running_loop")?
+                .into();
+            slf.aio_loop = Some(aio_loop.clone());
+            slf.waker = Some(Arc::new(WakerClosure {
+                aio_loop,
+                py_future,
+            }));
+            Ok(())
+        })?;
+
+        Ok(slf)
+    }
+}
+
+#[pyproto]
+impl PyIterProtocol for MyFuture {
+    // Now MyFuture has truly become a Future wrapper
+    fn __next__(mut py_slf: PyRefMut<Self>) -> IterNextOutput<PyRefMut<Self>, u32> {
+        // This is so we can borrow both slf.waker & slf.future mutably
+        let slf = &mut *py_slf;
+        let waker = slf.waker.as_mut().expect("future was not awaited");
+        // Thanks to `futures` we don't need to implement the Waker pseudo-trait
+        // manually, so this is all boiler-plate to pass the Waker to the Future.
+        let waker_ref = waker_ref(&waker);
+        let context = &mut Context::from_waker(&*waker_ref);
+        // Ideally, we should be releasing the GIL (which we are still holding
+        // with the PyRefMut) before calling `poll`, to avoid deadlocking.
+        // However in this example there is only one task per child thread so
+        // it cannot happen.
+        match slf.future.as_mut().poll(context) {
+            Poll::Pending => {
+                // In case the future needs to be put on hold multiple times,
+                // we need to set this back to true every time we're in a
+                // Pending state. Asyncio always sets it to false after
+                // reading it when we yield ourself.
+                slf._asyncio_future_blocking = true;
+                IterNextOutput::Yield(py_slf)
+            }
+            Poll::Ready(result) => IterNextOutput::Return(result),
+        }
+    }
+}
+
+#[pyclass]
+#[derive(Clone)]
+struct WakerClosure {
+    aio_loop: PyObject,
+    py_future: Py<MyFuture>,
+}
+
+// WakerClosure now implements `futures::task::ArcWake` so it can be passed to a
+// Rust Future as a waker, like we did above.
+impl ArcWake for WakerClosure {
+    fn wake_by_ref(arc_self: &Arc<Self>) {
+        let closure = (**arc_self).clone();
+        Python::with_gil(|py| {
+            arc_self
+                .aio_loop
+                .call_method1(py, "call_soon_threadsafe", (closure,))
+        })
+        .expect("exception thrown by the event loop (probably closed)");
+        // Again, as a reminder, we are only setting ourself up to be called by
+        // the event loop from the main thread, so this child thread is released
+        // before the task is woken up. Remember how the TimerFuture still has
+        // its shared state locked when calling `.wake()`? This is where it
+        // would have become a problem if we tried to call the callbacks directly.
+        // Of course, here, we could fix the issue in the TimerFuture's code, but
+        // in the wild we won't have control over how the waker is called.
+        // (That state locking issue isn't something I made up or introduced on
+        // purpose, it's genuinely the code from the async book.)
+    }
+}
+
+#[pymethods]
+impl WakerClosure {
+    // I didn't change anything here, but as an exercise, try and move the code
+    // from this function to `wake_by_ref` above to skip the double callback,
+    // and see what happens.
+    #[call]
+    pub fn __call__(slf: PyRef<Self>) -> PyResult<()> {
+        let py = slf.py();
+        let mut py_future = slf.py_future.try_borrow_mut(py)?;
+        let callbacks = std::mem::take(&mut py_future.callbacks);
+        for (callback, context) in callbacks {
+            slf.aio_loop.call_method(
+                py,
+                "call_soon",
+                (callback, &py_future),
+                Some(vec![("context", context)].into_py_dict(py)),
+            )?;
+        }
+        Ok(())
+    }
+}
+
+#[pymethods]
+impl MyFuture {
+    fn get_loop(&self) -> Option<&PyObject> {
+        self.aio_loop.as_ref()
+    }
+
+    fn add_done_callback(&mut self, callback: PyObject, context: Option<PyObject>) {
+        self.callbacks.push((callback, context));
+    }
+
+    fn result(&self) -> Option<u32> {
+        None
+    }
+}
+
+// All that's left to do now is to wrap the TimerFuture in MyFuture, and everything
+// works just as before.
+// (You'll have to update the Python shim to use this function.)
+#[pyfunction]
+fn my_timer_future(result: u32) -> MyFuture {
+    MyFuture::new(TimerFuture::new(result))
+}
+
+#[pymodule]
+fn awaitable_rust(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add_class::<MyFuture>()?;
+    m.add_function(wrap_pyfunction!(my_timer_future, m)?)?;
+    Ok(())
+}
+```
+
+**And with that you have a working wrapper for asyncio-driven Rust futures.**
+
+The remaining work is to generalise to any kind of future (by taking an `impl IntoPyObject` as `Output`, for example), add a `cancel` method, potentially implement the `Waker` pseudo-trait manually for our `WakerClosure` to avoid wrapping it in an `Arc`, release the GIL when `poll`ing, and generally polish some rough edges.
+
+But that is left to libraries to implement, or to the reader as an exercise.
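As a closing illustration of the flag re-arming caveat from `__next__`, here is a pure-Python sketch (a hypothetical `TwoStepFuture`, not part of the code above) of a duck-typed future that suspends twice. It must set `_asyncio_future_blocking` back to `True` before every yield, because asyncio flips it to `False` each time it schedules the wait:

```python
# python 3.8
# Hypothetical illustration: a duck-typed asyncio future that yields twice.
import asyncio


class TwoStepFuture:
    def __init__(self):
        self._loop = None
        self._callbacks = []
        self._steps_left = 2
        self._asyncio_future_blocking = True

    def __await__(self):
        self._loop = asyncio.get_running_loop()
        return self

    def __next__(self):
        if self._steps_left > 0:
            self._steps_left -= 1
            # Schedule our own wake-up on the next loop cycle.
            self._loop.call_soon(self._wake)
            # asyncio sets this to False after reading it on every yield,
            # so it must be re-armed before yielding again.
            self._asyncio_future_blocking = True
            return self
        raise StopIteration("done")

    def _wake(self):
        callbacks, self._callbacks = self._callbacks, []
        for cb, ctx in callbacks:
            self._loop.call_soon(cb, self, context=ctx)

    def get_loop(self):
        return self._loop

    def add_done_callback(self, callback, *, context=None):
        self._callbacks.append((callback, context))

    def result(self):
        return None


async def main():
    return await TwoStepFuture()


outcome = asyncio.run(main())
print(outcome)
```

The Rust `__next__` above does the same re-arming on every `Poll::Pending`; without it, the task would be resumed in a busy loop instead of being parked until the waker fires.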