awslabs / spark-sql-kinesis-connector

Spark Structured Streaming Kinesis Data Streams connector supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO)
Apache License 2.0

Streaming job fails with error when restarting job after cancelling previous instance #12

Closed: ganga4reddy closed this issue 4 months ago

ganga4reddy commented 7 months ago

Hi, we are running a streaming job on an EMR instance that reads from Kinesis and writes to S3. Intermittently the job fails with the exception below when we start a new instance of the job after cancelling the previous one. Any idea how to resolve this issue?

ERROR MicroBatchExecution: Query [id = ****, runId = ****] terminated with error
java.lang.IllegalStateException: Unable to fetch committed metadata from previous batch id 10. Some data may have been missed
  at org.apache.spark.sql.connector.kinesis.KinesisV2MicrobatchStream.latestOffset(KinesisV2MicrobatchStream.scala:231) ~[RawAggregation.jar:?]
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411) ~[spark-sql_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
  at ...


val kinesis = spark
  .readStream
  .format("aws-kinesis")
  .option("kinesis.region", region)
  .option("kinesis.streamName", streamArn)
  .option("kinesis.consumerType", "GetRecords")
  .option("kinesis.endpointUrl", shttps://kinesis.$region.amazonaws.com/)
  .option("kinesis.startingposition", "LATEST")
  .option("kinesis.metadataCommitterType", "DYNAMODB")
  .option("kinesis.dynamodb.tableName", "tbleName")
  .load()

kinesis.writeStream
  .foreachBatch(…)
  .outputMode("append")
  .start()

regards GR

hwanghw commented 6 months ago

Thanks for reporting the issue. From the error, the shard info for the latest committed batch is missing. Do you see the same error if kinesis.metadataCommitterType is set to HDFS?

Can you share the application log, the checkpoint path, and the data in DynamoDB (or HDFS) for troubleshooting?
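
For reference, a minimal sketch of that suggestion against the reporter's snippet above (spark, region and streamArn are the same placeholders as in the original code; only the committer option changes, and this assumes the HDFS committer keeps its shard metadata on the checkpoint file system rather than in a DynamoDB table):

import org.apache.spark.sql.DataFrame

val kinesisHdfs = spark
  .readStream
  .format("aws-kinesis")
  .option("kinesis.region", region)
  .option("kinesis.streamName", streamArn)
  .option("kinesis.consumerType", "GetRecords")
  .option("kinesis.endpointUrl", s"https://kinesis.$region.amazonaws.com/")
  .option("kinesis.startingPosition", "LATEST")
  // Switch the shard-metadata committer from DYNAMODB to HDFS for comparison;
  // the kinesis.dynamodb.tableName option is then no longer needed.
  .option("kinesis.metadataCommitterType", "HDFS")
  .load()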

mrugeles commented 6 months ago

I got the same issue:

kinesisStream: <class 'pyspark.sql.dataframe.DataFrame'>
Traceback (most recent call last):
  File "/tmp/spark-6b195558-12b0-4563-b261-8731ff4ef459/play_events_sql.py", line 285, in <module>
    play_events_consumer.connect()
  File "/tmp/spark-6b195558-12b0-4563-b261-8731ff4ef459/play_events_sql.py", line 271, in connect
    self.process()
  File "/tmp/spark-6b195558-12b0-4563-b261-8731ff4ef459/play_events_sql.py", line 61, in process
    self.spark.streams.awaitAnyTermination()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 596, in awaitAnyTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = e079b09a-9190-49b9-8f3e-784fb8bf6ca1, runId = f5654ea9-9904-44d0-b214-5fc1813fe65d] terminated with exception: Unable to fetch committed metadata from previous batch id 0. Some data may have been missed

The code I'm using:

df.writeStream \
    .format("parquet") \
    .partitionBy("year", "month", "day", "hour") \
    .option("checkpointLocation", "s3://some_bucket/folder") \
    .option("path", "s3://another_bucket") \
    .start()

hwanghw commented 6 months ago

A workaround is to remove the last commit file under the checkpoint's /commits/ directory. The query will then restart from the last batch (to avoid data duplication, the sink needs to be idempotent).
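
For illustration, a rough sketch of that workaround using the Hadoop FileSystem API from a Spark shell; the checkpoint path is a placeholder, and it assumes the standard Spark checkpoint layout where files under commits/ are named by batch id:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder: replace with the query's actual checkpointLocation.
val checkpointLocation = "s3://some_bucket/folder"
val commitsDir = new Path(s"$checkpointLocation/commits")

val fs = FileSystem.get(new URI(checkpointLocation), spark.sparkContext.hadoopConfiguration)

// Spark names commit-log files by batch id (0, 1, 2, ...); pick the latest one.
val latestCommit = fs.listStatus(commitsDir)
  .map(_.getPath)
  .filter(_.getName.forall(_.isDigit))
  .sortBy(_.getName.toLong)
  .lastOption

// Removing it makes the restarted query re-run that batch, so the sink
// must be idempotent to avoid duplicates.
latestCommit.foreach(path => fs.delete(path, false))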

I'll try to reproduce the issue.

ganga4reddy commented 6 months ago

@hwanghw if this helps, here is the scenario where I faced the issue

checkpoint location: DynamoDB table

  1. Create an EMR Serverless application in AWS and run the job instance, which reads from Kinesis.
  2. Stop the currently running job instance and start a new one.
  3. Publish a new set of messages to Kinesis; the new instance of the job reads the messages from Kinesis, after which it fails with the reported error.

As a workaround, I switched the checkpoint location to S3 and added batchDF.persist() at the beginning of the batch processing and unpersist() at the end of the loop. I am not facing the issue any more with S3.
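
A minimal sketch of that persist/unpersist pattern inside foreachBatch, reusing the kinesis DataFrame from the snippet above; the parquet sink path is a placeholder, and any idempotent sink would do:

import org.apache.spark.sql.DataFrame

def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  // Cache the micro-batch so every action below reuses it instead of
  // re-reading the batch from the Kinesis source.
  batchDF.persist()

  // Placeholder sink; any idempotent write works here.
  batchDF.write.mode("append").parquet("s3://another_bucket/output/")

  // Release the cached batch at the end of the loop.
  batchDF.unpersist()
}

kinesis.writeStream
  .foreachBatch(writeBatch _)
  .outputMode("append")
  .start()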

hwanghw commented 6 months ago

Thanks for the details. If there is more than one action running for the same KDS stream, e.g. df.count and df.write, the two actions can trigger two jobs which may update the metadata at the same time. This is described in the README (https://github.com/awslabs/spark-sql-kinesis-connector/blob/main/README.md#avoid-race-conditions).
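
For context, a hedged sketch of the pattern that README section warns about; the count/write pair stands in for any two actions on the same micro-batch, and persisting the batch first (as in the workaround above) or keeping a single action avoids the concurrent metadata updates:

import org.apache.spark.sql.DataFrame

// Two actions on the same un-persisted micro-batch: each action re-evaluates
// the batch plan, and the resulting jobs may update the connector's committed
// metadata concurrently.
def processBatch(batchDF: DataFrame, batchId: Long): Unit = {
  val rows = batchDF.count()                                            // action 1
  batchDF.write.mode("append").parquet("s3://another_bucket/output/")   // action 2
  println(s"batch $batchId wrote $rows rows")
}

kinesis.writeStream
  .foreachBatch(processBatch _)
  .start()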