google / tarpc

An RPC framework for Rust with a focus on ease of use.

On client read errors, quick subsequent requests may wait for a response indefinitely #415

Closed · altkdf closed this issue 5 months ago

altkdf commented 6 months ago

If I understand correctly, the dispatch task is terminated on a receive error in the transport (from pump_read()), and subsequent requests should then return an error from

        self.to_dispatch
            .send(DispatchRequest {
                ctx,
                span,
                request_id,
                request,
                response_completion,
            })
            .await
            .map_err(|mpsc::error::SendError(_)| RpcError::Shutdown)?;

But for me it does sometimes happen that, if a request is issued immediately after the error, then

        self.to_dispatch
            .send

does not error and instead waits indefinitely in response_guard.response().await. I'm not sure why this happens, since I would expect all DispatchRequests to be dropped and the waiters to notice that and return an error. But it seems there is a race condition?
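
To illustrate the shape of the race I have in mind, here is a standalone sketch using tokio's mpsc and oneshot channels (not tarpc code; the names to_dispatch and completion only mirror the snippet above). As long as the receiving half of the request channel is still alive, send() succeeds even though nothing will ever complete the response:

    use std::time::Duration;
    use tokio::sync::{mpsc, oneshot};

    #[tokio::main]
    async fn main() {
        // `to_dispatch` mirrors the client-to-dispatch channel; each message
        // carries a oneshot sender that would be used to complete the request.
        let (to_dispatch, rx) = mpsc::channel::<oneshot::Sender<String>>(8);

        tokio::spawn(async move {
            // Simulate the dispatch hitting a transport read error: it stops
            // servicing requests but keeps `rx` alive (neither closed nor dropped).
            tokio::time::sleep(Duration::from_secs(3600)).await;
            drop(rx);
        });

        // A request issued right after the error: send() still succeeds because
        // the receiver exists, so nothing will ever complete the oneshot.
        let (completion, response) = oneshot::channel::<String>();
        to_dispatch.send(completion).await.expect("send does not error");

        match tokio::time::timeout(Duration::from_secs(1), response).await {
            Ok(res) => println!("response: {res:?}"),
            Err(_) => println!("stuck: no response will ever arrive"),
        }
    }

Closing or dropping the receiver before that window would make the send fail with an error instead of queueing.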

Unfortunately, I couldn't produce a minimal working example. A small example "just works".

If I add self.pending_requests_mut().close(); to pump_read(), I can't reproduce the issue anymore. I'm not sure whether that is a real fix or whether it just masks the issue.

    fn pump_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
        self.transport_pin_mut()
            .poll_next(cx)
            .map_err(|e| {
                let e = Arc::new(e);
                for span in self
                    .in_flight_requests()
                    .complete_all_requests(|| Err(RpcError::Receive(e.clone())))
                {
                    let _entered = span.enter();
                    tracing::info!("ReceiveError");
                }
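                // added: close the receiving end of the request channel so that
                // subsequent `to_dispatch.send(..)` calls fail instead of queueing
                // behind a dispatch that will no longer complete them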
                self.pending_requests_mut().close();
                ChannelError::Read(e)
            })
            .map_ok(|response| {
                self.complete(response);
            })
    }
tikue commented 6 months ago

Thanks for raising this issue! I'll try to take a closer look this week, but also, feel free to send a PR for review.

tikue commented 5 months ago

Could I see your code where the problem occurs? I'm curious how the RequestDispatch is being polled. When it hits an error, is it dropped?

tikue commented 5 months ago

BTW, I was able to refactor an existing test to consistently trigger this race: https://github.com/google/tarpc/compare/master...request_dispatch_race

tikue commented 5 months ago

Can you see if this branch fixes the problem for you? https://github.com/tikue/tarpc/tree/request_dispatch_race

altkdf commented 5 months ago

Can you see if this branch fixes the problem for you? https://github.com/tikue/tarpc/tree/request_dispatch_race

Thanks a lot for implementing it @tikue! I tried it out and, unfortunately, it still results in the same problem. Your code LGTM in general, though. So I suspect this may be due to the use of ready! here: on a spurious failure it might bail out early and return something other than the desired return Poll::Ready(Err(e));. Also note that the PR I created consistently doesn't have this problem.

tikue commented 5 months ago

Hm, but the ready! macro only returns Poll::Pending if the underlying future being polled also returns Pending. The dispatch future should continue to be polled after that, right? If it's a spurious failure, then the mpsc receiver should have already arranged for a wakeup.
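
For reference, futures::ready! expands roughly as follows (equivalent to std::task::ready!), so a Pending inner poll just propagates Pending out of the enclosing poll function, with the inner future having already registered the waker:

    // Rough expansion of futures::ready!: it only early-returns when the
    // inner poll is Pending; a Ready value is simply unwrapped.
    macro_rules! ready {
        ($e:expr) => {
            match $e {
                core::task::Poll::Ready(t) => t,
                core::task::Poll::Pending => return core::task::Poll::Pending,
            }
        };
    }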

The problem with polling in a loop even when Pending is returned is that it blocks inside a nonblocking function. For example, a sender may have reserved a permit and then gone to sleep for an hour. We don't want to block a Tokio thread for an hour waiting for that client to wake up, as there could be other async tasks that need to run in the meantime.

altkdf commented 5 months ago

Oh, true, you're right - the code should work just fine like that!

Also, I just noticed something strange in my tests. Although you changed the propagated error a little, my tests initially passed unchanged. But now, when I try to reproduce that, I need to adjust the expected error message. Maybe I or cargo did something wrong; let me test it again.

altkdf commented 5 months ago

So the test has been running repeatedly for more than 20 minutes now with no errors so far. Sorry for causing confusion earlier, and thanks again for implementing the fix @tikue!

tikue commented 5 months ago

That's great news, thanks for confirming! Yeah, I changed the client errors a little since more types of channel errors are propagated now. I might still revisit them a bit. (benefits of being perpetually pre-1.0...)