elastic / elasticsearch-py

Official Python client for Elasticsearch
https://ela.st/es-python
Apache License 2.0

5.4.0 New 'pipeline' metadata key should be '_pipeline' #640

Closed thatcher closed 7 years ago

thatcher commented 7 years ago

Upgrading from 5.3 to 5.4 broke my ETL because I already have a field 'pipeline' in my batch submissions. Metadata fields are normally prefixed with '_' so that I can avoid colliding with them; why not '_pipeline'?

Traceback (most recent call last):
  File "/opt/geostellar_com/etl/geostellar_etl/__init__.py", line 546, in checkpoint
    self.submit(results[:self.batch_size])
  File "/opt/geostellar_com/etl/geostellar_etl/load/__init__.py", line 233, in submit
    bulk(self.destination, batch)
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 195, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 163, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 92, in _process_bulk_chunk
    raise e
RequestError: TransportError(400, u'illegal_argument_exception', u'Malformed action/metadata line [1], expected a simple value for field [pipeline] but found [START_OBJECT]')

Much appreciated, Thatcher

thatcher commented 7 years ago

Just checking in to see if anyone has feedback on this issue. I can submit a pull request if the change seems reasonable; alternatively, if there is a good reason not to use the underscore prefix for 'pipeline', I could use some help determining the best way to resolve the conflict in my ETL implementation.

fxdgear commented 7 years ago

@thatcher Sorry for the late reply.

Can I please see an example of the bulk body you're trying to submit? Feel free to obfuscate the data; I only need a minimal example to try to reproduce it.

Thanks!

thatcher commented 7 years ago

Sure, thanks. With elasticsearch-py 5.3.0 this works; with 5.4.0 it breaks with the stack trace above.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

connection = Elasticsearch(
    hosts=['localhost:9200'],
)
connection.indices.delete(
    index='foo',
    ignore=[400, 404],
)
connection.indices.create(
    index='foo',
    ignore=[400, 404],
)

batch = [{
    '_op_type': 'index',
    '_index': 'foo',
    '_type': 'bar',
    '_id': 'doc-000',
    'title': 'Deal John Doe, Montana, Ranch',
    'pipeline': {
        'Organic Lead': 100,
        'Engagement': 50,
        'Proposal': 25
    },
}, {
    '_op_type': 'index',
    '_index': 'foo',
    '_type': 'bar',
    '_id': 'doc-001',
    'title': 'Deal Jane Doe, Florida, Condo',
    'pipeline': {
        'Organic Lead': 200,
        'Engagement': 80,
        'Proposal': 45
    },
}]
bulk(connection, batch)

Traceback (most recent call last):
  File "pipeline_meta_field_issue.py", line 42, in <module>
    bulk(connection, batch)
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 195, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 163, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/opt/anaconda2/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 92, in _process_bulk_chunk
    raise e
elasticsearch.exceptions.RequestError: TransportError(400, u'illegal_argument_exception', u'Malformed action/metadata line [1], expected a simple value for field [pipeline] but found [START_OBJECT]')

fxdgear commented 7 years ago

Thanks, will try to reproduce :)

fxdgear commented 7 years ago

@thatcher so 'pipeline' is a reserved word in the Elasticsearch bulk API. It isn't anything done by the Python client.

https://github.com/elastic/elasticsearch/blob/master/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json#L56-L58
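To illustrate what goes wrong, here is a simplified sketch of how the 5.4 bulk helpers split an action dict into a metadata line and a document body. The metadata key list below is abbreviated and hypothetical (the real expand_action handles more keys and op types), but it shows the key point: once 'pipeline' is treated as metadata, an object under that key ends up on the action/metadata line, which Elasticsearch rejects with "expected a simple value for field [pipeline] but found [START_OBJECT]".

```python
import json

# Abbreviated, illustrative metadata key list; in 5.4 'pipeline' joined it.
METADATA_KEYS = ('_index', '_type', '_id', 'pipeline')

def expand_action(data):
    """Split an action dict into (action/metadata line, document body)."""
    data = dict(data)
    op_type = data.pop('_op_type', 'index')
    action = {op_type: {}}
    for key in METADATA_KEYS:
        if key in data:
            # Metadata keys are hoisted out of the document...
            action[op_type][key] = data.pop(key)
    # ...and everything left over becomes the document body.
    return action, data

action, doc = expand_action({
    '_op_type': 'index', '_index': 'foo', '_type': 'bar', '_id': 'doc-000',
    'title': 'Deal', 'pipeline': {'Organic Lead': 100},
})
# The metadata line now carries an object under 'pipeline':
print(json.dumps(action, sort_keys=True))
print(json.dumps(doc, sort_keys=True))
```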

I'm going to close this issue since it's caused by Elasticsearch rather than the Python client.

A workaround for you to try would be to nest your document fields under a 'doc' key in your bulk action, something like this:

batch = [{
    '_op_type': 'index',
    '_index': 'foo',
    '_type': 'bar',
    '_id': 'doc-000',
    'doc': {
        'title': 'Deal John Doe, Montana, Ranch',
        'pipeline': {
            'Organic Lead': 100,
            'Engagement': 50,
            'Proposal': 25
        },
    }
}, {
    '_op_type': 'index',
    '_index': 'foo',
    '_type': 'bar',
    '_id': 'doc-001',
    'doc': {
        'title': 'Deal Jane Doe, Florida, Condo',
        'pipeline': {
            'Organic Lead': 200,
            'Engagement': 80,
            'Proposal': 45
        },
    }
}]
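If nesting under 'doc' changes the document shape more than you'd like, another option (assuming the 5.x helpers' behavior of preferring a '_source' key as the document body) would be to pass the document under '_source', so the remaining top-level keys are never scanned for metadata and 'pipeline' stays a document field. A minimal sketch:

```python
# Sketch only: assumes the bulk helpers use '_source' verbatim as the body.
batch = [{
    '_op_type': 'index',
    '_index': 'foo',
    '_type': 'bar',
    '_id': 'doc-000',
    '_source': {
        'title': 'Deal John Doe, Montana, Ranch',
        # 'pipeline' lives inside '_source', so it is not hoisted to metadata.
        'pipeline': {'Organic Lead': 100, 'Engagement': 50, 'Proposal': 25},
    },
}]
# bulk(connection, batch)  # needs a running cluster, so not executed here
```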

thatcher commented 7 years ago

ah got it, thanks!