apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.35k stars 2.42k forks source link

[SUPPORT] repair deduplicate unable to find `_hoodie_record_key` in data #6194

Closed ehurheap closed 2 years ago

ehurheap commented 2 years ago

Problem In hudi-cli I’m trying to run repair deduplicate against a partition in which I have confirmed via a separate spark query that there are in fact duplicates on the _hoodie_record_key. I'm getting cannot resolve '_hoodie_record_key' given input columns: []

To Reproduce

  1. Verify duplicates exist with separate spark query:

    val basePath="s3://thebucket/data/tables/events"
    val partitionPath="env_id=123/week=20220711"
    val inputPath=s"$basePath/$partitionPath"
    val df = spark.read.load(inputPath)
    df.printSchema() // shows expected schema including `_hoodie_record_key`
    df.createOrReplaceTempView("hoh")
    val hoodieKeyQuery = "select _hoodie_record_key, count(*) from hoh group by _hoodie_record_key having count(*) > 1"
    val dupes = spark.sql(hoodieKeyQuery)
    dupes.count() // about 1000 dupes counted
  2. hudi-cli repair attempt: connect --path s3://thebucket/data/tables/events repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type" outputs (stack trace at end of doc):

    cannot resolve '_hoodie_record_key' given input columns: []

Expected behavior The dryrun should produce some information about files that will be fixed

Environment Description

Additional context We have both a streaming ingest job and a backfill job writing to this hudi table. Here are the write options for each job: Stream ingest write options:

HoodieWriteConfig.TBL_NAME.key -> "events",
DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "env_id,user_id,event_id",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "env_id,week",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "time",
DataSourceWriteOptions.OPERATION.key -> INSERT_OPERATION_OPT_VAL,
HIVE_STYLE_PARTITIONING.key() -> "true",
"hoodie.insert.shuffle.parallelism" -> ingestConfig.hudiInsertParallelism,
"checkpointLocation" -> ingestConfig.hudiCheckpointPath,
"hoodie.metadata.enable" -> "true",
"hoodie.bloom.index.filter.dynamic.max.entries" -> "800000",
"hoodie.datasource.write.streaming.ignore.failed.batch" -> "false",
"hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
"hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"hoodie.write.lock.dynamodb.table" -> "datalake-locks",
"hoodie.write.lock.dynamodb.partition_key" -> "events",
"hoodie.write.lock.dynamodb.region" -> "us-east-1"

Backfill job write options:

HoodieWriteConfig.TBL_NAME.key -> "events",
DataSourceWriteOptions.TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL, // note: this seems to be ignored: the table was created with MOR before the backfill began.
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "env_id,user_id,event_id",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "env_id,week",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "time",
DataSourceWriteOptions.OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
HIVE_STYLE_PARTITIONING.key() -> "true",
"hoodie.insert.shuffle.parallelism" -> hudiInsertParallelism,
"hoodie.upsert.shuffle.parallelism" -> hudiBulkInsertParallelism,
"hoodie.metadata.enable" -> "true",
"hoodie.bloom.index.filter.dynamic.max.entries" -> "800000",
"hoodie.datasource.write.streaming.ignore.failed.batch" -> "false",
"hoodie.bulkinsert.sort.mode" -> "NONE",
"hoodie.combine.before.insert" -> "false",
"hoodie.datasource.write.row.writer.enable" -> "false",
"hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
"hoodie.write.lock.provider" -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"hoodie.write.lock.dynamodb.table" -> "datalake-locks",
"hoodie.write.lock.dynamodb.partition_key" -> "events",
"hoodie.write.lock.dynamodb.region" -> "us-east-1"

Stacktrace

22/07/22 19:18:58 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1658517533303
      +- View (htbl_1658517533303, [])
         +- LocalRelation <empty>
KnightChess commented 2 years ago

make spark.sql.parquet.mergeSchema to true I think can solve this error. but may be there has diff schema data file in this partition? I think we need to check it.

ehurheap commented 2 years ago

@KnightChess I added spark.sql.parquet.mergeSchema true to the spark properties file, then reconnected to the hudi-cli and re-tried the repair command again. The result was the same:

org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []

I am pretty confident that there is no schema diff in the data files.

srehman420 commented 2 years ago

@KnightChess I added spark.sql.parquet.mergeSchema true to the spark properties file, then reconnected to the hudi-cli and re-tried the repair command again. The result was the same:

org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []

I am pretty confident that there is no schema diff in the data files.

Can you verify if the duplicates are in the same file? hoodie_file column has the same value or different value

KnightChess commented 2 years ago

@ehurheap

22/07/22 19:18:58 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1658517533303
      +- View (htbl_1658517533303, [])
         +- LocalRelation <empty>

+- LocalRelation <empty>, it look like the path you input not have complete files, can you give some detail log in org.apache.hudi.cli.DedupeSparkJob the info log is: List of files under partition: xxx => yyyy

ehurheap commented 2 years ago

Confirming that the list of files is empty: List of files under partition: () =>

I did some debugging and this list seems to be empty because the timeline in each of the relevant HoodieFileGroups is empty:

