yougov / mongo-connector

MongoDB data stream pipeline tools by YouGov (adopted from MongoDB)
Apache License 2.0

Unsetting a field that does not exist is retried infinitely and blocks the sync process #720

Open nonight89 opened 7 years ago

nonight89 commented 7 years ago

Hi, I ran into the problem described in the title when syncing data from MongoDB to es2.

The error log is as follows:

    2017-07-21 15:15:02,774 [ERROR] mongo_connector.oplog_manager:288 - Unable to process oplog document {u'b': True, u'h': -526593452711543270L, u'ts': Timestamp(1500621033, 582), u'o': {u'_id': ObjectId('58fb01b5e4b0c9303aeeafbe')}, u'v': 2, u'ns': u'auto-pipeline-monitor.ugc_video', u'op': u'd'}
    Traceback (most recent call last):
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/oplog_manager.py", line 250, in run
        entry['o']['_id'], ns, timestamp)
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/util.py", line 35, in wrapped
        return f(*args, **kwargs)
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 441, in remove
        self.index(action, meta_action)
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 473, in index
        self.commit()
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 495, in commit
        self.send_buffered_operations()
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 482, in send_buffered_operations
        action_buffer = self.BulkBuffer.get_buffer()
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 693, in get_buffer
        self.update_sources()
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/util.py", line 35, in wrapped
        return f(*args, **kwargs)
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 648, in update_sources
        updated = self.docman.apply_update(source, update_spec)
      File "/data/elastic2-doc-manager-0.3.0/mongo_connector/doc_managers/elastic2_doc_manager.py", line 224, in apply_update
        return super(DocManager, self).apply_update(doc, update_spec)
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/doc_managers/doc_manager_base.py", line 120, in apply_update
        exc_tb)
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/doc_managers/doc_manager_base.py", line 114, in apply_update
        _unset_field(doc, to_unset)
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/doc_managers/doc_manager_base.py", line 80, in _unset_field
        where = _retrieve_path(doc, path[:-1])
      File "/usr/lib/python2.6/site-packages/mongo_connector-2.5.1-py2.6.egg/mongo_connector/doc_managers/doc_manager_base.py", line 49, in _retrieve_path
        looking_at = looking_at[part]
    UpdateDoesNotApply: Cannot apply update {u'$set': {u'legoSendBitratesTask': 2, u'taskType': 0}, u'$unset': {u'bitrates.firstDistState': 1, u'bitrates.distState': 1, u'bitrates.transState': 1, u'bitrates.firstTransState': 1}} to {u'legoSendBitratesTask': 2, u'updateTime': 1500603425L, u'uid': 1065007773, u'ip': u'119.188.144.216', u'inputPipeline': 2, u'userType': 0, u'taskType': 0, u'duration': 21, u'size': 658326, u'uploadInfo': {u'state': 2}, u'uploadType': 0, u'province': u'\u5c71\u4e1c', u'repeatCheckResult': 2, u'appId': 4367, u'createTime': 1500603419, u'fileId': u'6f6ad6ddff404e8f813b702235debb47', u'parseInfo': {u'cbState': 2, u'state': 2, u'notifyLego': 2}, u'archiveInfo': {u'state': 2}, u'isp': u'\u8054\u901a', u'legoSendAuditImgTask': 2, u'qipuId': 8377179209L, u'auditImg': {u'transInfo': {u'cbState': 0, u'state': 1}}, u'legoEnqueue': 2}

The log file kept printing the same message infinitely, and the sync process was blocked.
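The endless repetition is easiest to see with a retry loop. The sketch below is illustrative only (it is not mongo-connector's actual retry helper, whose name and signature I'm not reproducing here): an operation that fails transiently eventually succeeds, but an update that can never apply keeps failing on every attempt, so the loop never exits and everything behind it is blocked.

```python
import time


def retry_forever(func, *args, **kwargs):
    """Illustrative sketch: retry an operation until it succeeds.

    A deterministic failure (like an update that can never apply)
    makes this loop spin forever, blocking the rest of the sync.
    """
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print("retrying after error: %r" % exc)
            time.sleep(0)  # real code would sleep between attempts


attempts = []


def flaky():
    # Fails twice, then succeeds -- a transient error the retry loop handles.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient")
    return "ok"


print(retry_forever(flaky))  # succeeds on the 3rd attempt; a permanent
                             # UpdateDoesNotApply would never reach this point
```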

I have checked the source code:

    def _retrieve_path(container, path, create=False):
        looking_at = container
        for part in path:
            if isinstance(looking_at, dict):
                if create and not part in looking_at:
                    looking_at[part] = {}
                looking_at = looking_at[part]

In the _unset_field method, _retrieve_path is called with the default value of 'create' (False), so unsetting a field that does not exist can raise an exception when looking_at[part] is missing.

Actually, in MongoDB, we can $unset a field that does not exist.

ShaneHarvey commented 7 years ago

What version of MongoDB are you replicating from? If it's MongoDB >= 2.6, then I think a separate error caused that Elasticsearch document to become out of sync, because MongoDB >= 2.6 does not record $unset on fields that do not exist. I recommend resyncing your data.

However, if you can reproduce this failure, please post the sequence of MongoDB operations (inserts, updates, etc.) that causes it and we'll go from there.

nonight89 commented 7 years ago

My MongoDB version is 2.4.

I have tried resyncing many times. All the old data is synced successfully, but when the syncing process reaches the real-time data, the "UpdateDoesNotApply" error occurs.

Yesterday, I modified the source code.

I pass create=True to _retrieve_path in the _unset_field method, and check if index_or_key in where before del where[index_or_key].

Then I resynced my data. Finally it works. :)

ShaneHarvey commented 7 years ago

It's definitely a bug that you're seeing this error when replicating from 2.4. This part of the code is supposed to ignore $unset on missing fields and instead log a warning: https://github.com/mongodb-labs/mongo-connector/blob/master/mongo_connector/doc_managers/doc_manager_base.py#L92-L99
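For readers who don't follow the link, the intended behavior is roughly the following sketch (names simplified; this is the pattern the linked lines implement, not a copy of them): instead of letting a failed unset propagate, catch it and log a warning, since MongoDB 2.4 can record $unset of nonexistent fields in the oplog.

```python
import logging

LOG = logging.getLogger(__name__)


def safe_unset(doc, field):
    """Ignore $unset on a missing top-level field, logging a warning
    instead of raising (simplified sketch of the intended fallback)."""
    try:
        del doc[field]
    except KeyError:
        LOG.warning("Could not unset field %r from document %r. "
                    "This may be normal when replicating from MongoDB 2.4.",
                    field, doc)


doc = {"a": 1}
safe_unset(doc, "b")  # field missing: warning logged, doc unchanged
safe_unset(doc, "a")  # field present: removed as usual
print(doc)  # -> {}
```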

I'll try to reproduce this and get back to you.

airios commented 5 years ago

Ran into this the other day. I modified doc_manager_base.py as per @nonight89's instructions:

        def _unset_field(doc, to_unset):
            try:
                if "." in to_unset:
                    path = to_unset.split(".")
                    where = _retrieve_path(doc, path[:-1], create=True)
                    index_or_key = _convert_or_raise(where, path[-1])
                    if isinstance(where, list):
                        # Unset an array element sets it to null.
                        where[index_or_key] = None
                    else:
                        if index_or_key in where:
                            # Unset field removes it entirely.
                            del where[index_or_key]
                else:
                    del doc[to_unset]
            except (KeyError, IndexError, ValueError):
                source_version = get_mininum_mongodb_version()
                if source_version is None or source_version.at_least(2, 6):
                    raise
                # Ignore unset errors since MongoDB 2.4 records invalid
                # $unsets in the oplog.
                LOG.warning(
                    "Could not unset field %r from document %r. "
                    "This may be normal when replicating from "
                    "MongoDB 2.4 or the destination could be out of "
                    "sync." % (to_unset, doc)
                )

After updating, I restarted mongo-connector and re-ran the sync. It worked like a charm.