airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Destination S3: matchingSchema null while running a connector #15831

Open AM-I-Human opened 2 years ago

AM-I-Human commented 2 years ago

Related to: This airbyte post

Environment

Current Behavior

While syncing a 1-record test on a custom Python connector that fetches a dynamic schema, I get the following exception:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null

```ps
2022-08-12 14:46:56 WARN i.a.c.i.LineGobbler(voidCall):88 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job.
2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):62 - Airbyte message consumer: succeeded.
2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.d.b.BufferedStreamConsumer(close):171 - executing on success close procedure.
2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.d.r.SerializedBufferingStrategy(flushAll):100 - Flushing all 0 current buffers (0 bytes in total)
2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2022-08-12 14:46:57 ERROR i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
    at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:177) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
    at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
    at java.lang.Thread.run(Thread.java:1589) [?:?]
```

Expected Behavior

Log a warning, or complete the sync.
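To illustrate the behavior I would expect, here is a minimal Python sketch of a schema lookup that logs a warning and skips validation when no schema matches a record's stream, instead of dereferencing a missing value. This is not Airbyte's code: `find_schema`, `StreamKey`, and the logger name are hypothetical, and the assumption that the lookup is keyed by namespace and stream name is mine, not something confirmed by the stack trace.

```python
import logging
from typing import Any, Dict, Optional, Tuple

logger = logging.getLogger("schema_lookup_sketch")  # hypothetical logger name

# Hypothetical key type: (namespace, stream name)
StreamKey = Tuple[Optional[str], str]


def find_schema(
    schemas: Dict[StreamKey, Dict[str, Any]],
    namespace: Optional[str],
    name: str,
) -> Optional[Dict[str, Any]]:
    """Return the schema registered for a stream, or None if there is no match.

    A missing schema is reported as a warning so the caller can skip
    validation for that record rather than fail the whole sync.
    """
    schema = schemas.get((namespace, name))
    if schema is None:
        logger.warning(
            "No schema found for stream %s.%s; skipping validation",
            namespace,
            name,
        )
    return schema


# Usage: values borrowed from the log in this issue, purely as an illustration.
schemas = {("Iassicur_mach1", "DRE"): {"type": "object"}}
matched = find_schema(schemas, "Iassicur_mach1", "DRE")
unmatched = find_schema(schemas, None, "DRE")  # logs a warning, returns None
```

With a guard like this, a record whose stream does not line up with the catalog would produce a log entry rather than a `NullPointerException`.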

Logs

Full log

```ps
2022-08-12 14:45:27 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. errors: $.compression_codec: is missing but it is required, $.format_type: does not have a value in the enumeration [Avro]
2022-08-12 14:45:27 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. errors: $.flattening: is missing but it is required, $.format_type: does not have a value in the enumeration [CSV]
2022-08-12 14:45:27 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. errors: $.compression_type: does not have a value in the enumeration [No Compression]
2022-08-12 14:45:27 INFO i.a.w.t.TemporalAttemptExecution(get):105 - Docker volume job log path: /tmp/workspace/35/0/logs.log
2022-08-12 14:45:27 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.40.0-alpha
2022-08-12 14:45:27 INFO i.a.c.i.LineGobbler(voidCall):83 - Checking if docker.lunalabs.it/airbyte/iassicur-api:0.1.0 exists...
2022-08-12 14:45:27 INFO i.a.c.i.LineGobbler(voidCall):83 - docker.lunalabs.it/airbyte/iassicur-api:0.1.0 was found locally.
2022-08-12 14:45:27 INFO i.a.w.p.DockerProcessFactory(create):108 - Creating docker job ID: 35
2022-08-12 14:45:27 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/35/0 --log-driver none --name iassicur-api-check-35-0-jtxfd --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e WORKER_CONNECTOR_IMAGE=docker.lunalabs.it/airbyte/iassicur-api:0.1.0 -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION=0.40.0-alpha -e WORKER_JOB_ID=35 docker.lunalabs.it/airbyte/iassicur-api:0.1.0 check --config source_config.json
2022-08-12 14:45:29 INFO i.a.w.i.DefaultAirbyteStreamFactory(internalLog):99 - Check succeeded
2022-08-12 14:45:29 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling...
2022-08-12 14:45:29 INFO i.a.w.t.TemporalAttemptExecution(get):105 - Docker volume job log path: /tmp/workspace/35/0/logs.log
2022-08-12 14:45:29 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Executing worker wrapper. Airbyte version: 0.40.0-alpha
2022-08-12 14:45:29 INFO i.a.c.i.LineGobbler(voidCall):83 - Checking if airbyte/destination-s3:0.3.13 exists...
2022-08-12 14:45:29 INFO i.a.c.i.LineGobbler(voidCall):83 - airbyte/destination-s3:0.3.13 was found locally.
2022-08-12 14:45:29 INFO i.a.w.p.DockerProcessFactory(create):108 - Creating docker job ID: 35 2022-08-12 14:45:29 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/35/0 --log-driver none --name destination-s3-check-35-0-gvwxr --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e WORKER_CONNECTOR_IMAGE=airbyte/destination-s3:0.3.13 -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION=0.40.0-alpha -e WORKER_JOB_ID=35 airbyte/destination-s3:0.3.13 check --config source_config.json 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {check=null, config=source_config.json} 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.destination.s3.S3Destination 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: CHECK 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'} 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. 
If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword requires - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):22 - S3 format config: {"compression":{"compression_type":"GZIP"},"format_type":"JSONL"} 2022-08-12 14:45:30 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:30 INFO i.a.i.d.s.S3DestinationConfig(createS3Client):190 - Creating S3 client... 2022-08-12 14:45:31 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:31 INFO i.a.i.d.s.S3StorageOperations(createBucketObjectIfNotExists):102 - Storage Object luca-rca-datalake-test/luca-rca-datalake-test does not exist in bucket; creating... 2022-08-12 14:45:31 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:31 INFO i.a.i.d.s.S3StorageOperations(createBucketObjectIfNotExists):104 - Storage Object luca-rca-datalake-test/luca-rca-datalake-test has been created in bucket. 
2022-08-12 14:45:31 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:31 INFO i.a.i.d.s.S3Destination(testIAMUserHasListObjectPermission):155 - Started testing if IAM user can call listObjects on the destination bucket 2022-08-12 14:45:31 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:31 INFO i.a.i.d.s.S3Destination(testIAMUserHasListObjectPermission):158 - Finished checking for listObjects permission 2022-08-12 14:45:31 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:31 INFO i.a.i.d.s.S3Destination(testSingleUpload):81 - Started testing if all required credentials assigned to user for single file uploading 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO i.a.i.d.s.S3Destination(testSingleUpload):91 - Finished checking for normal upload mode 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO i.a.i.d.s.S3Destination(testMultipartUpload):95 - Started testing if all required credentials assigned to user for multipart upload 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - Initiated multipart upload to luca-rca-datalake-test/PREPROD/test_1660315532001 with full ID qh1qmwqIS0NEgXuju2SQatWfD_4fPkibZ45ufbcNwa07hLn9TFSa49a5PyqsOIkPxnufYI2b1rcrIUEMG.05E.dK.yqsYgnUq_mAAtMxc.WPkW4H.dPAE0cLIUU4c4RX 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO a.m.s.MultiPartOutputStream(close):158 - Called close() on [MultipartOutputStream for parts 1 - 10000] 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO a.m.s.MultiPartOutputStream(close):158 - Called close() on [MultipartOutputStream for parts 1 - 10000] 2022-08-12 14:45:32 INFO 
i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 WARN a.m.s.MultiPartOutputStream(close):160 - [MultipartOutputStream for parts 1 - 10000] is already closed 2022-08-12 14:45:32 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:32 INFO a.m.s.StreamTransferManager(complete):367 - [Manager uploading to luca-rca-datalake-test/PREPROD/test_1660315532001 with id qh1qmwqIS...LIUU4c4RX]: Uploading leftover stream [Part number 1 containing 3.34 MB] 2022-08-12 14:45:33 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:33 INFO a.m.s.StreamTransferManager(uploadStreamPart):558 - [Manager uploading to luca-rca-datalake-test/PREPROD/test_1660315532001 with id qh1qmwqIS...LIUU4c4RX]: Finished uploading [Part number 1 containing 3.34 MB] 2022-08-12 14:45:33 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:33 INFO a.m.s.StreamTransferManager(complete):395 - [Manager uploading to luca-rca-datalake-test/PREPROD/test_1660315532001 with id qh1qmwqIS...LIUU4c4RX]: Completed 2022-08-12 14:45:33 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:33 INFO i.a.i.d.s.S3Destination(testMultipartUpload):119 - Finished verification for multipart upload mode 2022-08-12 14:45:34 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-12 14:45:34 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.destination.s3.S3Destination 2022-08-12 14:45:34 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling... 2022-08-12 14:45:34 INFO i.a.w.t.TemporalAttemptExecution(get):105 - Docker volume job log path: /tmp/workspace/35/0/logs.log 2022-08-12 14:45:34 INFO i.a.w.t.TemporalAttemptExecution(get):110 - Executing worker wrapper. 
Airbyte version: 0.40.0-alpha 2022-08-12 14:45:34 INFO i.a.c.EnvConfigs(getEnvOrDefault):977 - Using default value for environment variable METRIC_CLIENT: '' 2022-08-12 14:45:34 WARN i.a.m.l.MetricClientFactory(initialize):60 - Metric client is already initialized to 2022-08-12 14:45:34 INFO i.a.w.g.DefaultReplicationWorker(run):119 - start sync worker. job id: 35 attempt id: 0 2022-08-12 14:45:34 INFO i.a.w.g.DefaultReplicationWorker(run):131 - configured sync modes: {Iassicur_mach1.DRE=incremental - append} 2022-08-12 14:45:34 INFO i.a.w.i.DefaultAirbyteDestination(start):69 - Running destination... 2022-08-12 14:45:34 INFO i.a.c.i.LineGobbler(voidCall):83 - Checking if airbyte/destination-s3:0.3.13 exists... 2022-08-12 14:45:34 INFO i.a.c.i.LineGobbler(voidCall):83 - airbyte/destination-s3:0.3.13 was found locally. 2022-08-12 14:45:34 INFO i.a.w.p.DockerProcessFactory(create):108 - Creating docker job ID: 35 2022-08-12 14:45:34 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/35/0 --log-driver none --name destination-s3-write-35-0-jxjzj --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e WORKER_CONNECTOR_IMAGE=airbyte/destination-s3:0.3.13 -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION=0.40.0-alpha -e WORKER_JOB_ID=35 airbyte/destination-s3:0.3.13 write --config destination_config.json --catalog destination_catalog.json 2022-08-12 14:45:34 INFO i.a.c.i.LineGobbler(voidCall):83 - Checking if docker.lunalabs.it/airbyte/iassicur-api:0.1.0 exists... 2022-08-12 14:45:34 INFO i.a.c.i.LineGobbler(voidCall):83 - docker.lunalabs.it/airbyte/iassicur-api:0.1.0 was found locally. 
2022-08-12 14:45:34 INFO i.a.w.p.DockerProcessFactory(create):108 - Creating docker job ID: 35 2022-08-12 14:45:34 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/35/0 --log-driver none --name iassicur-api-read-35-0-prcsq --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE= -e WORKER_ENVIRONMENT=DOCKER -e WORKER_CONNECTOR_IMAGE=docker.lunalabs.it/airbyte/iassicur-api:0.1.0 -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION=0.40.0-alpha -e WORKER_JOB_ID=35 docker.lunalabs.it/airbyte/iassicur-api:0.1.0 read --config source_config.json --catalog source_catalog.json 2022-08-12 14:45:34 INFO i.a.w.g.DefaultReplicationWorker(run):173 - Waiting for source and destination threads to complete. 2022-08-12 14:45:34 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):411 - Destination output thread started. 2022-08-12 14:45:34 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):303 - Replication thread started. 
2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json} 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.destination.s3.S3Destination 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: WRITE 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'} 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword requires - you should define your own Meta Schema. 
If the keyword is irrelevant for validation, just use a NonValidationKeyword 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.S3FormatConfigs(getS3FormatConfig):22 - S3 format config: {"compression":{"compression_type":"GZIP"},"format_type":"JSONL"} 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.S3DestinationConfig(createS3Client):190 - Creating S3 client... 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.SerializedBufferFactory(getCreateFunction):49 - S3 format config: S3JsonlFormatConfig{, compression=GZIP} 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.S3ConsumerFactory(lambda$toWriteConfig$0):83 - Write config: WriteConfig{streamName=test/DRE, namespace=Iassicur_mach1, outputBucketPath=PREPROD, pathFormat=PREPROD/${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_, fullOutputPath=PREPROD/Iassicur_mach1/test/DRE/2022_08_12_1660315535528_, syncMode=append} 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):116 - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started. 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.S3ConsumerFactory(lambda$onStartFunction$1):90 - Preparing bucket in destination started for 1 streams 2022-08-12 14:45:35 destination > 2022-08-12 14:45:35 INFO i.a.i.d.s.S3ConsumerFactory(lambda$onStartFunction$1):104 - Preparing storage area in destination completed. 2022-08-12 14:45:36 source > Starting syncing SourceIassicurApi 2022-08-12 14:45:36 source > Syncing stream: DRE 2022-08-12 14:45:41 source > {'SQL': 'SELECT TOP 1 *, NUMERORECORD FROM DRE ORDER BY NUMERORECORD'} 2022-08-12 14:45:41 source > Backing off _send(...) 
for 5.0s (requests.exceptions.ConnectionError: HTTPSConnectionPool(host='mach1.iassicur.cloud', port=443): Max retries exceeded with url: /IServiceExec.asmx/Query?SQL=SELECT+TOP+1+%2A%2C+NUMERORECORD+FROM+DRE+ORDER+BY+NUMERORECORD (Caused by NewConnectionError(': Failed to establish a new connection: [Errno -3] Try again'))) 2022-08-12 14:45:41 source > Caught retryable error 'HTTPSConnectionPool(host='mach1.iassicur.cloud', port=443): Max retries exceeded with url: /IServiceExec.asmx/Query?SQL=SELECT+TOP+1+%2A%2C+NUMERORECORD+FROM+DRE+ORDER+BY+NUMERORECORD (Caused by NewConnectionError(': Failed to establish a new connection: [Errno -3] Try again'))' after 1 tries. Waiting 5 seconds then retrying... 2022-08-12 14:46:56 WARN i.a.c.i.LineGobbler(voidCall):88 - airbyte-source gobbler IOException: Stream closed. Typically happens when cancelling a job. 2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):62 - Airbyte message consumer: succeeded. 2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.d.b.BufferedStreamConsumer(close):171 - executing on success close procedure. 2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.d.r.SerializedBufferingStrategy(flushAll):100 - Flushing all 0 current buffers (0 bytes in total) 2022-08-12 14:46:56 destination > 2022-08-12 14:46:56 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.destination.s3.S3Destination 2022-08-12 14:46:57 ERROR i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed. java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?] 
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:177) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at java.lang.Thread.run(Thread.java:1589) [?:?] Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled. at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:141) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at java.lang.Thread.run(Thread.java:1589) [?:?] Caused by: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:368) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] ... 
1 more Caused by: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at io.airbyte.workers.RecordSchemaValidator.validateSchema(RecordSchemaValidator.java:48) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.general.DefaultReplicationWorker.validateSchema(DefaultReplicationWorker.java:389) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:317) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?] at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] ... 1 more 2022-08-12 14:46:57 INFO i.a.w.g.DefaultReplicationWorker(run):243 - sync summary: io.airbyte.config.ReplicationAttemptSummary@21056739[status=failed,recordsSynced=0,bytesSynced=0,startTime=1660315534230,endTime=1660315617085,totalStats=io.airbyte.config.SyncStats@6757f343[recordsEmitted=0,bytesEmitted=0,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[]] 2022-08-12 14:46:57 INFO i.a.w.g.DefaultReplicationWorker(run):272 - Source did not output any state messages 2022-08-12 14:46:57 WARN i.a.w.g.DefaultReplicationWorker(run):283 - State capture: No state retained. 2022-08-12 14:46:57 INFO i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling... 
2022-08-12 14:46:57 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):165 - sync summary: io.airbyte.config.StandardSyncOutput@49e737b3[standardSyncSummary=io.airbyte.config.StandardSyncSummary@6f021b8c[status=failed,recordsSynced=0,bytesSynced=0,startTime=1660315534230,endTime=1660315617085,totalStats=io.airbyte.config.SyncStats@6757f343[recordsEmitted=0,bytesEmitted=0,stateMessagesEmitted=0,recordsCommitted=0],streamStats=[]],normalizationSummary=,state=,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@11ac5de[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@718ebdf4[stream=io.airbyte.protocol.models.AirbyteStream@1a456d9[name=test/DRE,jsonSchema={"type":"object","properties":{"CAP":{"type":["null","string"]},"CEV":{"type":["null","string"]},"DRE":{"type":["null","string"]},"NDG":{"type":["null","string"]},"SSN":{"type":["null","string"]},"ACCE":{"type":["null","string"]},"ANTE":{"type":["null","string"]},"ITER":{"type":["null","string"]},"MORA":{"type":["null","string"]},"NOTA":{"type":["null","string"]},"POST":{"type":["null","string"]},"RAMO":{"type":["null","string"]},"RATA":{"type":["null","string"]},"FILES":{"type":["null","string"]},"LOBBY":{"type":["null","string"]},"MEZZO":{"type":["null","string"]},"NETTO":{"type":["null","string"]},"NOTA2":{"type":["null","string"]},"NOTA3":{"type":["null","string"]},"NOTA4":{"type":["null","string"]},"SERIE":{"type":["null","string"]},"STATO":{"type":["null","string"]},"TARGA":{"type":["null","string"]},"TASSE":{"type":["null","string"]},"ARRIVO":{"type":["null","string"]},"AVVISO":{"type":["null","string"]},"CITTA'":{"type":["null","string"]},"GRUPPO":{"type":["null","string"]},"INDICE":{"type":["null","string"]},"INIZIO":{"type":["null","string"]},"NOMEB1":{"type":["null","string"]},"NOMEB2":{"type":["null","string"]},"NOMEB3":{"type":["null","string"]},"NOMEB4":{"type":["null","string"]},"NOTAB1":{"type":["null","string"]},"NOTAB2":{"type":["null","string"]},"NOTAB3":{"type":["nu
ll","string"]},"NOTAB4":{"type":["null","string"]},"NUMERO":{"type":["null","string"]},"PREMIO":{"type":["null","string"]},"TASSE%":{"type":["null","string"]},"TASSO1":{"type":["null","string"]},"TASSO2":{"type":["null","string"]},"TASSO3":{"type":["null","string"]},"TASSO4":{"type":["null","string"]},"TASSO5":{"type":["null","string"]},"VALUTA":{"type":["null","string"]},"CLIENTE":{"type":["null","string"]},"DIRITTI":{"type":["null","string"]},"INDICE%":{"type":["null","string"]},"NAZIONE":{"type":["null","string"]},"ORIGINE":{"type":["null","string"]},"PAGANTE":{"type":["null","string"]},"PRIVACY":{"type":["null","string"]},"PY-HPKW":{"type":["null","string"]},"PY-NOME":{"type":["null","string"]},"PY-TIPO":{"type":["null","string"]},"REPARTO":{"type":["null","string"]},"RISCHIO":{"type":["null","string"]},"RUBRICA":{"type":["null","string"]},"SESSOB1":{"type":["null","string"]},"SESSOB2":{"type":["null","string"]},"SESSOB3":{"type":["null","string"]},"SESSOB4":{"type":["null","string"]},"TECNICO":{"type":["null","string"]},"VIAGGIO":{"type":["null","string"]},"VINCOLO":{"type":["null","string"]},"CAUZIONE":{"type":["null","string"]},"CLIENTE2":{"type":["null","string"]},"DATAITER":{"type":["null","string"]},"EDIZIONE":{"type":["null","string"]},"FLAGRAMO":{"type":["null","string"]},"IBAN-ALT":{"type":["null","string"]},"IMPORTO1":{"type":["null","string"]},"IMPORTO2":{"type":["null","string"]},"IMPORTO3":{"type":["null","string"]},"IMPORTO4":{"type":["null","string"]},"IMPORTO5":{"type":["null","string"]},"MITTENTE":{"type":["null","string"]},"MODOPREL":{"type":["null","string"]},"NOME-INT":{"type":["null","string"]},"NOMEPROD":{"type":["null","string"]},"PAGARID?":{"type":["null","string"]},"PARTENZA":{"type":["null","string"]},"PY-CORPO":{"type":["null","string"]},"PY-LUNGH":{"type":["null","string"]},"RAMOBASE":{"type":["null","string"]},"RAMOFLAG":{"type":["null","string"]},"RECLAMO?":{"type":["null","string"]},"RISCHIO2":{"type":["null","string"]},"SCADENZA":
{"type":["null","string"]},"SPESEAMM":{"type":["null","string"]},"USA-PEC?":{"type":["null","string"]},"WORKFLOW":{"type":["null","string"]},"ZONAPROD":{"type":["null","string"]},"ACCESSORI":{"type":["null","string"]},"CODICECIG":{"type":["null","string"]},"CODICECUP":{"type":["null","string"]},"COMPAGNIA":{"type":["null","string"]},"COPERTURA":{"type":["null","string"]},"EMAILPROD":{"type":["null","string"]},"FLAGLINEA":{"type":["null","string"]},"GARANZIA1":{"type":["null","string"]},"GARANZIA2":{"type":["null","string"]},"GARANZIA3":{"type":["null","string"]},"GARANZIA4":{"type":["null","string"]},"GARANZIA5":{"type":["null","string"]},"GARANZIA6":{"type":["null","string"]},"GARANZIA7":{"type":["null","string"]},"GARANZIA8":{"type":["null","string"]},"GARANZIA9":{"type":["null","string"]},"GARANZIE?":{"type":["null","string"]},"INDIRIZZO":{"type":["null","string"]},"IPERTESTO":{"type":["null","string"]},"POSIZIONE":{"type":["null","string"]},"PRODUTT01":{"type":["null","string"]},"PRODUTT02":{"type":["null","string"]},"PRODUTT03":{"type":["null","string"]},"PRODUTT04":{"type":["null","string"]},"PRODUTT05":{"type":["null","string"]},"PRODUTT06":{"type":["null","string"]},"PROVINCIA":{"type":["null","string"]},"PY-MOTORE":{"type":["null","string"]},"PY-NUMERO":{"type":["null","string"]},"PY-STAZZA":{"type":["null","string"]},"PY-TENDER":{"type":["null","string"]},"PY-TOTALE":{"type":["null","string"]},"QUIETANZA":{"type":["null","string"]},"STAT-FLAG":{"type":["null","string"]},"STAT-TIPO":{"type":["null","string"]},"TOTPREDOV":{"type":["null","string"]},"VENDITORE":{"type":["null","string"]},"ACCESSORI2":{"type":["null","string"]},"ASSICURATO":{"type":["null","string"]},"CC-BENEFIC":{"type":["null","string"]},"DURATAANNI":{"type":["null","string"]},"GIORNIMORA":{"type":["null","string"]},"GRUPPOPROD":{"type":["null","string"]},"IMPONIBILE":{"type":["null","string"]},"INDIRIZZO2":{"type":["null","string"]},"PARTITAIVA":{"type":["null","string"]},"POG-TARGET":{"typ
e":["null","string"]},"PREVENTIVO":{"type":["null","string"]},"PRODUTTORE":{"type":["null","string"]},"RANGE-QUIE":{"type":["null","string"]},"RATAREGOLA":{"type":["null","string"]},"RIDATTIVO?":{"type":["null","string"]},"STAT-DATA1":{"type":["null","string"]},"STAT-DATA2":{"type":["null","string"]},"TRATTATIVA":{"type":["null","string"]},"ABI-BENEFIC":{"type":["null","string"]},"ANNUOREGOLA":{"type":["null","string"]},"ANTIRACKET%":{"type":["null","string"]},"ASSICURATO2":{"type":["null","string"]},"CAB-BENEFIC":{"type":["null","string"]},"CARICATO-DA":{"type":["null","string"]},"CIN-BENEFIC":{"type":["null","string"]},"COMMERCIALE":{"type":["null","string"]},"DATA-DEROGA":{"type":["null","string"]},"DATAANNULLO":{"type":["null","string"]},"DATAINIPREL":{"type":["null","string"]},"DATARISERVA":{"type":["null","string"]},"DATAULTPREL":{"type":["null","string"]},"DATAVINCOLO":{"type":["null","string"]},"DESCRIZIONE":{"type":["null","string"]},"DICHIARAZ-3":{"type":["null","string"]},"DICHIARAZ-4":{"type":["null","string"]},"FREQUENPREL":{"type":["null","string"]},"IMPORTOPREL":{"type":["null","string"]},"INDIRIZZOB1":{"type":["null","string"]},"INDIRIZZOB2":{"type":["null","string"]},"INDIRIZZOB3":{"type":["null","string"]},"INDIRIZZOB4":{"type":["null","string"]},"INFORMISVAP":{"type":["null","string"]},"POL-DEROGA?":{"type":["null","string"]},"PREMIOANNUO":{"type":["null","string"]},"PY-BANDIERA":{"type":["null","string"]},"PY-MAXVELOC":{"type":["null","string"]},"RANGE-QUIE2":{"type":["null","string"]},"RANGE-QUIER":{"type":["null","string"]},"SCOPERTO%-T":{"type":["null","string"]},"SEGNALATORE":{"type":["null","string"]},"STAT-VALORE":{"type":["null","string"]},"TIPOVINCOLO":{"type":["null","string"]},"ALTRO-COLLAB":{"type":["null","string"]},"BENEFICIARI?":{"type":["null","string"]},"BENEFICIARIO":{"type":["null","string"]},"CODICE-VENDI":{"type":["null","string"]},"CODICECONVER":{"type":["null","string"]},"CONTRAENZAFP":{"type":["null","string"]},"DATA-ADEGUA
T":{"type":["null","string"]},"DATA-RECLAMO":{"type":["null","string"]},"DATADISDETTA":{"type":["null","string"]},"DATASVINCOLO":{"type":["null","string"]},"DESTINATARIO":{"type":["null","string"]},"FLAGEDIZIONE":{"type":["null","string"]},"FRANCHIGIA-T":{"type":["null","string"]},"IBAN-BENEFIC":{"type":["null","string"]},"INIZIOREGOLA":{"type":["null","string"]},"LEGALE-RAPPR":{"type":["null","string"]},"NUMAPPENDICI":{"type":["null","string"]},"PERCENTUALE%":{"type":["null","string"]},"PERIODICITA'":{"type":["null","string"]},"POG-PRODOTTO":{"type":["null","string"]},"POL-ADEGUAT?":{"type":["null","string"]},"PROVVSUIMPO1":{"type":["null","string"]},"PROVVSUIMPO2":{"type":["null","string"]},"PROVVSUPROVV":{"type":["null","string"]},"PY-CVFISCALI":{"type":["null","string"]},"PY-VELEEXTRA":{"type":["null","string"]},"REGOLAZIONE1":{"type":["null","string"]},"REGOLAZIONE2":{"type":["null","string"]},"REGOLAZIONE3":{"type":["null","string"]},"REGOLAZIONE4":{"type":["null","string"]},"REGOLAZIONE5":{"type":["null","string"]},"REGOLAZIONE6":{"type":["null","string"]},"REGOLAZIONE7":{"type":["null","string"]},"REGOLAZIONE8":{"type":["null","string"]},"REGOLAZIONE9":{"type":["null","string"]},"REGOLAZIONE?":{"type":["null","string"]},"SOMMAIMPORTI":{"type":["null","string"]},"SSNALLAFIRMA":{"type":["null","string"]},"TASSEDALRAMO":{"type":["null","string"]},"TASSEDIRITTI":{"type":["null","string"]},"TELEFONOPROD":{"type":["null","string"]},"ACCEALLAFIRMA":{"type":["null","string"]},"ANONIMIZZATO?":{"type":["null","string"]},"ANTICIPODELE%":{"type":["null","string"]},"BANCA-BENEFIC":{"type":["null","string"]},"C/C-COMPAGNIA":{"type":["null","string"]},"CODICEFISCALE":{"type":["null","string"]},"CONTOCORRENTE":{"type":["null","string"]},"DATACOPERTURA":{"type":["null","string"]},"DATAEMISSIONE":{"type":["null","string"]},"DATAREVISIONE":{"type":["null","string"]},"FONDIDAANNULL":{"type":["null","string"]},"GRUPPOPOLIZZA":{"type":["null","string"]},"IMPOALLAFIRMA":{"type":["
null","string"]},"INVIOCLIENTE?":{"type":["null","string"]},"MODOPAGAMENTO":{"type":["null","string"]},"OPZIONE-ASSIC":{"type":["null","string"]},"OPZIONE-PROVV":{"type":["null","string"]},"ORADECORRENZA":{"type":["null","string"]},"PERCENTUALEB1":{"type":["null","string"]},"PERCENTUALEB2":{"type":["null","string"]},"PERCENTUALEB3":{"type":["null","string"]},"PERCENTUALEB4":{"type":["null","string"]},"POLIZZA-YACHT":{"type":["null","string"]},"PROVVPRECONTO":{"type":["null","string"]},"PY-CANTIMODEL":{"type":["null","string"]},"PY-MATRICOLA1":{"type":["null","string"]},"PY-MATRICOLA2":{"type":["null","string"]},"PY-PERIODMESI":{"type":["null","string"]},"QUALIFICA-INT":{"type":["null","string"]},"QUEST-ADEGUAT":{"type":["null","string"]},"RAMOCOMPAGNIA":{"type":["null","string"]},"REGOLAZIONE10":{"type":["null","string"]},"SCADENZA-CALC":{"type":["null","string"]},"SCUDOFISCALE?":{"type":["null","string"]},"SPESEAMMFIRMA":{"type":["null","string"]},"STATODISDETTA":{"type":["null","string"]},"TASSEDALRAMO2":{"type":["null","string"]},"VALORERISERVA":{"type":["null","string"]},"VALUTADOSSIER":{"type":["null","string"]},"DATAMORAREGOLA":{"type":["null","string"]},"DATAREGISTRATA":{"type":["null","string"]},"FRANCHIGIA-PER":{"type":["null","string"]},"GIORNIDISDETTA":{"type":["null","string"]},"IBAN-PAESE-ALT":{"type":["null","string"]},"IBAN-PAESE-BEN":{"type":["null","string"]},"IMPORTOETANAVE":{"type":["null","string"]},"MIN-SCOPERTO-T":{"type":["null","string"]},"MODADEGUATEZZA":{"type":["null","string"]},"OPZIONE-CODICE":{"type":["null","string"]},"OPZIONE-FINANZ":{"type":["null","string"]},"PY-ANNOCOSTRU1":{"type":["null","string"]},"PY-ANNOCOSTRU2":{"type":["null","string"]},"PY-DOTAZIEXTRA":{"type":["null","string"]},"PY-EQUIPAGGIO?":{"type":["null","string"]},"PY-MOTOREFUORI":{"type":["null","string"]},"PY-NISCRIZIONE":{"type":["null","string"]},"PY-PERIODDATA1":{"type":["null","string"]},"PY-PERIODDATA2":{"type":["null","string"]},"PY-PERIODDATA3":{"type":["nu
ll","string"]},"PY-PERIODDATA4":{"type":["null","string"]},"QUIETANZAANNUA":{"type":["null","string"]},"RINNOFINEGGSCA":{"type":["null","string"]},"SCADENZAREGOLA":{"type":["null","string"]},"SOMMAASSICURA1":{"type":["null","string"]},"SOMMAASSICURA2":{"type":["null","string"]},"SOMMAGARANZIA1":{"type":["null","string"]},"SOMMAGARANZIA2":{"type":["null","string"]},"SOMMAGARANZIA3":{"type":["null","string"]},"SOMMAGARANZIA4":{"type":["null","string"]},"SOMMAGARANZIA5":{"type":["null","string"]},"SOMMAGARANZIA6":{"type":["null","string"]},"SOMMAGARANZIA7":{"type":["null","string"]},"SOMMAGARANZIA8":{"type":["null","string"]},"SOMMAGARANZIA9":{"type":["null","string"]},"STATIS-DOSSIER":{"type":["null","string"]},"TASSEALLAFIRMA":{"type":["null","string"]},"TIPOCONSULENZA":{"type":["null","string"]},"TOTALERISCATTO":{"type":["null","string"]},"TOTPREDOVFIRMA":{"type":["null","string"]},"ULTIMASCADENZA":{"type":["null","string"]},"VALORECAPITALE":{"type":["null","string"]},"ABI-ALTERNATIVO":{"type":["null","string"]},"AGG-ANTIRACKET?":{"type":["null","string"]},"C/C-ALTERNATIVO":{"type":["null","string"]},"CAB-ALTERNATIVO":{"type":["null","string"]},"CIN-ALTERNATIVO":{"type":["null","string"]},"COASSICURAZIONE":{"type":["null","string"]},"CODICEFISCALEB1":{"type":["null","string"]},"CODICEFISCALEB2":{"type":["null","string"]},"CODICEFISCALEB3":{"type":["null","string"]},"CODICEFISCALEB4":{"type":["null","string"]},"CODICEFISCALEBE":{"type":["null","string"]},"CONTOCORRENTEPR":{"type":["null","string"]},"DATADINASCITAB1":{"type":["null","string"]},"DATADINASCITAB2":{"type":["null","string"]},"DATADINASCITAB3":{"type":["null","string"]},"DATADINASCITAB4":{"type":["null","string"]},"DATAINVDISDETTA":{"type":["null","string"]},"DATASOSPENSIONE":{"type":["null","string"]},"DATAULTIMAREGOL":{"type":["null","string"]},"DESCRIZIONESEGN":{"type":["null","string"]},"DIRITTALLAFIRMA":{"type":["null","string"]},"GIORNIMORAFIRMA":{"type":["null","string"]},"GIORNIMORAPAREG":{"type":[
"null","string"]},"GIORNIMORAREGOL":{"type":["null","string"]},"IBAN-CHKDGT-ALT":{"type":["null","string"]},"IBAN-CHKDGT-BEN":{"type":["null","string"]},"IMPOULTIMAREGOL":{"type":["null","string"]},"INDIRBANCA-BENE":{"type":["null","string"]},"LINEAINVESTIMEN":{"type":["null","string"]},"NOTA2PRODUTTORE":{"type":["null","string"]},"NUMEROPOSIZIONE":{"type":["null","string"]},"OGGETTOEDIZIONE":{"type":["null","string"]},"OPZIONE-NEWFIN?":{"type":["null","string"]},"PERCENTUALERATA":{"type":["null","string"]},"POLIZZAABBINATA":{"type":["null","string"]},"PREMIOALLAFIRMA":{"type":["null","string"]},"PREMIOGARANZIA1":{"type":["null","string"]},"PREMIOGARANZIA2":{"type":["null","string"]},"PREMIOGARANZIA3":{"type":["null","string"]},"PREMIOGARANZIA4":{"type":["null","string"]},"PREMIOGARANZIA5":{"type":["null","string"]},"PREMIOGARANZIA6":{"type":["null","string"]},"PREMIOGARANZIA7":{"type":["null","string"]},"PREMIOGARANZIA8":{"type":["null","string"]},"PREMIOGARANZIA9":{"type":["null","string"]},"PROVVSUPROVV01%":{"type":["null","string"]},"PROVVSUPROVV02%":{"type":["null","string"]},"PROVVSUPROVV03%":{"type":["null","string"]},"PROVVSUPROVV04%":{"type":["null","string"]},"PROVVSUPROVV05%":{"type":["null","string"]},"PROVVSUPROVV06%":{"type":["null","string"]},"QUIETANZAREGOLA":{"type":["null","string"]},"RIPARTIZ-PROVV?":{"type":["null","string"]},"RISCHIOSPECIALE":{"type":["null","string"]},"TIPORAMOPOLIZZA":{"type":["null","string"]},"TRATTATIVA-ORIG":{"type":["null","string"]},"ULTIMOPAGAMENTO":{"type":["null","string"]}}},supportedSyncModes=[incremental],sourceDefinedCursor=true,defaultCursorField=[DATAORAPRIMAMODIFICA],sourceDefinedPrimaryKey=[[NUMERORECORD]],namespace=Iassicur_mach1,additionalProperties={}],syncMode=incremental,cursorField=[DATAORAPRIMAMODIFICA],destinationSyncMode=append,primaryKey=[[NUMERORECORD]],additionalProperties={}]],additionalProperties={}],failures=[io.airbyte.config.FailureReason@7e85e31a[failureOrigin=replication,failureType=,interna
lMessage=java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null,externalMessage=Something went wrong during replication,metadata=io.airbyte.config.Metadata@29e6ed21[additionalProperties={attemptNumber=0, jobId=35}],stacktrace=java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1589) Caused by: java.lang.RuntimeException: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:368) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ... 3 more Caused by: java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.node.ObjectNode.put(String, String)" because "matchingSchema" is null at io.airbyte.workers.RecordSchemaValidator.validateSchema(RecordSchemaValidator.java:48) at io.airbyte.workers.general.DefaultReplicationWorker.validateSchema(DefaultReplicationWorker.java:389) at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:317) ... 
4 more ,retryable=,timestamp=1660315556972]]] 2022-08-12 14:46:57 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):312 - Stopping temporal heartbeating... ```

Are you willing to submit a PR?

I find this repo hard to deal with; it is too complex (I'm not very good with Java). logs-35.txt

harshithmullapudi commented 2 years ago

Hey, I can also see there is an error:

Backing off _send(...) for 5.0s (requests.exceptions.ConnectionError: HTTPSConnectionPool(host='mach1.iassicur.cloud', port=443): Max retries exceeded with url: /IServiceExec.asmx/Query?SQL=SELECT+TOP+1+%2A%2C+NUMERORECORD+FROM+DRE+ORDER+BY+NUMERORECORD (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f93c414c610>: Failed to establish a new connection: [Errno -3] Try again')))
2022-08-12 14:45:41 source > Caught retryable error 'HTTPSConnectionPool(host='mach1.iassicur.cloud', port=443): Max retries exceeded with url: /IServiceExec.asmx/Query?SQL=SELECT+TOP+1+%2A%2C+NUMERORECORD+FROM+DRE+ORDER+BY+NUMERORECORD (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f93c414c610>: Failed to establish a new connection: [Errno -3] Try again'))' after 1 tries. 

Can you check whether fixing this helps?

AM-I-Human commented 2 years ago

This happens sometimes; the API server is not very reliable. Here is a log without the backoff: logs-70.txt. The error happens even with a static JSON schema (log not uploaded).

AM-I-Human commented 2 years ago

Calling the connector from the console:

python airbyte-integrations/connectors/source-iassicur-api/main.py read --config config.json --catalog catalog.json

seems to work: it prints the records to the console. I believe there is a bug in Airbyte.

AM-I-Human commented 2 years ago

This is caused by the fact that, to retrieve the schema from the catalog, the concatenation of namespace and stream name is used as the key.

  public static String streamNameWithNamespace(final @Nullable String namespace, final String streamName) {
    return Objects.toString(namespace, "").trim() + streamName.trim();
  }

I discovered that if I put this in my connector:

  public static String streamNameWithNamespace(final @Nullable String namespace, final String streamName) {
    return Objects.toString(namespace, "").trim() + streamName.trim();
  }

the matchingSchema error is avoided (I get another error, but it is unrelated).
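
To make the lookup described above concrete, here is a minimal Python sketch of one way such a lookup can come back empty. It is not Airbyte's implementation: `stream_name_with_namespace` is a rough port of the Java helper quoted above, `build_schema_index` is a hypothetical helper, and the catalog entry is invented for illustration (only the stream name `DRE` and the namespace `Iassicur_mach1` appear in the logs). The assumption is that the key built from the catalog and the key built from an incoming record differ, for example because only one of them carries a namespace.

```python
from typing import Optional


def stream_name_with_namespace(namespace: Optional[str], stream_name: str) -> str:
    """Rough port of the Java helper: a missing namespace is treated as an empty string."""
    return (namespace or "").strip() + stream_name.strip()


def build_schema_index(catalog_streams: list[dict]) -> dict[str, dict]:
    """Hypothetical helper: index each stream's JSON schema by namespace + name."""
    return {
        stream_name_with_namespace(s.get("namespace"), s["name"]): s["json_schema"]
        for s in catalog_streams
    }


# Invented catalog entry, for illustration only.
catalog = [{"namespace": "Iassicur_mach1", "name": "DRE", "json_schema": {"type": "object"}}]
index = build_schema_index(catalog)

# Same namespace on both sides: the key matches and the schema is found.
matched = index.get(stream_name_with_namespace("Iassicur_mach1", "DRE"))

# Record without a namespace: the key is just "DRE", so the lookup yields None.
missing = index.get(stream_name_with_namespace(None, "DRE"))
```

Any code that then calls a method on the looked-up value without a null check would fail in the way the stack trace shows.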

marcosmarxm commented 2 years ago

@AM-I-Human is it possible to share the connector code for further investigation?

AM-I-Human commented 2 years ago

This is the working version (tables and URL are changed). PS: I'm the only dev on this, so I'm sorry if you find it hard to read.

from abc import ABC
from base64 import b64encode
from enum import Enum
from typing import Any, Iterable, List, Mapping, MutableMapping, Tuple

import pandas as pd
import requests
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from pydantic import BaseModel

from source_iassicur_api.iassicur.iassicur_parser import IassicurParser

# Available iassicur fields:
## CodiceDb, Archivio, CodiceRecord, NumeroRecord,
## DataPrimaModifica, OraPrimaModifica, DataOraPrimaModifica, UtentePrimaModifica,
## DataUltimaModifica, OraUltimaModifica, DataOraUltimaModifica, UtenteUltimaModifica

class IassicurSubdomain(str, Enum):
    Domain1 = "mach1"
    Domain2 = "mansutti"

KNOWN_IASSICUR_TABLES = {
    IassicurSubdomain.Domain1.value: [
        "TABLE",
    ],
    IassicurSubdomain.Domain2.value: [
        "TABLE2",
    ],
}
# Basic incremental stream
class IncrementalIassicurApiStream(HttpStream, ABC):
    CURSOR_FIELD_MONOTONE_TRANSLATION_FN = {
        "NUMERORECORD": lambda df: None
        if "NUMERORECORD" not in df.columns
        else df.NUMERORECORD.str.replace(r"\W", "", regex=True).astype(int),
        "DATAORAULTIMAMODIFICA": lambda df: None
        if "DATAORAULTIMAMODIFICA" not in df.columns
        else pd.to_datetime(df.DATAORAULTIMAMODIFICA).dt.strftime("%Y%m%d%H%M%S"),  # type: ignore
    }

    @property
    def cursor_field(self) -> list[str]:
        return ["NUMERORECORD"]  # or DATAORAULTIMAMODIFICA

    @property
    def primary_key(self):
        return ["NUMERORECORD"]  # CODICERECORD

    @property
    def state_checkpoint_interval(self):
        return 1000

    @property
    def url_base(self):
        return f"https://{self._subdomain.value}.iassicur.cloud"

    @property
    def page_size(self) -> int:
        return self._page_size

    def path(self, **kwargs) -> str:
        return f"{self.url_base}/IServiceExec.asmx/Query"

    def __init__(
        self,
        table_name: str,
        subdomain: IassicurSubdomain,
        page_size: int = 1000,
        authenticator: HttpAuthenticator | None = None,
    ):
        self._table_name = table_name
        self._subdomain = subdomain
        self._page_size = page_size
        self._parser = IassicurParser()
        super().__init__(authenticator=authenticator)

    def next_page_token(self, response: requests.Response) -> dict[str, Any] | None:
        """
        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..

        For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
        'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
        The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].

        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
                If there are no more pages in the result, return None.
        """

        data = pd.DataFrame(
            list(
                self._parser.records_from_xml(response.text),
            )
        )
        return (
            {}
            if data.empty
            else (
                data.assign(**self.CURSOR_FIELD_MONOTONE_TRANSLATION_FN)[
                    self.cursor_field
                ]
                .max()
                .to_dict()
            )
        )

    def request_params(
        self,
        stream_state: dict[str, Any],
        stream_slice: dict[str, Any] | None = None,
        next_page_token: dict[str, Any] | None = None,
    ) -> MutableMapping[str, Any]:
        """
        Usually contains common params e.g. pagination size etc.
        """
        state = next_page_token or stream_state

        missing_primary_key_fields: set = set(self.cursor_field) - set(self.primary_key)
        sql_statement = (
            f"SELECT TOP {self.page_size} *, {','.join(self.cursor_field)}{',' if missing_primary_key_fields else ''}{','.join(missing_primary_key_fields)} FROM {self._table_name} "
            + (f"WHERE {self._condition(state)} " if state else "")
            + f"ORDER BY { ','.join(self.cursor_field) }"
        )

        print("sql_statement", sql_statement)
        return {"SQL": sql_statement}

    def _condition(self, state, is_date=False):
        return f"""{" and ".join(
            [
                (f"{cursor}" if not is_date else f"FORMAT({cursor},'yyyymmddhhnnss')")+f">{str(state[cursor]).replace('.','')}"
                for cursor in self.cursor_field
            ]
        )}"""

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[dict]:
        """
        :return an iterable containing each record in the response
        """
        response.raise_for_status()
        yield from map(pd.Series.to_dict, self._parser.records_from_xml(response.text))

    def get_updated_state(
        self,
        current_stream_state: dict[str, str],
        latest_record: dict[str, str],
    ) -> dict[str, str]:
        """
        Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
        """
        last_cursor_fields: pd.Series = (
            pd.DataFrame(latest_record, index=[0])
            .assign(**self.CURSOR_FIELD_MONOTONE_TRANSLATION_FN)[self.cursor_field]
            .iloc[0]
        )
        if not current_stream_state:
            return last_cursor_fields.to_dict()

        current_cursor_fields: pd.Series = pd.Series(current_stream_state)[
            self.cursor_field
        ]
        return current_cursor_fields.where(
            current_cursor_fields > last_cursor_fields, other=last_cursor_fields
        )[self.cursor_field].to_dict()

class IassicurTable(IncrementalIassicurApiStream):
    @property
    def name(self) -> str:
        return self._table_name

    @property
    def namespace(self) -> str:
        return f""

    @property
    def use_cache(self):
        return True

    @property
    def should_backoff(self):
        return False

    def as_airbyte_stream(self) -> AirbyteStream:

        return AirbyteStream(
            name=self.name,
            json_schema=self.get_json_schema(),
            supported_sync_modes=[SyncMode.incremental],
            source_defined_cursor=self.source_defined_cursor,
            default_cursor_field=self._wrapped_cursor_field(),
            source_defined_primary_key=Stream._wrapped_primary_key(self.primary_key),
            namespace=self.namespace,
        )

    def get_json_schema(self) -> dict:
        response = requests.get(
            f"{self.url_base}/IServiceExec.asmx/Query",
            params={
                "SQL": f"SELECT TOP 1 *, {','.join(self.cursor_field)}{(','+','.join(self.primary_key)) if self.cursor_field!=self.primary_key else ''} FROM {self._table_name} WHERE 1=0"
            },
            headers=dict(self.authenticator.get_auth_header()),  # type: ignore
        )
        response.raise_for_status()
        return {
            "type": "object",
            "additionalProperties": True,
            # JSON Schema expects "required" to be an array, so emit a sorted list rather than a set.
            "required": sorted(set(self.cursor_field + self.primary_key)),
            "properties": {
                col: {"type": ["null", "string"]}
                for col in self._parser.read_columns_data(response.text).Nome
            },
        }

    def request_kwargs(
        self, stream_state: dict, stream_slice: dict, next_page_token: dict
    ):
        return {"timeout": (500, 500)}

class SourceIassicurApi(AbstractSource):
    class IassicurAuthenticator(HttpAuthenticator):
        def __init__(self, username, password) -> None:
            self.user = username
            self.password = password
            super().__init__()

        def get_auth_header(self) -> Mapping[str, Any]:
            """
            :return: A dictionary containing all the necessary headers to authenticate.
            """
            return {
                "Authorization": f'Basic {b64encode(b":".join((self.user.encode("latin1"), self.password.encode("latin1")))).decode("latin1")}'
            }

    class _Config(BaseModel):
        user: str
        password: str
        subdomain: IassicurSubdomain
        page_size: int

        @property
        def url(self):
            # Use .value explicitly so the URL never depends on how the enum is formatted.
            return f"https://{self.subdomain.value}.iassicur.cloud"

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """
        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
        parsed_config = SourceIassicurApi._Config(**config)
        try:
            requests.get(
                url=parsed_config.url, params={"SQL": "select 1+2"}
            ).raise_for_status()
        except requests.RequestException as request_error:
            # Covers HTTP errors as well as connection failures and timeouts.
            return False, repr(request_error)
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        parsed_config = SourceIassicurApi._Config(**config)

        return [
            IassicurTable(
                table_name=table_name,
                subdomain=parsed_config.subdomain,
                page_size=parsed_config.page_size,
                authenticator=self.IassicurAuthenticator(
                    username=parsed_config.user, password=parsed_config.password
                ),
            )
            for table_name in KNOWN_IASSICUR_TABLES[parsed_config.subdomain]
        ]
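
For readers skimming the connector, the pagination in `request_params` boils down to keyset pagination: each page asks for rows whose cursor is greater than the last value seen, ordered by that cursor. The sketch below shows the idea in isolation. `build_page_query` is a hypothetical helper, not part of the connector or of the Airbyte CDK, and it assumes a single integer cursor column.

```python
from typing import Optional


def build_page_query(
    table: str,
    cursor_field: str,
    page_size: int,
    last_value: Optional[int] = None,
) -> str:
    """Build a 'SELECT TOP n ... WHERE cursor > last ORDER BY cursor' query (illustrative only)."""
    # Only add the WHERE clause once a previous cursor value is known.
    where = f"WHERE {cursor_field} > {int(last_value)} " if last_value is not None else ""
    return f"SELECT TOP {int(page_size)} * FROM {table} {where}ORDER BY {cursor_field}"


first_page = build_page_query("DRE", "NUMERORECORD", 1000)
next_page = build_page_query("DRE", "NUMERORECORD", 1000, last_value=1000)
```

Because the cursor value, not an offset, drives the next request, the same function serves both pagination within a sync and resuming from saved state.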

AM-I-Human commented 2 years ago

I changed

class IassicurTable(IncrementalIassicurApiStream):
    @property
    def name(self) -> str:
        return self._table_name

    @property
    def namespace(self) -> str:
        return f"Iassicur_{self._subdomain}"

to

class IassicurTable(IncrementalIassicurApiStream):
    @property
    def name(self) -> str:
        return self._table_name

    @property
    def namespace(self) -> str:
        return f""

Now I see that I forgot the .value on the _subdomain enum! Maybe some characters are the cause of this error.
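
As a small illustration of why the missing `.value` matters: for an enum that mixes in `str`, `str(member)` returns the class and member name, while `.value` returns the underlying string. As far as I know, what an f-string produces for such a member has changed between Python versions, so the sketch below sticks to `str()` and `.value`, whose behaviour is stable. The enum is a trimmed copy of the one in the connector.

```python
from enum import Enum


class IassicurSubdomain(str, Enum):
    Domain1 = "mach1"


member = IassicurSubdomain.Domain1

# The raw string stored on the member.
explicit_value = member.value

# Enum.__str__ returns "ClassName.MemberName", not the value.
as_str = str(member)

# Using .value makes the resulting namespace independent of enum formatting rules.
namespace = f"Iassicur_{member.value}"
```

Spelling out `.value` wherever the enum ends up in a namespace or URL removes one possible source of unexpected characters.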

AM-I-Human commented 2 years ago

Ok, even with return f"IASSICUR_{self._subdomain.name.upper()}" I get matchingSchema null

marcosmarxm commented 2 years ago

@AM-I-Human what happens if you remove the namespace and allow this to be created by the UI? I ask mostly because I don't see any source implementing this function.

AM-I-Human commented 2 years ago

If I remove

    @property
    def namespace(self) -> str:
        return f""

I get an error because namespace can't be None.

marcosmarxm commented 2 years ago

@AM-I-Human would it be possible to use another destination besides S3? That might give some indication of what is wrong. By the way, are the integration tests and the source connector working properly?

AM-I-Human commented 2 years ago

> @AM-I-Human would it be possible to use another destination besides S3? That might give some indication of what is wrong. By the way, are the integration tests and the source connector working properly?

Unit tests are working, but I haven't written any integration tests. I will be on holiday without my PC until 17 September and will run some tests after that. I found that the connector gives me a matching schema error (and the schema printed seems valid); to avoid this I changed the namespace setting from mirroring the source to the destination one. With the Snowflake connector it seems to work without any issues.

marcosmarxm commented 2 years ago

This is probably a limitation of the destination, since the connection works with the Snowflake destination.

bukhbayar commented 1 year ago

This error also happens with the Redshift connector.