Closed ebiggs closed 2 years ago
I think don't implementing Clone
by setting the source to None
is right. That means you're loosing information just from cloning the status, and the source is important in figuring out what the underlying error is.
I'd recommend you instead use an Arc<Status>
.
I don't believe the framework has me at liberty to use an Arc<Status>
?
The Stream must conform to:
type *TheStreamType*: futures_core::Stream<Item = Result<*TheResponseType*, tonic::Status>
Perhaps a lossless Clone of tonic::Status can/should be achieved by making source
an
Option<Arc<Box<dyn Error + Send + Sync + 'static>>>
?
Can you explain more about your use case?
syntax = "proto3";
package hellowatch;
service Watcher {
rpc Watch (WatchRequest) returns (stream WatchReply) {}
}
message WatchRequest {
}
message WatchReply {
int32 i = 1;
}
use tokio::sync::watch::{Receiver, Sender};
use tokio::time;
use tokio::time::Duration;
use tonic::{transport::Server, Response, Status};
use hello_watch::watcher_server::{Watcher, WatcherServer};
use hello_watch::{WatchReply, WatchRequest};
pub mod hello_watch {
tonic::include_proto!("hellowatch");
}
type WatchWrapper<T> = tokio_stream::wrappers::WatchStream<T>;
pub struct MyWatcher {
rx: Receiver<Result<WatchReply, Status>>,
}
#[tonic::async_trait]
impl Watcher for MyWatcher {
type WatchStream = WatchWrapper<Result<WatchReply, Status>>;
async fn watch(
&self,
_request: tonic::Request<WatchRequest>,
) -> Result<tonic::Response<Self::WatchStream>, tonic::Status> {
let rx = self.rx.clone();
Ok(Response::new(WatchWrapper::new(rx)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut interval = time::interval(Duration::from_secs(2));
type Tuple = (
Sender<Result<WatchReply, Status>>,
Receiver<Result<WatchReply, Status>>,
);
let (tx, rx): Tuple = tokio::sync::watch::channel(Ok(WatchReply { i: 0 }));
tokio::spawn(async move {
let mut i = 0;
loop {
i += 1;
interval.tick().await;
tx.send(Ok(WatchReply { i })).unwrap();
}
});
let addr = "[::1]:50051".parse().unwrap();
let watcher = MyWatcher { rx };
println!("GreeterServer listening on {}", addr);
// Interval::new(Instant::now(), Duration::new(5, 0));
Server::builder()
.add_service(WatcherServer::new(watcher))
.serve(addr)
.await?;
Ok(())
}
Is your actual code more complicated than this? Because here you could just not send a Result.
This was the minimal simplest example of the issue i'm facing. I wouldn't be building Results if the associated type for the Watcher trait didn't require it:
#[async_trait]
pub trait Watcher: Send + Sync + 'static {
///Server streaming response type for the Watch method.
type WatchStream: futures_core::Stream<
Item = Result<super::WatchReply, tonic::Status>,
>
+ Send
+ 'static;
async fn watch(
&self,
request: tonic::Request<super::WatchRequest>,
) -> Result<tonic::Response<Self::WatchStream>, tonic::Status>;
}
That is a trait that tonic_build
generates.
Perhaps you are right though as I'm no rust or tonic wizard, perhaps i'm overlooking something?
Good point re Arc<dyn ...>
. That works https://github.com/hyperium/tonic/pull/1076
Feature Request
Implement
Clone
fortonic::Status
to enable the ergonomic consumption of tokio::sync:: watch ::Receiver for rpc streaming responses.Crates
tonic
Motivation
tokio_stream::wrappers::WatchStream provides a handy way of creating a Stream for a watch channel, but its
Item
associated type is bound by theClone
trait and the code generated bytonic_build
expects aStream<Item=Result<_, tonic::Status>>
howeverClone
is not implemented forStatus
Browsing through the code history this was not always the case,
#[derive(Clone)]
was ontonic::Status
until the optional underlyingsource
error was added as a trait object which cannot be simply bound by Clone.Proposal
In my patch fork of tonic I have implemented Clone as such:
I'd be happy to create a PR for this solution or some other
impl Clone for Status
with guidance.Alternatives
I can think of other ways besides
tokio_stream::wrappers::WatchStream
to derive a Stream for a watch channel which doesn't requireItem
be bound byClone
, but being able to use tokio's own wrapper would be much more ergonomic.