hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License
14.42k stars 1.59k forks source link

Panic: dispatch dropped without returning error #2113

Closed vmalloc closed 4 years ago

vmalloc commented 4 years ago

It seems like https://github.com/hyperium/hyper/blob/v0.12.35/src/client/conn.rs#L259 happens from time to time. I currently don't have much information about when it happens (trying to pin-point it now) but I figure at the very least this should not be a panic that crashes the entire application when this indeed happens

LucioFranco commented 4 years ago

@vmalloc by chance are you using multiple runtimes? If the runtime that the background task is spawned on is dropped then the rx end of the channel will be dropped and thus you can't send a request with that sendrequest/connection pair.

seanmonstar commented 4 years ago

This is the same as #2112.

vmalloc commented 4 years ago

@seanmonstar Im not sure it’s the same. In my case in spawning a future from within actix to send date to s3 via rusoto. What am I doing wrong and what should I do to fix it?

seanmonstar commented 4 years ago

Sorry, it read the same XD

There's only 2 cases that it should occur:

  1. The connection dispatcher task failed mid-request in some very weird way, such as via a panic.
  2. The executor dropped that task unexpectedly.

If you're not creating and dropping runtimes throughout the lifetime of your app, then it should be 1. I suppose that could mean a bug in the connection task.

vmalloc commented 4 years ago

Just so that I’m sure I’m getting it straight - what does connection talk mean in my case? Rusoto’s future?

seanmonstar commented 4 years ago

Specifically, it refers to how hyper::client::conn::handshake() returns (SendRequest, Connection), where Connection is a Future (task) that watches the socket and HTTP state. If using hyper::Client, then the connection is spawned onto the configured executor (if not configured, it's tokio::spawn).

LucioFranco commented 4 years ago

Rusoto uses a thread_local global client that if its original executor gets dropped then it will fail all requests.

vmalloc commented 4 years ago

@LucioFranco I’m not sure what my options here are. The original run time shouldn’t be dropped before rusoto’s request ends, after all it joins on its future. Is there a way to make this use case work?

seanmonstar commented 4 years ago

@vmalloc are you doing anything beyond the standard actix web usage? Does anything start temporary executors, like maybe starting new actors?

If there's no panic elsewhere in hyper, then it's definitely something related to executors being shutdown.

vmalloc commented 4 years ago

What I do is essentially stream data from a request body to s3.

I first initialize a futures::sync::mpsc::channel to push the data into:

 let (sender, receiver) = channel::<Bytes>(...);

I then construct and send a request to send the receiver end to s3:

let req = PutObjectRequest {
    ....
        body: Some(ByteStream::new(receiver.map_err(...))),
    ...
};

I get the future for the request itself:

let put_future = s3.put_object(req).map(drop).map_err(...);

And I join it with the future to read the entirety of the stream:

let read_body_future = body
    .forward(sender)
    .map_err(...);

return read_body_future
        .join(put_future);
vmalloc commented 4 years ago

I might be missing something but I don't think there should be an extra executor hidden here...

seanmonstar commented 4 years ago

Hm, well, without seeing the full program, it's hard to diagnose.

This panic was originally written with the assumption that the executor would never drop, and so the only instance of this expected is because of a bug in hyper where the connection task may have panicked. We could change this to not panic, but instead return an error... But there'd still be a big problem in your program, because we cannot tell how much the server saw of the request (and acted upon it).

brandonros commented 2 years ago

How does one prevent the executor from dropping?

#![allow(non_camel_case_types, unused_parens, non_upper_case_globals, non_snake_case, unsupported_calling_conventions, dead_code, clippy::needless_return, clippy::needless_range_loop, clippy::bool_comparison, clippy::upper_case_acronyms)]

use std::str::FromStr;

use crate::{
  hyper_http_client,
  http_client_lock
};
use crate::logger::{
  info
};

async fn async_http_request(request_url: String, request_body_buffer: Vec<u8>) -> hyper::body::Bytes {
  let request = hyper::Request::builder()
    .method("POST")
    .uri(hyper::Uri::from_str(&request_url).unwrap())
    .header("Content-Type", "application/octect-stream")
    .body(hyper::Body::from(request_body_buffer))
    .unwrap();
  let response = hyper_http_client.request(request).await.unwrap();
  // check response
  if response.status() != 200 {
    panic!("bad status: {}", response.status());
  }
  // deserialize response body
  return hyper::body::to_bytes(response.into_body()).await.unwrap();
}

pub fn http_request(request_url: String, request_body_buffer: Vec<u8>) -> hyper::body::Bytes {
  info!("http_request: {}", request_url);
  // TODO: allow for a concurrency of greater than 1 with a thread pool?
  let _ = *http_client_lock.lock().unwrap();
  // spawn on its own thread with its own runtime to prevent nested tokio runtime errors on RegisterEventCallback -> GetEventItems
  let thread_handle = std::thread::spawn(|| {
    let client_runtime = tokio::runtime::Runtime::new().unwrap();
    return client_runtime.block_on(async {
      return async_http_request(request_url, request_body_buffer).await;
    });
  });
  return thread_handle.join().unwrap();
}