airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Normalization: final tables don't have any data #8028

Closed — marcosmarxm closed this issue 2 years ago

marcosmarxm commented 2 years ago
## Environment

- **Airbyte version**: 0.32.0-alpha
- **OS Version / Instance**: MacOS
- **Deployment**: Docker
- **Source Connector and version**: Github 0.2.4
- **Destination Connector and version**: Postgres 0.3.12
- **Severity**: Critical
- **Step where error happened**: Sync job

## Current Behavior

My instance was upgraded from 0.31.38-alpha to 0.32.0-alpha. I set up a Github → Postgres connection with only the stargazers stream:

![image](https://user-images.githubusercontent.com/5154322/142084815-d095fd3e-d536-44c0-b26a-f5b9ebb758ad.png)

The final stargazers table is empty:

![image](https://user-images.githubusercontent.com/5154322/142080979-f07f1330-b33a-4dc5-8c4e-fc68c00673b2.png)

The stargazers SCD table has data with `_airbyte_active_row = 1`:

![image](https://user-images.githubusercontent.com/5154322/142081045-fd6a4921-20ac-4cca-bb89-740726d89d9d.png)

#### dbt files

```
root@ab914a1fc1a1:/tmp/workspace/7/0/normalize/models/generated/airbyte_incremental/github# cat stargazers.sql
{{ config(
    indexes = [{'columns':['_airbyte_unique_key'],'unique':True}],
    unique_key = "_airbyte_unique_key",
    schema = "github",
    tags = [ "top-level" ]
) }}
-- Final base SQL model
select
    _airbyte_unique_key,
    {{ adapter.quote('user') }},
    user_id,
    repository,
    starred_at,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    {{ current_timestamp() }} as _airbyte_normalized_at,
    _airbyte_stargazers_hashid
from {{ ref('stargazers_scd') }}
-- stargazers from {{ source('github', '_airbyte_raw_stargazers') }}
where 1 = 1
and _airbyte_active_row = 1
{{ incremental_clause('_airbyte_emitted_at') }}
```

#### Generated dbt files

```
root@ab914a1fc1a1:/tmp/workspace/7/0/build/run/airbyte_utils/models/generated/airbyte_incremental/github# cat stargazers.sql
delete from "postgres".github."stargazers"
where (_airbyte_unique_key) in (
    select (_airbyte_unique_key)
    from "stargazers__dbt_tmp210240814249"
);

insert into "postgres".github."stargazers" ("user", "user_id", "repository", "starred_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_stargazers_hashid", "_airbyte_unique_key")
(
    select "user", "user_id", "repository", "starred_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_stargazers_hashid", "_airbyte_unique_key"
    from "stargazers__dbt_tmp210240814249"
);
```

```
root@ab914a1fc1a1:/tmp/workspace/7/0/build/run/airbyte_utils/models/generated/airbyte_incremental/github# cat stargazers_ab3.sql
create table "postgres"._airbyte_github."stargazers_ab3" as (
with __dbt__cte__stargazers_ab1 as (
    -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
    select
        jsonb_extract_path(table_alias._airbyte_data, 'user') as "user",
        jsonb_extract_path_text(_airbyte_data, 'user_id') as user_id,
        jsonb_extract_path_text(_airbyte_data, 'repository') as repository,
        jsonb_extract_path_text(_airbyte_data, 'starred_at') as starred_at,
        _airbyte_ab_id,
        _airbyte_emitted_at,
        now() as _airbyte_normalized_at
    from "postgres".github._airbyte_raw_stargazers as table_alias
    -- stargazers
    where 1 = 1
),
__dbt__cte__stargazers_ab2 as (
    -- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
    select
        cast("user" as jsonb) as "user",
        cast(user_id as bigint) as user_id,
        cast(repository as varchar) as repository,
        cast(nullif(starred_at, '') as timestamp with time zone) as starred_at,
        _airbyte_ab_id,
        _airbyte_emitted_at,
        now() as _airbyte_normalized_at
    from __dbt__cte__stargazers_ab1
    -- stargazers
    where 1 = 1
)
-- SQL model to build a hash column based on the values of this record
select
    md5(cast(coalesce(cast("user" as varchar), '') || '-' || coalesce(cast(user_id as varchar), '') || '-' || coalesce(cast(repository as varchar), '') || '-' || coalesce(cast(starred_at as varchar), '') as varchar)) as _airbyte_stargazers_hashid,
    tmp.*
from __dbt__cte__stargazers_ab2 tmp
-- stargazers
where 1 = 1
);
```

## Logs

## Steps to Reproduce

1. Create a Github connector using the Integration Account.
2. Create a Docker Postgres database: `docker run --rm --name dest -p 2001:5432 -e POSTGRES_PASSWORD=password -d postgres`
3. Create a Postgres destination.
4. Create a connection and select only the stargazers stream with Incremental + Deduped History.
5. Sync and check the final table.

## Are you willing to submit a PR?
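As a quick cross-check of the symptom, two row counts against the destination tell the story (a minimal sketch; the `github` schema and table names are assumed from the screenshots above):

```sql
-- Rows normalization should promote into the final table
select count(*) as active_scd_rows
from "postgres".github."stargazers_scd"
where _airbyte_active_row = 1;

-- Rows actually present in the final table (empty in this report)
select count(*) as final_rows
from "postgres".github."stargazers";
```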


marcosmarxm commented 2 years ago

I'll try a fresh instance and see if it's reproducible.

marcosmarxm commented 2 years ago

Slack convo

marcosmarxm commented 2 years ago

I installed a clean 0.32.0-alpha version and it works fine.

ChristopheDuong commented 2 years ago

It would have been helpful to see what data was picked up in `stargazers__dbt_tmp210240814249` (the query building that intermediate table can be found in the `logs/dbt.log` file; a way to replay it by hand is sketched after the SQL below).

That table could have given clues about what would be deleted and re-inserted by:

```
delete
from "postgres".github."stargazers"
where (_airbyte_unique_key) in (
    select (_airbyte_unique_key)
    from "stargazers__dbt_tmp210240814249"
);

insert into "postgres".github."stargazers" ("user", "user_id", "repository", "starred_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_stargazers_hashid", "_airbyte_unique_key")
(
    select "user", "user_id", "repository", "starred_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_stargazers_hashid", "_airbyte_unique_key"
    from "stargazers__dbt_tmp210240814249"
);
```
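If that temporary table is already gone, one option is to replay the compiled query from `logs/dbt.log` manually. Based on the `stargazers.sql` model quoted in the issue, it compiles to roughly the following (a sketch, not the exact generated SQL; the rendered incremental filter may differ):

```sql
-- Approximation of the SELECT that fills "stargazers__dbt_tmp...":
-- active rows from the SCD table, narrowed by the incremental filter
-- dbt appends on _airbyte_emitted_at.
select
    _airbyte_unique_key,
    "user",
    user_id,
    repository,
    starred_at,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    now() as _airbyte_normalized_at,
    _airbyte_stargazers_hashid
from "postgres".github."stargazers_scd"
where 1 = 1
  and _airbyte_active_row = 1
  -- incremental_clause('_airbyte_emitted_at') renders to something like:
  and _airbyte_emitted_at >= (select max(_airbyte_emitted_at) from "postgres".github."stargazers");
```

Running that by hand would show which rows (if any) were headed for the delete + insert above.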

But I still don't quite understand how you would end up with empty tables...

ChristopheDuong commented 2 years ago

> But I still don't quite understand how you would end up with empty tables...

I am guessing there was some "reset tables" operation involved in this scenario.

So the issue boils down to the final table being empty, which breaks the incremental clause, as discussed in https://github.com/airbytehq/airbyte/issues/8286#issuecomment-982402024
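For readers landing here later, a simplified illustration of that failure mode (not the actual `incremental_clause` macro source): when the final table is empty, the `max()` subquery yields NULL, the comparison evaluates to NULL, and every candidate row is filtered out.

```sql
-- Final table "stargazers" is empty (e.g. after a reset):
-- max(_airbyte_emitted_at) is NULL, so the >= comparison is never true
-- and the incremental run selects zero rows.
select *
from "postgres".github."stargazers_scd"
where _airbyte_active_row = 1
  and _airbyte_emitted_at >= (select max(_airbyte_emitted_at)
                              from "postgres".github."stargazers");

-- A NULL-safe guard of the kind discussed in #8286, e.g.
--   coalesce(_airbyte_emitted_at >= (select max(...) from ...), true)
-- lets rows through when the destination table has no data yet.
```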