airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Mixpanel Export.process_response() fails when JSON text properties contain embedded line breaks #11018

Closed guillaume-chech closed 2 years ago

guillaume-chech commented 2 years ago

Environment

Current Behavior

When retrieving export objects from Mixpanel, if the response contains malformed JSON, the whole sync job crashes. This is a problem because Mixpanel only allows data to be retrieved at full-day granularity. It is therefore impossible to skip just a fraction of the data (the minute or two in which the data is faulty); the only option is to skip a whole day, which is not acceptable in terms of data loss.
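To illustrate how this kind of failure can arise, here is a minimal sketch. It rests on assumptions the log below does not confirm: that the export body is newline-delimited JSON, that it is split with `str.splitlines()`, and that a text property contains a raw Unicode line separator (U+2028). The event payload is made up for the example.

```python
import json

# Hypothetical event: the "text" property contains U+2028 (LINE SEPARATOR).
event = {"event": "comment", "properties": {"text": "first part\u2028second part"}}

# ensure_ascii=False leaves U+2028 unescaped, so the record is one valid JSON line.
body = json.dumps(event, ensure_ascii=False) + "\n"

# str.splitlines() also breaks on U+2028, cutting the record in two.
fragments = body.splitlines()


def try_parse(line):
    """Return (record, None) on success or (None, error message) on failure."""
    try:
        return json.loads(line), None
    except json.JSONDecodeError as exc:
        return None, exc.msg


results = [try_parse(fragment) for fragment in fragments]
```

Under these assumptions the first fragment ends inside an open string, which gives the same "Unterminated string" error as in the traceback. Splitting on "\n" alone keeps the record whole.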

Expected Behavior

We would expect the connector to retry first, and then offer the option to ignore malformed events.
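A rough sketch of what the "ignore malformed events" part could look like. It is not the connector's code: `iter_records`, `ignore_malformed` and the logger name are hypothetical. It assumes a broken record was split inside a string value, so adjacent fragments are re-joined with an escaped `\n` before parsing again. Retrying the HTTP request is not covered.

```python
import json
import logging
from typing import Any, Dict, Iterable, Iterator, List

logger = logging.getLogger("mixpanel_export_sketch")  # hypothetical logger name


def iter_records(lines: Iterable[str], ignore_malformed: bool = False) -> Iterator[Dict[str, Any]]:
    """Yield one parsed record per JSON line, repairing split records where possible."""
    pending: List[str] = []  # consecutive lines that did not parse on their own

    def give_up(fragments: List[str]) -> None:
        # Either fail loudly (default) or log and drop the unparseable fragments.
        message = "Malformed export line(s): %r" % " | ".join(fragments)[:200]
        if not ignore_malformed:
            raise ValueError(message)
        logger.warning(message)

    for line in lines:
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            pending.append(line)
            if len(pending) > 1:
                try:
                    # Assumption: the break fell inside a string, so restore it as "\n".
                    record = json.loads("\\n".join(pending))
                except json.JSONDecodeError:
                    continue  # keep buffering; more fragments may follow
                pending = []
                yield record
            continue
        # A complete record arrived: anything still pending was never repaired.
        if pending:
            give_up(pending)
            pending = []
        yield record

    if pending:
        give_up(pending)


# Usage with made-up lines: the second record is split across two lines.
sample = ['{"a": 1}', '{"text": "first', 'second"}', '{"b": 2}']
repaired = list(iter_records(sample))
```

With `ignore_malformed=True`, fragments that cannot be repaired are logged and skipped rather than failing the whole day's sync. The default keeps the current fail-fast behavior.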

Logs

2022-03-09 15:10:22 INFO () WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.32.6-alpha
2022-03-09 15:10:22 INFO () TemporalAttemptExecution(get):116 - Executing worker wrapper. Airbyte version: 0.32.6-alpha
2022-03-09 15:10:22 WARN () Databases(createPostgresDatabaseWithRetry):41 - Waiting for database to become available...
2022-03-09 15:10:22 INFO () JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2022-03-09 15:10:22 INFO () Databases(createPostgresDatabaseWithRetry):58 - Database available!
2022-03-09 15:10:23 INFO () DefaultReplicationWorker(run):99 - start sync worker. job id: 2163 attempt id: 2
2022-03-09 15:10:23 INFO () DefaultReplicationWorker(run):108 - configured sync modes: {null.export=incremental - append}
2022-03-09 15:10:23 INFO () DefaultAirbyteDestination(start):64 - Running destination...
2022-03-09 15:10:23 INFO () LineGobbler(voidCall):82 - Checking if airbyte/destination-snowflake:0.4.17 exists...
2022-03-09 15:10:23 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/2163/2 --network host --log-driver none airbyte/destination-snowflake:0.4.17 write --config destination_config.json --catalog destination_catalog.json
2022-03-09 15:10:23 INFO () LineGobbler(voidCall):82 - airbyte/destination-snowflake:0.4.17 was found locally.
2022-03-09 15:10:23 INFO () LineGobbler(voidCall):82 - Checking if airbyte/source-mixpanel:0.1.9 exists...
2022-03-09 15:10:23 INFO () LineGobbler(voidCall):82 - airbyte/source-mixpanel:0.1.9 was found locally.
2022-03-09 15:10:23 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/2163/2 --network host --log-driver none airbyte/source-mixpanel:0.1.9 read --config source_config.json --catalog source_catalog.json --state input_state.json
2022-03-09 15:10:23 INFO () DefaultReplicationWorker(run):136 - Waiting for source thread to join.
2022-03-09 15:10:23 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):243 - Destination output thread started.
2022-03-09 15:10:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):207 - Replication thread started.
destination - 2022-03-09 15:10:23 ERROR () LineGobbler(voidCall):82 - SLF4J: Class path contains multiple SLF4J bindings.
destination - 2022-03-09 15:10:23 ERROR () LineGobbler(voidCall):82 - SLF4J: Found binding in [jar:file:/airbyte/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
destination - 2022-03-09 15:10:23 ERROR () LineGobbler(voidCall):82 - SLF4J: Found binding in [jar:file:/airbyte/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
destination - 2022-03-09 15:10:23 ERROR () LineGobbler(voidCall):82 - SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
destination - 2022-03-09 15:10:23 ERROR () LineGobbler(voidCall):82 - SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
source - 2022-03-09 15:10:23 INFO () DefaultAirbyteStreamFactory(internalLog):97 - Starting syncing SourceMixpanel
source - 2022-03-09 15:10:23 INFO () DefaultAirbyteStreamFactory(internalLog):97 - Using start_date: 2022-02-08, end_date: 2022-03-09
source - 2022-03-09 15:10:23 INFO () DefaultAirbyteStreamFactory(internalLog):97 - Syncing stream: export 
source - 2022-03-09 15:10:23 INFO () DefaultAirbyteStreamFactory(internalLog):97 - Setting state of export stream to {'date': '2022-03-03T15:55:57'}
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.b.IntegrationRunner(run):88 - Sentry transaction event: c1fb51a1054e4e7c858e6c0bf2a83840
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Running integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.b.IntegrationRunner(runInternal):107 - Command: WRITE
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.b.IntegrationRunner(runInternal):108 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.d.j.c.SwitchingDestination(getConsumer):65 - Using destination type: INTERNAL_STAGING
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$toWriteConfig$0):92 - Write config: WriteConfig{streamName=export, namespace=null, outputSchemaName=MIXPANEL_RELEASE_PROD, tmpTableName=_airbyte_tmp_xwr_export, outputTableName=_airbyte_raw_export, syncMode=append}
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):141 - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onStartFunction$2):111 - Preparing tmp tables in destination started for 1 streams
destination - 2022-03-09 15:10:24 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:24 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onStartFunction$2):119 - Preparing stage in destination started for schema MIXPANEL_RELEASE_PROD stream export: tmp table: _airbyte_tmp_xwr_export, stage: MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT
destination - 2022-03-09 15:10:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:26 INFO i.a.d.j.DefaultJdbcDatabase(lambda$query$1):106 - closing connection
destination - 2022-03-09 15:10:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:26 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onStartFunction$2):130 - Preparing stage in destination completed for schema MIXPANEL_RELEASE_PROD stream export
destination - 2022-03-09 15:10:26 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:10:26 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onStartFunction$2):133 - Preparing tables in destination completed.
2022-03-09 15:12:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 1000
2022-03-09 15:12:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 2000
2022-03-09 15:12:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 3000
2022-03-09 15:12:02 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 4000
2022-03-09 15:12:02 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 5000
2022-03-09 15:12:02 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 6000
2022-03-09 15:12:03 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 7000
2022-03-09 15:12:03 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 8000
2022-03-09 15:12:03 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 9000
2022-03-09 15:12:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 10000
2022-03-09 15:12:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 11000
2022-03-09 15:12:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 12000
2022-03-09 15:12:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 13000
2022-03-09 15:12:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 14000
2022-03-09 15:12:05 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 15000
2022-03-09 15:12:06 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 16000
2022-03-09 15:12:06 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 17000
2022-03-09 15:12:06 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 18000
2022-03-09 15:12:06 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 19000
2022-03-09 15:12:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 20000
2022-03-09 15:12:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 21000
2022-03-09 15:12:07 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 22000
2022-03-09 15:12:08 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 23000
2022-03-09 15:12:08 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 24000
2022-03-09 15:12:08 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 25000
2022-03-09 15:12:09 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 26000
2022-03-09 15:12:09 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 27000
2022-03-09 15:12:09 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 28000
2022-03-09 15:12:10 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 29000
2022-03-09 15:12:10 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 30000
2022-03-09 15:12:10 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 31000
2022-03-09 15:12:11 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 32000
2022-03-09 15:12:11 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 33000
2022-03-09 15:12:11 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 34000
2022-03-09 15:12:12 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 35000
2022-03-09 15:12:12 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 36000
destination - 2022-03-09 15:12:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:12 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 134215987 bytes
destination - 2022-03-09 15:12:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:12 INFO i.a.i.d.b.BufferedStreamConsumer(lambda$flushQueueToDestination$1):185 - Flushing export: 36263 records
destination - 2022-03-09 15:12:12 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:12 INFO i.a.i.d.s.SnowflakeStagingSqlOperations(insertRecordsInternal):29 - Writing 36263 records to MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
2022-03-09 15:12:14 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 37000
2022-03-09 15:12:15 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 38000
2022-03-09 15:12:15 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 39000
2022-03-09 15:12:15 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 40000
2022-03-09 15:12:15 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 41000
2022-03-09 15:12:16 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 42000
2022-03-09 15:12:16 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 43000
2022-03-09 15:12:16 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 44000
2022-03-09 15:12:17 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 45000
2022-03-09 15:12:17 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 46000
2022-03-09 15:12:17 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 47000
2022-03-09 15:12:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 48000
2022-03-09 15:12:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 49000
2022-03-09 15:12:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 50000
2022-03-09 15:12:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 51000
2022-03-09 15:12:19 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 52000
2022-03-09 15:12:19 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 53000
2022-03-09 15:12:19 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 54000
2022-03-09 15:12:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 55000
2022-03-09 15:12:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 56000
2022-03-09 15:12:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 57000
2022-03-09 15:12:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 58000
2022-03-09 15:12:21 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 59000
2022-03-09 15:12:21 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 60000
2022-03-09 15:12:21 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 61000
2022-03-09 15:12:22 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 62000
2022-03-09 15:12:22 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 63000
2022-03-09 15:12:22 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 64000
2022-03-09 15:12:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 65000
2022-03-09 15:12:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 66000
2022-03-09 15:12:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 67000
2022-03-09 15:12:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 68000
2022-03-09 15:12:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 69000
2022-03-09 15:12:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 70000
2022-03-09 15:12:24 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 71000
2022-03-09 15:12:25 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 72000
destination - 2022-03-09 15:12:25 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:25 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 134214397 bytes
destination - 2022-03-09 15:12:25 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:25 INFO i.a.i.d.b.BufferedStreamConsumer(lambda$flushQueueToDestination$1):185 - Flushing export: 36085 records
destination - 2022-03-09 15:12:25 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:25 INFO i.a.i.d.s.SnowflakeStagingSqlOperations(insertRecordsInternal):29 - Writing 36085 records to MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
2022-03-09 15:12:27 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 73000
2022-03-09 15:12:27 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 74000
2022-03-09 15:12:27 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 75000
2022-03-09 15:12:27 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 76000
2022-03-09 15:12:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 77000
2022-03-09 15:12:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 78000
2022-03-09 15:12:28 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 79000
2022-03-09 15:12:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 80000
2022-03-09 15:12:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 81000
2022-03-09 15:12:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 82000
2022-03-09 15:12:29 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 83000
2022-03-09 15:12:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 84000
2022-03-09 15:12:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 85000
2022-03-09 15:12:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 86000
2022-03-09 15:12:31 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 87000
2022-03-09 15:12:31 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 88000
2022-03-09 15:12:31 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 89000
2022-03-09 15:12:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 90000
2022-03-09 15:12:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 91000
2022-03-09 15:12:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 92000
2022-03-09 15:12:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 93000
2022-03-09 15:12:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 94000
2022-03-09 15:12:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 95000
2022-03-09 15:12:33 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 96000
2022-03-09 15:12:34 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 97000
2022-03-09 15:12:34 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 98000
2022-03-09 15:12:34 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 99000
2022-03-09 15:12:35 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 100000
2022-03-09 15:12:35 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 101000
2022-03-09 15:12:35 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 102000
2022-03-09 15:12:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 103000
2022-03-09 15:12:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 104000
2022-03-09 15:12:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 105000
2022-03-09 15:12:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 106000
2022-03-09 15:12:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 107000
2022-03-09 15:12:37 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 108000
destination - 2022-03-09 15:12:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:37 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 134214388 bytes
destination - 2022-03-09 15:12:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:37 INFO i.a.i.d.b.BufferedStreamConsumer(lambda$flushQueueToDestination$1):185 - Flushing export: 35903 records
destination - 2022-03-09 15:12:37 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:37 INFO i.a.i.d.s.SnowflakeStagingSqlOperations(insertRecordsInternal):29 - Writing 35903 records to MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
2022-03-09 15:12:39 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 109000
2022-03-09 15:12:39 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 110000
2022-03-09 15:12:39 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 111000
2022-03-09 15:12:40 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 112000
2022-03-09 15:12:40 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 113000
2022-03-09 15:12:40 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 114000
2022-03-09 15:12:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 115000
2022-03-09 15:12:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 116000
2022-03-09 15:12:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 117000
2022-03-09 15:12:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 118000
2022-03-09 15:12:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 119000
2022-03-09 15:12:42 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 120000
2022-03-09 15:12:43 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 121000
2022-03-09 15:12:43 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 122000
2022-03-09 15:12:43 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 123000
2022-03-09 15:12:43 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 124000
2022-03-09 15:12:44 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 125000
2022-03-09 15:12:44 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 126000
2022-03-09 15:12:44 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 127000
2022-03-09 15:12:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 128000
2022-03-09 15:12:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 129000
2022-03-09 15:12:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 130000
2022-03-09 15:12:46 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 131000
2022-03-09 15:12:46 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 132000
2022-03-09 15:12:46 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 133000
2022-03-09 15:12:47 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 134000
2022-03-09 15:12:47 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 135000
2022-03-09 15:12:47 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 136000
2022-03-09 15:12:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 137000
2022-03-09 15:12:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 138000
2022-03-09 15:12:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 139000
2022-03-09 15:12:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 140000
2022-03-09 15:12:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 141000
2022-03-09 15:12:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 142000
2022-03-09 15:12:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 143000
2022-03-09 15:12:50 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 144000
destination - 2022-03-09 15:12:50 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:50 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 134217310 bytes
destination - 2022-03-09 15:12:50 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:50 INFO i.a.i.d.b.BufferedStreamConsumer(lambda$flushQueueToDestination$1):185 - Flushing export: 36252 records
destination - 2022-03-09 15:12:50 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:50 INFO i.a.i.d.s.SnowflakeStagingSqlOperations(insertRecordsInternal):29 - Writing 36252 records to MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
2022-03-09 15:12:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 145000
2022-03-09 15:12:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 146000
2022-03-09 15:12:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 147000
2022-03-09 15:12:52 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 148000
2022-03-09 15:12:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 149000
2022-03-09 15:12:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 150000
2022-03-09 15:12:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 151000
2022-03-09 15:12:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 152000
2022-03-09 15:12:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 153000
2022-03-09 15:12:54 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):220 - Records read: 154000
source - 2022-03-09 15:12:55 ERROR () DefaultAirbyteStreamFactory(internalLog):95 - Encountered an exception while reading stream SourceMixpanel
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 108, in read
    internal_config=internal_config,
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 141, in _read_stream
    for record in record_iterator:
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 185, in _read_incremental
    for record_counter, record_data in enumerate(records, start=1):
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 354, in read_records
    yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
  File "/airbyte/integration_code/source_mixpanel/source.py", line 95, in parse_response
    yield from self.process_response(response, **kwargs)
  File "/airbyte/integration_code/source_mixpanel/source.py", line 731, in process_response
    record = json.loads(record_line)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 904 (char 903)
source - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(internalLog):97 - Finished syncing SourceMixpanel
source - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(internalLog):97 - SourceMixpanel runtimes:

source - 2022-03-09 15:12:55 ERROR () DefaultAirbyteStreamFactory(internalLog):95 - Unterminated string starting at: line 1 column 904 (char 903)
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 13, in <module>
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 116, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/entrypoint.py", line 107, in run
    for message in generator:
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 112, in read
    raise e
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 108, in read
    internal_config=internal_config,
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 141, in _read_stream
    for record in record_iterator:
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/abstract_source.py", line 185, in _read_incremental
    for record_counter, record_data in enumerate(records, start=1):
  File "/usr/local/lib/python3.7/site-packages/airbyte_cdk/sources/streams/http/http.py", line 354, in read_records
    yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
  File "/airbyte/integration_code/source_mixpanel/source.py", line 95, in parse_response
    yield from self.process_response(response, **kwargs)
  File "/airbyte/integration_code/source_mixpanel/source.py", line 731, in process_response
    record = json.loads(record_line)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 904 (char 903)
2022-03-09 15:12:55 INFO () DefaultReplicationWorker(run):138 - Source thread complete.
2022-03-09 15:12:55 INFO () DefaultReplicationWorker(run):139 - Waiting for destination thread to join.
destination - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:55 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):65 - Airbyte message consumer: succeeded.
destination - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:55 INFO i.a.i.d.b.BufferedStreamConsumer(close):217 - executing on success close procedure.
destination - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:55 INFO i.a.i.d.b.BufferedStreamConsumer(flushQueueToDestination):181 - Flushing buffer: 36635950 bytes
destination - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:55 INFO i.a.i.d.b.BufferedStreamConsumer(lambda$flushQueueToDestination$1):185 - Flushing export: 9804 records
destination - 2022-03-09 15:12:55 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:55 INFO i.a.i.d.s.SnowflakeStagingSqlOperations(insertRecordsInternal):29 - Writing 9804 records to MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
destination - 2022-03-09 15:12:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:56 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onCloseFunction$4):173 - Finalizing tables in destination started for 1 streams
destination - 2022-03-09 15:12:56 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:12:56 INFO i.a.i.d.s.SnowflakeInternalStagingConsumerFactory(lambda$onCloseFunction$4):181 - Finalizing stream export. schema MIXPANEL_RELEASE_PROD, tmp table _airbyte_tmp_xwr_export, final table _airbyte_raw_export, stage path MIXPANEL_RELEASE_PROD_AIRBYTE_RAW_EXPORT/STAGED/3247C6BD-B76F-415F-805D-05CCE0B0790C
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-03-09 15:13:00 ERROR i.a.i.d.b.BufferedStreamConsumer(close):237 - Close failed.
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - Table '_AIRBYTE_RAW_EXPORT' already exists, but current role has no privileges on it. If this is unexpected and you cannot resolve this problem, contact your system administrator. ACCOUNTADMIN role may be required to manage the privileges on the object.
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:127) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:67) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:442) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:345) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:487) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:198) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:135) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:781) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.core.SFStatement.execute(SFStatement.java:677) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:287) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at net.snowflake.client.jdbc.SnowflakeStatementV1.execute(SnowflakeStatementV1.java:342) ~[snowflake-jdbc-3.13.9.jar:3.13.9]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194) ~[commons-dbcp2-2.7.0.jar:2.7.0]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:194) ~[commons-dbcp2-2.7.0.jar:2.7.0]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.db.jdbc.JdbcDatabase.lambda$execute$0(JdbcDatabase.java:47) ~[io.airbyte.airbyte-db-lib-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.db.jdbc.DefaultJdbcDatabase.execute(DefaultJdbcDatabase.java:45) ~[io.airbyte.airbyte-db-lib-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.db.jdbc.JdbcDatabase.execute(JdbcDatabase.java:47) ~[io.airbyte.airbyte-db-lib-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.jdbc.JdbcSqlOperations.lambda$createTableIfNotExists$1(JdbcSqlOperations.java:56) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.jdbc.JdbcSqlOperations.createTableIfNotExists(JdbcSqlOperations.java:55) ~[io.airbyte.airbyte-integrations.connectors-destination-jdbc-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingConsumerFactory.lambda$onCloseFunction$4(SnowflakeInternalStagingConsumerFactory.java:192) ~[io.airbyte.airbyte-integrations.connectors-destination-snowflake-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction.accept(OnCloseFunction.java:9) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.close(BufferedStreamConsumer.java:225) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.lambda$close$0(FailureTrackingAirbyteMessageConsumer.java:67) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.close(FailureTrackingAirbyteMessageConsumer.java:67) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:173) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.IntegrationRunner.lambda$runInternal$1(IntegrationRunner.java:149) ~[io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:54) [io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.sentry.AirbyteSentry.executeWithTracing(AirbyteSentry.java:38) [io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:149) [io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:90) [io.airbyte.airbyte-integrations.bases-base-java-0.35.37-alpha.jar:?]
destination - 2022-03-09 15:13:00 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -    at io.airbyte.integrations.destination.snowflake.SnowflakeDestination.main(SnowflakeDestination.java:57) [io.airbyte.airbyte-integrations.connectors-destination-snowflake-0.35.37-alpha.jar:?]
2022-03-09 15:13:01 INFO () DefaultReplicationWorker(run):141 - Destination thread complete.
2022-03-09 15:13:01 ERROR () DefaultReplicationWorker(run):145 - Sync worker failed.
io.airbyte.workers.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
    at io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource.close(DefaultAirbyteSource.java:117) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
    at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:143) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
    at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
    at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
    at java.lang.Thread.run(Thread.java:832) [?:?]
    Suppressed: io.airbyte.workers.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.
        at io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:114) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:118) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
        at io.airbyte.workers.DefaultReplicationWorker.run(DefaultReplicationWorker.java:49) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:167) ~[io.airbyte-airbyte-workers-0.32.6-alpha.jar:?]
        at java.lang.Thread.run(Thread.java:832) [?:?]
2022-03-09 15:13:01 INFO () DefaultReplicationWorker(run):169 - sync summary: io.airbyte.config.ReplicationAttemptSummary@3991516f[status=failed,recordsSynced=154307,bytesSynced=143457765,startTime=1646838623056,endTime=1646838781883]
2022-03-09 15:13:01 INFO () DefaultReplicationWorker(run):178 - Source did not output any state messages
2022-03-09 15:13:01 WARN () DefaultReplicationWorker(run):186 - State capture: No new state, falling back on input state: io.airbyte.config.State@23bde340[state={"export":{"date":"2022-03-03T15:55:57"}}]
2022-03-09 15:13:01 INFO () TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2022-03-09 15:13:01 INFO () ReplicationActivityImpl(replicate):121 - sync summary: io.airbyte.config.StandardSyncOutput@300b24d7[standardSyncSummary=io.airbyte.config.StandardSyncSummary@1b1fec7[status=failed,recordsSynced=154307,bytesSynced=143457765,startTime=1646838623056,endTime=1646838781883],state=io.airbyte.config.State@23bde340[state={"export":{"date":"2022-03-03T15:55:57"}}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@385e6067[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@7d5bd45b[stream=io.airbyte.protocol.models.AirbyteStream@149f7abb[name=export,jsonSchema={"type":"object","$schema":"http://json-schema.org/draft-07/schema#","properties":{"os":{"type":["null","string"]},"rp":{"type":["null","string"]},"biz":{"type":["null","string"]},"Mode":{"type":["null","string"]},"city":{"type":["null","string"]},"time":{"type":["null","string"],"format":"date-time"},"wifi":{"type":["null","string"]},"Media":{"type":["null","string"]},"Score":{"type":["null","string"]},"_cpts":{"type":["null","string"]},"brand":{"type":["null","string"]},"event":{"type":["null","string"]},"model":{"type":["null","string"]},"radio":{"type":["null","string"]},"Screen":{"type":["null","string"]},"action":{"type":["null","string"]},"device":{"type":["null","string"]},"fbclid":{"type":["null","string"]},"labels":{"type":["null","array"],"items":{"type":["null","string"]}},"mp_lib":{"type":["null","string"]},"region":{"type":["null","string"]},"Contact":{"type":["null","string"]},"Filters":{"type":["null","string"]},"Message":{"type":["null","string"]},"User Id":{"type":["null","string"]},"browser":{"type":["null","string"]},"carrier":{"type":["null","string"]},"dataset":{"type":["null","string"]},"eventId":{"type":["null","string"]},"has_nfc":{"type":["null","string"]},"user_id":{"type":["null","string"]},"version":{"type":["null","string"]},"Category":{"type":["null","string"]},"Discount":{"type":["null","string"]},"Event 
ID":{"type":["null","string"]},"Event Id":{"type":["null","string"]},"Platform":{"type":["null","string"]},"event_id":{"type":["null","string"]},"pathname":{"type":["null","string"]},"referrer":{"type":["null","string"]},"utm_term":{"type":["null","string"]},"Coupon ID":{"type":["null","string"]},"Dealer ID":{"type":["null","string"]},"Device ID":{"type":["null","string"]},"_referrer":{"type":["null","string"]},"device_id":{"type":["null","string"]},"insert_id":{"type":["null","string"]},"nl_opt_in":{"type":["null","string"]},"Collection":{"type":["null","string"]},"Event Name":{"type":["null","string"]},"In Shotgun":{"type":["null","string"]},"duration_s":{"type":["null","string"]},"event_name":{"type":["null","string"]},"mp_keyword":{"type":["null","string"]},"nl_opt_out":{"type":["null","string"]},"origin_end":{"type":["null","string"]},"os_version":{"type":["null","string"]},"screen_dpi":{"type":["null","string"]},"utm_medium":{"type":["null","string"]},"utm_source":{"type":["null","string"]},"App Version":{"type":["null","string"]},"Badge Count":{"type":["null","string"]},"Basket Size":{"type":["null","string"]},"Coupon Type":{"type":["null","string"]},"Dealer Name":{"type":["null","string"]},"Platform OS":{"type":["null","string"]},"current_url":{"type":["null","string"]},"distinct_id":{"type":["null","string"]},"event_count":{"type":["null","string"]},"lib_version":{"type":["null","string"]},"utm_content":{"type":["null","string"]},"Connect With":{"type":["null","string"]},"Order Amount":{"type":["null","string"]},"manufacturer":{"type":["null","string"]},"origin_start":{"type":["null","string"]},"screen_width":{"type":["null","string"]},"utm_campaign":{"type":["null","string"]},"App Installed":{"type":["null","string"]},"Error Message":{"type":["null","string"]},"Transfer 
Mode":{"type":["null","string"]},"has_telephone":{"type":["null","string"]},"screen_height":{"type":["null","string"]},"search_engine":{"type":["null","string"]},"trackedObject":{"type":["null","string"]},"Call To Action":{"type":["null","string"]},"Friends Goings":{"type":["null","string"]},"Payment Method":{"type":["null","string"]},"Transfer Price":{"type":["null","string"]},"failure_reason":{"type":["null","string"]},"Signup Platform":{"type":["null","string"]},"browser_version":{"type":["null","string"]},"eventProperties":{"type":["null","string"]},"mp_country_code":{"type":["null","string"]},"sampling_factor":{"type":["null","integer"]},"Event Start Time":{"type":["null","string"]},"Min Ticket Price":{"type":["null","string"]},"Platform Version":{"type":["null","string"]},"app_build_number":{"type":["null","string"]},"initial_referrer":{"type":["null","string"]},"referring_domain":{"type":["null","string"]},"Event Is Sold Out":{"type":["null","string"]},"Notification Type":{"type":["null","string"]},"bluetooth_enabled":{"type":["null","string"]},"bluetooth_version":{"type":["null","string"]},"Seller Service Fee":{"type":["null","string"]},"app_version_string":{"type":["null","string"]},"failure_description":{"type":["null","string"]},"google_play_services":{"type":["null","string"]},"Authentication Method":{"type":["null","string"]},"mp_processing_time_ms":{"type":["null","string"]},"initial_referring_domain":{"type":["null","string"]},"external_browser_redirect":{"type":["null","string"]},"had_persisted_distinct_id":{"type":["null","string"]},"Selected Authentication Method":{"type":["null","string"]},"Available Authentication Methods":{"type":["null","string"]},"Event Start Time Distance In Hours":{"type":["null","string"]}},"additionalProperties":true},supportedSyncModes=[full_refresh, 
incremental],sourceDefinedCursor=true,defaultCursorField=[time],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=incremental,cursorField=[time],destinationSyncMode=append,primaryKey=[],additionalProperties={}]],additionalProperties={}]]
2022-03-09 15:13:01 INFO () ConfigRepository(updateConnectionState):518 - Updating connection c1c447b9-5cde-4ebb-b537-cbd7b561d7c2 state: io.airbyte.config.State@721a1033[state={"export":{"date":"2022-03-03T15:55:57"}}]
2022-03-09 15:13:01 INFO () DatabaseConfigPersistence(updateConfigRecord):273 - Updating STANDARD_SYNC_STATE record c1c447b9-5cde-4ebb-b537-cbd7b561d7c2

Steps to Reproduce

Unsure how to reproduce this, as I could not tell from the logs which events are faulty.

Are you willing to submit a PR?

No, I'm not capable of such a thing.

Related issue: https://github.com/airbytehq/airbyte/issues/11008

guillaume-chech commented 2 years ago

💡 Investigating further, I narrowed the issue down to how the Export API response is parsed: the body is split with response.text.splitlines() here

It seems that some lines are cut at arbitrary points. To isolate the problem, I ran the connector locally and added some good old dirty prints to see why the JSON parsing was failing, printing the index and the record_line. I then compared the printed record_line with a JSON file exported manually via a curl HTTP request for the very same day of data.

I found that when a text property contains a line break, as in the example below, splitlines() (rightfully) splits the JSON item into two lines, which is unintended IMO.

Here is the example; see the Message property, which contains a line break after the word sous-sols:

{
   "event":"Tap Newsfeed Item",
   "properties":{
      "time":1647880119,
      "distinct_id":"1797f72ffd5edc-05690f19e13ab08-1b05575d-4a574-1797f72ffd6134f",
      "$app_build_number":"309",
      "$app_version_string":"9.3.4",
      "$carrier":"",
      "$city":"Lyon",
      "$device_id":"16733740-E75C-419A-8AEC-38E21AB12D1D",
      "$distinct_id_before_identity":"583325",
      "$had_persisted_distinct_id":false,
      "$insert_id":"bd7d438843772e57",
      "$lib_version":"1.3.4",
      "$manufacturer":"Apple",
      "$model":"iPhone12,3",
      "$mp_api_endpoint":"api.mixpanel.com",
      "$mp_api_timestamp_ms":1647880119550,
      "$os":"iOS",
      "$os_version":"15.3.1",
      "$radio":"LTE",
      "$region":"Rhône",
      "$screen_height":812,
      "$screen_width":375,
      "$user_id":"583325",
      "$wifi":false,
      "App Installed":true,
      "App Version":"9.3.4",
      "Call To Action":"Accéder à l'évènement",
      "Device ID":"F50E0003-3465-4BE7-8B86-B82F13661000",
      "Message":"~Aphrodizia est de retour pour la PFW 🔥~\n... et investira les immenses sous-sols 
du Palais de Tokyo avec Sébastien Léger, qui présentera son modular live show 💯  Rendez-vous demain soir !",
      "Notification Type":"Sélectionné pour toi",
      "Platform OS":"ios",
      "Platform Version":"15.3.1",
      "User Id":"583325",
      "event":"Tap Newsfeed Item",
      "mp_country_code":"FR",
      "mp_lib":"react-native",
      "mp_processing_time_ms":1647880119593
   }
}
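A minimal sketch of how this kind of split can happen, assuming (this is not confirmed from the raw export) that the break after sous-sols is a Unicode line separator such as U+2028 rather than an escaped \n. Python's str.splitlines() treats U+2028 and several other characters as line boundaries, while bytes.splitlines() only splits on \n, \r and \r\n. The two-event body below is made up for illustration.

```python
import json

# Hypothetical two-event export body. The first event's "Message" contains
# U+2028 (LINE SEPARATOR), which is legal unescaped inside a JSON string.
body = (
    '{"event": "A", "properties": {"Message": "sous-sols \u2028du Palais"}}\n'
    '{"event": "B", "properties": {"Message": "ok"}}\n'
)

# str.splitlines() also breaks on U+2028, cutting the first event in two.
str_lines = body.splitlines()

# bytes.splitlines() only breaks on \n, \r and \r\n, so each event stays whole.
byte_lines = body.encode("utf-8").splitlines()

print(len(str_lines))   # 3 fragments for 2 events
print(len(byte_lines))  # 2 lines for 2 events

try:
    json.loads(str_lines[0])
except json.JSONDecodeError as error:
    # Same failure family as in the traceback: "Unterminated string starting at"
    print("str split failed:", error.msg)

# Each byte line decodes to a complete JSON document.
first_event = json.loads(byte_lines[0].decode("utf-8"))
print(first_event["event"])
```

If the break in the real payload is a different character, the same check (comparing the str and bytes line counts) should still show whether splitlines() is the culprit.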

guillaume-chech commented 2 years ago

💡 Using the native Response.iter_lines() method seems to solve the issue. @marcosmarxm Sorry for the ping. If this is not the right place, please point me to the relevant procedure for fixing issues. I'd like to open a pull request to solve the issue, but I fear my software engineering skill set is a bit weak to do that without proper guidance. Is there a checklist I should follow to propose this solution?
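A rough sketch of what line-by-line parsing could look like, not the connector's actual code. By default (decode_unicode=False), requests' Response.iter_lines() yields bytes, so lines are only split on \n, \r and \r\n. The function name iter_export_records and the skip_malformed flag are hypothetical; the flag illustrates the "ignore malformed events" option asked for in this issue. A plain list of byte strings stands in for response.iter_lines() so the sketch runs without a network call.

```python
import json
import logging
from typing import Any, Dict, Iterable, Iterator

logger = logging.getLogger("mixpanel_export_sketch")  # hypothetical logger name


def iter_export_records(
    raw_lines: Iterable[bytes], skip_malformed: bool = False
) -> Iterator[Dict[str, Any]]:
    """Yield one parsed event per non-empty byte line.

    raw_lines is expected to behave like response.iter_lines() from
    requests with its default decode_unicode=False (bytes in, one line each).
    """
    for index, raw_line in enumerate(raw_lines):
        if not raw_line.strip():
            continue  # skip blank keep-alive or trailing lines
        try:
            yield json.loads(raw_line.decode("utf-8"))
        except json.JSONDecodeError as error:
            if not skip_malformed:
                raise
            # Log the index so the faulty event can be located later.
            logger.warning("Skipping malformed export line %d: %s", index, error)


# Stand-in for response.iter_lines(): the third line is deliberately truncated.
sample_lines = [
    b'{"event": "A"}',
    b"",
    b'{"event": "B", "properties": {"Message": "cut',
    b'{"event": "C"}',
]

records = list(iter_export_records(sample_lines, skip_malformed=True))
print([record["event"] for record in records])
```

With a real response the call would be iter_export_records(response.iter_lines()); leaving skip_malformed at False keeps the current fail-fast behaviour.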

alafanechere commented 2 years ago

Hi @guillaume-chech :) Thank you very much for going into the debugging yourself. If you found the bug you definitely have the skills to fix it 👍 . The procedure is quite simple:

  1. Create a fork of our repo
  2. Code your fix
  3. Open a PR (please read the instructions in the PR, especially about version bumping and acceptance test).
  4. Share the result of the acceptance test if you could run it
  5. We review the PR and run the acceptance test in our CI
  6. 🎉 We publish and merge your fix.

guillaume-chech commented 2 years ago

Hi 👋 @alafanechere Thanks for the motivational response. Here it is : I hope it follows what you expected. Unfortunately, I could not run the acceptance test, and I'm not sure how to complete every item of the PR checklist. Happy review!

misteryeo commented 2 years ago

Closing this as it looks like the PR was merged.