elastic / elasticsearch-serverless-python

Official Python client for Elasticsearch Serverless
Apache License 2.0

Differences in behaviour between helper.bulk and Elasticsearch.bulk #39

Closed alvinr closed 4 months ago

alvinr commented 5 months ago

Problem: When indexing documents through an ingest pipeline that requires a field the document is missing, Elasticsearch.bulk and helpers.bulk behave differently.

Elasticsearch.bulk seems to swallow the error, whereas helpers.bulk raises an exception for the user to deal with, with the following error:

{'type': 'illegal_argument_exception', 'reason': 'field [content] not present as part of path [content]'}

Workaround: If the pipeline is created with ignore_missing: True, helpers.bulk does not raise an exception.

Reproduce: See attached file

alvinr commented 5 months ago
from elasticsearch_serverless import Elasticsearch
from elasticsearch_serverless.helpers import bulk

def doit():
  # Serverless, Vector Optimized instance
  creds = {
    "url": "xxx",
    "api_key": "xxx"
  }

  client = Elasticsearch(
    creds['url'],
    api_key=creds['api_key'],
    request_timeout=500,
    max_retries=0
  )

  index_name = "test"

  # Using bulk
  setup_trained(client, index_name, ".elser_model_2_linux-x86_64", 1, 1, False)

  doc_good =  [{ "index": { "_index": index_name, "_id": 11}}, {"title": "Anarchism", "content": "..."}]
  doc_bad  =  [{ "index": { "_index": index_name, "_id": 12}}, {"title": "AfghanistanHistory", "redirect": "History of Afghanistan"}]

  try:
    # doc inserted
    client.bulk(operations=doc_good)
    # Doc not inserted, but no exception raised
    client.bulk(operations=doc_bad)
  except Exception as e:
    print(e)
    raise

  # Using bulk helper with ignore_missing: True
  doc_good = { "_op_type": "index",
               "_index": index_name,
               "_id": 1,
               "_source": '{"title": "Anarchism", "content": "..."}'}
  doc_bad = { "_op_type": "index",
              "_index": index_name,
              "_id": 2,
              "_source": '{"title": "AfghanistanHistory", "redirect": "History of Afghanistan"}'}

  setup_trained(client, index_name, ".elser_model_2_linux-x86_64", 1, 1, True)

  try:
    # Doc inserted
    bulk(client, actions=[doc_good], raise_on_error=True)
    # Doc inserted
    bulk(client, actions=[doc_bad], raise_on_error=True)
  except Exception as e:
    print(e)
    raise

  # Using bulk helper with ignore_missing: False
  doc_good = { "_op_type": "index",
               "_index": index_name,
               "_id": 21,
               "_source": '{"title": "Anarchism", "content": "..."}'}
  doc_bad = { "_op_type": "index",
              "_index": index_name,
              "_id": 22,
              "_source": '{"title": "AfghanistanHistory", "redirect": "History of Afghanistan"}'}
  setup_trained(client, index_name, ".elser_model_2_linux-x86_64", 1, 1, False)

  try:
    # Doc inserted
    bulk(client, actions=[doc_good], raise_on_error=True)
    # Doc not inserted, but exception raised
    bulk(client, actions=[doc_bad], raise_on_error=True)
  except Exception as e:
    print(e)
    raise

def setup_trained(client, index_name, model, ingest_allocators, ingest_threads, ignore):

  status = client.ml.get_trained_models(model_id=model + "*",
                                        include='definition_status',
                                        allow_no_match=True)

  print(status)
  if status['count'] == 0:
    # download ELSER v2
    client.ml.put_trained_model(model_id=model,
                                wait_for_completion=True,
                                input={'field_names': ['text_field']})

    # deploy the model
    client.ml.start_trained_model_deployment(model_id=model, wait_for='fully_allocated',
                                             cache_size=0,
                                             number_of_allocations=ingest_allocators, threads_per_allocation=ingest_threads)

  # define a pipeline
  client.ingest.put_pipeline(
      id='elser-ingest-pipeline',
      processors=[
          {
              'inference': {
                  'model_id': model,
                  'input_output': [
                      {
                          'input_field': 'content',
                          'output_field': 'elser_embedding',
                      }
                  ],
                  'ignore_missing': ignore
              }
          }
      ]
  )

  client.indices.delete(index=index_name, ignore_unavailable=True)

  client.indices.create(
            index=index_name,
            mappings={
                'properties': {
                    'elser_embedding': {
                        'type': 'sparse_vector',
                    },
                }
            },
            settings={
                'index': {
                    'default_pipeline': 'elser-ingest-pipeline'
                }
            }
        )

if __name__ == '__main__':
    doit()
pquentin commented 5 months ago

Thanks for the reproducing script, and sorry it took so long to get back to you. I ran everything myself and can confirm the results you are seeing. However, they are expected (though not obvious, and I had forgotten about this when you first asked me).

The client.bulk method is a simple call to the Elasticsearch Bulk API. As mentioned in the Bulk API docs, errors on individual items do not result in an HTTP error code: the response must be inspected instead. If I modify your code to print the response:

import json

# Doc not inserted, but no exception raised
results = client.bulk(operations=doc_bad)
print(json.dumps(results.body, indent=2))

I get the following response, which clearly shows the error:

{
  "errors": true,
  "took": 0,
  "ingest_took": 0,
  "items": [
    {
      "index": {
        "_index": "test",
        "_id": "12",
        "status": 400,
        "error": {
          "type": "illegal_argument_exception",
          "reason": "field [content] not present as part of path [content]"
        }
      }
    }
  ]
}

This is one thing that the bulk helper does for you, and one reason why the client.bulk method should never be used directly (the only exception being when the bulk helper does not do exactly what you want).
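
For cases where client.bulk does have to be called directly, the inspection described above could look roughly like the sketch below. It only works on the response body as a plain dict; the function name failed_bulk_items is made up for this illustration and is not part of the client.

```python
def failed_bulk_items(body):
    """Return (operation, item) pairs for every failed item in a Bulk API response body."""
    # The top-level "errors" flag is true as soon as at least one item failed.
    if not body.get("errors"):
        return []
    failures = []
    for entry in body.get("items", []):
        # Each entry has a single key naming the operation (for example "index").
        for op_type, item in entry.items():
            if "error" in item:
                failures.append((op_type, item))
    return failures


# Response body copied from the output above.
body = {
    "errors": True,
    "took": 0,
    "ingest_took": 0,
    "items": [
        {
            "index": {
                "_index": "test",
                "_id": "12",
                "status": 400,
                "error": {
                    "type": "illegal_argument_exception",
                    "reason": "field [content] not present as part of path [content]",
                },
            }
        }
    ],
}

for op_type, item in failed_bulk_items(body):
    print(f"{op_type} of _id={item['_id']} failed ({item['status']}): {item['error']['reason']}")
```

With the real client, the same function would be applied to client.bulk(operations=doc_bad).body.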

It's a common mistake, however (not specific to serverless), and I'm not sure how to make it clearer.

I suppose I can try adjusting the documentation generation process so that https://elasticsearch-py.readthedocs.io/en/v8.13.1/api/elasticsearch.html#elasticsearch.Elasticsearch.bulk mentions that the bulk helper should be preferred, as it handles errors and chunks the data.
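
To make "handles errors and chunks the data" concrete, here is a minimal sketch of those two ideas. It is not the helper's actual implementation: bulk_in_chunks, BulkChunkError and the send_bulk parameter are hypothetical names for this example, and send_bulk is assumed to be any callable that takes a flat list of operations and returns a Bulk API response body as a dict.

```python
from itertools import islice


class BulkChunkError(Exception):
    """Hypothetical exception carrying the failed items of one chunk."""

    def __init__(self, failed_items):
        super().__init__(f"{len(failed_items)} document(s) failed to index")
        self.failed_items = failed_items


def bulk_in_chunks(send_bulk, pairs, chunk_size=500):
    """Send (action, source) pairs in chunks; raise on the first chunk with failures.

    Returns the number of documents sent successfully.
    """
    iterator = iter(pairs)
    succeeded = 0
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return succeeded
        # Flatten [(action, source), ...] into [action, source, action, source, ...].
        operations = [line for pair in chunk for line in pair]
        body = send_bulk(operations)
        # Collect every item in the response that carries an "error" object.
        failed = [
            item
            for entry in body["items"]
            for item in entry.values()
            if "error" in item
        ]
        if failed:
            raise BulkChunkError(failed)
        succeeded += len(chunk)
```

With the real client, send_bulk could be lambda ops: client.bulk(operations=ops).body. In practice, helpers.bulk is the better choice; the sketch only illustrates why a missing field surfaces as an exception there but not with a bare client.bulk call.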

pquentin commented 4 months ago

Closing for now, as I haven't heard back from you.