airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Postgres destination: SSH tunneling makes custom connector emit and commit an inconsistent number of records #21251

Closed machariamuguku closed 2 days ago

machariamuguku commented 1 year ago

Environment

Current Behavior

When the connector syncs to a Postgres destination that uses SSH key authentication, the numbers of records emitted and committed are (1) inconsistent, i.e. they vary from run to run, and (2) lower than the number of records available in the source. It looks as if pulling/yielding records is cut short before it completes. We do not see this behaviour with a Postgres destination that does not use tunneling; we tried the same database both with and without tunneling.
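One way to tell whether the source itself stops early, independently of the destination, is to count the RECORD messages it writes to stdout. The following is a minimal sketch that assumes the output of the connector's `read` command has been saved as newline-delimited Airbyte protocol messages. `count_records` is a hypothetical helper. Plain log lines and truncated messages are skipped rather than counted.

```python
import json
from collections import Counter
from typing import Iterable


def count_records(lines: Iterable[str]) -> Counter:
    """Count Airbyte RECORD messages per stream in newline-delimited output.

    Lines that are not complete JSON objects (plain log text, or a message
    cut off mid-line) are ignored.
    """
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # not a protocol message
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            # A truncated or partial message; skip it instead of failing.
            continue
        if message.get("type") == "RECORD":
            record = message.get("record") or {}
            counts[record.get("stream", "<unknown>")] += 1
    return counts
```

For example, calling `count_records` on an open handle to the saved output file returns per-stream totals. These totals can be compared with the number of records available in the source. The file itself is whatever the run was redirected to.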

Expected Behavior

All available records should be emitted and committed.

Logs

2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.mode: must be a constant value disable, $.mode: does not have a value in the enumeration [disable]
2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.mode: must be a constant value allow, $.mode: does not have a value in the enumeration [allow]
2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.mode: must be a constant value prefer, $.mode: does not have a value in the enumeration [prefer]
2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.tunnel_method: must be a constant value NO_TUNNEL
2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.ssh_key: object found, string expected
2023-01-11 14:02:45 INFO i.a.v.j.JsonSchemaValidator(test):71 - JSON schema validation failed. 
errors: $.tunnel_method: must be a constant value SSH_PASSWORD_AUTH, $.tunnel_user_password: is missing but it is required
2023-01-11 14:02:45 INFO i.a.a.LoggingTrackingClient(track):43 - track. version: null, userId: null, action: Connector Jobs, metadata: {job_type=sync, config.source.ona_data_api_domain=set, config.destination.ssl_mode.mode=require, namespace_definition=source, config.source.ona_data_api_token._secret=set, frequency=1440 min, connector_source_definition_id=b8028232-b0dd-4e09-af7f-97c86a6ad1ae, config.destination.port=set, workspace_id=ebc980ea-3212-44f6-aac7-6449bab3f0fc, config.destination.database=set, config.source.ona_data_form_id=set, attempt_stage=STARTED, config.destination.password._secret=set, attempt_id=1, connector_destination=Postgres, connector_destination_docker_repository=airbyte/destination-postgres, table_prefix=true, workspace_name=ebc980ea-3212-44f6-aac7-6449bab3f0fc, catalog.destination_sync_mode.overwrite=set, number_of_streams=1, config.source.ona_data_page_size=set, connector_source=Ona Data Forms, config.destination.host=set, connector_source_docker_repository=muguku/source-ona-data-forms, config.destination.schema=set, config.destination.username=set, catalog.sync_mode.full_refresh=set, connection_id=c4e859b4-b235-495d-985b-520d1dfcd08d, job_id=309, connector_source_version=0.0.2, connector_destination_version=0.3.24, config.destination.ssl=false, operation_count=0, connector_destination_definition_id=25c5221d-dce2-4163-ade9-739ef790f503}
2023-01-11 14:02:45 INFO i.a.w.t.TemporalAttemptExecution(get):106 - Docker volume job log path: /tmp/workspace/309/0/logs.log
2023-01-11 14:02:45 INFO i.a.w.t.TemporalAttemptExecution(get):111 - Executing worker wrapper. Airbyte version: 0.40.7
2023-01-11 14:02:45 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:45 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- START CHECK -----
2023-01-11 14:02:45 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:45 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if muguku/source-ona-data-forms:0.0.2 exists...
2023-01-11 14:02:45 INFO i.a.c.i.LineGobbler(voidCall):114 - muguku/source-ona-data-forms:0.0.2 was found locally.
2023-01-11 14:02:45 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = source-ona-data-forms-check-309-0-ygrqe with resources io.airbyte.config.ResourceRequirements@33b7a5a4[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=]
2023-01-11 14:02:45 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/309/0 --log-driver none --name source-ona-data-forms-check-309-0-ygrqe --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=muguku/source-ona-data-forms:0.0.2 -e AIRBYTE_VERSION=0.40.7 -e WORKER_JOB_ID=309 muguku/source-ona-data-forms:0.0.2 check --config source_config.json
2023-01-11 14:02:48 INFO i.a.w.i.DefaultAirbyteStreamFactory(internalLog):103 - Check succeeded
2023-01-11 14:02:48 INFO i.a.w.t.TemporalAttemptExecution(get):132 - Stopping cancellation check scheduling...
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- END CHECK -----
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:48 INFO i.a.w.t.TemporalAttemptExecution(get):106 - Docker volume job log path: /tmp/workspace/309/0/logs.log
2023-01-11 14:02:48 INFO i.a.w.t.TemporalAttemptExecution(get):111 - Executing worker wrapper. Airbyte version: 0.40.7
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- START CHECK -----
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if airbyte/destination-postgres:0.3.24 exists...
2023-01-11 14:02:48 INFO i.a.c.i.LineGobbler(voidCall):114 - airbyte/destination-postgres:0.3.24 was found locally.
2023-01-11 14:02:48 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = destination-postgres-check-309-0-tnskb with resources io.airbyte.config.ResourceRequirements@33b7a5a4[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=]
2023-01-11 14:02:48 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/309/0 --log-driver none --name destination-postgres-check-309-0-tnskb --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=airbyte/destination-postgres:0.3.24 -e AIRBYTE_VERSION=0.40.7 -e WORKER_JOB_ID=309 airbyte/destination-postgres:0.3.24 check --config source_config.json
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.d.p.PostgresDestination(main):100 - starting destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {check=null, config=source_config.json}
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: CHECK
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:50 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:50 INFO i.a.i.b.s.SshTunnel(getInstance):172 - Starting connection with method: SSH_KEY_AUTH
2023-01-11 14:02:51 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:51 WARN o.a.s.c.k.StaticServerKeyVerifier(handleAcceptance):59 - Server at **domain/ip**:22 presented unverified EdDSA key: SHA256:**KEY**
2023-01-11 14:02:51 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:51 INFO i.a.i.b.s.SshTunnel(openTunnel):289 - Established tunneling session.  Port forwarding started on /127.0.0.1:40903
2023-01-11 14:02:51 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:51 INFO c.z.h.HikariDataSource(<init>):80 - HikariPool-1 - Starting...
2023-01-11 14:02:51 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:51 INFO c.z.h.HikariDataSource(<init>):82 - HikariPool-1 - Start completed.
2023-01-11 14:02:53 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:53 INFO c.z.h.HikariDataSource(close):350 - HikariPool-1 - Shutdown initiated...
2023-01-11 14:02:53 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:53 INFO c.z.h.HikariDataSource(close):352 - HikariPool-1 - Shutdown completed.
2023-01-11 14:02:53 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:53 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2023-01-11 14:02:53 INFO i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):62 - 2023-01-11 14:02:53 INFO i.a.i.d.p.PostgresDestination(main):102 - completed destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2023-01-11 14:02:53 INFO i.a.w.t.TemporalAttemptExecution(get):132 - Stopping cancellation check scheduling...
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- END CHECK -----
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:53 INFO i.a.w.t.TemporalAttemptExecution(get):106 - Docker volume job log path: /tmp/workspace/309/0/logs.log
2023-01-11 14:02:53 INFO i.a.w.t.TemporalAttemptExecution(get):111 - Executing worker wrapper. Airbyte version: 0.40.7
2023-01-11 14:02:53 INFO i.a.c.f.EnvVariableFeatureFlags(getEnvOrDefault):49 - Using default value for environment variable LOG_CONNECTOR_MESSAGES: 'false'
2023-01-11 14:02:53 INFO i.a.c.EnvConfigs(getEnvOrDefault):1096 - Using default value for environment variable METRIC_CLIENT: ''
2023-01-11 14:02:53 WARN i.a.m.l.MetricClientFactory(initialize):60 - Metric client is already initialized to 
2023-01-11 14:02:53 INFO i.a.c.f.EnvVariableFeatureFlags(getEnvOrDefault):49 - Using default value for environment variable LOG_CONNECTOR_MESSAGES: 'false'
2023-01-11 14:02:53 INFO i.a.w.g.DefaultReplicationWorker(run):122 - start sync worker. job id: 309 attempt id: 0
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- START REPLICATION -----
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:02:53 INFO i.a.w.g.DefaultReplicationWorker(run):135 - configured sync modes: {null.form_data=full_refresh - overwrite}
2023-01-11 14:02:53 INFO i.a.w.i.DefaultAirbyteDestination(start):69 - Running destination...
2023-01-11 14:02:53 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if airbyte/destination-postgres:0.3.24 exists...
2023-01-11 14:02:54 INFO i.a.c.i.LineGobbler(voidCall):114 - airbyte/destination-postgres:0.3.24 was found locally.
2023-01-11 14:02:54 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = destination-postgres-write-309-0-ynzjh with resources io.airbyte.config.ResourceRequirements@5638d827[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=]
2023-01-11 14:02:54 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/309/0 --log-driver none --name destination-postgres-write-309-0-ynzjh --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=airbyte/destination-postgres:0.3.24 -e AIRBYTE_VERSION=0.40.7 -e WORKER_JOB_ID=309 airbyte/destination-postgres:0.3.24 write --config destination_config.json --catalog destination_catalog.json
2023-01-11 14:02:54 INFO i.a.c.i.LineGobbler(voidCall):114 - Checking if muguku/source-ona-data-forms:0.0.2 exists...
2023-01-11 14:02:54 INFO i.a.c.i.LineGobbler(voidCall):114 - muguku/source-ona-data-forms:0.0.2 was found locally.
2023-01-11 14:02:54 INFO i.a.w.p.DockerProcessFactory(create):119 - Creating docker container = source-ona-data-forms-read-309-0-yzfnx with resources io.airbyte.config.ResourceRequirements@2e76ed89[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=]
2023-01-11 14:02:54 INFO i.a.w.p.DockerProcessFactory(create):163 - Preparing command: docker run --rm --init -i -w /data/309/0 --log-driver none --name source-ona-data-forms-read-309-0-yzfnx --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_JOB_ATTEMPT=0 -e WORKER_CONNECTOR_IMAGE=muguku/source-ona-data-forms:0.0.2 -e AIRBYTE_VERSION=0.40.7 -e WORKER_JOB_ID=309 muguku/source-ona-data-forms:0.0.2 read --config source_config.json --catalog source_catalog.json --state input_state.json
2023-01-11 14:02:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$7):435 - Destination output thread started.
2023-01-11 14:02:54 INFO i.a.w.g.DefaultReplicationWorker(run):177 - Waiting for source and destination threads to complete.
2023-01-11 14:02:54 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):322 - Replication thread started.
2023-01-11 14:02:56 source > Starting syncing SourceOnaDataForms
2023-01-11 14:02:56 source > Syncing stream: form_data 
2023-01-11 14:02:56 destination > 2023-01-11 14:02:56 INFO i.a.i.d.p.PostgresDestination(main):100 - starting destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2023-01-11 14:02:56 destination > 2023-01-11 14:02:56 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2023-01-11 14:02:56 destination > 2023-01-11 14:02:56 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2023-01-11 14:02:56 destination > 2023-01-11 14:02:56 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: WRITE
2023-01-11 14:02:56 destination > 2023-01-11 14:02:56 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2023-01-11 14:02:57 destination > 2023-01-11 14:02:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:57 destination > 2023-01-11 14:02:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:57 destination > 2023-01-11 14:02:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:57 destination > 2023-01-11 14:02:57 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-01-11 14:02:57 destination > 2023-01-11 14:02:57 INFO i.a.i.b.s.SshTunnel(getInstance):172 - Starting connection with method: SSH_KEY_AUTH
2023-01-11 14:02:58 destination > 2023-01-11 14:02:58 WARN o.a.s.c.k.StaticServerKeyVerifier(handleAcceptance):59 - **domain/ip**:22 presented unverified EdDSA key: SHA256:**KEY**
2023-01-11 14:02:58 destination > 2023-01-11 14:02:58 INFO i.a.i.b.s.SshTunnel(openTunnel):289 - Established tunneling session.  Port forwarding started on /127.0.0.1:36111
2023-01-11 14:02:58 destination > 2023-01-11 14:02:58 INFO c.z.h.HikariDataSource(<init>):80 - HikariPool-1 - Starting...
2023-01-11 14:02:58 destination > 2023-01-11 14:02:58 INFO c.z.h.HikariDataSource(<init>):82 - HikariPool-1 - Start completed.
2023-01-11 14:02:59 destination > 2023-01-11 14:02:59 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):98 - Write config: WriteConfig{streamName=ndc_assessment_form_data, namespace=null, outputSchemaName=inform, tmpTableName=_airbyte_tmp_alp_ndc_assessment_form_data, outputTableName=_airbyte_raw_ndc_assessment_form_data, syncMode=overwrite}
2023-01-11 14:02:59 destination > 2023-01-11 14:02:59 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):116 - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-01-11 14:02:59 destination > 2023-01-11 14:02:59 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):123 - Preparing tmp tables in destination started for 1 streams
2023-01-11 14:02:59 destination > 2023-01-11 14:02:59 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):127 - Preparing tmp table in destination started for stream ndc_assessment_form_data. schema: inform, tmp table name: _airbyte_tmp_alp_ndc_assessment_form_data
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):133 - Preparing tables in destination completed.
2023-01-11 14:03:00 source > {"type": "RECORD", "record": *Incomplete record*
2023-01-11 14:03:00 INFO i.a.w.g.DefaultReplicationWorker(lambda$getReplicationRunnable$6):363 - Total records read: 194 (1 MB)
2023-01-11 14:03:00 INFO i.a.w.g.DefaultReplicationWorker(run):182 - One of source or destination thread complete. Waiting on the other.
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 WARN i.a.i.b.IntegrationRunner(watchForOrphanThreads):215 - The main thread is exiting while children non-daemon threads from a connector are still active.
2023-01-11 14:03:00 destination > Ideally, this situation should not happen...
2023-01-11 14:03:00 destination > Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
2023-01-11 14:03:00 destination > The main thread is: main (RUNNABLE)
2023-01-11 14:03:00 destination >  Thread stacktrace: java.base/java.lang.Thread.getStackTrace(Thread.java:1610)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.base.IntegrationRunner.dumpThread(IntegrationRunner.java:271)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:219)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:177)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:146)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:97)
2023-01-11 14:03:00 destination >         at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.java:101)
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 WARN i.a.i.b.IntegrationRunner(watchForOrphanThreads):227 - Active non-daemon thread: pool-2-thread-1 (TIMED_WAITING)
2023-01-11 14:03:00 destination >  Thread stacktrace: java.base@17.0.1/jdk.internal.misc.Unsafe.park(Native Method)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:401)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:903)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1061)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.lang.Thread.run(Thread.java:833)
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 WARN i.a.i.b.IntegrationRunner(watchForOrphanThreads):227 - Active non-daemon thread: NioSocketAcceptor-2 (RUNNABLE)
2023-01-11 14:03:00 destination >  Thread stacktrace: java.base@17.0.1/sun.nio.ch.EPoll.wait(Native Method)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:118)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:129)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:146)
2023-01-11 14:03:00 destination >         at app//org.apache.mina.transport.socket.nio.NioSocketAcceptor.select(NioSocketAcceptor.java:299)
2023-01-11 14:03:00 destination >         at app//org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:466)
2023-01-11 14:03:00 destination >         at app//org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-01-11 14:03:00 destination >         at java.base@17.0.1/java.lang.Thread.run(Thread.java:833)
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):62 - Airbyte message consumer: succeeded.
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 INFO i.a.i.d.b.BufferedStreamConsumer(close):171 - executing on success close procedure.
2023-01-11 14:03:00 destination > 2023-01-11 14:03:00 INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAll):84 - Flushing ndc_assessment_form_data: 194 records (4 MB)
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.SqlOperations(onDestinationCloseOperations):137 - No onDestinationCloseOperations required for this destination.
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):163 - Finalizing tables in destination started for 1 streams
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):168 - Finalizing stream ndc_assessment_form_data. schema inform, tmp table _airbyte_tmp_alp_ndc_assessment_form_data, final table _airbyte_raw_ndc_assessment_form_data
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):181 - Executing finalization of tables.
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):183 - Finalizing tables in destination completed.
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):186 - Cleaning tmp tables in destination started for 1 streams
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):190 - Cleaning tmp table in destination started for stream ndc_assessment_form_data. schema inform, tmp table name: _airbyte_tmp_alp_ndc_assessment_form_data
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):195 - Cleaning tmp tables in destination completed.
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
2023-01-11 14:03:01 destination > 2023-01-11 14:03:01 INFO i.a.i.d.p.PostgresDestination(main):102 - completed destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2023-01-11 14:03:01 INFO i.a.w.g.DefaultReplicationWorker(run):184 - Source and destination threads complete.
2023-01-11 14:03:01 INFO i.a.w.g.DefaultReplicationWorker(run):281 - Source did not output any state messages
2023-01-11 14:03:01 WARN i.a.w.g.DefaultReplicationWorker(run):289 - State capture: No new state, falling back on input state: io.airbyte.config.State@2ba9f379[state={}]
2023-01-11 14:03:01 INFO i.a.w.g.DefaultReplicationWorker(run):300 - sync summary: {
  "status" : "completed",
  "recordsSynced" : 194,
  "bytesSynced" : 1746976,
  "startTime" : 1673445773989,
  "endTime" : 1673445781679,
  "totalStats" : {
    "recordsEmitted" : 194,
    "bytesEmitted" : 1746976,
    "sourceStateMessagesEmitted" : 0,
    "destinationStateMessagesEmitted" : 0,
    "recordsCommitted" : 194,
    "meanSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBetweenStateMessageEmittedandCommitted" : 0,
    "meanSecondsBetweenStateMessageEmittedandCommitted" : 0
  },
  "streamStats" : [ {
    "streamName" : "ndc_assessment_form_data",
    "stats" : {
      "recordsEmitted" : 194,
      "bytesEmitted" : 1746976,
      "recordsCommitted" : 194
    }
  } ]
}
2023-01-11 14:03:01 INFO i.a.w.g.DefaultReplicationWorker(run):301 - failures: [ ]
2023-01-11 14:03:01 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:03:01 INFO i.a.c.i.LineGobbler(voidCall):114 - ----- END REPLICATION -----
2023-01-11 14:03:01 INFO i.a.w.t.TemporalAttemptExecution(get):132 - Stopping cancellation check scheduling...
2023-01-11 14:03:01 INFO i.a.c.i.LineGobbler(voidCall):114 - 
2023-01-11 14:03:01 INFO i.a.w.t.s.ReplicationActivityImpl(lambda$replicate$3):149 - sync summary: io.airbyte.config.StandardSyncOutput@7d748a7e[standardSyncSummary=io.airbyte.config.StandardSyncSummary@5ba8e580[status=completed,recordsSynced=194,bytesSynced=1746976,startTime=1673445773989,endTime=1673445781679,totalStats=io.airbyte.config.SyncStats@615999c[recordsEmitted=194,bytesEmitted=1746976,sourceStateMessagesEmitted=0,destinationStateMessagesEmitted=0,recordsCommitted=194,meanSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBeforeSourceStateMessageEmitted=0,maxSecondsBetweenStateMessageEmittedandCommitted=0,meanSecondsBetweenStateMessageEmittedandCommitted=0,additionalProperties={}],streamStats=[io.airbyte.config.StreamSyncStats@785e28a7[streamName=ndc_assessment_form_data,stats=io.airbyte.config.SyncStats@21a06aa8[recordsEmitted=194,bytesEmitted=1746976,sourceStateMessagesEmitted=<null>,destinationStateMessagesEmitted=<null>,recordsCommitted=194,meanSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBeforeSourceStateMessageEmitted=<null>,maxSecondsBetweenStateMessageEmittedandCommitted=<null>,meanSecondsBetweenStateMessageEmittedandCommitted=<null>,additionalProperties={}]]]],normalizationSummary=<null>,state=io.airbyte.config.State@2ba9f379[state={}],outputCatalog=io.airbyte.protocol.models.ConfiguredAirbyteCatalog@3d380ff9[streams=[io.airbyte.protocol.models.ConfiguredAirbyteStream@17cba8bb[stream=io.airbyte.protocol.models.AirbyteStream@1ed9a1f5[name=ndc_assessment_form_data,jsonSchema={"type":"object","$schema":"http://json-schema.org/schema#","required":[],"properties":{}},supportedSyncModes=[full_refresh],sourceDefinedCursor=<null>,defaultCursorField=[],sourceDefinedPrimaryKey=[[_uuid]],namespace=<null>,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[[_uuid]],additionalProperties={}]],additionalProperties={}],failures=[]]
2023-01-11 14:03:01 INFO i.a.w.t.TemporalUtils(withBackgroundHeartbeat):279 - Stopping temporal heartbeating...
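When you compare many runs, you can extract the per-stream counts from the `sync summary` JSON that `DefaultReplicationWorker` prints, instead of reading them from the UI. The following is a minimal sketch that assumes the log is available as a single string. It also assumes that the first `sync summary:` entry followed by `{` is the JSON form shown above. The later `ReplicationActivityImpl` summary line is Java `toString()` output, not JSON. `stream_stats` and `mismatched_streams` are hypothetical helper names.

```python
import json
import re


def stream_stats(log_text: str) -> dict:
    """Return {stream: (recordsEmitted, recordsCommitted)} from a sync log."""
    # The lookahead only matches the summary that is followed by a JSON object.
    match = re.search(r"sync summary: (?=\{)", log_text)
    if match is None:
        raise ValueError("no JSON sync summary found in log")
    # raw_decode parses one JSON value starting at the given index and
    # ignores the log lines that follow it.
    summary, _ = json.JSONDecoder().raw_decode(log_text, match.end())
    return {
        entry["streamName"]: (
            entry["stats"].get("recordsEmitted"),
            entry["stats"].get("recordsCommitted"),
        )
        for entry in summary.get("streamStats", [])
    }


def mismatched_streams(log_text: str) -> list:
    """List streams whose emitted and committed record counts differ."""
    return [
        name
        for name, (emitted, committed) in stream_stats(log_text).items()
        if emitted != committed
    ]
```

Note that both checks only compare the platform's own counters. A run can show emitted equal to committed and still be short of what the source holds, so these checks are most useful alongside an independent count of the source's records.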

Steps to Reproduce

  1. Set up a custom source connector
  2. Set up a Postgres destination connector without SSH tunneling
  3. Set up a connection between the source and the destination
  4. Run a sync on the connection several times and observe the record counts in the Sync History UI
  5. Change the Postgres destination to use SSH tunneling
  6. Re-run the sync and observe the record counts in the Sync History UI
  7. Record counts are consistent without tunneling but inconsistent with tunneling
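The comparison in steps 4 to 7 amounts to two checks for each destination configuration. First, do repeated syncs commit the same number of records? Second, does that number match what the source holds? The following is a minimal sketch of that check. The counts are entered by hand, for example from the Sync History UI. `check_runs` and `RunCheck` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class RunCheck:
    consistent: bool  # every run committed the same number of records
    complete: bool    # every run committed all records available in the source
    low: int          # smallest committed count seen
    high: int         # largest committed count seen


def check_runs(committed: Sequence[int], available: int) -> RunCheck:
    """Summarize committed-record counts from repeated syncs of one connection."""
    if not committed:
        raise ValueError("need at least one run")
    low, high = min(committed), max(committed)
    return RunCheck(
        consistent=(low == high),
        complete=(low == high == available),
        low=low,
        high=high,
    )
```

Call `check_runs` once with the counts from the runs without tunneling and once with the counts from the runs with tunneling. Doing so makes the difference described in step 7 explicit.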

Are you willing to submit a PR?

Yes, with guidance.

alepietrobon commented 1 year ago

+1. This seems worth investigating.

evantahler commented 2 days ago

Closing as cannot reproduce