dapr / components-contrib


EventHub component delivers out of order messages (pubsub, binding) #3568

Open oising opened 2 weeks ago

oising commented 2 weeks ago

Expected Behavior

When using Event Hubs as a pubsub or binding, messages should be delivered in the order they were posted (assuming PartitionKey is set when publishing/posting, so that related events land on the same partition and are ordered within it).

Actual Behavior

In the pubsub case, the sidecar delivers new events before the subscriber has completed handling the last one. This causes major problems when trying to ensure order-sensitive work is executed correctly (e.g. starting a workflow to process subsequent events).

Steps to Reproduce the Problem

We're publishing to our topic like this (.NET SDK):


// The partitionKey metadata routes all events for a given device to the same
// Event Hubs partition, which is what provides per-device ordering.
await dapr.PublishEventAsync(
    WellKnownPubSubNames.Percept,
    WellKnownPubSubTopics.RecordingSessionFileEventV1,
    data,
    new Dictionary<string, string>
    {
        { "partitionKey", data.DeviceId }
    }
);

and receiving like this:

pubsub.MapPost(
    WellKnownPubSubTopics.RecordingSessionFileEventV1,
    [Topic(WellKnownPubSubNames.Percept, WellKnownPubSubTopics.RecordingSessionFileEventV1)]
    async (
        [FromServices] ILogger<Program> logger,
        [FromServices] IRecordingSessionRepository recordingSessionRepository,
        HttpContext httpContext,
        [FromBody] RecordingSessionFileEventV1 data
    ) => {
        try {
            var messageId = httpContext.Request.Headers["Message-Id"];

            logger.LogInformation("Start message id: {messageId}", messageId!);

            // Simulate the subscriber doing some work before acknowledging.
            await Task.Delay(500);

            logger.LogInformation("Stop message id: {messageId}", messageId!);

            return Results.Ok();
        }
        catch (Exception ex) {
            logger.LogError(ex, "Error handling event");
            return Results.Problem(ex.Message);
        }
    }
);

The problem is clear when watching the logs: instead of a constant start/stop/start/stop alternating sequence of log events, we're seeing start/stop/start/start/stop/stop interleaving. The sidecar should not send another event until the current one has completed processing, i.e. until it receives an HTTP 200 (in this case).

The same issue likely occurs for the binding since the common code is the problem (according to @yaron2):

It appears our implementation is using goroutines for each message, which would cause out-of-order delivery: https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go#L293. We do have a concurrency mechanism for components, it just hasn't been applied to Event Hubs. This should be an easy change - can you please open an issue in components-contrib for this? I'll add it to the v1.15 milestone
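
To illustrate the difference, here is a minimal, self-contained Go sketch (invented names; not the actual eventhubs.go code) of the two dispatch patterns. The concurrent variant reproduces the interleaved start/stop logging described above, while the blocking variant delivers strictly one event at a time:

package main

import (
	"fmt"
	"sync"
	"time"
)

// event is a stand-in for a message received from one Event Hubs partition.
type event struct{ seq int }

// dispatchConcurrently mirrors the pattern described above: each event is
// handed to the subscriber on its own goroutine, so the handler for event
// N+1 can start before the handler for event N has returned.
func dispatchConcurrently(events []event, handler func(event)) {
	var wg sync.WaitGroup
	for _, e := range events {
		wg.Add(1)
		go func(e event) {
			defer wg.Done()
			handler(e)
		}(e)
	}
	wg.Wait()
}

// dispatchInOrder blocks on each handler call, so events from the partition
// are delivered strictly one at a time, in the order they were received.
func dispatchInOrder(events []event, handler func(event)) {
	for _, e := range events {
		handler(e)
	}
}

func main() {
	events := []event{{1}, {2}, {3}}
	handler := func(e event) {
		fmt.Println("start", e.seq)
		time.Sleep(10 * time.Millisecond) // simulate subscriber work
		fmt.Println("stop", e.seq)
	}

	fmt.Println("concurrent dispatch (start/stop lines interleave):")
	dispatchConcurrently(events, handler)

	fmt.Println("ordered dispatch (strict start/stop pairs):")
	dispatchInOrder(events, handler)
}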

Release Note

The Azure Event Hubs pubsub and binding components using ordered delivery (with a partitionKey) would interleave event deliveries to a subscriber. Now the sidecar will wait until the handler returns before sending the next event.

yaron2 commented 2 weeks ago

Triaged

olitomlinson commented 2 weeks ago

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

yaron2 commented 1 week ago

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

Agree

oising commented 1 week ago

Imo this is a P0 as it fundamentally breaks the underlying FIFO ordering that one would expect from EventHubs when processing each message individually via Dapr PubSub

And also the EH binding! They share the same code AFAICT.

oising commented 1 week ago

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go#L293, as that starts a goroutine per partition, which seems perfectly fine: partitions should be handled in parallel. However, looking at https://github.com/dapr/components-contrib/blob/4ca04dbb61c553047727ec709f37a2b4b9832159/common/component/azure/eventhubs/eventhubs.go#L398 I can see that messages within a partition are also dispatched as goroutines, whereas I would expect them to be blocking calls here to ensure correct dispatch ordering, no?

yaron2 commented 1 week ago

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go#L293, as that starts a goroutine per partition, which seems perfectly fine: partitions should be handled in parallel. However, looking at

https://github.com/dapr/components-contrib/blob/4ca04dbb61c553047727ec709f37a2b4b9832159/common/component/azure/eventhubs/eventhubs.go#L398

I can see that messages within a partition are also dispatched as goroutines, whereas I would expect them to be blocking calls here to ensure correct dispatch ordering, no?

That seems correct, yes

oising commented 1 week ago

So removing the go prefix should be enough? I should probably rename handleAsync to something like handleEvents -- it's interesting to me that the asynchrony comes entirely from the caller's go statement and isn't reflected in the method body at all. Quite simple!
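
Concretely, the shape of the change being discussed is roughly the following (a sketch only: the receiver aeh, the context, and the argument list are guesses based on the linked lines; only handleAsync itself is named in this thread):

// Current: the handler runs on its own goroutine, so the next events can be
// dispatched before the subscriber has finished handling these ones.
go aeh.handleAsync(ctx, topic, events, handler)

// Proposed: drop the go keyword so the call blocks, and the next dispatch
// waits until the subscriber's handler has returned.
aeh.handleAsync(ctx, topic, events, handler)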

If this really is a one-line fix, would you expect unit tests? They would be entirely beyond me at this point in my golang career :D

Also, as a P0 bug - would this warrant making it into 1.14.5?

olitomlinson commented 1 week ago

Hey @olitomlinson @yaron2 - After a crash course in golang, I don't think the issue is at https://github.com/dapr/components-contrib/blob/main/common/component/azure/eventhubs/eventhubs.go#L293, as that starts a goroutine per partition, which seems perfectly fine: partitions should be handled in parallel. However, looking at

https://github.com/dapr/components-contrib/blob/4ca04dbb61c553047727ec709f37a2b4b9832159/common/component/azure/eventhubs/eventhubs.go#L398

I can see that messages within a partition are also dispatched as goroutines, whereas I would expect them to be blocking calls here to ensure correct dispatch ordering, no?

This is exactly what I said in Discord :)

oising commented 1 week ago

This is exactly what I said in Discord :)

I obviously misread or missed that -- but it's good that we agree! :) I will submit the two-line PR as a draft and link it, and we can go from there. Given this is a blocker for our solution, I would really like to see this make a point release rather than wait for 1.15...

olitomlinson commented 1 week ago

So removing the go prefix should be enough? I should probably rename handleAsync to something like handleEvents -- it's interesting to me that the asynchrony comes entirely from the caller's go statement and isn't reflected in the method body at all. Quite simple!

If this really is a one-line fix, would you expect unit tests? They would be entirely beyond me at this point in my golang career :D

Also, as a P0 bug - would this warrant making it into 1.14.5?

It could be as simple as a one-liner, but it needs thorough testing to make sure that checkpointing is done correctly after each message completes.

My one reservation about fixing this quickly is that there may be users out in the wild with high-throughput use cases that depend on the throughput currently afforded by this incorrect implementation. Until a fix is in place, it's hard to quantify the performance degradation that checkpointing on each message may cause.

The real solution for high-throughput use cases is Bulk Subscriptions, but that is not Stable yet.

Idea: this could be fixed, but with the fix put behind an opt-in feature flag in the component metadata so it doesn't impact people whose expectations are based on the existing (incorrect) implementation:

name : enableInOrderMessageDelivery
value : "false"

Then, when Bulk Subscriptions graduates to Stable, this feature flag could be removed and replaced with an opt-in flag that reverts to the old behavior. Users with high-throughput expectations would be encouraged to migrate to Bulk Subscriptions (or opt back in to the previous broken implementation, for a window of supported releases):

name : enableLegacyMessageDelivery
value : "true"
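
A rough sketch of how such an opt-in could look inside the component, assuming the flag is read from the component metadata (the key name comes from the proposal above; the function and variable names here are invented):

package main

import (
	"fmt"
	"strconv"
	"time"
)

// Hypothetical metadata key from the proposal above; it does not exist in the
// component today.
const inOrderKey = "enableInOrderMessageDelivery"

// dispatch sketches how the component could branch on the opt-in flag:
// blocking, ordered delivery when the flag is "true", and the current
// concurrent (fire-and-forget) behavior otherwise.
func dispatch(metadata map[string]string, events []string, handler func(string)) {
	inOrder, _ := strconv.ParseBool(metadata[inOrderKey])
	for _, e := range events {
		if inOrder {
			handler(e) // wait for the subscriber before sending the next event
		} else {
			go handler(e) // existing behavior: deliveries may interleave
		}
	}
}

func main() {
	handler := func(e string) { fmt.Println("handled", e) }

	// Opted in: strict ordering within the partition.
	dispatch(map[string]string{inOrderKey: "true"}, []string{"a", "b", "c"}, handler)

	// Default: legacy concurrent behavior (sleep only so the demo's goroutines
	// finish before the program exits).
	dispatch(map[string]string{}, []string{"d", "e", "f"}, handler)
	time.Sleep(100 * time.Millisecond)
}
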
oising commented 1 week ago

Hmm, I'm not going to be competent enough in the language to fix this in the window that my project requires. If you could collaborate with me, then I may learn enough to address my other feature requests for event hubs myself. How busy are you, lol