tokahuke / yaque

Yaque is yet another disk-backed persistent queue for Rust.
79 stars 11 forks source link

`recv()` stuck awaiting #32

Open alexander-camuto opened 7 months ago

alexander-camuto commented 7 months ago

heya ! love the work being done on yaque it is an insanely useful crate :)

Description of issue

I have a long running future as such:

tokio::spawn(async move {
            let mut recv: Receiver = match Receiver::open(qpath) {
                Ok(recv) => recv,
                Err(e) => {
                    log::error!("Error opening receiver: {}", e);

            while (*status).load(Ordering::Relaxed) {
                loop {
                    match recv.recv().await {
                        Ok(bytes) => {
                            let bytes_inner = bytes.deref().clone();
                            Self::execute_job_from_bytes(bytes_inner, store.clone()).await;
                            match bytes.commit() {
                                Ok(_) => {}
                                Err(e) => {
                                    log::error!("Error committing to queue: {}", e);
                        Err(e) => log::error!("Error receiving from queue: {:?}", e),

With a sender that is triggered by API calls in a separate future.

        .map_err(|e| warp::reject::custom(Failure::Execute(e.to_string())))?;

However the receiver loop gets stuck at recv.recv().await, never resolving -- even when the sender succesfully sends !

I've dug a bit into yaque to see where it hangs -- and it seems to never resolve on the following line (line 272) of queue/

// Read header:
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;

Digging further into the generated ReadExact future. read_until_you_drain gets called twice when the queue is empty as dictated by the poll function

impl<'a> Future for ReadExact<'a> {
    type Output = io::Result<()>;
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        self.was_polled = true;
        // See what happens when we read.
        let outcome = self.read_until_you_drain();

        if outcome.is_pending() {
            // Set the waker in the file watcher:
            let mut lock = self.waker.lock().expect("waker mutex poisoned");
            *lock = Some(context.waker().clone());

            // Now, you will have to recheck (TOCTOU!)
        } else {

self.read_until_you_drain() returns a Poll::Pending state as we'd expect -- but then ... never gets triggered again so the future hangs indefinitly -- possibly the waker not operating as expected ?

If I ctrlc-c to kill the process then reboot it -- the queue gets read correctly and the logic ensures correctly ... until the queue is empty once more and things hang.

As a work around I currently have replaced recv() with try_recv() which isn't ideal as the loop now spins and consumes the CPU entirely.

Any help on the above would be much appreciated :)

tokahuke commented 7 months ago

Whoa! This is a tough one. I have been digging into your issue and will have to give it a thought during the weekend, now that Dobby is finally free.

tokahuke commented 7 months ago

So, as a better workaround than spinning a try_recv, you could try a recv_timeout to reduce CPU load substantially. That is still a hack, but it's all I can do right away.

alexander-camuto commented 7 months ago

yeah have been using that so far :) reduces cpu usage from 100% to 0.1%

tokahuke commented 7 months ago

Question, what OS and what version of yaque are you using? It can be that the reason your app is getting stuck is the code not handling some specific event from the OS filesystem (this is still one other hypothesis of what is going on).

alexander-camuto commented 7 months ago

MacOS 13.0.1 and version 0.6.6 for yaque

raphaelcoeffic commented 4 months ago

I'm seeing the same here on macOS 13.6.1 and version yaque 0.6.6 as well. I'd strongly suspect a different behaviour of the notify crate on macOS. As it turned out, the culprit was in fact tokio, as show in #33.