airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Normalization Error: 'dict object' has no attribute 'XXX' #19220

Closed by mattppal 1 year ago

mattppal commented 1 year ago

Environment

Current Behavior

There are four tables where I receive the following error:

1 of 48 SKIP relation airbyte_account.airbyte_account_addresses due to ephemeral model error............................ [ERROR SKIP]
Compilation Error in model _airbyte_airbyte_account_addresses_ab1 (models/generated/airbyte_ctes/airbyte_account/_airbyte_airbyte_account_addresses_ab1.sql)
  'dict object' has no attribute 'addresses_ab1'

  > in macro redshift__json_extract_scalar (macros/cross_db_utils/json_operations.sql)
  > called by macro json_extract_scalar (macros/cross_db_utils/json_operations.sql)
  > called by macro redshift_super_type (macros/configuration.sql)
  > called by model _airbyte_airbyte_account_addresses_ab1 (models/generated/airbyte_ctes/airbyte_account/_airbyte_airbyte_account_addresses_ab1.sql)

I pulled the relevant file from the volume: models/generated/airbyte_ctes/airbyte_account/_airbyte_airbyte_account_addresses_ab1.sql, but it's not immediately apparent what could be causing this error. All other tables complete successfully.

{{ config(
    sort = "_airbyte_emitted_at",
    unique_key = '_airbyte_ab_id',
    alias = "addresses_ab1",
    schema = "_airbyte_airbyte_account",
    tags = [ "top-level-intermediate" ]
) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: {{ source('airbyte_account', '_airbyte_raw_addresses') }}
select
    {{ json_extract_scalar('_airbyte_data', ['id'], ['id']) }} as id,
    {{ json_extract_scalar('_airbyte_data', ['type'], ['type']) }} as type,
    {{ json_extract_scalar('_airbyte_data', ['unit'], ['unit']) }} as unit,
    {{ json_extract_scalar('_airbyte_data', ['point'], ['point']) }} as point,
    {{ json_extract_scalar('_airbyte_data', ['county'], ['county']) }} as county,
    {{ json_extract_scalar('_airbyte_data', ['number'], ['number']) }} as number,
    {{ json_extract_scalar('_airbyte_data', ['region'], ['region']) }} as region,
    {{ json_extract_scalar('_airbyte_data', ['country'], ['country']) }} as country,
    {{ json_extract_scalar('_airbyte_data', ['latitude'], ['latitude']) }} as latitude,
    {{ json_extract_scalar('_airbyte_data', ['locality'], ['locality']) }} as locality,
    {{ json_extract_scalar('_airbyte_data', ['metadata'], ['metadata']) }} as metadata,
    {{ json_extract_scalar('_airbyte_data', ['zip_code'], ['zip_code']) }} as zip_code,
    {{ json_extract_scalar('_airbyte_data', ['longitude'], ['longitude']) }} as longitude,
    {{ json_extract_scalar('_airbyte_data', ['created_at'], ['created_at']) }} as created_at,
    {{ json_extract_scalar('_airbyte_data', ['updated_at'], ['updated_at']) }} as updated_at,
    {{ json_extract_scalar('_airbyte_data', ['street_name'], ['street_name']) }} as street_name,
    {{ json_extract_scalar('_airbyte_data', ['plus4_zip_code'], ['plus4_zip_code']) }} as plus4_zip_code,
    _airbyte_ab_id,
    _airbyte_emitted_at,
    {{ current_timestamp() }} as _airbyte_normalized_at
from {{ source('airbyte_account', '_airbyte_raw_addresses') }} as table_alias
-- addresses
where 1 = 1
{{ incremental_clause('_airbyte_emitted_at', this) }}
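For readers unfamiliar with this Jinja error: 'dict object' has no attribute 'XXX' is what Jinja reports when a macro does attribute-style lookup (`some_dict.some_name`) on a dict that lacks that key. A minimal Python sketch of the same failure mode (purely illustrative, not Airbyte's actual macro code):

```python
# Illustrative only: mimic a macro looking up a model name in a registry
# dict via attribute access, the way Jinja's {{ registry.addresses_ab1 }}
# resolves. The registry key below is hypothetical.
registry = {"_airbyte_account_addresses": "..."}

try:
    getattr(registry, "addresses_ab1")  # attribute lookup on a plain dict
except AttributeError as err:
    message = str(err)

print(message)  # 'dict' object has no attribute 'addresses_ab1'
```

Python's own message here is nearly identical to the one in the dbt compilation error, which is a hint that the macro is doing a dict lookup with a key that was never registered.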

Expected Behavior

The sync should complete successfully.

Logs

(too long to attach)

Steps to Reproduce

  1. Setup connection Postgres <> Redshift [Incremental Append]
  2. Perform sync with normalization enabled
  3. All steps succeed
  4. Normalization fails
mattppal commented 1 year ago

Some more context: this error appears in the following tables:

So it's possible:

  1. There's something breaking due to tables having the same name.
  2. There's a commonality among a field in those tables causing the error.
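If hypothesis 1 is right, a name-keyed registry would explain the symptom: if normalization indexes generated models by stream name alone, two streams that share a name collide on the same key, and a later lookup for the schema-qualified variant misses. A hedged sketch (hypothetical data structures, not Airbyte internals):

```python
# Hypothetical sketch: index stream metadata by stream name alone, so two
# streams with the same name (in different schemas) collide on one key.
streams = [
    ("public", "addresses"),
    ("airbyte_account", "addresses"),  # same name, different schema
]

registry = {}
for schema, name in streams:
    registry[f"{name}_ab1"] = schema  # second entry overwrites the first

print(len(registry))  # 1 -- one entry was silently lost
print("airbyte_account_addresses_ab1" in registry)  # False -> lookup fails
```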
mattppal commented 1 year ago

Something I'm noticing in the config block:

{{ config(
    sort = "_airbyte_emitted_at",
    unique_key = '_airbyte_ab_id',
    alias = "addresses_ab1",
    schema = "_airbyte_airbyte_account",
    tags = [ "top-level-intermediate" ]
) }}

Only the tables that are failing have an alias attribute defined in their config. Why is Airbyte setting the alias in these tables and not others? I suspect this is what leads to the failure 'dict object' has no attribute 'addresses_ab1'.

mattppal commented 1 year ago

Additionally, the naming convention for the failed tables differs from that of the successful ones:

(image attached)

mattppal commented 1 year ago

I can confirm that the issue is caused by streams with the same name, even in different schemas. This was resolved by splitting my streams into multiple connections where names are unique.
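Given that diagnosis, duplicate stream names can be detected up front before deciding how to split connections. A small helper (hypothetical, not part of Airbyte) that flags stream names appearing in more than one schema:

```python
# Hypothetical pre-flight check: report stream names that repeat across
# schemas, since (per this issue) those collide during normalization.
from collections import defaultdict

def duplicate_stream_names(streams):
    """Return stream names that appear in more than one schema."""
    seen = defaultdict(set)
    for schema, name in streams:
        seen[name].add(schema)
    return sorted(name for name, schemas in seen.items() if len(schemas) > 1)

streams = [
    ("public", "addresses"),
    ("airbyte_account", "addresses"),
    ("public", "users"),
]
print(duplicate_stream_names(streams))  # ['addresses']
```

Any name this returns would need to live in its own connection (or be renamed) for normalization to succeed.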

evantahler commented 1 year ago

Closing, as normalization as we know it is going away fairly soon. Follow along here: https://github.com/airbytehq/airbyte/issues/26028