nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0
1.04k stars 164 forks source link

Add `client.flush()` calls to example code in docs #1263

Closed HeCorr closed 4 months ago

HeCorr commented 4 months ago

This PR adds client.flush() calls to publish examples in the documentation to help avoid potential confusion when messages aren't immediately published due to buffering (in fact, on my short-lived CLI app they weren't being sent at all). See https://github.com/nats-io/nats.rs/issues/773#issuecomment-2119116600.

@Jarema I actually have a few questions in regards to this:

  1. is flush() supposed to be called on a regular basis if not publishing too frequently?
  2. do you mind briefly explaining to me how the buffering works in this case? is it only count-based or time-based too?
  3. how could I tweak it so that it flushes more frequently by itself?
Jarema commented 4 months ago

Thank you for your PR!

  1. No
  2. The client does not flush based on time, nor count based. Instead, on every message received, it checks if the publish buffer is empty or not. It will try to write as many messages as they are available, or until buffer is full. This means, that its optimized for both infrequent publishing with low latency (as if you publish just one message, it will be flushed immediately) and high throughput (it will fill the buffer on high load before sending out the messages).

The reason why the messages can be lost on dropping the client/shutting is because there is no custom client drop logic that would clear our all buffers. We were discussing few times if we should do that, but as its similar to how other clients behave, we keep it as it is. Especially that making sure that everything is flushed can cause potential edge cases of very slow (which is unwated) drop process (for example if client encounters some connectivity problems at that time).

  1. Why would like to to that?
HeCorr commented 4 months ago

No

noted, so I assume you're only supposed to call flush() when shutting down.

on every message received, it checks if the publish buffer is empty or not

pardon my ignorance, but what if I'm not receiving messages, only publishing them?

if you publish just one message, it will be flushed immediately

but that doesn't seem to be the case here, unless I'm hitting an edge-case where there's just not enough time for the buffer to get flushed before the app exits.

Why would you like to do that?

I thought the flushing mechanism was count or time-based so I expected there to be some sort of threshold setting but since you said it's not, never mind.

Jarema commented 4 months ago

No

noted, so I assume you're only supposed to call flush() when shutting down.

Yes. We have flush since the time when flush was time-based. Now it's useful just for closing.

on every message received, it checks if the publish buffer is empty or not

pardon my ignorance, but what if I'm not receiving messages, only publishing them?

Sorry, mental shortcut. I meant - the loop that polls the queue is receiving the published messages and putting them into the stream :).

if you publish just one message, it will be flushed immediately

but that doesn't seem to be the case here, unless I'm hitting an edge-case where there's just not enough time for the buffer to get flushed before the app exits.

This is different - when close the client immediately after publishing the message, there is nothing that forces the drop to wait until loop finished reading/flushing messages in the buffer. We might revisit it, but I assume that as Drop does not work with async, it might be tricky.

Why would you like to do that?

I thought the flushing mechanism was count or time-based so I expected there to be some sort of threshold setting but since you said it's not, never mind.

Understood :).

HeCorr commented 4 months ago

Got it, thank you for taking your time to explain it!

Should I change the comment's phrasing to say that calling flush() is optional?

-//! // Flush internal buffer to make sure messages are sent
+//! // Optionally flush internal buffer to make sure messages are sent
 //! client.flush().await?;

Or maybe instead of suggesting flush() here we change some other part of the docs to warn about doing so before exiting, I think that would make more sense. What do you think?

Jarema commented 4 months ago

Happy to help!

I would rather explicitly mention that it's a good idea to flush before shutting down the application/closing the client.

HeCorr commented 4 months ago

So it seems I'm unable to run the example subscriber code..? What am I doing wrong?

image

I wanted to check if the example code works as expected on my end.

Jarema commented 4 months ago

You forgot to import use futures::StreamExt as that is the trait that next is based on.

HeCorr commented 4 months ago

oops. my bad, thank you.

HeCorr commented 4 months ago

Alright, how does this look?