apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Spark readStream fails with [COLUMN_ALREADY_EXISTS] for streaming tables created with "hoodie.schema.on.read.enable" & "hoodie.datasource.write.reconcile.schema" enabled #10533

Open imonteroq opened 8 months ago

imonteroq commented 8 months ago

Describe the problem you faced

Streaming from a Hudi table in Spark fails with the error below when a writeStream process has created or written to the table with the schema evolution settings hoodie.schema.on.read.enable and hoodie.datasource.write.reconcile.schema enabled. Without these two settings enabled, I have not been able to upsert a source schema that contains more or fewer columns than the target schema.

org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column _hoodie_commit_seqno already exists. Consider to choose another name or rename the existing column.

To Reproduce

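import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val sparkConf: SparkConf = new SparkConf()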
  .setMaster("local[*]")
  .setAppName("testAppName")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "test_table",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.recordkey.field" -> "identifier",
  "hoodie.datasource.write.precombine.field" -> "date",
  "hoodie.datasource.insert.dup.policy" -> "none",
  "hoodie.avro.schema.externalTransformation" -> "true",
  "hoodie.schema.on.read.enable" -> "true",
  "hoodie.datasource.write.reconcile.schema" -> "true")

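// Contract is a case class from the reporter's test code; its definition was not included in the report.
val inMemoryRecords: List[Contract] =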
  List(Contract("001", 1, "test1", 100), Contract("002", 2, "test2", 100), Contract("003", 3, "test3", 100))

val contractsInMemory: MemoryStream[Contract] = MemoryStream[Contract]
contractsInMemory.addData(inMemoryRecords)

contractsInMemory
  .toDF()
  .writeStream
  .format("hudi")
  .trigger(Trigger.AvailableNow())
  .queryName("streamingQueryName")
  .option("checkpointLocation", "/tmp/checkpoint")
  .options(hudiOptions)
  .outputMode(OutputMode.Append())
  .start("/tmp/data")
  .processAllAvailable()

spark.readStream
  .format("hudi")
  .load("/tmp/data")
  .writeStream
  .format("memory")
  .queryName("queryName")
  .outputMode("append")
  .start()
  .processAllAvailable()

Environment Description

Additional context

This works fine with either hoodie.schema.on.read.enable or hoodie.datasource.write.reconcile.schema disabled.
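
A minimal sketch of that observation, expressed as plain option maps. The object and value names are hypothetical and only the two option keys come from the report. Note that, per the description above, upserts with a changed source schema did not work for the reporter without both settings, so this is a way to narrow down the failure rather than a fix.

```scala
object WorkaroundOptions {
  // The two schema evolution settings from the report; the read stream fails when both are "true".
  val schemaEvolutionOptions: Map[String, String] = Map(
    "hoodie.schema.on.read.enable" -> "true",
    "hoodie.datasource.write.reconcile.schema" -> "true")

  // Assumption for illustration: switch off one of the two and keep the other as it was.
  val workaroundOptions: Map[String, String] =
    schemaEvolutionOptions + ("hoodie.schema.on.read.enable" -> "false")
}
```

The resulting map can be merged into the writer options from the reproduction, for example `hudiOptions ++ WorkaroundOptions.workaroundOptions`.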

Stacktrace

org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
    at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
    at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
[stream execution thread for test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Async log purge executor pool for query test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] has been shutdown
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - SparkContext is stopping with exitCode 0.
[dispatcher-event-loop-4] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext

[STREAM_FAILED] Query [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] terminated with exception: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
org.apache.spark.sql.streaming.StreamingQueryException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
=== Streaming Query ===
Identifier: test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5: {"commitTime":"20240118170827012"}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource MemorySink, 443bb452-13e7-4d43-916d-bac5fd5d1f2c, [queryName=test_hudi], Append
+- StreamingExecutionRelation org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5, [_hoodie_commit_time#142, _hoodie_commit_seqno#143, _hoodie_record_key#144, _hoodie_partition_path#145, _hoodie_file_name#146, identifier#147, name#148, quantity#149, status#150, agent#151, metadata#152, contacts#153, date#154]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
    at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
    at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
    at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    ... 1 more
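
The trace above shows the exception coming from Spark's duplicate column name check, reached through IncrementalRelation.buildScan. As a debugging aid, the sketch below lists the names that occur more than once in a sequence of column names. `DuplicateColumnCheck` and `duplicateColumns` are hypothetical helpers written for this illustration; they are not Spark's or Hudi's implementation, and they do not explain why the column ends up duplicated.

```scala
object DuplicateColumnCheck {
  // Returns the column names that occur more than once, sorted alphabetically.
  // Names are compared case-insensitively unless caseSensitive is set.
  def duplicateColumns(names: Seq[String], caseSensitive: Boolean = false): Seq[String] = {
    val normalized = if (caseSensitive) names else names.map(_.toLowerCase)
    normalized
      .groupBy(identity)
      .collect { case (name, occurrences) if occurrences.size > 1 => name }
      .toSeq
      .sorted
  }
}
```

It could be applied to the field names of any schema under suspicion, for example `DuplicateColumnCheck.duplicateColumns(df.schema.fieldNames)`.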

ad1happy2go commented 8 months ago

@imonteroq You are correct. I was able to reproduce this. Here is a full reproducible example: https://gist.github.com/ad1happy2go/db5813b8bd8d5f7142c2f9b0b2f29922

I created a tracking JIRA for the fix: https://issues.apache.org/jira/browse/HUDI-7319

imonteroq commented 7 months ago

Thank you @ad1happy2go. One question while we are at it: will Hudi ever make use of Spark's checkpointing to carry out managed incremental ingestion? So far, the documentation suggests that a commit timestamp has to be provided.

ad1happy2go commented 7 months ago

Sorry, can you elaborate on that?

Yes, Spark streaming should use checkpointing in order to resume the stream. Please clarify in case I am missing anything here.

imonteroq commented 7 months ago

> Sorry, can you elaborate on that?
>
> Yes, Spark streaming should use checkpointing in order to resume the stream. Please clarify in case I am missing anything here.

Apologies, my bad, I omitted part of my question. I meant incremental ingestion from another Hudi table using Spark's Structured Streaming. I've tested this functionality, and it always loads the full table unless I pass in a commit timestamp. I can raise another issue to make my point better.

ad1happy2go commented 7 months ago

@imonteroq Ideally it should use the checkpoint details, although I am not very sure. One reason may be the cleaner configuration. Are you setting the config below?

https://hudi.apache.org/docs/configurations/#hoodiedatasourcereadincrfallbackfulltablescanenable
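
For reference, a rough sketch of the read options involved when an incremental read is given an explicit commit timestamp together with the fallback config linked above. The object and method names are hypothetical; the option keys are written as I understand them from the Hudi configuration page and should be checked against the Hudi version in use.

```scala
object IncrementalReadOptions {
  // Builds read options for an incremental query that starts after `beginInstant`.
  // `fallbackToFullScan` maps to the fallback config linked above.
  def incrementalOptions(beginInstant: String, fallbackToFullScan: Boolean): Map[String, String] = Map(
    "hoodie.datasource.query.type" -> "incremental",
    "hoodie.datasource.read.begin.instanttime" -> beginInstant,
    "hoodie.datasource.read.incr.fallback.fulltablescan.enable" -> fallbackToFullScan.toString)
}
```

With the commit time from the stack trace in this issue, a batch read could look like `spark.read.format("hudi").options(IncrementalReadOptions.incrementalOptions("20240118170827012", fallbackToFullScan = true)).load("/tmp/data")`.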

ad1happy2go commented 6 months ago

@imonteroq Any updates on this?