apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.2k stars 2.38k forks source link

[SUPPORT] Incomplete Table Migration #3879

Closed jardel-lima closed 2 years ago

jardel-lima commented 2 years ago

Describe the problem you faced

I am trying to migrate some tables to hudi format, but I am facing some issues. We have a 7GB (snnapy compacted) table with 200M rows, 49 columns and just one partition. Using PySpark DataSource the migration finished without any error, although I notice that about 10000 rows were missing in the hudi table. I have tried to migrate the table again, but the same issue happened.

To Reproduce

Migrate a huge table with a single partition using bulk_insert operation.

Expected behavior

I expected that all row would be migrated

Environment Description

Additional context

Hudi Options:

{'hoodie.table.name': 'table_a',
 'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.write.recordkey.field': 'KEY', 
'hoodie.datasource.write.partitionpath.field': 'YEAR', 
'hoodie.datasource.write.precombine.field': 'REF_DATE',
 'hoodie.datasource.write.hive_style_partitioning': 'true', 
'hoodie.datasource.hive_sync.enable': 'true', 
'hoodie.datasource.hive_sync.database': 'database_a', 
'hoodie.datasource.hive_sync.table': 'table_a', 
'hoodie.datasource.hive_sync.partition_fields': 'YEAR', 
'hoodie.datasource.hive_sync.support_timestamp': 'true', 
'hoodie.bulkinsert.shuffle.parallelism': 17,
 'hoodie.cleaner.commits.retained': 3, 
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
nsivabalan commented 2 years ago

this seems very strange, that same no of records are missing everytime. Can you try by disabling row writer (hoodie.datasource.write.row.writer.enable) to see if the issue goes away? I don't suspect this to be an issue, but just in case.

jardel-lima commented 2 years ago

Hi @nsivabalan. Thanks for your reply. I did what you said and the issue went way, thanks a lot. Does this setting impact anything in the table? Can I make all Hudi operations without any problem?

nsivabalan commented 2 years ago

@jardel-lima : you may see some perf hit, but you should be fine. Is it possible to give us a reproducible code snippet or data may be(please mask any PII). We have not seen this issue before. And definitely wanted to dig in if there are any data loss.

jardel-lima commented 2 years ago

Hi @nsivabalan, as I said before I could migrate others tables without any problem, just in a few tables a had this problem. I will try to create a dataset to reproduce this problem. How can I share it with you?

nsivabalan commented 2 years ago

you can share it here if its simple script or via github gists. Or if you want to share data, may be google drive link or something where I have access to download the data.

nsivabalan commented 2 years ago

@jardel-lima : let us know if you have any updates or if you can share the dataset.

jardel-lima commented 2 years ago

Hi @nsivabalan. I am talking with our DPO about modifications we have to make in our dataset before sending it to you. As soon as I get his approval, I will share it with you.

Meanwhile I can share the code I am using:

hudi_options = {
'hoodie.table.name': 'hudi_table',
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.write.recordkey.field': 'UUID',
'hoodie.datasource.write.partitionpath.field': 'PARTITION',
'hoodie.datasource.write.precombine.field': 'SORT_KEY',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': 'sandbox',
'hoodie.datasource.hive_sync.table': 'hudi_table',
'hoodie.datasource.hive_sync.partition_fields': 'PARTITION',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.parquet.compression.codec' : 'snappy',
'hoodie.bulkinsert.shuffle.parallelism': 100
}

df.write\
  .format("hudi")\
  .options(**hudi_options)\
  .mode('overwrite')\
  .save('s3://s3-buckete/s3-folder')

The environment still the same as a said in the first message.

Thanks for your attention.

nsivabalan commented 2 years ago

sure. let us know once you have the dataset available to share.

jardel-lima commented 2 years ago

Hi @nsivabalan. HERE is the dataset used to replicate this problem. The file is not public, but will give access as soon as you request.

Here is the code that initiate the spark session, maybe it will be useful for you:

spark = (
    SparkSession.builder.appName("Hudi_Data_Processing_Framework")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.jars.packages","org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0")
    .config("spark.executor.memory", "4G")
    .config("spark.executor.cores","2")
    .enableHiveSupport()
    .getOrCreate()
)

Here is the code used to load this dataset:

df = spark.read.load('<<dataset_path>>',
                       encoding='utf-8',
                       format='com.databricks.spark.csv',
                       header=True,
                       delimiter=';',
                       inferSchema=True)

Sorry for the delay. I hope it can help you identify the problem.

nsivabalan commented 2 years ago

thanks. have placed a request.

nsivabalan commented 2 years ago

May I know whats the partition path field I should be choosing while writing to hudi ? and I assume record key is UUID and preCombine field is SORT_KEY.

spark.sql("describe tbl1").show()
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    UUID|   string|   null|
|       A|   string|   null|
|       B|timestamp|   null|
|       C|   string|   null|
|       D|      int|   null|
|       E|timestamp|   null|
|       F|   string|   null|
|       G|timestamp|   null|
|SORT_KEY|timestamp|   null|
+--------+---------+-------+
nsivabalan commented 2 years ago

@jardel-lima : gentle ping. may I know which field I should be using as partition path.

jardel-lima commented 2 years ago

Hi @nsivabalan. There is a partition column called PARTITION. As it is a partition column you may have to load the dataset using relative path, ./anonymous_dataset.

nsivabalan commented 2 years ago

I could not reproduce the missing records.

val df = spark.read.option("encoding","utf-8").option("header","true").option("inferSchema","true").option("delimiter",";").format("com.databricks.spark.csv").load("/Users/nsb/Downloads/anonymous_sample_table/")

df.count
res0: Long = 10000

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow5"

// upsert operation

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "SORT_KEY").
  option(RECORDKEY_FIELD_OPT_KEY, "UUID").
  option(PARTITIONPATH_FIELD_OPT_KEY, "PARTITION").
  option(TABLE_NAME, tableName).
  mode(Append).save(basePath)

val hudiDf = spark.read.format("hudi").load(basePath)
hudiDf.count
res4: Long = 10000

// bulk insert operation

val basePath = "/tmp/hudi_trips_cow6"

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "SORT_KEY").
  option("hoodie.datasource.write.operation","bulk_insert").
  option(RECORDKEY_FIELD_OPT_KEY, "UUID").
  option(PARTITIONPATH_FIELD_OPT_KEY, "PARTITION").
  option(TABLE_NAME, tableName).
  mode(Append).save(basePath)

val hudiDf = spark.read.format("hudi").load(basePath)
hudiDf.count
nsivabalan commented 2 years ago

Closing this as not reproducible for now. But would definitely curious on this end. If you can help us w/ a dataset to reproduce the issue, do let us know. we take data missing very seriously.