qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

startingPosition AT_TIMESTAMP iteratorOption NumberFormatException #93

Open adamlbailey opened 4 years ago

adamlbailey commented 4 years ago

Excellent addition for reading from stream at specific positions per #78

However, I'm having trouble using the "AT_TIMESTAMP" option to read from shards at specific timestamps.

The shardInfo object I'm using is below:

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "2020-09-30T19:58:46.480-00:00"
  }
}

Here is the exception:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 9, ip-10-0-9-81.us-west-2.compute.internal, executor 5): java.lang.NumberFormatException: For input string: "2020-09-30T19:58:46.480-00:00"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
    at org.apache.spark.sql.kinesis.KinesisReader.getShardIterator(KinesisReader.scala:120)
    at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getShardIterator(KinesisSourceRDD.scala:146)
    at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:197)
    at org.apache.spark.sql.kinesis.KinesisSourceRDD$$anon$1.getNext(KinesisSourceRDD.scala:138)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
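The trace shows the connector parsing `iteratorPosition` with Scala's `String.toLong` (KinesisReader.scala:120), so any non-numeric string fails. A minimal Python sketch of the same failure mode (illustrative only, not the connector's code):

```python
def parse_iterator_position(position):
    # Mirrors the connector's String.toLong: only digit strings succeed.
    return int(position)

parse_iterator_position("1601495926480")  # epoch milliseconds: parses fine
# parse_iterator_position("2020-09-30T19:58:46.480-00:00") would raise
# ValueError, analogous to the NumberFormatException above.
```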
adamlbailey commented 4 years ago

Update:

Using an epoch-millisecond timestamp instead of an ISO-formatted string solves the problem. It would be nice to support ISO strings as well, for readability and for compatibility with the options afforded by other Kinesis libraries. I will consider adding a PR to address this soon.

{
  "metadata": {
    "streamName": "QSR-data-stream-production",
    "batchId": "1"
  },
  "shardId-000000000002": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000003": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000004": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  },
  "shardId-000000000005": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition": "1601495926480"
  }
}
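For reference, the epoch-millisecond value above can be derived from the original ISO string; a minimal sketch in Python (helper name is my own):

```python
from datetime import datetime

def iso_to_epoch_millis(iso_string: str) -> str:
    """Convert an ISO-8601 timestamp with offset to the epoch-millisecond
    string the connector expects."""
    # fromisoformat handles the "+HH:MM"/"-HH:MM" offset form (Python 3.7+).
    dt = datetime.fromisoformat(iso_string)
    return str(round(dt.timestamp() * 1000))

print(iso_to_epoch_millis("2020-09-30T19:58:46.480-00:00"))  # -> 1601495926480
```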
itsvikramagr commented 4 years ago

@adamlbailey - Looking forward to the PR.

gopi-t2s commented 3 years ago

Hi @adamlbailey,

I am trying to read data from Kinesis using at_timestamp as the option in startingposition. Here is the piece of code I am using:

pos = json.dumps({"at_timestamp": "02/26/2021 3:07:13 PDT"})

kinesisDF = (spark
             .readStream
             .format("kinesis")
             .option("streamName", name)
             .option("endpointUrl", URL)
             .option("awsAccessKeyId", key)
             .option("awsSecretKey", sKey)
             .option("startingposition", pos)
             .load())

Here is the error message I am receiving:

pyspark.sql.utils.IllegalArgumentException: 'org.json4s.package$MappingException: Expected object but got JString(02/26/2021 3:07:13 PDT)'

I am new to this Kinesis connector, and I know the way I am passing the value for the starting position is wrong. Could you help me understand how to pass at_timestamp as the value for the startingposition option?

Thanks in Advance!

adamlbailey commented 3 years ago

Hi @gopi-t2s, I'm somewhat removed from this work now but if memory serves:

You're going to want to construct an object like the one in my previous comment. Practically, I did this by writing a helper that described the stream, so I could list each shard with the right epoch-millisecond timestamp value.
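The kind of helper mentioned above might be sketched like this (all names are illustrative, not from this repo): build the startingPosition JSON with one AT_TIMESTAMP entry per shard. The shard IDs themselves would come from describing the stream, e.g. boto3's `kinesis.list_shards(StreamName=...)["Shards"]`.

```python
import json

def build_starting_position(stream_name, shard_ids, epoch_millis):
    """Build the startingPosition JSON: metadata plus one
    AT_TIMESTAMP entry per shard, all at the same epoch-millisecond value."""
    position = {"metadata": {"streamName": stream_name, "batchId": "1"}}
    for shard_id in shard_ids:
        position[shard_id] = {
            "iteratorType": "AT_TIMESTAMP",
            "iteratorPosition": str(epoch_millis),
        }
    return json.dumps(position)
```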

gopi-t2s commented 3 years ago

Thanks @adamlbailey for your input.

chadlagore commented 3 years ago

I ran into this as well @gopi-t2s - were you able to make it work? I was unsure whether PySpark was going to be supported for this.

gopi-t2s commented 3 years ago

No @chadlagore, I am still looking for a way to achieve this.

nikitira commented 3 years ago

I got it working with the example above from @adamlbailey.

Minimal example in PySpark:

import json
from datetime import datetime

# Timestamp in epoch-millisecond format, e.g. "1601495926000".
# Note: strftime("%s") is not portable; timestamp() works everywhere.
now_ts = str(int(datetime.now().timestamp())) + "000"

from_timestamp = {
  "metadata": {
    "streamName": "my-stream",
    "batchId": "1"
  },
  "shardId-000000000000": {
    "iteratorType": "AT_TIMESTAMP",
    "iteratorPosition":  now_ts
  }
}

starting_position = json.dumps(from_timestamp)

my_stream = (spark
             .readStream
             .format('kinesis')
             .option('streamName', "my-stream")
             .option('endpointUrl', KINESIS_ENDPOINT)
             .option('region', KINESIS_REGION)
             .option('startingposition', starting_position)
             .load())

hope this helps @chadlagore @gopi-t2s

gopi-t2s commented 3 years ago

Thank you @nikitira , I will try this..