Open jsudano opened 2 weeks ago
I've included a design proposal below, and I have a fork PR with changes to match it (I didn't want to open a PR here without some discussion first). I'm not familiar with this code and there are a lot of ways to approach this, so I'd be happy to take a different approach.
Generally the flow for draining a subscription (at least according to the doc above) is basically:
Looking at the Python lib, it looks like this is more-or-less achieved by:
`UNSUB` is delivered and any messages from the server are received
The challenge with implementing this within Rust's ownership model is that "drain" is effectively a "delayed delete" where the final delete is triggered by the crate, but we can't force the client to drop its `Subscriber`. All we can really do is drop the `receiver` associated with the subscriber. Hence "Delete the subscription" really translates to "the creator will delete the internal subscription object" and the owner should handle any resulting "disconnected" errors. As such, here's how I aimed to implement this:
`Subscriber::drain()`
On the `Subscriber` side there's really only 1 step:
1. `Subscriber` sends new `Command::Drain { sid: u64 }` to the `Handler`, `await`s to ensure it is written to the queue

After this, the process is complete on the crate-user's side. The crate-internal `Handler` side will write any remaining messages to the associated sender for `Subscriber::receiver` then drop it, so the owner of the `Subscriber` can consume the remaining messages and know the drain is complete when it receives `Option::None`/`Error::Disconnected`.
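To make the user-side contract concrete, here is a minimal sketch of the flow above. It uses blocking `std::sync::mpsc` channels in place of the crate's async ones, and `Command`, `Subscriber`, `next_message()` and `demo()` are simplified, hypothetical stand-ins rather than the crate's real definitions. The handler's part is inlined in `demo()`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for the crate's command enum (the real one has more variants).
#[derive(Debug, PartialEq)]
enum Command {
    Drain { sid: u64 },
}

// Hypothetical, simplified subscriber: a command sender plus a message receiver.
struct Subscriber {
    sid: u64,
    commands: Sender<Command>,
    receiver: Receiver<String>,
}

impl Subscriber {
    // The only user-side step: hand the drain request to the handler.
    fn drain(&self) -> Result<(), String> {
        self.commands
            .send(Command::Drain { sid: self.sid })
            .map_err(|_| "handler is gone".to_string())
    }

    // Returns None once the handler has dropped the matching sender
    // and every buffered message has been consumed.
    fn next_message(&self) -> Option<String> {
        self.receiver.recv().ok()
    }
}

fn demo() -> Vec<String> {
    let (cmd_tx, cmd_rx) = channel();
    let (msg_tx, msg_rx) = channel();
    let sub = Subscriber { sid: 1, commands: cmd_tx, receiver: msg_rx };

    sub.drain().unwrap();

    // Handler side, inlined: observe the drain command, deliver what is
    // still in flight, then drop the sender to signal completion.
    assert_eq!(cmd_rx.recv().unwrap(), Command::Drain { sid: 1 });
    msg_tx.send("in-flight 1".to_string()).unwrap();
    msg_tx.send("in-flight 2".to_string()).unwrap();
    drop(msg_tx);

    // Owner side: keep consuming until the stream ends.
    let mut seen = Vec::new();
    while let Some(msg) = sub.next_message() {
        seen.push(msg);
    }
    seen
}
```

The point of the sketch is that the owner needs no extra signal: the end of the stream is the "drain complete" notification.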
An alternate approach here would be to have `Subscriber::drain()` block until the `Handler` explicitly notifies it (with an observer oneshot similar to `flush()`) and return any remaining messages in a collection/iterator of some kind (instead of having the `Subscriber` owner process remaining messages from the existing stream). This would keep the stream and the `Subscriber` lifetimes more tightly-coupled and allow the owner to drop the `Subscriber` as soon as `drain()` has completed, but felt a little less idiomatic.
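For comparison, a rough sketch of that alternate shape might look like the following. It is only an illustration under assumptions: a plain `std::sync::mpsc` channel used once stands in for the oneshot observer, a thread stands in for the handler task, and the `observer` field, `run_handler()` and `demo()` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical command: carries a reply channel that is used exactly once,
// standing in for a oneshot observer.
enum Command {
    Drain { sid: u64, observer: Sender<Vec<String>> },
}

struct Subscriber {
    sid: u64,
    commands: Sender<Command>,
}

impl Subscriber {
    // Consumes the subscriber and blocks until the handler replies
    // with whatever messages were still pending.
    fn drain(self) -> Vec<String> {
        let (observer, done) = channel();
        let request = Command::Drain { sid: self.sid, observer };
        if self.commands.send(request).is_err() {
            return Vec::new(); // handler already gone, nothing left to receive
        }
        done.recv().unwrap_or_default()
    }
}

// Stand-in handler: answers a single drain request with the pending messages.
fn run_handler(commands: Receiver<Command>, pending: Vec<String>) {
    if let Ok(Command::Drain { observer, .. }) = commands.recv() {
        let _ = observer.send(pending);
    }
}

fn demo() -> Vec<String> {
    let (cmd_tx, cmd_rx) = channel();
    let handler = thread::spawn(move || run_handler(cmd_rx, vec!["leftover".to_string()]));
    let remaining = Subscriber { sid: 9, commands: cmd_tx }.drain();
    handler.join().unwrap();
    remaining
}
```

Because `drain(self)` takes the subscriber by value, the owner cannot keep using it afterwards, which is the tighter lifetime coupling described above.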
`Handler` logic for `Command::Drain`
The main challenge here was that we need the `Handler::process()` poll loop to receive the `Command::Drain` message and leave the internal `Subscription` alive for long enough to write any remaining messages to `Subscription::sender` after `UNSUB` is sent to the server and the connection is flushed. Currently the unsubscribe flow just deletes the `Subscription` right away, which means any messages published to the subject between the client unsubscribing and the server receiving the `UNSUB` message will be dropped. To address this, the `Handler` will:
1. On receiving `Command::Drain`, queue an `UNSUB` message to the server, and set `Subscription::is_draining` (a new field) to `true`. At the end of the current call to `Handler::process()`'s `Future::poll` the `UNSUB` message will be flushed
2. In `Handler::process()`'s `Future::poll`, after processing any incoming messages from the server and delivering them to any subscriptions (via `Handler::handle_server_op()`), drop any subscriptions with `is_draining: true`. This should ensure that the subscription isn't dropped until after any remaining messages on the subscription are delivered.

There is a small risk with this approach that the crate-user could try to perform further operations on `Subscriber` after calling `drain()`, which would go ignored, but this was already possible.
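The two handler steps above can be sketched roughly as follows. This is a simplified model, not the crate's code: `Handler`, `Subscription`, `handle_drain()`, `poll_once()` and the `outbound` queue are hypothetical stand-ins, the poll loop is reduced to a single synchronous pass, and protocol lines are plain strings.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical, simplified stand-in for the internal subscription object.
struct Subscription {
    sender: Sender<String>,
    is_draining: bool, // the proposed new field
}

#[derive(Default)]
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    outbound: Vec<String>, // protocol lines queued for the server
}

impl Handler {
    // Step 1: queue UNSUB and mark the subscription instead of removing it.
    fn handle_drain(&mut self, sid: u64) {
        if let Some(sub) = self.subscriptions.get_mut(&sid) {
            sub.is_draining = true;
            self.outbound.push(format!("UNSUB {}", sid));
        }
    }

    // Step 2: one poll pass. Deliver whatever the server sent first, then
    // drop draining subscriptions so their receivers observe the disconnect.
    fn poll_once(&mut self, incoming: Vec<(u64, String)>) {
        for (sid, payload) in incoming {
            if let Some(sub) = self.subscriptions.get(&sid) {
                let _ = sub.sender.send(payload);
            }
        }
        self.subscriptions.retain(|_, sub| !sub.is_draining);
    }
}

fn demo() -> (Vec<String>, Vec<String>) {
    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    let mut handler = Handler::default();
    handler.subscriptions.insert(7, Subscription { sender: tx, is_draining: false });

    handler.handle_drain(7);
    // A message that raced with the UNSUB still arrives in the next pass.
    handler.poll_once(vec![(7, "late message".to_string())]);

    // The iterator ends because the only sender was dropped with the subscription.
    let delivered: Vec<String> = rx.iter().collect();
    (delivered, handler.outbound)
}
```

The ordering inside `poll_once()` is the important part: delivery happens before the `retain`, so a message that arrives in the same pass as the drain is not lost.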
As mentioned in the doc, draining a connection is more complex. The flow is effectively:
`UNSUB` to the NATS server for each subscription
So following the pattern for an individual subscription, it might translate to
`UNSUB` to the server
`Command`s (we could drop the `Command` receiver, but that would complicate other operations by making the `receiver` optional)
`Client::drain()`
Update the `Command::Drain { sid: u64 }` above to make `sid` an `Option`. If `sid.is_none()` then the `Handler` can assume this is a connection-level drain. As above, the `Client`-side logic is really only one step:
1. `Client::drain()` sends `Command::Drain { sid: None }` and `await`s to ensure it is written to the queue

After this, any `Subscriber`s held by the crate-user will deliver any remaining messages then return `Option::None`, and calls to various `Subscriber` impl fns will eventually error once the `Command` `receiver` is dropped. Similarly to the `Subscriber` flow, we could use a oneshot to `await` until the operation is complete.
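As a small illustration of the shared command, the sketch below shows both callers building `Command::Drain` with an optional `sid`. It assumes `sid` becomes `Option<u64>`; the `Client` and `Subscriber` structs are hypothetical reductions to just a command sender, and blocking `std::sync::mpsc` stands in for the crate's async channel.

```rust
use std::sync::mpsc::{channel, SendError, Sender};

// Hypothetical command type; the real crate's enum has more variants.
#[derive(Debug, PartialEq)]
enum Command {
    Drain { sid: Option<u64> },
}

struct Client {
    commands: Sender<Command>,
}

struct Subscriber {
    sid: u64,
    commands: Sender<Command>,
}

impl Client {
    // Connection-level drain: no sid.
    fn drain(&self) -> Result<(), SendError<Command>> {
        self.commands.send(Command::Drain { sid: None })
    }
}

impl Subscriber {
    // Subscription-level drain: carries this subscription's sid.
    fn drain(&self) -> Result<(), SendError<Command>> {
        self.commands.send(Command::Drain { sid: Some(self.sid) })
    }
}

fn demo() -> Vec<Command> {
    let (tx, rx) = channel();
    let client = Client { commands: tx.clone() };
    let subscriber = Subscriber { sid: 3, commands: tx };

    subscriber.drain().unwrap();
    client.drain().unwrap();

    // Dropping both senders lets the iterator below terminate.
    drop(client);
    drop(subscriber);
    rx.iter().collect()
}
```

One variant keeps the handler's match arms in one place, at the cost of `None` carrying an implicit "everything" meaning.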
`Handler` logic for `Command::Drain`
1. On receiving a `Command::Drain` message, we will check if `Command::Drain::sid.is_some()`. If not, we will queue an `UNSUB` message and set `is_draining` for each entry in `Handler::subscriptions`. We will also set `Handler::is_draining` (a new field) to `true`. At the end of the current call to `Handler::process()`'s `Future::poll` any outgoing messages will be flushed.
2. In `Handler::process()`'s `Future::poll`, after processing any incoming messages from the server and delivering them to any subscriptions (via `Handler::handle_server_op()`), and dropping any subscriptions with `is_draining == true` (which should be all subscriptions in this case), check if `Handler::is_draining == true`. If so, exit `Handler::process()` with `ExitReason::Closed`. This order will ensure all required `UNSUB` messages are sent to the server, any remaining messages (both incoming and outgoing) are processed and delivered, and that the process itself is closed.
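A rough model of the connection-level handling, under the same caveats as before: every type here is a hypothetical stand-in, the poll loop is one synchronous pass, and "flushing" is represented only by the `outbound` queue.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

#[derive(Debug, PartialEq)]
enum ExitReason {
    Closed,
}

struct Subscription {
    sender: Sender<String>,
    is_draining: bool,
}

#[derive(Default)]
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    outbound: Vec<String>, // protocol lines queued for the server
    is_draining: bool,     // the proposed connection-level flag
}

impl Handler {
    // Step 1: Some(sid) drains one subscription, None drains all of them.
    fn handle_drain(&mut self, sid: Option<u64>) {
        match sid {
            Some(sid) => {
                if let Some(sub) = self.subscriptions.get_mut(&sid) {
                    sub.is_draining = true;
                    self.outbound.push(format!("UNSUB {}", sid));
                }
            }
            None => {
                for (sid, sub) in self.subscriptions.iter_mut() {
                    sub.is_draining = true;
                    self.outbound.push(format!("UNSUB {}", sid));
                }
                self.is_draining = true;
            }
        }
    }

    // Step 2: deliver, drop draining subscriptions, then decide whether to exit.
    fn poll_once(&mut self, incoming: Vec<(u64, String)>) -> Option<ExitReason> {
        for (sid, payload) in incoming {
            if let Some(sub) = self.subscriptions.get(&sid) {
                let _ = sub.sender.send(payload);
            }
        }
        self.subscriptions.retain(|_, sub| !sub.is_draining);
        if self.is_draining { Some(ExitReason::Closed) } else { None }
    }
}

fn demo() -> (Option<ExitReason>, Vec<String>, usize) {
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    let mut handler = Handler::default();
    handler.subscriptions.insert(1, Subscription { sender: tx_a, is_draining: false });
    handler.subscriptions.insert(2, Subscription { sender: tx_b, is_draining: false });

    handler.handle_drain(None);
    let exit = handler.poll_once(vec![(1, "last for 1".to_string())]);

    // Both receivers terminate because their senders were dropped above.
    let mut delivered: Vec<String> = rx_a.iter().collect();
    delivered.extend(rx_b.iter());
    (exit, delivered, handler.outbound.len())
}
```

In this model a subscription-level drain leaves `Handler::is_draining` untouched, so `poll_once()` keeps returning `None` and the connection stays open.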
Proposed change
Implement the "drain" functionality described in this NATS doc, which is already present in other languages' NATS libraries.
Use case
A service I work on makes pretty heavy use of this crate, and we've hit a couple (admittedly rare) timing bugs which would (hopefully) be resolved by implementing the "drain" functionality described in this NATS doc. I poked around issues and don't see any mention or request of this feature for the new async crate. I also spent a while looking through the code and you could almost convince me that `drain()` isn't needed with the way messages are handled, but given that the internal subscription object is dropped immediately after sending the unsubscribe message to the NATS server, I still think there's a small chance for in-flight messages to be dropped.
Contribution
I would love to!