airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Postgres generated columns are always null on incremental updates #45338

Open · acsbendi opened this issue 2 months ago

acsbendi commented 2 months ago

Connector Name

destination-postgres

Connector Version

3.6.18

What step the error happened?

None

Relevant information

Steps for reproduction:

  1. Create a table with a generated column and insert a record:
     create table main.generated_column_test
     (
         id bigserial primary key,
         timestamp_inserted timestamp not null default now(),
         day_inserted date generated always as (timestamp_inserted::date) stored
     );
     insert into main.generated_column_test(id, timestamp_inserted) values (default, default);
  2. Select the table in Airbyte with an incremental sync mode (either Incremental | Append or Incremental | Append + Deduped) and run the initial sync -> day_inserted has the correct value
  3. Insert another record (insert into main.generated_column_test(id, timestamp_inserted) values (default, default);)
  4. Run an incremental sync -> day_inserted is incorrectly null (see the check below)
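
A minimal check against the destination (Snowflake in this setup, per the log output below), using the table and column names from the steps above:

    select id, timestamp_inserted, day_inserted
    from main.generated_column_test
    order by id;
    -- Observed after step 4: the row from the initial sync has day_inserted
    -- populated, while the row from the incremental sync has day_inserted
    -- null, even though the source computed a value for it.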

This may be related to #38470.

Relevant log output

2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(lambda$start$0):244    value.converter.schemas.enable = false
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(lambda$start$0):244    name = contents
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(lambda$start$0):244    max.batch.size = 2048
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(lambda$start$0):244    table.include.list = \Qmain.generated_column_test\E
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(lambda$start$0):244    snapshot.mode = initial
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.CommonConnectorConfig(getSourceInfoStructMaker):1649 Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.CommonConnectorConfig(getTopicNamingStrategy):1357 Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy
2024-09-09 12:44:14 source > INFO pool-4-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:14 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(getPreviousOffsets):501 Found previous partition offset PostgresPartition [sourcePartition={server=contents}]: {lsn_proc=18002558956800, messageType=NOOP, lsn_commit=18002558956800, lsn=18002558956800, txId=177182214, ts_usec=1725885825801645}
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.c.p.c.PostgresConnection(readReplicationSlotInfo):337 Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{105F/8C21B500}, catalogXmin=177182005]
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.c.p.PostgresConnectorTask(start):156 user 'airbyte' connected to database 'contents' on PostgreSQL 13.13 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-6), 64-bit with roles:
    role 'rds_replication' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'airbyte' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: true]
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.c.p.c.PostgresConnection(readReplicationSlotInfo):337 Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{105F/8C21B500}, catalogXmin=177182005]
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.c.p.PostgresConnectorTask(start):168 Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='contents'db='contents', lsn=LSN{105F/8C21B500}, txId=177182214, messageType=NOOP, lastCommitLsn=LSN{105F/8C21B500}, timestamp=2024-09-09T12:43:45.801645Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{105F/8C21B500}, lastCommitLsn=LSN{105F/8C21B500}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.u.Threads(threadFactory):271 Requested thread factory for connector PostgresConnector, id = contents named = SignalProcessor
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.u.Threads(threadFactory):271 Requested thread factory for connector PostgresConnector, id = contents named = change-event-source-coordinator
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.u.Threads(threadFactory):271 Requested thread factory for connector PostgresConnector, id = contents named = blocking-snapshot
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.d.u.Threads$3(newThread):288 Creating thread debezium-postgresconnector-contents-change-event-source-coordinator
2024-09-09 12:44:15 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(taskStarted):87 DebeziumEngine notify: task started
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(lambda$start$0):134 Metrics registered
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(lambda$start$0):137 Context created
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresSnapshotChangeEventSource(getSnapshottingTask):77 A previous offset indicating a completed snapshot has been found.
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresSnapshotChangeEventSource(getSnapshottingTask):85 According to the connector configuration no snapshot will be executed
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(doSnapshot):254 Snapshot ended with SnapshotResult [status=SKIPPED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='contents'db='contents', lsn=LSN{105F/8C21B500}, txId=177182214, messageType=NOOP, lastCommitLsn=LSN{105F/8C21B500}, timestamp=2024-09-09T12:43:45.801645Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{105F/8C21B500}, lastCommitLsn=LSN{105F/8C21B500}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamingConnected):433 Connected metrics set to 'true'
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresSchema(printReplicaIdentityInfo):100 REPLICA IDENTITY for 'main.generated_column_test' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2024-09-09 12:44:15 source > WARN debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.PostgresDefaultValueConverter(parseDefaultValue):96 Cannot parse column default value '(timestamp_inserted)::date' to type 'date'. Expression evaluation is not supported.
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.s.SignalProcessor(start):105 SignalProcessor started. Scheduling it every 5000ms
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.u.Threads$3(newThread):288 Creating thread debezium-postgresconnector-contents-SignalProcessor
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamEvents):279 Starting streaming
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresStreamingChangeEventSource(execute):137 Retrieved latest position from stored offset 'LSN{105F/8C21B500}'
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.WalPositionLocator(<init>):48 Looking for WAL restart position for last commit LSN 'LSN{105F/8C21B500}' and last change LSN 'LSN{105F/8C21B500}'
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.PostgresReplicationConnection(initPublication):150 Initializing PgOutput logical decoder publication
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.PostgresConnection(readReplicationSlotInfo):337 Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{105F/8C21B500}, catalogXmin=177182005]
2024-09-09 12:44:15 source > INFO pool-5-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.u.Threads(threadFactory):271 Requested thread factory for connector PostgresConnector, id = contents named = keep-alive
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.u.Threads$3(newThread):288 Creating thread debezium-postgresconnector-contents-keep-alive
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresSchema(printReplicaIdentityInfo):100 REPLICA IDENTITY for 'main.generated_column_test' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
2024-09-09 12:44:15 source > WARN debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.PostgresDefaultValueConverter(parseDefaultValue):96 Cannot parse column default value '(timestamp_inserted)::date' to type 'date'. Expression evaluation is not supported.
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):338 Searching for WAL resume position
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.WalPositionLocator(resumeFromLsn):71 First LSN 'LSN{105F/8C21B500}' received
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.WalPositionLocator(resumeFromLsn):89 LSN after last stored change LSN 'LSN{105F/8C21B538}' received
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresStreamingChangeEventSource(searchWalPosition):360 WAL resume position 'LSN{105F/8C21B538}' discovered
2024-09-09 12:44:15 source > INFO pool-6-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:15 source > INFO pool-7-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.PostgresReplicationConnection(initPublication):150 Initializing PgOutput logical decoder publication
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.u.Threads(threadFactory):271 Requested thread factory for connector PostgresConnector, id = contents named = keep-alive
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.u.Threads$3(newThread):288 Creating thread debezium-postgresconnector-contents-keep-alive
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.PostgresStreamingChangeEventSource(processMessages):212 Processing messages
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.AbstractMessageDecoder(shouldMessageBeSkipped):54 Streaming requested from LSN LSN{105F/8C21B500}, received LSN LSN{105F/8C21B500} identified as already processed
2024-09-09 12:44:15 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.c.p.c.WalPositionLocator(skipMessage):152 Message with LSN 'LSN{105F/8C21B538}' arrived, switching off the filtering
2024-09-09 12:44:16 source > INFO pool-2-thread-1 i.a.c.i.d.AirbyteDebeziumHandler$CapacityReportingBlockingQueue(reportQueueUtilization):48 CDC events queue stats: size=0, cap=10000, puts=1, polls=0
2024-09-09 12:44:16 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT1.565878587S in its first call.
2024-09-09 12:44:16 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):185 CDC events queue poll(): returned a change event with "source": {"version":"2.6.2.Final","connector":"postgresql","name":"contents","ts_ms":1725885841018,"snapshot":null,"db":"contents","sequence":"[\"18002559060336\",\"18002559060440\"]","ts_us":1725885841018552,"ts_ns":1725885841018552000,"schema":"main","table":"generated_column_test","txId":177182253,"lsn":18002559060440,"xmin":null}.
2024-09-09 12:44:16 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):201 Received first record from debezium.
2024-09-09 12:44:16 destination > INFO main c.z.h.p.HikariPool(checkFailFast):554 HikariPool-1 - Added connection net.snowflake.client.jdbc.SnowflakeConnectionV1@7e3d7dd
2024-09-09 12:44:16 destination > INFO main c.z.h.HikariDataSource(getConnection):122 HikariPool-1 - Start completed.
2024-09-09 12:44:17 destination > INFO main i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: create table if not exists "airbyte_internal"."_airbyte_destination_state" ("name" varchar, "namespace" varchar, "destination_state" varchar, "updated_at" timestamp with time zone)
2024-09-09 12:44:17 destination > INFO main i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:18 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:18 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:19 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:19 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getInitialRawTableState$lambda$3):105 Retrieving table from Db metadata: airbyte_internal main_raw__stream_generated_column_test
2024-09-09 12:44:19 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getInitialRawTableState$lambda$3):105 Retrieving table from Db metadata: airbyte_internal main_raw__stream_generated_column_test_airbyte_tmp
2024-09-09 12:44:20 destination > INFO sync-operations-1 i.a.i.b.d.t.TyperDeduperUtil(runMigrationsAsync$lambda$12):165 Maybe executing SnowflakeDV2Migration migration for stream main.generated_column_test.
2024-09-09 12:44:20 destination > INFO sync-operations-1 i.a.i.d.s.m.SnowflakeDV2Migration(migrateIfNecessary):32 Initializing DV2 Migration check
2024-09-09 12:44:20 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(migrateIfNecessary):19 Assessing whether migration is necessary for stream GENERATED_COLUMN_TEST
2024-09-09 12:44:20 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(shouldMigrate):43 Checking whether v1 raw table _airbyte_raw_generated_column_test in dataset main exists
2024-09-09 12:44:20 destination > INFO sync-operations-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:21 destination > INFO sync-operations-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:21 destination > INFO sync-operations-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:21 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(shouldMigrate):50 Migration Info: Required for Sync mode: true, No existing v2 raw tables: false, A v1 raw table exists: false
2024-09-09 12:44:21 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(migrateIfNecessary):30 No Migration Required for stream: GENERATED_COLUMN_TEST
2024-09-09 12:44:21 destination > INFO main i.a.i.b.d.t.TyperDeduperUtil(executeRawTableMigrations):66 Refetching initial state for streams: [StreamId(finalNamespace=MAIN, finalName=GENERATED_COLUMN_TEST, rawNamespace=airbyte_internal, rawName=main_raw__stream_generated_column_test, originalNamespace=main, originalName=generated_column_test)]
2024-09-09 12:44:21 destination > INFO main i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: create table if not exists "airbyte_internal"."_airbyte_destination_state" ("name" varchar, "namespace" varchar, "destination_state" varchar, "updated_at" timestamp with time zone)
2024-09-09 12:44:22 destination > INFO main i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:22 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:22 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:23 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:23 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getInitialRawTableState$lambda$3):105 Retrieving table from Db metadata: airbyte_internal main_raw__stream_generated_column_test
2024-09-09 12:44:23 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getInitialRawTableState$lambda$3):105 Retrieving table from Db metadata: airbyte_internal main_raw__stream_generated_column_test_airbyte_tmp
2024-09-09 12:44:24 destination > INFO main i.a.i.b.d.t.TyperDeduperUtil(executeRawTableMigrations):73 Updated states: [DestinationInitialStatus(streamConfig=StreamConfig(id=StreamId(finalNamespace=MAIN, finalName=GENERATED_COLUMN_TEST, rawNamespace=airbyte_internal, rawName=main_raw__stream_generated_column_test, originalNamespace=main, originalName=generated_column_test), postImportAction=DEDUPE, primaryKey=[ColumnId(name=ID, originalName=id, canonicalName=ID)], cursor=Optional[ColumnId(name=_AB_CDC_LSN, originalName=_ab_cdc_lsn, canonicalName=_AB_CDC_LSN)], columns={ColumnId(name=ID, originalName=id, canonicalName=ID)=INTEGER, ColumnId(name=_AB_CDC_LSN, originalName=_ab_cdc_lsn, canonicalName=_AB_CDC_LSN)=NUMBER, ColumnId(name=DAY_INSERTED, originalName=day_inserted, canonicalName=DAY_INSERTED)=DATE, ColumnId(name=_AB_CDC_DELETED_AT, originalName=_ab_cdc_deleted_at, canonicalName=_AB_CDC_DELETED_AT)=STRING, ColumnId(name=_AB_CDC_UPDATED_AT, originalName=_ab_cdc_updated_at, canonicalName=_AB_CDC_UPDATED_AT)=STRING, ColumnId(name=TIMESTAMP_INSERTED, originalName=timestamp_inserted, canonicalName=TIMESTAMP_INSERTED)=TIMESTAMP_WITHOUT_TIMEZONE}, generationId=7, minimumGenerationId=0, syncId=4208), isFinalTablePresent=true, initialRawTableStatus=InitialRawTableStatus(rawTableExists=true, hasUnprocessedRecords=false, maxProcessedTimestamp=Optional[2024-09-09T12:42:16.037Z]), initialTempRawTableStatus=InitialRawTableStatus(rawTableExists=false, hasUnprocessedRecords=false, maxProcessedTimestamp=Optional.empty), isSchemaMismatch=false, isFinalTableEmpty=false, destinationState=SnowflakeState(needsSoftReset=false, isAirbyteMetaPresentInRaw=true), finalTableGenerationId=0, finalTempTableGenerationId=null)]
2024-09-09 12:44:24 destination > INFO sync-operations-2 i.a.i.b.d.t.TyperDeduperUtil(runMigrationsAsync$lambda$12):165 Maybe executing SnowflakeAbMetaAndGenIdMigration migration for stream main.generated_column_test.
2024-09-09 12:44:24 destination > INFO sync-operations-2 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:24 destination > INFO sync-operations-2 i.a.i.d.s.m.SnowflakeAbMetaAndGenIdMigration(migrateIfNecessary):87 Skipping airbyte_meta/generation_id migration for main.generated_column_test because the raw table already has the airbyte_meta column
2024-09-09 12:44:24 destination > INFO sync-operations-2 i.a.i.d.s.m.SnowflakeAbMetaAndGenIdMigration(migrateIfNecessary):131 skipping migration of generation_id for table MAIN.GENERATED_COLUMN_TEST because schemas match
2024-09-09 12:44:24 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getDeleteStatesSql):523 skipping state deletion
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):65 executing query within transaction: SELECT 1
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):69 done executing query within transaction: SELECT 1
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):65 executing query within transaction: insert into "airbyte_internal"."_airbyte_destination_state" ("name", "namespace", "destination_state", "updated_at") values ('generated_column_test', 'main', '{"needsSoftReset":false,"airbyteMetaPresentInRaw":true}', '2024-09-09T12:44:24.137263076Z')
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):69 done executing query within transaction: insert into "airbyte_internal"."_airbyte_destination_state" ("name", "namespace", "destination_state", "updated_at") values ('generated_column_test', 'main', '{"needsSoftReset":false,"airbyteMetaPresentInRaw":true}', '2024-09-09T12:44:24.137263076Z')
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:24 destination > INFO main i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:24 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareStageForNormalSync):79 main.generated_column_test: non-truncate sync. Creating raw table if not exists.
2024-09-09 12:44:24 destination > INFO sync-operations-3 i.a.i.d.s.t.SnowflakeDestinationHandler(execute):252 Executing sql 00535b14-0bc7-49fb-a7ff-e9a33bbf8e6c-f1e81c16-85d5-43ed-b6f5-836a7fe276d7: CREATE TABLE IF NOT EXISTS "airbyte_internal"."main_raw__stream_generated_column_test"( 
   "_airbyte_raw_id" VARCHAR PRIMARY KEY,
   "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
   "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
   "_airbyte_data" VARIANT,
   "_airbyte_meta" VARIANT DEFAULT NULL,
   "_airbyte_generation_id" INTEGER DEFAULT NULL
) data_retention_time_in_days = 1;
2024-09-09 12:44:24 destination > INFO sync-operations-3 i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: CREATE TABLE IF NOT EXISTS "airbyte_internal"."main_raw__stream_generated_column_test"( 
   "_airbyte_raw_id" VARCHAR PRIMARY KEY,
   "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
   "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
   "_airbyte_data" VARIANT,
   "_airbyte_meta" VARIANT DEFAULT NULL,
   "_airbyte_generation_id" INTEGER DEFAULT NULL
) data_retention_time_in_days = 1;
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.i.d.s.t.SnowflakeDestinationHandler(execute):274 Sql 00535b14-0bc7-49fb-a7ff-e9a33bbf8e6c-f1e81c16-85d5-43ed-b6f5-836a7fe276d7 completed in 61 ms
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: CREATE STAGE IF NOT EXISTS "airbyte_internal"."main_raw__stream_generated_column_test" encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareStageForNormalSync):125 main.generated_column_test: non-truncate sync and no temp raw table. Initial raw table status is null.
2024-09-09 12:44:25 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTable):247 Final Table exists for stream GENERATED_COLUMN_TEST
2024-09-09 12:44:25 destination > INFO main i.a.i.d.s.t.SnowflakeDestinationHandler(getDeleteStatesSql):523 skipping state deletion
2024-09-09 12:44:25 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):65 executing query within transaction: SELECT 1
2024-09-09 12:44:25 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):69 done executing query within transaction: SELECT 1
2024-09-09 12:44:25 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):65 executing query within transaction: insert into "airbyte_internal"."_airbyte_destination_state" ("name", "namespace", "destination_state", "updated_at") values ('generated_column_test', 'main', '{"needsSoftReset":false,"airbyteMetaPresentInRaw":true}', '2024-09-09T12:44:25.142850108Z')
2024-09-09 12:44:25 destination > INFO main i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):69 done executing query within transaction: insert into "airbyte_internal"."_airbyte_destination_state" ("name", "namespace", "destination_state", "updated_at") values ('generated_column_test', 'main', '{"needsSoftReset":false,"airbyteMetaPresentInRaw":true}', '2024-09-09T12:44:25.142850108Z')
2024-09-09 12:44:25 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(logStatistics):323 2 records sent during previous 00:00:11.014, last recorded offset of {server=contents} partition is {lsn_proc=18002559234680, messageType=NOOP, lsn_commit=18002559234680, lsn=18002559234680, txId=177182319, ts_usec=1725885864672405}
2024-09-09 12:44:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):87 CDC events queue poll(): blocked for PT9.514298523S after its previous call which was also logged.
2024-09-09 12:44:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(computeNext):140 CDC events queue poll(): returned a heartbeat event: progressing to 18002559234680.
2024-09-09 12:44:25 source > INFO main i.a.i.s.p.c.PostgresCdcTargetPosition(reachedTargetPosition):80 Signalling close because heartbeat LSN : 18002559234680 is after target LSN : PgLsn{lsn=18002559148272}
2024-09-09 12:44:25 source > INFO main i.a.c.i.d.i.DebeziumRecordIterator(requestClose):276 Closing: Heartbeat indicates sync is done by reaching the target position
2024-09-09 12:44:25 platform > SOURCE analytics [airbyte/source-postgres:3.6.18] | Type: db-sources-debezium-close-reason | Value: HEARTBEAT_REACHED_TARGET_POSITION
2024-09-09 12:44:25 source > INFO main i.d.e.EmbeddedEngine(stop):957 Stopping the embedded engine
2024-09-09 12:44:25 source > INFO main i.d.e.EmbeddedEngine(stop):964 Waiting for PT5M for connector to stop
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.b.BufferManager(<init>):48 Max 'memory' available for buffer allocation 1 GB
2024-09-09 12:44:26 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):379 Starting buffered read of input stream
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.FlushWorkers(start):73 Start async buffer supervisor
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(start):89 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer started.
2024-09-09 12:44:26 destination > INFO pool-10-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-09-09 12:44:26 destination > INFO pool-7-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 1.81 GB, allocated: 10 MB (10.0 MB), %% used: 0.005387931034482759 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.d.e.EmbeddedEngine(stopTaskAndCommitOffset):765 Stopping the task and engine
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.d.c.c.BaseSourceTask(stop):406 Stopping down connector
2024-09-09 12:44:26 source > INFO pool-8-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:26 source > INFO pool-9-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:26 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamEvents):281 Finished streaming
2024-09-09 12:44:26 source > INFO debezium-postgresconnector-contents-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator(streamingConnected):433 Connected metrics set to 'false'
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.d.p.s.SignalProcessor(stop):127 SignalProcessor stopped
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.d.s.DefaultServiceRegistry(close):105 Debezium ServiceRegistry stopped.
2024-09-09 12:44:26 source > INFO pool-10-thread-1 i.d.j.JdbcConnection(lambda$doClose$4):952 Connection gracefully closed
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(taskStopped):91 DebeziumEngine notify: task stopped
2024-09-09 12:44:26 source > INFO pool-2-thread-1 o.a.k.c.s.FileOffsetBackingStore(stop):71 Stopped FileOffsetBackingStore
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher$start$3(connectorStopped):83 DebeziumEngine notify: connector stopped
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):60 Debezium engine shutdown. Engine terminated successfully : true
2024-09-09 12:44:26 source > INFO pool-2-thread-1 i.a.c.i.d.i.DebeziumRecordPublisher(start$lambda$1):63 Connector 'io.debezium.connector.postgresql.PostgresConnector' completed normally.
2024-09-09 12:44:26 source > INFO main i.a.i.s.p.c.PostgresCdcStateHandler(saveState):38 debezium state: {"[\"contents\",{\"server\":\"contents\"}]":"{\"lsn_proc\":18002559234680,\"messageType\":\"NOOP\",\"lsn_commit\":18002559234680,\"lsn\":18002559234680,\"txId\":177182319,\"ts_usec\":1725885864672405}"}
2024-09-09 12:44:26 source > INFO main i.a.c.i.s.r.s.SourceStateIterator(computeNext):84 sending final state message, with count per stream: {main_generated_column_test=1} 
2024-09-09 12:44:26 platform > Stream status TRACE received of status: COMPLETE for stream main:generated_column_test
2024-09-09 12:44:26 source > INFO main i.a.c.i.s.r.AbstractDbSource(read$lambda$5):178 Closing database connection pool.
2024-09-09 12:44:26 source > INFO main c.z.h.HikariDataSource(close):349 HikariPool-1 - Shutdown initiated...
2024-09-09 12:44:26 source > INFO main c.z.h.HikariDataSource(close):351 HikariPool-1 - Shutdown completed.
2024-09-09 12:44:26 source > INFO main i.a.c.i.s.r.AbstractDbSource(read$lambda$5):180 Closed database connection pool.
2024-09-09 12:44:26 source > INFO main i.a.c.i.b.IntegrationRunner(runInternal):224 Completed integration: io.airbyte.cdk.integrations.base.ssh.SshWrappedSource
2024-09-09 12:44:26 source > INFO main i.a.i.s.p.PostgresSource(main):700 completed source: class io.airbyte.integrations.source.postgres.PostgresSource
2024-09-09 12:44:26 platform > Total records read: 5 (161 bytes)
2024-09-09 12:44:26 platform > Schema validation was performed to a max of 10 records with errors per stream.
2024-09-09 12:44:26 platform > Schema validation errors found for stream main_generated_column_test. Error messages: [$.timestamp_inserted: does not match the date-time pattern must be a valid RFC 3339 date-time, $._ab_cdc_deleted_at: null found, string expected]
2024-09-09 12:44:26 platform > readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2024-09-09 12:44:26 platform > thread status... heartbeat thread: false , replication thread: true
2024-09-09 12:44:26 platform > processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2024-09-09 12:44:26 platform > writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2024-09-09 12:44:26 platform > thread status... timeout thread: false , replication thread: true
2024-09-09 12:44:26 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):401 Finished buffered read of input stream
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):193 Closing flush workers -- waiting for all buffers to flush
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):226 REMAINING_BUFFERS_INFO
  Namespace: main Stream: generated_column_test -- remaining records: 1
2024-09-09 12:44:26 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):227 Waiting for all streams to flush.
2024-09-09 12:44:27 destination > INFO pool-9-thread-1 i.a.c.i.d.a.DetectStreamToFlush(getNextStreamToFlush):109 flushing: trigger info: main - generated_column_test, time trigger: false , size trigger: true current threshold b: 0 bytes, queue size b: 356 bytes, penalty b: 0 bytes, after penalty b: 356 bytes
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.a.FlushWorkers(flush$lambda$6):135 Flush Worker (4631d) -- Worker picked up work.
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.a.FlushWorkers(flush$lambda$6):141 Flush Worker (4631d) -- Attempting to read from queue namespace: main, stream: generated_column_test.
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.a.GlobalMemoryManager(free):78 Freeing 10485404 bytes..
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.a.FlushWorkers(flush$lambda$6):159 Flush Worker (4631d) -- Batch contains: 1 records, 356 bytes bytes.
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.r.BaseSerializedBuffer(flush):166 Finished writing data to 3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz (200 bytes)
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.c.i.d.s.o.StagingStreamOperations(writeRecordsImpl):54 Buffer flush complete for stream generated_column_test (200 bytes) to staging
2024-09-09 12:44:27 destination > INFO pool-8-thread-1 i.a.i.d.s.o.SnowflakeStagingClient(uploadRecordsToBucket):84 executing query 17e89965-3577-425c-9dd9-4dfb5dd8e273, PUT file:///tmp/3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz @"airbyte_internal"."main_raw__stream_generated_column_test"/2024/09/09/12/B5F6F50F-CBE0-4814-8344-FF3F53529E56/ PARALLEL = 1;
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):230 Closing flush workers -- all buffers flushed
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):153 Flushing states
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):207 Flushing states complete
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.GlobalMemoryManager(free):78 Freeing 0 bytes..
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):238 Closing flush workers -- supervisor shut down
2024-09-09 12:44:27 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):240 Closing flush workers -- Starting worker pool shutdown..
2024-09-09 12:44:28 destination > INFO pool-8-thread-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:28 destination > INFO pool-8-thread-1 i.a.i.d.s.o.SnowflakeStagingClient(uploadRecordsToBucket):94 query 17e89965-3577-425c-9dd9-4dfb5dd8e273, completed with [{"source":"3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz","target":"3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz","source_size":200,"target_size":200,"source_compression":"GZIP","target_compression":"GZIP","status":"UPLOADED","encryption":"","message":""}]
2024-09-09 12:44:28 destination > INFO pool-8-thread-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:28 destination > INFO pool-8-thread-1 i.a.i.d.s.o.SnowflakeStagingClient(uploadRecordsToStage):70 Successfully loaded records to stage 2024/09/09/12/B5F6F50F-CBE0-4814-8344-FF3F53529E56/ with 0 re-attempt(s)
2024-09-09 12:44:28 destination > INFO pool-8-thread-1 i.a.i.d.s.o.SnowflakeStagingClient(copyIntoTableFromStage):178 query b2d5220b-0d33-4c23-a96d-856d36048a38, COPY INTO "airbyte_internal"."main_raw__stream_generated_column_test" FROM '@"airbyte_internal"."main_raw__stream_generated_column_test"/2024/09/09/12/B5F6F50F-CBE0-4814-8344-FF3F53529E56/'
file_format = (
   type = csv
   compression = auto
   field_delimiter = ','
   skip_header = 0
   FIELD_OPTIONALLY_ENCLOSED_BY = '"'
   NULL_IF=('')
) files = ('3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz');
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.d.j.DefaultJdbcDatabase(unsafeQuery$lambda$6):126 closing connection
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.i.d.s.o.SnowflakeStagingClient(copyIntoTableFromStage):187 query b2d5220b-0d33-4c23-a96d-856d36048a38, successfully loaded 1 rows of data into table
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.r.FileBuffer(deleteFile):75 Deleting tempFile data 3a25332b-ab9d-471b-83c7-63cf82de2e6c5519089624039111987.csv.gz
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):153 Flushing states
2024-09-09 12:44:29 platform > Destination complete for GLOBAL
2024-09-09 12:44:29 platform > Stream Status Update Received: main:generated_column_test - COMPLETE
2024-09-09 12:44:29 platform > Updating status: main:generated_column_test - COMPLETE
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):207 Flushing states complete
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.a.GlobalMemoryManager(free):78 Freeing 1696 bytes..
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.a.GlobalMemoryManager(free):78 Freeing 356 bytes..
2024-09-09 12:44:29 destination > INFO pool-8-thread-1 i.a.c.i.d.a.FlushWorkers(flush$lambda$6):170 Flush Worker (4631d) -- Worker finished flushing. Current queue size: 0
2024-09-09 12:44:29 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):245 Closing flush workers  -- workers shut down
2024-09-09 12:44:29 destination > INFO main i.a.c.i.d.a.b.BufferManager(close):73 Buffers cleared..
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.i.d.s.o.SnowflakeStorageOperation(cleanupStage):141 Cleaning up stage "airbyte_internal"."main_raw__stream_generated_column_test"
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: DROP STAGE IF EXISTS "airbyte_internal"."main_raw__stream_generated_column_test";
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(finalizeTable):357 Not overwriting raw table for main.generated_column_test. Truncate sync: false; stream success: true; raw table suffix: ""
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.i.b.d.t.TyperDeduperUtil(executeTypeAndDedupe):212 Attempting typing and deduping for main.generated_column_test with suffix 
2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.i.d.s.t.SnowflakeDestinationHandler(execute):252 Executing sql 5591705c-a260-45e9-bfbd-2d0aaeff9723-169a232c-ab26-42c1-84ed-1b5d1b1f110a: BEGIN TRANSACTION;
INSERT INTO "MAIN"."GENERATED_COLUMN_TEST"(
  "ID", 
  "_AB_CDC_LSN", 
  "DAY_INSERTED", 
  "_AB_CDC_DELETED_AT", 
  "_AB_CDC_UPDATED_AT", 
  "TIMESTAMP_INSERTED", 
  _airbyte_raw_id, 
  _airbyte_extracted_at, 
  _airbyte_generation_id,
  "_AIRBYTE_META"
)
WITH intermediate_data AS (
  SELECT
    CAST(("_airbyte_data":"id")::text as NUMBER) as "ID", 
    CAST(("_airbyte_data":"_ab_cdc_lsn")::text as FLOAT) as "_AB_CDC_LSN", 
    CAST(("_airbyte_data":"day_inserted")::text as DATE) as "DAY_INSERTED", 
    (("_airbyte_data":"_ab_cdc_deleted_at")::text) as "_AB_CDC_DELETED_AT", 
    (("_airbyte_data":"_ab_cdc_updated_at")::text) as "_AB_CDC_UPDATED_AT", 
    CAST(("_airbyte_data":"timestamp_inserted")::text as TIMESTAMP_NTZ) as "TIMESTAMP_INSERTED", 
    "_airbyte_raw_id", 
    TIMESTAMPADD(
      HOUR, 
      EXTRACT(timezone_hour from "_airbyte_extracted_at"), 
      TIMESTAMPADD(
        MINUTE,
        EXTRACT(timezone_minute from "_airbyte_extracted_at"),
        CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
      )
    ) as "_airbyte_extracted_at", 
    "_airbyte_meta", 
    "_airbyte_generation_id", 
    ARRAY_COMPACT(
      ARRAY_CAT(
        CASE WHEN "_airbyte_meta":"changes" IS NOT NULL 
          THEN "_airbyte_meta":"changes" 
          ELSE ARRAY_CONSTRUCT()
        END,
        ARRAY_CONSTRUCT(
          CASE
            WHEN (TYPEOF("_airbyte_data":"id") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"id")::text as NUMBER) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'id', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_lsn") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"_ab_cdc_lsn")::text as FLOAT) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_lsn', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"day_inserted") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"day_inserted")::text as DATE) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'day_inserted', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE'))
              AND ((("_airbyte_data":"_ab_cdc_deleted_at")::text) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_deleted_at', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_updated_at") NOT IN ('NULL', 'NULL_VALUE'))
              AND ((("_airbyte_data":"_ab_cdc_updated_at")::text) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_updated_at', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"timestamp_inserted") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"timestamp_inserted")::text as TIMESTAMP_NTZ) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'timestamp_inserted', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END
        )
      )
    ) as "_airbyte_cast_errors"
  FROM 
    "airbyte_internal"."main_raw__stream_generated_column_test"
  WHERE 
    ("_airbyte_loaded_at" IS NULL OR ("_airbyte_loaded_at" IS NOT NULL AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE'))
)  AND TIMESTAMPADD(
  HOUR, 
  EXTRACT(timezone_hour from "_airbyte_extracted_at"), 
  TIMESTAMPADD(
    MINUTE,
    EXTRACT(timezone_minute from "_airbyte_extracted_at"),
    CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
  )
) > '2024-09-09T12:42:16.037Z'
), new_records AS (
  SELECT
    "ID", 
    "_AB_CDC_LSN", 
    "DAY_INSERTED", 
    "_AB_CDC_DELETED_AT", 
    "_AB_CDC_UPDATED_AT", 
    "TIMESTAMP_INSERTED", 
    "_airbyte_raw_id" as "_AIRBYTE_RAW_ID", 
    "_airbyte_extracted_at" as "_AIRBYTE_EXTRACTED_AT", 
    "_airbyte_generation_id" as "_AIRBYTE_GENERATION_ID", 
    CASE WHEN "_airbyte_meta" IS NOT NULL 
      THEN OBJECT_INSERT("_airbyte_meta", 'changes', "_airbyte_cast_errors", true) 
      ELSE OBJECT_CONSTRUCT('changes', "_airbyte_cast_errors") 
    END AS "_AIRBYTE_META",
    row_number() OVER (
      PARTITION BY "ID" ORDER BY "_AB_CDC_LSN" DESC NULLS LAST, "_AIRBYTE_EXTRACTED_AT" DESC
    ) AS row_number
  FROM intermediate_data
)
SELECT 
  "ID", 
  "_AB_CDC_LSN", 
  "DAY_INSERTED", 
  "_AB_CDC_DELETED_AT", 
  "_AB_CDC_UPDATED_AT", 
  "TIMESTAMP_INSERTED", 
  _airbyte_raw_id, 
  _airbyte_extracted_at, 
  _airbyte_generation_id, 
  "_AIRBYTE_META"
FROM 
  new_records
WHERE row_number = 1;

DELETE FROM 
  "MAIN"."GENERATED_COLUMN_TEST"
WHERE 
  "_AIRBYTE_RAW_ID" IN (
    SELECT "_AIRBYTE_RAW_ID" FROM (
      SELECT 
        "_AIRBYTE_RAW_ID", 
        row_number() OVER (PARTITION BY "ID" ORDER BY "_AB_CDC_LSN" DESC NULLS LAST, TIMESTAMPADD(
  HOUR, 
  EXTRACT(timezone_hour from "_AIRBYTE_EXTRACTED_AT"), 
  TIMESTAMPADD(
    MINUTE,
    EXTRACT(timezone_minute from "_AIRBYTE_EXTRACTED_AT"),
    CONVERT_TIMEZONE('UTC', "_AIRBYTE_EXTRACTED_AT")
  )
) DESC) 
        as row_number 
      FROM 
        "MAIN"."GENERATED_COLUMN_TEST"
    )
    WHERE row_number != 1
  );

 DELETE FROM "MAIN"."GENERATED_COLUMN_TEST" 
 WHERE _AB_CDC_DELETED_AT IS NOT NULL;
UPDATE "airbyte_internal"."main_raw__stream_generated_column_test" 
SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP() 
WHERE "_airbyte_loaded_at" IS NULL;
COMMIT;

2024-09-09 12:44:29 destination > INFO sync-operations-4 i.a.c.d.j.JdbcDatabase(execute$lambda$0):50 executing statement: BEGIN TRANSACTION;
INSERT INTO "MAIN"."GENERATED_COLUMN_TEST"(
  "ID", 
  "_AB_CDC_LSN", 
  "DAY_INSERTED", 
  "_AB_CDC_DELETED_AT", 
  "_AB_CDC_UPDATED_AT", 
  "TIMESTAMP_INSERTED", 
  _airbyte_raw_id, 
  _airbyte_extracted_at, 
  _airbyte_generation_id,
  "_AIRBYTE_META"
)
WITH intermediate_data AS (
  SELECT
    CAST(("_airbyte_data":"id")::text as NUMBER) as "ID", 
    CAST(("_airbyte_data":"_ab_cdc_lsn")::text as FLOAT) as "_AB_CDC_LSN", 
    CAST(("_airbyte_data":"day_inserted")::text as DATE) as "DAY_INSERTED", 
    (("_airbyte_data":"_ab_cdc_deleted_at")::text) as "_AB_CDC_DELETED_AT", 
    (("_airbyte_data":"_ab_cdc_updated_at")::text) as "_AB_CDC_UPDATED_AT", 
    CAST(("_airbyte_data":"timestamp_inserted")::text as TIMESTAMP_NTZ) as "TIMESTAMP_INSERTED", 
    "_airbyte_raw_id", 
    TIMESTAMPADD(
      HOUR, 
      EXTRACT(timezone_hour from "_airbyte_extracted_at"), 
      TIMESTAMPADD(
        MINUTE,
        EXTRACT(timezone_minute from "_airbyte_extracted_at"),
        CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
      )
    ) as "_airbyte_extracted_at", 
    "_airbyte_meta", 
    "_airbyte_generation_id", 
    ARRAY_COMPACT(
      ARRAY_CAT(
        CASE WHEN "_airbyte_meta":"changes" IS NOT NULL 
          THEN "_airbyte_meta":"changes" 
          ELSE ARRAY_CONSTRUCT()
        END,
        ARRAY_CONSTRUCT(
          CASE
            WHEN (TYPEOF("_airbyte_data":"id") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"id")::text as NUMBER) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'id', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_lsn") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"_ab_cdc_lsn")::text as FLOAT) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_lsn', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"day_inserted") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"day_inserted")::text as DATE) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'day_inserted', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE'))
              AND ((("_airbyte_data":"_ab_cdc_deleted_at")::text) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_deleted_at', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"_ab_cdc_updated_at") NOT IN ('NULL', 'NULL_VALUE'))
              AND ((("_airbyte_data":"_ab_cdc_updated_at")::text) IS NULL)
              THEN OBJECT_CONSTRUCT('field', '_ab_cdc_updated_at', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END, 
          CASE
            WHEN (TYPEOF("_airbyte_data":"timestamp_inserted") NOT IN ('NULL', 'NULL_VALUE'))
              AND (CAST(("_airbyte_data":"timestamp_inserted")::text as TIMESTAMP_NTZ) IS NULL)
              THEN OBJECT_CONSTRUCT('field', 'timestamp_inserted', 'change', 'NULLED', 'reason', 'DESTINATION_TYPECAST_ERROR')
            ELSE NULL
          END
        )
      )
    ) as "_airbyte_cast_errors"
  FROM 
    "airbyte_internal"."main_raw__stream_generated_column_test"
  WHERE 
    ("_airbyte_loaded_at" IS NULL OR ("_airbyte_loaded_at" IS NOT NULL AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE'))
)  AND TIMESTAMPADD(
  HOUR, 
  EXTRACT(timezone_hour from "_airbyte_extracted_at"), 
  TIMESTAMPADD(
    MINUTE,
    EXTRACT(timezone_minute from "_airbyte_extracted_at"),
    CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
  )
) > '2024-09-09T12:42:16.037Z'
), new_records AS (
  SELECT
    "ID", 
    "_AB_CDC_LSN", 
    "DAY_INSERTED", 
    "_AB_CDC_DELETED_AT", 
    "_AB_CDC_UPDATED_AT", 
    "TIMESTAMP_INSERTED", 
    "_airbyte_raw_id" as "_AIRBYTE_RAW_ID", 
    "_airbyte_extracted_at" as "_AIRBYTE_EXTRACTED_AT", 
    "_airbyte_generation_id" as "_AIRBYTE_GENERATION_ID", 
    CASE WHEN "_airbyte_meta" IS NOT NULL 
      THEN OBJECT_INSERT("_airbyte_meta", 'changes', "_airbyte_cast_errors", true) 
      ELSE OBJECT_CONSTRUCT('changes', "_airbyte_cast_errors") 
    END AS "_AIRBYTE_META",
    row_number() OVER (
      PARTITION BY "ID" ORDER BY "_AB_CDC_LSN" DESC NULLS LAST, "_AIRBYTE_EXTRACTED_AT" DESC
    ) AS row_number
  FROM intermediate_data
)
SELECT 
  "ID", 
  "_AB_CDC_LSN", 
  "DAY_INSERTED", 
  "_AB_CDC_DELETED_AT", 
  "_AB_CDC_UPDATED_AT", 
  "TIMESTAMP_INSERTED", 
  _airbyte_raw_id, 
  _airbyte_extracted_at, 
  _airbyte_generation_id, 
  "_AIRBYTE_META"
FROM 
  new_records
WHERE row_number = 1;

DELETE FROM 
  "MAIN"."GENERATED_COLUMN_TEST"
WHERE 
  "_AIRBYTE_RAW_ID" IN (
    SELECT "_AIRBYTE_RAW_ID" FROM (
      SELECT 
        "_AIRBYTE_RAW_ID", 
        row_number() OVER (PARTITION BY "ID" ORDER BY "_AB_CDC_LSN" DESC NULLS LAST, TIMESTAMPADD(
  HOUR, 
  EXTRACT(timezone_hour from "_AIRBYTE_EXTRACTED_AT"), 
  TIMESTAMPADD(
    MINUTE,
    EXTRACT(timezone_minute from "_AIRBYTE_EXTRACTED_AT"),
    CONVERT_TIMEZONE('UTC', "_AIRBYTE_EXTRACTED_AT")
  )
) DESC) 
        as row_number 
      FROM 
        "MAIN"."GENERATED_COLUMN_TEST"
    )
    WHERE row_number != 1
  );

 DELETE FROM "MAIN"."GENERATED_COLUMN_TEST" 
 WHERE _AB_CDC_DELETED_AT IS NOT NULL;
UPDATE "airbyte_internal"."main_raw__stream_generated_column_test" 
SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP() 
WHERE "_airbyte_loaded_at" IS NULL;
COMMIT;

2024-09-09 12:44:31 destination > INFO sync-operations-4 i.a.c.d.j.JdbcDatabase(execute$lambda$0):54 statement successfully executed
2024-09-09 12:44:31 destination > INFO sync-operations-4 i.a.i.d.s.t.SnowflakeDestinationHandler(execute):274 Sql 5591705c-a260-45e9-bfbd-2d0aaeff9723-169a232c-ab26-42c1-84ed-1b5d1b1f110a completed in 1810 ms
2024-09-09 12:44:31 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(finalizeTable):418 Not overwriting final table for main.generated_column_test. Truncate sync: false; stream success: true; final table suffix not blank: false
2024-09-09 12:44:31 destination > INFO main i.a.i.b.d.o.DefaultSyncOperation(finalizeStreams):150 Cleaning up sync operation thread pools
2024-09-09 12:44:31 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(close):200 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer closed
2024-09-09 12:44:31 destination > WARN main i.a.c.i.b.IntegrationRunner$Companion(stopOrphanedThreads):442                  The main thread is exiting while children non-daemon threads from a connector are still active.
                 Ideally, this situation should not happen...
                 Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
                 The main thread is: main (RUNNABLE)
Thread stacktrace: java.base/java.lang.Thread.getStackTrace(Thread.java:2450)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.dumpThread(IntegrationRunner.kt:500)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.access$dumpThread(IntegrationRunner.kt:327)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$stopOrphanedThreads$1.invoke(IntegrationRunner.kt:447)
       at io.github.oshai.kotlinlogging.internal.MessageInvokerKt.toStringSafe(MessageInvoker.kt:5)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger$warn$1.invoke(LocationAwareKLogger.kt:191)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger$warn$1.invoke(LocationAwareKLogger.kt:191)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger.at(LocationAwareKLogger.kt:43)
       at io.github.oshai.kotlinlogging.slf4j.internal.LocationAwareKLogger.warn(LocationAwareKLogger.kt:191)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.stopOrphanedThreads(IntegrationRunner.kt:442)
       at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.stopOrphanedThreads$default(IntegrationRunner.kt:431)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:217)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
       at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
       at io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.kt:68)
       at io.airbyte.integrations.destination.snowflake.SnowflakeDestinationKt.main(SnowflakeDestination.kt:350)
2024-09-09 12:44:31 destination > WARN main i.a.c.i.b.IntegrationRunner$Companion(stopOrphanedThreads):465 Active non-daemon thread: sync-operations-4 (TERMINATED)
 Thread stacktrace: 
creationStack=null
2024-09-09 12:44:31 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):224 Completed integration: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2024-09-09 12:44:31 destination > INFO main i.a.c.i.b.a.AdaptiveDestinationRunner$Runner(run):69 Completed destination: io.airbyte.integrations.destination.snowflake.SnowflakeDestination
2024-09-09 12:44:32 platform > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2024-09-09 12:44:32 platform > thread status... timeout thread: false , replication thread: true
2024-09-09 12:44:32 platform > Closing StateCheckSumCountEventHandler
2024-09-09 12:44:32 platform > No checksum errors were reported in the entire sync.
2024-09-09 12:44:32 platform > sync summary: {
  "status" : "completed",
  "recordsSynced" : 1,
  "bytesSynced" : 161,
  "startTime" : 1725885848222,
  "endTime" : 1725885872702,
  "totalStats" : {
    "bytesCommitted" : 161,
    "bytesEmitted" : 161,
    "destinationStateMessagesEmitted" : 4,
    "destinationWriteEndTime" : 1725885872687,
    "destinationWriteStartTime" : 1725885848225,
    "meanSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBeforeSourceStateMessageEmitted" : 1,
    "maxSecondsBetweenStateMessageEmittedandCommitted" : 2,
    "meanSecondsBetweenStateMessageEmittedandCommitted" : 2,
    "recordsEmitted" : 1,
    "recordsCommitted" : 1,
    "replicationEndTime" : 1725885872701,
    "replicationStartTime" : 1725885848222,
    "sourceReadEndTime" : 1725885866606,
    "sourceReadStartTime" : 1725885848225,
    "sourceStateMessagesEmitted" : 4
  },
  "streamStats" : [ {
    "streamName" : "articles",
    "streamNamespace" : "main",
    "stats" : {
      "bytesCommitted" : 0,
      "bytesEmitted" : 0,
      "recordsEmitted" : 0,
      "recordsCommitted" : 0
    }
  }, {
    "streamName" : "generated_column_test",
    "streamNamespace" : "main",
    "stats" : {
      "bytesCommitted" : 161,
      "bytesEmitted" : 161,
      "recordsEmitted" : 1,
      "recordsCommitted" : 1
    }
  }, {
    "streamName" : "article_contents",
    "streamNamespace" : "main",
    "stats" : {
      "bytesCommitted" : 0,
      "bytesEmitted" : 0,
      "recordsEmitted" : 0,
      "recordsCommitted" : 0
    }
  }, {
    "streamName" : "additional_consent_partners",
    "streamNamespace" : "main",
    "stats" : {
      "bytesCommitted" : 0,
      "bytesEmitted" : 0,
      "recordsEmitted" : 0,
      "recordsCommitted" : 0
    }
  } ],
  "performanceMetrics" : {
    "processFromSource" : {
      "elapsedTimeInNanos" : 10670643,
      "executionCount" : 5,
      "avgExecTimeInNanos" : 2134128.6
    },
    "readFromSource" : {
      "elapsedTimeInNanos" : 18324454726,
      "executionCount" : 418,
      "avgExecTimeInNanos" : 4.38384084354067E7
    },
    "processFromDest" : {
      "elapsedTimeInNanos" : 4191472,
      "executionCount" : 1,
      "avgExecTimeInNanos" : 4191472.0
    },
    "writeToDest" : {
      "elapsedTimeInNanos" : 248484,
      "executionCount" : 2,
      "avgExecTimeInNanos" : 124242.0
    },
    "readFromDest" : {
      "elapsedTimeInNanos" : 24373355412,
      "executionCount" : 3,
      "avgExecTimeInNanos" : 8.124451804E9
    }
  }
}
2024-09-09 12:44:32 platform > failures: [ ]
2024-09-09 12:44:32 platform > 
2024-09-09 12:44:32 platform > ----- END REPLICATION -----
2024-09-09 12:44:32 platform >


tanderson-hp commented 1 month ago

Our organization is experiencing this issue and we're not sure how to work around it. We've tried turning on full replica identity for the source table with the generated column (see the command below).

It's unfortunate because the only reason we're using a generated column is to get around another Airbyte Postgres issue where interval columns can't be synced to Snowflake destinations.
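
For reference, a minimal sketch of the replica identity change mentioned above, assuming the table from this issue's reproduction steps:

    -- Make logical replication emit full row images for UPDATE/DELETE events.
    -- Per the report above, this alone did not fix the null generated column.
    alter table main.generated_column_test replica identity full;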

acsbendi commented 1 month ago

@tanderson-hp Our workaround is to define a view on top of the synced table and, in that view, fill the NULL column values using the same logic as in the source table (sketched below).
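
A minimal sketch of that workaround in Snowflake, assuming the table from this issue; the view name is illustrative, and the coalesce mirrors the source table's generated-column expression:

    create or replace view main.generated_column_test_v as
    select
        id,
        timestamp_inserted,
        -- Recompute the value whenever the synced column arrived as null,
        -- using the same expression as the Postgres generated column.
        coalesce(day_inserted, timestamp_inserted::date) as day_inserted
    from main.generated_column_test;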

tanderson-hp commented 1 month ago

Thanks @acsbendi. So do you have a separate connection/stream to handle syncing the view? My understanding is that CDC replication can only be based on tables.

acsbendi commented 1 month ago

The view is created in the destination (Snowflake in our case), and it reads the latest data from the synced table, so you don't need Airbyte for that.