hyperium / hyper

An HTTP library for Rust
https://hyper.rs

Request an example for working with futures_cpupool #1271

Closed: sbwtw closed this issue 7 years ago

sbwtw commented 7 years ago

Hi, I have looked at the example code for making a single HTTP request, but I want to learn how to send multiple requests and process them at the same time (multithreaded, I mean). I found the futures_cpupool crate, but I don't know how to use it with hyper::client::Client. Can someone please give me some example code (or an existing project that uses hyper) to learn from?

seanmonstar commented 7 years ago

There is a guide that shows making multiple requests at the same time in 1 thread. Is that useful?

I'm going to close the issue, since this is a usage question and not a bug.
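
For reference, issuing several requests at the same time on one thread looks roughly like this. This is a minimal sketch assuming the hyper 0.11 / tokio-core API of this era; the httpbin.org URLs are placeholders.

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use futures::Future;
use futures::future::join_all;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let client = hyper::Client::new(&core.handle());

    // Start every request before waiting on any of them; they all make
    // progress concurrently on this single thread.
    let requests = vec![
        "http://httpbin.org/ip",      // placeholder URL
        "http://httpbin.org/headers", // placeholder URL
    ].into_iter()
        .map(|url| client.get(url.parse().unwrap()).map(|res| res.status()));

    // join_all resolves once every request has completed.
    let statuses = core.run(join_all(requests)).unwrap();
    println!("{:?}", statuses);
}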

suyanlong commented 7 years ago

Hi, I want to know how to make multiple requests with a thread pool. How does that work? @seanmonstar
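
One way to combine threads with the 0.11-era client: since a Client is tied to a single tokio-core Handle, give each thread its own event loop and its own Client. A minimal sketch under that assumption; the URL and the thread count of 4 are placeholders, not anything hyper prescribes.

extern crate hyper;
extern crate tokio_core;

use std::thread;
use tokio_core::reactor::Core;

fn main() {
    // One event loop and one Client per thread; each thread drives its
    // own requests independently of the others.
    let workers: Vec<_> = (0..4)
        .map(|i| {
            thread::spawn(move || {
                let mut core = Core::new().unwrap();
                let client = hyper::Client::new(&core.handle());
                let uri = "http://httpbin.org/ip".parse().unwrap(); // placeholder URL
                let status = core.run(client.get(uri)).map(|res| res.status());
                println!("thread {}: {:?}", i, status);
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }
}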

parasyte commented 6 years ago

FWIW, I was able to get futures-cpupool working. Spoiler: in a baseline benchmark, the naive single-threaded design with HTTP/1.1 pipelining is an order of magnitude faster.

With futures-cpupool:

extern crate futures;
extern crate futures_cpupool;
extern crate hyper;

use futures::{Async, Future, Poll};
use futures_cpupool::{CpuFuture, CpuPool};
use hyper::{Method, StatusCode};
use hyper::server::{Http, Request, Response, Service};

struct Ping {
    pool: CpuPool,
}

impl Ping {
    fn new() -> Self {
        Self {
            pool: CpuPool::new_num_cpus(),
        }
    }
}

impl Service for Ping {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<CpuFuture<Self::Response, Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {
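        // Run the whole request/response computation on the thread pool.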
        Box::new(self.pool.spawn(PingResponse::new(req)))
    }
}

struct PingResponse {
    req: Request,
}

impl PingResponse {
    fn new(req: Request) -> Self {
        Self { req }
    }

    fn worker(&self) -> Poll<Response, hyper::Error> {
        let mut response = Response::new();

        match (self.req.method(), self.req.path()) {
            (&Method::Get, "/ping") => {
                response.set_body("pong");
            },
            _ => {
                response.set_status(StatusCode::NotFound);
            },
        };

        Ok(Async::Ready(response))
    }
}

impl Future for PingResponse {
    type Item = Response;
    type Error = hyper::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.worker()
    }
}

fn main() {
    let hostport = "127.0.0.1:3000";
    let addr = hostport.parse().unwrap();
    let server = Http::new()
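        // Enable HTTP/1.1 pipelining (exercised below by h2load's -m flag).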
        .pipeline(true)
        .bind(&addr, || Ok(Ping::new())).unwrap();

    println!("Listening on {}", hostport);

    server.run().unwrap();
}

Naive single-threaded design:

extern crate futures;
extern crate hyper;

use futures::Future;
use hyper::{Method, StatusCode};
use hyper::server::{Http, Request, Response, Service};

struct Ping;

impl Service for Ping {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {
        let mut response = Response::new();

        match (req.method(), req.path()) {
            (&Method::Get, "/ping") => {
                response.set_body("pong");
            },
            _ => {
                response.set_status(StatusCode::NotFound);
            },
        };

        Box::new(futures::future::ok(response))
    }
}

fn main() {
    let hostport = "127.0.0.1:3000";
    let addr = hostport.parse().unwrap();
    let server = Http::new()
        .pipeline(true)
        .bind(&addr, || Ok(Ping)).unwrap();

    println!("Listening on http://{}/", hostport);

    server.run().unwrap();
}
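
Either server answers GET /ping on port 3000; for example, curl http://127.0.0.1:3000/ping prints pong.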

Benchmark with h2load (100 connections, 1,000,000 total requests, up to 50 pipelined requests per connection):

h2load --h1 -c 100 -n 1000000 -m 50 http://localhost:3000/ping

Benchmark numbers are from a mid-2014 MacBook Pro with a 2.2 GHz Intel Core i7.

I haven't done a full analysis, but the primary issue appears to be thread synchronization. The pool is created with as many threads as there are CPU cores, yet the CPUs never saturate under load; only about half of them stay busy. On my system that's 4x the work for 5x less throughput. Abysmal. Cranking the pool size to n*2+1 makes the benchmark tank, and CPU saturation gets even worse.

IMHO, run one server for each available core and load balance across them. 😕
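
A sketch of that per-core layout under the same hyper 0.11 / tokio-core API, using net2's SO_REUSEPORT (Unix-only) so the kernel load-balances accepted connections across the threads. Ping is the single-threaded service from above; the backlog of 1024 and the num_cpus thread count are assumptions, not anything hyper prescribes.

extern crate futures;
extern crate hyper;
extern crate net2;
extern crate num_cpus;
extern crate tokio_core;

use std::net::SocketAddr;
use std::thread;

use futures::Stream;
use hyper::server::Http;
use net2::unix::UnixTcpBuilderExt;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;

fn serve(addr: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // SO_REUSEPORT lets every thread bind its own listener on the same
    // address; the kernel spreads incoming connections across them.
    let listener = net2::TcpBuilder::new_v4().unwrap()
        .reuse_port(true).unwrap()
        .bind(addr).unwrap()
        .listen(1024).unwrap();
    let listener = TcpListener::from_listener(listener, &addr, &handle).unwrap();

    let http: Http = Http::new();
    let server = listener.incoming().for_each(|(sock, remote)| {
        // Drive each accepted connection on this thread's event loop.
        http.bind_connection(&handle, sock, remote, Ping);
        Ok(())
    });
    core.run(server).unwrap();
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
    let workers: Vec<_> = (0..num_cpus::get())
        .map(|_| thread::spawn(move || serve(addr)))
        .collect();

    println!("Listening on {}", addr);

    for w in workers {
        w.join().unwrap();
    }
}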