[destination-s3] `_airbyte_additional_properties` is empty/NULL when used at top level of JSON Schema #36326

Open · NatElkins opened this issue 6 months ago

NatElkins commented 6 months ago

Connector Name

destination-s3

Connector Version

v0.5.9

What step the error happened?

During the sync

Relevant information

Issue: The S3 destination is not writing properties that are absent from the declared schema into the _airbyte_additional_properties column.

I am using a custom connector with Airbyte cloud. I believe my schema is set up properly for my source. Here it is:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": [
    "object"
  ],
  "additionalProperties": true, // <-- Doesn't seem to be respected
  "required": [
    "id",
    "employeeId",
    "knoetic_table_name"
  ],
  "properties": {
    "id": {
      "type": [ "string", "integer" ]
    },
    "employeeId": {
      "type": [
        "string", "integer"
      ]
    },
    "knoetic_table_name": {
      "type": [ "string"]
    }
  }
}

I'm able to run the source locally. I see the record coming through in the logs:

{"type": "RECORD", "record": {"stream": "tables_stream", "data": {"id": 8, "employeeId": 123, "customDate1": "2019-07-01", "customAmount1": "100000.00 EUR", "customTargetVariableReason": "Commissions", "customComment1": "", "customChangeReason": "", "knoetic_table_name": "customTargetVariable"}, "emitted_at": 1710954734415}}

Here's what I see in the table after the sync:

[screenshot: the synced table, with _airbyte_additional_properties empty/NULL]

Here's what I see after refreshing the stream schema:

[screenshot: the stream schema after refresh]

It looks to me like support for additional properties existed at some point: https://github.com/airbytehq/airbyte/pull/7288

I'm at a loss as to what to do next. Any help would be greatly appreciated!

Relevant log output

No response


NatElkins commented 6 months ago

To confirm the schema looks as expected, I added the following to the constructor of my source stream:

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        schema = self.get_json_schema()
        print(f"The schema is {schema}")

And the output in the log is as follows:

The schema is {'$schema': 'http://json-schema.org/draft-07/schema#', 'type': ['object'], 'additionalProperties': True, 'required': ['id', 'employeeId', 'knoetic_table_name'], 'properties': {'id': {'type': ['string', 'integer']}, 'employeeId': {'type': ['string', 'integer']}, 'knoetic_table_name': {'type': ['string']}}}
marcosmarxm commented 6 months ago

I added the issue to the team backlog for further investigation. Thanks for reporting it.

NatElkins commented 6 months ago

BTW, when I use additionalProperties anywhere other than the top level of the schema, it works without issue. For example, here is my schema updated with a data property that has additionalProperties set to true:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": [
    "object"
  ],
  "required": [
    "id",
    "employeeId",
    "table_name",
    "data"
  ],
  "properties": {
    "id": {
      "type": [ "string", "integer" ]
    },
    "employeeId": {
      "type": [
        "string", "integer"
      ]
    },
    "table_name": {
      "type": [ "string"]
    },
    "data": {    // <-- Added this new property
      "type": [ "object" ],
      "additionalProperties": true
    }
  }
}

And then I change my parse_response method to reshape each record so that the data field is filled in:

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        table_name = stream_slice["table"]
        try:
            # This will raise an exception if the response is not 2xx
            response.raise_for_status()

            # Making a new record to be compatible with the schema
            for table_record in response.json():
                id = table_record.get("id")
                employee_id = table_record.get("employeeId")

                new_record = {
                    "id": id,
                    "employeeId": employee_id,
                    "table_name": table_name,
                    "data": table_record,
                }

                yield new_record
        except HTTPError as e:
            # Check to see if this error code is one we expect.
            # If so, raise an error.
            if not (
                self.skip_http_status_codes
                and e.response.status_code in self.skip_http_status_codes
            ):
                raise e

            # Otherwise, just log a warning.
            self.logger.warning(
                f"Stream `{self.name}`. An error occurred, details: {e}. Skipping for now."
            )
            yield {}

It works (see the data field):

[screenshot: the synced table, with the data column populated]
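Since the screenshot may not come through for everyone, the synced row looks roughly like this (an illustrative sketch, not the literal destination output; the contents of data simply mirror whatever the API returned for that record):

# Sketch of a synced row under the nested-data workaround. The top-level
# columns come from the new schema; data carries the full upstream record,
# exactly as assembled in parse_response above.
workaround_row = {
    "id": 8,
    "employeeId": 123,
    "table_name": "customTargetVariable",
    "data": {
        "id": 8,
        "employeeId": 123,
        "customDate1": "2019-07-01",
        "customAmount1": "100000.00 EUR",
        "customTargetVariableReason": "Commissions",
        "customComment1": "",
        "customChangeReason": "",
    },
}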
NatElkins commented 6 months ago

I was also trying to figure out how to set a "schemaless" schema (which is essentially what I'm doing), but I couldn't figure out how to do it via the CDK, even though it's mentioned here: https://docs.airbyte.com/understanding-airbyte/schemaless-sources-and-destinations#schemaless-schema
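My best guess, based on how streams normally expose their schema in the Python CDK, is that a "schemaless" schema would just be a fully permissive object schema returned from get_json_schema; the snippet below is only that guess, not something I've confirmed works with destination-s3:

from typing import Any, Mapping

# Guess at a "schemaless" stream schema: declare no properties and allow
# everything, so every field would have to flow through
# _airbyte_additional_properties (which is exactly the behavior this issue
# is about). Unverified against destination-s3.
SCHEMALESS_SCHEMA: Mapping[str, Any] = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "additionalProperties": True,
}

# In the Python CDK the stream's schema comes from get_json_schema(), so
# presumably returning the dict above from an override on the stream class
# would be the way to declare it (also unverified):
#
#     def get_json_schema(self) -> Mapping[str, Any]:
#         return SCHEMALESS_SCHEMA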