minghuaw / azeventhubs

Unofficial Azure Event Hubs SDK over AMQP 1.0 for rust

Storing and reusing a single `EventHubConnection` #9

Open jackgerrits opened 10 months ago

jackgerrits commented 10 months ago

In my setup I create a single consumer per partition, which results in N EventHubClients. If I use the EventHubClient constructor then it looks as though I end up with N AmqpClients, each with their own connection. I also see that those clients spawn their own tasks.

I see the EventHubConnection which looks like it results in reusing a single connection. Would I get better performance using a single shared connection across all clients?

Additionally, it looks like EventHubConnection is generic on the connection. However, I would be using AmqpClient. It looks like the AmqpClient being used in this crate is coming from the amqp subdirectory and is not exposed?

Is there another way I can store an EventHubConnection<AmqpClient>?

minghuaw commented 10 months ago

> I see the EventHubConnection which looks like it results in reusing a single connection. Would I get better performance using a single shared connection across all clients?

You could indeed share the connection among multiple clients. There would be fewer frame exchanges for establishing the AMQP Connection (and consequently fewer exchanges for TLS, TCP, etc.). Each client has its own AMQP Session, so multiple clients won't share the same session window flow control.
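
To make the connection vs. session distinction concrete, here is a toy model. It is not the azeventhubs API: `ToyConnection` and `ToySession` are hypothetical names, and the window sizes are arbitrary. It only shows that several sessions can share one connection while each keeps its own incoming window.

```rust
/// Toy model only: these types are hypothetical and not part of azeventhubs.
/// One connection hands out sessions; each session tracks its own window.
#[derive(Debug)]
pub struct ToyConnection {
    next_channel: u16,
}

#[derive(Debug)]
pub struct ToySession {
    pub channel: u16,
    pub incoming_window: u32,
}

impl ToyConnection {
    /// In a real client the TCP, TLS, and AMQP open handshakes would
    /// happen here, once per connection.
    pub fn open() -> Self {
        ToyConnection { next_channel: 0 }
    }

    /// Begin a new session on the shared connection.
    pub fn begin_session(&mut self, incoming_window: u32) -> ToySession {
        let channel = self.next_channel;
        self.next_channel += 1;
        ToySession { channel, incoming_window }
    }
}

impl ToySession {
    /// Consume one unit of this session's window.
    /// Returns false if the window is already exhausted.
    pub fn on_transfer(&mut self) -> bool {
        if self.incoming_window == 0 {
            return false;
        }
        self.incoming_window -= 1;
        true
    }
}
```

Exhausting the window of one session in this model leaves the window of every other session on the same connection untouched.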

A big difference between the rust implementation and the dotnet implementation is that the clients in the dotnet sdk are lazy and are never initialized until the first message is sent/received. However, I haven't really done a benchmark yet, as the focus so far has really been on functionalities and ergonomics.

> Additionally, it looks like EventHubConnection is generic on the connection. However, I would be using AmqpClient. It looks like the AmqpClient being used in this crate is coming from the amqp subdirectory and is not exposed?

I was thinking of replacing that generic with just AmqpClient internally (so it will just be EventHubConnection with no generics) but got distracted. I'll put this into "0.14.1" as well. Hopefully I can get something released tomorrow.

minghuaw commented 10 months ago

The generic parameter C has been removed from EventHubConnection in "0.14.1", which was just released on crates.io. I will start setting up some benchmarks.

minghuaw commented 10 months ago

> I see the EventHubConnection which looks like it results in reusing a single connection. Would I get better performance using a single shared connection across all clients?

I have run an initial benchmark (10 partitions, one consumer per partition) on the cost of establishing new consumer clients with dedicated connections vs. sharing the same connection. When the consumers are created sequentially, creating consumers with dedicated connections takes about 11 to 12 seconds, while creating consumers on a shared connection takes about 4 to 4.5 seconds. However, when the consumers with dedicated connections are created in parallel, it takes less than 3 seconds.
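
Put per consumer, those sequential numbers work out to roughly 1.1 to 1.2 seconds each with dedicated connections and roughly 0.4 to 0.45 seconds each on a shared connection. The sketch below is not the benchmark itself. It simulates a fixed setup cost with `thread::sleep` (an assumption standing in for the real handshakes) to show why parallel creation hides most of that per-consumer cost.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for the handshake cost of opening one consumer (simulated).
pub fn open_consumer(setup: Duration) {
    thread::sleep(setup);
}

/// Open `n` consumers one after another; returns elapsed wall-clock time.
pub fn open_sequential(n: usize, setup: Duration) -> Duration {
    let start = Instant::now();
    for _ in 0..n {
        open_consumer(setup);
    }
    start.elapsed()
}

/// Open `n` consumers at the same time, one thread each.
pub fn open_parallel(n: usize, setup: Duration) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|_| thread::spawn(move || open_consumer(setup)))
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    start.elapsed()
}
```

With 10 simulated consumers, the sequential total grows with `n * setup`, while the parallel total stays close to a single `setup`.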

I don't think there will be much difference in the time taken to receive events with dedicated connections vs. a shared connection. I will add more benchmarks to measure that and will keep you updated.

minghuaw commented 10 months ago

Hi @jackgerrits,

The performance of shared vs. dedicated connections is a bit complicated. Does your use case involve frequently creating and closing event streams?

Because starting a new event stream involves creating an AMQP Session and an AMQP ReceiverLink, the current implementation with a shared connection needs to lock the underlying AmqpClient, which essentially makes the operations sequential. This is definitely my oversight; I should have used a more fine-grained lock, and I will try to improve this in the next few days.
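
As a rough illustration of the locking issue (a toy model, not the crate's code; `SharedClient`, `attach_coarse`, and `attach_fine` are hypothetical names), the difference is whether the mutex guard is held during the slow setup or only while touching shared state:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical shared state: in this toy model only the channel counter
/// needs exclusive access.
pub struct SharedClient {
    next_channel: u16,
}

/// Coarse lock: the guard is held for the whole (slow) setup, so
/// concurrent callers effectively run one at a time.
pub fn attach_coarse(client: &Mutex<SharedClient>, setup: Duration) -> u16 {
    let mut guard = client.lock().unwrap();
    let channel = guard.next_channel;
    guard.next_channel += 1;
    thread::sleep(setup); // slow work done while still holding the lock
    channel
}

/// Finer-grained lock: hold the guard only to allocate the channel,
/// then do the slow work without it.
pub fn attach_fine(client: &Mutex<SharedClient>, setup: Duration) -> u16 {
    let channel = {
        let mut guard = client.lock().unwrap();
        let channel = guard.next_channel;
        guard.next_channel += 1;
        channel
    }; // guard dropped here
    thread::sleep(setup);
    channel
}

/// Attach `n` links concurrently and return the allocated channels, sorted.
pub fn attach_all(n: usize, setup: Duration, fine: bool) -> Vec<u16> {
    let client = Arc::new(Mutex::new(SharedClient { next_channel: 0 }));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let client = Arc::clone(&client);
            thread::spawn(move || {
                if fine {
                    attach_fine(&client, setup)
                } else {
                    attach_coarse(&client, setup)
                }
            })
        })
        .collect();
    let mut channels: Vec<u16> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    channels.sort();
    channels
}
```

Both variants allocate the same set of channels; only the coarse one forces the slow part of each attach to wait for the previous one.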

jackgerrits commented 10 months ago

The event streams should be long-lived. I think my use case is pretty standard in that I wish to open an event stream per partition and consume as fast as possible.

I found using buffered helpful in reducing stalls due to fetching new events.

When looking at the total number of tasks, I noticed that more tasks are spawned for each event stream created. That totally makes sense, but it got me wondering whether reusing the connection would help reduce the number of tasks. I am unsure whether contention is something I need to worry about in terms of tokio processing the futures.

minghuaw commented 10 months ago

Have you tried adjusting the ReadEventOptions? It might be a source of stalls. This is a drawback of the current implementation, which essentially copies how the dotnet SDK is implemented.

There are two fields in ReadEventOptions that may contribute to stalls.

  1. cache_event_count (default is 100) determines the size of an internal buffer, and no event will be yielded unless the buffer is filled.
  2. maximum_wait_time (default is None) determines the time to wait before yielding events from the internal buffer. If this value is None, then it will wait indefinitely.
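
A toy model of that "yield only when full or timed out" behavior is sketched below. The field names mirror the two options, but `CacheModel` and its logic are assumptions for illustration (in particular, starting the wait timer at the first buffered event), not the crate's implementation.

```rust
use std::time::Duration;

/// Toy model of a buffer that yields only when full or timed out.
pub struct CacheModel {
    cache_event_count: usize,
    maximum_wait_time: Option<Duration>,
    buffer: Vec<u32>,
    waiting_since: Option<Duration>,
}

impl CacheModel {
    pub fn new(cache_event_count: usize, maximum_wait_time: Option<Duration>) -> Self {
        CacheModel {
            cache_event_count,
            maximum_wait_time,
            buffer: Vec::new(),
            waiting_since: None,
        }
    }

    /// Record an event that arrived at time `now` (measured from stream
    /// start). Returns the buffered events if they should be yielded.
    pub fn on_event(&mut self, event: u32, now: Duration) -> Option<Vec<u32>> {
        if self.waiting_since.is_none() {
            self.waiting_since = Some(now);
        }
        self.buffer.push(event);
        self.poll(now)
    }

    /// Check whether buffered events should be yielded at time `now`.
    pub fn poll(&mut self, now: Duration) -> Option<Vec<u32>> {
        let start = self.waiting_since?; // nothing buffered yet
        let full = self.buffer.len() >= self.cache_event_count;
        let timed_out = self
            .maximum_wait_time
            .map_or(false, |max| now.saturating_sub(start) >= max);
        if full || timed_out {
            self.waiting_since = None;
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

With `maximum_wait_time` set to `None`, a partially filled buffer in this model never yields, no matter how much time passes, which matches the stall described above.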

This is one of the things that I doubted at the beginning and wanted to change, because, as you mentioned, there already exist mechanisms like StreamExt::buffered that allow users to buffer the events.

I am going to remove this internal buffer and mark cache_event_count as deprecated (and will remove it in "0.15.0").

Regarding contention, I am still doing some benchmarking, as I have also discovered some other problems with earlier implementations. I doubt that having a large number of tasks will have any meaningful impact on performance, as tokio's tasks have a pretty small memory footprint. I think there may be a higher chance of contention if a shared connection is used with a large number of events, since all events from different partitions then have to go through the same connection before being distributed to the consumers. But I am not 100% sure either. I will do some more benchmarking and keep you updated.

jackgerrits commented 10 months ago

That's really helpful to understand, thank you! I didn't realize that it filled the buffer prior to producing events; that is likely what I was seeing.

In terms of using buffered, does the following seem correct?

```rust
let stream = client
    .read_events_from_partition(
        &partition_id,
        EventPosition::earliest(),
        ReadEventOptions {
            maximum_wait_time: None,
            cache_event_count: 1,
            prefetch_count: 1,
            ..Default::default()
        }
    ).await.unwrap();
// Wrap each item in a ready future so that `buffered` has futures to drive.
let stream = stream.map(|x| async { x }).buffered(256);
```

In particular, is it valid to simply do stream.map(|x| async { x })? Since buffered has to operate over a stream of futures not values.

minghuaw commented 10 months ago

That seems right. But this got me thinking whether buffered could provide much gain given the current implementation.

This really reminds me that I may need to change the EventStream impl to try to take advantage of this, which might be breaking and will have to be put into "0.15.0". I think azure_core releases a new version somewhere in the middle of each month, so "0.15.0" wouldn't be too far. If that is indeed the case, I will probably make a preview version "0.15.0-alpha" first.

jackgerrits commented 10 months ago

I have a setup very similar to the above where I am pulling events from event hub in batches. A single event from event hub could be in the order of 100 individual events (individual events is the metric I am using below). I am noticing the following behavior when tweaking these values:

| cache_event_count | buffered length | events/s |
|---|---|---|
| 256 | 4 | 2493 (Hung for 60, and then 120 seconds almost exactly. Rate limited?) |
| 1 | 128 | 3111 |
| 1 | 8 | 3200 |
| 128 | None | 21099 |
| 128 | 8 | 24133 |
| 16 | 16 | 24356 |
| 16 | None | 25618 |
| 32 | 8 | 26773 |
| 8 | 32 | 26945 |

(buffered length == None means I removed buffering entirely)

Also, there was a massive difference in performance between prefetch_count 1 and 0. 1 would be in the order of a few thousand events/s and 0 would be in the tens of thousands. Was 0 just going back to prefetching the default amount?

Am I misusing buffering or doing something else wrong here? It seems like cache_event_count works much better than buffered.

minghuaw commented 10 months ago

~Another note regarding the ReadEventOptions. You would probably want to increase prefetch to some value larger than 1. A prefetch value of 1 essentially makes it working in a pulling mode. It will need to send a frame to the EventHub to allocate a credit of 1 before it can get the event. However, a prefetch value larger than 1 will make it working in a pushing mode, where it allocates credits that is equal to the prefetch all at once, and the Event Hub will have the freedom to push all of these events to the consumer without having to wait for one credit for one event.~

Sorry for the confusion. I meant to say that a prefetch of 0 is pulling mode and a prefetch of 1 isn't much different. Though it is technically pushing mode, it makes little difference in the number of flow control frames needed.
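
A back-of-the-envelope way to see this is to count flow frames under a simplified model, in which the receiver grants a fixed amount of link credit and sends another flow frame only once all of it is used up. Real clients may top up credit earlier, and `flow_frames_needed` is a hypothetical helper, so treat the numbers as a rough estimate only.

```rust
/// Simplified model: each flow frame grants `credit` link credits, and a new
/// flow frame is sent only after all of them have been consumed.
pub fn flow_frames_needed(events: u64, credit: u64) -> u64 {
    assert!(credit > 0, "credit must be at least 1 in this model");
    (events + credit - 1) / credit // ceiling division
}
```

In this model, receiving 300 events with a credit of 1 costs one flow frame per event (300 in total), while a credit of 128 needs only 3.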

minghuaw commented 10 months ago

> Also, there was a massive difference in performance between prefetch_count 1 and 0. 1 would be in the order of a few thousand events/s and 0 would be in the tens of thousands. Was 0 just going back to prefetching the default amount?

This is indeed possibly a bug in the current implementation. If the prefetch is 0, it will use the size of the internal buffer as the credit. But this behavior will likely be different in the next version, since I am planning to remove the internal buffer because it has been really confusing.
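
Written out as code, the fallback described here looks roughly like the following. `effective_credit` is a hypothetical helper that restates the behavior in this comment; it is not a function from the crate.

```rust
/// Sketch of the described fallback (name and signature are hypothetical).
pub fn effective_credit(prefetch_count: u32, cache_event_count: u32) -> u32 {
    if prefetch_count == 0 {
        cache_event_count // falls back to the internal buffer size
    } else {
        prefetch_count
    }
}
```

So a prefetch of 0 combined with a larger cache_event_count ends up granting more credit than a prefetch of 1, which would be consistent with the throughput gap reported above.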

jackgerrits commented 10 months ago

I see, that makes sense.

Combining all things seems to be the key for performance:

| cache_event_count | prefetch_count | buffered length | events/s |
|---|---|---|---|
| 32 | 128 | 4 | 27666 |

Although, it is still in the realm of the others.

I am interested in any changes you make, no worries about breakage. Let me know if I can be of help in testing a version.

Thanks again for your time!

minghuaw commented 9 months ago

I have run a naive benchmark (receiving 300 events per partition) comparing the performance with and without the internal buffer. For the case with the internal buffer, its size is set to 30.

What I found is that the new implementation without any internal buffering performs at least on par with, and sometimes better than, the version with the internal buffer. The new version also deprecates the field max_wait_time from ReadEventOptions.

This makes sense because channels are used to transfer frames from the connection to the session and then to the receiving link. So as long as prefetch is not some really small number, there should be plenty of internal buffering.
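
The effect can be pictured with a small pipeline built from standard-library channels. This is a sketch only: the stage names, the use of threads, and the bounded capacity are assumptions for illustration, not the crate's internals.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Push `frames` through a connection -> session -> link pipeline built from
/// bounded channels, and return what the link stage received.
pub fn run_pipeline(frames: Vec<u32>, capacity: usize) -> Vec<u32> {
    let (conn_tx, session_rx) = sync_channel::<u32>(capacity);
    let (session_tx, link_rx) = sync_channel::<u32>(capacity);

    // "Connection" stage: forwards frames as they come off the wire.
    let connection = thread::spawn(move || {
        for frame in frames {
            if conn_tx.send(frame).is_err() {
                break; // downstream hung up
            }
        }
    });

    // "Session" stage: relays frames to the link.
    let session = thread::spawn(move || {
        for frame in session_rx {
            if session_tx.send(frame).is_err() {
                break;
            }
        }
    });

    // "Link" stage: the consumer drains whatever has been queued so far.
    let received: Vec<u32> = link_rx.iter().collect();
    connection.join().expect("connection stage panicked");
    session.join().expect("session stage panicked");
    received
}
```

Each channel already queues frames ahead of the consumer, so an extra buffer layered on top of the stream mostly duplicates what the pipeline provides.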

StreamExt::buffered also seems to provide very little performance gain and could even worsen the performance for the new version.

The new implementation didn't really bring any breaking changes, so I will release it as "0.14.4" later today, and I will think about how to hide some of the generics (like AmqpConsumer) from public APIs for "0.15.0".

jackgerrits commented 9 months ago

That's great, thanks!

minghuaw commented 9 months ago

@jackgerrits I have just released "0.14.4" on crates.io

minghuaw commented 9 months ago

@jackgerrits It seems that they have bumped azure_core to "0.15.0". I am planning to update azeventhubs to "0.15.0" by the end of the day. It would come with the following breaking changes

  1. The generic parameter of EventStream now only carries the retry policy. Private types like AmqpConsumer and MultipleAmqpConsumer are hidden in an internal enum. I have done some quick testing and didn't find noticeable performance loss.
  2. Removed the deprecated fields cache_event_count and max_wait_time from ReadEventOptions
  3. The MaxRetries type (used in EventHubsRetryOptions) can only be constructed via TryFrom::try_from() or new() which is just an alias for try_from()
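
For readers unfamiliar with the pattern in item 3, a validated newtype with `new()` as an alias for `try_from()` can look like the sketch below. `RetryLimit`, `OutOfRange`, and the upper bound of 100 are hypothetical and chosen only for this example; they are not the crate's MaxRetries type or its actual limits.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for a validated retry count. The upper bound of
/// 100 is an assumption made for this example only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryLimit(u32);

/// Error returned when the requested value is outside the allowed range.
#[derive(Debug, PartialEq, Eq)]
pub struct OutOfRange(pub u32);

impl TryFrom<u32> for RetryLimit {
    type Error = OutOfRange;

    fn try_from(value: u32) -> Result<Self, Self::Error> {
        if value <= 100 {
            Ok(RetryLimit(value))
        } else {
            Err(OutOfRange(value))
        }
    }
}

impl RetryLimit {
    /// Alias for `try_from`, so callers do not need `TryFrom` in scope.
    pub fn new(value: u32) -> Result<Self, OutOfRange> {
        Self::try_from(value)
    }

    pub fn value(self) -> u32 {
        self.0
    }
}
```

Because the inner field is private, the only way to obtain a value is through the fallible constructors, so an out-of-range value can never exist.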

minghuaw commented 9 months ago

"0.15.0" has been released

minghuaw commented 9 months ago

@jackgerrits azeventhubs has now migrated to a separate repo (https://github.com/minghuaw/azeventhubs) as per discussions here (https://github.com/Azure/azure-sdk-for-rust/pull/1260#issuecomment-1739693018).

jackgerrits commented 9 months ago

Sounds good! It is going to make understanding, finding and searching through the crate easier

minghuaw commented 7 months ago

@jackgerrits I was transferring issues from the old repo to the new repo. I apologize if you are getting a bunch of email notifications because of this.

jackgerrits commented 7 months ago

No worries!