airbytehq / airbyte

Destination GCS: Failed to convert json to avro #14064

Open Wolff-Lucas opened 2 years ago

Wolff-Lucas commented 2 years ago

Environment

Current Behavior

When setting up a sync from Zendesk to GCS in Parquet format, I get a "Failed to convert JSON to Avro" error: tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field logo is expected to be one of these: NULL, STRING.

Expected Behavior

The data should sync in either Full Refresh or Incremental mode.

Logs

tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field logo is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field (path: logo) adheres to schema: {url=https://my-domain.zendesk.com/api/v2/attachments/4419650404369.json, id=4419650404369, file_name=téléchargement__9_.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9_.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9_.png, content_type=image/png, size=841, width=80, height=80, inline=false, deleted=false, thumbnails=[{url=https://my-domain.zendesk.com/api/v2/attachments/4419650404497.json, id=4419650404497, file_name=téléchargement__9__thumb.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9__thumb.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9__thumb.png, content_type=image/png, size=428, width=32, height=32, inline=false, deleted=false}, {url=https://my-domain.zendesk.com/api/v2/attachments/4419650404881.json, id=4419650404881, file_name=téléchargement__9__small.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9__small.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9__small.png, content_type=image/png, size=353, width=24, height=24, inline=false, deleted=false}]}
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer.accept(ParquetSerializedBuffer.java:93) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:69) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:137) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) [io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) [io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) [io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.5-alpha.jar:?]
2022-06-22 09:40:00 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field logo is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field (path: logo) adheres to schema: {url=https://my-domain.zendesk.com/api/v2/attachments/4419650404369.json, id=4419650404369, file_name=téléchargement__9_.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9_.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9_.png, content_type=image/png, size=841, width=80, height=80, inline=false, deleted=false, thumbnails=[{url=https://my-domain.zendesk.com/api/v2/attachments/4419650404497.json, id=4419650404497, file_name=téléchargement__9__thumb.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9__thumb.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9__thumb.png, content_type=image/png, size=428, width=32, height=32, inline=false, deleted=false}, {url=https://my-domain.zendesk.com/api/v2/attachments/4419650404881.json, id=4419650404881, file_name=téléchargement__9__small.png, content_url=https://my-domain.zendesk.com/system/brands/4419650404369/téléchargement__9__small.png, mapped_content_url=https://aide.voyage-prive.com/system/brands/4419650404369/téléchargement__9__small.png, content_type=image/png, size=353, width=24, height=24, inline=false, deleted=false}]}
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) ~[?:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) ~[converter-1.0.1.jar:?]
2022-06-22 09:40:00 destination > ... 17 more
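
The error above says the Avro schema declares `logo` as a union of NULL and STRING, while the record carries a nested object for that field. A minimal Python sketch of that mismatch follows. It is a simplified stand-in for union matching, not the json2avro converter's real logic; `matches_avro_type`, `resolve_union`, and the abbreviated `logo_value` are hypothetical names introduced only for illustration.

```python
# Illustrative sketch only: a simplified stand-in for Avro union matching,
# not the json2avro converter's actual implementation.

def matches_avro_type(value, avro_type):
    """Return True if a decoded JSON value fits a primitive Avro type name."""
    if avro_type == "null":
        return value is None
    if avro_type == "string":
        return isinstance(value, str)
    return False


def resolve_union(value, union):
    """Return the first union branch the value fits, or raise TypeError."""
    for branch in union:
        if matches_avro_type(value, branch):
            return branch
    raise TypeError(f"value fits none of {union}: {value!r}")


# The union the error message reports for the `logo` field.
logo_union = ["null", "string"]

# Abbreviated, hypothetical version of the `logo` object from the log.
logo_value = {"id": 4419650404369, "content_type": "image/png", "size": 841}

print(resolve_union(None, logo_union))        # a missing logo fits "null"
print(resolve_union("logo.png", logo_union))  # a plain string fits "string"
try:
    resolve_union(logo_value, logo_union)     # a nested object fits neither
except TypeError as err:
    print("rejected:", err)
```

Under this reading, the sync can only succeed if the schema for `logo` describes an object, or if the object is serialized to a string before conversion. Which of the two applies here is not established in this report.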

Steps to Reproduce

1. Create a connection between Zendesk and GCS using the Parquet format.
2. Attempt to sync the tables.

marcosmarxm commented 2 years ago

Can you share which streams and sync mode you are using, and the Parquet configuration of the GCS destination connector? I'm asking so that I can reproduce the issue locally.

Wolff-Lucas commented 2 years ago

Here is the configuration.

streams :

sync mode :

parquet configuration (I kept the default config):

Thank you!

mp-pinheiro commented 2 years ago

I'm having the same issue.

Airbyte Version: 0.39.24-alpha
Source: Microsoft SQL Server (MSSQL) (0.4.5)
- Replication Method: Logical Replication (CDC)
- Data to Sync - Existing and New
- Initial Snapshot Isolation Level: Snapshot
Destination: Google Cloud Storage (GCS) (0.2.8)
- Page Size = 1024 KB
- Block Size = 128 MB
- Compression Codec = UNCOMPRESSED
- Dictionary Encoding = True
- Max Padding Size = 8 MB
- Dictionary Page Size = 1024 KB

Log:

2022-06-24 14:11:00 destination > 2022-06-24 14:11:00 INFO i.a.i.d.r.SerializedBufferingStrategy(lambda$addRecord$0):48 - Starting a new buffer for stream al_transacoes (current state: 0 bytes in 0 buffers)
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 WARN o.a.h.u.NativeCodeLoader(<clinit>):60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 ERROR i.a.i.b.FailureTrackingAirbyteMessageConsumer(accept):52 - Exception while accepting message
2022-06-24 14:11:01 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field volume_programado is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: volume_programado) adheres to schema: 5000.00000000
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer.accept(ParquetSerializedBuffer.java:93) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) [io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field volume_programado is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: volume_programado) adheres to schema: 5000.00000000
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) ~[?:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  ... 17 more
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):63 - Airbyte message consumer: failed.
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 ERROR i.a.i.d.b.BufferedStreamConsumer(close):169 - executing on failed close procedure.
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 INFO i.a.i.d.r.SerializedBufferingStrategy(close):127 - Closing buffer for stream al_transacoes
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 ERROR i.a.i.d.r.SerializedBufferingStrategy(close):131 - Exception while closing stream buffer
2022-06-24 14:11:01 destination > java.lang.NullPointerException: Cannot invoke "java.io.InputStream.close()" because "this.inputStream" is null
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer.close(ParquetSerializedBuffer.java:154) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.close(SerializedBufferingStrategy.java:128) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:175) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.lambda$close$0(FailureTrackingAirbyteMessageConsumer.java:67) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:67) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:164) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) [io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination > 2022-06-24 14:11:01 ERROR i.a.i.b.AirbyteExceptionHandler(uncaughtException):26 - Something went wrong in the connector. See the logs for more details.
2022-06-24 14:11:01 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field volume_programado is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: volume_programado) adheres to schema: 5000.00000000
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer.accept(ParquetSerializedBuffer.java:93) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >  Suppressed: java.lang.RuntimeException: Exceptions thrown while closing buffers: java.lang.NullPointerException: Cannot invoke "java.io.InputStream.close()" because "this.inputStream" is null
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.close(SerializedBufferingStrategy.java:135) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:175) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.lambda$close$0(FailureTrackingAirbyteMessageConsumer.java:67) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:67) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:164) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination >      at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.20-alpha.jar:?]
2022-06-24 14:11:01 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field volume_programado is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: volume_programado) adheres to schema: 5000.00000000
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) ~[?:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) ~[converter-1.0.1.jar:?]
2022-06-24 14:11:01 destination >  ... 17 more
2022-06-24 14:11:02 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):415 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@7dc29167[type=TRACE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=<null>,trace=io.airbyte.protocol.models.AirbyteTraceMessage@730570af[type=ERROR,emittedAt=1.65607986199E12,error=io.airbyte.protocol.models.AirbyteErrorTraceMessage@773afa17[message=Something went wrong in the connector. See the logs for more details.,internalMessage=tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field volume_programado is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: volume_programado) adheres to schema: 5000.00000000,stackTrace=[tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129), tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118), tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95), io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39), io.airbyte.integrations.destination.s3.parquet.ParquetSerializedBuffer.accept(ParquetSerializedBuffer.java:93), io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62), io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138), io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50), io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194), io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203), 
io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232), io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202), io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165), io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54), io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38), io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165), io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107), io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39)],failureType=system_error,additionalProperties={}],additionalProperties={}],additionalProperties={}]

These also appear at the beginning:

2022-06-24 14:10:33 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.data_to_sync: is not defined in the schema and the schema does not allow additional properties, $.snapshot_isolation: is not defined in the schema and the schema does not allow additional properties, $.replication_type: must be a constant value STANDARD, $.replication_type: does not have a value in the enumeration [STANDARD]
2022-06-24 14:10:33 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.format_type: does not have a value in the enumeration [Avro], $.compression_codec: string found, object expected, $.compression_codec: should be valid to one and only one of the schemas 
2022-06-24 14:10:33 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword requires - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2022-06-24 14:10:33 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.format_type: does not have a value in the enumeration [CSV]
2022-06-24 14:10:33 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.format_type: does not have a value in the enumeration [JSONL]
2022-06-24 14:10:33 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.hmac_key_access_id: object found, string expected, $.hmac_key_secret: object found, string expected

One thing to note is everything works as expected when Sync Mode is set to Full Refresh.
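
One possible reading of the Full Refresh observation, not confirmed anywhere in this thread: the error prints the rejected value as `5000.00000000`, which is what one would see if the column arrived as a JSON string in CDC mode but as a JSON number in Full Refresh. The sketch below only illustrates that difference under this assumption. `fits_nullable_double` and both sample records are hypothetical, and the check is a simplification rather than the converter's real logic.

```python
import json


def fits_nullable_double(value):
    """Simplified strict check for an Avro ["null", "double"] union."""
    if value is None:
        return True
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


# Hypothetical records: the same column as a JSON number and as a JSON string.
as_number = json.loads('{"volume_programado": 5000.00000000}')
as_string = json.loads('{"volume_programado": "5000.00000000"}')

print(fits_nullable_double(as_number["volume_programado"]))  # True
print(fits_nullable_double(as_string["volume_programado"]))  # False
```

Comparing the raw record emitted by the source in each sync mode would show whether this is what actually happens.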

mp-pinheiro commented 2 years ago

Some of the log messages are very similar to those in https://github.com/airbytehq/airbyte/issues/12162

marcosmarxm commented 2 years ago

I tried the same configuration with our integration accounts but haven't been able to reproduce it yet. I'll test with other sources.

jbfbell commented 1 year ago

+1 https://airbytehq.sentry.io/issues/4307340311/?project=6527718

kev-datams commented 1 month ago

Hi! Could we consider solving this issue within a reasonable time?

Some similar issues were recently solved in the S3 connector (which the GCS connector is currently based on), and it would be much appreciated to have similar fixes for GCS:

Thank you 🙏