Open ViciousJ opened 3 months ago
I upgraded again, to Postgres source v3.6.10 and destination v2.2.1, but the problem persists.
Having the same issue but with
I've narrowed the problem down to the source connector. I downgraded back to source-postgres 3.3.33 and full refresh/overwrite works great. But upgrading to 4.0.0 or above breaks every other sync as described above.
Hi there, what is the version of the Airbyte platform you are using? We require a minimum v0.58.0. If you are not on that version or later, the state will not be reset, and it will cause 0 records in the ensuing sync.
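For anyone checking their own setup against that minimum, here is a minimal sketch of the comparison in Python. The helper names `parse_version` and `platform_is_supported` are made up for illustration and are not part of Airbyte; the only fact taken from this thread is the v0.58.0 minimum.

```python
from __future__ import annotations


def parse_version(text: str) -> tuple[int, ...]:
    """Turn 'v0.58.0' or '0.63.5' into a tuple of ints so versions compare numerically."""
    return tuple(int(part) for part in text.lstrip("v").split("."))


# Minimum platform version quoted in this thread.
MINIMUM_PLATFORM = parse_version("v0.58.0")


def platform_is_supported(version: str) -> bool:
    """True when the given platform version is at or above the quoted minimum."""
    return parse_version(version) >= MINIMUM_PLATFORM


# Two platform versions reported by commenters in this thread.
for reported in ("0.50.46", "0.63.5"):
    print(reported, platform_is_supported(reported))
```

Comparing integer tuples rather than raw strings avoids the trap where "0.9.0" sorts after "0.58.0" as text.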
Having the same issue as well - I am on Airbyte version 0.63.5, Postgres source 3.4.24 and Postgres destination 2.0.15.
On some syncs the destination ends up with 0 rows, even though the source table (12 rows in this example) has data.
Relevant logs:
2024-07-30 15:04:56 destination > INFO type-and-dedupe i.a.i.b.d.t.BaseDestinationV1V2Migrator(migrateIfNecessary):20 Assessing whether migration is necessary for stream itemAttributes
2024-07-30 15:04:56 destination > INFO type-and-dedupe i.a.i.b.d.t.BaseDestinationV1V2Migrator(shouldMigrate):44 Checking whether v1 raw table _airbyte_raw_itemattributes in dataset src_bc exists
2024-07-30 15:04:56 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler$Companion(findExistingTable$lambda$1):502 Retrieving existing columns for dest_db.airbyte_internal.src_bc_raw__stream_itemattributes
2024-07-30 15:04:56 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler$Companion(findExistingTable$lambda$1):502 Retrieving existing columns for dest_db.src_bc._airbyte_raw_itemattributes
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresSource(estimateFullRefreshSyncSize):971 Estimate for table: "src_bc"."itemAttributes" : {sync_row_count: 12, sync_bytes: 8192, total_table_row_count: 12, total_table_bytes: 8192}
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."itemAttributes" is 40073
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresSource(supportResumableFullRefresh):885 stream io.airbyte.protocol.models.v0.ConfiguredAirbyteStream@2a037324[stream=io.airbyte.protocol.models.v0.AirbyteStream@69eb86b4[name=itemAttributes,jsonSchema={"type":"object","properties":{"id":{"type":"number","airbyte_type":"integer"},"name":{"type":"string"},"type":{"type":"string"},"runid":{"type":"string"},"blocked":{"type":"boolean"},"systemId":{"type":"string"},"created_at":{"type":"string","format":"date-time","airbyte_type":"timestamp_without_timezone"},"unitOfMeasure":{"type":"string"}},"$schema":"http://json-schema.org/draft-07/schema#"},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=false,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=src_bc,isResumable=true,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],generationId=<null>,minimumGenerationId=<null>,syncId=<null>,additionalProperties={}] will sync in resumeable full refresh mode.
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."itemAttributes" is 40073
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(getTableBlockSizeForStream):299 Stream "src_bc"."itemAttributes" relation size is 8192. block size 8192
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.PostgresCtidHandler(queryTableCtid):157 Queueing query for table: itemAttributes
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.i.b.d.t.DefaultTyperDeduper(prepareTablesFuture$lambda$4):154 Final Table exists for stream itemAttributes
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresSource(supportResumableFullRefresh):885 stream io.airbyte.protocol.models.v0.ConfiguredAirbyteStream@a619c2[stream=io.airbyte.protocol.models.v0.AirbyteStream@648ee871[name=postedSalesInvoice,jsonSchema={"type":"object","properties":{"id":{"type":"string"},"no":{"type":"string"},"runid":{"type":"string"},"amount":{"type":"number"},"dueDate":{"type":"string","format":"date"},"orderNo":{"type":"string"},"cancelled":{"type":"boolean"},"orderDate":{"type":"string","format":"date"},"orderType":{"type":"string"},"billToCity":{"type":"string"},"billToName":{"type":"string"},"correction":{"type":"boolean"},"corrective":{"type":"boolean"},"created_at":{"type":"string","format":"date-time","airbyte_type":"timestamp_without_timezone"},"sellToCity":{"type":"string"},"shipToCity":{"type":"string"},"shipToName":{"type":"string"},"billToName2":{"type":"string"},"postingDate":{"type":"string","format":"date"},"shipToName2":{"type":"string"},"documentDate":{"type":"string","format":"date"},"locationCode":{"type":"string"},"shipmentDate":{"type":"string","format":"date"},"sellToAddress":{"type":"string"},"shipToAddress":{"type":"string"},"yourReference":{"type":"string"},"billToPostCode":{"type":"string"},"dimensionSetID":{"type":"number","airbyte_type":"integer"},"sellToAddress2":{"type":"string"},"shipToAddress2":{"type":"string"},"shipToPostCode":{"type":"string"},"salespersonCode":{"type":"string"},"billToCustomerNo":{"type":"string"},"sellToCustomerNo":{"type":"string"},"custLedgerEntryNo":{"type":"number","airbyte_type":"integer"},"customerDiscGroup":{"type":"string"},"shippingAgentCode":{"type":"string"},"amountIncludingVAT":{"type":"number"},"customerPriceGroup":{"type":"string"},"externalDocumentNo":{"type":"string"},"genBusPostingGroup":{"type":"string"},"sellToCustomerName":{"type":"string"},"shipmentMethodCode":{"type":"string"},"sellToCustomerName2":{"type":"string"},"customerPostingGroup":{"type":"string"},"billToCountryRegionCode":{"type":"string"},"sellToCountryRegionCode":{"type":"string"},"shipToCountryRegionCode":{"type":"string"}},"$schema":"http://json-schema.org/draft-07/schema#"},supportedSyncModes=[full_refresh, incremental],sourceDefinedCursor=false,defaultCursorField=[],sourceDefinedPrimaryKey=[],namespace=src_bc,isResumable=true,additionalProperties={}],syncMode=full_refresh,cursorField=[],destinationSyncMode=overwrite,primaryKey=[],generationId=<null>,minimumGenerationId=<null>,syncId=<null>,additionalProperties={}] will sync in resumeable full refresh mode.
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):163 Executing sql 7727564a-a649-4601-b5f3-c063ea3c240f-25788628-e759-491c-a01f-d32844542de3: [drop table if exists "src_bc"."itemAttributes_airbyte_tmp" cascade;, create table "src_bc"."itemAttributes_airbyte_tmp" ("id" bigint, "name" varchar, "type" varchar, "runid" varchar, "blocked" boolean, "systemId" varchar, "created_at" timestamp, "unitOfMeasure" varchar, "_airbyte_raw_id" varchar(36) not null, "_airbyte_extracted_at" timestamp with time zone not null, "_airbyte_meta" jsonb not null);]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_extracted_at");]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_raw_id");]
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."postedSalesInvoice" is 40115
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: drop table if exists "src_bc"."itemAttributes_airbyte_tmp" cascade;
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(getTableBlockSizeForStream):299 Stream "src_bc"."postedSalesInvoice" relation size is 233234432. block size 8192
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: drop table if exists "src_bc"."itemAttributes_airbyte_tmp" cascade;
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: create table "src_bc"."itemAttributes_airbyte_tmp" ("id" bigint, "name" varchar, "type" varchar, "runid" varchar, "blocked" boolean, "systemId" varchar, "created_at" timestamp, "unitOfMeasure" varchar, "_airbyte_raw_id" varchar(36) not null, "_airbyte_extracted_at" timestamp with time zone not null, "_airbyte_meta" jsonb not null);
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: create table "src_bc"."itemAttributes_airbyte_tmp" ("id" bigint, "name" varchar, "type" varchar, "runid" varchar, "blocked" boolean, "systemId" varchar, "created_at" timestamp, "unitOfMeasure" varchar, "_airbyte_raw_id" varchar(36) not null, "_airbyte_extracted_at" timestamp with time zone not null, "_airbyte_meta" jsonb not null);
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):163 Executing sql 7727564a-a649-4601-b5f3-c063ea3c240f-1a6735d9-6cd2-436f-90bb-ac5a49fb4c38: [drop table if exists "src_bc"."itemAttributes_airbyte_tmp" cascade;, create table "src_bc"."itemAttributes_airbyte_tmp" ("id" bigint, "name" varchar, "type" varchar, "runid" varchar, "blocked" boolean, "systemId" varchar, "created_at" timestamp, "unitOfMeasure" varchar, "_airbyte_raw_id" varchar(36) not null, "_airbyte_extracted_at" timestamp with time zone not null, "_airbyte_meta" jsonb not null);]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_extracted_at");]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_raw_id");]
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_extracted_at");
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_extracted_at");
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):163 Executing sql 7727564a-a649-4601-b5f3-c063ea3c240f-e6ac3730-2497-4ddd-9742-4035d5bae4d2: [drop table if exists "src_bc"."itemAttributes_airbyte_tmp" cascade;, create table "src_bc"."itemAttributes_airbyte_tmp" ("id" bigint, "name" varchar, "type" varchar, "runid" varchar, "blocked" boolean, "systemId" varchar, "created_at" timestamp, "unitOfMeasure" varchar, "_airbyte_raw_id" varchar(36) not null, "_airbyte_extracted_at" timestamp with time zone not null, "_airbyte_meta" jsonb not null);]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_extracted_at");]
[create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_raw_id");]
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_raw_id");
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: create index on "src_bc"."itemAttributes_airbyte_tmp"("_airbyte_raw_id");
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: create index on "src_bc"."transferOrderLines_airbyte_tmp"("_airbyte_raw_id");
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: create index on "src_bc"."itemLedgerEntries_airbyte_tmp"("_airbyte_extracted_at");
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."postedCreditMemoLines" is 40109
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):175 Sql 7727564a-a649-4601-b5f3-c063ea3c240f-e6ac3730-2497-4ddd-9742-4035d5bae4d2 completed in 9 ms
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(getTableBlockSizeForStream):299 Stream "src_bc"."postedCreditMemoLines" relation size is 82624512. block size 8192
2024-07-30 15:04:57 destination > INFO type-and-dedupe i.a.i.b.d.t.DefaultTyperDeduper(prepareTablesFuture$lambda$4):171 Using temp final table for stream itemAttributes, will overwrite existing table at end of sync
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."itemAttributes" is 40073
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(computeNext):118 The latest file node 40073 for stream src_bc_itemAttributes is equal to the last file node 40073 known to Airbyte.
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(createCtidQueryStatement):277 Preparing query for table: itemAttributes
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(createCtidQueryStatement):292 Executing query for table itemAttributes: SELECT ctid::text, "systemId","id","name","type","unitOfMeasure","blocked","created_at","runid" FROM "src_bc"."itemAttributes" WHERE ctid > ?::tid with binding (0,36)
2024-07-30 15:04:57 source > INFO main i.a.c.d.j.s.AdaptiveStreamingQueryConfig(initialize):24 Set initial fetch size: 10 rows
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.PostgresQueryUtils(fileNodeForIndividualStream):235 Relation filenode is for stream "src_bc"."itemAttributes" is 40073
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.InitialSyncCtidIterator(computeNext):118 The latest file node 40073 for stream src_bc_itemAttributes is equal to the last file node 40073 known to Airbyte.
2024-07-30 15:04:57 source > INFO main i.a.i.s.p.c.CtidStateManager(createFinalStateMessage):115 Finished initial sync of stream src_bc_itemAttributes, Emitting final state, state is io.airbyte.protocol.models.v0.AirbyteStateMessage@66434cc8[type=STREAM,stream=io.airbyte.protocol.models.v0.AirbyteStreamState@42f22995[streamDescriptor=io.airbyte.protocol.models.v0.StreamDescriptor@693e4d19[name=itemAttributes,namespace=src_bc,additionalProperties={}],streamState={"version":2,"state_type":"ctid","ctid":"(0,0)","incremental_state":{},"relation_filenode":40073},additionalProperties={}],global=<null>,data=<null>,sourceStats=<null>,destinationStats=<null>,additionalProperties={}]
2024-07-30 15:04:57 replication-orchestrator > Stream status TRACE received of status: STARTED for stream src_bc:itemAttributes
2024-07-30 15:04:57 replication-orchestrator > Sending update for src_bc:itemAttributes - null -> RUNNING
2024-07-30 15:04:57 replication-orchestrator > Stream Status Update Received: src_bc:itemAttributes - RUNNING
2024-07-30 15:04:57 replication-orchestrator > Creating status: src_bc:itemAttributes - RUNNING
2024-07-30 15:04:57 replication-orchestrator > Stream status TRACE received of status: COMPLETE for stream src_bc:itemAttributes
2024-07-30 15:05:03 replication-orchestrator > Destination complete for src_bc:itemAttributes
2024-07-30 15:05:03 replication-orchestrator > Sending update for src_bc:itemAttributes - RUNNING -> COMPLETE
2024-07-30 15:05:03 replication-orchestrator > Stream Status Update Received: src_bc:itemAttributes - COMPLETE
2024-07-30 15:05:03 replication-orchestrator > Updating status: src_bc:itemAttributes - COMPLETE
2024-07-30 15:08:49 destination > INFO main i.a.i.b.d.t.DefaultTyperDeduper(typeAndDedupe):310 Skipping typing and deduping for stream src_bc.itemAttributes because it had no records during this sync and no unprocessed records from a previous sync.
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.i.b.d.t.DefaultTyperDeduper(commitFinalTableTask$lambda$9):369 Overwriting final table with tmp table for stream src_bc.itemAttributes
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):163 Executing sql 633847a1-7e88-43dd-8868-6272017239ca-7b5d68a7-9869-4d27-992b-181ec0b9bc11: [drop table if exists "src_bc"."itemAttributes" cascade;, alter table "src_bc"."itemAttributes_airbyte_tmp" rename to "itemAttributes";]
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: drop table if exists "src_bc"."itemAttributes" cascade;
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.i.d.j.t.JdbcDestinationHandler(execute):175 Sql 7a4e46ef-cd67-43af-8207-1034f6da6618-004f3b4f-1df1-42db-a67b-745606dd0703 completed in 37 ms
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: drop table if exists "src_bc"."itemLedgerEntries" cascade;
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: drop table if exists "src_bc"."itemAttributes" cascade;
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):46 executing query within transaction: alter table "src_bc"."itemAttributes_airbyte_tmp" rename to "itemAttributes";
2024-07-30 15:23:26 destination > INFO type-and-dedupe i.a.c.d.j.JdbcDatabase(executeWithinTransaction$lambda$1):48 done executing query within transaction: alter table "src_bc"."itemAttributes_airbyte_tmp" rename to "itemAttributes";
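One reading of the source lines in the log above: the query for itemAttributes is issued as `WHERE ctid > ?::tid with binding (0,36)`, so the read appears to resume from a previously saved position rather than from the start of the table. The Python sketch below only illustrates that filter. It assumes, without confirmation from the logs, that the 12 source rows sit on page 0 at offsets 1 through 12; the helper names are hypothetical. PostgreSQL orders tids by block number and then offset, which Python tuple comparison mimics.

```python
import re


def parse_ctid(text):
    """Parse a ctid literal such as '(0,36)' into a (block, offset) tuple."""
    match = re.fullmatch(r"\((\d+),(\d+)\)", text.strip())
    if match is None:
        raise ValueError(f"not a ctid literal: {text!r}")
    return int(match.group(1)), int(match.group(2))


def rows_after(ctids, resume_from):
    """Keep only the ctids strictly greater than the resume point."""
    bound = parse_ctid(resume_from)
    return [c for c in ctids if parse_ctid(c) > bound]


# ASSUMPTION: 12 rows on page 0 at offsets 1..12 (the logs do not show the real ctids).
hypothetical_rows = [f"(0,{offset})" for offset in range(1, 13)]

print(len(rows_after(hypothetical_rows, "(0,36)")))  # resume point taken from the log
print(len(rows_after(hypothetical_rows, "(0,0)")))   # start of the table
```

Under that assumption the first call selects nothing and the second selects all 12 rows, which would match a sync that emits 0 records when the saved state is not reset.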
sync summary: {
"status" : "completed",
"recordsSynced" : 3567926,
"bytesSynced" : 2901866341,
"startTime" : 1722351890118,
"endTime" : 1722353008310,
"totalStats" : {
"bytesCommitted" : 2901866341,
"bytesEmitted" : 2901866341,
"destinationStateMessagesEmitted" : 374,
"destinationWriteEndTime" : 1722353008182,
"destinationWriteStartTime" : 1722351890145,
"meanSecondsBeforeSourceStateMessageEmitted" : 10,
"maxSecondsBeforeSourceStateMessageEmitted" : 234,
"maxSecondsBetweenStateMessageEmittedandCommitted" : 185,
"meanSecondsBetweenStateMessageEmittedandCommitted" : 14,
"recordsEmitted" : 3567926,
"recordsCommitted" : 3567926,
"replicationEndTime" : 1722353008185,
"replicationStartTime" : 1722351890118,
"sourceReadEndTime" : 1722352115974,
"sourceReadStartTime" : 1722351890146,
"sourceStateMessagesEmitted" : 374
},
..............................
{
"streamName" : "itemAttributes",
"streamNamespace" : "src_bc",
"stats" : {
"bytesCommitted" : 0,
"bytesEmitted" : 0,
"recordsEmitted" : 0,
"recordsCommitted" : 0
}
}
..............................
}
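Since the overall summary reports millions of records while a single stream reports none, it can help to scan the per-stream entries for zeros. This is a rough Python sketch, assuming each entry has the shape shown in the summary above (`streamName`, `streamNamespace`, `stats.recordsEmitted`). The function name `empty_streams` and the second sample entry are invented for illustration, and the elided parts of the summary are not reconstructed.

```python
import json


def empty_streams(stream_entries):
    """Return 'namespace.name' for each entry whose stats report zero emitted records."""
    return [
        f"{entry['streamNamespace']}.{entry['streamName']}"
        for entry in stream_entries
        if entry["stats"]["recordsEmitted"] == 0
    ]


# First entry is copied from the summary above; the second is a HYPOTHETICAL
# non-empty stream added only so the filter has something to skip.
entries = json.loads("""
[
  {
    "streamName": "itemAttributes",
    "streamNamespace": "src_bc",
    "stats": {"bytesCommitted": 0, "bytesEmitted": 0,
              "recordsEmitted": 0, "recordsCommitted": 0}
  },
  {
    "streamName": "hypotheticalStream",
    "streamNamespace": "src_bc",
    "stats": {"bytesCommitted": 100, "bytesEmitted": 100,
              "recordsEmitted": 5, "recordsCommitted": 5}
  }
]
""")

print(empty_streams(entries))
```

A check like this could run after each Full Refresh | Overwrite sync to flag streams that came back empty before anyone relies on the destination table.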
@nataliekwong That must be the problem in my case. I'm on Airbyte v0.50.46. Thank you.
Connector Name
source-postgres
Connector Version
3.6.10
What step the error happened?
During the sync
Relevant information
My setup syncs postgres to postgres. After I upgraded both connectors--destination to Destinations V2 v2.0.15 and source to v3.4.24--every other sync pulls 0 Bytes on a Full Refresh | Overwrite, and therefore deletes the prior data from the destination. The source tables definitely have data in them. The only connections I have that are still copying correctly every time are syncing views, not tables.