From HoodieFileGroup.java:

  public Stream<FileSlice> getAllFileSlices() {
    if (!timeline.empty()) {
      return fileSlices.values().stream().filter(this::isFileSliceCommitted);
    }
    return Stream.empty();
  }

Is there some assumption or rule that repair deduplicatecan only be run for certain conditions? For example, before compaction has happened?

KnightChess commented 2 years ago

@ehurheap look like repair deduplicate target is base file. Is there any duplicate data when you use query or other op? For log file, I think it will combine the duplicate primary key when you read, so need not repair.

nsivabalan commented 2 years ago

these set of commands worked for me. I followed our quick start guide, just by setting write operation to "bulk_insert". I repeated the same insert Df twice to hudi table.

spark.sql("select partitionpath, count(uuid) from hudi_trips_snapshot group by 1 order by 1").show(false)
+------------------------------------+-----------+
|partitionpath                       |count(uuid)|
+------------------------------------+-----------+
|americas/brazil/sao_paulo           |6          |
|americas/united_states/san_francisco|10         |
|asia/india/chennai                  |4          |
+------------------------------------+-----------+

spark.sql("select partitionpath, uuid, fare from hudi_trips_snapshot order by 1,2").show(false)
+------------------------------------+------------------------------------+------------------+
|partitionpath                       |uuid                                |fare              |
+------------------------------------+------------------------------------+------------------+
|americas/brazil/sao_paulo           |69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo           |69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo           |be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014  |
|americas/brazil/sao_paulo           |be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014  |
|americas/brazil/sao_paulo           |ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
|americas/brazil/sao_paulo           |ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
|americas/united_states/san_francisco|07ce908b-1f5e-405f-98fb-37796264d7c3|19.179139106643607|
|americas/united_states/san_francisco|07ce908b-1f5e-405f-98fb-37796264d7c3|19.179139106643607|
|americas/united_states/san_francisco|09e8d4d3-f6ce-425a-8b8a-1858d9aea981|64.27696295884016 |
|americas/united_states/san_francisco|09e8d4d3-f6ce-425a-8b8a-1858d9aea981|64.27696295884016 |
|americas/united_states/san_francisco|15b605fb-4ce7-4f37-a1ec-79989ae9b90f|33.92216483948643 |
|americas/united_states/san_francisco|15b605fb-4ce7-4f37-a1ec-79989ae9b90f|33.92216483948643 |
|americas/united_states/san_francisco|ac572297-8aa7-4cfc-be1b-280ab8b6c783|27.79478688582596 |
|americas/united_states/san_francisco|ac572297-8aa7-4cfc-be1b-280ab8b6c783|27.79478688582596 |
|americas/united_states/san_francisco|de8d8395-f3aa-412a-bb49-78f4e2537677|93.56018115236618 |
|americas/united_states/san_francisco|de8d8395-f3aa-412a-bb49-78f4e2537677|93.56018115236618 |
|asia/india/chennai                  |047c2afe-3cb1-4976-badb-10042a33e9e9|41.06290929046368 |
|asia/india/chennai                  |047c2afe-3cb1-4976-badb-10042a33e9e9|41.06290929046368 |
|asia/india/chennai                  |f74ac5dc-a908-4669-b607-8a382ffbf103|17.851135255091155|
|asia/india/chennai                  |f74ac5dc-a908-4669-b607-8a382ffbf103|17.851135255091155|
+------------------------------------+------------------------------------+------------------+

de-dup via hudi-cli

Launch hudi-cli

connect --path /tmp/hudi_trips_cow/
set --conf SPARK_HOME=/Users/nsb/Documents/tools/spark-2.4.7-bin-hadoop2.7
repair deduplicate --duplicatedPartitionPath "americas/brazil/sao_paulo" --repairedOutputPath "/tmp/dedupedData/" --sparkMemory 1g --sparkMaster local[2] --dedupeType "upsert_type"

After this, I see some parquet files in "/tmp/dedupedData/". I tried reading them via spark-shell to check for duplicates.

val df = spark.read.format("parquet").load("/tmp/dedupedData/")
df.registerTempTable("tbl1")
spark.sql("select partitionpath, uuid, fare from tbl1 order by 1,2").show(false)
+-------------------------+------------------------------------+------------------+
|partitionpath            |uuid                                |fare              |
+-------------------------+------------------------------------+------------------+
|americas/brazil/sao_paulo|69f49197-cc8e-4d91-b745-dbe6e83a4b5a|66.62084366450246 |
|americas/brazil/sao_paulo|be747659-1a1d-4445-b5ae-ccfe5104e69e|43.4923811219014  |
|americas/brazil/sao_paulo|ebe7576f-18c6-4cb8-b119-94ac7bb518c2|34.158284716382845|
+-------------------------+------------------------------------+------------------+

Let me know if this helps.

ehurheap commented 2 years ago

@KnightChess , yes there is duplicate data when running a query on the data:

  val path="s3://<bucketpath>/tables/events"
  val events = spark.read.format("hudi").option("hoodie.datasource.query.type", "read_optimized").load(path)

  events.createOrReplaceTempView("events")
  val dupeQuery =
    """select env_id, event_id, user_id, count(*) from events
      | where env_id = 123 and week = '20220711'
      | group by env_id, event_id, user_id
      | having count(*) > 1
      |""".stripMargin

val res = spark.sql(dupeQuery)
res: org.apache.spark.sql.DataFrame = [env_id: bigint, event_id: bigint ... 2 more fields]

  scala> res.show
  +---------+----------------+----------------+--------+
  |   env_id|        event_id|         user_id|count(1)|
  +---------+----------------+----------------+--------+
  |      123|4401289435098557|3813718218593807|       2|
  |      123|7627782625576713|4299498150167280|       2|
  |      123|7972131523381176|4461192992664821|       2|
...
  +---------+----------------+----------------+--------+
  only showing top 20 rows
nsivabalan commented 2 years ago

sorry. I am bit confused. If I am not wrong, this is your issue. you had duplicates in your hudi table somehow. you tried to execute hudi-cli to dedup and ran into issues and posted w/ the stacktrace.

I gave you the commands I used and showed that it worked for me.

But I could not gauge your response to that. repair dedup command does not fix duplicates in the table. It dumps the deduped records to a separate location as parquet data. you may need to delete the matching entries from hudi and load the parquet data again. I understand its not easy. but thats the only option we have.

nsivabalan commented 2 years ago

let me know how we can help further.

ehurheap commented 2 years ago

sorry for the confusion @nsivabalan. I reviewed the commands that you specified to verify they were the same as what I tried. The main differences between what you did and our situation:

I can run a spark query similar to yours and verify there are duplicates in the given partition.

  val dupePath="s3://<bucket>/data/tables/events/env_id=123456789/week=20220711"

  val ddF = spark.read.parquet(dupePath)
  ddF.createOrReplaceTempView("hoh")

  val dupQ = s"""
       |select _hoodie_partition_path, _hoodie_record_key, count(*)
       | from hoh group by _hoodie_partition_path, _hoodie_record_key
       | order by count(*) desc
       |""".stripMargin
  spark.sql(dupQ).show(false)

sample output:


  +------------------------------+-------------------------------------------------------------------+--------+
  |_hoodie_partition_path        |_hoodie_record_key                                                 |count(1)|
  +------------------------------+-------------------------------------------------------------------+--------+
  |env_id=123456789/week=20220711|env_id:123456789,user_id:7806358957060773,event_id:5758152328327473|3       |
  |env_id=123456789/week=20220711|env_id:123456789,user_id:5403332495352077,event_id:3536309858058402|2       |
  |env_id=123456789/week=20220711|env_id:123456789,user_id:4713648045477470,event_id:8717656941318904|2       |
  |env_id=123456789/week=20220711|env_id:123456789,user_id:2025910439767252,event_id:8549159234261693|2       |
  |env_id=123456789/week=20220711|env_id:123456789,user_id:7507696929673571,event_id:3179806702204642|2       |
  |env_id=123456789/week=20220711|env_id:123456789,user_id:7301684312119961,event_id:717438862368076 |2       |

But when I run this hudi-cli command:

hudi:events->repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath /tmp/hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type"

The output I get is:

22/08/12 16:27:21 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1660321638341
      +- View (`htbl_1660321638341`, [])
         +- LocalRelation <empty>
hudi:events->
hudi:eveat org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
hudi:eveat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
hudi:eveat org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
hudi:eveat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:573)
hudi:eveat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
hudi:eveat org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:573)
hudi:eveat org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
hudi:eveat org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
hudi:eveat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
hudi:eveat org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
hudi:eveat org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
hudi:eveat org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
hudi:eveat scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
...

Deduplication failed!

There are no files in the --repairedOutputPath location. I understand that if there were data there, we would use that deduplicated data to replace what is currently in hudi: first deleting the duplicates from hudi, then load the deduped data from the repairedOutputPath location.

But since we have no repaired deduplicated data, we are stuck. Does that make sense? Have I missed something ?

nsivabalan commented 2 years ago

got it. isn't your partition path is --duplicatedPartitionPath "env_id= 123456789/week=20220711". I see you have tried "env_id=123/week=20220711"

123 -> 123456789.

bcoz, I do't see any other difference. only other diff is, I did not enable hive style partitioning in my script. I can give that a retry again.

nsivabalan commented 2 years ago

ok, I guess the issue is w/ MOR table. dedup as of now is supported only for COW table. let me file a jira.

nsivabalan commented 2 years ago

https://github.com/apache/hudi/issues/6194 you can follow the jira for progress. we will try to get it in next release. thanks for reporting. if you don't have any more questions, feel free to close out the github issue. we can use the jira to follow up.

ehurheap commented 2 years ago

Thanks @nsivabalan. (I must have flubbed the partition path in my copy/pasting - sorry about that.) I'll close this ticket and follow the jira.

ehurheap commented 2 years ago

Reopening: @nsivabalan what is the link for the jira? The link posted above is for this ticket.

nsivabalan commented 2 years ago

my bad. Its https://issues.apache.org/jira/browse/HUDI-4752