airbytehq / airbyte


[destination-databricks-lakehouse] Sync to Databricks Lakehouse via Managed Tables ends up in staging state #28266

Open Steiniche opened 1 year ago

Steiniche commented 1 year ago

Connector Name

destination-databricks-lakehouse

Connector Version

1.1.0

What step the error happened?

During the sync

Relevant information

Context

We are using Airbyte as a self-service platform for getting data into Databricks. The experience is awesome (thanks to all contributors!), but we are experiencing a bug.

I have tried with both a file source (HTTPS) and some MSSQL databases/tables. We are using Databricks with Unity Catalog. The Data Source is set to Managed Tables, connecting through a SQL Warehouse running in the Databricks workspace.

Expected

We expect the data pulled from a source (e.g. MSSQL or a file) to be loaded into Databricks with the schema and columns defined by the source data.

Actual

What actually happens is that a raw table (_airbyte_raw_events) is created with 3 columns (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at). The _airbyte_data column contains the data from the source as a JSON string. As I understand it, these are the staging tables Airbyte uses to move the data before producing the final result.
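
To make this concrete, here is roughly what we have to do to read the data today. This is only a sketch: the schema and table name (eks_ras._airbyte_raw_eq_lukkepriser) and the field names come from the log output below, and get_json_object is just one way to pull fields out of the JSON blob.

-- Sketch: querying the raw staging table the sync produced.
-- Schema/table name taken from the log output below.
SELECT
  _airbyte_ab_id,                                    -- Airbyte record id
  _airbyte_emitted_at,                               -- when the record was emitted
  get_json_object(_airbyte_data, '$.Dato') AS Dato,  -- source fields are only
  get_json_object(_airbyte_data, '$.Pris') AS Pris   -- reachable inside the JSON string
FROM eks_ras._airbyte_raw_eq_lukkepriser
LIMIT 10;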

Closing

If anyone has any idea why we are seeing this, or whether this is in fact a bug, we would appreciate it. What would be required to get schema + columns instead of just JSON dumps (staging data)? (I notice the log output below reports "Normalization Supported: false", which may be related.)
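
For now we flatten the JSON by hand along these lines. Again only a sketch: the column list comes from the stream's JSON schema in the sync summary below, the view name is made up, and the DOUBLE cast for Pris is our assumption.

-- Hand-rolled stand-in for the typed table we expected the sync to produce.
-- View name is hypothetical; types follow the stream's JSON schema (Pris is a number).
CREATE OR REPLACE VIEW eks_ras.eq_lukkepriser AS
SELECT
  get_json_object(_airbyte_data, '$.Dato')     AS Dato,
  get_json_object(_airbyte_data, '$.Type')     AS Type,
  get_json_object(_airbyte_data, '$.Kontrakt') AS Kontrakt,
  CAST(get_json_object(_airbyte_data, '$.Pris') AS DOUBLE) AS Pris,
  get_json_object(_airbyte_data, '$.Valuta')   AS Valuta,
  get_json_object(_airbyte_data, '$.Kilde')    AS Kilde
FROM eks_ras._airbyte_raw_eq_lukkepriser;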

I have not found other issues describing this, but please link any relevant ones.

We will happily contribute a fix if this is in fact a bug and we can get some pointers as to where to change the code.

If more information about the setup is needed (which options are enabled, logs, etc.), please state which and I will gather it.

Relevant log output

2023-07-12 09:27:40 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):236 - Attempt 0 to get state
2023-07-12 09:27:40 INFO i.a.w.h.NormalizationInDestinationHelper(shouldNormalizeInDestination):52 - Requires Normalization: false, Normalization Supported: false, Feature Flag Enabled: false
2023-07-12 09:27:40 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):236 - Attempt 0 to set attempt sync config
2023-07-12 09:27:40 INFO i.a.c.t.s.DefaultTaskQueueMapper(getTaskQueue):31 - Called DefaultTaskQueueMapper getTaskQueue for geography auto
2023-07-12 09:27:40 INFO i.a.w.t.TemporalAttemptExecution(get):136 - Docker volume job log path: /tmp/workspace/11/0/logs.log
2023-07-12 09:27:40 INFO i.a.w.t.TemporalAttemptExecution(get):141 - Executing worker wrapper. Airbyte version: 0.50.7
2023-07-12 09:27:40 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):236 - Attempt 0 to save workflow id for cancellation
2023-07-12 09:27:41 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):236 - Attempt 0 to get the source definition for feature flag checks
2023-07-12 09:27:41 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):236 - Attempt 0 to get the source definition
2023-07-12 09:27:41 INFO i.a.w.g.ReplicationWorkerFactory(maybeEnableConcurrentStreamReads):166 - Concurrent stream read enabled? false
2023-07-12 09:27:41 INFO i.a.w.g.ReplicationWorkerFactory(create):127 - Setting up source...
2023-07-12 09:27:41 INFO i.a.w.g.ReplicationWorkerFactory(create):134 - Setting up destination...
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable METRIC_CLIENT: ''
2023-07-12 09:27:41 WARN i.a.m.l.MetricClientFactory(initialize):60 - Metric client is already initialized to 
2023-07-12 09:27:41 INFO i.a.w.g.ReplicationWorkerFactory(create):146 - Setting up replication worker...
2023-07-12 09:27:41 INFO i.a.w.g.BufferedReplicationWorker(run):137 - start sync worker. job id: 11 attempt id: 0
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- START REPLICATION -----
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2023-07-12 09:27:41 INFO i.a.w.i.DefaultAirbyteDestination(start):88 - Running destination...
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-07-12 09:27:41 INFO i.a.c.EnvConfigs(getEnvOrDefault):1235 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/source-mssql:1.1.0 exists...
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - Checking if airbyte/destination-databricks:1.1.0 exists...
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/source-mssql:1.1.0 was found locally.
2023-07-12 09:27:41 INFO i.a.c.i.LineGobbler(voidCall):149 - airbyte/destination-databricks:1.1.0 was found locally.
2023-07-12 09:27:41 INFO i.a.w.p.DockerProcessFactory(create):139 - Creating docker container = source-mssql-read-11-0-lxpmm with resources io.airbyte.config.ResourceRequirements@20d23835[cpuRequest=1,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@10942a5[hosts=[vt-ava-bi-sql01.cloudapp.net, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2023-07-12 09:27:41 INFO i.a.w.p.DockerProcessFactory(create):139 - Creating docker container = destination-databricks-write-11-0-vujut with resources io.airbyte.config.ResourceRequirements@258f27f3[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts null
2023-07-12 09:27:41 INFO i.a.w.p.DockerProcessFactory(create):192 - Preparing command: docker run --rm --init -i -w /data/11/0 --log-driver none --name source-mssql-read-11-0-lxpmm -e CONCURRENT_SOURCE_STREAM_READ=false --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-mssql:1.1.0 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e USE_STREAM_CAPABLE_STATE=true -e FIELD_SELECTION_WORKSPACES= -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.7 -e WORKER_JOB_ID=11 airbyte/source-mssql:1.1.0 read --config source_config.json --catalog source_catalog.json
2023-07-12 09:27:41 INFO i.a.w.p.DockerProcessFactory(create):192 - Preparing command: docker run --rm --init -i -w /data/11/0 --log-driver none --name destination-databricks-write-11-0-vujut --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-databricks:1.1.0 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e USE_STREAM_CAPABLE_STATE=true -e FIELD_SELECTION_WORKSPACES= -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.50.7 -e WORKER_JOB_ID=11 airbyte/destination-databricks:1.1.0 write --config destination_config.json --catalog destination_catalog.json
2023-07-12 09:27:41 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):177 - Reading messages from protocol version 0.2.0
2023-07-12 09:27:41 INFO i.a.w.i.VersionedAirbyteMessageBufferedWriterFactory(createWriter):41 - Writing messages to protocol version 0.2.0
2023-07-12 09:27:41 INFO i.a.w.i.VersionedAirbyteStreamFactory(create):177 - Reading messages from protocol version 0.2.0
2023-07-12 09:27:41 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):368 - writeToDestination: start
2023-07-12 09:27:41 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):401 - readFromDestination: start
2023-07-12 09:27:41 INFO i.a.w.g.BufferedReplicationWorker(processMessage):327 - processMessage: start
2023-07-12 09:27:41 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):94 - Starting source heartbeat check. Will check every 1 minutes.
2023-07-12 09:27:41 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):291 - readFromSource: start
2023-07-12 09:27:42 source > INFO i.a.i.s.m.MssqlSource(main):549 starting source: class io.airbyte.integrations.source.mssql.MssqlSource
2023-07-12 09:27:42 source > INFO i.a.i.b.IntegrationCliParser(parseOptions):126 integration args: {read=null, catalog=source_catalog.json, config=source_config.json}
2023-07-12 09:27:42 source > INFO i.a.i.b.IntegrationRunner(runInternal):106 Running integration: io.airbyte.integrations.base.ssh.SshWrappedSource
2023-07-12 09:27:42 source > INFO i.a.i.b.IntegrationRunner(runInternal):107 Command: READ
2023-07-12 09:27:42 source > INFO i.a.i.b.IntegrationRunner(runInternal):108 Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='null'}
2023-07-12 09:27:42 destination > INFO i.a.i.b.IntegrationCliParser(parseOptions):126 integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2023-07-12 09:27:42 destination > INFO i.a.i.b.IntegrationRunner(runInternal):106 Running integration: io.airbyte.integrations.destination.databricks.DatabricksDestination
2023-07-12 09:27:42 destination > INFO i.a.i.b.IntegrationRunner(runInternal):107 Command: WRITE
2023-07-12 09:27:42 destination > INFO i.a.i.b.IntegrationRunner(runInternal):108 Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2023-07-12 09:27:43 source > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-07-12 09:27:43 source > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-07-12 09:27:43 destination > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-07-12 09:27:43 destination > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-07-12 09:27:43 destination > INFO i.a.i.d.j.c.SwitchingDestination(getConsumer):65 Using destination type: MANAGED_TABLES_STORAGE
2023-07-12 09:27:43 destination > INFO i.a.i.d.d.DatabricksStorageConfigProvider(getDatabricksStorageConfig):22 Databricks storage type config: "MANAGED_TABLES_STORAGE"
2023-07-12 09:27:43 destination > INFO c.z.h.HikariDataSource(<init>):80 HikariPool-1 - Starting...
2023-07-12 09:27:43 destination > INFO c.z.h.HikariDataSource(<init>):82 HikariPool-1 - Start completed.
2023-07-12 09:27:43 source > INFO i.a.i.b.s.SshTunnel(getInstance):204 Starting connection with method: NO_TUNNEL
2023-07-12 09:27:43 source > INFO i.a.i.s.r.s.StateManagerFactory(createStateManager):48 Legacy state manager selected to manage state object with type LEGACY.
2023-07-12 09:27:43 source > INFO i.a.i.s.r.s.CursorManager(createCursorInfoForStream):182 No cursor field set in catalog but not present in state. Stream: dbo_EQ.LukkePriser, New Cursor Field: null. Resetting cursor value
2023-07-12 09:27:43 source > INFO i.a.i.s.r.CdcStateManager(<init>):29 Initialized CDC state with: null
2023-07-12 09:27:43 destination > INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):103 Write config: WriteConfig{streamName=EQ.LukkePriser, namespace=null, outputSchemaName=eks_ras, tmpTableName=_airbyte_tmp_zda_eq_lukkepriser, outputTableName=_airbyte_raw_eq_lukkepriser, syncMode=overwrite}
2023-07-12 09:27:43 destination > INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):144 class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-07-12 09:27:43 destination > INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):142 Preparing raw tables in destination started for 1 streams
2023-07-12 09:27:43 destination > INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):147 Preparing raw table in destination started for stream EQ.LukkePriser. schema: eks_ras, table name: _airbyte_raw_eq_lukkepriser
2023-07-12 09:27:43 source > INFO c.z.h.HikariDataSource(<init>):80 HikariPool-1 - Starting...
2023-07-12 09:27:43 source > INFO c.z.h.HikariDataSource(<init>):82 HikariPool-1 - Start completed.
2023-07-12 09:27:44 destination > INFO c.z.h.p.PoolBase(getAndSetNetworkTimeout):536 HikariPool-1 - Driver does not support get/set network timeout for connections. ([Databricks][JDBC](10220) Driver does not support this optional feature.)
2023-07-12 09:27:45 source > INFO i.a.i.s.j.AbstractJdbcSource(logPreSyncDebugData):450 Data source product recognized as Microsoft SQL Server:15.00.4280
2023-07-12 09:27:45 source > INFO i.a.i.s.j.AbstractJdbcSource(discoverInternal):166 Internal schemas to exclude: [spt_fallback_db, spt_monitor, cdc, spt_values, INFORMATION_SCHEMA, spt_fallback_usg, MSreplication_options, sys, spt_fallback_dev]
2023-07-12 09:27:49 source > INFO i.a.i.s.m.MssqlSource(getIncrementalIterators):465 using CDC: false
2023-07-12 09:27:49 source > INFO i.a.i.s.m.MssqlSource(queryTableFullRefresh):103 Queueing query for table: EQ.LukkePriser
2023-07-12 09:27:50 source > INFO i.a.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 Set initial fetch size: 10 rows
2023-07-12 09:27:50 source > INFO i.a.i.s.m.MssqlSource(queryTableFullRefresh):121 Prepared SQL query for TableFullRefresh is: SELECT "Dato", "Type", "Kontrakt", "Pris", "Valuta", "Kilde" FROM "dbo"."EQ.LukkePriser"
2023-07-12 09:27:50 source > INFO i.a.i.s.r.RelationalDbQueryUtils(lambda$queryTable$0):73 Queueing query: SELECT "Dato", "Type", "Kontrakt", "Pris", "Valuta", "Kilde" FROM "dbo"."EQ.LukkePriser"
2023-07-12 09:27:50 source > INFO i.a.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 Set initial fetch size: 10 rows
2023-07-12 09:27:50 source > INFO i.a.c.u.CompositeIterator(lambda$emitStartStreamStatus$1):155 STARTING -> dbo_EQ.LukkePriser
2023-07-12 09:27:50 source > INFO i.a.c.u.CompositeIterator(lambda$emitRunningStreamStatus$0):148 RUNNING -> dbo_EQ.LukkePriser
2023-07-12 09:27:50 source > INFO i.a.d.j.s.AdaptiveStreamingQueryConfig(accept):40 Set new fetch size: 787219 rows
2023-07-12 09:27:50 source > INFO i.a.d.j.s.TwoStageSizeEstimator(getTargetBufferByteSize):71 Max memory limit: 50365202432, JDBC buffer size: 30219121459
2023-07-12 09:27:50 destination > INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):160 Preparing raw tables in destination completed.
2023-07-12 09:27:51 source > INFO i.a.c.u.CompositeIterator(lambda$emitCompleteStreamStatus$2):162 COMPLETE -> dbo_EQ.LukkePriser
2023-07-12 09:27:51 source > INFO i.a.i.s.r.AbstractDbSource(lambda$read$1):173 Closing database connection pool.
2023-07-12 09:27:51 source > INFO c.z.h.HikariDataSource(close):350 HikariPool-1 - Shutdown initiated...
2023-07-12 09:27:51 source > INFO c.z.h.HikariDataSource(close):352 HikariPool-1 - Shutdown completed.
2023-07-12 09:27:51 source > INFO i.a.i.s.r.AbstractDbSource(lambda$read$1):175 Closed database connection pool.
2023-07-12 09:27:51 source > INFO i.a.i.b.IntegrationRunner(runInternal):195 Completed integration: io.airbyte.integrations.base.ssh.SshWrappedSource
2023-07-12 09:27:51 source > INFO i.a.i.s.m.MssqlSource(main):551 completed source: class io.airbyte.integrations.source.mssql.MssqlSource
2023-07-12 09:27:51 INFO i.a.w.g.ReplicationWorkerHelper(endOfSource):152 - Total records read: 6368 (671 KB)
2023-07-12 09:27:51 INFO i.a.w.i.FieldSelector(reportMetrics):122 - Schema validation was performed to a max of 10 records with errors per stream.
2023-07-12 09:27:51 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):319 - readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2023-07-12 09:27:51 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):111 - thread status... heartbeat thread: false , replication thread: true
2023-07-12 09:27:51 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):391 - writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2023-07-12 09:27:51 destination > INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 Airbyte message consumer: succeeded.
2023-07-12 09:27:51 destination > INFO i.a.i.d.b.BufferedStreamConsumer(close):255 executing on success close procedure.
2023-07-12 09:27:51 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):85 Flushing EQ.LukkePriser: 6365 records (2 MB)
2023-07-12 09:27:51 destination > INFO i.a.i.d.d.DatabricksSqlOperations(insertRecordsInternal):58 actual size of batch: 6365
2023-07-12 09:28:01 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):91 Flushing completed for EQ.LukkePriser
2023-07-12 09:28:01 destination > INFO i.a.i.b.IntegrationRunner(runInternal):195 Completed integration: io.airbyte.integrations.destination.databricks.DatabricksDestination
2023-07-12 09:28:01 INFO i.a.w.g.BufferedReplicationWorker(readFromDestination):427 - readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2023-07-12 09:28:01 INFO i.a.w.t.TemporalAttemptExecution(get):163 - Stopping cancellation check scheduling...
2023-07-12 09:28:01 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):160 - sync summary: io.airbyte.config.StandardSyncOutput@39cbb6dd[standardSyncSummary=io.airbyte.config.StandardSyncSummary@1c69ac1a[status=completed,recordsSynced=6365,bytesSynced=687424,startTime=1689154061032,endTime=1689154081731,totalStats=io.airbyte.config.SyncStats@682d7f34[bytesCommitted=687424,bytesEmitted=687424,destinationStateMessagesEmitted=0,destinationWriteEndTime=1689154081727,destinationWriteStartTime=1689154061034,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=0,meanSecondsBetweenStateMessageEmittedandCommitted=0,recordsEmitted=6365,recordsCommitted=6365,replicationEndTime=1689154081731,replicationStartTime=1689154061032,sourceReadEndTime=1689154071298,sourceReadStartTime=1689154061033,sourceStateMessagesEmitted=0,additionalProperties={}],streamStats=[io.airbyte.config.StreamSyncStats@67b8c358[streamName=EQ.LukkePriser,streamNamespace=<null>,stats=io.airbyte.config.SyncStats@3a1fc2c[bytesCommitted=687424,bytesEmitted=687424,destinationStateMessagesEmitted=<null>,destinationWriteEndTime=<null>,destinationWriteStartTime=<null>,estimatedBytes=<null>,estimatedRecords=<null>,meanSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBetweenStateMessageEmittedandCommitted=<null>,meanSecondsBetweenStateMessageEmittedandCommitted=<null>,recordsEmitted=6365,recordsCommitted=6365,replicationEndTime=<null>,replicationStartTime=<null>,sourceReadEndTime=<null>,sourceReadStartTime=<null>,sourceStateMessagesEmitted=<null>,additionalProperties={}],additionalProperties={}]],performanceMetrics=io.airbyte.config.PerformanceMetrics@52f9993e[additionalProperties={processFromSource=6688.13 ns/exec (total: 0.04s, 6368 executions), readFromSource=1496092.29 ns/exec (total: 10.11s, 6756 executions), processFromDest=NaN ns/exec (total: 0.00s, 0 executions), writeToDest=24413.51 ns/exec (total: 0.16s, 6365 executions), readFromDest=19395701.10 ns/exec (total: 20.66s, 1065 executions)}],additionalProperties={}],normalizationSummary=<null>,webhookOperationSummary=<null>,state=<null>,outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@51ae888a[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@702d23e4[stream=io.airbyte.protocol.models.AirbyteStream@7ce1f151[name=EQ.LukkePriser,jsonSchema={"type":"object","properties":{"Type":{"type":"string"},"Dato":{"type":"string"},"Pris":{"type":"number"},"Kontrakt":{"type":"string"},"Valuta":{"type":"string"},"Kilde":{"type":"string"}}},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],additionalProperties={}]],additionalProperties={}],failures=[],additionalProperties={}]
2023-07-12 09:28:01 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):165 - Sync summary length: 2929
2023-07-12 09:28:01 INFO i.a.c.t.TemporalUtils(withBackgroundHeartbeat):307 - Stopping temporal heartbeating...
2023-07-12 09:28:01 ERROR i.a.p.j.JobNotifier(notifyJob):143 - Unable to read configuration for notification. Non-blocking. Error:
java.lang.NullPointerException: Cannot invoke "io.airbyte.config.NotificationSettings.getSendOnSuccess()" because "notificationSettings" is null
    at io.airbyte.persistence.job.JobNotifier.notifyJob(JobNotifier.java:118) ~[io.airbyte.airbyte-persistence-job-persistence-0.50.7.jar:?]
    at io.airbyte.persistence.job.JobNotifier.notifyJob(JobNotifier.java:81) ~[io.airbyte.airbyte-persistence-job-persistence-0.50.7.jar:?]
    at io.airbyte.persistence.job.JobNotifier.successJob(JobNotifier.java:224) ~[io.airbyte.airbyte-persistence-job-persistence-0.50.7.jar:?]
    at io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivityImpl.jobSuccessWithAttemptNumber(JobCreationAndStatusUpdateActivityImpl.java:276) ~[io.airbyte-airbyte-workers-0.50.7.jar:?]
    at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:95) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:92) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:241) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:206) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:179) ~[temporal-sdk-1.17.0.jar:?]
    at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.17.0.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.lang.Thread.run(Thread.java:1589) ~[?:?]
2023-07-12 09:28:01 INFO i.a.v.j.JsonSchemaValidator(test):121 - JSON schema validation failed. 
errors: $.ssl_method: must be a constant value unencrypted
2023-07-12 09:28:01 INFO i.a.c.t.StreamResetRecordsHelper(deleteStreamResetRecordsForJob):50 - deleteStreamResetRecordsForJob was called for job 11 with config type sync. Returning, as config type is not resetConnection.


kelly-sm commented 1 year ago

+1 - we are experiencing the same behavior

plenti-jacob-roe commented 1 year ago

The workaround I found was changing the Data Source from Managed Tables to S3 in the destination settings. I also found that change makes syncing data significantly faster. I am on version 1.0.1 of the destination, though, so this might have changed, but I had the same issue when using Managed Tables.