airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Airbyte CDK: Improve error for returning non-iterable from connectors parse_response #17047

Closed Gitznik closed 6 months ago

Gitznik commented 2 years ago
## Environment

- **Airbyte version**: 0.40.7
- **OS Version / Instance**: macOS
- **Deployment**: Docker
- **Source Connector and version**: Custom python HTTP source
- **Step where error happened**: Running `python3 main.py read` while developing the connector

## Current Behavior

If the `parse_response` method of the stream does not return an iterator, this leads to a fairly cryptic pydantic error message instead of specifying that an iterator is expected. As `response.json()` is type hinted with `Any`, my IDE did not catch this type violation for me and I was quite confused.

## Expected Behavior

The error should tell me that the iterator returned from my `parse_response` function is incompatible with the expected data. If there is no use case for returning a dict as an iterator, it could even raise a warning if `parse_response` returns a dict.

## Logs

```
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceNativex"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: campaigns "}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream campaigns\nTraceback (most recent call last):\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n yield from self._read_stream(\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n for record in record_iterator:\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n yield self._as_airbyte_record(configured_stream.stream.name, record)\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n value is not a valid dict (type=type_error.dict)"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing campaigns"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceNativex runtimes:\nSyncing stream campaigns 0:00:01.377093"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "1 validation error for AirbyteRecordMessage\ndata\n value is not a valid dict (type=type_error.dict)\nTraceback (most recent call last):\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/main.py\", line 13, in <module>\n launch(source, sys.argv[1:])\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n for message in source_entrypoint.run(parsed_args):\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n for message in generator:\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n raise e\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n yield from self._read_stream(\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n for record in record_iterator:\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n yield self._as_airbyte_record(configured_stream.stream.name, record)\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n value is not a valid dict (type=type_error.dict)"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1663859942510.5889, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "1 validation error for AirbyteRecordMessage\ndata\n value is not a valid dict (type=type_error.dict)", "stack_trace": "Traceback (most recent call last):\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/main.py\", line 13, in <module>\n launch(source, sys.argv[1:])\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n for message in source_entrypoint.run(parsed_args):\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n for message in generator:\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n raise e\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n yield from self._read_stream(\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n for record in record_iterator:\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n yield self._as_airbyte_record(configured_stream.stream.name, record)\n File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n value is not a valid dict (type=type_error.dict)\n", "failure_type": "system_error"}}}
```

## Steps to Reproduce

1. Return a dict from a connector's `parse_response` method
2. Call `python3 main.py read --config secrets/config.json --catalog catalog.json` on it

## Are you willing to submit a PR?

Sure. From what I can see, the method is called from https://github.com/airbytehq/airbyte/blob/bb39b36016ca32baba3a762ee2a56fe3da358bcc/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py#L428. Would it make sense to do a type assert here?

marcosmarxm commented 2 years ago

Thanks for suggesting this, you can submit the change. The question is: how will you assert the output of the yield?

Gitznik commented 2 years ago

Good question. It feels quite tricky to do this robustly here. Going down the stack trace, we could catch the `pydantic.error_wrappers.ValidationError` raised in https://github.com/airbytehq/airbyte/blob/bb39b36016ca32baba3a762ee2a56fe3da358bcc/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py#L317 when anything that is not a dict is passed, and raise a custom error with some additional information, e.g. this likely cause. I imagine something like:

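```python
try:
    message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
except pydantic.error_wrappers.ValidationError as e:
    # A non-dict `data` most likely means parse_response did not return an iterable of dicts.
    if "value is not a valid dict" in str(e):
        raise TypeError("Should return an iterator of dicts from parse_response") from e
    # Any other validation error is re-raised unchanged.
    raise
```
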
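
For context, here is a minimal sketch of why the validation error complains about a non-dict value. When `parse_response` returns a dict instead of an iterable of dicts, iterating over the result yields the dict's keys, so each "record" that reaches `AirbyteRecordMessage` is a string. The function names and the payload below are hypothetical stand-ins that do not use the CDK.

```python
from typing import Any, Iterable, Mapping


def parse_response_returning_dict(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    # Bug: returns the dict itself instead of an iterable of dicts.
    return payload


def parse_response_yielding_dict(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    # Fix: yield the dict so the caller receives exactly one record.
    yield payload


# Hypothetical API response body, standing in for response.json().
payload = {"id": 1, "name": "campaign-a"}

# Iterating over a dict yields its keys, so every "record" is a string.
buggy_records = list(parse_response_returning_dict(payload))
fixed_records = list(parse_response_yielding_dict(payload))

print(buggy_records)  # ['id', 'name']
print(fixed_records)  # [{'id': 1, 'name': 'campaign-a'}]
```

Because a dict is itself iterable, a plain "is this iterable?" check would not catch the mistake; the check has to look at the type of each record, or at the return value being a dict.
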
marcosmarxm commented 2 years ago

I think it is a good idea, @Gitznik. Do you mind submitting a PR? Then I'll ask the CDK team to review it.

girarda commented 2 years ago

@Gitznik FYI, I ended up having to revert https://github.com/airbytehq/airbyte/issues/17047 because the `airbyte_protocol.py` file is autogenerated by our build system. We will need to look for a better way to run this validation.
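
One possible direction, sketched under assumptions: run the check in hand-written code that wraps the records coming out of `parse_response`, so the autogenerated model stays untouched. `ensure_mapping_records` is a hypothetical helper, not part of the CDK, and where it would be called from is left open.

```python
from typing import Any, Iterator, Mapping


def ensure_mapping_records(records: Any, stream_name: str) -> Iterator[Mapping[str, Any]]:
    """Yield records unchanged; raise a descriptive TypeError for non-dict data.

    Hypothetical helper: it is a generator, so checks run lazily as records
    are consumed, which also works when parse_response is itself a generator.
    """
    if isinstance(records, Mapping):
        # A dict is iterable, but iterating it yields keys rather than records.
        raise TypeError(
            f"parse_response of stream '{stream_name}' returned a dict; "
            "it should return an iterable of dicts (for example, yield the dict)."
        )
    for record in records:
        if not isinstance(record, Mapping):
            raise TypeError(
                f"parse_response of stream '{stream_name}' produced a "
                f"{type(record).__name__}; each record should be a dict."
            )
        yield record


# Valid input passes through unchanged.
print(list(ensure_mapping_records([{"id": 1}], "campaigns")))

# A bare dict is reported with a message that names the likely cause.
try:
    list(ensure_mapping_records({"id": 1}, "campaigns"))
except TypeError as error:
    print(error)
```

Checking each record with `isinstance` avoids matching on the text of the pydantic error message, which could change between pydantic versions.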

octavia-squidington-iii commented 6 months ago

At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 180 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.

octavia-squidington-iii commented 6 months ago

This issue was closed because it has been inactive for 20 days since being marked as stale.