elastic / elasticsearch-java

Official Elasticsearch Java Client
Apache License 2.0
6 stars 242 forks source link

BulkIngester close() does not wait for listener to finish #559

Open frank-montyne opened 1 year ago

frank-montyne commented 1 year ago

Java API client version

7.17.9 and 7.17.10-SNAPSHOT

Java version

java 19

Elasticsearch Version

7.17.9

Problem description

When using the BulkIngester with a BulkListener then calling the close() on the BulkIngester returns before all afterBulk() BulkListener callbacks are finished. Below is a snippet of code that uses the BulkIngester. If you simply add a few logging statements after the close() and in the afterBulk() you will see that the afterBulk() callbacks are still busy after the close() returns. That should not be the case. On return of the close() call the bulk should have been handled completely.

Thanks for looking into the problem.

``

private void handleBulk(Collection<E> events) {
    long start = DateUtils.nowMillis();

    // Create bulk listener.
    BulkListener<String> bulkListener = new BulkListener<String>() {
        @Override
        public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
            for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {

                if (bulkResponseItem.error() != null) {
                    logger.error("Event with id '%s' has error %s".formatted(bulkResponseItem.id(), bulkResponseItem.error()));
                }
            }
            // Check for errors.
            if (bulkResponse.errors()) {
                logger.error("Bulk processing of %d events has some failures".formatted(events.size()));
            }
            else {
                logger.info("Bulk processed %d events in %d ms".formatted(events.size(), DateUtils.diffMillis(start)));
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
            // Since all event processing failed we can skip adding the specific event indexes to the set of indexes to refresh after the bulk request is completed if necessary.

            logger.error("Bulk processing %d events failed completely. %s".formatted(events.size(), ExceptionUtils.getMessage(failure)));
        }
    };

    // Create bulk ingester.
    BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder ->
        ingesterBuilder
            .client(elasticSearch.getESClient())
            // Accumulate one next batch while processing the current batch. Do not set to 0, the bulkIngester.flush() will hang, filed bug, not fixed yet in 7.17.10-SNAPSHOT!
            .maxConcurrentRequests(1)
            // Process current batch after each 10.000 operations added.
            .maxOperations(10000)
            // Or process current batch after 5 MB of data was added.
            .maxSize(5 * 1024 * 1024)
            // Or process current batch when 1 seconds have elapsed.
            .flushInterval(1, TimeUnit.SECONDS)
            .globalSettings(gsBuilder ->
                gsBuilder
                    .waitForActiveShards(asBuilder -> asBuilder.count(1))
                    .refresh(Refresh.False)
            .listener(bulkListener)
        );

    try {
        // Add events to bulk ingester.
        for (E event : events) {
            switch (event.action()) {
                case create:
                    bulkIngester.add(new CreateOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
                            .build()
                            ._toBulkOperation());
                    break;

                case update:
              // Full update of document.
                    bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
                            .build()
                            ._toBulkOperation());
                    break;

                case purge:
                    // Real physical delete of document.
                    bulkIngester.add(new DeleteOperation.Builder()
                            .index(event.esIndex())
                            .id(event.id())
                            .build()
                            ._toBulkOperation());
                    break;

                default:
                    // Should not get here. Log anyway.
                    logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
                    break;
            }
        }
     // ElasticSearch bug: the call to close does not wait until the listener has handled the afterBulk() callback.
     //
        bulkIngester.close();
    }
    catch (Exception e) {
        logger.error("Failed to process %d events. %s".formatted(events.size(), e.getMessage()));
    }
}
frank-montyne commented 1 year ago

@swallez any ideas on this one?

itajun commented 7 months ago

+1 flush() should wait for the listeners as well, as it was the case with the previous implementation of the BulkProcessor.

I have a scenario where I need to use the BulkIngester to handle retries, metrics, etc., but I want to have control over when the request is sent so that I can rollback the distributed operation in case of failure. My idea was to just set the maximum size/number of requests to Long.MAX_VALUE and call flush() when certain conditions were met relevant to my use case.

The problem is that there is no way to know if the operation succeeded or not, since the listener's afterBulk calls happen after things are considered "complete", differently to the way the BulkProcessor worked in previous versions.

...
        if (exec != null) {
            // A request was actually sent
            exec.futureResponse.handle((resp, thr) -> {

                sendRequestCondition.signalIfReadyAfter(() -> {
                    requestsInFlightCount--;
                    closeCondition.signalAllIfReady();
                });

// Problem between the above ^ and below \/

                if (resp != null) {
                    // Success
                    if (listener != null) {
                        listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
                    }
...

I have no way to synchronise the call on my scenario, where I want to throw an exception synchronously when flush() has any errors in the response. If I try to check the number of requestsInFlight/Pending, there may be a race condition where it gets to zero before the callbacks are called.

If we could return the exec.id on flush(), we could wait until it was sent to afterBulk and sync things up, but it would still be an ugly workaround that wasn't required in the BulkProcessor.

fabriziofortino commented 2 weeks ago

This might be fixed with #867