awslabs / spark-sql-kinesis-connector

A Spark Structured Streaming connector for Amazon Kinesis Data Streams that supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO).
Apache License 2.0

key not found: subSequenceNumber #25

Closed: mriveraFacephi closed 4 months ago

mriveraFacephi commented 4 months ago

Hi, I'm receiving a java.lang.IllegalArgumentException: java.util.NoSuchElementException: key not found: subSequenceNumber error when I read messages from Kinesis in PySpark 3.3.0.

The following code shows the configuration I set for the aws-kinesis source:

kinesis = spark \
    .readStream \
    .format('aws-kinesis') \
    .option('kinesis.region', region) \
    .option('kinesis.streamName', streamName) \
    .option('kinesis.consumerType', 'GetRecords') \
    .option('kinesis.endpointUrl', endpointUrl) \
    .option('kinesis.startingposition', 'LATEST') \
    .option('kinesis.maxFetchRecordsPerShard', '10000') \
    .option('kinesis.maxFetchTimePerShardSec', '20') \
    .option('kinesis.checkNewRecordThreads', '4') \
    .option('kinesis.getRecords.numberOfRecordsPerFetch', '2500') \
    .option('checkpointAppName', streamName) \
    .option('checkpointInterval', 1) \
    .load()

The Kinesis DataFrame has the following schema:

DataFrame[data: binary, streamName: string, partitionKey: string, sequenceNumber: string, approximateArrivalTimestamp: timestamp]
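
For reference, a minimal sketch of decoding the binary data column (only the column names come from the schema above; the rest is illustrative):

from pyspark.sql.functions import col

# Minimal sketch: the payload arrives in the binary `data` column; cast it to
# a string for inspection (parse it as JSON downstream if needed).
decoded = kinesis.select(
    col('data').cast('string').alias('payload'),
    'partitionKey',
    'sequenceNumber',
    'approximateArrivalTimestamp',
)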

Messages were put into Kinesis with the boto3 put_records method.
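
For completeness, a minimal producer sketch (streamName, region, and the payload are placeholders, not the exact code that was run):

import json
import boto3

# Minimal sketch of the producer side using boto3 put_records.
client = boto3.client('kinesis', region_name=region)
response = client.put_records(
    StreamName=streamName,
    Records=[
        {'Data': json.dumps({'id': i}).encode('utf-8'), 'PartitionKey': str(i)}
        for i in range(10)
    ],
)
# A FailedRecordCount > 0 means some records must be retried.
print(response['FailedRecordCount'])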

The code works fine with the native Spark Kinesis connector.

Why does this library require the subSequenceNumber element?
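
From the stack trace below, KinesisV2SourceOffset deserializes a per-shard metadata map from the checkpointed offset JSON and expects a subSequenceNumber entry in it. A rough diagnostic sketch, assuming the standard Spark checkpoint layout (the source offset JSON is the last line of each offsets/<batchId> file) and that each shard-level value in that JSON is a map; the exact format is an assumption inferred from the trace, so treat this as illustrative:

import json

# Hypothetical diagnostic: check whether an existing checkpointed offset
# contains the subSequenceNumber key this connector expects. The path and
# the JSON layout are assumptions, not a documented format.
with open('/path/to/checkpoint/offsets/0') as f:
    offset_json = f.read().splitlines()[-1]  # source offset is the last line

for shard, meta in json.loads(offset_json).items():
    if shard.startswith('shardId') and isinstance(meta, dict) \
            and 'subSequenceNumber' not in meta:
        print(f'{shard}: no subSequenceNumber (offset written by another connector?)')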

Full error trace:

24/07/09 06:48:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/07/09 06:48:51 ERROR MicroBatchExecution: Query [id = 381820ff-b97f-4ed0-a910-6efb0d7d4d56, runId = 6f66bd32-a67d-4689-bc75-42d0347002a6] terminated with error
java.lang.IllegalArgumentException: java.util.NoSuchElementException: key not found: subSequenceNumber
    at org.apache.spark.sql.connector.kinesis.KinesisV2SourceOffset$.apply(KinesisV2SourceOffset.scala:93)
    at org.apache.spark.sql.connector.kinesis.KinesisV2MicrobatchStream.deserializeOffset(KinesisV2MicrobatchStream.scala:356)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$1(MicroBatchExecution.scala:413)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:413)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:448)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:447)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:436)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:687)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:432)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:237)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.util.NoSuchElementException: key not found: subSequenceNumber
    at scala.collection.immutable.Map$Map2.apply(Map.scala:227)
    at org.apache.spark.sql.connector.kinesis.KinesisV2SourceOffset$.$anonfun$apply$2(KinesisV2SourceOffset.scala:83)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.connector.kinesis.KinesisV2SourceOffset$.apply(KinesisV2SourceOffset.scala:79)
    ... 36 more

Thank you

mriveraFacephi commented 4 months ago

Solved. It was my mistake: I had a leftover checkpoint directory that was created by the spark-sql-kinesis connector included in Glue 4. After I completely removed that old checkpoint directory, everything worked fine.
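
For anyone hitting the same error: the offsets written by the connector bundled with Glue 4 do not appear to be compatible with this connector's offset format, so start the query against a fresh checkpoint location, e.g. (sink and path are placeholders):

# Minimal sketch: write to a new checkpoint location so this connector creates
# its own offsets instead of reading ones left by the old connector.
query = kinesis \
    .writeStream \
    .format('console') \
    .option('checkpointLocation', 's3://my-bucket/checkpoints/kinesis-fresh/') \
    .start()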