seanmonstar / reqwest

An easy and powerful Rust HTTP Client
https://docs.rs/reqwest
Apache License 2.0
9.92k stars 1.12k forks source link

0.11.27->0.12.x regression: hyper::Error(Canceled, "connection closed") #2325

Closed davemilter closed 4 months ago

davemilter commented 5 months ago

I have long list of files that download using async part of reqwest. Pseudo code:

 for [f1, f2, f3, f4] in not_downloaded.chunks(4) {
   tokio::try_join!(download_file(f1), download_file(f2),  download_file(f3), download_file(f4))
}

where download_file is just code around GET:

http_client.get(url.clone()).send();

All works fine, until 0.12.x. After update I start getting random error:

reqwest::Error { kind: Request, url: Url { scheme: "https", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("foo.com")), port: None, path: "/api/Documents/sha/52C9FCC6156DF7270006C58FC417BAE88612962F", query: None, fragment: None }, source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(Canceled, "connection closed")) }

Full log with RUST_LOG=trace:

[TRACE reqwest::async_impl::client] can retry hyper_util::client::legacy::Error(SendRequest, hyper::Error(Http2, Error { kind: GoAway(b"", NO_ERROR, Remote) }))
called `Result::unwrap()` on an `Err` value: NetworkError(reqwest::Error { kind: Request, url: Url { scheme: "https", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("foo.com")), port: None, path: "/api/Documents/sha/52C9FCC6156DF7270006C58FC417BAE88612962F", query: None, fragment: None }, source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(Canceled, "connection closed")) })

may be this is issue related https://github.com/hyperium/hyper/issues/3673 .

davemilter commented 5 months ago

I compare two logs of bad one (reqwest 0.12.5) and good one (reqwest 0.11.27), and can not see obvious problems. reqwest 0.11.27 just gracefully retry, and reqwest 0.12.5 can not retry for some unknown for me reason:

BAD 0.12.5:

-TRACE h2::proto::streams::streams: drop_stream_ref; stream=Stream { id: StreamId(1999), state: State { inner: HalfClosedLocal(AwaitingHeaders) }, is_counted: true, ref_count: 1, next_pending_send: None, is_pending_send: false, send_flow: FlowControl { window_size: Window(65536), available: Window(0) }, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, pending_send: Deque { indices: None }, next_pending_send_capacity: None, is_pending_send_capacity: false, send_capacity_inc: false, next_open: None, is_pending_open: false, is_pending_push: false, next_pending_accept: None, is_pending_accept: false, recv_flow: FlowControl { window_size: Window(2097152), available: Window(2097152) }, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, reset_at: None, next_reset_expire: None, pending_recv: Deque { indices: None }, is_recv: true, recv_task: Some(Waker { data: 0x12f605680, vtable: 0x100fa6228 }), pending_push_promises: Queue { indices: None, _p: PhantomData<h2::proto::streams::stream::NextAccept> }, content_length: Omitted }
-TRACE h2::proto::streams::prioritize: schedule_send stream.id=StreamId(1999)
-TRACE h2::proto::streams::store: Queue::push_back
-TRACE h2::proto::streams::store:  -> first entry
-TRACE h2::proto::streams::recv: enqueue_reset_expiration; StreamId(1999)
-TRACE h2::proto::streams::store: Queue::push_back
-TRACE h2::proto::streams::store:  -> first entry
-TRACE h2::proto::streams::counts: transition_after; stream=StreamId(1999); state=State { inner: Closed(ScheduledLibraryReset(CANCEL)) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=3
-TRACE h2::proto::streams::counts: dec_num_streams; stream=StreamId(1999)
-TRACE Connection{peer=Client}:poll:pop_frame:popped{stream.id=StreamId(1999) stream.state=State { inner: Closed(ScheduledLibraryReset(CANCEL)) }}: h2::proto::streams::prioritize: is_pending_reset=true
-TRACE Connection{peer=Client}:poll:pop_frame:popped{stream.id=StreamId(1999) stream.state=State { inner: Closed(ScheduledLibraryReset(CANCEL)) }}: h2::proto::streams::prioritize: pop_frame; frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }
-TRACE Connection{peer=Client}:poll:pop_frame:popped{stream.id=StreamId(1999) stream.state=State { inner: Closed(ScheduledLibraryReset(CANCEL)) }}: h2::proto::streams::counts: transition_after; stream=StreamId(1999); state=State { inner: Closed(Error(Reset(StreamId(1999), CANCEL, Library))) }; is_closed=true; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=2
-TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: writing frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }
-DEBUG Connection{peer=Client}:poll:FramedWrite::buffer{frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }}: h2::codec::framed_write: send frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }
-TRACE Connection{peer=Client}:poll:FramedWrite::buffer{frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }}: h2::frame::reset: encoding RESET; id=StreamId(1999) code=CANCEL
-TRACE Connection{peer=Client}:poll:FramedWrite::buffer{frame=Reset { stream_id: StreamId(1999), error_code: CANCEL }}: h2::codec::framed_write: encoded reset rem=13

