airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[source-mongodb] infinite initialization data with duplicates #40525

Closed: almihor closed this issue 1 month ago

almihor commented 3 months ago

Connector Name

source-mongodb

Connector Version

1.4.1

What step the error happened?

During the sync

Relevant information

I am trying to replicate data from MongoDB to MSSQL and have encountered a problem. The source collection contains approximately 3,000 documents, but Airbyte's initial synchronization runs indefinitely until it hangs. As a result, the destination MSSQL table contains orders of magnitude more data, due to duplicates that differ only in the _ab_cdc_cursor field.

I have tried different versions of the connector, different destinations, and even set up a clean MongoDB and a clean Airbyte locally. The behavior remains the same.

Let me know if you need any changes or additional information!

Relevant log output

2024-06-26 03:10:05 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 70, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:06 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 71, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:06 platform > Records read: 215000 (183 MB)
2024-06-26 03:10:06 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 72, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:07 platform > Records read: 220000 (187 MB)
2024-06-26 03:10:07 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 73, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:07 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 74, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:08 platform > Records read: 225000 (191 MB)
2024-06-26 03:10:08 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 75, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:08 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 76, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:08 platform > Records read: 230000 (195 MB)
2024-06-26 03:10:09 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 77, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:09 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 78, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:09 platform > Records read: 235000 (199 MB)
2024-06-26 03:10:10 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 79, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:10 platform > Records read: 240000 (204 MB)
2024-06-26 03:10:10 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 80, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:10 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 81, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:11 platform > Records read: 245000 (208 MB)
2024-06-26 03:10:11 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 82, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:11 platform > Records read: 250000 (212 MB)
2024-06-26 03:10:11 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 83, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:12 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 84, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:12 platform > Records read: 255000 (217 MB)
2024-06-26 03:10:12 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 85, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:13 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 86, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:13 platform > Records read: 260000 (221 MB)
2024-06-26 03:10:13 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 87, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:13 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 88, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:13 platform > Records read: 265000 (225 MB)
2024-06-26 03:10:14 source > INFO main i.a.i.s.m.MongoDbInitialLoadRecordIterator(computeNext):64 Finishing subquery number : 89, processing at id : /6TTwx0WjkKvaxWNfKHvUw==
2024-06-26 03:10:14 platform > Records read: 270000 (229 MB)

almihor commented 3 months ago

The same behavior also occurs with Full Refresh | Overwrite and all other sync modes for the stream.

full logs

evantahler commented 3 months ago

Can you confirm that you are using the latest version of the airbyte platform as well?

almihor commented 3 months ago

At work, we use an older version of Airbyte, but I tested the latest version locally, and the behavior was the same.

evantahler commented 3 months ago

If you could update to the latest version of the Airbyte platform and include a new set of logs, that would make debugging this easier. Also, please confirm that your deployment of Airbyte has enough memory.

From the logs, it looks like the platform is failing the source in some strange way. From there, the destination commits what it got. But, because the source crashed, on the next attempt the read will start from the beginning again, leading to duplicates.

almihor commented 3 months ago

I'll try to do this by Monday or Tuesday, thank you!

almihor commented 2 months ago

@evantahler

Deployed the latest version of airbyte, same behavior. Attaching the logs default_workspace_job_1_attempt_1_txt.txt

rodireich commented 2 months ago

What I find odd is that we don't seem to be making progress on the initial sync: all subqueries are finishing at

processing at id : /6TTwx0WjkKvaxWNfKHvUw==

Chunk size for each query is determined to be 1141064 rows

Chunk size determined for: catalogServiceDb.bonusCategories, to be 1141064 rows

So even though we see an indication of records read successfully, each record ends at the same point.

xiaohansong commented 2 months ago

And it's weird because the chunk size is above 1M but it seems we are running a subquery every 5000-10000 records.

The ID looks like a base64-encoded UUID (/6TTwx0WjkKvaxWNfKHvUw== == ffa4d3c3-1d16-8e42-af6b-158d7ca1ef53) and I wonder if that could be related. My guess is the user's _id column is actually a binary-formatted UUID. I'll look into the implications of that.
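
For reference, here is a standalone snippet (not connector code; the class name is just for illustration) that reproduces that decoding by reading the 16 bytes straight through, big-endian:

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class DecodeId {
    public static void main(String[] args) {
        // Decode the base64 _id string seen in the logs into its raw 16 bytes
        byte[] bytes = Base64.getDecoder().decode("/6TTwx0WjkKvaxWNfKHvUw==");
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        // Read the 16 bytes big-endian as the two halves of a UUID
        UUID uuid = new UUID(bb.getLong(), bb.getLong());
        System.out.println(uuid); // ffa4d3c3-1d16-8e42-af6b-158d7ca1ef53
    }
}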

xiaohansong commented 2 months ago

@almihor I wonder what the type of your _id in MongoDB is. Are the types consistent? Are they binary, string, or both?

Our logic is

  1. select <columns> from <collection> order by _id asc limit <chunk_size>
  2. Fetch the last _id and convert it to a string id last_str_id for some Airbyte-internal processing logic.
  3. Run the query select <columns> from <collection> where _id > converting_func(last_str_id) order by _id asc limit <chunk_size>
  4. In your case, the second query should return 0 records, and then we end the initial sync, because chunk_size is determined to be ~1M and you only have a few thousand documents.

One explanation is that this converting_func isn't working properly, either converting from the Mongo _id to last_str_id or from last_str_id back to a Mongo _id. Mixed _id types could possibly explain this too. A simplified sketch of the loop is below.
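
To illustrate, here is a minimal sketch of that chunking loop using the MongoDB Java driver. It is a simplification, not the connector's actual code: the collection name and chunk size are taken from the logs above, the connection string is a placeholder, and the real connector round-trips the last _id through a string (which is where the suspected bug lives).

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;

public class ChunkedInitialLoadSketch {
    public static void main(String[] args) {
        final int chunkSize = 1_141_064; // value reported in the logs
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<BsonDocument> coll = client
                .getDatabase("catalogServiceDb")
                .getCollection("bonusCategories", BsonDocument.class);

            BsonValue lastId = null;
            while (true) {
                // First chunk has no lower bound; later chunks resume after the last _id seen.
                Bson filter = (lastId == null) ? new BsonDocument() : Filters.gt("_id", lastId);
                BsonValue newLastId = null;
                for (BsonDocument doc : coll.find(filter).sort(Sorts.ascending("_id")).limit(chunkSize)) {
                    newLastId = doc.get("_id"); // emit the record, remember the last _id
                }
                if (newLastId == null) {
                    break; // empty chunk: initial load is done
                }
                // The real connector serializes lastId to a string and parses it back here;
                // if that round trip is lossy, the filter matches everything again, forever.
                lastId = newLastId;
            }
        }
    }
}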

I think a query of

db.yourCollection.aggregate([
  // Project each document down to the BSON type of its _id
  {
    $project: {
      idType: { $type: "$_id" }
    }
  },
  // Count how many documents share each _id type
  {
    $group: {
      _id: "$idType",
      count: { $sum: 1 }
    }
  },
  // Most common type first
  {
    $sort: { count: -1 }
  }
])

should do the trick.

almihor commented 2 months ago

@xiaohansong query result:

[screenshot: query result]

Just in case, here are a couple of IDs:

[
  {
    "_id": {"$binary": {"base64": "S/TgiB6yHUejhnihVwzFRg==", "subType": "03"}}
  },
  {
    "_id": {"$binary": {"base64": "Iy8nILiGSU2fHG5btCtlSw==", "subType": "03"}}
  },
  {
    "_id": {"$binary": {"base64": "SzCPhZBvh0yX43q03D/3qA==", "subType": "03"}}
  },
  {
    "_id": {"$binary": {"base64": "2gtuqiK9IEOHNqbk6Zy7qQ==", "subType": "03"}}
  }
]

evantahler commented 2 months ago

grooming notes:

Code ref starting point: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java#L61

xiaohansong commented 2 months ago

Cool, I was able to reproduce it with my test DB. I'll tweak around and see where the issue is.

https://cloud.airbyte.com/workspaces/c757521b-e7f0-4385-b64c-9a1984914ffa/connections/318041de-c29f-4037-8305-69025a7a466b/job-history

xiaohansong commented 2 months ago

I learned we need to convert the string back to BsonBinary with the original subtype here:

https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/IdType.java#L111

The default was BsonBinarySubType.BINARY, which can differ from the user's actual _id subtype (legacy UUID). The string therefore converts back to a totally different binary to compare against, which breaks the sorting mechanism.
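
Here is a minimal, standalone sketch of that difference using the BSON library (the class name is illustrative; the byte payload is the _id from the logs):

import java.util.Base64;
import org.bson.BsonBinary;
import org.bson.BsonBinarySubType;

public class SubtypeRoundTripSketch {
    public static void main(String[] args) {
        byte[] raw = Base64.getDecoder().decode("/6TTwx0WjkKvaxWNfKHvUw==");

        // The user's _id values are binary subtype 0x03 (legacy UUID)...
        BsonBinary original = new BsonBinary(BsonBinarySubType.UUID_LEGACY, raw);

        // ...but rebuilding the value with the default generic subtype 0x00
        // produces a different BsonBinary even though the bytes are identical.
        BsonBinary rebuilt = new BsonBinary(BsonBinarySubType.BINARY, raw);

        System.out.println(original.equals(rebuilt)); // false
    }
}

If I understand BSON's binData ordering correctly (by length, then subtype, then bytes), a lower bound rebuilt with subtype 0x00 sorts below every subtype 0x03 _id of the same length, so each "next chunk" query matches the whole collection again, which would explain the infinite loop and the duplicates seen here.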

I'll put up a fix soon. Thanks @almihor for your data, that's really helpful!

xiaohansong commented 2 months ago

@almihor you can use source-mongodb-v2 version 1.4.3-dev.e7d35a725f to unblock yourself. Change the version in the UI under Settings -> Sources. We need to run a few more tests before merging the PR, so the official release won't be immediate, but for your use case I think this dev version should address the issue! Once this ticket is closed, you will need to unset the pinned version to allow future updates to this connector.

almihor commented 1 month ago

@xiaohansong Hello! Sorry for the delayed answer. There was an error while landing the initial data, but the data did land. logs

Update: next run logs: 07903e37_87da_41ed_b096_04429355e3df_job_2471_attempt_1_txt.txt. No data landed, but I guess there weren't any updates in the source data.

xiaohansong commented 1 month ago

@almihor thanks for trying it out. The errors look transient, and MongoDB will indeed only do incremental syncs, so it's expected that no data lands in the second run unless you reset the source/connection.

theyueli commented 1 month ago

It seems this issue has been resolved. Thanks @xiaohansong. Closing the ticket. Please feel free to reopen if there is any further issue.