airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
14.73k stars 3.78k forks

Destination S3: date type written as dictionary for parquet format #14028

Open marcosmarxm opened 2 years ago

marcosmarxm commented 2 years ago

This GitHub issue is synchronized with Zendesk:

Ticket ID: #1152 Priority: normal Group: User Success Engineer Assignee: Marcos Marx

Original ticket description:

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Kubernetes
  • Memory / Disk: unlimited (scalable cluster)
  • Deployment: Kubernetes
  • Airbyte Version: 0.35.64-alpha
  • Source name/version: source-google-analytics-v4 0.3.22
  • Destination name/version: destination-s3 / 0.3.5
  • Step: The issue is happening during sync
  • Description: the date column from Google Analytics is written as a dictionary in the Parquet file instead of a string

I tried syncing to the S3 destination in JSON format, and it worked (the dates were strings).
Is this a bug, or is this how Parquet encodes dates?

[Discourse post]
marcosmarxm commented 2 years ago

I was able to reproduce the issue with the integration account. Other fields are unnested correctly.

marcosmarxm commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-06-22 at 20:39:

I created the issue https://github.com/airbytehq/airbyte/issues/14028; it looks like a bug in the S3 destination connector. I added it to the connector roadmap. I'll update you when this is fixed.
blarghmatey commented 1 year ago

I'm wondering if there has been any progress on this issue yet? We are currently using the S3 destination to populate our data lake environment with Parquet files and this is preventing us from being able to execute queries on any tables that contain a datetime value.

tuliren commented 1 year ago

This issue is very strange in that:

The current { member0: string, member1: none } value is very confusing. I don't know how it can be an object. It's probably something peculiar to the Google Analytics schema.

There has been no progress on this issue so far. Unfortunately, we currently don't have enough bandwidth for S3, so I also cannot give an estimate of when this can be fixed.

blarghmatey commented 1 year ago

For my use case we are actually using Postgres as the source, so the format conversion seems to be specific to the S3 destination and not anything to do with the source format, unless there is a common issue with the intermediate representation and allocating the proper type information in the schema being passed to the S3 destination.

If anyone has a suggestion of where in the S3 plugin to look I am happy to try my hand at debugging and fixing the issue.

blarghmatey commented 1 year ago

I've been doing some further debugging to narrow down the potential source of the issue. So far I have the following information:

Looking at the code in the S3 connector it seems that Airbyte is relying on the upstream Apache Parquet Java library for translating from Avro to Parquet. This suggests that there is either a bug in the upstream implementation, or a bug in how it is being used in Airbyte.

It is entirely possible that there is another source of the problem that I am overlooking, but this is what I have discovered so far.

blarghmatey commented 1 year ago

Looking again at the dependencies, the S3 destination is using the 1.12.0 release of the parquet library which is over a year old. The latest release is 1.12.3. Would it be possible to get that version bumped and an updated connector build pushed so that we can see if that resolves this dictionary issue?

blarghmatey commented 1 year ago

I believe that this change in the upstream Parquet library will resolve this bug, so updating to 1.12.3 will be the necessary fix

https://github.com/apache/parquet-mr/commit/c72862b61399ff516e968fbd02885e573d4be81c

blarghmatey commented 1 year ago

@marcosmarxm @tuliren I found what appears to be the necessary fix to this bug and pushed a PR that bumps the upstream Parquet dependency to the latest version. Can you take a look and see about getting a new release pushed so I can test it? Thanks!

https://github.com/airbytehq/airbyte/pull/14502

marcosmarxm commented 1 year ago

Thanks @blarghmatey !

blarghmatey commented 1 year ago

Taking a closer look at the Avro file that I generated while testing, I noticed that the actual schema for the timestamp with TZ fields is an Avro union type, which is not supported in Parquet. It seems that the conversion is treating the Avro union as a Parquet struct, resulting in the behavior that we're seeing.

Avro schema that I'm working with:

>>> pprint(json.loads(dfr.meta['avro.schema'].decode('utf8')))
{'fields': [{'name': '_airbyte_ab_id',
             'type': {'logicalType': 'uuid', 'type': 'string'}},
            {'name': '_airbyte_emitted_at',
             'type': {'logicalType': 'timestamp-millis', 'type': 'long'}},
            {'default': None, 'name': 'id', 'type': ['null', 'double']},
            {'default': None, 'name': 'live', 'type': ['null', 'boolean']},
            {'default': None, 'name': 'title', 'type': ['null', 'string']},
            {'default': None, 'name': 'run_tag', 'type': ['null', 'string']},
            {'default': None,
             'name': 'end_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None, 'name': 'course_id', 'type': ['null', 'double']},
            {'default': None,
             'name': 'created_on',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'start_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'updated_on',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'courseware_id',
             'type': ['null', 'string']},
            {'default': None,
             'name': 'enrollment_end',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'expiration_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'enrollment_start',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'courseware_url_path',
             'type': ['null', 'string']},
            {'default': None,
             'name': '_airbyte_additional_properties',
             'type': ['null', {'type': 'map', 'values': 'string'}]}],
 'name': 'mitxonline__app__postgres__courses_courserun',
 'namespace': 'public',
 'type': 'record'}

Generated Parquet schema:

>>> pq.read_table("/tmp/2022_07_08_1657291363014_0.parquet")
pyarrow.Table
_airbyte_ab_id: string not null
_airbyte_emitted_at: timestamp[ms, tz=UTC] not null
id: double
live: bool
title: string
run_tag: string
end_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
course_id: double
created_on: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
start_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
updated_on: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
courseware_id: string
enrollment_end: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
expiration_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
enrollment_start: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
courseware_url_path: string
_airbyte_additional_properties: map<string, string ('_airbyte_additional_properties')>
  child 0, _airbyte_additional_properties: struct<key: string not null, value: string not null> not null
      child 0, key: string not null
      child 1, value: string not null
blarghmatey commented 1 year ago

The block of code in the upstream Parquet library that generates the struct from a Union type is https://github.com/apache/parquet-mr/blob/e990eb3f14c39273e46a9fce07ec85d2edf7fccb/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L264-L272
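
To make the behavior concrete, here is a rough Python model of what that branch appears to do. It is inferred from the Avro and Parquet schemas above, not copied from the Java source, and the function name `convert_union` and its dict-based output are made up for illustration. Null branches only make the field optional, a single remaining branch is used directly, and two or more remaining branches become a group whose children are named `member0`, `member1`, and so on.

```python
def convert_union(field_name, branches):
    """Model how an Avro union appears to map onto a Parquet field.

    `branches` is the Avro union as a list, e.g. ['null', 'string'].
    Returns a plain dict describing the resulting field (illustrative only).
    """
    non_null = [b for b in branches if b != "null"]
    if not non_null:
        raise ValueError("cannot convert a union of only nulls")
    if len(non_null) == 1:
        # ['null', X] collapses to an optional X: no struct is created.
        return {"name": field_name, "type": non_null[0]}
    # Two or more non-null branches: one optional child per branch.
    children = [
        {"name": f"member{i}", "type": branch}
        for i, branch in enumerate(non_null)
    ]
    return {"name": field_name, "type": "struct", "children": children}


timestamp = {"type": "long", "logicalType": "timestamp-micros"}
print(convert_union("title", ["null", "string"]))
print(convert_union("end_date", ["null", timestamp, "string"]))
```

Under this model, `title` stays a plain string column while `end_date` turns into a struct with a timestamp child and a string child, which matches the pyarrow output above.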

It seems that the solution is to modify the Airbyte code that generates the Avro schema from the JSON schema to not include string in the union of logical types for the record so that the Parquet conversion doesn't generate the struct. If there is another approach that I'm missing I'm happy for suggestions.
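
A minimal sketch of that idea, written in Python against a schema dict like the one printed above rather than against Airbyte's actual Java schema converter. `drop_string_fallback` is a hypothetical helper: for every field whose union contains a logical type, it removes the plain `string` branch so that only `null` plus one type remains. I am assuming the string branch exists as a fallback for values that fail to parse, so dropping it means such values would need to be handled some other way.

```python
import copy


def drop_string_fallback(schema):
    """Return a copy of an Avro record schema without the 'string' branch
    in unions that also contain a logical type (hypothetical helper)."""
    result = copy.deepcopy(schema)
    for field in result["fields"]:
        ftype = field["type"]
        if not isinstance(ftype, list):
            continue  # not a union, nothing to do
        has_logical = any(
            isinstance(b, dict) and "logicalType" in b for b in ftype
        )
        if has_logical:
            field["type"] = [b for b in ftype if b != "string"]
    return result


# Trimmed-down version of the schema shown earlier in this thread.
schema = {
    "type": "record",
    "name": "courserun",
    "fields": [
        {"name": "title", "type": ["null", "string"], "default": None},
        {
            "name": "end_date",
            "type": [
                "null",
                {"type": "long", "logicalType": "timestamp-micros"},
                "string",
            ],
            "default": None,
        },
    ],
}

fixed = drop_string_fallback(schema)
print(fixed["fields"][1]["type"])
```

With only `null` and the timestamp left in the union, the Parquet conversion should see a single non-null branch and emit a plain optional timestamp column instead of a struct.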

felcastro commented 1 year ago

Any updates? Currently trying to sync data from MySQL to S3 Parquet, and Timestamp fields are being converted to struct type. {"member0": "2020-02-04T17:01:27.000+0000", "member1": null}

liyinqiu commented 1 year ago

Hi there, will there be a fix for this, or is it intentionally written as a struct? For background, we are building a MySQL to S3 sync and then a Hive table on top of the Parquet output. We want a datetime column type in the Hive metastore, but that does not work with this struct in the Parquet file. Following this post, we used a previous version of the MySQL source, which outputs datetime as a string; that could mitigate the issue.

sebas-500 commented 1 year ago

Any updates on this?

I'm trying to copy a Postgres table into S3 but some of the datetime columns contain the aforementioned struct.

robertomczak commented 1 year ago

Observed the same during our MySQL to Parquet S3 testing {'member0': datetime.datetime(2016, 9, 25, 19, 57, 31, tzinfo=<UTC>), 'member1': None}

sebas-500 commented 1 year ago

@robertomczak Did you manage to work it around?

robertomczak commented 1 year ago

Not directly in Airbyte. The current idea is to process the data in a later ETL step using Pandas, extracting the column from the nested object to the top level of the pandas DataFrame.

It would be nice to fix it, as keeping an empty string artifact in an intermediate data lake is not good practice. 🤷‍♂️
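
For anyone taking the same route, here is a rough sketch of that post-processing step using plain Python dicts, such as the rows returned by pyarrow's `Table.to_pylist()`. The helper name `flatten_union_structs` is made up, and it assumes that every union struct only has keys of the form `memberN` and at most one non-null value per row.

```python
import datetime


def flatten_union_structs(row):
    """Replace {'member0': ..., 'member1': ...} values with the first
    non-null member, leaving all other columns untouched."""
    flat = {}
    for key, value in row.items():
        is_union = (
            isinstance(value, dict)
            and len(value) > 0
            and all(
                isinstance(k, str) and k.startswith("member") for k in value
            )
        )
        if is_union:
            # Pick the first populated branch, or None if all are null.
            flat[key] = next(
                (v for v in value.values() if v is not None), None
            )
        else:
            flat[key] = value
    return flat


row = {
    "id": 1.0,
    "created_on": {
        "member0": datetime.datetime(
            2016, 9, 25, 19, 57, 31, tzinfo=datetime.timezone.utc
        ),
        "member1": None,
    },
}
print(flatten_union_structs(row))
```

The same first-non-null rule can be applied column by column in a pandas DataFrame; the dict version just keeps the sketch free of extra dependencies.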

sebas-500 commented 1 year ago

Aren't the destination files getting bigger? For example in my case a table of 20MB was transformed to two files of 97 and 20MB each. I bet the struct has something to do with it.

msaffitz commented 1 year ago

I'm seeing this as well -- I'm curious if this is related to #17010 and #17011.

In either case, I poked around to see if I could get a fix but unfortunately wasn't able to get my env fully setup to build / run tests.

It'd be awesome to get a fix within Airbyte for this :)

tevonsb commented 10 months ago

We have also experienced this using a snowflake connector on any field of type DATE.

Reading to a parquet file in S3. Our data is not large (100k rows) and is chunked into a single file, so I don't think it has to do with data scale.

qoqajr commented 9 months ago

Hello,

I'm experiencing the same issue with PostgreSQL timestamptz data being written into S3 Parquet files. Any update on the issue since 2022...?

jamsi commented 8 months ago

In case it helps anyone, I was processing some of the Parquet data in PySpark through AWS Glue and added this transformation to convert struct.member0 into a DateTime field.

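from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, TimestampType


def extract_timestamp_from_struct(df: DataFrame) -> DataFrame:
    '''
    This function converts any "structs" that are really datetimes, into datetimes.
    This is documented here: https://github.com/airbytehq/airbyte/issues/14028

    Parameters:
    - df: The input DataFrame.

    Returns:
    - DataFrame with datetime columns as datetimes
    '''
    for field in df.schema:
        # Only struct columns can be the union artifact.
        if isinstance(field.dataType, StructType):
            # A timestamp child named "member0" marks a wrapped datetime.
            if any(child.name == "member0" and isinstance(child.dataType, TimestampType) for child in field.dataType.fields):
                # Overwrite the struct column with its timestamp member.
                df = df.withColumn(field.name, col(f"{field.name}.member0"))
    return df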
qoqajr commented 8 months ago

Thanks @jamsi for sharing, I ended up doing exactly the same in a glue job, but it would be better if it wouldn't be required to add another service in the mix just to fix date formats.

lahdirakram commented 7 months ago

Any news about this? I'm facing the same behaviour.

john-motif commented 7 months ago

I'm also seeing this issue with Snowflake > GCS in Parquet format

OMK2186 commented 6 months ago

MySQL to S3 connector: _airbyte_emitted_at is in string format, but columns from MySQL table are being converted to Struct {'member0': datetime.datetime(2023, 12, 7, 7, 48, 39, tzinfo=<UTC>), 'member1': None}

davidfromtandym commented 6 months ago

This is an insanely frustrating bug

nijuyonkadesu commented 4 months ago

I clung to the tiniest ember of hope and rebuilt the destination-s3 connector with

implementation ('org.apache.parquet:parquet-avro:1.13.1') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}

and

{'member0': datetime.datetime(2024, 2, 19, 10, 30, tzinfo=<UTC>), 'member1': None}

it's still the same ...

myonlinecode1988 commented 3 months ago

I see the above in both parquet and avro files. Is it on the roadmap to address this?

octavia-squidington-iii commented 2 months ago

Zendesk ticket #5666 has been linked to this issue.

kahlua-kol commented 1 month ago

Hey! Any hope to get this bug fixed soon? :)