elastic / elasticsearch-py

Official Python client for Elasticsearch
https://ela.st/es-python
Apache License 2.0

Documents failing to index with helpers.bulk in subsequent chunks after an issue is found #2673

Closed tommycahir closed 4 weeks ago

tommycahir commented 4 weeks ago

Hey Everyone

During some development work over the last few weeks, we noticed an issue with the Elasticsearch `helpers.bulk` method we were using in our Python scripts.

Issue: When any docs fail to upload to Elasticsearch for whatever reason (invalid index name, invalid field type, etc.), not all docs get uploaded, including docs that have no issues at all.

What is happening: The `helpers.bulk` function uploads docs to Elasticsearch in chunks of 500 by default. When it reaches a chunk that has any upload errors, that chunk is handled properly (only the docs with issues are skipped), but none of the remaining chunks in the batch are uploaded at all.

Scenario: We have a batch of 1,000 docs in the correct format (a list of JSON dicts). We call `helpers.bulk`, which uploads the docs to Elasticsearch in chunks of 500. Across the whole batch there are only 2 docs with invalid field types, both of which occur in the first chunk. Ideally, `helpers.bulk` should upload 998 docs, but because the failures occur in the first chunk, it only uploads 498 docs and the second chunk is skipped entirely. This leaves the index missing 500 perfectly valid docs.

From what we can see in the docs, it should continue and process all subsequent chunks.
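
As far as we can tell, the abort comes from the default `raise_on_error=True`: `helpers.bulk` raises `BulkIndexError` as soon as a chunk contains a failure, so the later chunks are never sent. A minimal sketch of what we mean (index name and docs are made up for illustration; it assumes dynamic mapping types `value` as numeric from the first doc, so the string doc in the first chunk fails):

```python
from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import BulkIndexError

elastic = Elasticsearch(["http://localhost:9200"])

# 1,000 numeric docs, except one string where the mapping will expect
# a number, so the first chunk of 500 contains a failure.
actions = [
    {"_index": "bulk-demo", "_id": i,
     "_source": {"value": "oops" if i == 10 else i}}
    for i in range(1_000)
]

try:
    helpers.bulk(elastic, actions, chunk_size=500)
except BulkIndexError as e:
    # e.errors (8.x client) lists per-document failures for the chunk
    # that raised; the second chunk of 500 was never sent at all.
    print(f"{len(e.errors)} document(s) failed in the first chunk")
```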

I have tested this with elasticsearch-py client v7.17.0, 7.17.3, 7.17.12 and 8.15.1, against ELK stack v7.17.6, 8.5.1 and 8.15.1. They all exhibit the same issue when using the defaults:

```python
import logging

from elasticsearch import Elasticsearch, helpers

logger = logging.getLogger(__name__)


def upload_data_to_elastic(df):
    # df is a pandas DataFrame; swap NaN for None so values serialize
    # as JSON null rather than NaN
    df.fillna("NoneNone", inplace=True)
    df.replace("NoneNone", None, inplace=True)

    elastic = Elasticsearch(
        ["http://localhost:9200"],
        basic_auth=("USERNAME", "PASSWORD")
    )

    # One dict per row
    batch = df.to_dict('records')

    if isinstance(batch, dict):
        batch = [batch]

    actions = [
        {
            "_index": f"shakespeare-{str(doc['type'])}",
            "_id": doc['ID'],
            "_source": {**doc}
        }
        for doc in batch
    ]

    try:
        response = helpers.bulk(elastic, actions)
    except Exception as e:
        logger.error("Error uploading data to Elasticsearch: %s", str(e))
        return False

    # response is (number_indexed, list_of_errors)
    if response[1]:
        logger.error("Errors occurred during bulk indexing: %s", response[1])
        return False

    logger.info(
        "Inserted %s document(s), %s document(s) failed to insert",
        response[0],
        len(response[1]),
    )
    return True
```

If we change the bulk call to `response = helpers.bulk(elastic, actions, stats_only=True, raise_on_error=False)`, it now succeeds, BUT we lose the reason why a document was not indexed from the response. Having the reason a document failed to index is important, as it lets us go back and correct the source-system data producers.
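
To illustrate what gets lost, a small sketch (reusing `elastic` and `actions` from the snippet above):

```python
# With stats_only=True the helper returns plain counts, so the reason a
# document failed is discarded before we ever see it.
success, failed = helpers.bulk(
    elastic, actions, stats_only=True, raise_on_error=False
)
logger.info("%s indexed, %s failed (no detail available)", success, failed)
```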

tommycahir commented 4 weeks ago

As per https://discuss.elastic.co/t/documents-failing-to-index-with-helpers-bulk/369834/3?u=tommycahir, setting `stats_only=True` is why I don't see any per-document failure information. Setting it to `False` results in a response object that contains a per-document status.
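
For anyone who lands here, a sketch of the call that ended up working for us (variable names reuse the snippet above): `raise_on_error=False` lets every chunk go through, and leaving `stats_only` at its default of `False` keeps the per-document detail.

```python
# All chunks are sent; errors is a list of per-document failure dicts.
success, errors = helpers.bulk(elastic, actions, raise_on_error=False)

logger.info("Inserted %s document(s), %s failed", success, len(errors))
for err in errors:
    # Each entry is keyed by the op type, e.g.
    # {"index": {"_index": ..., "_id": ..., "status": 400, "error": {...}}}
    op_type, info = next(iter(err.items()))
    logger.error(
        "Doc %s failed: %s",
        info.get("_id"),
        info.get("error", {}).get("reason"),
    )
```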