Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[FEATURE REQ] Enable automatic batching for EventHubProducerAsyncClient.send(Flux<EventData>) #33813

Closed Perl99 closed 2 weeks ago

Perl99 commented 1 year ago

Is your feature request related to a problem? Please describe. EventHubProducerAsyncClient allows sending multiple events in one batch; however, it is the developer's responsibility not to exceed the maximum batch size. Using createBatch() in a fully asynchronous way leads to complex code.

Describe the solution you'd like It seems that the code is already there to support this request in https://github.com/Azure/azure-sdk-for-java/blame/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java#L577 but the maxNumberOfBatches is hardcoded to 1. It could be exposed as a parameter in SendOptions, a method parameter, or be null (no limit).

Describe alternatives you've considered I tried to use EventHubBufferedProducerAsyncClient, but it behaves somewhat differently. For example, there is no way to send data immediately: flush() only waits for the queue to be empty, and the send happens after maxWaitTime. Please consider a method like Mono<Void> flushNow() that triggers a send with the current queue content without waiting any longer.
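To make the requested semantics concrete, here is a minimal sketch of what "flush now" could mean for a buffer. It is only an illustration: FlushNowBuffer and its methods are hypothetical names, it uses no SDK types, and it says nothing about how EventHubBufferedProducerAsyncClient is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only; not part of the Azure SDK.
final class FlushNowBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();

    // Adds an event to the in-memory queue.
    synchronized void enqueue(T event) {
        queue.add(event);
    }

    // Number of events currently waiting in the queue.
    synchronized int size() {
        return queue.size();
    }

    // Drains everything queued at this moment and returns it so the caller
    // can send it right away, instead of waiting for a timer to fire.
    synchronized List<T> flushNow() {
        List<T> drained = new ArrayList<>(queue);
        queue.clear();
        return drained;
    }
}
```

The point of the sketch is that flushNow() hands over the current queue content at call time, whereas a flush() that only waits for the queue to become empty depends on some other trigger to do the sending.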


joshfree commented 1 year ago

@conniey could you please follow up with @Perl99

conniey commented 1 year ago

Hey,

Thanks for reporting this.

Perl99 commented 1 year ago

Regarding flush(): If I understand the code correctly, all it does is acquire a semaphore to prevent enqueuing messages and then it waits for the queue to become empty. There is a Mono here: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java#L315 that completes when the queue is empty: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java#L366

The wait is a Flux.interval: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java#L118 and I don't see any connection between this Flux and the flush() method in EventHubBufferedPartitionProducer

Regarding the EventHubProducerClient, a maxBatches parameter in the send() method could limit this infinite Flux to a given number of batches. If some EventData is too large, the behavior could be the same as for EventHubBufferedProducerAsyncClient. As I understand it, the messages that were enqueued successfully are still sent, and enqueueEvents() returns an error (Mono.error()) even in the middle of the Iterable<EventData>: some events are enqueued properly and ready to send, while the rest are aborted.
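A rough sketch of the behavior described in this comment, written without any SDK or Reactor types so that it stays self-contained. Everything here is an assumption made for illustration: BoundedBatcher, pack() and Result are hypothetical names, plain byte[] payloads stand in for EventData, and the payload length stands in for the size check that tryAdd() performs (a real batch also accounts for message overhead).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Azure SDK.
final class BoundedBatcher {
    // Outcome of packing: the batches ready to send, how many events they
    // contain, and whether packing stopped at an event that was too large.
    static final class Result {
        final List<List<byte[]>> batches;
        final int consumed;
        final boolean oversized;

        Result(List<List<byte[]>> batches, int consumed, boolean oversized) {
            this.batches = batches;
            this.consumed = consumed;
            this.oversized = oversized;
        }
    }

    // Packs events greedily into at most maxBatches batches of at most
    // maxBatchBytes each. Events packed before an oversized event are kept.
    static Result pack(List<byte[]> events, int maxBatchBytes, int maxBatches) {
        if (maxBatches < 1) {
            throw new IllegalArgumentException("maxBatches must be at least 1");
        }
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        int consumed = 0;
        for (byte[] event : events) {
            if (event.length > maxBatchBytes) {
                // The event cannot fit even into an empty batch: stop here,
                // but keep everything that was packed so far.
                if (!current.isEmpty()) {
                    batches.add(current);
                }
                return new Result(batches, consumed, true);
            }
            if (currentBytes + event.length > maxBatchBytes) {
                // The current batch is full: close it and check the limit.
                batches.add(current);
                if (batches.size() == maxBatches) {
                    return new Result(batches, consumed, false);
                }
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(event);
            currentBytes += event.length;
            consumed++;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return new Result(batches, consumed, false);
    }
}
```

The caller can use consumed to tell which events were left out, either because the batch limit was reached or because packing stopped at an oversized event.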

anuchandy commented 2 weeks ago

As Connie noted, there are open questions about generalizing the behavior of such an API, and there is currently no plan to expose such utility APIs on the client. However, the following code demonstrates sending an Iterator of events; it logs and continues if an event is too large for a batch. This can be modified to exit with an error if needed.

package org.example;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import java.util.Iterator;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

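/**
 * Sends the events of an iterator as a sequence of size-limited batches.
 * An event that is too large to fit into an empty batch is logged and skipped.
 */
public final class EventHubBatchSender {
    private static final ClientLogger logger = new ClientLogger(EventHubBatchSender.class);
    // Sentinel marking the end of the iterator; it is compared by reference and never sent.
    private static final EventData END = new EventData(new byte[] {});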

    public static Mono<Void> sendEvents(Iterator<EventData> eventsIter, EventHubProducerAsyncClient client) {
        if (!eventsIter.hasNext()) {
            return Mono.empty();
        }
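        final CreateBatchOptions options = new CreateBatchOptions();
        // expand() keeps calling createBatch() with the event that did not fit into the
        // previous batch, until createBatch() returns an empty Mono for the END sentinel.
        final Flux<EventDataBatch> batches = createBatch(eventsIter.next(), eventsIter, client, options)
                .expand(b -> createBatch(b.getMissedEvent(), eventsIter, client, options))
                .map(EventDataBatchWrapper::getBatch)
                // A batch is empty when its first event was too large and was skipped.
                .filter(b -> b.getCount() > 0);

        // flatMap may have several sends in flight at once; use concatMap instead
        // to send the batches strictly one after another.
        return batches.flatMap(client::send).then();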
    }

    private static Mono<EventDataBatchWrapper> createBatch(EventData firstEvent, Iterator<EventData> eventsIter,
        EventHubProducerAsyncClient client, CreateBatchOptions options) {
        if (firstEvent == END) {
            return Mono.empty();
        }
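        return client.createBatch(options).flatMap(batch -> {
            EventData nextEvent = firstEvent;
            // Keep adding events until one does not fit or the iterator is exhausted.
            do {
                if (!batch.tryAdd(nextEvent)) {
                    if (nextEvent == firstEvent) {
                        // The event does not fit even into an empty batch: log it, skip it,
                        // and let the next batch start with the following event (if any).
                        logger.error("Message {} is too big for new batch", firstEvent);
                        final EventData missedEvent;
                        if (eventsIter.hasNext()) {
                            missedEvent = eventsIter.next();
                        } else {
                            missedEvent = END;
                        }
                        return Mono.just(new EventDataBatchWrapper(batch, missedEvent));
                    } else {
                        // The batch is full: carry the event over as the first event of the next batch.
                        return Mono.just(new EventDataBatchWrapper(batch, nextEvent));
                    }
                }

                if (!eventsIter.hasNext()) {
                    // No events left: END tells the caller to stop creating batches.
                    return Mono.just(new EventDataBatchWrapper(batch, END));
                }
                nextEvent = eventsIter.next();
            } while (true);
        });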
    }

    private static final class EventDataBatchWrapper {
        private final EventDataBatch batch;
        private final EventData missedEvent;

        EventDataBatchWrapper(EventDataBatch batch, EventData missedEvent) {
            this.batch = batch;
            this.missedEvent = missedEvent;
        }

        EventDataBatch getBatch() {
            return batch;
        }

        EventData getMissedEvent() {
            return missedEvent;
        }
    }
}

Closing this as not planned.