confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

Data in Hive table shows new columns as NULL after schema evolution with a default value specified when the format is Parquet #508

Open swathimocharla opened 4 years ago

swathimocharla commented 4 years ago

Hi there, I have Avro data being written to HDFS with Hive integration turned on, using "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat" and "schema.compatibility": "BACKWARD". I then add a new column with a default value to the schema and send some more data.

Initial schema: [{"name":"a1","type":"string"}]
New schema: [{"name":"a1","type":"string"},{"name": "a2", "type": "string", "default": "a2defval"}]

There are 2 issues here (a minimal connector config sketch follows the list):

  1. Send data with both columns a1 and a2, e.g. {"a1":"val2","a2":"a2val2"}. Hive shows: val2 NULL. After restarting the connector, Hive shows: val2 a2val2.
  2. The older data that was pushed before the schema evolution is not updated with the default value for a2 (the new column). Hive still shows: val1 NULL.
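
For reproducibility, a sink config along the lines of the setup described above; the connector name, topic, hdfs.url, hive.metastore.uris, and flush.size values below are placeholders, not the exact values from this environment:

{
  "name": "hdfs-parquet-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "test",
    "hdfs.url": "hdfs://namenode:8020",
    "flush.size": "3",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "schema.compatibility": "BACKWARD",
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://hive-metastore:9083",
    "hive.database": "default"
  }
}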

Is this expected? I've done the same tests with the format class Avro, and didn't face either of these issues.

The Parquet Hive table description doesn't show the default values: StorageDescriptor(cols:[FieldSchema(name:a1, type:string, comment:null), FieldSchema(name:a2, type:string, comment:null), FieldSchema(name:partition, type:string, comment:)])

but the schema in the Parquet file footer in HDFS does show the default value: parquet.avro.schema = {"type":"record","name":"myrecord","fields":[{"name":"a1","type":"string"},{"name":"a2","type":{"type":"string","connect.default":"a2defval"},"default":"a2defval"}]}

swathimocharla commented 4 years ago

@ncliang could you take a look?

swathimocharla commented 4 years ago

@kkonstantine , @rhauch , can you help?

swathimocharla commented 3 years ago

Update: I changed hiveMetaStore.alterTable(table); to hiveMetaStore.alterTable(table, true); in ParquetHiveUtil.java to enable the cascade option on the ALTER TABLE. After this change, data written after the schema evolution looks correct in the Hive tables.

hive> select * from test;
OK
// schema with a single column "f1"
value1  NULL        NULL    0
value2  NULL        NULL    0
value3  NULL        NULL    0
// schema evolved with "f2" (String, default value "defvalue"), "f3" (int, default value: 9)
val2    a2val2      13      0
val3    a2val3      0       0
val4    a2val4      4       0
// push data with old schema again "f1"
value4  defvalue        9       0
value5  defvalue        9       0
value6  defvalue        9       0
Time taken: 0.156 seconds, Fetched: 9 row(s)
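
For clarity, the change is a one-liner in the schema-alteration path. The sketch below is an approximation of ParquetHiveUtil.alterSchema from memory, not exact source, and assumes HiveMetaStore exposes (or is given) an alterTable overload that takes a cascade flag:

// Sketch of io.confluent.connect.hdfs.parquet.ParquetHiveUtil.alterSchema with the proposed change
private void alterSchema(String database, String tableName, Schema schema) {
  Table table = hiveMetaStore.getTable(database, tableName);
  List<FieldSchema> columns = HiveSchemaConverter.convertSchema(schema);
  table.setFields(columns);
  // before: hiveMetaStore.alterTable(table);
  // after: cascade=true so the ALTER TABLE also updates the metadata of existing partitions
  hiveMetaStore.alterTable(table, true);
}

Note that cascading only propagates the new column metadata to existing partitions; it does not backfill defaults into the old Parquet files, which is why the pre-evolution rows above still show NULL.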

@kkonstantine can you please share your opinion?

swathimocharla commented 3 years ago

@dosvath , any thoughts?

dosvath commented 3 years ago

Hi @swathimocharla, thanks for raising the issue. On a first pass it sounds like the handling for the Parquet format differs from the Avro format. hiveMetaStore.alterTable(table); is used in AvroHiveUtil as well and seems to work fine there, so the Parquet behavior will need some more investigation on our end. I will test the behavior and follow up here.

swathimocharla commented 3 years ago

Hi @dosvath, yes, Avro works as expected and the issue is only with Parquet. Thank you for looking into this; I'll wait to hear back from you.

dosvath commented 3 years ago

Hi @swathimocharla, could you share which Hadoop and Hive versions you are using?

swathimocharla commented 3 years ago

Hi @dosvath, we are using CDH 6.3.2 (Hadoop 3.0.0, Hive 2.1.1).

dosvath commented 3 years ago

Hi @swathimocharla, this connector is compatible with Hadoop 2.x and Hive 2.x. Could you try our HDFS 3 connector and see if it resolves your issue? https://docs.confluent.io/kafka-connect-hdfs3-sink/current/index.html

swathimocharla commented 3 years ago

Hi @dosvath, we can try that, but we are in production on the current version and this clearly seems to be a bug. It would help to find a resolution on the current version.

swathimocharla commented 3 years ago

@dosvath, what is your opinion on the fix of changing hiveMetaStore.alterTable(table); to hiveMetaStore.alterTable(table, true); in ParquetHiveUtil.java to enable the cascade option on the ALTER TABLE?

dosvath commented 3 years ago

Hi @swathimocharla, I believe the difference in behavior between the Avro and Parquet formats is that for Avro we apply the Avro schema to the table before altering it. I think it would be okay to use the workaround you specified for your use case, as the cascade operation is standard when using partitioned tables.
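
Roughly, the difference between the two utils looks like this; both snippets are paraphrased from memory and may not match the exact source:

// AvroHiveUtil.alterSchema (approximate): the full Avro schema, including connect.default values,
// is stored in the table's avro.schema.literal property, so the Avro SerDe can supply defaults
// for columns that are missing from older files.
Table avroTable = hiveMetaStore.getTable(database, tableName);
avroTable.getParameters().put("avro.schema.literal", avroData.fromConnectSchema(schema).toString());
hiveMetaStore.alterTable(avroTable);

// ParquetHiveUtil.alterSchema (approximate): only the Hive column list is replaced; default values
// are not communicated to Hive, and without the cascade flag the existing partition metadata keeps
// the old column list.
Table parquetTable = hiveMetaStore.getTable(database, tableName);
parquetTable.setFields(HiveSchemaConverter.convertSchema(schema));
hiveMetaStore.alterTable(parquetTable);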