hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

concurrent requests performance #1685

Closed: johndoe52 closed this issue 5 years ago

johndoe52 commented 6 years ago

WARNING: I am new to Rust and hyper

hello,

I want to figure out how to optimize the performance of concurrent requests with hyper so that a machine can saturate its network link. The example below gets bottlenecked by TCP port exhaustion. That could be solved by assigning an IPv4/IPv6 range to the machine and using set_local_address, but see https://github.com/hyperium/hyper/issues/1684 and https://github.com/hyperium/hyper/issues/1683 for why I have not implemented it yet. For IPv6, I can get a range from Hurricane Electric for free to work around the port exhaustion.
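
For reference, binding a local source address looks roughly like this (a minimal sketch of hyper 0.12's HttpConnector::set_local_address, using a plain HttpConnector; 192.0.2.1 stands in for one of the machine's assigned addresses, and the connector can in turn be handed to hyper-tls for https):

use std::net::IpAddr;

let local: IpAddr = "192.0.2.1".parse().unwrap(); // placeholder source address

let mut http = HttpConnector::new(4);
// every connection made through this connector binds to `local`,
// so each assigned address gets its own pool of ephemeral ports
http.set_local_address(Some(local));

let client = Client::builder().build::<_, hyper::Body>(http);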

HTTP/2 could also work around TCP port exhaustion, since it multiplexes many concurrent requests over a single connection, but not all web servers support it.
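
If the server does support it, hyper 0.12 can be pinned to HTTP/2 roughly like this (a sketch using the client builder's http2_only flag; note the TLS layer still has to negotiate h2, which may need setup beyond stock hyper-tls):

let client = Client::builder()
    .http2_only(true) // multiplex all requests to a host over one connection
    .build::<_, hyper::Body>(HttpsConnector::new(4).unwrap());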

Another common use case for this kind of optimization is a web spider.

Consider this sample project. Note that it does not fetch a book's pages concurrently, because the total number of pages is not known up front. Guessing the page count with HEAD requests and then fetching all pages in parallel would be faster, but is also more likely to bottleneck on TCP port exhaustion.
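
The HEAD-probe idea would look roughly like this (a sketch reusing CLIENT, Method, and the placeholder URL scheme from the example below; it assumes the server answers HEAD the same way it answers GET, minus the body):

let mut req = Request::new(hyper::Body::empty());
*req.method_mut() = Method::HEAD;
*req.uri_mut() = format!("https://example.com/library/{}/{}", book, page).parse().unwrap();

// a 2xx status without transferring a body means the page exists;
// a 404 means `page` is already past the end of the book
let probe = CLIENT.request(req).map(|res| res.status().is_success());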

src/main.rs

extern crate hyper;
extern crate hyper_tls;
extern crate futures;
extern crate tokio;
extern crate tokio_timer;
#[macro_use] extern crate lazy_static;

use std::env;
use hyper::rt::{self, Future}; // if collecting the body with res.into_body().concat2(), also import Stream
use hyper::{Client, Method, Request};
use hyper::client::connect::HttpConnector;
use hyper_tls::HttpsConnector;
use futures::future::ok;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time;

fn fetch_page(count: &'static AtomicUsize, book: u32, page: u32)
    -> Box<dyn Future<Item = (), Error = ()> + Send> {

    lazy_static! {
        static ref CLIENT: Client<HttpsConnector<HttpConnector>, hyper::Body>
            = Client::builder().build::<_, hyper::Body>(HttpsConnector::new(4).unwrap());
    }

    let mut req = Request::new(hyper::Body::empty());

    *req.uri_mut() = format!("https://example.com/library/{}/{}", book, page).parse().unwrap();
    *req.method_mut() = Method::GET;

    Box::new(CLIENT.request(req)
             .and_then(move |res| {
                 count.fetch_add(1, Ordering::SeqCst);
                 if res.status().is_success() { 
                     // book page fetched, do whatever with res.body()
                     // ...
                     //
                     tokio::spawn(fetch_page(count, book, page + 1));
                     ok(())
                 } else if res.status().as_u16() == 429 {
                     // server sends rate limit ms in retry-after if it wants us to slow down
                     // should actually synchronize this with tokio and make tokio pause execution
                     // of futures for that time
                     tokio::spawn(tokio_timer::sleep(
                             time::Duration::from_millis(res.headers()
                                                         .get("retry-after").unwrap().to_str().unwrap()
                                                         .parse().unwrap()))
                         .and_then(move |_| {
                             tokio::spawn(fetch_page(count, book, page));
                             ok(())
                         }).map_err(|_| {}));
                     ok(())
                 } else if res.status().as_u16() == 404 {
                     // book pages fetched
                     println!("book {} fetched", book);
                     ok(())
                 } else if res.status().is_server_error() {
                     // server error, retrying
                     tokio::spawn(fetch_page(count, book, page));         
                     ok(())
                 } else {
                     println!("unhandled status: {} book: {}", res.status().as_u16(), book);
                     ok(())
                 }
             }).map_err(move |e| {
                 eprintln!("{}", e);
             }))
}

fn main() {

    lazy_static! {
        static ref COUNT: AtomicUsize = AtomicUsize::new(0);
    }

    let appname = env::args().nth(0).unwrap();
    let usage = &format!("Usage: {} first_book last_book", appname);

    let first: u32 = match env::args().nth(1) {
        Some(first) => first.parse().expect("Could not parse first_book"),
        None => {
            println!("{}", usage);
            return;
        }
    };

    let last: u32 = match env::args().nth(2) {
        Some(last) => last.parse().expect("Could not parse last_book"),
        None => {
            println!("{}", usage);
            return;
        }
    };

    tokio::run(rt::lazy(move || { 
        for book in first..=last {
            tokio::spawn(fetch_page(&COUNT, book, 0));
        }

        display_stats(COUNT.load(Ordering::Relaxed), &COUNT);

        ok(())
    }));
}

fn display_stats(prev_count: usize, count: &'static AtomicUsize)
{
    // load the counter once so the printed total and the rate are consistent
    let now = count.load(Ordering::Relaxed);
    println!("req count: {} - {} page/s", now, now - prev_count);
    tokio::spawn(
        tokio_timer::sleep(time::Duration::from_millis(1000))
        .and_then(move |_| {
            display_stats(now, count);
            ok(())
        }).map_err(|_| {}));
}

Cargo.toml

[package]
name = "crawl-library"
version = "0.1.0"
authors = ["rust"]

[dependencies]
hyper = "0.12"
hyper-tls = "0.3"
lazy_static = "1"
futures = "0.1"
tokio = "0.1"
tokio-timer = "0.2"

PS: This project would make a good usage example for hyper

johndoe52 commented 6 years ago

https://github.com/hyperium/hyper/issues/1657

To extract the body from the response, add use hyper::rt::Stream; and use futures::future::Either;. All branches of the and_then closure must return the same type, so Either wraps the body-collecting future on the success path and a ready future everywhere else:

Box::new(CLIENT.request(req)
         .and_then(move |res| {
             count.fetch_add(1, Ordering::SeqCst);
             if res.status().is_success() {
                 // book page fetched: kick off the next page, then collect
                 // this page's body; concat2() returns a future, resolved in .map
                 tokio::spawn(fetch_page(count, book, page + 1));
                 Either::A(res.into_body().concat2().map(Some))
             } else if res.status().as_u16() == 429 {
                 // server sends rate limit ms in retry-after if it wants us to slow down
                 // should actually synchronize this with tokio and make tokio pause execution
                 // of futures for that time
                 tokio::spawn(tokio_timer::sleep(
                         time::Duration::from_millis(res.headers()
                                                     .get("retry-after").unwrap().to_str().unwrap()
                                                     .parse().unwrap()))
                     .and_then(move |_| {
                         tokio::spawn(fetch_page(count, book, page));
                         ok(())
                     }).map_err(|_| {}));
                 Either::B(ok(None))
             } else if res.status().as_u16() == 404 {
                 // book pages fetched
                 println!("book {} fetched", book);
                 Either::B(ok(None))
             } else if res.status().is_server_error() {
                 // server error, retrying
                 tokio::spawn(fetch_page(count, book, page));
                 Either::B(ok(None))
             } else {
                 println!("unhandled status: {} book: {}", res.status().as_u16(), book);
                 Either::B(ok(None))
             }
         }).map(|body| {
             // only the success branch produces a body to print
             if let Some(body) = body {
                 println!("body: {}", ::std::str::from_utf8(&body).unwrap());
             }
         }).map_err(move |e| {
             eprintln!("{}", e);
         }))
seanmonstar commented 5 years ago

hyper's Client does not limit how many outstanding requests or connections it has, so depending on the workload and the resources of the machine, you may run into TCP issues. You can use Stream::buffered to control how many futures are pending at a time.
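
For example, replacing the spawn loop in main with something like this caps the number of in-flight fetches (a sketch against futures 0.1; CONCURRENCY is a placeholder limit, it only bounds the initial page-0 requests since fetch_page spawns its follow-ups itself, and the display_stats spawn is elided):

use futures::{stream, Stream};

const CONCURRENCY: usize = 100;

tokio::run(
    stream::iter_ok::<_, ()>(first..=last)
        .map(|book| fetch_page(&COUNT, book, 0))
        .buffered(CONCURRENCY) // at most CONCURRENCY fetches pending at once
        .for_each(|_| ok(())));

buffered polls the futures in order; buffer_unordered is the same combinator family but yields results as they complete, which is usually what you want for raw throughput.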