confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

Fix struct order for schema updates when using upsert/delete mode #368

Open jurgispods opened 11 months ago

jurgispods commented 11 months ago

When using the connector in upsert/delete mode, it can fail when a schema update leaves the intermediate table and the destination table with differently ordered nested struct fields.

Example scenario

Schema version 1

Assume the Kafka source topic has the following Avro schema (version 1):

{
   "type":"record",
   "name":"Message",
   "fields":[
      {
         "name":"data",
         "type":{
            "type":"record",
            "name":"Data",
            "fields":[
               {
                  "name":"minAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"name",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               }
            ]
         }
      }
   ]
}

The corresponding BigQuery destination table schema:

[  
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      }
    ],
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

Schema version 2

Now, the source topic schema is updated to version 2:

{
   "type":"record",
   "name":"Message",
   "fields":[
      {
         "name":"data",
         "type":{
            "type":"record",
            "name":"Data",
            "fields":[
               {
                  "name":"minAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"maxAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"name",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               }
            ]
         }
      }
   ]
}

The problem is that the BigQuery schemas of the intermediate and destination tables now end up with differently ordered nested fields.

BigQuery schema of the intermediate table after creation:

[
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "maxAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      }
    ],
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

Updated BigQuery destination table schema - note that the new field maxAmount is appended at the end:

[  
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      },
      {
        "name": "maxAmount",
        "type": "INTEGER"
      }
    ],
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

The connector will subsequently fail during the periodic merge flush:

Value of type STRUCT<minAmount INT64, maxAmount INT64, name STRING> cannot be assigned to dstTableAlias.data, which has type STRUCT<minAmount INT64, name STRING, maxAmount INT64>

This can easily be seen by comparing the executed MERGE queries.

Comparison of executed MERGE queries

This query will fail because the nested fields are ordered differently:

BEGIN
    CREATE TEMP TABLE _SESSION.destination AS (        
        select 'foo' as key, struct(1 as minAmount, 'v1' as name, null as maxAmount) as data
    );
    CREATE TEMP TABLE _SESSION.intermediate AS (        
        select 'foo' as key, struct(1 as minAmount, 10 as maxAmount, 'v2' as name) as data
    );

MERGE _SESSION.destination dstTableAlias 
USING _SESSION.intermediate src 
ON dstTableAlias.key=src.key 
WHEN MATCHED THEN UPDATE SET dstTableAlias.data=src.data
WHEN NOT MATCHED THEN INSERT (`key`,`data`) VALUES (src.key, src.data);

select * from _SESSION.destination;

END

In contrast, this query succeeds:

BEGIN
    CREATE TEMP TABLE _SESSION.destination AS (        
        select 'foo' as key, struct(1 as minAmount, 'v1' as name, null as maxAmount) as data
    );
    CREATE TEMP TABLE _SESSION.intermediate_reordered AS (
        select 'foo' as key, struct(1 as minAmount, 'v2' as name, 10 as maxAmount) as data
    );

MERGE _SESSION.destination dstTableAlias 
USING _SESSION.intermediate_reordered src 
ON dstTableAlias.key=src.key 
WHEN MATCHED THEN UPDATE SET dstTableAlias.data=src.data
WHEN NOT MATCHED THEN INSERT (`key`,`data`) VALUES (src.key, src.data);

select * from _SESSION.destination;

END

We can see that for upserts, the order of struct fields matters.

Proposed changes

In this PR, I have added the destination table schema to the list returned by SchemaManager.getSchemasList when it is called for an intermediate table in upsert/delete mode. That way, the intermediate table schema is forced to respect the order of nested fields in the destination table schema: schema updates are simply applied on top of it, ensuring the same field order in both tables when new fields are added.
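Roughly, the idea looks like the sketch below. This is not the actual connector code: the getSchemasList signature is simplified, and upsertDeleteEnabled, isIntermediateTable, destinationTableFor and convertRecordSchema are hypothetical stand-ins for the connector's real internals.

// Sketch only, not the real SchemaManager implementation.
// Assumes: bigQuery is a com.google.cloud.bigquery.BigQuery client field;
// all helpers marked "hypothetical" do not exist under these names.
private List<Schema> getSchemasList(TableId table, Set<SinkRecord> records) {
  List<Schema> schemas = new ArrayList<>();
  if (upsertDeleteEnabled && isIntermediateTable(table)) {   // hypothetical
    // Seed the list with the current destination table schema so that the
    // unionized schema computed for the intermediate table keeps the
    // destination's nested field order; genuinely new fields are appended
    // instead of being interleaved.
    TableId destination = destinationTableFor(table);        // hypothetical
    Table destinationTable = bigQuery.getTable(destination);
    if (destinationTable != null) {
      schemas.add(destinationTable.getDefinition().getSchema());
    }
  }
  for (SinkRecord record : records) {
    schemas.add(convertRecordSchema(record));                 // hypothetical
  }
  return schemas;
}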

Please let me know what you think of this approach.

cla-assistant[bot] commented 11 months ago

CLA assistant check
All committers have signed the CLA.

b-goyal commented 11 months ago

Thanks @jurgispods for taking the time to write the detailed example and explain the issue. I will take some time to go over the changes and get back within two weeks. Meanwhile, would you be able to add an integration test for this change, please?

jurgispods commented 11 months ago

@b-goyal Sure, I can add an integration test.

jurgispods commented 11 months ago

@b-goyal I just added an integration test that reproduces the issue (and verifies the fix).

I found out that the issue only shows up under certain circumstances, namely when a schema update happens after the intermediate table has been deleted. Otherwise, the schemas of the destination and intermediate tables are always in sync, as they are updated using the same logic.

As far as I can see, deletion of the intermediate table only happens when the connector is stopped. So in order to replicate the error, I had to write an integration test that is quite involved:

  1. Produce records with schema v1 and let the connector create the tables
  2. Stop the connector to force deletion of the intermediate tables
  3. Start the connector again and feed in records with the updated schema v2 -> exception
  4. Apply the fix, stop and start the connector again -> success

In order to show that the connector indeed fails, I added a config for toggling my fix on or off. That might not be necessary in the final PR, since in practice it should always be on. We could instead cover this with a unit test and remove the added config.
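For reference, the test flow described above could be outlined roughly as follows. This is a sketch only, assuming a JUnit-style test; startConnector, stopConnector, produceRecords, waitForCommittedRecords, readDestinationTableRows and the upsertDeleteProps()/recordsWithSchema*() builders are hypothetical stand-ins for the integration test harness.

// Rough outline of the integration test flow; all helpers are hypothetical.
@Test
public void testNestedStructOrderAfterSchemaUpdate() throws Exception {
  // 1. Produce records with schema v1; the connector creates both the
  //    intermediate and the destination table.
  startConnector(upsertDeleteProps());
  produceRecords(TOPIC, recordsWithSchemaV1());
  waitForCommittedRecords(TOPIC);

  // 2. Stop the connector, which drops its intermediate tables on shutdown.
  stopConnector();

  // 3. Restart the connector and produce records with schema v2 (new nested
  //    field maxAmount). Without the fix, the recreated intermediate table
  //    has a different nested field order and the next merge flush fails.
  startConnector(upsertDeleteProps());
  produceRecords(TOPIC, recordsWithSchemaV2());

  // 4. With the fix, the merge flush succeeds and the upserted rows land in
  //    the destination table.
  waitForCommittedRecords(TOPIC);
  assertFalse(readDestinationTableRows().isEmpty());
}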

b-goyal commented 10 months ago

Thanks for adding the integration test, @jurgispods. I had an initial look but did not fully follow the root cause and resolution. I will need some more time to review this.

jurgispods commented 8 months ago

Hi @b-goyal, is there an update on this?