hyperium / tonic

A native gRPC client & server implementation with async/await support.
https://docs.rs/tonic
MIT License
10.02k stars 1.02k forks source link

Concurrent client requests #33

Closed frol closed 4 years ago

frol commented 5 years ago

Feature Request

Motivation

It is common to have a need to do concurrent client requests, but it seems that Grpc client dispatcher is implemented to handle one call at a time, so the following example does not work by design:

use futures::join;

pub mod hello_world {
    tonic::include_proto!("helloworld");
}

use hello_world::{client::GreeterClient, HelloRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = GreeterClient::connect("http://[::1]:50051")?;

    let request1 = tonic::Request::new(HelloRequest { name: "hello".into() });
    let res1 = client.say_hello(request1);

    let request2 = tonic::Request::new(HelloRequest { name: "hello".into() });
    let res2 = client.say_hello(request2);

    println!("RESPONSES={:?}", join!(res1, res2));

    Ok(())
}
error[E0499]: cannot borrow `client` as mutable more than once at a time
  --> tonic-examples/src/helloworld/client.rs:17:16
   |
14 |     let res1 = client.say_hello(request1);
   |                ------ first mutable borrow occurs here
...
17 |     let res2 = client.say_hello(request2);
   |                ^^^^^^ second mutable borrow occurs here
18 |
19 |     println!("RESPONSES={:?}", join!(res1, res2));
   |                                      ---- first borrow later used here

Proposal

It seems that there is a need for multiplexing and a pool of clients.

zackangelo commented 5 years ago

@frol I believe that the .clone() operation on a tonic client should be relatively lightweight (just a handle to the underlying channel). You should be able to dispatch a request in parallel by cloning the client before executing its method.

LucioFranco commented 5 years ago

@zackangelo is correct!

So pretty much this comes down to how tower handles back pressure. Internally we need a &mut self within the future to keep checking if our inner service is ready to accept the next request. To do this we need to borrow the service exclusively because only one access to that service can submit the request when its ready. To fix this internally we use a buffer channel to allow you to multiplex this concept of polling ready and handling back pressure. Hopefully this explains it a bit better.

frol commented 5 years ago

Thank you for the thorough explanation!

Just to confirm, I was able to use .clone() just fine:

use futures::join;

pub mod hello_world {
    tonic::include_proto!("helloworld");
}

use hello_world::{client::GreeterClient, HelloRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let main_client = GreeterClient::connect("http://[::1]:50051")?;

    let mut client = main_client.clone();
    let res1 = client.say_hello(tonic::Request::new(HelloRequest { name: "hello1".into() }));

    let mut client = main_client.clone();
    let res2 = client.say_hello(tonic::Request::new(HelloRequest { name: "hello2".into() }));

    println!("RESPONSES={:?}", join!(res1, res2));

    Ok(())
}

Could you help me with a solution to fetch a list of resources concurrently (e.g. I have a vector of tonic::Requests)? I have tried to use futures::future::join_all:

pub mod hello_world {
    tonic::include_proto!("helloworld");
}

use hello_world::{client::GreeterClient, HelloRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let main_client = GreeterClient::connect("http://[::1]:50051")?;

    let mut client = main_client.clone();
    let res1 = client.say_hello(tonic::Request::new(HelloRequest { name: "hello1".into() }));

    let mut client = main_client.clone();
    let res2 = client.say_hello(tonic::Request::new(HelloRequest { name: "hello2".into() }));

    println!("RESPONSES={:?}", futures::future::join_all(vec![res1, res2]).await);

    Ok(())
}
error[E0597]: `client` does not live long enough
 --> tonic-examples/src/helloworld/client.rs:17:16
  |
17|     let res2 = client.say_hello(tonic::Request::new(HelloRequest { name: "hello2".into() }));
  |                ^^^^^^ borrowed value does not live long enough
...
22| }
  | -
  | |
  | `client` dropped here while still borrowed
  | borrow might be used here, when `res1` is dropped and runs the destructor for type `impl core::future::future::Future`

And even if that would work, I would still need to store the clients somewhere in a list, right?

alce commented 5 years ago

@frol this works:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let main_client = GreeterClient::connect("http://[::1]:50051")?;

    let mut client1 = main_client.clone();
    let mut client2 = main_client.clone();

    let res1 = client1.say_hello(tonic::Request::new(HelloRequest {
        name: "hello".into(),
    }));

    let res2 = client2.say_hello(tonic::Request::new(HelloRequest {
        name: "world".into(),
    }));

    let responses = futures::future::try_join_all(vec![res1, res2]).await?;

    println!("RESPONSE={:?}", responses);

    Ok(())
}
blittable commented 5 years ago

Super-helpful... thank you.

If it was in a loop? I'm generating the req/resp from the protobufs, and tonic::response::Response - I cannot clone it.

This definitely inspired me to test the streaming approach, which works swimmingly. But, a batch of requests and handling their responses, does this require an Arc<Mutex> and pinning around the data-structure?

alce commented 5 years ago

@blittable see https://github.com/hyperium/tonic/issues/44#issuecomment-543829569 for a possible solution

LucioFranco commented 4 years ago

I think we can close this, feel free to reopen if you have any more questions :)

ettersi commented 2 years ago

Rust and tonic noob here, but if client.clone() is cheap and required frequently, wouldn't it be easier if client types had the Copy trait and client calls would take self by value instead of mutable reference?

davidpdrsn commented 2 years ago

@ettersi Copy is for types that can be copied by simplying copying their byte representation, such as i32. Two i32s are identical if the contain the same bytes. Whereas something like Box<String> is different. You cannot get a clone of the box by just copying the boxes bytes on the stack. There is memory on the heap to deal with. So Copy doesn't simply mean "cheap to clone".

ettersi commented 2 years ago

Right, I forgot that it has to be a bitwise copy. Sorry about the spam, then.

gabrik commented 1 year ago

Quick question, given that the Clone is lightweight, why does the client need &mut self instead of &self ?

horacimacias commented 1 year ago

@gabrik see previous info https://github.com/hyperium/tonic/issues/33#issuecomment-538154015