airbytehq / PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer.
https://docs.airbyte.com/pyairbyte

🐛Bug: Duck DB Cache has Problems converting Big Numbers to Decimal #177

Closed tinomerl closed 2 months ago

tinomerl commented 2 months ago

Problem Description

PyAirbyte Version: v0.9.0
Source Version: source-facebook-marketing 2.1.1

When reading from the Facebook Marketing source, specifically the "ad_account" stream, I get the following exception:

duckdb.duckdb.InvalidInputException: Invalid Input Error: JSON transform error in file ".cache/default_cache/ad_account_01HV39XAFK4DART550Y0KCFWS2.jsonl.gz", in line 1: Failed to cast value to decimal: 1234567890123456.0 Try setting 'auto_detect' to true, specifying 'format' or 'records' manually, or setting 'ignore_errors' to true.

(The value in the error is redacted since it's a live ID. But the number of digits is the same.)

The error is not tied to one particular column. I had the error with the following columns

Interestingly, the column id holds the same value as e.g. end_advertiser minus the decimal part, making it 1234567890123456. This column doesn't throw an exception.

Additional Info

I tested the same stream with a PostgreSQL cache and it worked just fine. The column types there are numeric.

Sample Code

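import airbyte as ab

# Load the Facebook Marketing source connector.
source: ab.Source = ab.get_source("source-facebook-marketing")
source.set_config({
    "access_token": "redacted",
    "start_date": "2024-03-28T00:00:00Z",
    "account_ids": ["redacted"]
})
source.get_available_streams()
# Only the "ad_account" stream is needed to reproduce the error.
source.select_streams("ad_account")
# Reads into the default cache; this is the call that raises.
records = source.read()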

Error Log

Stack trace

```
Started `source-facebook-marketing` read operation at 08:07:42...
Failed `source-facebook-marketing` read operation at 08:08:14.
Traceback (most recent call last):
  File "/home/tino/repos/PyAirbyte/airbyte/sources/base.py", line 740, in read
    cache.processor.process_airbyte_messages(
  File "/home/tino/repos/PyAirbyte/airbyte/_processors/base.py", line 200, in process_airbyte_messages
    self.write_all_stream_data(
  File "/home/tino/repos/PyAirbyte/airbyte/_processors/base.py", line 211, in write_all_stream_data
    self.write_stream_data(stream_name, write_strategy=write_strategy)
  File "/home/tino/repos/PyAirbyte/airbyte/_processors/sql/base.py", line 555, in write_stream_data
    temp_table_name = self._write_files_to_new_table(
  File "/home/tino/repos/PyAirbyte/airbyte/_processors/sql/duckdb.py", line 124, in _write_files_to_new_table
    self._execute_sql(insert_statement)
  File "/home/tino/repos/PyAirbyte/airbyte/_processors/sql/base.py", line 629, in _execute_sql
    raise SQLRuntimeError(msg) from None # from ex
airbyte._processors.sql.base.SQLRuntimeError: Error when executing SQL:
INSERT INTO main.ad_account_01hv39xafkzpgnrcn5t4w1hh98 (
  "account_id" , "account_status" , "age" , "amount_spent" , "balance" , "business" , "business_city" , "business_country_code" , "business_name" , "business_state" , "business_street" , "business_street2" , "business_zip" , "can_create_brand_lift_study" , "capabilities" , "created_time" , "currency" , "disable_reason" , "end_advertiser" , "end_advertiser_name" , "extended_credit_invoice_group" , "failed_delivery_checks" , "fb_entity" , "funding_source" , "funding_source_details" , "has_migrated_permissions" , "id" , "io_number" , "is_attribution_spec_system_default" , "is_direct_deals_enabled" , "is_in_3ds_authorization_enabled_market" , "is_notifications_enabled" , "is_personal" , "is_prepay_account" , "is_tax_id_required" , "line_numbers" , "media_agency" , "min_campaign_group_spend_cap" , "min_daily_budget" , "name" , "offsite_pixels_tos_accepted" , "owner" , "partner" , "rf_spec" , "spend_cap" , "tax_id" , "tax_id_status" , "tax_id_type" , "timezone_id" , "timezone_name" , "timezone_offset_hours_utc" , "tos_accepted" , "user_tasks" , "user_tos_accepted" , "_airbyte_raw_id" , "_airbyte_extracted_at" , "_airbyte_meta"
)
SELECT
  "account_id" , "account_status" , "age" , "amount_spent" , "balance" , "business" , "business_city" , "business_country_code" , "business_name" , "business_state" , "business_street" , "business_street2" , "business_zip" , "can_create_brand_lift_study" , "capabilities" , "created_time" , "currency" , "disable_reason" , "end_advertiser" , "end_advertiser_name" , "extended_credit_invoice_group" , "failed_delivery_checks" , "fb_entity" , "funding_source" , "funding_source_details" , "has_migrated_permissions" , "id" , "io_number" , "is_attribution_spec_system_default" , "is_direct_deals_enabled" , "is_in_3ds_authorization_enabled_market" , "is_notifications_enabled" , "is_personal" , "is_prepay_account" , "is_tax_id_required" , "line_numbers" , "media_agency" , "min_campaign_group_spend_cap" , "min_daily_budget" , "name" , "offsite_pixels_tos_accepted" , "owner" , "partner" , "rf_spec" , "spend_cap" , "tax_id" , "tax_id_status" , "tax_id_type" , "timezone_id" , "timezone_name" , "timezone_offset_hours_utc" , "tos_accepted" , "user_tasks" , "user_tos_accepted" , "_airbyte_raw_id" , "_airbyte_extracted_at" , "_airbyte_meta"
FROM read_json_auto(
  ['.cache/default_cache/ad_account_01HV39XAFK4DART550Y0KCFWS2.jsonl.gz'],
  format = 'newline_delimited',
  union_by_name = true,
  columns = { "account_id": VARCHAR , "account_status": BIGINT , "age": DECIMAL , "amount_spent": VARCHAR , "balance": VARCHAR , "business": JSON , "business_city": VARCHAR , "business_country_code": VARCHAR , "business_name": VARCHAR , "business_state": VARCHAR , "business_street": VARCHAR , "business_street2": VARCHAR , "business_zip": VARCHAR , "can_create_brand_lift_study": BOOLEAN , "capabilities": JSON , "created_time": TIMESTAMP , "currency": VARCHAR , "disable_reason": DECIMAL , "end_advertiser": DECIMAL , "end_advertiser_name": VARCHAR , "extended_credit_invoice_group": JSON , "failed_delivery_checks": JSON , "fb_entity": DECIMAL , "funding_source": DECIMAL , "funding_source_details": JSON , "has_migrated_permissions": BOOLEAN , "id": VARCHAR , "io_number": DECIMAL , "is_attribution_spec_system_default": BOOLEAN , "is_direct_deals_enabled": BOOLEAN , "is_in_3ds_authorization_enabled_market": BOOLEAN , "is_notifications_enabled": BOOLEAN , "is_personal": DECIMAL , "is_prepay_account": BOOLEAN , "is_tax_id_required": BOOLEAN , "line_numbers": DECIMAL , "media_agency": DECIMAL , "min_campaign_group_spend_cap": DECIMAL , "min_daily_budget": DECIMAL , "name": VARCHAR , "offsite_pixels_tos_accepted": BOOLEAN , "owner": DECIMAL , "partner": DECIMAL , "rf_spec": JSON , "spend_cap": VARCHAR , "tax_id": VARCHAR , "tax_id_status": DECIMAL , "tax_id_type": VARCHAR , "timezone_id": DECIMAL , "timezone_name": VARCHAR , "timezone_offset_hours_utc": DECIMAL , "tos_accepted": JSON , "user_tasks": JSON , "user_tos_accepted": JSON , "_airbyte_raw_id": VARCHAR , "_airbyte_extracted_at": TIMESTAMP , "_airbyte_meta": JSON }
)
ProgrammingError(duckdb.duckdb.InvalidInputException) Invalid Input Error: JSON transform error in file ".cache/default_cache/ad_account_01HV39XAFK4DART550Y0KCFWS2.jsonl.gz", in line 1: Failed to cast value to decimal: 1234567890123456.0 Try setting 'auto_detect' to true, specifying 'format' or 'records' manually, or setting 'ignore_errors' to true.
[SQL: INSERT INTO main.ad_account_01hv39xafkzpgnrcn5t4w1hh98 (
  "account_id" , "account_status" , "age" , "amount_spent" , "balance" , "business" , "business_city" , "business_country_code" , "business_name" , "business_state" , "business_street" , "business_street2" , "business_zip" , "can_create_brand_lift_study" , "capabilities" , "created_time" , "currency" , "disable_reason" , "end_advertiser" , "end_advertiser_name" , "extended_credit_invoice_group" , "failed_delivery_checks" , "fb_entity" , "funding_source" , "funding_source_details" , "has_migrated_permissions" , "id" , "io_number" , "is_attribution_spec_system_default" , "is_direct_deals_enabled" , "is_in_3ds_authorization_enabled_market" , "is_notifications_enabled" , "is_personal" , "is_prepay_account" , "is_tax_id_required" , "line_numbers" , "media_agency" , "min_campaign_group_spend_cap" , "min_daily_budget" , "name" , "offsite_pixels_tos_accepted" , "owner" , "partner" , "rf_spec" , "spend_cap" , "tax_id" , "tax_id_status" , "tax_id_type" , "timezone_id" , "timezone_name" , "timezone_offset_hours_utc" , "tos_accepted" , "user_tasks" , "user_tos_accepted" , "_airbyte_raw_id" , "_airbyte_extracted_at" , "_airbyte_meta"
)
SELECT
  "account_id" , "account_status" , "age" , "amount_spent" , "balance" , "business" , "business_city" , "business_country_code" , "business_name" , "business_state" , "business_street" , "business_street2" , "business_zip" , "can_create_brand_lift_study" , "capabilities" , "created_time" , "currency" , "disable_reason" , "end_advertiser" , "end_advertiser_name" , "extended_credit_invoice_group" , "failed_delivery_checks" , "fb_entity" , "funding_source" , "funding_source_details" , "has_migrated_permissions" , "id" , "io_number" , "is_attribution_spec_system_default" , "is_direct_deals_enabled" , "is_in_3ds_authorization_enabled_market" , "is_notifications_enabled" , "is_personal" , "is_prepay_account" , "is_tax_id_required" , "line_numbers" , "media_agency" , "min_campaign_group_spend_cap" , "min_daily_budget" , "name" , "offsite_pixels_tos_accepted" , "owner" , "partner" , "rf_spec" , "spend_cap" , "tax_id" , "tax_id_status" , "tax_id_type" , "timezone_id" , "timezone_name" , "timezone_offset_hours_utc" , "tos_accepted" , "user_tasks" , "user_tos_accepted" , "_airbyte_raw_id" , "_airbyte_extracted_at" , "_airbyte_meta"
FROM read_json_auto(
  ['.cache/default_cache/ad_account_01HV39XAFK4DART550Y0KCFWS2.jsonl.gz'],
  format = 'newline_delimited',
  union_by_name = true,
  columns = { "account_id": VARCHAR , "account_status": BIGINT , "age": DECIMAL , "amount_spent": VARCHAR , "balance": VARCHAR , "business": JSON , "business_city": VARCHAR , "business_country_code": VARCHAR , "business_name": VARCHAR , "business_state": VARCHAR , "business_street": VARCHAR , "business_street2": VARCHAR , "business_zip": VARCHAR , "can_create_brand_lift_study": BOOLEAN , "capabilities": JSON , "created_time": TIMESTAMP , "currency": VARCHAR , "disable_reason": DECIMAL , "end_advertiser": DECIMAL , "end_advertiser_name": VARCHAR , "extended_credit_invoice_group": JSON , "failed_delivery_checks": JSON , "fb_entity": DECIMAL , "funding_source": DECIMAL , "funding_source_details": JSON , "has_migrated_permissions": BOOLEAN , "id": VARCHAR , "io_number": DECIMAL , "is_attribution_spec_system_default": BOOLEAN , "is_direct_deals_enabled": BOOLEAN , "is_in_3ds_authorization_enabled_market": BOOLEAN , "is_notifications_enabled": BOOLEAN , "is_personal": DECIMAL , "is_prepay_account": BOOLEAN , "is_tax_id_required": BOOLEAN , "line_numbers": DECIMAL , "media_agency": DECIMAL , "min_campaign_group_spend_cap": DECIMAL , "min_daily_budget": DECIMAL , "name": VARCHAR , "offsite_pixels_tos_accepted": BOOLEAN , "owner": DECIMAL , "partner": DECIMAL , "rf_spec": JSON , "spend_cap": VARCHAR , "tax_id": VARCHAR , "tax_id_status": DECIMAL , "tax_id_type": VARCHAR , "timezone_id": DECIMAL , "timezone_name": VARCHAR , "timezone_offset_hours_utc": DECIMAL , "tos_accepted": JSON , "user_tasks": JSON , "user_tos_accepted": JSON , "_airbyte_raw_id": VARCHAR , "_airbyte_extracted_at": TIMESTAMP , "_airbyte_meta": JSON }
) ]
(Background on this error at: https://sqlalche.me/e/14/f405)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/tino/.pyenv/versions/3.10.13/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/tino/.pyenv/versions/3.10.13/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/__main__.py", line 39, in <module>
    cli.main()
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 430, in main
    run()
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/adapter/../../debugpy/launcher/../../debugpy/../debugpy/server/cli.py", line 284, in run_file
    runpy.run_path(target, run_name="__main__")
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 321, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 135, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/home/tino/.vscode-server/extensions/ms-python.debugpy-2024.4.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/_pydevd_bundle/pydevd_runpy.py", line 124, in _run_code
    exec(code, run_globals)
  File "/home/tino/repos/PyAirbyte/live_tests/facebook_test.py", line 11, in <module>
    records = source.read()
  File "/home/tino/repos/PyAirbyte/airbyte/sources/base.py", line 749, in read
    raise exc.AirbyteConnectorFailedError(
airbyte.exceptions.AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.
    Log output:
    Starting syncing SourceFacebookMarketing
    Marking stream ad_account as STARTED
    Syncing stream: ad_account
    Marking stream ad_account as RUNNING
    Read 1 records from ad_account stream
    Marking stream ad_account as STOPPED
    Finished syncing ad_account
    SourceFacebookMarketing runtimes:
    Syncing stream ad_account 0:00:06.926236
    Finished syncing SourceFacebookMarketing
```

aaronsteers commented 2 months ago

@tinomerl - Thanks for logging this!

aaronsteers commented 2 months ago

@ellipsis-dev - Will you try to create a PR that resolves this? We want to use the largest possible bigquery data type for integers. This is controlled in the BigQueryTypeConverter class, and specifically in the to_sql_type() method.

aaronsteers commented 2 months ago

@tinomerl - I did a quick check and it looks like this value can fit without issue if the data type is integer. The issue is that the value is being declared as a decimal, and we don't know where the decimal point should be, so we have to account for high precision (after the decimal point) as well as for the number being large in itself.

We can expand to the next-largest size for decimal values, but I don't see any reason why this should be declared as a decimal value instead of an int. I'll take a look at the source and see if we can update it to use integer type.
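To make the size argument concrete, here is a minimal sketch in plain Python of the range check that a fixed-point column implies. It assumes DuckDB's documented default of DECIMAL(18, 3) when a column is declared as bare DECIMAL; `fits_decimal` is a hypothetical helper written for illustration, not part of PyAirbyte or DuckDB, and DECIMAL(38, 9) is only one example of a wider type.

```python
from decimal import Decimal


def fits_decimal(value: str, width: int = 18, scale: int = 3) -> bool:
    """Check whether `value` fits a fixed-point DECIMAL(width, scale) column.

    Hypothetical helper: it only compares the number of integer digits
    against width - scale and ignores rounding at the scale boundary.
    """
    number = Decimal(value)
    # adjusted() is the exponent of the most significant digit, so
    # adjusted() + 1 is the digit count before the decimal point.
    integer_digits = max(number.adjusted() + 1, 0)
    return integer_digits <= width - scale


# The redacted value from the error has 16 integer digits, but a
# DECIMAL(18, 3) column (assumed default) leaves room for only 15.
print(fits_decimal("1234567890123456.0"))                      # False
# A wider type such as DECIMAL(38, 9) leaves room for 29 integer digits.
print(fits_decimal("1234567890123456.0", width=38, scale=9))   # True
```

Under that assumption, the same 16-digit value is fine as an integer or as text, which would be consistent with the `id` column (declared VARCHAR in the generated SQL) not raising.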

Update: code is here: https://github.com/airbytehq/airbyte/blob/27e851c5caf9bfc8d9e555370a0aeedef959c67d/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ad_account.json#L10

Update: draft PR here for discussion:

aaronsteers commented 2 months ago

Potential fix (PyAirbyte-side) here:

tinomerl commented 2 months ago

Hey @aaronsteers,

I agree with you that these values should be integers instead of numbers. But I think it stems from the fact that in the docs for the endpoint these fields are declared as "numeric strings", while others are declared as int or unsigned int.

https://developers.facebook.com/docs/marketing-api/reference/ad-account
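As a rough illustration of how a "numeric string" could end up as the `.0` value seen in the error, here is a small Python sketch. It assumes that a field typed as number is coerced to a float before being written as JSON, which I have not confirmed in the connector; `to_json_number` is a hypothetical helper, not connector code.

```python
import json


def to_json_number(numeric_string: str, as_integer: bool) -> str:
    """Serialize a numeric string as a JSON number.

    Hypothetical helper: `as_integer` stands in for the schema declaring
    the field as integer rather than number.
    """
    value = int(numeric_string) if as_integer else float(numeric_string)
    return json.dumps({"end_advertiser": value})


# Coerced to float, the ID gains a trailing ".0", matching the error message.
print(to_json_number("1234567890123456", as_integer=False))
# Coerced to int, the same ID is written without a fractional part.
print(to_json_number("1234567890123456", as_integer=True))

# Floats are also only exact up to 2**53, so larger IDs would be altered.
big_id = 2**53 + 1
print(int(float(big_id)) == big_id)  # False
```

This is one more reason to prefer integer (or string) typing for ID-like fields, independent of the decimal width in the cache.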

I also pulled your fix and tested the stream locally with 28 different ad accounts. Worked perfectly.