Heya! Love the work being done on yaque, it is an insanely useful crate :)
Description of issue
I have a long-running future like this:
tokio::spawn(async move {
    let mut recv: Receiver = match Receiver::open(qpath) {
        Ok(recv) => recv,
        Err(e) => {
            log::error!("Error opening receiver: {}", e);
            return;
        }
    };
    while (*status).load(Ordering::Relaxed) {
        loop {
            match recv.recv().await {
                Ok(bytes) => {
                    let bytes_inner = bytes.deref().clone();
                    Self::execute_job_from_bytes(bytes_inner, store.clone()).await;
                    match bytes.commit() {
                        Ok(_) => {}
                        Err(e) => {
                            log::error!("Error committing to queue: {}", e);
                        }
                    }
                }
                Err(e) => log::error!("Error receiving from queue: {:?}", e),
            }
        }
    }
})
With a sender that is triggered by API calls in a separate future:
sender
    .send(cbor_bytes)
    .await
    .map_err(|e| warp::reject::custom(Failure::Execute(e.to_string())))?;
However, the receiver loop gets stuck at recv.recv().await and never resolves, even when the sender successfully sends!
I've dug a bit into yaque to see where it hangs, and it seems to never resolve on the following line (line 272) of queue/receiver.rs:
// Read header:
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;
Digging further into the generated ReadExact future: read_until_you_drain gets called twice when the queue is empty, as dictated by the poll function:
impl<'a> Future for ReadExact<'a> {
    type Output = io::Result<()>;
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        self.was_polled = true;
        // See what happens when we read.
        let outcome = self.read_until_you_drain();
        if outcome.is_pending() {
            // Set the waker in the file watcher:
            let mut lock = self.waker.lock().expect("waker mutex poisoned");
            *lock = Some(context.waker().clone());
            // Now, you will have to recheck (TOCTOU!)
            self.read_until_you_drain()
        } else {
            outcome
        }
    }
}
self.read_until_you_drain() returns Poll::Pending, as we'd expect, but then the future never gets polled again, so it hangs indefinitely. Possibly the waker is not operating as expected?
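To make clear what I would expect from the poll function above, here is a minimal std-only sketch of the same "store the waker, then recheck" pattern. It is not yaque's code: Shared, WaitReady, ThreadWaker, block_on and run_demo are hypothetical names, and a plain thread stands in for the file watcher. The point is only that a future returning Poll::Pending is polled again if, and only if, something later calls wake() on the stored waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for "queue file + file watcher" state.
#[derive(Default)]
struct Shared {
    ready: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl Shared {
    /// What the watcher side has to do on a write: publish, then wake.
    fn notify(&self) {
        self.ready.store(true, Ordering::SeqCst);
        if let Some(waker) = self.waker.lock().expect("waker mutex poisoned").take() {
            waker.wake();
        }
    }
}

/// Future that resolves once `ready` is set (assumed analogue of ReadExact).
struct WaitReady(Arc<Shared>);

impl Future for WaitReady {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0.ready.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        // Register the waker first...
        *self.0.waker.lock().expect("waker mutex poisoned") = Some(cx.waker().clone());
        // ...then recheck, in case notify() ran between the check and the store.
        if self.0.ready.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Tiny executor: the waker unparks the thread that is blocked in block_on.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until wake() is called; spurious wakeups just repoll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Runs the demo: a second thread notifies after a short delay.
fn run_demo() -> bool {
    let shared = Arc::new(Shared::default());
    let notifier = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        notifier.notify();
    });
    block_on(WaitReady(Arc::clone(&shared)));
    handle.join().expect("notifier thread panicked");
    shared.ready.load(Ordering::SeqCst)
}
```

If the notify() call is removed from the spawned thread, block_on parks forever, which looks a lot like the hang I am seeing. That is why I suspect the watcher side never calls wake() in my setup, though I have not confirmed this.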
If I Ctrl-C to kill the process and then restart it, the queue gets read correctly and the logic ensues correctly, until the queue is empty once more and things hang again.
As a workaround, I have currently replaced recv() with try_recv(), which isn't ideal as the loop now spins and consumes the CPU entirely.
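For reference, the spinning could be softened with a capped exponential backoff between failed attempts. This is only a sketch of the idea, not what I am running: poll_with_backoff is a hypothetical helper, the try_once closure stands in for a try_recv() call that found nothing, and inside the tokio task the std::thread::sleep would need to be swapped for an async sleep so the executor is not blocked.

```rust
use std::thread;
use std::time::Duration;

/// Calls `try_once` until it yields an item or `max_attempts` is reached,
/// sleeping between attempts with a delay that doubles up to `max`.
fn poll_with_backoff<T>(
    mut try_once: impl FnMut() -> Option<T>,
    initial: Duration,
    max: Duration,
    max_attempts: usize,
) -> Option<T> {
    let mut delay = initial;
    for _ in 0..max_attempts {
        if let Some(item) = try_once() {
            return Some(item);
        }
        // Nothing available: back off instead of spinning.
        thread::sleep(delay);
        delay = (delay * 2).min(max);
    }
    None
}
```

This trades CPU usage for latency (up to the max delay after a send), so it is still a stopgap compared to recv() resolving properly.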
Any help on the above would be much appreciated :)