apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Global dedup: some commits stay inflight but still generate parquet files, and dedup fails #639

Closed Zhujun-Vungle closed 5 years ago

Zhujun-Vungle commented 5 years ago

Hi, I am evaluating Hudi and am currently facing an issue when trying Hudi global dedup.

ENV

data source: Kafka

ETL engine: Spark Streaming, which simply ingests data every 10 minutes and writes to S3 with global dedup enabled

data sink: S3

viewer: Hive table, created as described at https://cwiki.apache.org/confluence/display/HUDI/Registering+sample+dataset+to+Hive+via+beeline

ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'com.uber.hoodie.hadoop.HoodieInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'

HUDI config

// Imports for the com.uber.hoodie release used in this post
import com.uber.hoodie.DataSourceWriteOptions
import com.uber.hoodie.config.{HoodieIndexConfig, HoodieWriteConfig}
import com.uber.hoodie.index.HoodieIndex
import org.apache.spark.sql.SaveMode

kafkaBlock {
  // each 10-minute batch arrives as a DataFrame;
  // t1 is presumably a view over df (its registration is not shown in this post)
  df =>
    spark
      .sql("select date_format(timestamp,'yyyy-MM-dd_HH') as ts,* from t1")
      .dropDuplicates("event_id")
      .toDF()
      .write
      .format("com.uber.hoodie")
      .mode(SaveMode.Append)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(HoodieWriteConfig.TABLE_NAME, "table1")
      // global dedup
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name)
      // combine records before writing
      .option("hoodie.combine.before.insert", "true")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
      .save(dst)
}

phenomenon

Order by last update time: [screenshot: Screen Shot 2019-04-16 at 4 22 00 PM]

The Spark Streaming running log, no delay at all: [screenshot: Screen Shot 2019-04-16 at 4 22 53 PM]

question

After running the pipeline for a while, I queried the Hive table from Spark SQL (select event_id, count(*) from hudi_table group by 1 having count(*) > 1 order by 1 desc limit 10) and got duplicate records. The duplicate records come from the inflight commit. So:

  1. Am I missing some Hudi config in the writing pipeline?
  2. Why is the commit still inflight when parquet files have already been generated?
  3. In Hudi, does the commit finish before Spark's .save returns, so that there is no asynchrony problem?
  4. Is there any reader-side config issue with the Hive output format, since it's org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat?

Regards

vinothchandar commented 5 years ago

Hi, please leave us a note on the dev mailing list for a faster response. https://hudi.apache.org/community.html

Am I missing some Hudi config in the writing pipeline?

For Spark SQL, you need to push the path filter as documented here: https://hudi.apache.org/querying_data.html#spark-ro-view. Getting duplicates typically means this is missing.
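A minimal sketch of pushing that path filter, assuming a SparkSession that queries the Hive-registered table and the com.uber.hoodie Hadoop classes on the classpath. The object and method names (HoodieReadSetup, enableHoodiePathFilter) are hypothetical; the setClass call itself is the one that also appears in a follow-up comment in this thread.

```scala
import org.apache.hadoop.fs.PathFilter
import org.apache.spark.sql.SparkSession
import com.uber.hoodie.hadoop.HoodieROTablePathFilter

// Hypothetical wrapper object; only the setClass call comes from this thread.
object HoodieReadSetup {
  // Registers Hudi's read-optimized path filter on the session's Hadoop
  // configuration, so the filter decides which parquet files a query reads
  // instead of a raw directory listing.
  def enableHoodiePathFilter(spark: SparkSession): Unit =
    spark.sparkContext.hadoopConfiguration.setClass(
      "mapreduce.input.pathFilter.class",
      classOf[HoodieROTablePathFilter],
      classOf[PathFilter])
}
```

Calling HoodieReadSetup.enableHoodiePathFilter(spark) once, before running the duplicate-check query, would be the intended usage under these assumptions.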

Why is the commit still inflight when parquet files have already been generated?

The sequence of events is: we write the parquet files and then, once all attempted parquet files have been written successfully, we mark the commit as completed (i.e. no longer inflight). So this is normal. It is done to prevent queries from reading partially written parquet files.
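As a rough illustration of that sequence, here is a toy model (not Hudi code: CommitState, DataFile, CommitTimelineSketch, the file names, and the sample data are all made up for this example). A reader that lists every parquet file sees records from an inflight commit, while a reader that only accepts files from completed commits does not.

```scala
// Toy model only: these types are illustrative and are not Hudi classes.
sealed trait CommitState
case object Inflight extends CommitState
case object Completed extends CommitState

// A parquet file tagged with the commit that wrote it and the record keys it holds.
final case class DataFile(path: String, commitTime: String, eventIds: Seq[String])

object CommitTimelineSketch {
  // Keep only files written by commits that reached the Completed state.
  def visibleFiles(files: Seq[DataFile], timeline: Map[String, CommitState]): Seq[DataFile] =
    files.filter(f => timeline.get(f.commitTime).contains(Completed))

  // Record keys that appear more than once across the given files.
  def duplicateKeys(files: Seq[DataFile]): Set[String] =
    files.flatMap(_.eventIds)
      .groupBy(identity)
      .collect { case (key, hits) if hits.size > 1 => key }
      .toSet

  // Hypothetical state: commit 001 finished, commit 002 wrote a file but is still inflight.
  val timeline: Map[String, CommitState] = Map("001" -> Completed, "002" -> Inflight)
  val files: Seq[DataFile] = Seq(
    DataFile("a_001.parquet", "001", Seq("e1", "e2")),
    DataFile("b_002.parquet", "002", Seq("e2", "e3")))
}
```

In this sketch, duplicateKeys over all files reports e2, whereas duplicateKeys over visibleFiles(files, timeline) reports nothing, which mirrors why a query that bypasses the commit state can show duplicates.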

In Hudi, does the commit finish before Spark's .save returns, so that there is no asynchrony problem?

.save() internally calls hoodieWriteClient.commit().

Is there any reader-side config issue with the Hive output format, since it's org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat?

OutputFormat is not really used anywhere by Hudi. So no.

Hope that helps. I will start a thread on the mailing list and we can continue there, if you have more follow ups.

Zhujun-Vungle commented 5 years ago

Thank you so much !! Really helpful!!

Zhujun-Vungle commented 5 years ago

Hi, sorry, I failed to open https://lists.apache.org/list.html?dev@hudi.apache.org.
I have some follow-up questions for this issue:

The sequence of events is: we write the parquet files and then, once all attempted parquet files have been written successfully, we mark the commit as completed (i.e. no longer inflight). So this is normal. It is done to prevent queries from reading partially written parquet files.

Does that mean:

  1. Can some inflight commits never reach the committed state?
  2. When an inflight commit and the parquet files it generated still exist, does global dedup ignore those files?
  3. In that same case, is the correct query result determined by the read config (I mean mapreduce.input.pathFilter.class in Spark SQL)?
  4. Is there any way to apply spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]); in the Spark thrift server when starting it?

Best,

vinothchandar commented 5 years ago

@Zhujun-Vungle I responded on the mailing list.. hope that answers this

Zhujun-Vungle commented 5 years ago

Gotcha, thank you so much!