apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Hard deletion using deltastreamer #10483

Closed Kangho-Lee closed 9 months ago

Kangho-Lee commented 9 months ago

Hello. This post is from January 2020; are there any updates on deletion with DeltaStreamer? Is there a way to avoid having to add this field to incoming records for an existing Hudi table? I want to hard delete records in an existing table rather than create a new table with the _hoodie_is_deleted field.

When I tried to change the schema by adding the _hoodie_is_deleted field to the existing table with a transformer SQL such as

hoodie.streamer.transformer.sql=SELECT *, if(__deleted = "false" or __deleted is null, cast(false as boolean), cast(true as boolean)) AS _hoodie_is_deleted FROM <SRC> a

an error occurred while merging the parquet files in the existing partition directory:

Failed to merge old record into new file for key 5bfe8b1f77e686a37443cf91 from old file xxxx to new file xxxx with writerSchema
...
Null-value for required field: _hoodie_is_deleted
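
For readers following along, the transformer expression above can be read as a per-record rule. The snippet below is only a plain-Python sketch of that rule, assuming __deleted arrives as a string column (or null); the function name hoodie_is_deleted is hypothetical and is not part of Hudi.

```python
from typing import Optional


def hoodie_is_deleted(deleted: Optional[str]) -> bool:
    """Mirror the IF(...) expression from the transformer SQL for one record.

    Assumption: __deleted is a string such as "true"/"false", or None (SQL NULL).
    """
    # NULL or the literal string "false" means the record is kept.
    if deleted is None or deleted == "false":
        return False
    # Any other non-null value marks the record for deletion.
    return True


if __name__ == "__main__":
    for value in (None, "false", "true"):
        print(repr(value), "->", hoodie_is_deleted(value))
```

Note that under this rule any value other than null or "false" is treated as a delete, which matches the SQL as written in the post.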

I use Hudi version 0.14.0, and the configuration is below (personal/company information removed).

compaction.schedule.enabled: false
hoodie.auto.adjust.lock.configs: true
hoodie.bloom.index.parallelism: 100
hoodie.bloom.index.update.partition.path: true
hoodie.bulkinsert.shuffle.parallelism: 200
hoodie.clean.automatic: false
hoodie.datasource.hive_sync.database: test
hoodie.datasource.hive_sync.metastore.uris: thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083
hoodie.datasource.hive_sync.mode: hms
hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.table: greeter_test
hoodie.datasource.hive_sync.use_jdbc: false
hoodie.datasource.meta.sync.enable: true
hoodie.datasource.write.drop.partition.columns: false
hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.partitionpath.field: 
hoodie.datasource.write.precombine.field: __ordering_field
hoodie.datasource.write.reconcile.schema: true
hoodie.datasource.write.recordkey.field: user_oid
hoodie.datasource.write.row.writer.enable: true
hoodie.delete.shuffle.parallelism: 200
hoodie.deltastreamer.ingestion.targetBasePath: 
hoodie.index.type: GLOBAL_BLOOM
hoodie.insert.shuffle.parallelism: 200
hoodie.meta.sync.client.tool.class: org.apache.hudi.hive.HiveSyncTool
hoodie.metadata.enable: true
hoodie.parquet.compression.codec: snappy
hoodie.parquet.small.file.limit: 134217728
hoodie.streamer.schemaprovider.registry.schemaconverter: 
hoodie.streamer.schemaprovider.registry.url: 
hoodie.streamer.schemaprovider.spark_avro_post_processor.enable: false
hoodie.streamer.source.dfs.root: 
hoodie.streamer.transformer.sql: SELECT *, if(__deleted = "false" or __deleted is null, cast(false as boolean), cast(true as boolean)) AS _hoodie_is_deleted FROM <SRC> a
hoodie.table.type: MERGE_ON_READ
hoodie.upsert.shuffle.parallelism: 200

The spark-submit command is below:

spark-submit \
--deploy-mode cluster \
--master yarn \
--driver-cores 4 \
--executor-cores 2 \
--driver-memory 4G \
--executor-memory 8G \
--num-executors 5 \
--conf spark.driver.memoryOverhead=1G \
--conf spark.executor.memoryOverhead=1G \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.spill.compress=true \
--conf spark.shuffle.compress=true \
--conf spark.default.parallelism=200 \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.driver.maxResultSize=8G \
--conf spark.sql.debug.maxToStringFields=1000 \
--conf spark.kerberos.access.hadoopFileSystems= \
--conf spark.kerberos.keytab=test.keytab \
--conf spark.kerberos.principal=test \
--conf spark.eventLog.dir=hdfs://test \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.rolling.enabled=true \
--conf spark.eventLog.rolling.maxFileSize=256m \
--conf spark.history.fs.cleaner.enabled=true \
--conf spark.history.fs.cleaner.interval=1d \
--conf spark.history.fs.cleaner.maxAge=14d \
--conf spark.history.fs.logDirectory=hdfs://test \
--conf spark.history.retainedApplications=1000 \
--conf spark.history.kerberos.enabled=true \
--conf spark.history.kerberos.keytab=test.keytab \
--conf spark.history.kerberos.principal=test \
--conf spark.yarn.historyServer.address= \
--conf spark.datasource.hive.warehouse.metastoreUri=thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083 \
--conf 'spark.driver.extraJavaOptions=-Dhdp.version=3.1.0.0-78 -Dfile.encoding=UTF-8' \
--conf 'spark.yarn.am.extraJavaOptions=-Dhdp.version=3.1.0.0-78 -Dfile.encoding=UTF-8' \
--conf spark.driver.extraClassPath=/etc/spark/conf/c3s \
--conf spark.executor.extraClassPath=/etc/spark/conf/c3s \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE= \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE= \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/usr/hdp:/usr/hdp:ro \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/usr/hdp:/usr/hdp:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG= \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG= \
--conf spark.hadoop.dfs.client.ignore.namenode.default.kms.uri=true \
--conf spark.hadoop.hive.metastore.uris=thrift://ranger-hms-oozie1.bdp.bdata.ai:9083,thrift://ranger-hms-oozie2.bdp.bdata.ai:9083 \
--conf spark.sql.catalogImplementation=hive \
--conf 'spark.sql.hive.hiveserver2.jdbc.url=jdbc:hive2://zk-etcd1.bdp.bdata.ai:2181,zk-etcd2.bdp.bdata.ai:2181,zk-etcd3.bdp.bdata.ai:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-batch' \
--conf 'spark.datasource.hive.warehouse.hs2.url.resolved=jdbc:hive2://zk-etcd1.bdp.bdata.ai:2181,zk-etcd2.bdp.bdata.ai:2181,zk-etcd3.bdp.bdata.ai:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-batch' \
--conf spark.yarn.archive= \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executorEnv.ENV=real \
--conf spark.yarn.appMasterEnv.ENV=real \
--conf spark.yarn.am.nodeLabelExpression= \
--conf spark.yarn.executor.nodeLabelExpression= \
--conf spark.executorEnv.ENV=real \
--conf spark.yarn.appMasterEnv.ENV=real \
--conf spark.executorEnv.SPARK_USER= \
--conf spark.yarn.appMasterEnv.SPARK_USER= \
--queue batch \
--jars local:///home1/irteam/jars/hudi-spark3.3-bundle_2.12-0.14.0-zd.jar,local:///home1/irteam/jars/hudi-utilities-slim-bundle_2.12-0.14.0-zd.jar \
--class hoodie.ingestion.DeltaStreamerWrapperJob \
--hoodie-conf hoodie.upsert.shuffle.parallelism=200 \
--hoodie-conf hoodie.insert.shuffle.parallelism=200 \
--hoodie-conf hoodie.delete.shuffle.parallelism=200 \
--hoodie-conf hoodie.bulkinsert.shuffle.parallelism=200 \
--hoodie-conf hoodie.clean.automatic=false \
--hoodie-conf hoodie.datasource.write.precombine.field=__ordering_field \
--hoodie-conf hoodie.streamer.source.dfs.root= \
--hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter= \
--hoodie-conf compaction.schedule.enabled=false \
--hoodie-conf hoodie.bloom.index.parallelism=100 \
--props greeter_delete.properties \
--table-type MERGE_ON_READ \
--source-ordering-field __ordering_field \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--target-base-path greeter_test/ \
--target-table greeter_test \
--schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
--op UPSERT \
--enable-sync \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--disable-compaction

Using the transformer SQL above while creating a new table works the way I want. However, when I use the same method on an existing Hudi table, a merge error occurs. Please help me solve this.

ad1happy2go commented 9 months ago

@Kangho-Lee This is happening because, in the new schema, _hoodie_is_deleted is a non-nullable column. It can't be evolved, because the existing data doesn't have this column and it would need to be set to null there.

For these cases, we introduced a new property starting with 0.14.0 in this PR - https://github.com/apache/hudi/pull/9262. Add the configuration 'hoodie.datasource.write.new.columns.nullable' and set it to true.
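
To illustrate why nullability matters here, below is a toy model in plain Python. It is not Hudi's merge code; rewrite_record, MissingRequiredField and the schema representation (field name mapped to a nullable flag) are all hypothetical and exist only to show the difference between a required and a nullable new column when old records are rewritten.

```python
from typing import Any, Dict


class MissingRequiredField(Exception):
    """Raised when an old record has no value for a required field."""


def rewrite_record(record: Dict[str, Any], schema: Dict[str, bool]) -> Dict[str, Any]:
    """Rewrite an old record into the writer schema.

    `schema` maps field name -> nullable flag (toy representation, an assumption).
    """
    out: Dict[str, Any] = {}
    for field, nullable in schema.items():
        if field in record:
            out[field] = record[field]
        elif nullable:
            # A nullable new column can be filled with null for old data.
            out[field] = None
        else:
            # A required new column has no value to fall back on.
            raise MissingRequiredField(f"Null-value for required field: {field}")
    return out


if __name__ == "__main__":
    old = {"user_oid": "5bfe8b1f77e686a37443cf91"}
    print(rewrite_record(old, {"user_oid": False, "_hoodie_is_deleted": True}))
```

In the job from this issue, the property would presumably be passed like the other settings, for example as --hoodie-conf hoodie.datasource.write.new.columns.nullable=true or as a line in the properties file.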

Kangho-Lee commented 9 months ago

@ad1happy2go Thanks for the reply. I added the hoodie.datasource.write.new.columns.nullable configuration. For the new data under hoodie.streamer.source.dfs.root, no merge error occurred and I got the desired result.

But is there any way to change the value for the existing (parquet) data in an existing table? The transformer SQL is not applied to the existing data, so the value there is filled with null.

hoodie.streamer.transformer.sql=SELECT *, if(__deleted = "false" or __deleted is null, cast(false as boolean), cast(true as boolean)) AS _hoodie_is_deleted FROM <SRC> a

ad1happy2go commented 9 months ago

@Kangho-Lee So you want to update the old data as well. The only way is to re-ingest that old data so that it follows the upsert path again with _hoodie_is_deleted set. Thanks.
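
As a rough illustration of the point about the upsert path, here is a toy model in plain Python. It is a sketch, not Hudi's implementation: the upsert function and the dict-based table are hypothetical, and it ignores the ordering field and everything else a real write involves. It only shows that a stored record is removed when a record with the same key is ingested again with _hoodie_is_deleted set to true, while records that are never re-ingested stay as they are.

```python
from typing import Dict, List


def upsert(table: Dict[str, dict], batch: List[dict], key_field: str = "user_oid") -> Dict[str, dict]:
    """Apply a batch to a keyed table; flagged records are hard deleted."""
    result = dict(table)
    for record in batch:
        key = record[key_field]
        if record.get("_hoodie_is_deleted") is True:
            # The delete only takes effect for keys present in the batch.
            result.pop(key, None)
        else:
            result[key] = record
    return result


if __name__ == "__main__":
    # Existing rows carry null for the newly added column.
    table = {
        "a": {"user_oid": "a", "_hoodie_is_deleted": None},
        "b": {"user_oid": "b", "_hoodie_is_deleted": None},
    }
    # Re-ingest the old record "b" with the flag computed by the transformer.
    batch = [{"user_oid": "b", "_hoodie_is_deleted": True}]
    print(sorted(upsert(table, batch)))
```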

Kangho-Lee commented 9 months ago

@ad1happy2go It looks like re-ingesting is effectively the same as creating a new table. Thank you for the answer.

ad1happy2go commented 9 months ago

@Kangho-Lee Thanks for the answer. Closing the issue then. Please create a new one for any further issues/queries.