RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

Could not get a resource from the pool #307

Closed · Akhilj786 closed this 3 years ago

Akhilj786 commented 3 years ago

We are running Spark on Databricks (Azure). Spark version: 3.1.0

I compiled the jar from the branch https://github.com/RedisLabs/spark-redis/tree/build_scala_2.12 and uploaded it to one of the Databricks clusters.

Configuration:

# Cluster conf setup
spark.redis.auth PASSWORD
spark.redis.port 6380
spark.redis.host xxxx.xxx.cache.windows.net

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("id", StringType(), True),
    StructField("colA", StringType(), True),
    StructField("colB", StringType(), True)
])

data = [
    ['1', '8', '2'],
    ['2', '5', '3'],
    ['3', '3', '1'],
    ['4', '7', '2']
]
df = spark.createDataFrame(data, schema=schema)
df.show()
--------------
(
    df.
    write.
    format("org.apache.spark.sql.redis").
    option("table", "mytable").
    option("key.column", "id").
    save()
)

Error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1150520576677709> in <module>
      1 (
----> 2     df.
      3     write.
      4     format("org.apache.spark.sql.redis").
      5     option("table", "mytable").

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
   1132             self.format(format)
   1133         if path is None:
-> 1134             self._jwrite.save()
   1135         else:
   1136             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    108     def deco(*a, **kw):
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:
    112             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o325.save.
: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.jedis.util.Pool.getResource(Pool.java:59)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:234)
    at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:33)
    at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:69)
    at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:182)
    at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:293)
    at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:209)
    at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:132)
    at org.apache.spark.sql.redis.RedisSourceRelation.<init>(RedisSourceRelation.scala:45)
    at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:21)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:165)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:164)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:468)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:311)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:205)
    at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:43)
    at redis.clients.jedis.Protocol.process(Protocol.java:155)
    at redis.clients.jedis.Protocol.read(Protocol.java:220)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:309)
    at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:236)
    at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2225)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:119)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:819)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:429)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:360)
    at redis.clients.jedis.util.Pool.getResource(Pool.java:50)
    ... 42 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:127)
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:199)
    ... 53 more

--- Sample implementation ---

Python 3.7.10 (default, Feb 26 2021, 10:16:00)
[Clang 10.0.0 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> host='xxxx.xxx.cache.windows.net'
>>> auth='PASSWORD'
>>> port='6380'
>>> import redis
>>> r=redis.StrictRedis(host='xxxx.xxx.cache.windows.net',port=6380,db=0,password='PASSWORD',ssl=True)
>>> r.set('foo','bar')
True
>>> r.get('foo')
b'bar'
Akhilj786 commented 3 years ago

(screenshot attached)

I compiled the jar from the branch https://github.com/RedisLabs/spark-redis/tree/build_scala_2.12 (which supports Spark 3.x and Scala 2.12) and uploaded it to the cluster. Still no luck.

Related prior issue: https://github.com/RedisLabs/spark-redis/issues/237#issuecomment-668267729

fe2s commented 3 years ago

Hi @Akhilj786, how do you configure parameters like spark.redis.host and spark.redis.port? Could you print them in your Databricks notebook to check that they are configured?

Akhilj786 commented 3 years ago

(screenshot attached) @fe2s I have set them at cluster creation and have also tried setting them on the Spark session at runtime. (screenshot attached)

dtufekcic commented 3 years ago

Seeing the same issue with a Java app on Databricks trying to connect to an Azure Cache for Redis instance. Any suggestions on a possible workaround?

21/06/02 19:30:05 ERROR GnssMonitor: Caught an exception while analyzing a batch in the GNSS Monitor job
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.jedis.util.Pool.getResource(Pool.java:59)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:234)
    at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:35)
    at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:72)
    at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:196)
    at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:320)
    at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:236)
    at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:135)
    at org.apache.spark.sql.redis.RedisSourceRelation.<init>(RedisSourceRelation.scala:34)
    at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:13)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:390)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:424)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:391)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:391)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:264)
    at com.chesapeaketechnology.datasci.gnssmonitor.GnssMonitor.analyzeBatch(GnssMonitor.java:181)
    at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1(DataStreamWriter.scala:613)
    at org.apache.spark.sql.streaming.DataStreamWriter.$anonfun$foreachBatch$1$adapted(DataStreamWriter.scala:613)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:38)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:604)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:604)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:205)
    at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:43)
    at redis.clients.jedis.Protocol.process(Protocol.java:155)
    at redis.clients.jedis.Protocol.read(Protocol.java:220)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:318)
    at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:236)
    at redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2259)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:119)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:889)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:424)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:349)
    at redis.clients.jedis.util.Pool.getResource(Pool.java:50)
    ... 46 more
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:127)
    at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:199)
    ... 57 more
21/06/02 19:30:35 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
dtufekcic commented 3 years ago

Actually, in my case, the solution was to specify spark.redis.ssl true in the Spark Config.

Akhilj786 commented 3 years ago

@fe2s @dtufekcic for now we circumvented this problem with VNet peering between Databricks and Azure Cache for Redis (VNet support requires the premium tier). We used port 6379 (non-SSL) instead of 6380 (SSL).