apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Schema evolution using DataSource and HiveSyncTool hudi 0.15.0 #11803

Open Armelabdelkbir opened 2 months ago

Armelabdelkbir commented 2 months ago

Describe the problem you faced

Hello, I am testing several schema evolution use cases with Hudi 0.15, Spark 3.5 and HMS 4.

First test: adding a column in PG --> Debezium / schema registry OK --> Hudi (MOR) HiveSyncTool KO

org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:250)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:193)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:167)
    ... 69 more
Caused by: InvalidOperationException(message:The following columns have types incompatible with the existing columns in their respective positions :
username)
  1. Type promotion, following this doc: https://hudi.apache.org/docs/schema_evolution/#type-promotions. In PG, Double to String OK --> Debezium / schema registry OK --> Hudi (MOR) HiveSyncTool KO:
     Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from DOUBLE to string for field salary
  2. DROP column in PG --> Debezium / schema registry OK --> Hudi (MOR) HiveSyncTool: the job does not fail, but I still see the column, with null values for the newest inserts.

My configuration is:

    "hoodie.datasource.hive_sync.ignore_exceptions" -> "true",
    "hoodie.write.set.null.for.missing.columns" -> "true",
    "hoodie.schema.on.read.enable" -> "true",
    "hive.metastore.disallow.incompatible.col.type.changes" -> "false"

When I drop the tables (ro / rt) and restart my job, it creates the new tables correctly, but that is not a production-grade way to handle schema evolution.

To Reproduce

Steps to reproduce the behavior:

  1. On the PG source side:

     cdc_hudi=> ALTER TABLE employees ADD COLUMN test_str VARCHAR ;
     ALTER TABLE
     cdc_hudi=> INSERT INTO employees (name, department, username, test_str) VALUES ('armel011', 'Engineering', 'arm23220', 'teststr');
     INSERT 0 1

  2. Debezium / schema registry OK (the latest version contains the added column).
  3. Restart the Spark job.

Expected behavior

Case 1: add column
Case 2: rename column
Case 3: change datatype double to string
Case 4: drop column

Environment Description

Stacktrace


    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:170)
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:79)
    ... 68 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to update table for employees_ro
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.updateTableDefinition(HMSDDLExecutor.java:162)
    at org.apache.hudi.hive.HoodieHiveSyncClient.updateTableSchema(HoodieHiveSyncClient.java:205)
    at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:347)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:250)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:193)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:167)
    ... 69 more
Caused by: InvalidOperationException(message:The following columns have types incompatible with the existing columns in their respective positions :
test_str)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59744)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59730)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:59672)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:88)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1693)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1677)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table_with_environmentContext(HiveMetaStoreClient.java:373)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.alter_table_with_environmentContext(SessionHiveMetaStoreClient.java:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
Armelabdelkbir commented 2 months ago

1. For the first case, I reordered my dataset's columns by moving _hoodie_is_deleted away from the end, and it works well:

 cdc_hudi=> ALTER TABLE schema_test ADD COLUMN department VARCHAR(100);
 ALTER TABLE
 cdc_hudi=> INSERT INTO schema_test (role, salary, department) VALUES ('Developer', 85000.00, 'Engineering');
 INSERT 0 1

In the Hudi tables we can see the new column:

CREATE TABLE `cdc_hudi`.`schema_test_ro` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `_hoodie_is_deleted` BOOLEAN,
  `ts_ms` BIGINT,
  `op` STRING,
  `id` INT,
  `role` STRING,
  `salary` DOUBLE,
  `department` STRING)
USING hudi
OPTIONS (
  `hoodie.query.as.ro.table` 'true')
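
The Hive error quoted earlier in this thread complains about types that are incompatible "in their respective positions", which suggests a position-by-position comparison of the old and new column lists. The sketch below is a toy model of such a check, not Hive's or Hudi's code: it uses strict type equality (a real metastore accepts some conversions), and the column layout is an assumption made up to mirror the workaround above.

```python
def incompatible_positions(old_cols, new_cols):
    """Return the names of new columns whose type differs from the
    old column sitting at the same index (strict equality, a simplification)."""
    bad = []
    for (_, old_type), (new_name, new_type) in zip(old_cols, new_cols):
        if old_type.lower() != new_type.lower():
            bad.append(new_name)
    return bad


# Hypothetical layout: the delete marker is the last column of the table.
old = [("name", "string"), ("username", "string"), ("_hoodie_is_deleted", "boolean")]

# New column appended after every existing column: no position changes type.
appended_last = old + [("test_str", "string")]

# New column lands before the trailing marker: position 2 changes from
# boolean to string, so the new column is reported.
inserted_before_marker = old[:2] + [("test_str", "string")] + old[2:]

print(incompatible_positions(old, appended_last))           # []
print(incompatible_positions(old, inserted_before_marker))  # ['test_str']
```

Under these assumptions, keeping _hoodie_is_deleted away from the end means newly added columns are appended last, and no existing position changes type.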
Armelabdelkbir commented 2 months ago

I also tried to rename a column, but it does not work as expected. Rename column in PG:

cdc_hudi=> ALTER TABLE schema_test
RENAME COLUMN department TO team;
ALTER TABLE
cdc_hudi=> INSERT INTO schema_test (role, salary, team) 
VALUES ('DataOps', 85000.00, 'Engineering');

In the Hudi tables I see both columns, the old one and the new one:

CREATE TABLE `cdc_hudi`.`schema_test_ro` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `_hoodie_is_deleted` BOOLEAN,
  `ts_ms` BIGINT,
  `op` STRING,
  `id` INT,
  `role` STRING,
  `salary` DOUBLE,
  `department` STRING,
  `team` STRING)
USING hudi
OPTIONS (
  `hoodie.query.as.ro.table` 'true')

spark.sql(s"select op,id,role,salary,department,team from cdc_hudi.schema_test_ro").show(100)
+---+---+---------+--------+-----------+-----------+                            
| op| id|     role|  salary| department|       team|
+---+---+---------+--------+-----------+-----------+
|  c|  2|  Manager| 90000.0|       null|       null|
|  c|  3|  Analyst|65000.25|       null|       null|
|  c|  4| Engineer| 75000.5|       null|       null|
|  c|  5|Developer| 85000.0|Engineering|       null|
|  c|  6|  DataOps| 85000.0|       null|Engineering|
+---+---+---------+--------+-----------+-----------+
ad1happy2go commented 2 months ago

@Armelabdelkbir This is expected, as Hudi doesn't support that. It treats the new batch as an evolved schema and adds the column at the end.

Thinking about it: with only the incremental Debezium payload, how would Hudi identify that it was a rename? It could equally be a column drop plus the addition of a new column.
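
To make the ambiguity concrete, here is a minimal sketch in plain Python (not Hudi code; `schema_diff` is a hypothetical helper). It compares two field lists by name, which is all a consumer sees when it only has the old and new schemas: a rename and a drop-plus-add produce the same diff.

```python
def schema_diff(old_fields, new_fields):
    """Return (removed, added) column names between two (name, type) lists."""
    old_names = [name for name, _ in old_fields]
    new_names = [name for name, _ in new_fields]
    removed = [n for n in old_names if n not in new_names]
    added = [n for n in new_names if n not in old_names]
    return removed, added


before = [("id", "int"), ("role", "string"), ("salary", "double"), ("department", "string")]

# Schema after: ALTER TABLE schema_test RENAME COLUMN department TO team
after_rename = [("id", "int"), ("role", "string"), ("salary", "double"), ("team", "string")]

# Schema after dropping department and then adding a new column called team
after_drop_then_add = [("id", "int"), ("role", "string"), ("salary", "double"), ("team", "string")]

print(schema_diff(before, after_rename))         # (['department'], ['team'])
print(schema_diff(before, after_drop_then_add))  # (['department'], ['team'])

# Treating the change as "evolved schema, append new columns" keeps the old
# column and adds the new one at the end, as seen in the table above.
removed, added = schema_diff(before, after_rename)
merged = before + [f for f in after_rename if f[0] in added]
print([name for name, _ in merged])  # ['id', 'role', 'salary', 'department', 'team']
```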

Armelabdelkbir commented 2 months ago

@ad1happy2go Thanks, I understand for RENAME. What about DROP column? I tried to drop a column in the table, but meta-sync doesn't sync the new schema; when I insert some values I still see the old column, with NULL values. In PG:

cdc_hudi=> alter table schema_test DROP team ;
ALTER TABLE
cdc_hudi=> select * from schema_test ;
 id |   role    |  salary  
----+-----------+----------
  2 | Manager   | 90000
  3 | Analyst   | 65000.25
  4 | Engineer  | 75000.5
  5 | Developer | 85000
  6 | DataOps   | 85000

cdc_hudi=> INSERT INTO schema_test (role, salary) VALUES
    ('Tea Engineer','200M')
  1. Debezium OK
  2. I restarted my streaming job to pick up the latest schema version
  3. In the HMS tables I still see the old column, with null values, in both the _RO and _RT tables
    
    CREATE TABLE `cdc_hudi`.`schema_test_ro` (
    `_hoodie_commit_time` STRING,
    `_hoodie_commit_seqno` STRING,
    `_hoodie_record_key` STRING,
    `_hoodie_partition_path` STRING,
    `_hoodie_file_name` STRING,
    `_hoodie_is_deleted` BOOLEAN,
    `ts_ms` BIGINT,
    `op` STRING,
    `id` INT,
    `role` STRING,
    `salary` STRING,
    `team` STRING)
    USING hudi
    OPTIONS (
    `hoodie.query.as.ro.table` 'true')

spark.sql(s"select id, role, salary, team from cdc_hudi.schema_test_ro").show(20)

+---+-----------------+----------+-----------+
| id|             role|    salary|       team|
+---+-----------------+----------+-----------+
|  2|          Manager|   90000.0|Engineering|
|  3|          Analyst|  65000.25|Engineering|
|  4|         Engineer|   75000.5|Engineering|
|  5|        Developer|   85000.0|Engineering|
|  6|          DataOps|   85000.0|Engineering|
|  7|     Tea Engineer|      200M|       null|
+---+-----------------+----------+-----------+


Is this behavior expected, or am I missing something?
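
For reference, a minimal sketch of what filling missing columns with null amounts to, which is how the configuration name hoodie.write.set.null.for.missing.columns reads. This is a toy model in plain Python, not Hudi's implementation; `fill_missing_with_null` is a hypothetical helper, and how this setting interacts with other schema evolution options is not modelled.

```python
def fill_missing_with_null(record, table_fields):
    """Project an incoming record onto the table's field list,
    using None for any field the record no longer carries."""
    return {name: record.get(name) for name in table_fields}


# Table schema still contains the column that was dropped at the source.
table_fields = ["id", "role", "salary", "team"]

# Incoming record produced after: alter table schema_test DROP team
incoming = {"id": 7, "role": "Tea Engineer", "salary": "200M"}

row = fill_missing_with_null(incoming, table_fields)
print(row)  # {'id': 7, 'role': 'Tea Engineer', 'salary': '200M', 'team': None}
```

In this model the table schema never shrinks: the column stays, and new rows simply carry null for it.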
ad1happy2go commented 2 months ago

@Armelabdelkbir You may need to use https://hudi.apache.org/docs/configurations/#hoodiedatasourcewriteschemaallowautoevolutioncolumndrop for that.

Armelabdelkbir commented 2 months ago

@ad1happy2go I added this property and tested again, and the result is the same: the schema still contains the old column, now with null values. Here I dropped the salary column in PostgreSQL:

+---+-----------------+------+
| id|             role|salary|
+---+-----------------+------+
|  2|          Manager|  null|
|  3|          Analyst|  null|
|  4|         Engineer|  null|
|  5|        Developer|  null|
|  6|          DataOps|  null|
|  7|          DataOps|  null|
|  8|Rockstar engineer|  null|
|  9|     Tea Engineer|  null|
| 11|    Beer Engineer|  null|
| 10|     Tea Engineer|  null|
| 12|  Coffee engineer|  null|
| 13|  Coffee engineer|  null|
| 14|  Coffee engineer|  null|
+---+-----------------+------+
ad1happy2go commented 1 month ago

@Armelabdelkbir Do you have the Debezium payload that you get after the column drop? I can test it out.

rangareddy commented 1 month ago

Hi @Armelabdelkbir

I have tested your scenario with the following code. Dropping a column also worked without any issues.

Step1: Create a database and table in postgres

CREATE DATABASE cdc_hudi;

\connect cdc_hudi;

CREATE TABLE employees (
    id INT PRIMARY KEY NOT NULL,
    name VARCHAR(100) NOT NULL,
    age INT,
    salary DECIMAL(10, 2),
    department VARCHAR(50)
);

ALTER TABLE employees REPLICA IDENTITY FULL;

Step2: Configure the debezium source connector for postgres db.

vi debezium-source-postgres.json

{
  "name": "postgres-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "cdc_hudi",
    "topic.prefix": "cdc_topic",
    "database.server.name": "postgres",
    "schema.include.list": "public",
    "table.include.list": "public.employees",
    "publication.autocreate.mode": "filtered",
    "tombstones.on.delete": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081/",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081/",
    "slot.name": "pgslot1"
  }
}

Step3: Create the postgres-debezium-connector connector using above configuration and verify the status

curl -s -X POST -H 'Accept: application/json' -H "Content-Type:application/json" \
 -d @debezium-source-postgres.json http://localhost:8083/connectors/ | jq

curl -s -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/postgres-debezium-connector/status | jq
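
The status check above can also be scripted. The sketch below assumes the usual shape of the Kafka Connect status response (a `connector.state` field plus a `tasks` array with a `state` per task); `connector_is_healthy` is a hypothetical helper and the `worker_id` values are made up. It parses a sample body rather than calling the REST endpoint, so it runs without a Connect cluster.

```python
import json


def connector_is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks is not considered healthy here.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


# Sample body shaped like the output of GET /connectors/<name>/status.
sample = json.loads("""
{
  "name": "postgres-debezium-connector",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
  "type": "source"
}
""")

# Same connector, but with its only task in the FAILED state.
failed = dict(sample, tasks=[{"id": 0, "state": "FAILED"}])

print(connector_is_healthy(sample))  # True
print(connector_is_healthy(failed))  # False
```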

Step4: Insert one record in postgres db and verify kafka topic is created or not

INSERT INTO employees (id, name, age, salary, department) VALUES (1, 'Ranga', 30, 50000.00, 'Sales');
SELECT * FROM employees;
$CONFLUENT_HOME/bin/kafka-topics --list --bootstrap-server localhost:9092 | grep employees
cdc_topic.public.employees

Step5: Run the Kafka Consumer to verify the data is consumed or not.

$CONFLUENT_HOME/bin/kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --topic cdc_topic.public.employees

Step6: Run the Spark application

HUDI_HOME=/Install/apache/hudi
HUDI_UTILITIES_JAR=$(ls $HUDI_HOME/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle*.jar | grep -v sources | grep -v tests)

$SPARK_HOME/bin/spark-submit \
  --master "local[2]" \
  --deploy-mode client \
  --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
  --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
  --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
  --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_JAR \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf bootstrap.servers=localhost:9092 \
  --hoodie-conf schema.registry.url=http://localhost:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/cdc_topic.public.employees-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=cdc_topic.public.employees \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true \
  --table-type MERGE_ON_READ \
  --op UPSERT \
  --target-base-path file:///tmp/debezium/postgres/employees \
  --target-table employees_cdc  \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --source-ordering-field _event_lsn \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
  --continuous \
  --min-sync-interval-seconds 60
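
The schema provider URL in the command above follows the schema registry's default subject naming, where the value schema of a topic is registered under `<topic>-value`. A small sketch of building that URL, assuming the default naming strategy is in use; `latest_value_schema_url` is a hypothetical helper, not part of Hudi or the registry client.

```python
from urllib.parse import quote


def latest_value_schema_url(registry_url, topic):
    """Build the URL of the latest value schema for a topic,
    assuming the default '<topic>-value' subject name."""
    subject = quote(f"{topic}-value", safe="")
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"


url = latest_value_schema_url("http://localhost:8081/", "cdc_topic.public.employees")
print(url)
# http://localhost:8081/subjects/cdc_topic.public.employees-value/versions/latest
```

Because the URL points at `versions/latest`, a job that reads it at startup would only see a newer schema version after a restart or a schema refresh.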

Step7: Perform the schema evolution operations and verify the data.

-- Adding Columns
ALTER TABLE employees ADD COLUMN address VARCHAR;

INSERT INTO employees (id, name, age, salary, department, address) VALUES (2, 'Nishanth', 30, 50000.00, 'Sales', 'Bangalore');

SELECT * FROM employees;

Step8: Launch another spark job and verify the data

export SPARK_VERSION=3.5
$SPARK_HOME/bin/spark-shell \
  --master "local[2]" \
  --deploy-mode client \
  --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
  --conf 'spark.hadoop.spark.sql.legacy.parquet.nanosAsLong=false' \
  --conf 'spark.hadoop.spark.sql.parquet.binaryAsString=false' \
  --conf 'spark.hadoop.spark.sql.parquet.int96AsTimestamp=true' \
  --conf 'spark.hadoop.spark.sql.caseSensitive=false' \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

val basePath="file:///tmp/debezium/postgres/employees"
spark.read.format("hudi").load(basePath).show(false)

Step9: You can repeat the steps above for further schema changes and verify the data.

-- Renaming a column
ALTER TABLE employees RENAME COLUMN department TO dept;

INSERT INTO employees (id, name, age, salary, dept, address) VALUES (3, 'Vinod', 35, 40000.00, 'HR', 'Andhrapradesh');

SELECT * FROM employees;

-- Dropping a column
ALTER TABLE employees DROP COLUMN dept;

INSERT INTO employees (id, name, age, salary, address) VALUES (4, 'Manjo', 35, 40000.00, 'Chennai');

SELECT * FROM employees;

-- Type change column
ALTER TABLE employees ALTER COLUMN age TYPE bigint;

INSERT INTO employees (id, name, age, salary, address) VALUES (5, 'Raja', 59, 70000.00, 'Bangalore');

SELECT * FROM employees;
rangareddy commented 1 month ago

Spark-Submit Output:

Screenshot 2024-09-16 at 3 08 12 PM

Postgres Output:

Screenshot 2024-09-16 at 3 08 32 PM

Kafka topic output:

Screenshot 2024-09-16 at 3 08 47 PM
ad1happy2go commented 1 month ago

@Armelabdelkbir As demonstrated by @rangareddy, dropping a column works as expected with that config. Let us know if you have any more issues with this. Thanks.