airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[destination-bigquery] After an upgrade to 2.8.0 a connection to a postgres source started failing #40555

Open killthekitten opened 1 week ago

killthekitten commented 1 week ago

Connector Name

destination-bigquery

Connector Version

2.8.0

What step the error happened?

During the sync

Relevant information

Airbyte 0.63.2, 0.63.1; Postgres source 3.4.19; BigQuery destination 2.8.0

We have upgraded Airbyte from 0.60.0 to 0.63.2 and switched to the latest version of the BigQuery and Postgres connectors. Immediately afterwards, one of our Postgres -> BigQuery connections started to fail.

There were a couple times when the Airbyte instance became unavailable and logs were lost, but for all other cases, this is the error that appears in every log:

2024-06-26 16:22:21 platform > readFromSource: exception caught
io.airbyte.workers.exception.WorkerException: A stream status has been detected for a stream not present in the catalog

In the other Postgres -> BigQuery connections we don't see the same issue, but in all of them the destination is pinned to BigQuery v1.2.20 (i.e. predating destinations V2). The failing one was updated from another v2 version that was a couple months old.

The connection fails after the first attempt to read a stream:

2024-06-26 16:22:20 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-06-26 16:22:20 platform > Records read: 5000 (1 MB)
2024-06-26 16:22:21 source > INFO main i.a.c.i.s.r.AbstractDbSource$createReadIterator$3(invoke):513 Reading stream admin_notes. Records read: 10000
2024-06-26 16:22:21 platform > Records read: 10000 (2 MB)
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.CtidStateManager(generateStateMessageAtCheckpoint):77 Emitting ctid state for stream public_admin_notes, state is io.airbyte.integrations.source.postgres.internal.models.CtidStatus@27f1bbe0[version=2,stateType=ctid,ctid=(191,19),incrementalState={},relationFilenode=16943,additionalProperties={}]
2024-06-26 16:22:21 platform > Records read: 15000 (3 MB)
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."admin_notes" is 16943
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(computeNext):118 The latest file node 16943 for stream public_admin_notes is equal to the last file node 16943 known to Airbyte.
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.CtidStateManager(createFinalStateMessage):115 Finished initial sync of stream public_admin_notes, Emitting final state, state is io.airbyte.protocol.models.v0.AirbyteStateMessage@4b1abd11[type=STREAM,stream=io.airbyte.protocol.models.v0.AirbyteStreamState@3f36b447[streamDescriptor=io.airbyte.protocol.models.v0.StreamDescriptor@6443b128[name=admin_notes,namespace=public,additionalProperties={}],streamState={"version":2,"state_type":"ctid","ctid":"(312,2)","incremental_state":{},"relation_filenode":16943},additionalProperties={}],global=<null>,data=<null>,sourceStats=<null>,destinationStats=<null>,additionalProperties={}]
2024-06-26 16:22:21 platform > readFromSource: exception caught

Out of ~15-20 attempts to sync only one finished successfully.

Relevant log output

Sorry for providing very limited logs – I wouldn't want to expose these logs in public, but would be happy to share them outside of GitHub.

```shell
2024-06-26 16:19:09 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=20, successiveCompleteFailures=0, totalCompleteFailures=0, successivePartialFailures=4, totalPartialFailures=4)
2024-06-26 16:19:10 platform > Backing off for: 0 seconds.
2024-06-26 16:19:10 platform > Docker volume job log path: /tmp/workspace/238092/4/logs.log
2024-06-26 16:19:10 platform > Executing worker wrapper. Airbyte version: 0.63.2
2024-06-26 16:19:10 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-06-26 16:19:10 platform >
2024-06-26 16:19:10 platform > ----- START CHECK -----
2024-06-26 16:19:10 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-06-26 16:19:10 platform >
2024-06-26 16:19:10 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-06-26 16:19:10 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-06-26 16:19:10 platform > Checking if airbyte/source-postgres:3.4.19 exists...
2024-06-26 16:19:10 platform > airbyte/source-postgres:3.4.19 was found locally.
2024-06-26 16:19:10 platform > Creating docker container = source-postgres-check-238092-4-vtkbh with resources io.airbyte.config.ResourceRequirements@57714039[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@74d787ff[hosts=[localhost, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2024-06-26 16:19:10 platform > Preparing command: docker run --rm --init -i -w /data/238092/4 --log-driver none --name source-postgres-check-238092-4-vtkbh --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:3.4.19 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e AIRBYTE_ROLE=dev -e WORKER_ENVIRONMENT=DOCKER -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=4 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.2 -e WORKER_JOB_ID=238092 airbyte/source-postgres:3.4.19 check --config source_config.json
2024-06-26 16:19:11 platform > Reading messages from protocol version 0.2.0
2024-06-26 16:19:16 platform > INFO main i.a.i.s.p.PostgresSource(main):691 starting source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-06-26 16:19:16 platform > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):144 integration args: {check=null, config=source_config.json}
2024-06-26 16:19:16 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):124 Running integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-06-26 16:19:16 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):125 Command: CHECK
2024-06-26 16:19:16 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):126 Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-06-26 16:19:16 platform > INFO main i.a.c.i.b.s.SshTunnel$Companion(getInstance):423 Starting connection with method: NO_TUNNEL
2024-06-26 16:19:16 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: false
2024-06-26 16:19:16 platform > INFO main i.a.i.s.p.PostgresSource(toSslJdbcParamInternal):917 DISABLED toSslJdbcParam disable
2024-06-26 16:19:17 platform > INFO main c.z.h.HikariDataSource():79 HikariPool-1 - Starting...
2024-06-26 16:19:17 platform > INFO main c.z.h.HikariDataSource():81 HikariPool-1 - Start completed.
2024-06-26 16:19:17 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: false
2024-06-26 16:19:17 platform > INFO main i.a.i.s.p.PostgresUtils(isCdc):70 using CDC: false
2024-06-26 16:19:17 platform > INFO main i.a.i.s.p.PostgresUtils(isXmin):190 using Xmin: false
2024-06-26 16:19:17 platform > INFO main i.a.c.i.s.j.AbstractJdbcSource(getCheckOperations$lambda$6):338 Attempting to get metadata from the database to see if we can connect.
2024-06-26 16:19:17 platform > INFO main i.a.c.i.s.j.AbstractJdbcSource(checkUserHasPrivileges):302 Checking if the user can perform select to any table in schema: public
2024-06-26 16:19:17 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-06-26 16:19:18 platform > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(accept):33 Set new fetch size: 1092267 rows
2024-06-26 16:19:18 platform > INFO main i.a.c.d.j.s.TwoStageSizeEstimator$Companion(getTargetBufferByteSize):80 Max memory limit: 25249710080, JDBC buffer size: 15149826048
2024-06-26 16:19:18 platform > INFO main c.z.h.HikariDataSource(close):349 HikariPool-1 - Shutdown initiated...
2024-06-26 16:19:18 platform > INFO main c.z.h.HikariDataSource(close):351 HikariPool-1 - Shutdown completed.
2024-06-26 16:19:18 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):268 Completed integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-06-26 16:19:18 platform > INFO main i.a.i.s.p.PostgresSource(main):693 completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-06-26 16:19:18 platform > Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@6b72d02a[status=succeeded,message=,additionalProperties={}]
2024-06-26 16:19:18 platform >
2024-06-26 16:19:18 platform > ----- END CHECK -----
2024-06-26 16:19:18 platform >
2024-06-26 16:19:18 platform > Docker volume job log path: /tmp/workspace/238092/4/logs.log
2024-06-26 16:19:18 platform > Executing worker wrapper. Airbyte version: 0.63.2
2024-06-26 16:19:18 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-06-26 16:19:18 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
.... .... ....
generation 7 (!= current generation 8). Truncating it.
2024-06-26 16:22:18 destination > INFO sync-operations-7 i.a.i.d.b.o.BigQueryStorageOperation(truncateStagingTable):115 Truncating raw table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=airbyte_internal, tableId=airbyte_fullsync_live_raw__stream_user_policy_issues}}
2024-06-26 16:22:18 destination > INFO sync-operations-2 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policies_versions, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-1 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_claims, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-10 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policies_admin_users, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-5 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policies_checkout_charges, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policies, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-8 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policy_activities, this will be overwritten at end of sync
2024-06-26 16:22:18 destination > INFO sync-operations-7 i.a.i.d.b.BigQueryUtils(createPartitionedTableIfNotExists):245 Partitioned table created successfully: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=airbyte_internal, tableId=airbyte_fullsync_live_raw__stream_user_policy_issues_airbyte_tmp}}
2024-06-26 16:22:18 destination > INFO sync-operations-7 i.a.i.d.b.o.BigQueryGcsStorageOperation(prepareStage):51 Creating bucket staging-area
2024-06-26 16:22:19 destination > INFO sync-operations-7 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTable):183 Final Table exists for stream user_policy_issues
2024-06-26 16:22:20 destination > INFO sync-operations-7 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTableForOverwrite):209 Using temp final table for table user_policy_issues, this will be overwritten at end of sync
2024-06-26 16:22:20 destination > INFO main i.a.c.i.d.a.b.BufferManager():48 Max 'memory' available for buffer allocation 593 MB
2024-06-26 16:22:20 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):423 Starting buffered read of input stream
2024-06-26 16:22:20 destination > INFO main i.a.c.i.d.a.FlushWorkers(start):73 Start async buffer supervisor
2024-06-26 16:22:20 destination > INFO pool-3-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 593.92 MB, allocated: 10 MB (10.0 MB), %% used: 0.016837142758506073 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-06-26 16:22:20 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(start):89 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer started.
2024-06-26 16:22:20 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-06-26 16:22:20 platform > Records read: 5000 (1 MB)
2024-06-26 16:22:21 source > INFO main i.a.c.i.s.r.AbstractDbSource$createReadIterator$3(invoke):513 Reading stream admin_notes. Records read: 10000
2024-06-26 16:22:21 platform > Records read: 10000 (2 MB)
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.CtidStateManager(generateStateMessageAtCheckpoint):77 Emitting ctid state for stream public_admin_notes, state is io.airbyte.integrations.source.postgres.internal.models.CtidStatus@27f1bbe0[version=2,stateType=ctid,ctid=(191,19),incrementalState={},relationFilenode=16943,additionalProperties={}]
2024-06-26 16:22:21 platform > Records read: 15000 (3 MB)
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "public"."admin_notes" is 16943
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(computeNext):118 The latest file node 16943 for stream public_admin_notes is equal to the last file node 16943 known to Airbyte.
2024-06-26 16:22:21 source > INFO main i.a.i.s.p.c.CtidStateManager(createFinalStateMessage):115 Finished initial sync of stream public_admin_notes, Emitting final state, state is io.airbyte.protocol.models.v0.AirbyteStateMessage@4b1abd11[type=STREAM,stream=io.airbyte.protocol.models.v0.AirbyteStreamState@3f36b447[streamDescriptor=io.airbyte.protocol.models.v0.StreamDescriptor@6443b128[name=admin_notes,namespace=public,additionalProperties={}],streamState={"version":2,"state_type":"ctid","ctid":"(312,2)","incremental_state":{},"relation_filenode":16943},additionalProperties={}],global=<null>,data=<null>,sourceStats=<null>,destinationStats=<null>,additionalProperties={}]
2024-06-26 16:22:21 platform > readFromSource: exception caught
io.airbyte.workers.exception.WorkerException: A stream status has been detected for a stream not present in the catalog
        at io.airbyte.workers.helper.StreamStatusCompletionTracker.track(StreamStatusCompletionTracker.kt:36) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:361) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-06-26 16:22:21 platform > readFromSource: done. (source.isFinished:false, fromSource.isClosed:false)
2024-06-26 16:22:21 platform > processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2024-06-26 16:22:21 platform > writeToDestination: exception caught
java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.
        at com.google.common.base.Preconditions.checkState(Preconditions.java:515) ~[guava-33.1.0-jre.jar:?]
        at io.airbyte.workers.internal.DefaultAirbyteSource.getExitValue(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:454) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:263) ~[io.airbyte-airbyte-commons-worker-0.63.2.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-06-26 16:22:21 platform > writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2024-06-26 16:22:22 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):445 Finished buffered read of input stream
2024-06-26 16:22:22 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):193 Closing flush workers -- waiting for all buffers to flush
2024-06-26 16:22:22 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):226 REMAINING_BUFFERS_INFO Namespace: null Stream: admin_notes -- remaining records: 15815
```


killthekitten commented 1 week ago

Downgrading to 2.7.1 results in the same error.

marcosmarxm commented 1 week ago

@airbytehq/destinations can someone take a look into this issue? My guess is that during the upgrade, while running discover schema, a table was deleted or lost permission but its state still exists. @killthekitten can you check the state message (Settings -> Advanced) and check whether all streams/tables are in there and also match the ones in the Replication tab?

edgao commented 1 week ago

tl;dr this sounds like a source bug, but try downgrading destination-bigquery to 2.6.3 as a workaround?

WorkerException: A stream status has been detected for a stream not present in the catalog

this sounds like the platform detecting that the source emitted an invalid stream status message (which is not the same as a state message 😓 ). (relevant platform code here)

And I think the reason this works on the pre-dv2 destination-bigquery version is that platform only triggers this validation when the destination supports refreshes (code.) Destination Bigquery added support for refreshes in 2.7.0, so downgrading to an older version will mask the bug.
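For readers following along, the behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the platform's actual code: the real check lives in `StreamStatusCompletionTracker` (Kotlin), and the class, method and exception names below are hypothetical. The only facts taken from the thread are that the check fires on a stream status for a stream missing from the catalog, and that it only runs when the destination supports refreshes.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class StreamDescriptor:
    """Identifies a stream by name and optional namespace."""
    name: str
    namespace: Optional[str] = None


class StreamStatusNotInCatalog(Exception):
    """Hypothetical stand-in for the WorkerException seen in the logs."""


class StatusTracker:
    """Hypothetical, simplified model of the platform-side validation."""

    def __init__(self, catalog_streams: Iterable[StreamDescriptor], supports_refreshes: bool):
        self.catalog_streams = set(catalog_streams)
        self.supports_refreshes = supports_refreshes
        self.seen = set()

    def track(self, descriptor: StreamDescriptor) -> None:
        # Per the thread, validation is only triggered when the
        # destination supports refreshes; older destinations skip it.
        if not self.supports_refreshes:
            return
        if descriptor not in self.catalog_streams:
            raise StreamStatusNotInCatalog(
                f"A stream status ({descriptor.namespace}.{descriptor.name}) "
                "has been detected for a stream not present in the catalog"
            )
        self.seen.add(descriptor)


catalog = [StreamDescriptor("admin_notes", "public")]
tracker = StatusTracker(catalog, supports_refreshes=True)
tracker.track(StreamDescriptor("admin_notes", "public"))  # accepted
```

Under this model, pinning the destination to a version without refresh support turns `track` into a no-op, which would explain why the downgrade hides the error without fixing whatever emits the unexpected status.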

so @marcosmarxm I think it's worth tagging either DB sources to investigate source-postgres, or platform move to check the platform code - not sure which one is more likely though :/

killthekitten commented 1 week ago

@edgao downgrade to 2.6.3 worked!

killthekitten commented 1 week ago

@marcosmarxm can't find the state information anywhere (looked under general settings, connection and connector settings). Could it be that the latest Airbyte doesn't show this info anymore? I've been looking for it earlier today already.

@edgao it could very well be a source issue. Not sure how relevant this is, but we've been seeing a lot of these errors in logs:

https://github.com/airbytehq/airbyte/issues/17319

Is there anything else I could look up to help with investigation?

edgao commented 1 week ago

that sounds like an unrelated thing (platform logs out some record validation stuff, but afaik it's purely informational and doesn't actually affect sync success)

pinged the db sources team internally to take a look at this, stream statuses are a relatively new thing so it's possible that there's some rough edges in that region

rodireich commented 1 week ago

The table in question is "public"."admin_notes". The error is happening in the context of the initial sync completing. We need more logging and context in order to try to reconstruct what went wrong here.

This looks like user cursor incremental sync (i.e. no CDC or xmin)

rodireich commented 1 week ago

I tried recreating a similar scenario with all the information we have so far. Not seeing the problem

killthekitten commented 1 week ago

@rodireich admin_notes showing up in logs before the error could be a coincidence, as this stream is first in the list when you sort alphabetically.

Is there any secure channel where I can send you the full logs? I could send both the successful and the failing sync logs. Also, I don't have access to the link you shared, so I can't confirm whether the setup matches ours.

killthekitten commented 1 week ago

This looks like user cursor incremental sync (i.e. no CDC or xmin)

Correct, we use the cursor mode. That said, we perform a full rewrite on all streams

evantahler commented 1 week ago

@killthekitten are you in our community slack? We could do the log transfer there

killthekitten commented 6 days ago

@evantahler messaged you just now 👌

evantahler commented 6 days ago

Thank you! Internal reference to slack thread for Airbyte folks looking for the log

PieterBos94 commented 4 days ago

I am having the same issue with LinkedIn, PostHog and Hubspot after the upgrade to 2.8.0. Downgrading to 2.6.3 solved the problem. I had the same error in the logs about the stream status. Let me know if I can supply any additional information :)

io.airbyte.workers.exception.WorkerException: A stream status has been detected for a stream not present in the catalog

benmoriceau commented 4 days ago

Hello @killthekitten and @PieterBos94, We have created a new release which changes the logged error so that it names the missing stream. Would it be possible to upgrade your platform and re-run with the version 2.8.0 of big-query?

Thanks,

benmoriceau commented 4 days ago

Would it be possible to send me the catalog associated with the connection? The request to get it is SELECT "catalog" FROM public."connection" where id = <CONNECTION_ID>;

Could you also let me know if there is a stream prefix set? It can be found in the Settings tab of the connection.

killthekitten commented 4 days ago

@benmoriceau

Would it be possible to upgrade your platform and re-run with the version 2.8.0 of big-query?

I won't be able to test the new release on our current instance, unfortunately, at least not this week (please ping me again if needed in the future).

That said, I took time to re-create our airbyte installation in a new GCP project and to my surprise, the sync didn't fail on 2.8.0. The only differences I can think of:

  1. It's a fresh Airbyte DB with just one configured connection to the same PostgreSQL data source that we use in a real setup
  2. It uses a clean GCS staging area

Would it be possible to send me the catalog associated with the connection?

Sure, I've exported the catalog data and passed it to @evantahler.

Could you also let me know if there is a stream prefix set? It can be found in the Settings tab of the connection.

There's no prefix

evantahler commented 4 days ago

Internal link with @killthekitten's Catalog - thanks!

benmoriceau commented 3 days ago

Thank you @killthekitten, Looking at the catalog, the schema looks ok. I would like to make sure that it is the catalog coming from the old deployment and not the new one.

What is happening is that version 2.8.0 of big query adds a new functionality named refresh. This now requires a stream status to be forwarded to the destination. When this functionality is activated, we validate that the stream statuses are valid. We have deployed this to our internal deployment of Airbyte and we didn't run into any issues.
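To make the difference between a stream status and a state message concrete, here is a small sketch that classifies protocol messages by shape. The JSON layouts are my understanding of the Airbyte protocol (a `TRACE` message with `trace.type == "STREAM_STATUS"` versus a `STATE` message) and should be treated as assumptions; the sample values reuse the `public.admin_notes` stream and ctid state from the logs, and the `COMPLETE` status and `emitted_at` value are illustrative. The function name is hypothetical.

```python
import json

# Assumed shape of a stream status trace message (illustrative values).
STATUS_EXAMPLE = {
    "type": "TRACE",
    "trace": {
        "type": "STREAM_STATUS",
        "emitted_at": 1719418941000,
        "stream_status": {
            "stream_descriptor": {"name": "admin_notes", "namespace": "public"},
            "status": "COMPLETE",
        },
    },
}

# Assumed shape of a per-stream state message, mirroring the final ctid
# state that appears in the log above.
STATE_EXAMPLE = {
    "type": "STATE",
    "state": {
        "type": "STREAM",
        "stream": {
            "stream_descriptor": {"name": "admin_notes", "namespace": "public"},
            "stream_state": {"version": 2, "state_type": "ctid", "ctid": "(312,2)"},
        },
    },
}


def classify(line: str) -> str:
    """Return a short label describing which kind of message a JSON line is."""
    msg = json.loads(line)
    trace = msg.get("trace") or {}
    if msg.get("type") == "TRACE" and trace.get("type") == "STREAM_STATUS":
        status = trace["stream_status"]
        descriptor = status["stream_descriptor"]
        return f"status:{descriptor.get('namespace')}.{descriptor['name']}:{status['status']}"
    if msg.get("type") == "STATE":
        return "state"
    return "other"


for example in (STATUS_EXAMPLE, STATE_EXAMPLE):
    print(classify(json.dumps(example)))
```

A state message records how far a stream has been read, while a status message only reports where the stream is in its lifecycle; the validation discussed in this thread applies to the latter.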

I am going to run a comparison of the format of the catalog that you sent and an example of our internal catalog to see if I see anything that could explain the issue.

Thanks,

killthekitten commented 3 days ago

@benmoriceau confirmed, this is the catalog from the old deployment but downgraded to bigquery 2.6.3.

I can also make a dump of the fresh deployment's catalog (2.8.0) if that is helpful.

Could you briefly describe what a status is?

benmoriceau commented 3 days ago

@killthekitten, I believe that I am starting to understand the issue. It seems to be the same root cause as https://github.com/airbytehq/airbyte/issues/39900. We have introduced new metadata in the connector definition. In order to properly propagate the metadata, we need to upgrade the platform before we upgrade the connector so the platform can properly read the metadata.

In order to confirm this could you let me know the result of the following query

SELECT supports_refreshes 
FROM public.actor_definition_version
where docker_repository = 'airbyte/destination-bigquery' and docker_image_tag = '2.8.0';

I would like to confirm my assumption because I don't understand why it wasn't the same error as in the other tickets.

I am thinking about a way to fix that for the old deployment which requires as few manual actions as possible.

killthekitten commented 3 days ago

@benmoriceau here you go:

airbyte=> SELECT supports_refreshes 
FROM public.actor_definition_version
where docker_repository = 'airbyte/destination-bigquery' and docker_image_tag = '2.8.0';
 supports_refreshes 
--------------------
 t
(1 row)

In order to properly propagate the metadata, we need to upgrade the platform before we upgrade the connector so the platform can properly read the metadata.

FYI I saw the message in BigQuery changelog and made sure the platform was running on 0.63.0 before I updated to BigQuery 2.8.0.
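The upgrade ordering discussed here (platform first, connector second) can be expressed as a small pre-flight check. This is only a sketch: the `0.63.0` minimum is taken from the comment above rather than from any official compatibility table, and the function names are hypothetical. It assumes plain dotted numeric versions such as `0.63.2`.

```python
def parse_version(version: str) -> tuple:
    """Turn '0.63.2' into (0, 63, 2) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


# Assumption: minimum platform version mentioned in this thread for
# destination-bigquery 2.8.0; check the connector changelog for the real value.
MIN_PLATFORM_FOR_REFRESHES = "0.63.0"


def platform_ready_for_connector(platform_version: str,
                                 min_platform: str = MIN_PLATFORM_FOR_REFRESHES) -> bool:
    """True if the platform is new enough to read the connector's metadata."""
    return parse_version(platform_version) >= parse_version(min_platform)


for candidate in ("0.60.0", "0.63.2"):
    print(candidate, platform_ready_for_connector(candidate))
```

Comparing integer tuples rather than raw strings avoids the classic pitfall where `"0.63.10"` sorts before `"0.63.2"`.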

benmoriceau commented 3 days ago

Thanks @killthekitten, It seems that the sequence of updates was right which means that the functionalities supported by the destination have been properly imported.

I am still looking for the root cause. I will make a fix to prevent the platform from failing the sync.

Could you confirm that only the attempts were failing? If a new job is created, is it still failing? I wonder if we were using the old image tag for the config but for a connector which has been updated. It was fixed here, but the fix was released in a platform version you are currently using (release version was 0.63.1).

killthekitten commented 3 days ago

@benmoriceau sorry, not sure I understood the question! IIRC the moment we switched the connector version, all attempts started failing and/or crashing the instance, only one succeeded out of a dozen. Is that what you asked?

benmoriceau commented 3 days ago

@killthekitten I am wondering whether the job fails once all X attempts have failed. A new job should then be created, and I am wondering what the status of that job is.

killthekitten commented 3 days ago

@benmoriceau our logs are a mess, as a bit of panic was involved and I don't remember how many times I cancelled the jobs while switching the platform version etc, so I can't say which of the jobs were created after using up all attempts. Here is what it looked like (from earliest to latest):

  1. Sync failed, 260k records extracted, nothing loaded in any of the 5 attempts
  2. Clear data initiated by me and succeeded
  3. Sync partially succeeded, 2 attempts, various number of records extracted/loaded
  4. Sync partially succeeded, 4 attempts, the first attempt looks like all of the data was synced but still failed
  5. Another clear succeeded
  6. A cancelled sync
  7. A successful sync (reverted the version back)

benmoriceau commented 2 days ago

Thanks @killthekitten, Is the error reported in the ticket from step 1?

Could you send me the result of the following query so I can see the failure reason of all the attempts and make sure that we are going to address the right error:

SELECT j.id, j.config, j."status", a."status", a.failure_summary, a."output" 
FROM public.jobs j
join attempts a on a.job_id = j.id
where j."scope" = '<YOUR_CONNECTION_ID>'
order by j.id desc
limit 50;

I am also on the community slack under the name of Benoit Moriceau (Airbyte)

benmoriceau commented 2 days ago

Hello @killthekitten, Would you be able to upgrade the platform, bump the version of the big query destination to 2.8.0 or more and re-run the sync?

I am not seeing anything which would point me to a root cause in the query result. Upgrading to the latest version of the platform will add some stream name information in the failure message. It will help me to check if it is in the catalog of the job.

Thanks,

killthekitten commented 2 days ago

I could try tomorrow afternoon!

benmoriceau commented 2 days ago

Thanks!

killthekitten commented 1 day ago

On a side note, we have a related issue: every time an airbyte instance goes down, any failed syncs are retried, but the retried attempt leaves the dataset in an inconsistent state. I believe this started when I rolled back 2.8.0.

Could this be related to the refresh functionality that I reverted? Should I file a new issue?

benmoriceau commented 1 day ago

@killthekitten Yes it seems related to the refreshes functionality on the destination side. For some reason the destination only has the latest attempt data. @stephane-airbyte FYI as destination OC.

killthekitten commented 1 day ago

@benmoriceau

Would you be able to upgrade the platform, bump the version of the big query destination to 2.8.0 or more and re-run the sync?

I upgraded the platform to 0.63.4 and the destination to 2.8.1, and the sync has been completed successfully. Here are the differences compared to the time it failed:

Could the issue have been caused by the fact that the sync was cancelled, and then 2.8.0 attempted to resume it?

killthekitten commented 1 day ago

@benmoriceau update:

killthekitten commented 23 hours ago

A couple of syncs are not stable anymore (maybe a coincidence), so I'm reverting back to 2.6.3 once again. Would appreciate any hints about resolving the inconsistent state issues on 2.6.3.

If that is still relevant, I've found the "stream status has been detected" message in one of the logs:

io.airbyte.workers.exception.WorkerException: A stream status (public.transaction) has been detected for a stream not present in the catalog
        at io.airbyte.workers.helper.StreamStatusCompletionTracker.track(StreamStatusCompletionTracker.kt:36) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:361) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

These are present only in some cases. The table naming is confusing in the logs: in some places I see public.{{stream_name}}, in others it is public_{{stream_name}}, and there's also a bunch of public:{{stream_name}}. I'd assume these are just formatting differences.
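
When comparing log lines by hand, it can help to reduce the three spellings to one form. The following is a minimal sketch, not anything Airbyte ships: `StreamId` and `parse_stream_id` are hypothetical helpers, and the code assumes the namespace itself contains none of `.`, `_` or `:` (true for `public`, not necessarily for other schemas).

```python
import re
from typing import NamedTuple, Optional


class StreamId(NamedTuple):
    """Hypothetical (namespace, name) pair used only for comparing log lines."""
    namespace: str
    name: str


# Separators seen in the logs: public.x, public_x and public:x.
# Assumption: the namespace contains none of these characters.
_SEPARATORS = re.compile(r"[._:]")


def parse_stream_id(raw: str) -> Optional[StreamId]:
    """Split a logged stream identifier at its first separator."""
    parts = _SEPARATORS.split(raw, maxsplit=1)
    if len(parts) != 2 or not all(parts):
        return None  # no separator, or an empty namespace/name
    return StreamId(namespace=parts[0], name=parts[1])


spellings = ["public.admin_notes", "public_admin_notes", "public:admin_notes"]
# All three spellings collapse to the same identifier.
parsed = {parse_stream_id(s) for s in spellings}
```

Splitting at the first separator only keeps underscores inside the stream name (as in `admin_notes`) intact.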

killthekitten commented 22 hours ago

Another failure of this kind but in Stripe 3.17.4 <> BigQuery 1.2.20 sync:

io.airbyte.workers.exception.WorkerException: A stream status (public.transaction) has been detected for a stream not present in the catalog
        at io.airbyte.workers.helper.StreamStatusCompletionTracker.track(StreamStatusCompletionTracker.kt:36) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:361) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.4.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

benmoriceau commented 6 hours ago

@killthekitten Thanks for the logs. The different ways of writing the namespace and name are just log formatting. For this connection, is the stream transaction selected? It would also be helpful to get the catalog used in the job that fails with the error A stream status (public.transaction) has been detected for a stream not present in the catalog.

select config
from jobs
where id = <JOB_ID>
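
Once the catalog is extracted from that config, the question is simply whether the reported stream appears in it. The sketch below is not the `StreamStatusCompletionTracker` implementation; it only illustrates the kind of membership check the error message implies. It assumes the catalog follows the Airbyte protocol's configured-catalog shape (`streams[].stream.name` and `streams[].stream.namespace`); where that object sits inside the `config` column is not shown here, and `example_catalog` is made-up data, not taken from a real job.

```python
from typing import Any, Dict, Optional, Set, Tuple

StreamKey = Tuple[Optional[str], str]  # (namespace, name)


def catalog_stream_keys(catalog: Dict[str, Any]) -> Set[StreamKey]:
    """Collect (namespace, name) for every stream in a configured catalog."""
    return {
        (entry["stream"].get("namespace"), entry["stream"]["name"])
        for entry in catalog.get("streams", [])
    }


def check_stream_status(
    catalog: Dict[str, Any], namespace: Optional[str], name: str
) -> None:
    """Raise if a status arrives for a stream the catalog does not list."""
    if (namespace, name) not in catalog_stream_keys(catalog):
        raise LookupError(
            f"A stream status ({namespace}.{name}) has been detected "
            "for a stream not present in the catalog"
        )


# Made-up catalog for illustration: only admin_notes is selected.
example_catalog = {
    "streams": [
        {"stream": {"name": "admin_notes", "namespace": "public"}},
    ]
}

check_stream_status(example_catalog, "public", "admin_notes")  # passes silently

try:
    check_stream_status(example_catalog, "public", "transaction")
    error_message = None
except LookupError as exc:
    error_message = str(exc)
```

If the stream is selected in the UI but missing from the extracted catalog, the mismatch is in what the job was given rather than in the source.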

killthekitten commented 4 hours ago

The transaction stream is selected in this case, yes. What I've noticed is that this could be any of the streams: I've seen different streams failing in different jobs. I've shared the config in DM 👍

edgao commented 4 hours ago

@killthekitten for the thing about only having 54 records after two attempts - is that still happening? (if yes, can you post the logs? I'm on the airbyte slack, feel free to DM me)

benmoriceau commented 4 hours ago

@edgao I think that I have a fix for the missing stream in the catalog. I wonder if the missing data in the destination is a side effect of that, because we are not sending the COMPLETED status to the destination since the orchestrator failed.

killthekitten commented 2 hours ago

@edgao yes, it is still an issue! logs sent 👍

benmoriceau commented 1 hour ago

@killthekitten Sorry if I missed that before but I would like to confirm that you are running with docker and without having the orchestrators activated.

killthekitten commented 1 hour ago

@benmoriceau correct, we run with docker. What would be an orchestrator? Pretty sure we don't run anything like that.

benmoriceau commented 54 minutes ago

The orchestrator is a component which runs in its own pod and transfers data between the source and the destination. It is disabled by default on docker but enabled by default on kube. There might be some differences in how the catalog is loaded in the two modes. That being said, I didn't manage to reproduce the issue locally with the same configuration as yours.

killthekitten commented 48 minutes ago

That's a pity. We did major version bumps and changed the stream configuration too many times in the last year, so it must be some broken state either in the config DB or airbyte_internal that is hard to reproduce