airbytehq / PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer.
https://docs.airbyte.com/pyairbyte

🐛 Bug: Google Ads fails due to nested primary keys #181

Closed: aaronsteers closed this issue 3 weeks ago

aaronsteers commented 5 months ago

The Google Ads source is failing because of nested primary keys. Logged here in Slack: https://airbytehq-team.slack.com/archives/C06FZ238P8W/p1712842623104989?thread_ts=1712842217.390879&cid=C06FZ238P8W

Example error message:

Source code:

aaronsteers commented 5 months ago

As a first step toward resolving this, we should find out (1) which stream is causing the error and (2) whether that stream is a standard stream or a dynamic one generated from the user's workspace and/or config.
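To help with question (1), one option is to list each stream's primary key and flag the key paths that a dot-based check could reject. This is a minimal sketch, assuming the keys follow the Airbyte protocol shape (a list of key paths, each path a list of field names). The `classify_primary_keys` helper, the stream names `some_nested_stream` and `some_flat_stream`, and all sample keys are hypothetical. In PyAirbyte the real keys can probably be read from `source.discovered_catalog.streams` (each stream's `name` and `source_defined_primary_key`), but verify those attributes against your installed version.

```python
from typing import Dict, List

# Airbyte-protocol-style primary key: a list of key paths, each a list of field names.
PrimaryKey = List[List[str]]


def classify_primary_keys(
    streams: Dict[str, PrimaryKey],
) -> Dict[str, Dict[str, PrimaryKey]]:
    """Per stream, split suspicious key paths into truly nested paths
    (more than one field name) and single flat field names that merely
    contain a dot. Streams with neither are left out of the report."""
    report: Dict[str, Dict[str, PrimaryKey]] = {}
    for name, pk in streams.items():
        nested = [path for path in pk if len(path) > 1]
        dotted = [path for path in pk if len(path) == 1 and "." in path[0]]
        if nested or dotted:
            report[name] = {"nested": nested, "dotted_name": dotted}
    return report


# Hypothetical sample keys. With PyAirbyte you might build this dict from the
# discovered catalog instead (assumed attributes, verify for your version):
#   {s.name: s.source_defined_primary_key or [] for s in source.discovered_catalog.streams}
sample = {
    "ad_group": [["ad_group.id"]],
    "campaign_criterion": [["campaign_criterion.resource_name"]],
    "some_nested_stream": [["customer", "id"]],
    "some_flat_stream": [["id"]],
}

for stream, findings in classify_primary_keys(sample).items():
    print(stream, findings)
```

A stream that shows up only under `dotted_name` has a single-column key, which would suggest the stream is not genuinely nested at all.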

sukantaroy01 commented 2 months ago

Facing this error too. @aaronsteers, do we have any RCA (root-cause analysis) on this since the issue was first raised?

sukantaroy01 commented 2 months ago

Sample code I am running:


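```python
import airbyte as ab

# google_payload is the user's own settings dict (its contents are not shown in this issue)
source_gads = ab.get_source('source-google-ads', install_if_missing=True, config=google_payload.get('configuration'))
source_gads.select_streams(['ad_group'])

result = source_gads.read()
```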

Response:

```text
Read Progress
Started reading at 14:08:55.
Read 1,562 records over 7 seconds (223.1 records / second).
Wrote 1,562 records over 1 batches.
Finished reading at 14:09:03.
Started finalizing streams at 14:09:03.
Finalized 0 batches over 0 seconds.
Failed `source-google-ads` read operation at 19:39:03.
```

Error stacktrace:


```text
NotImplementedError                       Traceback (most recent call last)
File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/sources/base.py:804, in Source.read(self, cache, streams, write_strategy, force_full_refresh, skip_validation)
    803 try:
--> 804     cache_processor.process_airbyte_messages(
    805         self._read_with_catalog(
    806             catalog=self.configured_catalog,
    807             state=state_provider,
    808         ),
    809         write_strategy=write_strategy,
    810     )
    812 # TODO: We should catch more specific exceptions here

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/record_processor.py:217, in RecordProcessorBase.process_airbyte_messages(self, messages, write_strategy)
    215 # We've finished processing input data.
    216 # Finalize all received records and state messages:
--> 217 self.write_all_stream_data(
    218     write_strategy=write_strategy,
    219 )
    221 self.cleanup_all()

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/record_processor.py:226, in RecordProcessorBase.write_all_stream_data(self, write_strategy)
    225 for stream_name in self.catalog_provider.stream_names:
--> 226     self.write_stream_data(
    227         stream_name,
    228         write_strategy=write_strategy,
    229     )

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:510, in SqlProcessorBase.write_stream_data(self, stream_name, write_strategy)
    509 try:
--> 510     self._write_temp_table_to_final_table(
    511         stream_name=stream_name,
    512         temp_table_name=temp_table_name,
    513         final_table_name=final_table_name,
    514         write_strategy=write_strategy,
    515     )
    516 finally:

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:693, in SqlProcessorBase._write_temp_table_to_final_table(self, stream_name, temp_table_name, final_table_name, write_strategy)
    692 """Write the temp table into the final table using the provided write strategy."""
--> 693 has_pks: bool = bool(self._get_primary_keys(stream_name))
    694 has_incremental_key: bool = bool(self._get_incremental_key(stream_name))

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:792, in SqlProcessorBase._get_primary_keys(self, stream_name)
    791         msg = f"Nested primary keys are not yet supported. Found: {pk}"
--> 792         raise NotImplementedError(msg)
    794 return joined_pks

NotImplementedError: Nested primary keys are not yet supported. Found: ad_group.id

The above exception was the direct cause of the following exception:

AirbyteConnectorFailedError               Traceback (most recent call last)
Cell In[30], line 4
      1 import time
      3 start_time = time.time()
----> 4 result = source_gads.read()
      7 end_time = time.time()
      8 print(f"Time required : {end_time - start_time}")

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/sources/base.py:815, in Source.read(self, cache, streams, write_strategy, force_full_refresh, skip_validation)
    813 except Exception as ex:
    814     self._log_sync_failure(cache=cache, exception=ex)
--> 815     raise exc.AirbyteConnectorFailedError(
    816         log_text=self._last_log_messages,
    817     ) from ex
    819 self._log_sync_success(cache=cache)
    820 return ReadResult(
    821     processed_records=self._processed_records,
    822     cache=cache,
    823     processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
    824 )

AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.

Log output:
    Found 1 customers: ['<AD_ACCOUNT_ID_REMOVED>']
    Marking stream ad_group as STARTED
    Syncing stream: ad_group
    I0000 00:00:1721398140.339362   84332 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported

    Marking stream ad_group as RUNNING
    Read 1562 records from ad_group stream
    Marking stream ad_group as STOPPED
    Finished syncing ad_group
    SourceGoogleAds runtimes:
    Syncing stream ad_group 0:00:03.165605
    Finished syncing SourceGoogleAds
```

henriquelino commented 2 months ago

Having the same error message here.

My code:

```python
import airbyte as ab

source = ab.get_source("source-google-ads")
source.set_config(
    {
        "credentials": {
            "developer_token": "...",
            "client_id": "...",
            "client_secret": "...",
            "refresh_token": "...",
        },
        "customer_id": "...",
    }
)
source.check()

source.select_streams(["campaign_criterion"])
read_result: ab.ReadResult = source.read()
```

This raises:

```text
NotImplementedError: Nested primary keys are not yet supported. Found: campaign_criterion.resource_name
```
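Both errors in this thread name a single Google Ads field (`ad_group.id`, `campaign_criterion.resource_name`), and the traceback shows `_get_primary_keys` returning a list called `joined_pks`. One plausible reading, not confirmed here: the check joins each key path with `.` and then rejects any result that contains `.`, so it cannot tell a truly nested path from a flat column whose name already contains a dot. The sketch below is a hypothetical reconstruction of that check next to an alternative that looks at path length instead. Neither function is PyAirbyte's actual code, and the `["customer", "id"]` key is made up for contrast.

```python
from typing import List

# Airbyte-protocol-style primary key: a list of key paths, each a list of field names.
PrimaryKey = List[List[str]]


def join_then_check(pk: PrimaryKey) -> List[str]:
    """Hypothetical reconstruction: join each path with '.', then reject any '.'."""
    joined_pks = [".".join(path) for path in pk]
    for joined in joined_pks:
        if "." in joined:
            raise NotImplementedError(f"Nested primary keys are not yet supported. Found: {joined}")
    return joined_pks


def path_length_check(pk: PrimaryKey) -> List[str]:
    """Alternative: accept only paths with exactly one field name,
    regardless of whether that name contains a dot."""
    for path in pk:
        if len(path) != 1:
            raise NotImplementedError(f"Nested primary keys are not yet supported. Found: {path}")
    return [path[0] for path in pk]


examples = ([["ad_group.id"]], [["campaign_criterion.resource_name"]], [["customer", "id"]])
for pk in examples:
    for check in (join_then_check, path_length_check):
        try:
            print(check.__name__, pk, "->", check(pk))
        except NotImplementedError as err:
            print(check.__name__, pk, "-> raised:", err)
```

If this reading is right, the `ad_group` and `campaign_criterion` reads would pass a path-length check, while a genuinely nested key would still be rejected.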