rsocket / rsocket-rust

RSocket Rust Implementation using Tokio
Apache License 2.0

Support CANCEL Frames for Request-Stream #48

Closed yuriykulikov closed 3 years ago

yuriykulikov commented 3 years ago

RSocket CANCEL frames can be used to close (abort, short-circuit, :stop_button:) streams from the client when the client is no longer interested in consuming the remaining stream items.

Motivation

Closing a stream upon reception of a CANCEL frame allows the resources associated with it to be freed (for example, memory allocated for buffers).

Desired solution

CANCEL Frame support should be implemented on both the server and the client side in the DuplexSocket.

Desired solution (client)

On the client side, the DuplexSocket has to send a CANCEL frame when the client code drops the stream handle.

There are several ways to achieve that. One is to check the error returned when sending values to the Handler::ReqRS(sender) sender. Unfortunately, this sends the CANCEL frame only when the server sends the next value after the stream was dropped. Another possible solution is to check tx.is_closed() periodically. I don't know which one is better, and most likely there is some other way.
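A minimal sketch of the first option, assuming a std::sync::mpsc channel as a stand-in for the channel behind Handler::ReqRS(sender). The Frame enum and the forward_or_cancel function are hypothetical names used only for illustration; they are not part of the rsocket-rust API.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the frames a DuplexSocket writes to the transport.
#[derive(Debug, PartialEq)]
pub enum Frame {
    Cancel { stream_id: u32 },
}

/// Forwards one received stream item to the consumer.
/// If the consumer has dropped its receiving half, the send fails and a
/// CANCEL frame for this stream is returned so the caller can write it out.
pub fn forward_or_cancel<T>(stream_id: u32, tx: &Sender<T>, item: T) -> Option<Frame> {
    match tx.send(item) {
        // The consumer is still listening: nothing to cancel.
        Ok(()) => None,
        // The receiver is gone: ask the server to stop producing.
        Err(_) => Some(Frame::Cancel { stream_id }),
    }
}
```

As the paragraph above notes, the failure is only observed when the next item arrives, so with this approach the CANCEL frame goes out one item late at the earliest.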

Desired solution (server)

On the server side, the responder RSocket has to be notified about the fact that the stream is closed by the client.

To achieve that, the stream which is returned by the responder can be wrapped in an Abortable:

use futures::future::{AbortHandle, Abortable};

let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut payloads = Abortable::new(responder.request_stream(input), abort_registration);

The handle can be stored in a map for future use:

abort_handles.insert(sid, abort_handle);

When the client sends a CANCEL frame and the server receives it, DuplexSocket looks up the handle in the map and aborts the stream:

if let Some((_, abort_handle)) = self.abort_handles.remove(&sid) {
    abort_handle.abort();
}

This will allow the RSocket responder implementation to detect the cancellation with a tx.is_closed() check (which will return true once the aborted stream has been dropped) and free up the resources.
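The register, look up, abort flow described above can be sketched with the standard library alone. This is an analogue under stated assumptions, not the futures API: CancelFlag stands in for AbortHandle, Cancellable stands in for Abortable (over an Iterator instead of a Stream), and CancelRegistry plays the role of the abort_handles map. All three names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `AbortHandle`: a shared flag set on CANCEL.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    pub fn abort(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_aborted(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical stand-in for `Abortable`: yields items until the flag is set.
pub struct Cancellable<I> {
    inner: I,
    flag: CancelFlag,
}

impl<I: Iterator> Iterator for Cancellable<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        if self.flag.is_aborted() {
            None // the stream ends early after a CANCEL
        } else {
            self.inner.next()
        }
    }
}

/// Per-connection registry keyed by stream id, like the `abort_handles` map.
#[derive(Default)]
pub struct CancelRegistry {
    handles: HashMap<u32, CancelFlag>,
}

impl CancelRegistry {
    /// Wraps the responder's stream and remembers its flag under `sid`.
    pub fn register<I: Iterator>(&mut self, sid: u32, stream: I) -> Cancellable<I> {
        let flag = CancelFlag::default();
        self.handles.insert(sid, flag.clone());
        Cancellable { inner: stream, flag }
    }

    /// Called when a CANCEL frame for `sid` arrives.
    /// Returns false if no stream is registered under that id.
    pub fn on_cancel(&mut self, sid: u32) -> bool {
        match self.handles.remove(&sid) {
            Some(flag) => {
                flag.abort();
                true
            }
            None => false,
        }
    }
}
```

Removing the entry on cancel keeps the map from growing; a real implementation would presumably also remove it when the stream completes normally.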

Considered alternatives

One alternative solution would involve changing the RSocket API to expose the AbortHandle to the RSocket implementation. In this case the client could use this handle to abort explicitly instead of dropping the stream. The server would get the AbortHandle and could use it to check whether the stream has been aborted. However, it seems that AbortHandle cannot be polled, which makes this alternative pretty useless.

Additional context

A TODO is present in the DuplexSocket code: // TODO: support cancel

I would be glad to implement a PR if this issue is accepted :smile:

jjeffcaii commented 3 years ago

@yuriykulikov That's great! Thanks for PR~ 😄

yuriykulikov commented 3 years ago

@jjeffcaii I have opened a PR. Could you please take a look? I did my best, but I am not a Rust developer and will need some guidance :smile:

jjeffcaii commented 3 years ago

Since the PR has been merged and released with v0.7.2, I'll close this issue. Thanks, @yuriykulikov 👍