airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source MongoDB to S3 Failed to convert JSON to Avro #9431

Open alexnikitchuk opened 2 years ago

alexnikitchuk commented 2 years ago
## Environment

- **Airbyte version**: 0.35.2-alpha
- **OS Version / Instance**: macOS
- **Deployment**: Docker
- **Source Connector and version**: airbyte/source-mongodb-v2 0.1.10
- **Destination Connector and version**: airbyte/destination-s3 0.2.3
- **Severity**: Critical
- **Step where error happened**: Sync job

## Current Behavior

A file appears in the S3 destination but it is empty, and the sync fails with this error:

```
Exception in thread "main" tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field state is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field: state adheres to schema.
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118)
	at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95)
	at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:41)
	at io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter.write(S3ParquetWriter.java:108)
	at io.airbyte.integrations.destination.s3.S3Consumer.acceptTracked(S3Consumer.java:83)
	at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46)
	at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147)
	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128)
	at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:30)
Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field state is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field: state adheres to schema.
	at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:289)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152)
	at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141)
	at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127)
	... 9 more
```

## Expected Behavior

The destination is synced with the source. If the sync fails, no empty file should appear in the destination.

## Logs

*If applicable, please upload the logs from the failing operation. For sync jobs, you can download the full logs from the UI by going to the sync attempt page and clicking the download logs button at the top right of the logs display window.*
LOG ``` 2022-01-12 09:07:24 INFO i.a.w.w.WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.35.2-alpha 2022-01-12 09:07:24 INFO i.a.w.t.TemporalAttemptExecution(get):118 - Docker volume job log path: /tmp/workspace/5/2/logs.log 2022-01-12 09:07:24 INFO i.a.w.t.TemporalAttemptExecution(get):123 - Executing worker wrapper. Airbyte version: version not set 2022-01-12 09:07:24 WARN i.a.d.Databases(createPostgresDatabaseWithRetryTimeout):65 - Waiting for database to become available... 2022-01-12 09:07:24 INFO i.a.d.i.j.JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready... 2022-01-12 09:07:25 INFO i.a.d.Databases(createPostgresDatabaseWithRetryTimeout):90 - Database available! 2022-01-12 09:07:25 INFO i.a.d.Databases(createPostgresDatabaseWithRetry):48 - Database available! 2022-01-12 09:07:25 INFO i.a.w.DefaultReplicationWorker(run):99 - start sync worker. job id: 5 attempt id: 2 2022-01-12 09:07:25 INFO i.a.w.DefaultReplicationWorker(run):108 - configured sync modes: {mongodb.collection=full_refresh - overwrite} 2022-01-12 09:07:25 INFO i.a.w.p.a.DefaultAirbyteDestination(start):69 - Running destination... 2022-01-12 09:07:25 INFO i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/destination-s3:0.2.3 exists... 2022-01-12 09:07:25 INFO i.a.c.i.LineGobbler(voidCall):82 - airbyte/destination-s3:0.2.3 was found locally. 2022-01-12 09:07:25 INFO i.a.w.p.DockerProcessFactory(create):171 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/5/2 --network host --log-driver none airbyte/destination-s3:0.2.3 write --config destination_config.json --catalog destination_catalog.json 2022-01-12 09:07:25 INFO i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/source-mongodb-v2:0.1.10 exists... 2022-01-12 09:07:25 INFO i.a.c.i.LineGobbler(voidCall):82 - airbyte/source-mongodb-v2:0.1.10 was found locally. 
2022-01-12 09:07:25 INFO i.a.w.p.DockerProcessFactory(create):171 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/5/2 --network host --log-driver none airbyte/source-mongodb-v2:0.1.10 read --config source_config.json --catalog source_catalog.json 2022-01-12 09:07:25 INFO i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):242 - Destination output thread started. 2022-01-12 09:07:25 INFO i.a.w.DefaultReplicationWorker(run):130 - Waiting for source and destination threads to complete. 2022-01-12 09:07:25 INFO i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Replication thread started. 2022-01-12 09:07:25 destination > SLF4J: Class path contains multiple SLF4J bindings. 2022-01-12 09:07:25 destination > SLF4J: Found binding in [jar:file:/airbyte/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] 2022-01-12 09:07:25 destination > SLF4J: Found binding in [jar:file:/airbyte/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] 2022-01-12 09:07:25 destination > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
2022-01-12 09:07:26 destination > SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.s.m.MongoDbSource(main):64 - starting source: class io.airbyte.integrations.source.mongodb.MongoDbSource 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):76 - Running integration: io.airbyte.integrations.source.mongodb.MongoDbSource 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {read=null, catalog=source_catalog.json, config=source_config.json} 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):80 - Command: READ 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):81 - Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'} 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):76 - Running integration: io.airbyte.integrations.destination.s3.S3Destination 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. 
If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json} 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):80 - Command: WRITE 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 INFO i.a.i.b.IntegrationRunner(run):81 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'} 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.s.r.CdcStateManager():26 - Initialized CDC state with: null 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO i.a.i.s.r.StateManager(createCursorInfoForStream):118 - No cursor field set in catalog but not present in state. Stream: AirbyteStreamNameNamespacePair{name='collection', namespace='mongodb'}, New Cursor Field: null. Resetting cursor value 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. 
If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-01-12 09:07:27 destination > 2022-01-12 09:07:27 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):22 - S3 format config: {"format_type":"Parquet","page_size_kb":1024,"block_size_mb":128,"compression_codec":"SNAPPY","dictionary_encoding":true,"max_padding_size_mb":8,"dictionary_page_size_kb":1024} 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - Cluster created with settings {hosts=[127.0.0.1:27017], srvHost=javelin-aws-development.tl0yt.mongodb.net, mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', requiredReplicaSetName='atlas-dz0fb1-shard-0'} 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - Adding discovered server javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017 to client view of cluster 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - Cluster description not yet available. 
Waiting for 30000 ms before timing out 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - Adding discovered server javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 to client view of cluster 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - Adding discovered server javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017 to client view of cluster 2022-01-12 09:07:27 source > 2022-01-12 09:07:27 INFO c.m.d.l.SLF4JLogger(info):71 - No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@79ab3a71 from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017, type=UNKNOWN, state=CONNECTING}, ServerDescription{address=javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, type=UNKNOWN, state=CONNECTING}, ServerDescription{address=javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 INFO i.a.i.d.s.w.ProductionWriterFactory(create):37 - Json schema for stream collection: {"type":"object","properties":{"_id":{"type":"string"},"eta":{"type":"string"},"state":{"type":"string"},"rounds":{"type":"array"},"winner":{"type":"string"},"over_at":{"type":"string"},"version":{"type":"number"},"started_at":{"type":"string"},"rounds_count":{"type":"number"},"current_round":{"type":"string"},"offer_rejects":{"type":"array"}}} 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 WARN i.a.i.d.s.a.JsonToAvroSchemaConverter(parseSingleType):239 - Array field "rounds" does not specify the items type. It will default to an array of strings 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 WARN i.a.i.d.s.a.JsonToAvroSchemaConverter(parseSingleType):239 - Array field "offer_rejects" does not specify the items type. 
It will default to an array of strings 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 INFO i.a.i.d.s.w.ProductionWriterFactory(create):42 - Avro schema for stream collection: {"type":"record","name":"collection","namespace":"mongodb","fields":[{"name":"_airbyte_ab_id","type":{"type":"string","logicalType":"uuid"}},{"name":"_airbyte_emitted_at","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"_id","type":["null","string"],"default":null},{"name":"eta","type":["null","string"],"default":null},{"name":"state","type":["null","string"],"default":null},{"name":"rounds","type":["null",{"type":"array","items":["null","string"]}],"default":null},{"name":"winner","type":["null","string"],"default":null},{"name":"over_at","type":["null","string"],"default":null},{"name":"version","type":["null","double"],"default":null},{"name":"started_at","type":["null","string"],"default":null},{"name":"rounds_count","type":["null","double"],"default":null},{"name":"current_round","type":["null","string"],"default":null},{"name":"offer_rejects","type":["null",{"type":"array","items":["null","string"]}],"default":null},{"name":"_airbyte_additional_properties","type":["null",{"type":"map","values":"string"}],"default":null}]} 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 INFO i.a.i.d.s.p.S3ParquetWriter():55 - Full S3 path for stream 'collection': s3://test-data/raw/airbyte/mongodb/collection/2022_01_12_1641978448241_0.parquet 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 WARN o.a.h.u.NativeCodeLoader():60 - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:5, serverValue:44596}] to javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:3, serverValue:44469}] to javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:2, serverValue:40651}] to javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:4, serverValue:44597}] to javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:1, serverValue:44468}] to javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:6, serverValue:40650}] to javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Monitor thread successfully connected to server with description ServerDescription{address=javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, type=REPLICA_SET_SECONDARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=9, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=514975300, setName='atlas-dz0fb1-shard-0', canonicalAddress=javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, hosts=[javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, 
javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017], passives=[], arbiters=[], primary='javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017', tagSet=TagSet{[Tag{name='nodeType', value='ELECTABLE'}, Tag{name='provider', value='AWS'}, Tag{name='region', value='EU_WEST_1'}, Tag{name='workloadType', value='OPERATIONAL'}]}, electionId=null, setVersion=1, topologyVersion=TopologyVersion{processId=61dc6872942ca7df9b7b6d1d, counter=3}, lastWriteDate=Wed Jan 12 09:07:26 UTC 2022, lastUpdateTimeNanos=141800408341600} 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Monitor thread successfully connected to server with description ServerDescription{address=javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, type=REPLICA_SET_SECONDARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=9, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=515665300, setName='atlas-dz0fb1-shard-0', canonicalAddress=javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, hosts=[javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017], passives=[], arbiters=[], primary='javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017', tagSet=TagSet{[Tag{name='nodeType', value='ELECTABLE'}, Tag{name='provider', value='AWS'}, Tag{name='region', value='EU_WEST_1'}, Tag{name='workloadType', value='OPERATIONAL'}]}, electionId=null, setVersion=1, topologyVersion=TopologyVersion{processId=61dc6844e7dc04edf397ac13, counter=4}, lastWriteDate=Wed Jan 12 09:07:26 UTC 2022, lastUpdateTimeNanos=141800409023500} 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Monitor thread successfully connected to server with description ServerDescription{address=javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, 
ok=true, minWireVersion=0, maxWireVersion=9, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=515014200, setName='atlas-dz0fb1-shard-0', canonicalAddress=javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017, hosts=[javelin-aws-development-shard-00-00.tl0yt.mongodb.net:27017, javelin-aws-development-shard-00-02.tl0yt.mongodb.net:27017, javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017], passives=[], arbiters=[], primary='javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017', tagSet=TagSet{[Tag{name='nodeType', value='ELECTABLE'}, Tag{name='provider', value='AWS'}, Tag{name='region', value='EU_WEST_1'}, Tag{name='workloadType', value='OPERATIONAL'}]}, electionId=7fffffff0000000000000007, setVersion=1, topologyVersion=TopologyVersion{processId=61dc685b1be2fab48641e705, counter=6}, lastWriteDate=Wed Jan 12 09:07:26 UTC 2022, lastUpdateTimeNanos=141800408734500} 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Setting max election id to 7fffffff0000000000000007 from replica set primary javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Setting max set version to 1 from replica set primary javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 source > 2022-01-12 09:07:28 INFO c.m.d.l.SLF4JLogger(info):71 - Discovered replica set primary javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:28 destination > 2022-01-12 09:07:28 WARN o.a.h.m.i.MetricsConfig(loadFirst):134 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties 2022-01-12 09:07:29 source > 2022-01-12 09:07:29 INFO c.m.d.l.SLF4JLogger(info):71 - Opened connection [connectionId{localValue:7, serverValue:40652}] to javelin-aws-development-shard-00-01.tl0yt.mongodb.net:27017 2022-01-12 09:07:30 destination > 2022-01-12 09:07:30 INFO 
i.a.i.d.s.w.BaseS3Writer(initialize):78 - Overwrite mode 2022-01-12 09:07:31 destination > 2022-01-12 09:07:30 INFO i.a.i.d.s.w.BaseS3Writer(initialize):87 - Purging non-empty output path for stream 'collection' under OVERWRITE mode... 2022-01-12 09:07:31 destination > 2022-01-12 09:07:31 INFO i.a.i.d.s.w.BaseS3Writer(initialize):91 - Deleted 1 file(s) for stream 'collection'. 2022-01-12 09:07:36 source > 2022-01-12 09:07:36 INFO i.a.i.s.r.AbstractDbSource(lambda$read$2):123 - Closing database connection pool. 2022-01-12 09:07:36 source > 2022-01-12 09:07:36 INFO i.a.i.s.r.AbstractDbSource(lambda$read$2):125 - Closed database connection pool. 2022-01-12 09:07:36 source > 2022-01-12 09:07:36 INFO i.a.i.b.IntegrationRunner(run):133 - Completed integration: io.airbyte.integrations.source.mongodb.MongoDbSource 2022-01-12 09:07:36 source > 2022-01-12 09:07:36 INFO i.a.i.s.m.MongoDbSource(main):66 - completed source: class io.airbyte.integrations.source.mongodb.MongoDbSource 2022-01-12 09:07:36 INFO i.a.w.DefaultReplicationWorker(run):135 - One of source or destination thread complete. Waiting on the other. 2022-01-12 09:07:36 destination > 2022-01-12 09:07:36 WARN i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):58 - Airbyte message consumer: failed. 2022-01-12 09:07:36 destination > 2022-01-12 09:07:36 WARN i.a.i.d.s.w.BaseS3Writer(close):103 - Failure detected. Aborting upload of stream 'collection'... 2022-01-12 09:07:37 destination > Exception in thread "main" tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro: Could not evaluate union, field state is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field: state adheres to schema. 2022-01-12 09:07:37 destination > 2022-01-12 09:07:37 WARN i.a.i.d.s.w.BaseS3Writer(close):105 - Upload of stream 'collection' aborted. 
2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:129) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:118) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:95) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.destination.s3.avro.AvroRecordFactory.getAvroRecord(AvroRecordFactory.java:39) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter.write(S3ParquetWriter.java:108) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.destination.s3.S3Consumer.acceptTracked(S3Consumer.java:83) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.accept(FailureTrackingAirbyteMessageConsumer.java:46) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:147) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:128) 2022-01-12 09:07:37 destination > at io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:30) 2022-01-12 09:07:37 destination > Caused by: org.apache.avro.AvroTypeException: Could not evaluate union, field state is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field: state adheres to schema. 
2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.AvroTypeExceptions.unionException(AvroTypeExceptions.java:28) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readUnion(JsonGenericRecordReader.java:289) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:196) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:170) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:152) 2022-01-12 09:07:37 destination > at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:141) 2022-01-12 09:07:37 destination > at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:127) 2022-01-12 09:07:37 destination > ... 9 more 2022-01-12 09:07:37 ERROR i.a.w.DefaultReplicationWorker(run):141 - Sync worker failed. java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Destination process exited with non-zero exit code 1 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?] at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:136) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:174) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] 
at java.lang.Thread.run(Thread.java:833) [?:?] Suppressed: io.airbyte.workers.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled. at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:119) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:118) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:174) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at java.lang.Thread.run(Thread.java:833) [?:?] Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Destination process exited with non-zero exit code 1 at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$3(DefaultReplicationWorker.java:260) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?] ... 1 more Caused by: java.lang.RuntimeException: Destination process exited with non-zero exit code 1 at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$3(DefaultReplicationWorker.java:252) ~[io.airbyte-airbyte-workers-0.35.2-alpha.jar:?] at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?] ... 
1 more 2022-01-12 09:07:37 INFO i.a.w.DefaultReplicationWorker(run):165 - sync summary: io.airbyte.config.ReplicationAttemptSummary@14ef070f[status=failed,recordsSynced=4,bytesSynced=6800,startTime=1641978445051,endTime=1641978457565] 2022-01-12 09:07:37 INFO i.a.w.DefaultReplicationWorker(run):174 - Source did not output any state messages 2022-01-12 09:07:37 WARN i.a.w.DefaultReplicationWorker(run):185 - State capture: No state retained. 2022-01-12 09:07:37 INFO i.a.w.t.TemporalAttemptExecution(get):144 - Stopping cancellation check scheduling... 2022-01-12 09:07:37 INFO i.a.w.t.s.ReplicationActivityImpl(replicate):144 - sync summary: io.airbyte.config.StandardSyncOutput@47009e6a[standardSyncSummary=io.airbyte.config.StandardSyncSummary@4ea88c3d[status=failed,recordsSynced=4,bytesSynced=6800,startTime=1641978445051,endTime=1641978457565],state=,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@4c5b52a8[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@fe8870[stream=io.airbyte.protocol.models.AirbyteStream@434ea5cb[name=collection,jsonSchema={"type":"object","properties":{"_id":{"type":"string"},"eta":{"type":"string"},"state":{"type":"string"},"rounds":{"type":"array"},"winner":{"type":"string"},"over_at":{"type":"string"},"version":{"type":"number"},"started_at":{"type":"string"},"rounds_count":{"type":"number"},"current_round":{"type":"string"},"offer_rejects":{"type":"array"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=mongodb,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}]] ```
## Steps to Reproduce

1. Run Airbyte, add MongoDB source and S3 Parquet destination
2. Add MongoDB to S3 connection and run sync

## Are you willing to submit a PR?

No
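
One plausible reading of the error, which the thread does not confirm, is that at least one source document holds a `state` value that is neither null nor a string, while the discovered schema declares `state` as `string` (and the Avro schema therefore as `["null","string"]`). The sketch below is a simplified diagnostic under that assumption: it checks a record against a subset of the property types from the JSON schema printed in the log and lists the fields whose values do not fit. The helper names `value_fits` and `find_offending_fields` and the sample record are hypothetical and are not part of Airbyte or json2avro, and the real converter may apply rules this sketch ignores.

```python
# Subset of the property types from the JSON schema printed in the destination log.
PROPERTIES = {
    "_id": {"type": "string"},
    "eta": {"type": "string"},
    "state": {"type": "string"},
    "rounds": {"type": "array"},
    "version": {"type": "number"},
}


def value_fits(json_type, value):
    """Return True if value fits the nullable Avro union derived from json_type."""
    if value is None:
        # Every user field in the logged Avro schema is a union with "null".
        return True
    if json_type == "string":
        return isinstance(value, str)
    if json_type == "number":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if json_type == "array":
        # The log warns that untyped arrays default to arrays of strings.
        return isinstance(value, list) and all(
            item is None or isinstance(item, str) for item in value
        )
    # Types that do not occur in the logged schema are treated as a mismatch.
    return False


def find_offending_fields(properties, record):
    """List declared fields whose value in record does not fit the declared type."""
    return [
        name
        for name, spec in properties.items()
        if name in record and not value_fits(spec["type"], record[name])
    ]


# Hypothetical record: the nested object in "state" is invented for illustration.
sample = {"_id": "abc", "state": {"name": "finished"}, "rounds": ["r1"], "version": 2}
print(find_offending_fields(PROPERTIES, sample))
```

Running a check like this over a sample of source documents would show whether `state` (or `changelog` in the Jira case reported in this thread) actually contains objects or other non-string values.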

isabelamaro-hotmart commented 1 year ago

I am having the same issue when syncing Jira > S3 with changelog enabled:

Could not evaluate union, field changelog is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field (path: changelog) adheres to schema: