qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

java.lang.IllegalStateException: /checkpoint/round/sources/0/shard-commit/1 does not exist #57

Open HariprasadAllaka1612 opened 5 years ago

HariprasadAllaka1612 commented 5 years ago

Hey,

I am using the kinesis-sql 2.4.0 branch, and when I try to write the checkpoint to S3 I am hitting this issue.

Initially I thought this might be an issue with S3's eventual consistency, but that does not seem to be the case: it also happens when I run locally.

It seems there is an issue with how the shard-commit folder is created.

Code snippet:

val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .load()

val roundStreamData1 = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

val query = roundStreamData1
  .writeStream
  .foreachBatch { (batchDf: DataFrame, _) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
    }
    RoundDf.unpersist()
  }
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()

query.awaitTermination() Log: 2019-10-30 15:06:44 ERROR MicroBatchExecution:91 - Query [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff] terminated with error java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception: at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) ... 30 more org.apache.spark.sql.streaming.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception: === Streaming Query === Identifier: [id = 56565a9a-c8bc-4818-97ab-0cfe6b345b79, runId = e9623150-3306-4cc0-8b8e-d85edc811cff] Current Committed Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}} Current Available Offsets: {KinesisSource[gat-rounds-stream]: {"metadata":{"streamName":"gat-rounds-stream","batchId":"1"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49600867143952561714236237710058640910088621183742246914"}}}

Current State: ACTIVE Thread State: RUNNABLE

Logical Plan: Project [round#17.Type AS Type#19, round#17.PId AS PId#20L, round#17.RId AS RId#21, round#17.UId AS UId#22, round#17.GId AS GId#23, round#17.GS AS GS#24, round#17.STS AS STS#25, round#17.FGExtId AS FGExtId#26, round#17.Dev AS Dev#27, round#17.CTS AS CTS#28, round#17.PCcy AS PCcy#29, round#17.PSysXR AS PSysXR#30, round#17.BetTotal AS BetTotal#31, round#17.BetBonus AS BetBonus#32, round#17.WinTotal AS WinTotal#33, round#17.WinBonus AS WinBonus#34, round#17.GGR AS GGR#35, round#17.JpContr AS JpContr#36, round#17.JpPO AS JpPO#37, round#17.JpSeed AS JpSeed#38, round#17.OpName AS OpName#39] +- Project [jsontostructs(StructField(Type,StringType,true), StructField(PId,LongType,true), StructField(RId,StringType,true), StructField(UId,StringType,true), StructField(GId,StringType,true), StructField(GS,StringType,true), StructField(STS,StringType,true), StructField(FGExtId,StringType,true), StructField(Dev,StringType,true), StructField(CTS,StringType,true), StructField(PCcy,StringType,true), StructField(PSysXR,DoubleType,true), StructField(BetTotal,DoubleType,true), StructField(BetBonus,DoubleType,true), StructField(WinTotal,DoubleType,true), StructField(WinBonus,DoubleType,true), StructField(GGR,DoubleType,true), StructField(JpContr,DoubleType,true), StructField(JpPO,DoubleType,true), StructField(JpSeed,DoubleType,true), StructField(OpName,StringType,true), jsonData#15, Some(Europe/Berlin)) AS round#17] +- Project [cast(data#5 as string) AS jsonData#15] +- StreamingExecutionRelation KinesisSource[gat-rounds-stream], [data#5, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception: at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) ... 1 more Caused by: java.lang.IllegalStateException: s3a://gat-datastreaming-resources-dev/checkpoint/round/sources/0/shard-commit/1 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) ... 30 more

Process finished with exit code 0

itsvikramagr commented 5 years ago

@HariprasadAllaka1612 - You need to check the executor logs to see why the checkpoint location is not created.

HariprasadAllaka1612 commented 5 years ago

> @HariprasadAllaka1612 - You need to check the executor logs to see why the checkpoint location is not created.

The checkpoint location is being created. When using foreachBatch:

  1. Batch 0 executes successfully and writes data to the offsets, shard-commit, and commits folders.
  2. Batch 1 writes to offsets and commits, but does not write anything to shard-commit.
  3. When batch 2 fetches the previous batch's shard info, it fails because shard-commit was never created for batch 1 (a sketch for listing which batches have a shard-commit entry follows below).
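
For anyone trying to confirm the same symptom, here is a minimal sketch (Scala, assuming the Hadoop FileSystem API is on the classpath; the checkpoint path below is illustrative, not from the original post) that lists which batches have a shard-commit entry:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Illustrative path: replace with your own checkpoint location.
  val shardCommitDir = new Path("s3a://my-bucket/checkpoint/round/sources/0/shard-commit")
  val fs = shardCommitDir.getFileSystem(new Configuration())

  // Print the batch ids for which the connector has written shard metadata.
  if (fs.exists(shardCommitDir)) {
    fs.listStatus(shardCommitDir)
      .map(_.getPath.getName)
      .sorted
      .foreach(batchId => println(s"shard-commit present for batch $batchId"))
  } else {
    println(s"$shardCommitDir does not exist")
  }
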
itsvikramagr commented 4 years ago

Sure. Let me reproduce the issue and provide a fix.

HariprasadAllaka1612 commented 4 years ago

Hi @itsvikramagr, could you please let us know if there is any update on this issue? Sorry to bother you; I just wanted to know when we can update our code based on the fix :)

itsvikramagr commented 4 years ago

Hey @HariprasadAllaka1612 - I could not spend any cycles on it. Let me do it tomorrow.

itsvikramagr commented 4 years ago

@HariprasadAllaka1612 - I can reproduce the issue in certain scenarios. I still haven't got to the bottom of it. Will start a PR as soon as we have a solution for it.

HariprasadAllaka1612 commented 4 years ago

> @HariprasadAllaka1612 - I can reproduce the issue in certain scenarios. I still haven't got to the bottom of it. Will start a PR as soon as we have a solution for it.

@itsvikramagr Thanks a lot for looking into it. Please let me know if you need any help from my side.

3mlabs commented 4 years ago

Hi, we are also facing a similar issue while consuming kinesis stream through spark structured streaming. Please find below error - 20/02/11 14:24:34 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1] java.lang.IllegalStateException: file:/tmp/tmp23/checkpoint/customer/sources/0/shard-commit/0 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:272) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

It retries 3 times and then fails. I think this is the same issue, with one difference: our script fails on batch 0 itself and never gets to batch 1.

Please let us know if there is any workaround for this.

HariprasadAllaka1612 commented 4 years ago

> Hi, we are also facing a similar issue while consuming a Kinesis stream through Spark Structured Streaming. [...] It retries 3 times and then fails. I think this is the same issue, with one difference: our script fails on batch 0 itself and never gets to batch 1. Please let us know if there is any workaround for this.

Hey,

As of now, I am streaming the data in micro-batches and not doing any foreachBatch operations. I have separated my process into a streaming part and a batch part, where the batch processing is driven by the time the data was written to S3.
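
For reference, a minimal sketch of that split (the bucket paths and the spark session name are illustrative, and commonRoundDataProcess is the helper from the snippet above): the streaming query only lands the raw micro-batches as Parquet, and a separate batch job picks them up later.

  import org.apache.spark.sql.streaming.Trigger

  // Streaming side: append micro-batches straight to S3, no foreachBatch involved.
  val streamingQuery = roundStreamData1.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", s"${checkPointBucket}/checkpoint/round/")
    .option("path", s"${dataBucket}/raw/round/")   // illustrative output path
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

  // Batch side (run separately, e.g. on a schedule): process whatever has landed so far.
  val rawRounds = spark.read.parquet(s"${dataBucket}/raw/round/")
  val processed = commonRoundDataProcess(rawRounds)
  processed.write.mode("append").parquet(s"${dataBucket}/refined/round/")
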

3mlabs commented 4 years ago

@HariprasadAllaka1612, I was referring to the comment you added on the 'foreachBatch' support issue (#58); we are trying to do the same thing and are facing the same issue you posted. Do you have any resolution for using the 'foreachBatch' function?

HariprasadAllaka1612 commented 4 years ago

> @HariprasadAllaka1612, I was referring to the comment you added on the 'foreachBatch' support issue (#58); we are trying to do the same thing and are facing the same issue you posted. Do you have any resolution for using the 'foreachBatch' function?

Unfortunately, no. @itsvikramagr is working on fixing the issue.

itsvikramagr commented 4 years ago

@HariprasadAllaka1612 and @3mlabs - Apologies, I am a little behind on this one. This looks like an involved issue: we have to understand how foreachBatch works and why it impacts us. We might have to revisit the design of the connector.

itsvikramagr commented 4 years ago

Assigning the issue to @abhishekd0907.

abhishekd0907 commented 4 years ago

@HariprasadAllaka1612 and @3mlabs - Can you please give more details on the following points:

  1. Can you check whether new tasks are created by foreachBatch for every batch? Since tasks write to checkpoint/round/sources/0/shard-commit, that directory may not be created if the previous batch launched no tasks. I could reproduce this by writing .foreachBatch { (batchDf: DataFrame, batchId: Long) => /* no-op */ } (see the sketch after this list).

  2. Can you give more details about the operations inside your foreachBatch code? It is possible that foreachBatch is a no-op or is not triggering any tasks for some batches.

  3. Can you trigger an action on batchDf inside the foreachBatch code, e.g. batchDf.count, to force tasks?
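
To make point 1 concrete, a minimal sketch (placeholder stream DataFrame and checkpoint path, not from the original post) of the no-op case that reproduced the missing shard-commit directory: a foreachBatch body that does nothing never triggers any Spark tasks, so the executors never write the per-batch shard metadata.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.streaming.Trigger

  // kinesisDf stands for a streaming DataFrame read from the Kinesis source, as in the earlier snippets.
  val noOpQuery = kinesisDf.writeStream
    .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
      // intentionally empty: no action is ever triggered on batchDf,
      // so no tasks run and sources/0/shard-commit/<batchId> is never written
    }
    .option("checkpointLocation", s"${checkpointBucket}/checkpoint/noop/")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()
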

itsvikramagr commented 4 years ago

@HariprasadAllaka1612 and @3mlabs - We debugged the issue and could reproduce it only when foreachBatch was a no-op, i.e. it was not generating any tasks. Can you please confirm whether this is true in your case?

We added an explicit action in the foreachBatch code, such as batchDf.count. Can you try this hack and see if it makes your application run without any issues? A sketch is included below.
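
A minimal sketch of that workaround (placeholder stream DataFrame and checkpoint path; the count is the only addition to an otherwise normal foreachBatch body):

  import org.apache.spark.sql.DataFrame

  val query = kinesisDf.writeStream
    .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
      // Explicit action: forces at least one Spark job (and hence tasks) per micro-batch,
      // giving the executors a chance to write the shard-commit metadata.
      batchDf.count()

      // ... existing per-batch processing goes here ...
    }
    .option("checkpointLocation", s"${checkpointBucket}/checkpoint/round/")
    .start()
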

HariprasadAllaka1612 commented 4 years ago

Hi @itsvikramagr @abhishekd0907 I will try this and let you know

arnehuang commented 4 years ago

Hi, I ran into this as well when writing to S3. Just wondering if there are any updates?

itsvikramagr commented 4 years ago

@arnehuang - are you using foreachBatch for writing into S3? If not, can you raise another ticket with a way to reproduce it and the stack trace?

arnehuang commented 4 years ago

Sorry, after debugging I realized it has to do with S3 consistency and not with this library. Using HDFS / S3Guard fixed it for me.
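
For others hitting the consistency variant, a sketch of enabling S3Guard on the checkpoint filesystem through Spark's Hadoop configuration (assumes a Hadoop build with S3A S3Guard support; the table name and region are placeholders), or alternatively point checkpointLocation at HDFS:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("kinesis-structured-streaming")
    // Back the S3A client with a DynamoDB metadata store for consistent listings.
    .config("spark.hadoop.fs.s3a.metadatastore.impl",
            "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.table", "my-s3guard-table")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-west-2")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.table.create", "true")
    .getOrCreate()
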

sudeshnasengupta commented 4 years ago

Hi @itsvikramagr and @abhishekd0907: we're facing the same call-stack issue in our streaming job and are not using any "foreach" or "foreachBatch". Has there been any further progress on this issue, if not a fix? Is there any relationship between this error and the configuration option "kinesis.client.avoidEmptyBatches"?

Our streaming job does the following when it handles the StreamingQuery:

{
  org.apache.spark.sql.streaming.StreamingQuery query = eventsToWriteOut.writeStream()
          .format("parquet")
          .outputMode("append")
          .option("checkpointLocation", checkpointLocation)
          .option("path", path)
          .option("charset", "UTF-8")              
          .partitionBy("solution_id", "env", "event_date")
          .trigger(Trigger.ProcessingTime(triggerInterval))
          .start();

  if(query.lastProgress() != null) {
    System.out.println("PROGRESS " + query.lastProgress().prettyJson());
  }
  try {
    query.awaitTermination();
  } catch (StreamingQueryException e) {        
    LOGGER.error("Error terminating query", e);
  }        
}

FYI, I'm pasting below a snippet of our specific error's call stack:

20/08/14 17:51:53 INFO IngressJob: In runJob Parameters Config(SimpleConfigObject({"checkpointDirectory":,"describeShardInterval":"5m","duplicateWindow":"0 minutes","endpointUrl":,"fetchBufferSize":"25gb","maxFetchTime":"30000","maxRecordsPerFetch":"25000","maxRecordsPerRead":"10000","path":,"region":,"runInterval":-1,"streamName":,"triggerInterval":"60 seconds"})) 20/08/14 17:51:53 INFO IngressJob: Stream Name , Trigger interval 60 seconds, EndpointRUL 20/08/14 17:51:53 INFO IngressJob: Continuous mode false, run for (-60000 millisecs) 20/08/14 17:51:53 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead. 20/08/14 17:51:53 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead. 20/08/14 17:51:53 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect. 20/08/14 17:51:53 INFO SharedState: loading hive config file: file:/mnt/yarn/usercache/hadoop/appcache/application_1597427394593_0001/container_1597427394593_0001_01_000001/hive-site.xml 20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL. 20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/json. 20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution. 20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution/json. 20/08/14 17:51:53 INFO JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /static/sql. 20/08/14 17:51:53 INFO AMRMClientImpl: Received new token for : ip-10-15-190-241.us-west-2.compute.internal:8041 20/08/14 17:51:54 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000002 on host ip-10-15-190-241.us-west-2.compute.internal for executor with ID 1 20/08/14 17:51:54 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them. 20/08/14 17:51:54 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0 20/08/14 17:51:54 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 20/08/14 17:51:55 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000003 on host ip-10-15-190-241.us-west-2.compute.internal for executor with ID 2 20/08/14 17:51:55 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them. 20/08/14 17:51:55 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0 20/08/14 17:51:56 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. 20/08/14 17:51:56 INFO AMRMClientImpl: Received new token for : ip-10-15-190-228.us-west-2.compute.internal:8041 20/08/14 17:51:56 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000004 on host ip-10-15-190-228.us-west-2.compute.internal for executor with ID 3 20/08/14 17:51:56 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them. 
20/08/14 17:51:56 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0 20/08/14 17:51:58 INFO S3NativeFileSystem: Opening for reading 20/08/14 17:51:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.241:40008) with ID 1 20/08/14 17:51:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.241:40010) with ID 2 20/08/14 17:51:59 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1) 20/08/14 17:51:59 INFO ExecutorAllocationManager: New executor 2 has registered (new total is 2) 20/08/14 17:51:59 INFO MicroBatchExecution: Starting [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210]. Use to store the query checkpoint. 20/08/14 17:51:59 INFO MicroBatchExecution: Using Source [KinesisSource[]] from DataSourceV2 named 'kinesis' [org.apache.spark.sql.kinesis.KinesisSourceProvider@48719d58] 20/08/14 17:51:59 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-241.us-west-2.compute.internal:32799 with 5.8 GB RAM, BlockManagerId(2, ip-10-15-190-241.us-west-2.compute.internal, 32799, None) 20/08/14 17:51:59 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-241.us-west-2.compute.internal:32867 with 5.8 GB RAM, BlockManagerId(1, ip-10-15-190-241.us-west-2.compute.internal, 32867, None) 20/08/14 17:51:59 INFO deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum 20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/offsets/157' for reading 20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/offsets/156' for reading 20/08/14 17:51:59 INFO S3NativeFileSystem: Opening 's3:///checkpoint/commits/157' for reading 20/08/14 17:51:59 INFO KinesisSource: End Offset is {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}} 20/08/14 17:51:59 INFO YarnAllocator: Launching container container_1597427394593_0001_01_000007 on host ip-10-15-190-228.us-west-2.compute.internal for executor with ID 4 20/08/14 17:51:59 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them. 
20/08/14 17:51:59 INFO KinesisSource: Processing 0 shards from Stream() 20/08/14 17:51:59 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0 20/08/14 17:52:00 INFO KinesisSource: GetBatch generating RDD of offset range: 20/08/14 17:52:00 INFO MicroBatchExecution: Resuming at batch 158 with committed offsets {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}} and available offsets {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}} 20/08/14 17:52:00 INFO MicroBatchExecution: Stream started from {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}} 20/08/14 17:52:00 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1] java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) 20/08/14 17:52:01 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 2] java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at 
org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) 20/08/14 17:52:01 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.228:52110) with ID 3 20/08/14 17:52:01 INFO ExecutorAllocationManager: New executor 3 has registered (new total is 3) 20/08/14 17:52:01 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-228.us-west-2.compute.internal:40549 with 5.8 GB RAM, BlockManagerId(3, ip-10-15-190-228.us-west-2.compute.internal, 40549, None) 20/08/14 17:52:02 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.15.190.228:52118) with ID 4 20/08/14 17:52:02 INFO ExecutorAllocationManager: New executor 4 has registered (new total is 4) 20/08/14 17:52:03 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-15-190-228.us-west-2.compute.internal:41259 with 5.8 GB RAM, BlockManagerId(4, ip-10-15-190-228.us-west-2.compute.internal, 41259, None) 20/08/14 17:52:03 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 3] java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) 20/08/14 17:52:03 ERROR MicroBatchExecution: Query [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210] terminated with error java.lang.IllegalStateException: Gave up after 3 retries while fetching MetaData, last exception: at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$withRetry$2.apply(HDFSMetadataCommitter.scala:241) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:240) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) Caused by: java.lang.IllegalStateException: s3:///checkpoint/sources/0/shard-commit/157 
does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) ... 30 more 20/08/14 17:52:03 ERROR IngressJob: Error terminating query org.apache.spark.sql.streaming.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception: === Streaming Query === Identifier: [id = 0240d731-82b2-47b9-8cde-c7fd5a15ded0, runId = 900fc90e-5747-44d5-a209-89cc7399e210] Current Committed Offsets: {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}} Current Available Offsets: {KinesisSource[]: {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}}

Thanks!

itsvikramagr commented 4 years ago

@sudeshnasengupta - the issue comes from a design choice we have made in this connector. Shard metadata is updated on task completion by the executor tasks that read from Kinesis, which works fine as long as every micro-batch has some shard to read from.

In your case, I can see that your existing shards are closed and the streaming job has already read all the data in the closed shards:

INFO KinesisSource: End Offset is {"shardId-000000000539":{"iteratorType":"SHARD_END","iteratorPosition":""},"metadata":{"streamName":,"batchId":"157"},"shardId-000000000535":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000538":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000534":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000537":{"iteratorType":"SHARD_END","iteratorPosition":""},"shardId-000000000536":{"iteratorType":"SHARD_END","iteratorPosition":""}}

There is no shard left to read from, hence no tasks are launched in the micro-batch job:

20/08/14 17:51:59 INFO KinesisSource: Processing 0 shards from Stream()

The failure of the streaming job when there are no open shards can be handled by adding a few checks. But the core of the problem is that, for a micro-batch, we need at least one task in the KinesisRDD to update the metadata. We either have to force a task launch, or handle the job completion event so that it takes care of the update (or add some RPC call).

Any fix based on an RPC call or on handling the job completion event might require a change in Spark core, which cannot be shipped as a separate module/jar. We will add it to the Qubole distribution of Apache Spark and I can share the patch here (once it's done), but then you would have to ask your vendor to include it in their Apache Spark distribution.
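For anyone trying to map this onto the stack traces above, here is a rough simplification of the failing read path (a sketch under assumptions, not the actual HDFSMetadataCommitter source): before planning a micro-batch, the source reads the previous batch's shard metadata from <checkpoint>/sources/0/shard-commit/<batchId>; if the previous micro-batch launched no tasks, that file was never written, so the retries exhaust and the query dies.

// Rough sketch of the read-with-retry path (simplified, not the connector's code).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def readShardCommit(checkpointRoot: String, batchId: Long, maxRetries: Int = 3): String = {
  val path = new Path(s"$checkpointRoot/sources/0/shard-commit/$batchId")
  val fs = path.getFileSystem(new Configuration())
  var attempt = 0
  while (attempt < maxRetries) {
    if (fs.exists(path)) {
      val in = fs.open(path)
      try { return scala.io.Source.fromInputStream(in).mkString } finally { in.close() }
    }
    attempt += 1
    Thread.sleep(1000) // fixed pause between retries; the real committer may back off differently
  }
  // If the previous micro-batch ran no tasks, this file was never written.
  throw new IllegalStateException(
    s"Gave up after $maxRetries retries while fetching MetaData: $path does not exist")
}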

sudeshnasengupta commented 4 years ago

Hi @itsvikramagr: Indeed, the use case where this error happened for us is: we resharded the Kinesis stream while it was being queried by the Spark streaming job, and we continued to push events to the stream and query it post-resharding, without waiting out the 24-hour retention period after resharding.
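(For reference, "resharding" here means a shard-count change along the lines of the call below; this is only an illustrative AWS SDK snippet with placeholder values, not the exact operation that was run.)

// Illustrative reshard: changing the shard count closes the existing shards.
// Stream name and target count are placeholders.
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest

val kinesis = AmazonKinesisClientBuilder.defaultClient()
kinesis.updateShardCount(
  new UpdateShardCountRequest()
    .withStreamName("my-stream")
    .withTargetShardCount(12)
    .withScalingType("UNIFORM_SCALING"))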

itsvikramagr commented 4 years ago

@sudeshnasengupta - Your use-case can be handled within this connector. Let me start a PR for the same.

sudeshnasengupta commented 4 years ago

That'll be great, @itsvikramagr! Just to clarify our desired use case:

WTa-hash commented 3 years ago

I am also running into a similar error as others here:

java.lang.IllegalStateException: s3://......./sources/0/shard-commit/1630 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter$$anonfun$get$1.apply(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:150) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

This is a Spark Structured Streaming job reading from a Kinesis stream. I did notice this error occurred around the time I made some shard adjustments on my stream.

It seems the path s3://......./sources/0/shard-commit is under the checkpoint location. Would doing something like this - https://github.com/qubole/kinesis-sql/issues/93 - and persisting the last offset (or timestamp) ourselves after each successful batch work instead of relying on Spark checkpointing?
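A minimal sketch of that idea is below (the marker path and helper are hypothetical, and this is not a confirmed replacement for the connector's shard-commit metadata): write a small marker from inside foreachBatch after each successful batch, then use it to pick a startingposition on restart.

// Hypothetical helper: overwrite a single marker file with the last successful batch id and a timestamp.
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

def saveBatchMarker(markerDir: String, batchId: Long): Unit = {
  val path = new Path(s"$markerDir/last-successful-batch")
  val fs = path.getFileSystem(new Configuration())
  val out = fs.create(path, true) // overwrite the previous marker
  try out.write(s"$batchId,${System.currentTimeMillis()}".getBytes(StandardCharsets.UTF_8))
  finally out.close()
}

// In the streaming query, call it at the end of each batch:
// .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
//   // ...process batchDf...
//   saveBatchMarker("s3://my-bucket/markers", batchId) // placeholder location
// }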

gabor-arki-epam commented 3 years ago

@WTa-hash this is unrelated to the issue. You should not checkpoint to S3 because it is not an HDFS-compliant filesystem.

The checkpointing logic writes temporary files and renames them at the end, but S3 has no rename; instead, the entire object is written again and the old one is deleted. Because this is not an atomic operation, the checkpoint can get stuck in a bad, inconsistent state, and a job trying to use it will fail due to missing files. The larger your checkpoint is, the more likely this is to happen. For reliable checkpointing you can either use HDFS or attach EFS. See: https://cm.engineering/using-hdfs-to-store-spark-streaming-application-checkpoints-2146c1960d30 https://blog.yuvalitzchakov.com/improving-spark-streaming-checkpoint-performance-with-aws-efs/
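As a minimal example of that recommendation (paths are placeholders, and the EFS variant assumes the volume is mounted at the same location on the driver and all executors):

// streamingDf stands for the streaming DataFrame built by the job.
// Keep the sink output wherever you like, but point the checkpoint at a
// rename-capable filesystem instead of S3.
val query = streamingDf.writeStream
  .outputMode("update")
  .option("checkpointLocation", "hdfs:///checkpoints/my-stream")           // HDFS
  // .option("checkpointLocation", "file:///mnt/efs/checkpoints/my-stream") // or an EFS mount
  .format("console")
  .start()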

dgoldenberg-audiomack commented 2 years ago

I'm seeing the same problem. Is the recommended workaround then to use HDFS or EFS for checkpointing?

Feels like the whole qubole package then is basically not in a working state...