elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

Bulk request retry silently drops pipeline when retrying individual response that failed #78792

Open benwtrent opened 3 years ago

benwtrent commented 3 years ago

Elasticsearch version (bin/elasticsearch --version): 8.0.0 (probably earlier versions as well)

Description of the problem including expected versus actual behavior: When an individual bulk action fails with a 429, the retry should still use the original indexing pipeline.

Instead, the pipeline (at least during reindex) is dropped from the re-created bulk request.

Steps to reproduce: I have a local WIP processor that manually throws a 429 to recreate the failure (a sketch of this kind of processor is below).
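
For reference, a minimal sketch of that kind of processor, assuming the standard AbstractProcessor base class; the class and type names are illustrative, not the actual WIP processor:

    // Illustrative ingest processor that rejects every document with a 429,
    // so each bulk item fails individually and becomes eligible for retry.
    import org.elasticsearch.ElasticsearchStatusException;
    import org.elasticsearch.ingest.AbstractProcessor;
    import org.elasticsearch.ingest.IngestDocument;
    import org.elasticsearch.rest.RestStatus;

    public class AlwaysTooManyRequestsProcessor extends AbstractProcessor {

        public static final String TYPE = "always_429"; // hypothetical type name

        protected AlwaysTooManyRequestsProcessor(String tag, String description) {
            super(tag, description);
        }

        @Override
        public IngestDocument execute(IngestDocument document) {
            // Simulate back pressure: the item-level 429 is what triggers the retry path.
            throw new ElasticsearchStatusException("simulated rejection", RestStatus.TOO_MANY_REQUESTS);
        }

        @Override
        public String getType() {
            return TYPE;
        }
    }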

Provide logs (if relevant):

I added a bunch of logging lines (and included the pipeline attached to the index request in the output), and here is what I got:

//Original Request
[2021-10-06T16:54:46,677][INFO ][o.e.a.b.R.RetryHandler   ] [runTask-0] BULK IT: BulkRequest{requests=[index {[dumbo][V_JiV3wBwot2XBnG8O0d][classification], source[{
  "text_field": "Ben is a very joyful person"
}]}], indices=[dumbo], timeout=1m, waitForActiveShards=DEFAULT, refreshPolicy=NONE, globalPipeline='null', globalRouting='null', globalIndex='null', globalRequireAlias=null, sizeInBytes=99}

//Recreating via an individual request failing
[2021-10-06T16:54:46,717][INFO ][o.e.a.b.R.RetryHandler   ] [runTask-0] creating new bulk request from: BulkRequest{requests=[index {[dumbo][V_JiV3wBwot2XBnG8O0d][_none], source[{
  "text_field": "Ben is a very joyful person"
}]}], indices=[dumbo], timeout=1m, waitForActiveShards=DEFAULT, refreshPolicy=NONE, globalPipeline='null', globalRouting='null', globalIndex='null', globalRequireAlias=null, sizeInBytes=99}
[2021-10-06T16:54:46,717][INFO ][o.e.a.b.R.RetryHandler   ] [runTask-0] Adding request: BulkRequest{requests=[index {[dumbo][V_JiV3wBwot2XBnG8O0d][_none], source[{
  "text_field": "Ben is a very joyful person"
}]}], indices=[dumbo], timeout=1m, waitForActiveShards=DEFAULT, refreshPolicy=NONE, globalPipeline='null', globalRouting='null', globalIndex='null', globalRequireAlias=null, sizeInBytes=99}

//Sending the request again
[2021-10-06T16:54:47,219][INFO ][o.e.a.b.R.RetryHandler   ] [runTask-0] BULK IT: BulkRequest{requests=[index {[dumbo][V_JiV3wBwot2XBnG8O0d][_none], source[{
  "text_field": "Ben is a very joyful person"
}]}], indices=[dumbo], timeout=1m, waitForActiveShards=DEFAULT, refreshPolicy=NONE, globalPipeline='null', globalRouting='null', globalIndex='null', globalRequireAlias=null, sizeInBytes=99}
[2021-10-06T16:54:47,224][INFO ][o.e.c.m.MetadataCreateIndexService] [runTask-0] [dumbo] creating index, cause [auto(bulk api)], templates [], shards [1]/[1]

You can see from the first line, [V_JiV3wBwot2XBnG8O0d][classification], that the classification pipeline was used.

Then in the second line (in this method I log the currentBulkRequest), the pipeline is gone from the request: [V_JiV3wBwot2XBnG8O0d][_none]

https://github.com/elastic/elasticsearch/blob/68817d7ca29c264b3ea3f766737d81e2ebb4028c/server/src/main/java/org/elasticsearch/action/bulk/Retry.java#L130
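
For context, the linked handler rebuilds the follow-up bulk request by re-adding the failed items from the original request by reference; paraphrased (not the verbatim source), it does roughly this:

    // Paraphrase of the retry handler's rebuild step (not the verbatim source):
    // the reissued BulkRequest re-adds the *same* request objects, so any state
    // mutated on them after ingest (such as the pipeline) carries into the retry.
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;

    final class RetryRebuildSketch {
        static BulkRequest createBulkRequestForRetry(BulkRequest currentBulkRequest, BulkResponse response) {
            BulkRequest requestToReissue = new BulkRequest();
            int index = 0;
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    // Same object reference as in currentBulkRequest, not a copy.
                    requestToReissue.add(currentBulkRequest.requests().get(index));
                }
                index++;
            }
            return requestToReissue;
        }
    }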

elasticmachine commented 3 years ago

Pinging @elastic/es-data-management (Team:Data Management)

benwtrent commented 3 years ago

This looks REALLY suspicious: https://github.com/elastic/elasticsearch/blob/e801035acb293c7d2c6f2a8d38b5396dce3312bc/server/src/main/java/org/elasticsearch/ingest/IngestService.java#L507

If the index request is the same object as the one in the danged bulk request, no wonder the pipeline goes away: we set it to NOOP.
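
To make the suspected aliasing concrete, a small self-contained illustration (assuming IngestService.NOOP_PIPELINE_NAME is the "_none" marker seen in the logs, and that the retry re-adds the same IndexRequest object):

    // Illustration of the suspected root cause: once ingest has run, the pipeline
    // on the shared IndexRequest is reset to the no-op marker, so a retry built
    // from the same object no longer carries the original pipeline.
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.ingest.IngestService;

    public final class PipelineAliasingDemo {
        public static void main(String[] args) {
            IndexRequest doc = new IndexRequest("dumbo")
                .id("V_JiV3wBwot2XBnG8O0d")
                .setPipeline("classification");

            BulkRequest original = new BulkRequest().add(doc);

            // What happens after the pipeline has executed, so the document is not
            // re-ingested further down the write path:
            doc.setPipeline(IngestService.NOOP_PIPELINE_NAME); // "_none"

            // A retry that re-adds the same object now sees "_none", not "classification".
            BulkRequest retried = new BulkRequest().add(original.requests().get(0));
            System.out.println(((IndexRequest) retried.requests().get(0)).getPipeline());
        }
    }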

jakelandis commented 3 years ago

@benwtrent - can you retry your test with a default pipeline and/or a final pipeline? I am curious if that could be a workaround.

benwtrent commented 3 years ago

I definitely can tomorrow. Silently ignoring something is pretty bad.

jakelandis commented 3 years ago

Silently ignoring something is pretty bad

Agreed. However, I think the only way an individual bulk item can return a 429 is if the ingest processor does this, and I think enrich is the only processor that can do this, when the ingest rate dramatically outpaces the ability to execute the underlying search for enrichment. In this case it would have to happen via the reindex workflow (I cannot think of other workflows that would re-use this via an internal client). Normally a 429 would apply to the entire bulk request, and I am unsure whether this bug presents in that case (if so, that is worse).

benwtrent commented 3 years ago

@jakelandis

Both final_pipeline and default_pipeline are also silently ignored. I tried the same scenario (a processor always throwing a 429) with both, and the document only hits the pipeline once and is then indexed, bypassing the pipelines.
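
For reference, the index-level pipeline settings used for this kind of test look roughly like the sketch below (names are illustrative, not the exact setup above); default_pipeline applies when a request names no pipeline, and final_pipeline always runs last:

    // Sketch of creating a test index with default and final pipelines set.
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    import org.elasticsearch.common.settings.Settings;

    final class WorkaroundIndexSetup {
        static CreateIndexRequest dumboIndex() {
            return new CreateIndexRequest("dumbo").settings(
                Settings.builder()
                    .put("index.default_pipeline", "classification")
                    .put("index.final_pipeline", "classification")
            );
        }
    }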

DaveCTurner commented 3 years ago

I think the only way an individual bulk item can return a 429 is if the ingest processor does this

I would also expect us to return 429 on individual items if ~the coordinating node or~ the primary had to push back because its write threadpool queue was full or it hit indexing_pressure.memory.limit.

(edit: actually not sure if the coordinating node can reject individual items, but definitely the primary can)

benwtrent commented 3 years ago

@jakelandis enrich isn't (or at least soon won't be) the only processor that throws a 429 on individual requests: https://github.com/elastic/elasticsearch/pull/78757

jakelandis commented 3 years ago

Thanks for the additional context and testing. This really helps to evaluate the impact of the bug.

joshdevins commented 3 years ago

@jakelandis Any idea if this bug can be prioritised for 8.0? This becomes a pretty big failure for reindexing with inference ingest processors that can sometimes time out due to resource (CPU) constraints. It's a scenario we'd like to have a good story around for GA with the new NLP/PyTorch model inference feature.