airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source MSSQL: to any Avro or Parquet destination: failed to convert JSON to Avro #12949

Open plenti-jacob-roe opened 2 years ago

plenti-jacob-roe commented 2 years ago

Environment

Current Behavior

When setting up a sync to use CDC incremental load from MSSQL to Databricks, I get a "Failed to convert JSON to Avro" error on a decimal field. However, when I run the sync as a full refresh there is no issue. This error occurs on a few tables, but not all.
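The error message suggests the Avro schema declares the field as a `["null", "double"]` union, while the CDC path appears to deliver the `decimal(28,2)` value in a form that matches neither branch (e.g. a JSON string like `"0.00"`). A minimal sketch of that union check, with the string-encoding assumption clearly hypothetical and not Airbyte's actual converter code:

```python
import json

def matches_union(value, union_types):
    """Return True if `value` fits one of the Avro union branches."""
    for t in union_types:
        if t == "null" and value is None:
            return True
        # Avro DOUBLE accepts JSON numbers, but never strings (bool excluded).
        if t == "double" and isinstance(value, (int, float)) and not isinstance(value, bool):
            return True
    return False

schema_union = ["null", "double"]

# Full refresh: value arrives as a JSON number -> matches DOUBLE.
full_refresh_record = json.loads('{"AmountAhead": 0.0}')
# Hypothetical CDC payload: decimal serialized as a string -> no branch matches.
cdc_record = json.loads('{"AmountAhead": "0.00"}')

print(matches_union(full_refresh_record["AmountAhead"], schema_union))  # True
print(matches_union(cdc_record["AmountAhead"], schema_union))           # False
```

If this is what is happening, the fix would belong in the source/CDC decimal handling rather than in the destination connector.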

Expected Behavior

It should be able to sync the data in either full refresh or incremental mode.

Logs

LOG

```
2022-05-18 04:52:45 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field AmountAhead is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: AmountAhead) adheres to schema: 0.00
2022-05-18 04:52:45 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-05-18 04:52:45 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-05-18 04:52:45 destination > at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter.write(S3ParquetWriter.java:113) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.databricks.DatabricksStreamCopier.write(DatabricksStreamCopier.java:109) ~[io.airbyte.airbyte-integrations.connectors-destination-databricks-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.lambda$recordWriterFunction$0(CopyConsumerFactory.java:104) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy.lambda$flushAll$1(InMemoryRecordBufferingStrategy.java:86) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy.flushAll(InMemoryRecordBufferingStrategy.java:82) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy.addRecord(InMemoryRecordBufferingStrategy.java:65) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:137) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
2022-05-18 04:52:45 destination > at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.38.4-alpha.jar:?]
```

Steps to Reproduce

1. Create a connection for a CDC-enabled table with a column of type decimal(28,2)
2. Attempt to sync the table using the incremental sync mode
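As a stopgap while the connector is fixed, one could normalize string-encoded decimals back to JSON numbers before the JSON-to-Avro step. This is a hypothetical client-side sketch under the assumption that the CDC path emits the decimal as a string; the field names and the helper are illustrative, not part of Airbyte:

```python
def coerce_numeric_strings(record, numeric_fields):
    """Convert string-encoded decimals (e.g. "0.00") to float in place.

    Non-numeric strings are left untouched so the error still surfaces
    for genuinely malformed data.
    """
    for field in numeric_fields:
        value = record.get(field)
        if isinstance(value, str):
            try:
                record[field] = float(value)
            except ValueError:
                pass
    return record

# Hypothetical CDC record where the decimal(28,2) arrived as a string.
row = {"AmountAhead": "0.00", "prod_id": 42}
coerce_numeric_strings(row, ["AmountAhead"])
print(row)  # {'AmountAhead': 0.0, 'prod_id': 42}
```

Note that coercing to float loses the exact decimal(28,2) precision, so a proper fix in the source's decimal handling would still be preferable.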

mp-pinheiro commented 2 years ago

I'm facing the same issue, but with a Google Cloud Storage destination.

Airbyte Version: 0.39.29-alpha
Source: Microsoft SQL Server (MSSQL) (0.4.8)
Destination: Google Cloud Storage (GCS) (0.2.9)

2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 INFO i.a.i.d.r.SerializedBufferingStrategy(lambda$addRecord$0):48 - Starting a new buffer for stream my_table (current state: 0 bytes in 0 buffers)
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 ERROR i.a.i.b.FailureTrackingAirbyteMessageConsumer(accept):52 - Exception while accepting message
2022-06-30 14:58:58 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer.writeRecord(AvroSerializedBuffer.java:53) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer.accept(BaseSerializedBuffer.java:92) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) [io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) [io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) [io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) ~[?:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  ... 18 more
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):63 - Airbyte message consumer: failed.
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 ERROR i.a.i.d.b.BufferedStreamConsumer(close):169 - executing on failed close procedure.
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 INFO i.a.i.d.r.SerializedBufferingStrategy(close):127 - Closing buffer for stream my_table
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 INFO i.a.i.d.r.FileBuffer(deleteFile):81 - Deleting tempFile data e665152b-df6b-4f4d-b9c7-aac4b5e53fa912722021224595586762.avro
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 INFO i.a.i.d.s.S3ConsumerFactory(lambda$onCloseFunction$4):154 - Cleaning up destination started for 1 streams
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 INFO i.a.i.d.s.S3ConsumerFactory(lambda$onCloseFunction$4):159 - Cleaning up destination completed.
2022-06-30 14:58:58 destination > 2022-06-30 14:58:58 ERROR i.a.i.b.AirbyteExceptionHandler(uncaughtException):26 - Something went wrong in the connector. See the logs for more details.
2022-06-30 14:58:58 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer.writeRecord(AvroSerializedBuffer.java:53) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer.accept(BaseSerializedBuffer.java:92) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) ~[io.airbyte.airbyte-integrations.bases-base-java-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination >  at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39) ~[io.airbyte.airbyte-integrations.connectors-destination-gcs-0.39.24-alpha.jar:?]
2022-06-30 14:58:58 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) ~[?:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) ~[converter-1.0.1.jar:?]
2022-06-30 14:58:58 destination >  ... 18 more
2022-06-30 14:58:58 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):415 - State in DefaultReplicationWorker from destination: io.airbyte.protocol.models.AirbyteMessage@35045690[type=TRACE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=<null>,trace=io.airbyte.protocol.models.AirbyteTraceMessage@3bc3be40[type=ERROR,emittedAt=1.656601138558E12,error=io.airbyte.protocol.models.AirbyteErrorTraceMessage@3ffc3de0[message=Something went wrong in the connector. See the logs for more details.,internalMessage=tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00,stackTrace=tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118)
    at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95)
    at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39)
    at io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer.writeRecord(AvroSerializedBuffer.java:53)
    at io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer.accept(BaseSerializedBuffer.java:92)
    at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62)
    at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138)
    at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50)
    at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194)
    at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203)
    at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232)
    at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202)
    at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165)
    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54)
    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38)
    at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165)
    at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107)
    at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39)
Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
    at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152)
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127)
    ... 18 more
,failureType=system_error,additionalProperties={}],additionalProperties={}],additionalProperties={}]
2022-06-30 14:59:58 WARN i.a.c.i.LineGobbler(voidCall):86 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2022-06-30 14:59:58 ERROR i.a.w.g.DefaultReplicationWorker(run):180 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
    at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:173) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
    Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
        at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
    Suppressed: java.io.IOException: Stream closed
        at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:445) ~[?:?]
        at java.io.OutputStream.write(OutputStream.java:162) ~[?:?]
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
        at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:313) ~[?:?]
        at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:318) ~[?:?]
        at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:160) ~[?:?]
        at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:248) ~[?:?]
        at java.io.BufferedWriter.flush(BufferedWriter.java:257) ~[?:?]
        at io.airbyte.workers.internal.DefaultAirbyteDestination.notifyEndOfInput(DefaultAirbyteDestination.java:98) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:111) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:319) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    ... 1 more
Caused by: java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method) ~[?:?]
    at java.io.FileOutputStream.write(FileOutputStream.java:349) ~[?:?]
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:123) ~[?:?]
    at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234) ~[?:?]
    at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304) ~[?:?]
    at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) ~[?:?]
    at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132) ~[?:?]
    at java.io.OutputStreamWriter.write(OutputStreamWriter.java:205) ~[?:?]
    at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120) ~[?:?]
    at java.io.BufferedWriter.write(BufferedWriter.java:233) ~[?:?]
    at java.io.Writer.write(Writer.java:249) ~[?:?]
    at io.airbyte.workers.internal.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:317) ~[io.airbyte-airbyte-workers-0.39.29-alpha.jar:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    ... 1 more
2022-06-30 14:59:58 INFO i.a.w.g.DefaultReplicationWorker(run):239 - sync summary: io.airbyte.config.ReplicationAttemptSummary@10f2751d[status=failed,recordsSynced=366,bytesSynced=404315,startTime=1656601118840,endTime=1656601198667,totalStats=io.airbyte.config.SyncStats@27236128[recordsEmitted=366,bytesEmitted=404315,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@4d15a470[streamName=my_table,stats=io.airbyte.config.SyncStats@76ebacfa[recordsEmitted=366,bytesEmitted=404315,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]]
2022-06-30 14:59:58 INFO i.a.w.g.DefaultReplicationWorker(run):268 - Source did not output any state messages
2022-06-30 14:59:58 WARN i.a.w.g.DefaultReplicationWorker(run):276 - State capture: No new state, falling back on input state: io.airbyte.config.State@75b275a7[state={}]
2022-06-30 14:59:58 INFO i.a.w.t.TemporalAttemptExecution(get):134 - Stopping cancellation check scheduling...
2022-06-30 14:59:58 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):161 - sync summary: io.airbyte.config.StandardSyncOutput@29cc6b4a[standardSyncSummary=io.airbyte.config.StandardSyncSummary@5feb000b[status=failed,recordsSynced=366,bytesSynced=404315,startTime=1656601118840,endTime=1656601198667,totalStats=io.airbyte.config.SyncStats@27236128[recordsEmitted=366,bytesEmitted=404315,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[io.airbyte.config.StreamSyncStats@4d15a470[streamName=my_table,stats=io.airbyte.config.SyncStats@76ebacfa[recordsEmitted=366,bytesEmitted=404315,stateMessagesEmitted=<null>,recordsCommitted=<null>]]]],normalizationSummary=<null>,state=io.airbyte.config.State@75b275a7[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@6583c593[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@653251e1[stream=io.airbyte.protocol.models.AirbyteStream@2feef036[name=my_table,jsonSchema={"type":"object","properties":{"gord":{"type":"number"},"prod_id":{"type":"number"},"Medida_ID":{"type":"number"},"prod_code":{"type":"string"},"prod_desc":{"type":"string"},"_ab_cdc_lsn":{"type":"string"},"sts_Produto":{"type":"boolean"},"temperatura":{"type":"number"},"flg_additive":{"type":"boolean"},"TipoProduto_ID":{"type":"number"},"ClasseProdutoID":{"type":"number"},"Medida_ID_Massa":{"type":"number"},"GrupoEmbalagemID":{"type":"number"},"cor_supervisorio":{"type":"string"},"Medida_ID_Pressao":{"type":"number"},"DescricaoAbreviada":{"type":"string"},"TemperaturaBase_ID":{"type":"number"},"_ab_cdc_deleted_at":{"type":"string"},"_ab_cdc_updated_at":{"type":"string"},"dt_UltimaAlteracao":{"type":"string"},"Medida_ID_Densidade":{"type":"number"},"flg_NaoInventariado":{"type":"boolean"},"produto_original_id":{"type":"number"},"ClasseRiscoProdutoID":{"type":"number"},"flg_ProdutoComercial":{"type":"boolean"},"InventarioDimensao_ID":{"type":"number"},"Medida_ID_Temperatura":{"type":"number"},"UsuarioUltimaAlteracao":{"type":"str
ing"},"flg_PossuiConcentracao":{"type":"boolean"},"ClasseHidrocarbureto_ID":{"type":"number"},"MetodoCorrecaoVolume_ID":{"type":"number"},"num_field":{"type":"number"},"num_DiasSemMovimentacao":{"type":"number"},"ClasseRiscoSubsidiarioID":{"type":"number"},"ToleranciaVariacaoNegativa":{"type":"number"},"ToleranciaVariacaoPositiva":{"type":"number"},"flg_RequerPesagemRodoviaria":{"type":"boolean"},"flg_QuantidadeExpedidaViaPreset":{"type":"boolean"},"flg_QuantidadeExpedidaViaBalanca":{"type":"boolean"},"CoeficienteDilatacaoTermicaProduto":{"type":"number"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=true,defaultCursorField=[],sourceDefinedPrimaryKey=[[prod_id]],namespace=NAMESPACE,additionalProperties={}],syncMode=incremental,cursorField=[],destinationSyncMode=append,primaryKey=[[prod_id]],additionalProperties={}]],additionalProperties={}],failures=[io.airbyte.config.FailureReason@28824698[failureOrigin=destination,failureType=system_error,internalMessage=tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00,externalMessage=Something went wrong in the connector. See the logs for more details.,metadata=io.airbyte.config.Metadata@53d6a146[additionalProperties={attemptNumber=0, jobId=31981, from_trace_message=true}],stacktrace=tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118)
    at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95)
    at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39)
    at io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer.writeRecord(AvroSerializedBuffer.java:53)
    at io.airbyte.integrations.destination.record_buffer.BaseSerializedBuffer.accept(BaseSerializedBuffer.java:92)
    at io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy.addRecord(SerializedBufferingStrategy.java:62)
    at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.acceptTracked(BufferedStreamConsumer.java:138)
    at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:50)
    at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:194)
    at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$4(IntegrationRunner.java:203)
    at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:232)
    at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:202)
    at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:165)
    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54)
    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38)
    at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:165)
    at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107)
    at io.airbyte.integrations.destination.gcs.GcsDestination.main(GcsDestination.java:39)
Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field num_field is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: num_field) adheres to schema: 1.00
    at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:294)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152)
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127)
    ... 18 more
,retryable=<null>,timestamp=1656601138558], io.airbyte.config.FailureReason@249c4f84[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process message delivery failed,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@7cab7461[additionalProperties={attemptNumber=0, jobId=31981}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process message delivery failed
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    ... 3 more
Caused by: java.io.IOException: Broken pipe
    at java.base/java.io.FileOutputStream.writeBytes(Native Method)
    at java.base/java.io.FileOutputStream.write(FileOutputStream.java:349)
    at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
    at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:234)
    at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:304)
    at java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
    at java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:132)
    at java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:205)
    at java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:120)
    at java.base/java.io.BufferedWriter.write(BufferedWriter.java:233)
    at java.base/java.io.Writer.write(Writer.java:249)
    at io.airbyte.workers.internal.DefaultAirbyteDestination.accept(DefaultAirbyteDestination.java:90)
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:317)
    ... 4 more
,retryable=<null>,timestamp=1656601138660], io.airbyte.config.FailureReason@46a75bea[failureOrigin=destination,failureType=<null>,internalMessage=io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1,externalMessage=Something went wrong within the destination connector,metadata=io.airbyte.config.Metadata@209d1ee9[additionalProperties={attemptNumber=0, jobId=31981}],stacktrace=java.util.concurrent.CompletionException: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
    at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$7(DefaultReplicationWorker.java:420)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    ... 3 more
,retryable=<null>,timestamp=1656601138660]]]
2022-06-30 14:59:58 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):291 - Stopping temporal heartbeating...
mp-pinheiro commented 2 years ago

Testing confirms that the issue only occurs for the Avro and Parquet output formats; JSON and CSV both work. This is somewhat expected, considering the issue seems to be in merging data type schemas.

mp-pinheiro commented 2 years ago

This might be a shot in the dark, but in my case, the column num_field in the logs is a decimal type with a user-specified precision.

+---------------------------+
| num_field                 |
|---------------------------|
| 0.76                      |
| 0.77                      |
| 0.79                      |
| 0.80                      |
| 0.81                      |
| 0.83                      |
| 0.84                      |
| 0.86                      |
| 0.87                      |
| 1.00                      |
+---------------------------+

It seems that all values sync without issue except 1.00.

I have a second table with the same issue; it also has trailing zeroes (column type is also decimal):

+---------------------+
| dec_field           |
|---------------------|
| 5000.00000000       |
| 27000.00000000      |
| 12000.00000000      |
| 10000.00000000      |
| 5000.00000000       |
| 7000.00000000       |
| 5000.00000000       |
| 5000.00000000       |
| 5000.00000000       |
| 5000.00000000       |
+---------------------+

Same problem with a third table; this one has a decimal column with a lot of zero values:

+------------+
| zero       |
|------------|
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
| 0.00000000 |
+------------+

The error in this one is as follows:

2022-06-24 18:50:30 destination > tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field zero is expected to be one of these: NULL, DOUBLE. If this is a complex type, check if offending field (path: zero) adheres to schema: 0E-8

In conclusion, the problem does seem to be related to decimal fields and trailing zeroes messing up the union.
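The offending values reported in the error messages (`1.00` and `0E-8`) are exactly what Java's `BigDecimal.toString()` produces for decimals with those scales, which supports the trailing-zero theory: the CDC path appears to hand the converter an exact decimal rather than a plain double. A minimal sketch illustrating the behavior (this is an illustration of `BigDecimal` formatting, not actual Airbyte code):

```java
import java.math.BigDecimal;

public class TrailingZeroDemo {
    public static void main(String[] args) {
        // A DECIMAL value of 1 with scale 2 keeps its trailing zeroes:
        System.out.println(new BigDecimal("1.00"));               // 1.00
        // With scale 8, BigDecimal.toString() switches to scientific
        // notation -- the "0E-8" seen in the error log above:
        System.out.println(new BigDecimal("0.00000000"));         // 0E-8
        // Coercing to double yields a value the [NULL, DOUBLE] Avro
        // union would accept:
        System.out.println(new BigDecimal("0.00000000").doubleValue()); // 0.0
    }
}
```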

gsmith-schlesinger commented 2 years ago

To add, this issue also occurs using the MSSQL connector to S3. It breaks when consuming a column that has a money type. The full refresh tasks work fine; this only breaks on incremental.

Airbyte Version: 0.39.5-alpha
Source: Microsoft SQL Server (MSSQL) (0.4.8)
Destination: S3 (0.3.10)

gsmith-schlesinger commented 2 years ago

I believe this issue is in the base of Airbyte. In MSSQLConverter.java there was a change a few months back that expanded the date types from:

  private final String SMALLDATETIME_TYPE = "SMALLDATETIME";

to:

 private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME");

Currently there's code to handle smallmoney, but not money or numeric column types.

  private final String SMALLMONEY_TYPE = "SMALLMONEY";

This should probably be updated and tested with a change like the following, using a set:

  private final Set<String> DECIMAL_TYPES = Set.of("MONEY", "SMALLMONEY", "DECIMAL", "NUMERIC");

The connector itself might need to be updated as well. I may have time to look into this further tomorrow and possibly code a fix.
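A hedged sketch of what such a generalization could look like — the class and method names here are assumptions for illustration, not the actual Airbyte converter API:

```java
import java.math.BigDecimal;
import java.util.Set;

public class DecimalTypeSketch {
    // All fixed-point SQL Server types that should be coerced, not just SMALLMONEY.
    private static final Set<String> DECIMAL_TYPES =
        Set.of("MONEY", "SMALLMONEY", "DECIMAL", "NUMERIC");

    // Convert the exact decimal CDC value to a double so that it matches
    // the [NULL, DOUBLE] union in the generated Avro schema; pass every
    // other value through unchanged.
    static Object convert(String sqlType, Object value) {
        if (value != null && DECIMAL_TYPES.contains(sqlType.toUpperCase())) {
            return new BigDecimal(value.toString()).doubleValue();
        }
        return value;
    }
}
```

Usage-wise, `convert("NUMERIC", "0.00000000")` would yield `0.0` instead of the `0E-8` string that currently trips the union check.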

marcosmarxm commented 2 years ago

Zendesk ticket #1433 has been linked to this issue.

marcosmarxm commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-07-07 at 18:02:

Anthony, for now the workaround is to use the JSON format instead of Parquet/Avro. I'll ask the team to investigate the issue further.

gsmith-schlesinger commented 2 years ago

Right, for us that workaround isn't sufficient. We've pulled the repo down and are currently investigating a fix in our own fork. If and when we fix it, we will submit a PR.

Unless there are engineers working on this issue internally at Airbyte.

marcosmarxm commented 2 years ago

I don't think there is anyone currently working on this issue right now, @gsmith-schlesinger.

mp-pinheiro commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-07-07 at 18:02:

Anthony, for now the workaround is to use the JSON format instead of Parquet/Avro. I'll ask the team to investigate the issue further.

This is what we're doing, but it's far from optimal, as JSON and CSV formats are very slow.

Right, for us that workaround isn't sufficient. We've pulled the repo down and are currently investigating a fix in our own fork. If and when we fix it, we will submit a PR.

Unless there are engineers working on this issue internally at Airbyte.

We're doing the same thing on our side, @gsmith-schlesinger, but Java isn't our forte. We've been having problems even setting up a development environment: the project builds for 30 minutes just to fail at the last minute. Let me know if you have any luck!

marcosmarxm commented 2 years ago

@mp-pinheiro are you building the entire project? Why not only the connector?

mp-pinheiro commented 2 years ago

@mp-pinheiro are you building the entire project? Why not only the connector?

@marcosmarxm I tried that too, but the build also failed; I might have some dependency issues.

gsmith-schlesinger commented 1 year ago

We've conceded for now that CDC MSSQL to Parquet files is just broken at the Debezium/connector level.

Our current process is CDC MSSQL to CSV on the larger tables, with a separate process that kicks off and converts the CSVs to Parquet files. We do lose some data types, but we determined that wasn't a big issue since it retains what we need.

Ideally this gets resolved some time down the road and we can set up CDC on MSSQL to go straight into Parquet files, as that would be preferable.

gsmith-schlesinger commented 1 year ago

#15780

There's a PR currently submitted that may fix this issue: #15782.

duytp commented 1 year ago

Yes, I got the same issue when the data schema is converted from JSON to Avro.

isabelamaro-hotmart commented 1 year ago

I am having the same issue when syncing Jira > S3 with changelog enabled: Could not evaluate union, field changelog is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field (path: changelog) adheres to schema:

alxsbn commented 1 year ago

Same issue with TypeForm to S3 with PARQUET/ZSTD

DSamuylov commented 7 months ago

Same issue with MongoDB to S3.

PilarIH commented 3 months ago

Any updates on this? We're also having this issue with a MongoDB to S3 connection.

philk1991 commented 1 month ago

Same issue with Greenhouse to Google Cloud Storage

chlete commented 1 month ago

Hi everyone, we are facing the same issue. Is there a bug we can track to see its status?