airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Destination MSSQL duplicates array records when using incremental append (nested tables) #9465

Open · marcosmarxm opened this issue 2 years ago

marcosmarxm commented 2 years ago

Is this your first time deploying Airbyte: No
OS Version / Instance: Ubuntu 20.04 (2 vCPU ARM)
Memory / Disk: 8GB / 40GB
Deployment: Docker compose
Airbyte Version: 0.35.4-alpha
Source name/version: Custom connector
Destination name/version: MSSQL 0.1.13
Step: Sending nested array data from a custom connector to MSSQL for normalization, using incremental append. The nested tables get duplicate data on each sync run: even when no records are found (the logs say "Read 0 records from forms stream"), new rows are added.
Description: Even when no records are found, the sync still executes normalization and duplicates the rows in the nested tables generated from arrays.

From the Slack conversation: Hello @Marcos Marx (Airbyte), yes, the data is very simple, something like this: `[{id: "1", name: "test", sub_objects: [{name: "abcd"}]}]`. It works perfectly for the main table, but for the sub_objects table, every sync with no extra data re-runs normalization and at that step duplicates the data in the sub_objects table.
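For illustration, normalization un-nests that array into a child table keyed back to the parent by a hashid column. A rough sketch, assuming Airbyte's usual naming conventions for a stream called forms (the table and column names below are assumptions, not taken from the actual logs, and may differ by version):

```sql
-- Sketch of the tables normalization produces (names assumed):
--   forms(_airbyte_ab_id, _airbyte_emitted_at, _airbyte_forms_hashid, id, name)
--   forms_sub_objects(_airbyte_forms_hashid, _airbyte_ab_id,
--                     _airbyte_emitted_at, name)
--
-- With incremental append and no primary key on the nested stream, each
-- normalization run can append the un-nested rows again. A quick check
-- for the duplicates described in this issue:
SELECT _airbyte_forms_hashid, name, COUNT(*) AS copies
FROM forms_sub_objects
GROUP BY _airbyte_forms_hashid, name
HAVING COUNT(*) > 1;
```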

agrass commented 2 years ago

Hi, thanks for creating the issue @marcosmarxm. I would like to help with this; any idea where I can start checking? Is it normal to re-run normalization when no new data is fetched? Thanks

ChristopheDuong commented 2 years ago

Nested streams are not de-duplicated; see https://github.com/airbytehq/airbyte/blob/ecfc9e1cc586c7d7725e3c92ed178a5ea14ff7bf/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py#L142

You could use a custom transformation, where you can specify how to de-duplicate sub-streams.
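For example, a minimal sketch of such a de-duplication model, assuming the Airbyte-style names from the example above (forms_sub_objects, _airbyte_forms_hashid, _airbyte_emitted_at are assumptions; the PARTITION BY columns must match whatever makes a nested row unique in your data):

```sql
-- Keep only the most recently emitted copy of each nested row
-- (sketch: names and partition columns are assumptions, adjust to
-- your schema):
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY _airbyte_forms_hashid, name
            ORDER BY _airbyte_emitted_at DESC
        ) AS row_num
    FROM forms_sub_objects
)
SELECT _airbyte_forms_hashid, name, _airbyte_ab_id, _airbyte_emitted_at
FROM ranked
WHERE row_num = 1;
```

Run as a model in an exported custom-transformation (dbt) project, a query like this gives a de-duplicated view over the append-only nested table.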

agrass commented 2 years ago

Thanks for the response @ChristopheDuong. I'm working with incremental append, and what you say makes sense since there's no cursor/primary key; I agree with that. But the problem is more that it duplicates data on every run even when there is no new data. This generates thousands of rows on each sync every 5 minutes, and re-running normalization on every sync is not very efficient. Is this expected behavior, or what do you think? Should normalization run only on new data?

For example, these 3 syncs with no data (0 bytes) each generated new rows in the nested table: [screenshot of sync history, 2022-01-11 18:53]

agrass commented 2 years ago

Is there a way to check, on this line, whether the process is a sync with no new records, so as to avoid re-running normalization? Does that make sense? @ChristopheDuong https://github.com/airbytehq/airbyte/blob/2115f7ace470cef80bf72476f410a2f18e49abcd/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java#L47

ChristopheDuong commented 2 years ago

No, that's not right.

If there is even one record for any stream in your connection, unrelated to your sub-stream, normalization will be triggered and rows will be appended to your un-nested table too.

You should really look at custom transformations, where you can specify how to de-duplicate sub-streams: https://docs.airbyte.com/operator-guides/transformation-and-normalization/transformations-with-airbyte

agrass commented 2 years ago

Thanks for the response @ChristopheDuong. Sorry, I didn't understand well; I'm probably missing some context. Is there an example case where re-running normalization is needed when no new records are found? Not counting when you reset your data.

marcosmarxm commented 1 year ago

Zendesk ticket #1758 has been linked to this issue.

marcosmarxm commented 1 year ago

Comment made from Zendesk by Marcos Marx on 2022-08-01 at 12:32:

Hello Jaafar, your issue is similar to https://github.com/airbytehq/airbyte/issues/9465
Currently you need to dedup nested records yourself, probably by exporting the normalization project and executing the dedup there.
The main reason for this is:
# nested streams can't be deduped like their parents (as they may not share the same cursor/primary keys)