elastic / elasticsearch-py

Official Python client for Elasticsearch
https://ela.st/es-python
Apache License 2.0

parallel_bulk leaks memory and retries forever while still consuming the input iterator #1077

Open AntonFriberg opened 4 years ago

AntonFriberg commented 4 years ago

Description Since there is no max_retries configuration on helpers.parallel_bulk (#645), the default behavior seems to be to retry forever and never stop. This is a very surprising default and caused my batch-processing script to run out of memory.

This situation is caused by the Elasticsearch database running out of storage space, which I easily reproduced by filling a default Elasticsearch Docker container with documents until my partition was full.

That led to the following error with a basic curl insertion.

curl -X POST "localhost:9200/test/_doc/?pretty" -H 'Content-Type: application/json' -d'
{
    "key1" : 1,
    "key2": "value2"
}
'
..... # Long wait here until I received any result
{
  "error" : {
    "root_cause" : [
      {
        "type" : "cluster_block_exception",
        "reason" : "blocked by: [SERVICE_UNAVAILABLE/2/no master];",
        "suppressed" : [
          {
            "type" : "master_not_discovered_exception",
            "reason" : null
          }
        ]
      }
    ],
    "type" : "cluster_block_exception",
    "reason" : "blocked by: [SERVICE_UNAVAILABLE/2/no master];",
    "suppressed" : [
      {
        "type" : "master_not_discovered_exception",
        "reason" : null
      }
    ]
  },
  "status" : 503
}

Since the parallel_bulk method retries on 503 responses, and it retries an infinite number of times, this becomes a major issue. In my case I need to ingest a large number of small documents into ES on a periodic schedule. To do this quickly I increased the queue_size and chunk_size of the parallel_bulk call according to the Elasticsearch documentation. In my case an optimal configuration looked like this.

for success, info in elasticsearch.helpers.parallel_bulk(
        elastic,
        documents,
        max_retries=2,
        queue_size=10,
        chunk_size=2000,
        raise_on_exception=True,
        raise_on_error=True):
    if not success:
        # Log any exception or database errors.
        self.logger.error("Failed to insert document: %s", info)

Despite having raise_on_exception and raise_on_error set to True, this call continues to consume my iterator and fill up my memory, even though every single insertion attempt is stuck on infinite retries. In particular, I did not expect the iterator to continue being consumed in such a situation.
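As a workaround until this is fixed, the document generator can be wrapped so it stops yielding as soon as an error has been observed, which bounds how much of the source any consumer can drain. This is a generic sketch, not part of the library; `abortable` and `should_stop` are illustrative names:

```python
def abortable(iterable, should_stop):
    """Yield items from `iterable` until `should_stop()` returns True.

    This keeps parallel_bulk (or any other consumer) from draining the
    whole source iterator after a failure has been observed.
    """
    for item in iterable:
        if should_stop():
            return
        yield item

# Illustrative usage with a plain range as a stand-in for the document stream.
errors = []
docs = abortable(range(10), lambda: bool(errors))
consumed = []
for doc in docs:
    consumed.append(doc)
    if doc == 3:  # simulate the first failed bulk chunk
        errors.append("simulated 503")
print(consumed)  # [0, 1, 2, 3] -- consumption stops after the error
```

The consumer sees only the items produced before the flag flipped; the rest of the source stays unconsumed in memory terms.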

Environment: Linux - Debian Stretch Python 3.7 with pip install elasticsearch==7.1.0 Elasticsearch database running with docker run --rm -p 9200:9200 elasticsearch:7.4.2

Expected outcome That the default configuration would be to retry a limited amount of times and that the iterator stops being consumed until the insertion is either aborted or starts working again. I would also very much appreciate getting #645 fixed so that we have control over the amount of retires.

marshallmain commented 4 years ago

parallel_bulk doesn't have any retry logic in it, each bulk action is sent exactly once regardless of network or server failures.

A TransportError or BulkIndexError should be raised if a 503 is returned. Perhaps you are catching this elsewhere in your code and retrying?
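If retries are wanted at all, one option is to perform them explicitly around the call with a hard cap, instead of relying on any library default. This is a generic sketch under that assumption; `send_chunk` is a hypothetical stand-in for whatever raises on a 503 (e.g. a bulk call):

```python
import time

def retry_limited(operation, max_retries=2, backoff_s=0.0):
    """Call `operation`, retrying at most `max_retries` additional times.

    Re-raises the last exception instead of looping forever, so a
    persistently unavailable cluster aborts the ingest.
    """
    attempts = 0
    while True:
        try:
            return operation()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise
            time.sleep(backoff_s)

# Illustrative usage: an operation that fails twice, then succeeds.
calls = {"n": 0}
def send_chunk():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated 503")
    return "ok"

result = retry_limited(send_chunk, max_retries=2)
print(result, calls["n"])  # ok 3
```

With max_retries=1 the same `send_chunk` would re-raise the ConnectionError on the second failure rather than retrying indefinitely.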

AntonFriberg commented 4 years ago

Sorry for the late reply. What happens if the server does not respond at all? What is the default timeout setting for the bulk insert?

I did not have any retry logic as far as I remember but I did start multiple parallel_bulk actions in 4 separate processes.

honzakral commented 4 years ago

The default is driven by the client instance passed in as the first parameter. The default timeout for that is 10 seconds, after which an operation will time out and the helper, by default, will raise an exception.
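For reference, those client-level defaults can be set explicitly when constructing the client that is passed to the helper. This is a sketch assuming elasticsearch-py 7.x transport keyword arguments; the values are illustrative, not recommendations:

```python
from elasticsearch import Elasticsearch

# Transport-level settings applied to every request the helpers make.
elastic = Elasticsearch(
    ["localhost:9200"],
    timeout=10,            # per-request timeout in seconds (the 7.x default)
    max_retries=3,         # transport-level retries on connection errors
    retry_on_timeout=True, # also retry requests that timed out
)
```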
