Azure / azure-sdk-for-net

This repository is for active development of the Azure SDK for .NET. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/dotnet/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-net.

Output binding should break events into batches before flushing. #28657

Closed: brettsam closed this issue 8 months ago

brettsam commented 3 years ago

If the output binding has too many events added to it, publishing can fail with the exception "The maximum size (1536000) has been exceeded" from Event Grid. The output binding should be able to break this up and flush in batches.
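For context, a minimal repro might look like the following sketch. It assumes the in-process Functions model with the Event Grid output binding; the trigger (any trigger will do, a timer is used here), the function name, and the app settings ("TopicEndpoint", "TopicKey") are illustrative and not taken from this issue.

using System.Threading.Tasks;
using Azure.Messaging;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;

public static class FanOutFunction
{
    [FunctionName("FanOut")]
    public static async Task Run(
        [TimerTrigger("0 */5 * * * *")] TimerInfo timer,
        [EventGrid(TopicEndpointUri = "TopicEndpoint", TopicKeySetting = "TopicKey")]
        IAsyncCollector<CloudEvent> events)
    {
        // Every event is buffered by the binding's collector...
        for (var i = 0; i < 10_000; i++)
        {
            await events.AddAsync(new CloudEvent("/my/source", "My.Event.Type", new { Index = i }));
        }

        // ...and the host flushes them as a single publish call at the end of
        // the invocation, which is where the size limit is exceeded today.
    }
}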

azure-sdk commented 2 years ago

Label prediction was below confidence level 0.6 for Model:CategoryLabels: 'Client:0.539298,Mgmt:0.24787387,Service:0.19938183'

jsquire commented 1 year ago

//fyi: @JoshLove-msft - this got mislabeled as Event Hubs.

es-alt commented 1 year ago

We ran into this issue with a function that sends out multiple events. What's the recommended workaround?

andrewjw1995 commented 1 year ago

I need this feature and can implement this.

@JoshLove-msft This will require serializing the message inside the 'AddAsync' method instead of lazily in the 'FlushAsync' method so we can track the total size of the batch when a message is added. This means AddAsync may start throwing exceptions that originally would have only been thrown from FlushAsync. Is that an acceptable change?
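To make that trade-off concrete, here is a rough sketch of what eager serialization in AddAsync could look like. This is only an illustration of the idea, not the actual binding internals; the type name, the size-accounting constants, and the use of JsonSerializer and EventGridPublisherClient here are assumptions.

using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging;
using Azure.Messaging.EventGrid;

internal sealed class BatchingCloudEventCollector
{
    private const int MaximumRequestBytes = 1_048_576; // documented 1 MB per request
    private const int PerEventOverheadBytes = 1;       // comma between array elements
    private const int EnvelopeOverheadBytes = 2;       // the surrounding [ and ]

    private readonly EventGridPublisherClient _client;
    private readonly List<CloudEvent> _pending = new();
    private int _pendingBytes = EnvelopeOverheadBytes;

    public BatchingCloudEventCollector(EventGridPublisherClient client) => _client = client;

    public async Task AddAsync(CloudEvent cloudEvent, CancellationToken cancellationToken = default)
    {
        // Serialize eagerly so the running batch size is known at add time.
        // Any serialization failure now surfaces here instead of in FlushAsync.
        int size = JsonSerializer.SerializeToUtf8Bytes(cloudEvent).Length;

        if (_pending.Count > 0 && _pendingBytes + PerEventOverheadBytes + size > MaximumRequestBytes)
        {
            await FlushAsync(cancellationToken).ConfigureAwait(false);
        }

        _pending.Add(cloudEvent);
        _pendingBytes += size + (_pending.Count > 1 ? PerEventOverheadBytes : 0);
    }

    public async Task FlushAsync(CancellationToken cancellationToken = default)
    {
        if (_pending.Count == 0)
        {
            return;
        }

        await _client.SendEventsAsync(_pending, cancellationToken).ConfigureAwait(false);
        _pending.Clear();
        _pendingBytes = EnvelopeOverheadBytes;
    }
}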

andrewjw1995 commented 1 year ago

@es-alt I've wrapped my collector in a custom class which tracks the batch size and flushes automatically before the batch exceeds the maximum size. This class is tightly coupled to knowledge of how the events are serialized, e.g. adding two bytes for the array brackets and a byte for every comma. I checked the SDK source code to confirm events are not serialized with whitespace when they are published, but if that changes in the future then this code would break.

// Usings needed for this snippet: System.Text.Json for JsonSerializer,
// Microsoft.Azure.WebJobs for IAsyncCollector<T>, Azure.Messaging for CloudEvent.
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging;
using Microsoft.Azure.WebJobs;

/// <summary>
/// A wrapper around the default EventGrid collector which properly handles
/// flushing automatically when the accumulated batch of events reaches the
/// maximum batch size.
/// </summary>
internal class CloudEventCollector : IAsyncCollector<CloudEvent>
{
    private const int MinimumBatchSize = 2;
    private const int MaximumBatchSize = 1500 * 1024;

    private readonly IAsyncCollector<CloudEvent> _collector;

    // Include 2 bytes for the opening and closing square brackets []
    private int _batchSize = MinimumBatchSize;

    /// <summary>
    /// Creates a new instance which wraps the given
    /// <see cref="IAsyncCollector{T}"/> and flushes automatically when the
    /// accumulated batch of events reaches the maximum batch size.
    /// </summary>
    /// <param name="collector">The collector to wrap.</param>
    public CloudEventCollector(IAsyncCollector<CloudEvent> collector) => _collector = collector;

    /// <inheritdoc />
    public async Task AddAsync(CloudEvent cloudEvent, CancellationToken cancellationToken)
    {
        var serialized = JsonSerializer.Serialize(cloudEvent);
        if (_batchSize + serialized.Length > MaximumBatchSize)
        {
            await _collector.FlushAsync(cancellationToken).ConfigureAwait(false);
            _batchSize = MinimumBatchSize;
        }

        await _collector.AddAsync(cloudEvent, cancellationToken).ConfigureAwait(false);
        var commaNeeded = _batchSize > MinimumBatchSize;
        if (commaNeeded)
        {
            _batchSize++;
        }

        _batchSize += serialized.Length;
    }

    /// <inheritdoc />
    public async Task FlushAsync(CancellationToken cancellationToken)
        => await _collector.FlushAsync(cancellationToken).ConfigureAwait(false);
}
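For anyone (e.g. @es-alt) looking for a workaround in the meantime, usage inside the function body could look like this. Here events is assumed to be the IAsyncCollector<CloudEvent> parameter supplied by the Event Grid output binding, and cloudEvents/cancellationToken come from the surrounding function:

// Wrap the bound collector and route all adds through the wrapper so it can
// flush before the size limit is reached.
var collector = new CloudEventCollector(events);

foreach (var cloudEvent in cloudEvents)
{
    await collector.AddAsync(cloudEvent, cancellationToken);
}

// Publish whatever remains in the final, partially filled batch.
await collector.FlushAsync(cancellationToken);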

@JoshLove-msft while implementing this, I realised there are two different approaches I could take:

I think there is an implicit assumption that IAsyncCollector has the first behaviour, and that's how some of the other bindings (ServiceBus, EventHub) have been implemented, so I think that should be the preferred approach. However, I can imagine a scenario where a user has coupled to the current behaviour and expects that their messages will only be published when they call FlushAsync. I would never recommend doing this, but they may have code which treats the collector as a buffer and chooses not to call FlushAsync in certain circumstances. I don't think it's worth trying to preserve that behaviour but we might need to notify them of potentially breaking changes.
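For concreteness, the kind of user code that would be affected looks something like the following. This is entirely hypothetical; ValidateDownstreamStateAsync is a made-up helper standing in for whatever check a user might run before deciding to publish:

// Hypothetical code that relies on "nothing is sent until FlushAsync".
foreach (var cloudEvent in cloudEvents)
{
    await events.AddAsync(cloudEvent, cancellationToken);
}

if (!await ValidateDownstreamStateAsync(cancellationToken))
{
    // Today this effectively discards the buffered events; with auto-flush
    // in AddAsync, earlier batches could already be on the wire.
    return;
}

await events.FlushAsync(cancellationToken);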

andrewjw1995 commented 1 year ago

There is a discrepancy between Microsoft's documentation and the response message we are getting from Azure:

The maximum size (1536000) has been exceeded. Status: 413 (Request Entity Too Large) ErrorCode: RequestEntityTooLarge

This indicates a maximum size of 1.5 MB, in contrast to the documentation:

When posting events to an Event Grid topic, the array can have a total size of up to 1 MB. Each event in the array is limited to 1 MB. If an event or the array is greater than the size limits, you receive the response 413 Payload Too Large.

I've confirmed that even though the response says the maximum size is 1.5 MB, it doesn't accept messages that large. Reducing the threshold to 1 MB fixed my issue. This is either a bug in the Event Grid implementation, or the message is referring to the size of the whole HTTP request rather than just the event payload.
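For the wrapper above, that means lowering the constant to the documented limit, e.g.:

// Use the documented 1 MB request limit rather than the 1536000 value quoted
// in the error message.
private const int MaximumBatchSize = 1024 * 1024;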

github-actions[bot] commented 8 months ago

Hi @brettsam, we deeply appreciate your input into this project. Regrettably, this issue has remained inactive for over 2 years, leading us to the decision to close it. We've implemented this policy to maintain the relevance of our issue queue and facilitate easier navigation for new contributors. If you still believe this topic requires attention, please feel free to create a new issue, referencing this one. Thank you for your understanding and ongoing support.