yougov / mongo-connector

MongoDB data stream pipeline tools by YouGov (adopted from MongoDB)
Apache License 2.0

mongo-connector stops after choking on large field #482

Open · PeterBackman opened this issue 8 years ago

PeterBackman commented 8 years ago

Elasticsearch/Lucene cannot handle terms larger than 32K. Trying to insert something bigger into an unanalyzed field yields an exception from Elasticsearch, and when that happens during an insert from mongo-connector, the connector stops.
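For context, a minimal sketch that reproduces the limit, assuming a local ES 2.x node and the elasticsearch-py 2.x client; the index name 'repro' and field name 'big' are made up for illustration:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Map the field as a not_analyzed string, as in the unanalyzed field
# described above.
es.indices.create(index='repro', body={
    'mappings': {'documents': {'properties': {
        'big': {'type': 'string', 'index': 'not_analyzed'}}}}})

# A value whose UTF-8 encoding exceeds 32766 bytes produces a single
# "immense term", which ES 2.x rejects with illegal_argument_exception /
# max_bytes_length_exceeded_exception (HTTP 400).
es.index(index='repro', doc_type='documents', id='1',
         body={'big': 'x' * 40000})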

One malformed document should not bring the connector to a full stop. It would be better to log this as an ERROR in the connector log, clearly stating which document failed, and continue.
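The streaming_bulk helper already supports that behavior: with raise_on_error=False it reports each failed action instead of raising BulkIndexError (both flags are visible in the traceback below). A rough sketch of what the doc manager's bulk path could do; the function name is made up:

import logging

from elasticsearch.helpers import streaming_bulk

LOG = logging.getLogger(__name__)

def bulk_upsert_logging_failures(es, actions):
    # With raise_on_error=False, streaming_bulk yields (False, resp) for
    # each failed action instead of raising BulkIndexError and killing
    # the OplogThread.
    for ok, resp in streaming_bulk(es, actions,
                                   raise_on_exception=False,
                                   raise_on_error=False):
        if not ok:
            LOG.error('Skipping document that failed to index: %r', resp)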

Versions used:

- Elasticsearch 2.3.1
- mongo-connector 2.4
- elastic2-doc-manager 0.1.0

Log from mongo-connector:

2016-06-15 19:01:00,478 [CRITICAL] mongo_connector.oplog_manager:549 - Exception during collection dump
Traceback (most recent call last):
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/oplog_manager.py", line 501, in do_dump
    upsert_all(dm)
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/oplog_manager.py", line 485, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 203, in bulk_upsert
    for ok, resp in responses:
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/elasticsearch/helpers/__init__.py", line 160, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/elasticsearch/helpers/__init__.py", line 132, in _process_bulk_chunk
    raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
elasticsearch.helpers.BulkIndexError: ('1 document(s) failed to index.', [{'index': {'_id': '0037ad0b-acfa-11e5-843d-8b904bca518b', '_type': 'documents', '_index': 'eiffel009_allevents_2', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'Document contains at least one immense term in field="eventData.optionalParameters.COMPLETE_BASELINE" (whose UTF8 encoding is longer than the max length 32766), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: \'[123, 34, 67, 88, 67, 49, 55, 51, 51, 49, 53, 48, 34, 58, 123, 34, 83, 72, 65, 49, 34, 58, 34, 34, 44, 34, 78, 65, 77, 69]...\', original message: bytes can be at most 32766 in length; got 33328', 'caused_by': {'type': 'max_bytes_length_exceeded_exception', 'reason': 'max_bytes_length_exceeded_exception: bytes can be at most 32766 in length; got 33328'}}}}])
2016-06-15 19:01:00,479 [ERROR] mongo_connector.oplog_manager:557 - OplogThread: Failed during dump collection cannot recover! Collection(Database(MongoClient(host=['arm104-eiffel999.lmera.ericsson.se:27017'], document_class=dict, tz_aware=False, connect=True, replicaset='rs0'), 'local'), 'oplog.rs')
2016-06-15 19:01:01,091 [ERROR] __main__:302 - MongoConnector: OplogThread <OplogThread(Thread-2, started 140659493086976)> unexpectedly stopped! Shutting down
2016-06-15 19:15:23,948 [ERROR] mongo_connector.oplog_manager:488 - OplogThread: caught exception during bulk upsert, re-upserting documents serially
Traceback (most recent call last):
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/oplog_manager.py", line 485, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 203, in bulk_upsert
    for ok, resp in responses:
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/elasticsearch/helpers/__init__.py", line 160, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/usr/eiffel/es/eiseiffel2.es52.ES2.3/dist/python/lib/python3.4/site-packages/elasticsearch/helpers/__init__.py", line 132, in _process_bulk_chunk
    raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
elasticsearch.helpers.BulkIndexError: ('1 document(s) failed to index.', [{'index': {'status': 400, '_index': 'eiffel009_allevents_2', '_type': 'documents', 'error': {'caused_by': {'type': 'max_bytes_length_exceeded_exception', 'reason': 'bytes can be at most 32766 in length; got 33328'}, 'type': 'illegal_argument_exception', 'reason': 'Document contains at least one immense term in field="eventData.optionalParameters.COMPLETE_BASELINE" (whose UTF8 encoding is longer than the max length 32766), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: \'[123, 34, 67, 88, 67, 49, 55, 51, 51, 49, 53, 48, 34, 58, 123, 34, 83, 72, 65, 49, 34, 58, 34, 34, 44, 34, 78, 65, 77, 69]...\', original message: bytes can be at most 32766 in length; got 33328'}, '_id': '0037ad0b-acfa-11e5-843d-8b904bca518b'}}])
2016-06-15 19:21:47,184 [WARNING] elasticsearch:82 - PUT /eiffel009_allevents_2/documents/0037ad0b-acfa-11e5-843d-8b904bca518b?refresh=false [status:400 request:0.009s]
2016-06-15 19:21:47,185 [ERROR] mongo_connector.oplog_manager:473 - Could not upsert document: {<REMOVED  BIG DOCUMENT!>}
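A common stopgap on the Elasticsearch side, independent of any connector fix, is to cap the indexed term length with ignore_above, so oversized values stay in _source but are skipped by the indexer instead of failing the whole document. A sketch only: the template name and index pattern are placeholders, it forces dynamically mapped strings to not_analyzed (adapt to the real mapping), and 10922 is 32766 // 3, the safe character cap for worst-case 3-byte UTF-8:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Placeholder template: strings longer than ignore_above are not indexed,
# so they can no longer produce an "immense term" rejection.
es.indices.put_template(name='skip-immense-terms', body={
    'template': 'eiffel*',
    'mappings': {'documents': {'dynamic_templates': [{
        'strings_with_cap': {
            'match_mapping_type': 'string',
            'mapping': {'type': 'string', 'index': 'not_analyzed',
                        'ignore_above': 10922}}}]}}})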
llvtt commented 8 years ago

Thanks for the detailed report. This looks like a bug in the elastic2-doc-manager package. I filed issues for this in that project, as well as in the old elastic-doc-manager, which has the same problem.

kevin-xk commented 8 years ago

@llvtt Hi, my English is poor. I am seeing the same issue with these versions:

- Elasticsearch 2.3.4
- mongo-connector 2.4.1
- elastic2-doc-manager 0.2.0

2016-09-14 17:23:12,901 [ERROR] mongo_connector.oplog_manager:570 - OplogThread: caught exception during bulk upsert, re-upserting documents serially
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/mongo_connector/oplog_manager.py", line 567, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/Library/Python/2.7/site-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/Library/Python/2.7/site-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert
    for ok, resp in responses:
  File "/Library/Python/2.7/site-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/Library/Python/2.7/site-packages/elasticsearch/helpers/__init__.py", line 134, in _process_bulk_chunk
    raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)

ShaneHarvey commented 7 years ago

@kevin-xk It looks like you used the --continue-on-error option, so the documents should have been re-inserted one at a time after that failure (skipping the documents that still fail to insert).

Are you seeing the error again after the initial collection dump completes?
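For reference, the serial fallback described above is roughly this shape; an illustrative sketch only, not the actual mongo-connector source:

import logging

LOG = logging.getLogger(__name__)

def upsert_each(dm, docs, namespace, timestamp):
    # After a failed bulk upsert, retry documents one at a time, logging
    # (rather than re-raising) any that still fail, so one bad document
    # cannot stop the whole dump.
    for doc in docs:
        try:
            dm.upsert(doc, namespace, timestamp)
        except Exception:
            LOG.exception('Could not upsert document: %r', doc)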