airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Partitioning by expressions of type FLOAT64 is not allowed for Primary Key #2684

Closed: leecheeaun closed this issue 3 years ago

leecheeaun commented 3 years ago

Issue

In Incremental - Deduped + History mode, I am unable to select an alternative primary key because the UI auto-detects the table's primary key. If the primary key is of type FLOAT64, the sync fails.

Airbyte Version

0.18.2-alpha

Connector Version (if applicable)

SQL Server - 0.2.3
BigQuery - 0.2.2

sherifnada commented 3 years ago

@leecheeaun could you share the logs from the failed job?

Dock1100 commented 3 years ago

Hi, I ran into the same issue today while trying to replicate Postgres to BigQuery (mode Incremental - Deduped + History). The job fails with this error:

normalization - 2021-11-16 11:49:42 INFO () LineGobbler(voidCall):82 - Database Error in model my_table_name_scd (models/generated/airbyte_incremental/scd/my_namespace/my_table_name_scd.sql)
normalization - 2021-11-16 11:49:42 INFO () LineGobbler(voidCall):82 -   Partitioning by expressions of type FLOAT64 is not allowed at [63:47]
normalization - 2021-11-16 11:49:42 INFO () LineGobbler(voidCall):82 -   compiled SQL at ../build/run/airbyte_utils/models/generated/airbyte_incremental/scd/my_namespace/my_table_name_scd.sql

my_table_name_scd.sql

  create or replace table `google_project_id`.my_namespace.`my_table_name_scd`
  partition by range_bucket(
            _airbyte_active_row,
            generate_array(0, 1, 1)
        )
  cluster by _airbyte_unique_key_scd, _airbyte_emitted_at
  OPTIONS()
  as (

with

input_data as (
    select *
    from `google_project_id`._airbyte_my_namespace.`my_table_name_ab3`
    -- my_table_name from `google_project_id`.my_namespace._airbyte_raw_my_table_name
),

scd_data as (
    -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
    select
      to_hex(md5(cast(concat(coalesce(cast(id as
    string
), '')) as
    string
))) as _airbyte_unique_key,
        id,
        promo_id,
        date_used,
        balance_id,
        purchase_id,
      id as _airbyte_start_at,
      lag(id) over (
        partition by cast(id as
    string
)
        order by
            id is null asc,
            id desc,
            _airbyte_emitted_at desc
      ) as _airbyte_end_at,
      case when row_number() over (
        partition by cast(id as
    string
)
        order by
            id is null asc,
            id desc,
            _airbyte_emitted_at desc
      ) = 1 then 1 else 0 end as _airbyte_active_row,
      _airbyte_ab_id,
      _airbyte_emitted_at,
      _airbyte_my_table_name_hashid
    from input_data
),
dedup_data as (
    select
        -- we need to ensure de-duplicated rows for merge/update queries
        -- additionally, we generate a unique key for the scd table
        row_number() over (
            partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
            order by _airbyte_ab_id
        ) as _airbyte_row_num,
        to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
    string
), ''), '-', coalesce(cast(_airbyte_start_at as
    string
), ''), '-', coalesce(cast(_airbyte_emitted_at as
    string
), '')) as
    string
))) as _airbyte_unique_key_scd,
        scd_data.*
    from scd_data
)
select
    _airbyte_unique_key,
    _airbyte_unique_key_scd,
        id,
        promo_id,
        date_used,
        balance_id,
        purchase_id,
    _airbyte_start_at,
    _airbyte_end_at,
    _airbyte_active_row,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    CURRENT_TIMESTAMP() as _airbyte_normalized_at,
    _airbyte_my_table_name_hashid
from dedup_data where _airbyte_row_num = 1
  );

ChristopheDuong commented 3 years ago

Could you create a new issue instead of commenting on this closed one, please?

It seems like we'd need to add a cast to string on the cursor fields and avoid manipulating them as floats.

Can you confirm the data type of _airbyte_start_at, or of your cursor field?

Thanks
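
For illustration, here is a minimal sketch of the cast suggested above. In the compiled SQL, dedup_data runs row_number() over "partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at", and _airbyte_start_at is defined as "id as _airbyte_start_at". The column offset 47 in the error position [63:47] matches where _airbyte_start_at starts on that line, so this is probably the expression BigQuery rejects, assuming id is a float column. The helper names below (partition_expr, partition_by_clause) are hypothetical and are not taken from Airbyte's normalization code; the snippet only shows the idea of wrapping float columns in a cast to string before they reach a window PARTITION BY.

```python
from typing import Mapping

# Hypothetical helpers for illustration only; not part of Airbyte's normalization code.
# BigQuery reports float columns as FLOAT or FLOAT64 depending on the API.
FLOAT_TYPES = {"FLOAT", "FLOAT64"}


def partition_expr(column: str, bq_type: str) -> str:
    """Return an expression usable in a window PARTITION BY.

    Float columns are cast to string, because BigQuery does not allow
    partitioning by expressions of type FLOAT64.
    """
    if bq_type.upper() in FLOAT_TYPES:
        return f"cast({column} as string)"
    return column


def partition_by_clause(columns: Mapping[str, str]) -> str:
    """Build a PARTITION BY clause from a column name -> type mapping."""
    parts = [partition_expr(name, bq_type) for name, bq_type in columns.items()]
    return "partition by " + ", ".join(parts)


# Column types assumed from the thread: _airbyte_start_at inherits the float type of id.
clause = partition_by_clause({
    "_airbyte_unique_key": "STRING",
    "_airbyte_start_at": "FLOAT64",
    "_airbyte_emitted_at": "TIMESTAMP",
})
print(clause)
```

With the types assumed above, only _airbyte_start_at is wrapped in a cast; the string and timestamp columns are passed through unchanged.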

Dock1100 commented 3 years ago

_airbyte_start_at: timestamp. Cursor field (id): FLOAT, which is kind of strange because in Postgres it's serial (it's also strange that the other *_id fields are float; they should be int).

Created tables in BigQuery:

my_table_name

Field name                          Type       Mode      Policy Tags  Description
id                                  FLOAT      NULLABLE
promo_id                            FLOAT      NULLABLE
date_used                           STRING     NULLABLE
balance_id                          FLOAT      NULLABLE
purchase_id                         FLOAT      NULLABLE
_airbyte_ab_id                      STRING     NULLABLE
_airbyte_emitted_at                 TIMESTAMP  NULLABLE
_airbyte_normalized_at              TIMESTAMP  NULLABLE
_airbyte_b2c_pg_promo_usage_hashid  STRING     NULLABLE

_airbyte_raw_my_table_name

Field name           Type       Mode      Policy Tags  Description
_airbyte_ab_id       STRING     NULLABLE
_airbyte_emitted_at  TIMESTAMP  NULLABLE
_airbyte_data        STRING     NULLABLE