awslabs / spark-sql-kinesis-connector

Spark Structured Streaming Kinesis Data Streams connector supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO)
Apache License 2.0

KPL proxy configuration is not supported. #21

Open ttrankle opened 3 months ago

ttrankle commented 3 months ago

Currently the spark-sql-kinesis-connector does not support proxy configuration for outbound network traffic. According to the AWS documentation, the proxy settings need to be passed into KinesisProducerConfiguration.
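
For reference, KPL's `KinesisProducerConfiguration` already exposes proxy setters; what is missing is a way for the connector to forward user-supplied sink options into it. A minimal sketch of the target configuration (the host, port, and credentials below are placeholder values, not real endpoints):

```scala
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration

// Sketch only: these are the proxy settings KPL accepts natively.
// The connector would need to populate them from its own sink options;
// all values below are placeholders.
val producerConfig = new KinesisProducerConfiguration()
  .setRegion("us-east-1")
  .setProxyHost("proxy.example.com") // placeholder enterprise proxy host
  .setProxyPort(8080)                // placeholder proxy port
  .setProxyUserName("proxyUser")     // placeholder credentials, only needed
  .setProxyPassword("proxyPassword") // if the proxy requires authentication
```

Note that in this connector the KPL classes are relocated under the `org.apache.spark.sql.connector.kinesis.shaded` package, so the wiring would have to target the shaded class.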

Without the proxy configuration, if you are behind an enterprise proxy you get the following error:

org.apache.spark.sql.connector.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.spark.sql.connector.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:173)
    at org.apache.spark.sql.connector.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.flush(KinesisProducer.java:916)
    at org.apache.spark.sql.connector.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.flush(KinesisProducer.java:936)
    at org.apache.spark.sql.connector.kinesis.KinesisWriteTask.flushRecordsIfNecessary(KinesisWriteTask.scala:98)
    at org.apache.spark.sql.connector.kinesis.KinesisWriteTask.close(KinesisWriteTask.scala:118)
    at org.apache.spark.sql.connector.kinesis.KinesisWriter$.$anonfun$write$4(KinesisWriter.scala:42)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:73)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.sql.connector.kinesis.KinesisWriter$.$anonfun$write$2(KinesisWriter.scala:42)
    at org.apache.spark.sql.connector.kinesis.KinesisWriter$.$anonfun$write$2$adapted(KinesisWriter.scala:39)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1039)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1039)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
hwanghw commented 3 months ago

Thanks for your contribution! We will review the changes.