qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

What may be causing StreamingQueryException: Gave up after 3 retries while fetching MetaData, and how can I work around it? #110

Open dgoldenberg-audiomack opened 2 years ago

dgoldenberg-audiomack commented 2 years ago

Spark 3.1.1, running in AWS EMR 6.3.0, python 3.7.2

I'm getting the following error:

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.StreamingQueryException: Gave up after 3 retries while fetching MetaData, last exception: 
=== Streaming Query ===
Identifier: [id = e825addf-9c21-4e9d-a05b-581ae8911f29, runId = e2ea753f-d2dc-42ea-bec2-17a516faadf7]
Current Committed Offsets: {KinesisSource[events-prod]: {"shardId-000000000035":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000041":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000044":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000038":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000032":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000043":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"metadata":{"streamName":"events-prod","batchId":"0"},"shardId-000000000031":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000034":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000040":{"iteratorType":"AT_TIMESTAMP","iteratorPosition":"1647283749833"},"shardId-000000000037":
.................................................................

I have tried increasing the maximum number of retries and the retry intervals, e.g.:

MAX_NUM_RETRIES = 10  # default is 3
RETRY_INTERVAL_MS = 3000  # default is 1000
MAX_RETRY_INTERVAL_MS = 30000  # default is 10000

# `spark` is the active SparkSession; `pctx` holds the job's settings.
df = (
    spark.readStream.format("kinesis")
    .option("streamName", pctx.stream_name)
    .option("endpointUrl", pctx.endpoint_url)
    .option("region", pctx.region_name)
    .option("checkpointLocation", pctx.checkpoint_path)
    .option("startingposition", "LATEST")
    .option("kinesis.client.numRetries", MAX_NUM_RETRIES)
    .option("kinesis.client.retryIntervalMs", RETRY_INTERVAL_MS)
    .option("kinesis.client.maxRetryIntervalMs", MAX_RETRY_INTERVAL_MS)
    .load()
)

but the connector still appears to use the default of 3 retries.
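For reference, here is a rough sketch of how much total waiting a retry budget like the one above would buy. It assumes (not confirmed against the connector source) that the wait starts at the retry interval and doubles after each failed attempt, capped at the maximum interval. `backoff_schedule` is a hypothetical helper, not part of kinesis-sql.

```python
def backoff_schedule(num_retries, interval_ms, max_interval_ms):
    """Return the wait in ms before each retry, doubling up to a cap.

    Assumption: capped exponential backoff. The connector's actual
    retry policy may differ.
    """
    waits = []
    wait = interval_ms
    for _ in range(num_retries):
        waits.append(wait)
        # Double the wait for the next attempt, but never exceed the cap.
        wait = min(wait * 2, max_interval_ms)
    return waits


# Defaults quoted above: 3 retries, 1000 ms interval, 10000 ms cap.
defaults = backoff_schedule(3, 1000, 10000)
# Values tried above: 10 retries, 3000 ms interval, 30000 ms cap.
tuned = backoff_schedule(10, 3000, 30000)

print(defaults, sum(defaults))
print(tuned, sum(tuned))
```

Under these assumptions the defaults wait about 7 seconds in total before giving up, versus about 225 seconds with the values I tried.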

Any ideas, anyone?

Thanks

dgoldenberg-audiomack commented 2 years ago

From stderr.out on EMR.

Any ideas why the files that HDFSMetadataCommitter is looking for might be getting deleted, or might never have existed?

22/03/15 00:54:00 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1]
java.lang.IllegalStateException: hdfs://ip-10-2-XXX-XXX.awsinternal.acme.com:8020/mnt/tmp/temporary-03b8fecf-32d5-422c-9375-4c3450ed0bb8/sources/0/shard-commit/0 does not exist
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.$anonfun$get$1(HDFSMetadataCommitter.scala:163)
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229)
    at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:151)
    at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275)
    at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:399)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:399)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
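
One thing that may be worth checking in the log above: the missing file sits under /mnt/tmp/temporary-..., not under the checkpoint path I configured. As far as I know, Spark falls back to a `temporary-<uuid>` directory when a streaming query is started without a checkpoint location, and `checkpointLocation` is normally set on `writeStream` (or via `spark.sql.streaming.checkpointLocation`) rather than on `readStream`. Below is a minimal sketch of a check for this. `checkpoint_root` and `looks_temporary` are hypothetical helpers, and the `<checkpoint>/sources/...` layout is assumed from the path in the log.

```python
from urllib.parse import urlparse


def checkpoint_root(metadata_uri):
    """Return the path portion that precedes '/sources/'.

    Assumption: metadata lives at <checkpoint>/sources/<n>/shard-commit/<batch>,
    as the path in the log suggests.
    """
    path = urlparse(metadata_uri).path
    root, sep, _ = path.partition("/sources/")
    return root if sep else path


def looks_temporary(metadata_uri):
    """True if the checkpoint root's last component starts with 'temporary-'."""
    name = checkpoint_root(metadata_uri).rstrip("/").rsplit("/", 1)[-1]
    return name.startswith("temporary-")


# Path copied from the warning in the log above.
uri = (
    "hdfs://ip-10-2-XXX-XXX.awsinternal.acme.com:8020/mnt/tmp/"
    "temporary-03b8fecf-32d5-422c-9375-4c3450ed0bb8/sources/0/shard-commit/0"
)
print(checkpoint_root(uri))
print(looks_temporary(uri))
```

If this reports a temporary root, the query is probably not using the intended checkpoint directory at all, which would be a separate problem from the retry settings.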

dgoldenberg-audiomack commented 2 years ago

This issue appears to be a duplicate of https://github.com/qubole/kinesis-sql/issues/57.