apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Spark reader fails to read hudi table created using 1.0.0-beta2 version #12068

Open dataproblems opened 3 days ago

dataproblems commented 3 days ago

Describe the problem you faced

I'm creating a Hudi table using the bulk insert operation, and reading the table back fails with an IllegalStateException.

To Reproduce

Steps to reproduce the behavior:

  1. Read the data using spark.read.parquet("...")
  2. Write the table using data.write.format("hudi").options("...").save("...")
  3. Read the data again using spark.read.format("hudi").load("...") (see the sketch below)
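
For clarity, here is a minimal sketch of these three steps, assuming hypothetical paths; the actual writer options are the BulkWriteOptions map listed under Additional context below:

// Hypothetical paths for illustration only.
val sourcePath = "s3://some-bucket/source-parquet/"
val tablePath = "s3://some-bucket-path/"

// 1. Read the source data.
val data = spark.read.parquet(sourcePath)

// 2. Bulk insert into the Hudi table (SaveMode assumed; options shown under "Additional context").
data.write.format("hudi")
  .options(BulkWriteOptions)
  .mode("overwrite")
  .save(tablePath)

// 3. Read it back -- this is the call that throws the IllegalStateException.
val readBack = spark.read.format("hudi").load(tablePath)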

Expected behavior

I should be able to read the data back into a dataframe with no exceptions.

Environment Description

Additional context

Here are the hudi options I'm using for the bulk insert:

val BulkWriteOptions: Map[String, String] = Map(
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
    HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
    "hoodie.parquet.small.file.limit" -> "1073741824",
    HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
    HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
    HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
    DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
    "hoodie.metadata.record.index.enable" -> "true",
    "hoodie.metadata.enable" -> "true",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.clustering.inline" -> "true",
    "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
    "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824"
    // Also using precombine key, partition path, table name, and key gen class.
  )
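
The precombine key, partition path, record key, table name, and key generator mentioned in the comment above go into the same map. Here is a sketch with purely hypothetical field names (the real ones are elided here), continuing the write from the repro sketch:

// Hypothetical per-table options; the field names are placeholders, not the real schema.
val perTableOptions: Map[String, String] = Map(
    "hoodie.datasource.write.recordkey.field" -> "id1,id2", // complex key with multiple fields
    "hoodie.datasource.write.precombine.field" -> "event_ts",
    "hoodie.datasource.write.partitionpath.field" -> "dt",
    "hoodie.table.name" -> "my_table",
    "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
  )

// Merged with BulkWriteOptions on write, as in step 2 of the repro sketch.
data.write.format("hudi")
  .options(BulkWriteOptions ++ perTableOptions)
  .mode("overwrite")
  .save(tablePath)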

Here's the hoodie.properties from the table that was generated using 1.0.0-beta2:

#Updated at 2024-10-05T03:50:57.280Z
#Sat Oct 05 03:50:57 UTC 2024
hoodie.table.timeline.timezone=LOCAL
hoodie.table.precombine.field=<some timestamp field>
hoodie.table.version=8
hoodie.database.name=
hoodie.table.initial.version=8
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=4125900621
hoodie.table.keygenerator.type=COMPLEX
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.table.name=<some table name>
hoodie.populate.meta.fields=false
hoodie.table.type=COPY_ON_WRITE
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=column_stats,files,record_index
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=<some number of fields from the row>
hoodie.table.partition.fields=<a single field>

Stacktrace

24/10/07 19:40:46 INFO S3NativeFileSystem: Opening 's3://some-bucket-path/.hoodie/hoodie.properties' for reading
Exception in thread "main" java.lang.IllegalStateException
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
    at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
    at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
    at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
    at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
    at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
    at <Some source code line>
    at <Some source code line>
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
danny0405 commented 3 days ago

There is a check on the read path when populateMetaFields is disabled, and I see your table config has hoodie.populate.meta.fields=false. In that case the reader requires exactly one record key field, which is why checkState throws the bare IllegalStateException:

if (tableConfig.populateMetaFields()) {
  HoodieRecord.RECORD_KEY_METADATA_FIELD
} else {
  val keyFields = tableConfig.getRecordKeyFields.get()
  checkState(keyFields.length == 1)
  keyFields.head
}
dataproblems commented 3 days ago

@danny0405 - are you saying that I need to set hoodie.populate.meta.fields=true during the bulk insert operation?

danny0405 commented 3 days ago

@danny0405 - are you saying that I need to set hoodie.populate.meta.fields=true during the bulk insert operation?

Yes, if your record key consists of multiple fields.
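
A sketch of the one option to flip in the write options from above (everything else unchanged):

// Keep the Hudi meta fields populated so the reader does not hit checkState(keyFields.length == 1).
val adjustedOptions: Map[String, String] =
  BulkWriteOptions + (HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true") // was "false"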

dataproblems commented 2 days ago

@danny0405 - When I enabled that for the table that has multiple fields in the record key, I noticed that the bulk insert operation takes an unreasonably long time. Something that took about 10 minutes before this change ran for over 2 hours and then failed with the following exception:

java.io.EOFException: Unexpected EOF while trying to read response from server
    at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:538) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1137) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/08 19:19:15 WARN DataStreamer: DataStreamer Exception
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_422]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_422]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_422]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[?:1.8.0_422]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[?:1.8.0_422]
    at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:62) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:141) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:158) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:116) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_422]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_422]
    at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:1.8.0_422]
    at org.apache.hadoop.hdfs.DataStreamer.sendPacket(DataStreamer.java:858) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.hdfs.DataStreamer.sendHeartbeat(DataStreamer.java:876) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:675) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/08 19:23:19 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=8906630334403057573,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=81 cap=156]]] to /10.0.26.16:52206; closing connection
java.io.IOException: Broken pipe

Was this expected? I'm not operating with a lot of data for this test, but if the bulk insert operation takes this much longer with this config, it would not be something that we can use.

danny0405 commented 2 days ago

It looks like a Hadoop error; are there any clues related to Hudi specifically? The write without meta fields should be faster, but the run with meta fields should not take that long: a 2~3x performance gap is what is expected there.

dataproblems commented 2 days ago

No - given that it executes for over 2 hours, I would assume that it is stemming from something within Hudi. I see this: ERROR AppendDataExec: Data source write support org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite@16d3b8c5 aborted.

rangareddy commented 1 day ago

Hi @dataproblems

Could you please share your spark-submit/pyspark command here? You mentioned that your Spark version is 3.3, and the error above points to the spark3 internal writer (org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite).

rangareddy commented 1 day ago

I have tested the following sample code and it worked without any issues.

Cluster Details:

spark-shell \
    --jars packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-beta2.jar \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
    --conf spark.ui.port=14040

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._

val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)

val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"

val bulkWriteOptions: Map[String, String] = Map(
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, 
    HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy", 
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648", 
    "hoodie.parquet.small.file.limit" -> "1073741824",
    HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false", 
    HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(), 
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
    HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX", 
    DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
    "hoodie.metadata.record.index.enable" -> "true", 
    "hoodie.metadata.enable" -> "true", 
    "hoodie.datasource.write.hive_style_partitioning" -> "true", 
    "hoodie.clustering.inline" -> "true", 
    "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
    "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
    "hoodie.datasource.write.partitionpath.field" -> "city",
    "hoodie.datasource.write.recordkey.field" -> "uuid",
    "hoodie.datasource.write.precombine.field" -> "ts",
    "hoodie.table.name" -> tableName
)

inserts.write.format("hudi").
  options(bulkWriteOptions).
  mode(Overwrite).
  save(basePath)

val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show(false)
dataproblems commented 20 hours ago

Hi @rangareddy. The packages I use are --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0-beta1,org.apache.hudi:hudi-aws:1.0.0-beta1. Another thing to note is that my recordKey is complex. I'm not sure if that impacts the change.

Are you suggesting that 1.0.0-beta2 should only be used with Spark 3.5? (For managing other dependencies, we're using EMR 6.x, which does not have Spark 3.5 and supports up to Spark 3.4.)

I also tried the Spark 3.4 bundle and got the exact same error while trying to read the table, so it's reproducible on my end.