
[SUPPORT] Deltastreamer errors ingesting kafka-streams topics (transactional / exactly_once producers) #8258

Open danielfordfc opened 1 year ago

danielfordfc commented 1 year ago

Describe the problem you faced

Deltastreamer, when run against a transactional topic (one being written to by a transactional producer, such as kafka-streams), is unable to read it.

Caused by: java.lang.IllegalArgumentException: requirement failed:
 Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 after polling for 1000

Or, when using AllowNonConsecutiveOffsets=false:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (192.168.1.240 executor driver): java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:143)

Full stack traces will be linked below.

Our configuration is as follows:

Note: this works fine on non-transactional topics, and on compacted topics (cleanup.policy=compact) when setting AllowNonConsecutiveOffsets=true.
Note: this isn't a networking or timeout issue, as discussed below.

hoodie.datasource.write.recordkey.field=viewtime
hoodie.datasource.write.partitionpath.field=pageid
hoodie.deltastreamer.source.kafka.topic=${topic}
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
schema.registry.url=http://localhost:8081/
# Kafka Consumer props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
# Consumer Group
group.id=hudi-deltastreamer-${topic}
#isolation.level=read_committed <-- tried adjusting this with no effect
#enable.auto.commit=false <-- tried adjusting this with no effect

# spark.properties
spark.streaming.kafka.allowNonConsecutiveOffsets=true <--  so we use this by default as some of our topics are compacted
spark.streaming.kafka.consumer.poll.ms=1000 <-- To make it fail faster, from default of 120,000
spark.executor.cores=1

spark-submit \
  --master local[1] \
  --num-executors=1 \
  --executor-cores=1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
  --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
  --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
 ~/Workspace/github.com/apache/hudi/target/hudi-utilities-bundle_2.12-0.12.1.jar \
 --op INSERT \
 --props /tmp/hoodie-conf-${topic}.properties \
 --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
 --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
 --source-ordering-field viewtime  \
 --table-type COPY_ON_WRITE \
 --target-base-path file://${target_base_path} \
 --target-table $target_table

We've identified that this is because the get() / compactedNext() methods used by Spark's InternalKafkaConsumer fail to poll records/batches for these transactional topics.

If we create two simple non-compacted topics, one that we'll write to non-transactionally and one transactionally:

kafka-topics --bootstrap-server localhost:9092 --create --topic my-transactional-topic --partitions 5 
kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews --partitions 5
# The following messages can be consumed when produced **without** a transactional producer, but not with one:
[{:viewtime 100 :userid "User_0" :pageid "Page_0"}
               {:viewtime 101 :userid "User_1" :pageid "Page_1"}
               {:viewtime 102 :userid "User_2" :pageid "Page_2"}
               ...etc...
               {:viewtime 115 :userid "User_15" :pageid "Page_15"}]
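
For reference, here is a minimal sketch of the kind of transactional produce that creates the extra commit-marker offset discussed below. This is an illustration, not the actual producer used here; the transactional.id is hypothetical and the values are written as JSON strings for brevity, whereas the real topic is Avro.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TransactionalProduceSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "my-transactional-producer") // hypothetical id

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    producer.beginTransaction()
    (0 until 16).foreach { i =>
      val value = s"""{"viewtime": ${100 + i}, "userid": "User_$i", "pageid": "Page_$i"}"""
      producer.send(new ProducerRecord("my-transactional-topic", s"User_$i", value))
    }
    // Committing the transaction appends a commit marker to each partition that
    // was written to; the marker occupies an offset but is never delivered to consumers.
    producer.commitTransaction()
    producer.close()
  }
}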

Produced 16 messages non-transactionally -- as you can see, the end/"next available" offset is the one immediately after the last offset containing data in each partition (Screenshot 2023-03-21 at 12 36 09).

Produced 16 messages transactionally -- as you can see, the end/"next available" offset is 2 more than the last offset containing data in each partition, because committing that batch of writes placed a commit marker in each partition (Screenshot 2023-03-21 at 12 36 37).
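
The same gap can be observed with a plain Kafka consumer. Below is a minimal sketch (assuming a local broker and the topic above; the class and group names are hypothetical) that prints the broker-reported end offset next to the last offset a read_committed consumer actually receives:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object InspectTransactionalOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "offset-inspector")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("isolation.level", "read_committed")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("my-transactional-topic", 0)
    consumer.assign(java.util.Collections.singletonList(tp))

    // endOffsets() reports the "next available" offset, which on a
    // transactional topic sits after the commit marker.
    val endOffset = consumer.endOffsets(java.util.Collections.singletonList(tp)).get(tp)

    // Read everything and remember the last data offset actually delivered.
    consumer.seekToBeginning(java.util.Collections.singletonList(tp))
    var lastDataOffset = -1L
    var batch = consumer.poll(Duration.ofSeconds(2))
    while (!batch.isEmpty) {
      val it = batch.records(tp).iterator()
      while (it.hasNext) lastDataOffset = it.next().offset()
      batch = consumer.poll(Duration.ofSeconds(2))
    }

    println(s"end offset reported by broker : $endOffset")
    println(s"last data offset actually read: $lastDataOffset") // 2 less than the end offset here
    consumer.close()
  }
}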

And we see the stack traces mentioned at the bottom:
hoodie-allow-consecutive-off-false.log
hoodie-allow-consecutive-off-true.log

Extra Information gathered from running this locally

A dive into our local example, showing how we get the poll of [topic-partition] 5, followed by a poll of [] 0, followed by the crash, when AllowNonConsecutiveOffsets=true.

Interestingly, in the logs below, when setting AllowNonConsecutiveOffsets=false, we see that the initial poll for partition 0 (which, from the screenshot above, has offsets 0->4 as valid messages and offset 5 as the commit marker) returns those first 5 messages, then fails on the next poll.

23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic, partition 0 offsets 0 -> 6
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=511066e5, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic, topicPartition=my-transactional-topic-0)
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 0
23/03/21 12:48:57 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-0 0
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-0]  5
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 1 requested 1
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 2 requested 2
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 3 requested 3
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 4 requested 4
23/03/21 12:48:57 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Get spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 nextOffset 5 requested 5
23/03/21 12:48:58 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
23/03/21 12:48:59 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-hudi-deltastreamer-my-transactional-topic my-transactional-topic-0 5 after polling for 1000.

If we create another topic with one partition and write a single batch of 16 records transactionally (so 0->15 is data, 16 is commit marker, end of topic is 17), we see similar behaviour.

my-transactional-topic-single-partition.log

If we remove the possibility that it might be crashing because the endOffset is the "invisible" marker that it can't read, by adding another 16 records (putting 17->32 as data, 33 as the marker and 34 as endOffset), we see a similar issue with the following:

requirement failed: Got wrong record for spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 even after seeking to offset 16 got offset 17 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets

my-transactional-topic-single-partition-32-msgs.log
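
The "Got wrong record ... even after seeking to offset 16 got offset 17 instead" behaviour can also be reproduced with a plain consumer: seeking to the marker offset hands back the next data record. A minimal sketch (same local broker assumption; the class and group names are hypothetical):

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SeekToMarkerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "marker-seek-test")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("isolation.level", "read_committed")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("my-transactional-topic-single-partition", 0)
    consumer.assign(java.util.Collections.singletonList(tp))

    // Offset 16 holds the commit marker of the first batch, so the first
    // record handed back after seeking there is the one at offset 17 --
    // exactly the mismatch the KafkaRDD requirement trips over.
    consumer.seek(tp, 16L)
    val records = consumer.poll(Duration.ofSeconds(2)).records(tp)
    if (!records.isEmpty)
      println(s"seeked to 16, first record returned is at offset ${records.get(0).offset()}")
    consumer.close()
  }
}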

Changing to AllowNonConsecutiveOffsets=true on the above topic yields the following:

23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.KafkaRDD: Computing topic my-transactional-topic-single-partition, partition 0 offsets 0 -> 34
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.KafkaDataConsumer: Not used cached consumer found, re-using it InternalKafkaConsumer(hash=9903e40, groupId=spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition, topicPartition=my-transactional-topic-single-partition-0)
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: compacted start spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 starting 0
23/03/21 13:24:10 INFO org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Initial fetch for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Seeking to my-transactional-topic-single-partition-0 0
23/03/21 13:24:10 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled [my-transactional-topic-single-partition-0]  32
23/03/21 13:24:10 INFO org.apache.spark.storage.BlockManager: Removing RDD 6
23/03/21 13:24:11 DEBUG org.apache.spark.streaming.kafka010.InternalKafkaConsumer: Polled []  0
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Putting block rdd_2_0 failed due to exception java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000.
23/03/21 13:24:11 WARN org.apache.spark.storage.BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
23/03/21 13:24:11 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-hudi-deltastreamer-my-transactional-topic-single-partition my-transactional-topic-single-partition-0 after polling for 1000

Stack trace for the above: my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log

Answers Required

So we know what the problem is; we are just unsure how to fix it. We've taken this to the Hudi office hours before, and the host suggested asking @yihua for advice.

Usual Environment in Production, but all of this has been reproduced locally

Hudi version : Deltastreamer on EMR 6.8.0 running Hudi 0.11.1-amzn-0
Spark version : 3.3.0
Hive version : 3.1.3
Hadoop version : Amazon 3.2.1
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : No

Additional context

Hudi 0.12.1 used for local testing. Can add more details if required.

Stacktrace

Stack traces have been littered throughout but are pasted here again:

my-transactional-topic-single-partition-32-msgs.log
my-transactional-topic-single-partition-allownonconsecutiveoffsetsTrue.log
hoodie-allow-consecutive-off-false.log
hoodie-allow-consecutive-off-true.log
my-transactional-topic-single-partition.log

WarFox commented 1 year ago

More observations on this

As seen in the logs attached above, we suspect that the underlying Spark KafkaRDD library is confused by the transaction marker when it is the last record in the topic.

ad1happy2go commented 1 year ago

I am able to reproduce your issue with HoodieDeltaStreamer. We are looking into this issue.

Meanwhile, you can use Spark Structured Streaming to read the data from the transactional topic and write to Hudi. I am not seeing any error while using Structured Streaming. Below is the code you can refer to: code.txt
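
For reference, a minimal sketch of that kind of pipeline (this is not the attached code.txt; the topic name, paths, and JSON parsing of the value are assumptions for illustration, since the original topic uses Avro with the schema registry):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object KafkaToHudiStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-hudi").getOrCreate()

    // The Structured Streaming Kafka source tolerates offset gaps such as
    // transaction markers, so it does not hit the KafkaRDD failure above.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-transactional-topic")
      .option("startingOffsets", "earliest")
      .option("kafka.isolation.level", "read_committed")
      .load()

    val schema = new StructType()
      .add("viewtime", LongType)
      .add("userid", StringType)
      .add("pageid", StringType)

    val parsed = raw
      .select(from_json(col("value").cast("string"), schema).as("r"))
      .select("r.*")

    val query = parsed.writeStream
      .format("hudi")
      .option("hoodie.table.name", "pageviews")
      .option("hoodie.datasource.write.recordkey.field", "viewtime")
      .option("hoodie.datasource.write.partitionpath.field", "pageid")
      .option("hoodie.datasource.write.precombine.field", "viewtime")
      .option("checkpointLocation", "/tmp/checkpoints/pageviews")
      .start("/tmp/hudi/pageviews")

    query.awaitTermination()
  }
}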

WarFox commented 1 year ago

Thanks @ad1happy2go

We are also leaning towards using either Spark Structured Streaming or batch processing as a workaround. We have some working examples for both cases now; thanks again for sharing your example code too.

Could you confirm whether we need to have a separate process for compaction when using Spark Structured Streaming for writing to Hudi?

ad1happy2go commented 1 year ago

@WarFox We can either enable async compaction or use an offline compaction job. With streaming, we normally don't use inline compaction, as that would increase the latency.
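
For example, with a MERGE_ON_READ table the async option can be set on the streaming writer itself (a sketch of the relevant write configs; the values shown are illustrative, not a recommendation):

hoodie.datasource.write.table.type=MERGE_ON_READ
# run compaction asynchronously from the streaming writer
hoodie.datasource.compaction.async.enable=true
# schedule a compaction after every N delta commits
hoodie.compact.inline.max.delta.commits=5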

ad1happy2go commented 1 year ago

@danielfordfc JIRA created for the tracking the fix - https://issues.apache.org/jira/browse/HUDI-6297

danielfordfc commented 7 months ago

@ad1happy2go @nsivabalan is this absolutely not going to happen anytime soon? It's preventing us from directly ingesting a large majority of our organisation's Kafka topics, and I'm very surprised it's not a more widely experienced issue, given it's a common feature of topics produced by kafka-streams applications.

ad1happy2go commented 7 months ago

Currently I don't have any viable solution to fix this, and I'm not sure if anybody is looking into it at the moment, so it is not going to happen anytime soon. @bvaradar Do you have any insights on how we can handle transactional topics? PR - https://github.com/apache/hudi/pull/9059