apache / hudi

[SUPPORT] delta streamer init Parquet file then flink incremental data , Data not updated #5911

Open jiangjin-f opened 2 years ago

jiangjin-f commented 2 years ago


Describe the problem you faced


To Reproduce

Steps to reproduce the behavior:

  1. Initialize the table with Delta Streamer (ParquetDFSSource) from Parquet files
  2. Write incremental Kafka data with Flink
  3. Use index.bootstrap.enabled=true

Expected behavior


Environment Description

Additional context


Stacktrace


codope commented 2 years ago

@loveShyBoy Are you using any particular index type? Can you share all your write configs? cc @danny0405 @yuzhaojing

jiangjin-f commented 2 years ago

> @loveShyBoy Are you using any particular index type? Can you share all your write configs? cc @danny0405 @yuzhaojing

The Delta Streamer job used the default index type, and the Flink job used hoodie.index.type=BLOOM (0.10.1 does not have the index.type option; it exists since 0.11.0). The Flink job options:

```
--hoodie.table.name=tablename_hudi
--table.type=COPY_ON_WRITE
--hoodie.datasource.write.recordkey.field=primary_id
--write.precombine.field=update_time
--hoodie.datasource.write.partitionpath.field=dt
--hive_sync.db=ods
--write.tasks=4
--hoodie.datasource.write.keygenerator.type=SIMPLE
--write.bucket_assign.tasks=6
--max.poll.records=100
--max.partition.fetch.bytes=524288
--hoodie.datasource.write.hive_style_partitioning=true
--index.bootstrap.enabled=false
--write.index_bootstrap.tasks=6
--index.state.ttl=0.2
--prop:hoodie.index.type=BLOOM
```

In 0.10.1 the Flink default index type is HoodieIndex.IndexType.INMEMORY, so I changed it to BLOOM. But the docs say Bloom is the default, which confuses me a little. Is that right?

```java
public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
    .key("hoodie.index.type")
    .noDefaultValue()
    .withDocumentation("Type of index to use. Default is Bloom filter. " ...

private String getDefaultIndexType(EngineType engineType) {
  switch (engineType) {
    case SPARK:
      return HoodieIndex.IndexType.SIMPLE.name();
    case FLINK:
    case JAVA:
      return HoodieIndex.IndexType.INMEMORY.name();
    default:
      throw new HoodieNotSupportedException("Unsupported engine " + engineType);
  }
}
```
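
For completeness, a hedged sketch of the two ways to set the Flink index type mentioned in this comment; the dedicated option only exists from 0.11.0 on, as noted above:

```
# Hudi 0.10.x: pass the core config through, as in the job options above
# (assuming your launcher forwards hoodie.* properties via this --prop: prefix).
--prop:hoodie.index.type=BLOOM

# Hudi 0.11.0+: the Flink writer exposes a dedicated option.
--index.type=BLOOM
```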

danny0405 commented 2 years ago

The Flink engine uses a state backend to store the index by default. For DeltaStreamer, did you use the COW table type?
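
For reference, a minimal sketch of the two Flink options being discussed, with an illustrative TTL value (the 30 below is an assumption for the example, not a recommendation):

```
# Load the record-key index of the existing table (written by Delta Streamer) into Flink state on startup.
--index.bootstrap.enabled=true
# Time-to-live of the index state, in days; it should cover how far back updates may arrive
# (the 0.2 used above is only about 4.8 hours).
--index.state.ttl=30
```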

jiangjin-f commented 2 years ago

> The Flink engine uses a state backend to store the index by default. For DeltaStreamer, did you use the COW table type?

We use the COW table type. If --index.bootstrap.enabled=true is used, do we need to set --index.state.ttl=0.2 when there is a lot of data? If the Hudi table already exists, can this parameter (--index.bootstrap.enabled=true) ensure that the data gets updated? Delta Streamer initialized the table from Parquet files, then Flink wrote incremental Kafka data, but the data was not updated.

Delta Streamer write configs:

```
spark-submit \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf spark.default.parallelism=400 --num-executors 100 --executor-cores 4 --executor-memory 16G \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.yarn.heterogeneousExecutors.enabled=false \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer s3://****/0.10/hudi-utilities-bundle_2.11-0.10.0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --source-ordering-field last_update_time \
  --target-base-path s3://********/tablename \
  --target-table tablename \
  --hoodie-conf hoodie.datasource.write.recordkey.field=primary_id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=dt \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.delete.shuffle.parallelism=400 \
  --hoodie-conf hoodie.upsert.shuffle.parallelism=400 \
  --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=400 \
  --hoodie-conf hoodie.insert.shuffle.parallelism=400 \
  --hoodie-conf hoodie.datasource.write.precombine.field=last_update_time \
  --hoodie-conf hoodie.base.path = s3://********/tablename \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=s3://*****/source_schema.avsc \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=s3://*****/target_schema.avsc \
  --hoodie-conf hoodie.datasource.write.operation=bulk_insert \
  --hoodie-conf hoodie.datasource.hive_sync.database=dw \
  --hoodie-conf hoodie.datasource.hive_sync.table=tablename \
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=dt \
  --hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
  --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://*******:10000 \
  --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=s3://*****/checkpoint/ \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://*****/dw.db/*******_parquet \
  --enable-hive-sync \
```

yuzhaojing commented 2 years ago

> The Flink engine uses a state backend to store the index by default. For DeltaStreamer, did you use the COW table type?

> We use the COW table type. If --index.bootstrap.enabled=true is used, do we need to set --index.state.ttl=0.2 when there is a lot of data? If the Hudi table already exists, can this parameter (--index.bootstrap.enabled=true) ensure that the data gets updated? Delta Streamer initialized the table from Parquet files, then Flink wrote incremental Kafka data, but the data was not updated.

> Delta Streamer write configs: (spark-submit command quoted in the previous comment)

If you need to update data from a period of time ago, please set index.state.ttl larger than that time window. --index.bootstrap.enabled=true can load the index from the existing Parquet files and update it. Can you find this log for every task in the taskmanager?

LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.", this.getClass().getSimpleName(), taskID, partitionPath, cost);
jiangjin-f commented 2 years ago

> The Flink engine uses a state backend to store the index by default. For DeltaStreamer, did you use the COW table type?

> We use the COW table type. If --index.bootstrap.enabled=true is used, do we need to set --index.state.ttl=0.2 when there is a lot of data? ... (full question and Delta Streamer write configs quoted in the earlier comments)

> If you need to update data from a period of time ago, please set index.state.ttl larger than that time window. --index.bootstrap.enabled=true can load the index from the existing Parquet files and update it. Can you find this log for every task in the taskmanager?

LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.", this.getClass().getSimpleName(), taskID, partitionPath, cost);

OK, I understand. I will try it and then report the test results. Thank you very much.

jiangjin-f commented 2 years ago

I found that the Hudi table had an existing record, and I then consumed data with the same record key from Kafka. The only difference is that the incoming 'last_update_time' is earlier than the existing record's 'last_update_time' value, yet Hudi still updated the record.
Is there a parameter setting that keeps the latest record?

Kafka consumed record: (screenshot attached)

Upserted record: (screenshot attached)

```java
public static GenericRecord stitchRecords(GenericRecord left, GenericRecord right, Schema stitchedSchema) {
  GenericRecord result = new Record(stitchedSchema);
  for (Schema.Field f : left.getSchema().getFields()) {
    result.put(f.name(), left.get(f.name()));
  }
  for (Schema.Field f : right.getSchema().getFields()) {
    result.put(f.name(), right.get(f.name()));
  }
  return result;
}
```

danny0405 commented 1 year ago

Sorry for the late reply, did you solve your problem already? Hudi orders duplicates by the preCombine field and tries to keep the latest one (the record with the biggest preCombine field value); by default the preCombine field is named ts.
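
A possible follow-up for the out-of-order update described earlier (an assumption, not something confirmed in this thread): the default overwrite-with-latest payload only compares the preCombine field among incoming duplicates within a batch, not against the record already stored in the table, so an incoming record with an older last_update_time can still replace the stored one. If the goal is to reject such older arrivals, switching the payload class to DefaultHoodieRecordPayload may be worth testing; verify the option keys below against your Hudi version's docs.

```
# Delta Streamer / Spark side (hedged example):
--hoodie-conf hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload

# Flink side (hedged example; option key as exposed by FlinkOptions in 0.10.x):
--payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
```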