airbytehq / airbyte


[file-cdk] `avro` loader cannot load custom logical types that fastavro can #36918

Open imrehg opened 5 months ago

imrehg commented 5 months ago

Topic

CDK Avro load failure

Relevant information

Airbyte's Avro loader in the CDK interprets the schema before loading the file. It relies on fastavro for most of the work, but it also applies extra logic of its own when deciding whether the contents of the file can be loaded.

Currently, custom logical types are not supported in Airbyte, even though fastavro does support them (either by reading them as their underlying primitive types, or by decoding them when a decoding function is provided). Instead, Airbyte checks whether the logical type is one of the official Avro logical types and, if it is not, declares the file unsupported.

Airbyte should instead load those fields as their underlying primitive types and leave any further decoding to the user.

This suggestion is based on the ideas in: https://github.com/airbytehq/airbyte/pull/36888#pullrequestreview-1987260725
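
For illustration, here is a minimal sketch of the proposed fallback behavior. The helper name and the shape of the check are hypothetical (not taken from the actual CDK code); the point is only to fall back to the underlying primitive type when a logicalType is not one of the official Avro logical types, instead of declaring the file unsupported.

from typing import Any, Mapping

# Official Avro logical types per the Avro specification; anything else is
# treated as a custom logical type.
KNOWN_LOGICAL_TYPES = {
    "decimal", "uuid", "date", "time-millis", "time-micros",
    "timestamp-millis", "timestamp-micros",
    "local-timestamp-millis", "local-timestamp-micros", "duration",
}

def resolve_field_type(avro_type: Mapping[str, Any]) -> str:
    """Return the logical type name for official logical types (which keep
    their special handling) or the underlying primitive type otherwise."""
    logical_type = avro_type.get("logicalType")
    if logical_type is None:
        return avro_type["type"]
    if logical_type in KNOWN_LOGICAL_TYPES:
        return logical_type
    # Custom logical type: fall back to the underlying primitive type and
    # leave any decoding to the user, instead of rejecting the file.
    return avro_type["type"]

# The custom {"logicalType": "datetime", "type": "string"} field from the
# example below would then simply be treated as a string:
print(resolve_field_type({"logicalType": "datetime", "type": "string"}))  # string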

Example

I cannot attach an example Avro file here (GitHub limitations), but here is a script, modified from fastavro's example, that creates such a file and can be tried out:

import datetime

import fastavro
from fastavro import parse_schema, reader, writer

schema = {
    "name": "LogicalTest",
    "type": "record",
    "fields": [
        {"name": "temp", "type": "int"},
        {
            "name": "date",
            "type": ["null", {"logicalType": "datetime", "type": "string"}],
        },
    ],
}
parsed_schema = parse_schema(schema)

now = datetime.datetime.now(tz=datetime.timezone.utc)
delta = datetime.timedelta(minutes=1)

# 'records' can be an iterable (including generator)
records = [
    {"temp": 0, "date": now - 3 * delta},
    {"temp": 22, "date": now - 2 * delta},
    {"temp": -11, "date": now - delta},
    {"temp": 111, "date": now},
]

def encode_datetime_as_string(data, *args):
    return datetime.datetime.isoformat(data)

def decode_string_as_datetime(data, *args):
    return datetime.datetime.fromisoformat(data)

fastavro.write.LOGICAL_WRITERS["string-datetime"] = encode_datetime_as_string
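# (fastavro keys these handlers as "<avro type>-<logicalType>", hence "string-datetime".)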

#####
# Uncomment the next line to see the decoded logical type,
# otherwise the string-encoded datetime will be shown during read,
# as it would be when using Airbyte, where no custom readers are defined.
#####
# fastavro.read.LOGICAL_READERS["string-datetime"] = decode_string_as_datetime

# Writing
with open("example.avro", "wb") as out:
    writer(out, parsed_schema, records)

# Reading
with open("example.avro", "rb") as fo:
    for record in reader(fo):
        print(record)

This script reads back the example file as:

{'temp': 0, 'date': '2024-04-09T04:11:16.504105+00:00'}
{'temp': 22, 'date': '2024-04-09T04:12:16.504105+00:00'}
{'temp': -11, 'date': '2024-04-09T04:13:16.504105+00:00'}
{'temp': 111, 'date': '2024-04-09T04:14:16.504105+00:00'}

On the other hand, in Airbyte I get the following (when also using the fixes from https://github.com/airbytehq/airbyte/pull/36888 for a meaningful error message):

(Screenshot: Airbyte error message, 2024-04-09 at 12:23:05)

Here's the relevant Slack thread in the Airbyte channels: https://airbytehq.slack.com/archives/C021JANJ6TY/p1712632429290159?thread_ts=1712557994.937269&cid=C021JANJ6TY
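
For completeness: if Airbyte loaded the field with its underlying string type as proposed, decoding it downstream stays trivial for the user. A minimal example, reusing a value from the script output above:

import datetime

# Value of the custom "datetime" field as it would arrive when only the
# underlying string type is preserved.
raw_value = "2024-04-09T04:11:16.504105+00:00"

decoded = datetime.datetime.fromisoformat(raw_value)
print(decoded)  # 2024-04-09 04:11:16.504105+00:00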

marcosmarxm commented 5 months ago

I added this to the team backlog for further investigation. Thanks @imrehg