RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

Consistently getting `JedisConnectionException` #325

Open raokrutarth opened 2 years ago

raokrutarth commented 2 years ago

I have verified that the Redis instance is up on AWS:

docker run --rm -it redis redis-cli -h "ec2-3-217-106-135.compute-1.amazonaws.com" -p myPort --pass myPass

Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
ec2-3-217-106-135.compute-1.amazonaws.com:25449> keys *
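
(An equivalent JVM-side check, since spark-redis talks to Redis through Jedis; this is a minimal sketch reusing the placeholder host, port, and password from the redis-cli command above.)

import redis.clients.jedis.Jedis

// Placeholder host/port/password, as in the redis-cli check above.
// If the instance requires TLS, use the constructor overload with ssl = true.
val jedis = new Jedis("ec2-3-217-106-135.compute-1.amazonaws.com", 25449)
jedis.auth("myPass")
println(jedis.ping()) // prints "PONG" when reachable and authenticated
jedis.close()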

Versions:

libraryDependencies += "com.redislabs" %% "spark-redis" % "3.0.0"
scalaVersion := "2.12.10"
spark version 3.1.2 (Scala version 2.12.10)

Full exception from the Scala application:

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

Code snippets to reproduce:


import scala.util.{Failure, Try}
import org.apache.spark.sql.{DataFrame, SparkSession}

class Cache(spark: SparkSession) {

  // https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md
  spark.conf.set("spark.redis.host", AppConfig.settings.redis.host)
  spark.conf.set("spark.redis.port", AppConfig.settings.redis.port)
  spark.conf.set("spark.redis.auth", AppConfig.settings.redis.password)
  spark.conf.set("spark.redis.ssl", AppConfig.settings.redis.useTls)
  spark.conf.set("spark.redis.timeout", 5000)

  def getDf(key: String): Try[DataFrame] = Try {
    spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", key)
      .option("infer.schema", true)
      .load()
  }
}

val cache = new Cache(spark)
cache.getDf("my_table") match {
  case Failure(exception) => log.error(s"Failed to fetch from cache with error: $exception")
  case _                  => log.info("Fetched result from cache.")
}
fe2s commented 2 years ago

Hi @raokrutarth, can you try configuring it in the following way?

spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", key)
      .option("infer.schema", true)
      .option("host", AppConfig.settings.redis.host)
      .option("port", AppConfig.settings.redis.port)
      .option("auth", AppConfig.settings.redis.password)
      .option("ssl", AppConfig.settings.redis.useTls)
      .option("timeout", 5000)
      .load()
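
The same per-operation connection options apply on the write path; a sketch, assuming df is the DataFrame to persist and key is the target table name:

import org.apache.spark.sql.SaveMode

df.write
      .format("org.apache.spark.sql.redis")
      .option("table", key)
      .option("host", AppConfig.settings.redis.host)
      .option("port", AppConfig.settings.redis.port)
      .option("auth", AppConfig.settings.redis.password)
      .option("ssl", AppConfig.settings.redis.useTls)
      .option("timeout", 5000)
      .mode(SaveMode.Append)
      .save()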
raokrutarth commented 2 years ago

That worked, thanks. Any reason why setting the options with spark.conf.set does not work?

raokrutarth commented 2 years ago

Reopening the issue. Specifying credentials on every read/write is very slow. It would be ideal to set the auth once and establish the connection to Redis a single time.

fe2s commented 2 years ago

spark-redis takes its configuration options from org.apache.spark.SparkConf. When you use sparkSession.conf.set("...", "..."), it sets the option in org.apache.spark.sql.RuntimeConfig, and those options are not propagated to SparkConf. To configure options in SparkConf, you have to pass them during the creation of the SparkSession, e.g.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
      .builder()
      .master("local")
      // spark-redis reads "spark.redis.*" options from SparkConf, so set them here
      .config("spark.redis.port", "6379")
      .getOrCreate()

We need to investigate whether we should respect/support the options from RuntimeConfig.
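
A small sketch of the distinction, assuming a local master: an option set through RuntimeConfig is visible via spark.conf but never shows up in the SparkConf that spark-redis consults.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
spark.conf.set("spark.redis.host", "some-host") // written to RuntimeConfig only

println(spark.conf.get("spark.redis.host"))                      // "some-host"
println(spark.sparkContext.getConf.contains("spark.redis.host")) // false -- absent from SparkConf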