qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

Switch to use Kinesis list-shards API #89

Closed: chadlagore closed this 4 years ago

chadlagore commented 4 years ago

This is a fix for https://github.com/qubole/kinesis-sql/issues/83. Upgrading to 1.1.4 causes even more issues with this, perhaps due to the new delete-shard logic.

The ListShards API provides 100 TPS per stream, whereas DescribeStream provides 10 TPS per account.
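For context, here is a minimal boto3 sketch contrasting the two calls. This is illustration only, not code from the PR; the stream name and region are placeholders:

```python
import boto3

# Placeholder region and stream name for illustration; not from this PR.
client = boto3.client("kinesis", region_name="us-west-2")

# DescribeStream: throttled at 10 TPS per *account*, shared by every
# stream (and every consumer) in that account.
desc = client.describe_stream(StreamName="my-stream")
print(len(desc["StreamDescription"]["Shards"]), "shards via DescribeStream")

# ListShards: throttled at 100 TPS per *stream*, so polling shard state
# no longer competes for one account-wide quota.
shards = []
resp = client.list_shards(StreamName="my-stream")
shards.extend(resp["Shards"])
while "NextToken" in resp:
    # Subsequent pages are fetched by NextToken alone (no StreamName).
    resp = client.list_shards(NextToken=resp["NextToken"])
    shards.extend(resp["Shards"])
print(len(shards), "shards via ListShards")
```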

Some output from the logs in my staging environment running this code (I updated the logging so you can observe it hitting ListShards):

```
20/08/26 01:36:46 INFO KinesisReader: List shards in Kinesis Stream:  Buffer({ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49608030938287294587608828212751578733957487395713581058,}})
```
chadlagore commented 4 years ago

@itsvikramagr -- please let me know if you can run the tests for this; I believe they all hit AWS. A review would be great as well when you have time 🙏

chadlagore commented 4 years ago

@itsvikramagr bump on this.

itsvikramagr commented 4 years ago

Got busy with other tasks. Let me look into it in the next couple of days.

chadlagore commented 4 years ago

all good, thanks @itsvikramagr

itsvikramagr commented 4 years ago

LGTM.

itsvikramagr commented 4 years ago

Thanks, @chadlagore for your contribution.

GuyLevinhr commented 3 years ago

Hi,

I have a problem writing to a Kinesis stream; I get the following error: `java.lang.IllegalArgumentException: Could not resolve region for endpoint: https://kinesis.eu-north-1.amazonaws.com`

But I can read from Kinesis in the same region. (The readStream from Kinesis works: if I run writeStream to the console, lines are displayed in the console.)

I would love help or guidance on how to resolve this. Spark version 2.4.3, Python version 3.6.

```python
import boto3
import findspark
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, from_unixtime, to_json, struct
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

os.environ["SPARK_HOME"] = "~/spark-2.4.7-bin-hadoop2.7"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages=com.qubole.spark/spark-sql-kinesis_2.11/1.2.0_spark-2.4 pyspark-shell'

os.environ['endVarNameForEndpoint'] = "https://kinesis.eu-north-1.amazonaws.com"
findspark.init()

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySparkKinesis") \
    .getOrCreate()

kinesisDF = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", stream_name) \
    .option("region", "eu-north-1") \
    .option("endpointUrl", "https://kinesis.eu-north-1.amazonaws.com") \
    .option("awsAccessKeyId", awsAccessKeyId) \
    .option("awsSecretKey", awsSecretKey) \
    .option("startingposition", "TRIM_HORIZON") \
    .load()

kinesisDF.writeStream \
    .format("kinesis") \
    .option("streamName", stream_name_Sub) \
    .option("region", "eu-north-1") \
    .option("endpointUrl", "https://kinesis.eu-north-1.amazonaws.com") \
    .option("awsAccessKeyId", awsAccessKeyId) \
    .option("awsSecretKey", awsSecretKey) \
    .option("checkpointLocation", "/tmp/spark") \
    .option("kinesis.executor.maxConnections", 2) \
    .option("awsUseInstanceProfile", "false") \
    .start() \
    .awaitTermination()
```

```
at org.apache.spark.sql.kinesis.CachedKinesisProducer$$anonfun$getRegionNameByEndpoint$3.apply(CachedKinesisProducer.scala:183)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$$anonfun$getRegionNameByEndpoint$3.apply(CachedKinesisProducer.scala:183)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$.getRegionNameByEndpoint(CachedKinesisProducer.scala:182)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$.org$apache$spark$sql$kinesis$CachedKinesisProducer$$createKinesisProducer(CachedKinesisProducer.scala:109)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$$anon$1.load(CachedKinesisProducer.scala:45)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$$anon$1.load(CachedKinesisProducer.scala:42)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.kinesis.CachedKinesisProducer$.getOrCreate(CachedKinesisProducer.scala:141)
at org.apache.spark.sql.kinesis.KinesisWriteTask.execute(KinesisWriteTask.scala:49)
at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KinesisWriter.scala:40)
at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KinesisWriter.scala:40)
at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KinesisWriter.scala:40)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KinesisWriter.scala:40)
at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KinesisWriter.scala:38)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
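The trace fails inside `CachedKinesisProducer.getRegionNameByEndpoint`, i.e. while mapping the endpoint URL back to a region on the write path. One plausible cause (an assumption, not verified here) is that the AWS SDK region metadata available to the connector predates eu-north-1, so the lookup finds no match and the `Option.getOrElse` at the top of the trace throws. As a rough illustration of what such a lookup does, here is a toy Python sketch; `region_from_endpoint` is a hypothetical helper, not the connector's actual Scala code:

```python
from urllib.parse import urlparse

def region_from_endpoint(endpoint: str) -> str:
    """Toy endpoint -> region mapping, for illustration only.

    The real lookup in CachedKinesisProducer consults the AWS SDK's
    region table, so it can fail for regions the bundled SDK predates.
    """
    host = urlparse(endpoint).hostname  # e.g. kinesis.eu-north-1.amazonaws.com
    parts = host.split(".")
    if len(parts) < 4 or parts[0] != "kinesis":
        raise ValueError(f"Could not resolve region for endpoint: {endpoint}")
    return parts[1]  # the region sits between the service and the domain

print(region_from_endpoint("https://kinesis.eu-north-1.amazonaws.com"))  # eu-north-1
```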