logs:
2024-10-08 08:44:01 platform > Docker volume job log path: /tmp/workspace/643/0/logs.log
2024-10-08 08:44:01 platform > Executing worker wrapper. Airbyte version: 0.63.13
2024-10-08 08:44:01 platform >
2024-10-08 08:44:01 platform > ----- START CHECK -----
2024-10-08 08:44:01 platform >
2024-10-08 08:44:01 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:01 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:01 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:01 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:01 platform > Checking if airbyte/source-postgres:3.6.19 exists...
2024-10-08 08:44:01 platform > airbyte/source-postgres:3.6.19 was found locally.
2024-10-08 08:44:01 platform > Creating docker container = source-postgres-check-643-0-wirzm with resources io.airbyte.config.ResourceRequirements@519e4b06[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@3d07b274[hosts=[localhost, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2024-10-08 08:44:01 platform > Preparing command: docker run --rm --init -i -w /data/643/0 --log-driver none --name source-postgres-check-643-0-wirzm --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:3.6.19 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE=dev -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.13 -e WORKER_JOB_ID=643 airbyte/source-postgres:3.6.19 check --config source_config.json
2024-10-08 08:44:01 platform > Reading messages from protocol version 0.2.0
2024-10-08 08:44:01 platform > INFO main i.a.i.s.p.PostgresSource(main):698 starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-10-08 08:44:01 platform > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):144 integration args: {check=null, config=source_config.json}
2024-10-08 08:44:01 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):130 Running integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-10-08 08:44:01 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):131 Command: CHECK
2024-10-08 08:44:01 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):132 Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword min - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword max - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:02 platform > INFO main i.a.c.i.b.s.SshTunnel$Companion(getInstance):423 Starting connection with method: NO_TUNNEL
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):924 DISABLED toSslJdbcParam disable
2024-10-08 08:44:02 platform > INFO main c.z.h.HikariDataSource(<init>):79 HikariPool-1 - Starting...
2024-10-08 08:44:02 platform > INFO main c.z.h.HikariDataSource(<init>):81 HikariPool-1 - Start completed.
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:02 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresCatalogHelper(getPublicizedTables):156 For CDC, only tables in publication pub will be included in the sync: [public.device_check_recipes, public.device_check_recipe_ingredient, public.device_check_recipe_calculations, public.devices, public.device_check_ingredient_checks, public.device_check, public.device_check_contexts, public.device_check_results, public.devices_data]
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresUtils(isXmin):190 using Xmin: false
2024-10-08 08:44:02 platform > INFO main i.a.c.i.s.j.AbstractJdbcSource(getCheckOperations$lambda$6):343 Attempting to get metadata from the database to see if we can connect.
2024-10-08 08:44:02 platform > INFO main i.a.c.i.s.j.AbstractJdbcSource(checkUserHasPrivileges):307 Checking if the user can perform select to any table in schema: public
2024-10-08 08:44:02 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresSource(lambda$getReplicationSlot$4):426 Attempting to find the named replication slot using the query: HikariProxyPreparedStatement@332244937 wrapping SELECT * FROM pg_replication_slots WHERE slot_name = ('airbyte_slot') AND plugin = ('pgoutput') AND database = ('postgres')
2024-10-08 08:44:02 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresSource(lambda$getCheckOperations$6):458 Attempting to find the publication using the query: HikariProxyPreparedStatement@1227459815 wrapping SELECT * FROM pg_publication WHERE pubname = ('pub')
2024-10-08 08:44:02 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.c.PostgresReplicationConnection(createConnection):44 Creating a replication connection.
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.c.PostgresReplicationConnection(createConnection):47 Validating replication connection.
2024-10-08 08:44:02 platform > INFO main c.z.h.HikariDataSource(close):349 HikariPool-1 - Shutdown initiated...
2024-10-08 08:44:02 platform > INFO main c.z.h.HikariDataSource(close):351 HikariPool-1 - Shutdown completed.
2024-10-08 08:44:02 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):224 Completed integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-10-08 08:44:02 platform > INFO main i.a.i.s.p.PostgresSource(main):700 completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-10-08 08:44:02 platform > Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@6bc34b5a[status=succeeded,message=<null>,additionalProperties={}]
2024-10-08 08:44:02 platform >
2024-10-08 08:44:02 platform > ----- END CHECK -----
2024-10-08 08:44:02 platform >
2024-10-08 08:44:02 platform > Docker volume job log path: /tmp/workspace/643/0/logs.log
2024-10-08 08:44:02 platform > Executing worker wrapper. Airbyte version: 0.63.13
2024-10-08 08:44:02 platform >
2024-10-08 08:44:02 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:02 platform > ----- START CHECK -----
2024-10-08 08:44:02 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:02 platform >
2024-10-08 08:44:02 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:02 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:02 platform > Checking if airbyte/destination-kafka:0.1.10 exists...
2024-10-08 08:44:02 platform > airbyte/destination-kafka:0.1.10 was found locally.
2024-10-08 08:44:02 platform > Creating docker container = destination-kafka-check-643-0-znfmz with resources io.airbyte.config.ResourceRequirements@127236af[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts null
2024-10-08 08:44:02 platform > Preparing command: docker run --rm --init -i -w /data/643/0 --log-driver none --name destination-kafka-check-643-0-znfmz --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-kafka:0.1.10 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE=dev -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.13 -e WORKER_JOB_ID=643 airbyte/destination-kafka:0.1.10 check --config source_config.json
2024-10-08 08:44:02 platform > Reading messages from protocol version 0.2.0
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.d.k.KafkaDestination(main):84 - Starting destination: class io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {check=null, config=source_config.json}
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: CHECK
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:03 platform > 2024-10-08 08:44:03 INFO i.a.i.d.k.KafkaDestination(main):86 - Completed destination: class io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:03 platform > Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@44eb0042[status=succeeded,message=<null>,additionalProperties={}]
2024-10-08 08:44:03 platform >
2024-10-08 08:44:03 platform > ----- END CHECK -----
2024-10-08 08:44:03 platform >
2024-10-08 08:44:05 platform > Docker volume job log path: /tmp/workspace/643/0/logs.log
2024-10-08 08:44:05 platform > Executing worker wrapper. Airbyte version: 0.63.13
2024-10-08 08:44:05 platform > start sync worker. job id: 643 attempt id: 0
2024-10-08 08:44:05 platform >
2024-10-08 08:44:05 platform > ----- START REPLICATION -----
2024-10-08 08:44:05 platform >
2024-10-08 08:44:05 platform > Running destination...
2024-10-08 08:44:05 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:05 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:05 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:05 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:05 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-10-08 08:44:05 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:05 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:05 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-10-08 08:44:05 platform > Checking if airbyte/destination-kafka:0.1.10 exists...
2024-10-08 08:44:05 platform > Checking if airbyte/source-postgres:3.6.19 exists...
2024-10-08 08:44:05 platform > airbyte/source-postgres:3.6.19 was found locally.
2024-10-08 08:44:06 platform > Creating docker container = source-postgres-read-643-0-cfiiq with resources io.airbyte.config.ResourceRequirements@1ff38007[cpuRequest=0.5,cpuLimit=1,memoryRequest=1Gi,memoryLimit=2Gi,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@33c835df[hosts=[localhost, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2024-10-08 08:44:06 platform > Preparing command: docker run --rm --init -i -w /data/643/0 --log-driver none --name source-postgres-read-643-0-cfiiq -e CONCURRENT_SOURCE_STREAM_READ=false --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:3.6.19 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE=dev -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.13 -e WORKER_JOB_ID=643 --cpus=1 --memory-reservation=1Gi --memory=2Gi airbyte/source-postgres:3.6.19 read --config source_config.json --catalog source_catalog.json --state input_state.json
2024-10-08 08:44:06 platform > Reading messages from protocol version 0.2.0
2024-10-08 08:44:06 platform > airbyte/destination-kafka:0.1.10 was found locally.
2024-10-08 08:44:06 platform > Creating docker container = destination-kafka-write-643-0-dvocc with resources io.airbyte.config.ResourceRequirements@564b7352[cpuRequest=0.5,cpuLimit=1,memoryRequest=1Gi,memoryLimit=2Gi,additionalProperties={}] and allowedHosts null
2024-10-08 08:44:06 platform > Preparing command: docker run --rm --init -i -w /data/643/0 --log-driver none --name destination-kafka-write-643-0-dvocc --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-kafka:0.1.10 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE=dev -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.13 -e WORKER_JOB_ID=643 --cpus=1 --memory-reservation=1Gi --memory=2Gi airbyte/destination-kafka:0.1.10 write --config destination_config.json --catalog destination_catalog.json
2024-10-08 08:44:06 platform > Writing messages to protocol version 0.2.0
2024-10-08 08:44:06 platform > Reading messages from protocol version 0.2.0
2024-10-08 08:44:06 platform > readFromSource: start
2024-10-08 08:44:06 platform > Starting source heartbeat check. Will check threshold of 10800 seconds, every 1 minutes.
2024-10-08 08:44:06 platform > processMessage: start
2024-10-08 08:44:06 platform > readFromDestination: start
2024-10-08 08:44:06 platform > writeToDestination: start
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.d.k.KafkaDestination(main):84 - Starting destination: class io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.b.IntegrationRunner(runInternal):104 - Running integration: io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.b.IntegrationRunner(runInternal):105 - Command: WRITE
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.b.IntegrationRunner(runInternal):106 - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 WARN c.n.s.JsonMetaSchema(newValidator):338 - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 INFO i.a.i.d.k.KafkaDestinationConfig(propertiesByProtocol):81 - Kafka protocol config: {"security_protocol":"PLAINTEXT"}
2024-10-08 08:44:08 source > INFO main i.a.i.s.p.PostgresSource(main):698 starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-10-08 08:44:08 destination > 2024-10-08 08:44:08 WARN o.a.k.c.CommonClientConfigs(warnIfDeprecatedDnsLookupValue):192 - Configuration 'client.dns.lookup' with value 'default' is deprecated and will be removed in future version. Please use 'use_all_dns_ips' or another non-deprecated value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):144 integration args: {read=null, catalog=source_catalog.json, state=input_state.json, config=source_config.json}
2024-10-08 08:44:08 source > INFO main i.a.c.i.b.IntegrationRunner(runInternal):130 Running integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-10-08 08:44:08 source > INFO main i.a.c.i.b.IntegrationRunner(runInternal):131 Command: READ
2024-10-08 08:44:08 source > INFO main i.a.c.i.b.IntegrationRunner(runInternal):132 Integration config: IntegrationConfig{command=READ, configPath='source_config.json', catalogPath='source_catalog.json', statePath='input_state.json'}
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword min - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword max - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-10-08 08:44:08 source > INFO main i.a.c.i.b.s.SshTunnel$Companion(getInstance):423 Starting connection with method: NO_TUNNEL
2024-10-08 08:44:08 source > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.StateManagerFactory(createStateManager):51 Global state manager selected to manage state object with type GLOBAL.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_recipes. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_recipe_ingredient. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_recipe_calculations. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_ingredient_checks. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_devices. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_contexts. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_device_check_results. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.s.CursorManager(createCursorInfoForStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_db_sources):213 Found cursor field. Does not match previous cursor field. Stream: public_devices_data. Original Cursor Field: null (count 0). New Cursor Field: _ab_cdc_lsn. Resetting cursor value.
2024-10-08 08:44:08 source > INFO main i.a.c.i.s.r.CdcStateManager(<init>):30 Initialized CDC state
2024-10-08 08:44:08 source > INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):924 DISABLED toSslJdbcParam disable
2024-10-08 08:44:08 source > INFO main c.z.h.HikariDataSource(<init>):79 HikariPool-1 - Starting...
2024-10-08 08:44:08 source > INFO main c.z.h.HikariDataSource(<init>):81 HikariPool-1 - Start completed.
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:09 source > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresCatalogHelper(getPublicizedTables):156 For CDC, only tables in publication pub will be included in the sync: [public.device_check_recipes, public.device_check_recipe_ingredient, public.device_check_recipe_calculations, public.devices, public.device_check_ingredient_checks, public.device_check, public.device_check_contexts, public.device_check_results, public.devices_data]
2024-10-08 08:44:09 source > INFO main i.a.c.i.s.j.AbstractJdbcSource(logPreSyncDebugData):780 Data source product recognized as PostgreSQL:16.4 (Debian 16.4-1.pgdg120+1)
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_recipes"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_recipes_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_recipe_ingredient"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_recipe_ingredient_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_recipe_calculations"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_recipe_calculations_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_ingredient_checks"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_ingredient_checks_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "devices"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: devices_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_contexts"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_contexts_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "device_check_results"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: device_check_results_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):285 Discovering indexes for schema "public", table "devices_data"
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(logPreSyncDebugData):287 Index name: devices_data_pk, Column: id, Unique: true
2024-10-08 08:44:09 source > INFO main i.a.c.i.s.j.AbstractJdbcSource(discoverInternal):369 Internal schemas to exclude: [catalog_history, information_schema, pg_catalog, pg_internal]
2024-10-08 08:44:09 source > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(lambda$getReplicationSlot$4):426 Attempting to find the named replication slot using the query: HikariProxyPreparedStatement@310792845 wrapping SELECT * FROM pg_replication_slots WHERE slot_name = ('airbyte_slot') AND plugin = ('pgoutput') AND database = ('postgres')
2024-10-08 08:44:09 source > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(format):240 Initial Debezium state constructed: {"[\"postgres\",{\"server\":\"postgres\"}]":"{\"transaction_id\":null,\"lsn\":1782120776,\"txId\":2191,\"ts_usec\":1728377049379466}"}
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(shouldFlushAfterSync):78 Should flush after sync: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):924 DISABLED toSslJdbcParam disable
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
auto.include.jmx.reporter = true
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = All
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = [http://:8083]
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 1000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/cdc-state-offset3738844837592024729/offset.dat
plugin.discovery = hybrid_warn
plugin.path = null
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2024-10-08 08:44:09 source > INFO main o.a.k.c.s.FileOffsetBackingStore(start):63 Starting FileOffsetBackingStore with file /tmp/cdc-state-offset3738844837592024729/offset.dat
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main i.d.c.CommonConnectorConfig(getSourceInfoStructMaker):1649 Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(extractLsn):191 Found previous partition offset PostgresPartition [sourcePartition={server=postgres}]: {lsn=1779148824, txId=2007, ts_usec=1728328445663109}
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(parseSavedOffset):171 Closing offsetStorageReader and fileOffsetBackingStore
2024-10-08 08:44:09 source > INFO main o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(isSavedOffsetAfterReplicationSlotLSN):69 Replication slot confirmed_flush_lsn : 1779148872 Saved offset LSN : 1779148824
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(getCtidInitialLoadGlobalStateManager):117 Streams to be synced via ctid (can include RFR streams) : 9
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(getCtidInitialLoadGlobalStateManager):118 Streams: public.device_check_recipes, public.device_check_recipe_ingredient, public.device_check_recipe_calculations, public.device_check_ingredient_checks, public.devices, public.device_check, public.device_check_contexts, public.device_check_results, public.devices_data
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_recipes" is 25784
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_recipe_ingredient" is 25812
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_recipe_calculations" is 25775
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_ingredient_checks" is 25821
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."devices" is 25749
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check" is 25757
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_contexts" is 25766
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."device_check_results" is 25793
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."devices_data" is 25802
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(format):240 Initial Debezium state constructed: {"[\"postgres\",{\"server\":\"postgres\"}]":"{\"transaction_id\":null,\"lsn\":1782120824,\"txId\":2192,\"ts_usec\":1728377049595534}"}
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(getIncrementalIterators):508 Using ctid + CDC
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(getFirstRecordWaitTime):171 First record waiting time: 1200 seconds
2024-10-08 08:44:09 source > INFO main i.a.c.i.s.r.InitialLoadTimeoutUtil(getInitialLoadTimeout):44 Initial Load timeout: 28800 seconds
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):149 First record waiting time: 1200 seconds
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):150 Initial load timeout: 8 hours
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresCdcCtidInitializer(cdcCtidIteratorsCombined):151 Queue size: 10000
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(format):240 Initial Debezium state constructed: {"[\"postgres\",{\"server\":\"postgres\"}]":"{\"transaction_id\":null,\"lsn\":1782120824,\"txId\":2193,\"ts_usec\":1728377049654068}"}
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresUtils(shouldFlushAfterSync):78 Should flush after sync: true
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):924 DISABLED toSslJdbcParam disable
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
auto.include.jmx.reporter = true
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = All
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = [http://:8083]
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 1000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/cdc-state-offset17322034933894750985/offset.dat
plugin.discovery = hybrid_warn
plugin.path = null
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2024-10-08 08:44:09 source > INFO main o.a.k.c.s.FileOffsetBackingStore(start):63 Starting FileOffsetBackingStore with file /tmp/cdc-state-offset17322034933894750985/offset.dat
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main o.a.k.c.c.AbstractConfig(logAll):370 JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-10-08 08:44:09 source > INFO main i.d.c.CommonConnectorConfig(getSourceInfoStructMaker):1649 Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(extractLsn):191 Found previous partition offset PostgresPartition [sourcePartition={server=postgres}]: {lsn=1779148824, txId=2007, ts_usec=1728328445663109}
2024-10-08 08:44:09 source > INFO main i.a.i.s.p.c.PostgresDebeziumStateUtil(parseSavedOffset):171 Closing offsetStorageReader and fileOffsetBackingStore
2024-10-08 08:44:09 source > INFO main o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2024-10-08 08:44:09 platform > SOURCE analytics [airbyte/source-postgres:3.6.19] | Type: db-sources-cdc-cursor-invalid | Value: 1
2024-10-08 08:44:09 source > ERROR main i.a.c.i.b.s.SshWrappedSource(read):72 Exception occurred while getting the delegate read iterator, closing SSH tunnel io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.
at io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined(PostgresCdcCtidInitializer.java:184) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:509) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:138) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.kt:152) ~[airbyte-cdk-db-sources-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.kt:70) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.main(PostgresSource.java:699) [io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
Stack Trace: io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.
at io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined(PostgresCdcCtidInitializer.java:184)
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:509)
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:138)
at io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.kt:152)
at io.airbyte.cdk.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.kt:70)
at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290)
at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)
at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
at io.airbyte.integrations.source.postgres.PostgresSource.main(PostgresSource.java:699)
2024-10-08 08:44:09 source > ERROR main i.a.c.i.u.ConnectorExceptionHandler(handleException):68 caught exception! io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.
at io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined(PostgresCdcCtidInitializer.java:184) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:509) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:138) ~[io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
at io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.kt:152) ~[airbyte-cdk-db-sources-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.kt:70) ~[airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290) ~[airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119) [airbyte-cdk-core-0.45.1.jar:?]
at io.airbyte.integrations.source.postgres.PostgresSource.main(PostgresSource.java:699) [io.airbyte.airbyte-integrations.connectors-source-postgres.jar:?]
Stack Trace: io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.
at io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined(PostgresCdcCtidInitializer.java:184)
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:509)
at io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:138)
at io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.kt:152)
at io.airbyte.cdk.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.kt:70)
at io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290)
at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)
at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
at io.airbyte.integrations.source.postgres.PostgresSource.main(PostgresSource.java:699)
2024-10-08 08:44:09 platform > readFromSource: source exception
io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1
at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:378) ~[io.airbyte-airbyte-commons-worker-0.63.13.jar:?]
at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.13.jar:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-10-08 08:44:09 platform > readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2024-10-08 08:44:09 platform > processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2024-10-08 08:44:09 platform > writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2024-10-08 08:44:09 platform > thread status... timeout thread: false , replication thread: true
2024-10-08 08:44:09 destination > 2024-10-08 08:44:09 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):62 - Airbyte message consumer: succeeded.
2024-10-08 08:44:09 destination > 2024-10-08 08:44:09 INFO i.a.i.b.IntegrationRunner(runInternal):152 - Completed integration: io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:09 destination > 2024-10-08 08:44:09 INFO i.a.i.d.k.KafkaDestination(main):86 - Completed destination: class io.airbyte.integrations.destination.kafka.KafkaDestination
2024-10-08 08:44:10 platform > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2024-10-08 08:44:10 platform > thread status... timeout thread: false , replication thread: true
2024-10-08 08:44:10 platform > Closing StateCheckSumCountEventHandler
2024-10-08 08:44:10 platform > sync summary: {
"status" : "failed",
"startTime" : 1728377045912,
"endTime" : 1728377050056,
"totalStats" : {
"bytesEmitted" : 0,
"destinationStateMessagesEmitted" : 0,
"destinationWriteEndTime" : 1728377050048,
"destinationWriteStartTime" : 1728377045920,
"meanSecondsBeforeSourceStateMessageEmitted" : 0,
"maxSecondsBeforeSourceStateMessageEmitted" : 0,
"meanSecondsBetweenStateMessageEmittedandCommitted" : 0,
"recordsEmitted" : 0,
"replicationEndTime" : 1728377050051,
"replicationStartTime" : 1728377045912,
"sourceReadEndTime" : 0,
"sourceReadStartTime" : 1728377045920,
"sourceStateMessagesEmitted" : 0
},
"streamStats" : [ ],
"performanceMetrics" : {
"processFromSource" : {
"elapsedTimeInNanos" : 1976291,
"executionCount" : 2,
"avgExecTimeInNanos" : 988145.5
},
"readFromSource" : {
"elapsedTimeInNanos" : 3842486553,
"executionCount" : 1365,
"avgExecTimeInNanos" : 2815008.4637362636
},
"processFromDest" : {
"elapsedTimeInNanos" : 0,
"executionCount" : 0,
"avgExecTimeInNanos" : "NaN"
},
"writeToDest" : {
"elapsedTimeInNanos" : 0,
"executionCount" : 0,
"avgExecTimeInNanos" : "NaN"
},
"readFromDest" : {
"elapsedTimeInNanos" : 4006536297,
"executionCount" : 1789,
"avgExecTimeInNanos" : 2239539.573504751
}
}
}
2024-10-08 08:44:10 platform > failures: [ {
"failureOrigin" : "source",
"failureType" : "config_error",
"internalMessage" : "io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.",
"externalMessage" : "Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.",
"metadata" : {
"attemptNumber" : 0,
"jobId" : 643,
"from_trace_message" : true,
"connector_command" : "read"
},
"stacktrace" : "io.airbyte.commons.exceptions.ConfigErrorException: Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.\n\tat io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined(PostgresCdcCtidInitializer.java:184)\n\tat io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:509)\n\tat io.airbyte.integrations.source.postgres.PostgresSource.getIncrementalIterators(PostgresSource.java:138)\n\tat io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.kt:152)\n\tat io.airbyte.cdk.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.kt:70)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.kt:290)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:190)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)\n\tat io.airbyte.integrations.source.postgres.PostgresSource.main(PostgresSource.java:699)\n",
"timestamp" : 1728377049686
}, {
"failureOrigin" : "source",
"internalMessage" : "Source process exited with non-zero exit code 1",
"externalMessage" : "Something went wrong within the source connector",
"metadata" : {
"attemptNumber" : 0,
"jobId" : 643,
"connector_command" : "read"
},
"stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:378)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
"timestamp" : 1728377049887
} ]
2024-10-08 08:44:10 platform >
2024-10-08 08:44:10 platform > ----- END REPLICATION -----
2024-10-08 08:44:10 platform >
2024-10-08 08:44:10 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=20, successiveCompleteFailures=1, totalCompleteFailures=1, successivePartialFailures=0, totalPartialFailures=0)
Backoff before next attempt: 10 seconds
2024-10-08 08:44:10 platform > Failing job: 643, reason: Connection Check Failed 8e92c85c-0de7-4cb1-9871-f639c514ad70
Solved it by deleting and recreating the connection.
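In case it helps others: deleting the connection discards Airbyte's saved CDC offset, so the next sync rebuilds state from the replication slot's current position. A similar reset can be done on the Postgres side by recreating the slot itself. This is only a sketch, using the slot and plugin names from the logs above, and it assumes nothing else consumes the slot (a full resync is required afterwards):

-- The slot must not be active when dropped; dropping discards any WAL it was retaining.
SELECT pg_drop_replication_slot('airbyte_slot');
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');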
Relevant information
Description: I am using Airbyte to sync data from a Postgres source to a Kafka destination using Change Data Capture (CDC) in incremental mode. However, I consistently encounter the "Saved offset is before replication slot's confirmed lsn" error shown in the logs above.
I have already applied the configurations recommended by the documentation to the PostgreSQL server.
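Concretely, this amounted to roughly the following. This is a sketch, assuming superuser access; the 10GB values match my setup described under Context, and changing wal_level requires a server restart rather than just a reload:

ALTER SYSTEM SET wal_level = 'logical';           -- prerequisite for logical replication / CDC
ALTER SYSTEM SET wal_keep_size = '10GB';          -- keep extra WAL segments on disk
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB'; -- let slots retain up to 10GB of WAL
SELECT pg_reload_conf();                          -- picks up the retention settings; wal_level still needs a restart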
Context:
I am syncing 10 rows from each of 9 different tables, so the data volume is very small. The WAL size has been increased, and both wal_keep_size and max_slot_wal_keep_size are set to 10GB. The sync frequency is every minute, so WAL retention should be more than sufficient for this data volume and sync frequency.
Expected Behavior: CDC syncs should run without triggering a full refresh and without the "Saved offset is before replication slot's confirmed LSN" error.
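For anyone debugging the same failure: the connector compares its saved offset against the slot's confirmed_flush_lsn (visible above as "Replication slot confirmed_flush_lsn : 1779148872 Saved offset LSN : 1779148824"; the saved offset being behind the slot is exactly the condition it rejects). The same comparison can be checked manually on the source database; a diagnostic sketch using the slot name from the logs:

SELECT slot_name,
       plugin,
       active,
       confirmed_flush_lsn,                 -- position Postgres believes the consumer has flushed
       pg_current_wal_lsn() AS current_lsn, -- current end of WAL
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'airbyte_slot';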
Any additional suggestions or insight would be greatly appreciated.