RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool #382

Open dcmshi opened 9 months ago

dcmshi commented 9 months ago

hi all, I'm running into this connection exception when trying to connect to our Redis cluster and load a dataframe using the com.redislabs:spark-redis_2.12:3.1.0 package in a PySpark notebook in Databricks. I've tried setting spark.redis.host and other config options in the cluster settings as mentioned in this issue[0], as well as specifying them directly in the code, but I still get the same error message. Has anyone run into something similar when running this through Databricks?

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

df = spark.read.format(
  "org.apache.spark.sql.redis"
  ).schema(
    StructType(
        [
            StructField("classroom_cup_identifier", StringType(), True),
            StructField("cycle_id", StringType(), True),
            StructField("classroom_id", StringType(), True),
            StructField("total_answers_correct", IntegerType(), True),
        ]
    )
  ).option(
    "keys.pattern", "classroom-cup:classroom-versus-classroom-leaderboard:*"
    ).option(
      "key.column", "classroom_cup_identifier"
      ).load()

df.show()

[0] https://github.com/RedisLabs/spark-redis/issues/357
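For reference, spark-redis picks up connection settings either from the Spark session config (`spark.redis.*`) or from per-read `.option(...)` calls. A minimal sketch of both approaches; the host, port, and password values are placeholders for your actual Redis endpoint:

```python
from pyspark.sql import SparkSession

# Option 1: session-level config (placeholder endpoint values).
spark = (
    SparkSession.builder
    .config("spark.redis.host", "redis.example.com")
    .config("spark.redis.port", "6379")
    .config("spark.redis.auth", "passwd")  # omit if auth is disabled
    .getOrCreate()
)

# Option 2: the same settings passed per-read, overriding the session config.
df = (
    spark.read.format("org.apache.spark.sql.redis")
    .option("host", "redis.example.com")
    .option("port", "6379")
    .option("auth", "passwd")
    .option("keys.pattern", "classroom-cup:classroom-versus-classroom-leaderboard:*")
    .option("key.column", "classroom_cup_identifier")
    .load()
)
```

On Databricks, the session-level values can equivalently go in the cluster's Spark config instead of being set in the notebook.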

dcmshi commented 9 months ago

also including the stack trace in case that's helpful. We verified with the native Python Redis client that there are no issues connecting from the Databricks environment to the Redis cluster.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-131202819613220>, line 19
      1 from pyspark.sql.types import IntegerType, StringType, StructField, StructType
      4 df = spark.read.format(
      5   "org.apache.spark.sql.redis"
      6   ).schema(
      7     StructType(
      8         [
      9             StructField("classroom_cup_identifier", StringType(), True),
     10             StructField("cycle_id", StringType(), True),
     11             StructField("classroom_id", StringType(), True),
     12             StructField("total_answers_correct", IntegerType(), True),
     13         ]
     14     )
     15   ).option(
     16     "keys.pattern", "classroom-cup:classroom-versus-classroom-leaderboard:*"
     17     ).option(
     18       "key.column", "classroom_cup_identifier"
---> 19       ).load()
     21 df.show()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:314, in DataFrameReader.load(self, path, format, schema, **options)
    312     return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    313 else:
--> 314     return self._df(self._jreader.load())

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o1179.load.
: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.jedis.util.Pool.getResource(Pool.java:84)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:377)
    at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:35)
    at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:94)
    at com.redislabs.provider.redis.RedisConfig.getNonClusterNodes(RedisConfig.scala:276)
    at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:370)
    at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:267)
    at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:166)
    at com.redislabs.provider.redis.RedisConfig$.fromSparkConfAndParameters(RedisConfig.scala:154)
    at org.apache.spark.sql.redis.RedisSourceRelation.<init>(RedisSourceRelation.scala:34)
    at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:42)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:383)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:378)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:334)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:334)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Failed to create socket.
    at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:110)
    at redis.clients.jedis.Connection.connect(Connection.java:226)
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:144)
    at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:314)
    at redis.clients.jedis.BinaryJedis.initializeFromClientConfig(BinaryJedis.java:92)
    at redis.clients.jedis.BinaryJedis.<init>(BinaryJedis.java:297)
    at redis.clients.jedis.Jedis.<init>(Jedis.java:169)
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:177)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:571)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:298)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:223)
    at redis.clients.jedis.util.Pool.getResource(Pool.java:75)
    ... 28 more
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:613)
    at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:80)
    ... 39 more
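The root cause at the bottom, `java.net.SocketTimeoutException: connect timed out`, means the JVM could not even open a TCP socket to the Redis endpoint, which usually points at networking (VPC peering, security groups, firewall rules) or at the connector resolving a different host/port than the Python client used. A minimal stdlib sketch for checking raw TCP reachability from the same notebook (`redis.example.com` is a placeholder):

```python
import socket

def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Run this on the driver with the exact host/port the connector is given,
# e.g. can_reach("redis.example.com", 6379)
```

If this returns False from the driver while the Python Redis client works, the two are likely not using the same endpoint or network path.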
3043442162 commented 4 months ago

I have a suggestion. I have a project that uses spark-redis, and I encountered the same problem as you, but my Redis did not have auth enabled. If there is no problem with the code, consider checking whether Redis itself is configured correctly: for example, whether redis.conf restricts the host addresses that can connect, whether auth is enabled, and so on. After I commented out the `.config("spark.redis.auth", "passwd")` line, data could be written to Redis normally.