Open autodidaddict opened 6 years ago
Yes, I am stuck on the exact same issue right now; a streaming client sample would be extremely helpful.
Aha, I figured it out. Ping: @autodidaddict
Given a proto like:
message ListRequest {
}
message ListResponse {
string filename = 1;
}
service MyService {
rpc List(ListRequest) returns (stream ListResponse) {}
}
The server can write to the stream like:
fn list(
&self,
_m: grpc::RequestOptions,
_req: ListRequest,
) -> grpc::StreamingResponse<ListResponse> {
// Send back 13 dummy list response objects
let iter = iter::repeat(())
.map(|_| {
let s = "MyTestFileName.bin".to_owned();
let mut resp = ListResponse::new();
resp.set_filename(s);
resp
})
.take(13);
grpc::StreamingResponse::iter(iter)
}
And the client can consume the stream like:
let list_req = ListRequest::new();
let list_res = client.list(grpc::RequestOptions::new(), list_req);
match list_res.wait() {
Err(e) => panic!("{:?}", e),
Ok((_, stream)) => {
for stream_item in stream {
let response = stream_item.unwrap();
println!("> {}", response.get_filename());
}
}
}
It would be nice to include the canonical route_guide example for grpc-rust usage, but at least I'm now using streaming rpcs without any issue using this library.
Hope that helps! Graham
Question being still: how can I sit on a streaming response and get the items before connection has been closed/EOF'ed. The wait() blocks right now simply until response is finished ...
OK, the answer is super elegant (but takes a lot of code reading to get there ;-)
`
let c : StreamingResponse
c .map_items(|i| println!("item: {:?}", i) ) .into_future() .wait(); `
I wonder if there's also a better approach server-side to stream results async and then return the final response in a more begin/end friendly manner (i.e. keep the connection open and periodically send responses over to the client instead of waiting on a thread for all the data to be returned)
I'd also love to see an example of how a bi-directional RPC call is supposed to work; route guide has a bi-di RouteChat
RPC call.
From what I can tell of the generated code, there is no way to hold on to the request and response streams simultaneously. I'm trying to integrate with a third-party RPC server that has a (in my opinion, poorly designed) interface that takes a stream of requests and gets back a stream of responses. The request stream needs access to the response stream so it can read back responses to send more requests.
This doesn't seem possible to do with the generated code from grpc-rust
.
Here's an example protobuf:
service RpcExample {
rpc Example(stream Message) returns (stream Message);
}
And here's the grpc-rust
generated signature for this RPC call:
fn example(&self, o: ::grpc::RequestOptions, p: ::grpc::StreamingRequest<super::RpcExample::Message>) -> ::grpc::StreamingResponse<super::RpcExample::Message> {
self.grpc_client.call_bidi(o, p, self.method_Example.clone())
}
This method will move the request stream and give back the response stream. That works for sending all the requests at once and then receiving all of the responses, but that's not really the intention of bi-directional in gRPC, since requests and responses can be interleaved.
For comparison, grpc-rs
generates code for the route guide example that can be invoked like so:
let (mut sink, mut receiver) = client.route_chat().unwrap();
This is more of what I would expect bi-di to look like and is quite similar to how the method would be generated for languages like Go and C#.
@stepancheg am I overlooking something that grpc-rust
generates that would support this mode of operation? Thanks!
@stepancheg As @peterhuene said, requests and responses seem unable to be interleaved when using BIDI.
No, first, requests and responses can be interleaved.
Second, anyway, in master grpc-rust API is significantly changed (user now takes Sink instead of providing Stream).
Now I need to implement route guide.
@stepancheg Thank you. Looking forward to the guide.
This commit adds route guide server: https://github.com/stepancheg/grpc-rust/commit/0f1faf9d9930c0243871fb15f7bd11d7c150836c
Seems to be compatible with go client.
Can you please provide a route guide server and client in the examples (you've already got the protobuf there) so we can see how to properly handle a streaming request and streaming response. I'm struggling with how to extract individual elements of type
T
from theStreamingRequest<T>
. Various forms ofiter()
andinto_iter()
don't appear to work and a sample would likely clear that up.