GOOD 0.11.27:

+TRACE Connection{peer=Client}:poll:FramedWrite::flush: h2::codec::framed_write: flushing buffer
+TRACE Connection{peer=Client}:poll: h2::proto::connection: connection.state=Open
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: read.bytes=227
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}: h2::codec::framed_read: decoding frame from 227B
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}: h2::codec::framed_read: frame.kind=Headers
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}: h2::frame::headers: loading headers; flags=(0x4: END_HEADERS)
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: decode
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=218 kind=Indexed
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=217 kind=LiteralWithIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=206 kind=LiteralWithIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=182 kind=LiteralWithIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=169 kind=LiteralWithIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=161 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=145 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=121 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=96 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=76 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=62 kind=LiteralWithoutIndexing
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=227}:hpack::decode: h2::hpack::decoder: rem=32 kind=LiteralWithoutIndexing
+DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1997), flags: (0x4: END_HEADERS) }
+TRACE Connection{peer=Client}:poll: h2::proto::connection: recv HEADERS frame=Headers { stream_id: StreamId(1997), flags: (0x4: END_HEADERS) }
+TRACE Connection{peer=Client}:poll: h2::proto::streams::streams: recv_headers; stream=StreamId(1997); state=State { inner: HalfClosedLocal(AwaitingHeaders) }
+TRACE Connection{peer=Client}:poll: h2::proto::streams::recv: opening stream; init_window=2097152
+TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: transition_after; stream=StreamId(1997); state=State { inner: HalfClosedLocal(Streaming) }; is_closed=false; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=3
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: read.bytes=3736
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=3736}: h2::codec::framed_read: decoding frame from 3736B
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=3736}: h2::codec::framed_read: frame.kind=Data
+DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1997) }
+TRACE Connection{peer=Client}:poll: h2::proto::connection: recv DATA frame=Data { stream_id: StreamId(1997) }
+TRACE Connection{peer=Client}:poll: h2::proto::streams::recv: recv_data; size=3727; connection=4122141; stream=2097152
+TRACE Connection{peer=Client}:poll: h2::proto::streams::flow_control: send_data; sz=3727; window=4122141; available=5242880
+TRACE Connection{peer=Client}:poll: h2::proto::streams::flow_control: send_data; sz=3727; window=2097152; available=2097152
+TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: transition_after; stream=StreamId(1997); state=State { inner: HalfClosedLocal(Streaming) }; is_closed=false; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=3
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: read.bytes=4105
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=4105}: h2::codec::framed_read: decoding frame from 4105B
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next:FramedRead::decode_frame{offset=4105}: h2::codec::framed_read: frame.kind=Data
+DEBUG Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1997) }
+TRACE Connection{peer=Client}:poll: h2::proto::connection: recv DATA frame=Data { stream_id: StreamId(1997) }
+TRACE Connection{peer=Client}:poll: h2::proto::streams::recv: recv_data; size=4096; connection=4118414; stream=2093425
+TRACE Connection{peer=Client}:poll: h2::proto::streams::flow_control: send_data; sz=4096; window=4118414; available=5239153
+TRACE Connection{peer=Client}:poll: h2::proto::streams::flow_control: send_data; sz=4096; window=2093425; available=2093425
+TRACE Connection{peer=Client}:poll: h2::proto::streams::counts: transition_after; stream=StreamId(1997); state=State { inner: HalfClosedLocal(Streaming) }; is_closed=false; pending_send_empty=true; buffered_send_data=0; num_recv=0; num_send=3
+TRACE Connection{peer=Client}:poll:FramedRead::poll_next: h2::codec::framed_read: poll
+TRACE Connection{peer=Client}:poll: h2::proto::streams::prioritize: poll_complete

Full logs: logs.zip

seanmonstar commented 4 months ago

A possible solution proposed in: https://github.com/hyperium/hyper/pull/3691

davemilter commented 4 months ago

But why https://github.com/hyperium/hyper/pull/3691 close this issue? It contains only new method for hyper, shouldn't a separate PR be created for reqwest to use try_send_request?

flisky commented 4 months ago

I suppose hyperium/hyper-util#133 really fixes this, @davemilter