elastic / go-elasticsearch

The official Go client for Elasticsearch
https://github.com/elastic/go-elasticsearch#go-elasticsearch
Apache License 2.0
5.66k stars 615 forks source link

How to get successful or failed item index when calling bulk api? #215

Open woodsmur opened 3 years ago

woodsmur commented 3 years ago

I have to send an array of messages coming from pulsar topic to send to es using esutil.NewBulkIndexer. I need to call msg.ack when messages are successful to send, and msg.nack(negative ack) when messages are failed to send to es.

From indexer.go, I could only get the number from bi.Stats(). How can I determine which messages are successful or failed to send when using bulk indexer, so that I can do some extra process, like ack the pulsar message.

In elasticsearch java client, I can iterator bulkResponse's items and check if request is failed through IsFailed. Can I do it the same way in go client?

// send bulk request to elasticsearch
BulkRequest request = new BulkRequest();
try {
    for (IndexRequest req : indexRequests) {
        request.add(req);
    }
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    if (bulkResponse.hasFailures()) {
        System.out.println("Bulk write error : " + bulkResponse.buildFailureMessage());
    }
    BulkItemResponse[] responseItems = bulkResponse.getItems();
    for (int i = 0; i < responseItems.length; i++) {
        if (responseItems[i].isFailed()) {
            batchRecords.get(i).fail();
        } else {
            batchRecords.get(i).ack();
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

Thanks

karmi commented 3 years ago

It seems like in your case, you want to know which items were sucesfully executed (indexed, updated, deleted, ...), and which were not. For this, you should be able to use the OnSuccess callback, which you register for each item. In the example you've linked, a counter is increased for demonstration purposes, but in your case, you can call the "ack" or "nack" methods for your message?