RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool #374

Closed: baronchin closed this issue 1 year ago

baronchin commented 1 year ago

Hi, I got this exception when using spark-redis: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

The program code is as follows:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("xxx")\
    .config("spark.master", "spark://apdev01:7077")\
    .config("spark.deploy.mode", "client")\
    .config('spark.executor.cores', 4)\
    .config('spark.executor.memory', '10G')\
    .config('spark.cores.max', 4)\
    .config("spark.sql.session.timeZone", "UTC").getOrCreate()

chk="/spark_redis/test_chk5"
df = spark \
    .readStream \
    .format("socket") \
    .option("host", "apdev01") \
    .option("checkpointLocation",chk)\
    .option("port", 60001) \
    .load()
tb = df.select(
    f.split("value",",")[0].alias("id"),
    f.split("value",",")[1].alias("get_v")
)

def to_redis(df: DataFrame, batch_id):
    # Redis listens on port 9012 here, not the default 6379
    df.write\
        .format("org.apache.spark.sql.redis")\
        .option("table", "output_test")\
        .option("spark.redis.host", "apdev01")\
        .option("spark.redis.port", 9012)\
        .option("spark.redis.db", 1)\
        .save()

tb.writeStream.foreachBatch(to_redis).start().awaitTermination()

Also, I can connect to the Redis server from another machine successfully: telnet apdev01 9012
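A telnet test from a different machine only shows that the port is reachable from that machine. A minimal sketch of the same check in Python follows, which could be run on the Spark driver or worker hosts instead. The helper name `can_connect` and the 3-second timeout are assumptions made for illustration and are not part of spark-redis.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and opens a TCP socket,
        # which is roughly what `telnet host port` does.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution errors.
        return False


if __name__ == "__main__":
    # Host and port are the ones quoted in this thread.
    print(can_connect("apdev01", 9012))
```

If this returns True on the Spark hosts while the job still fails, the address the connector actually uses is worth checking next.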

baronchin commented 1 year ago

Hi, Redis is set up from a Docker image.

baronchin commented 1 year ago

Hi, I already found the solution: define the Redis options when creating the SparkSession.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("baron")\
    .config("spark.master", "spark://apdev01:7077")\
    .config("spark.deploy.mode", "client")\
    .config('spark.executor.cores', 1)\
    .config('spark.executor.memory', '1G')\
    .config('spark.cores.max', 1)\
    .config("spark.redis.host", "apdev01")\
    .config("spark.redis.port", 9012)\
    .config("spark.redis.db", 1)\
    .config("spark.sql.session.timeZone", "UTC").getOrCreate()

df = spark.createDataFrame([("jessy",31),("parker",23)],["name","age"])

df.write\
    .format("org.apache.spark.sql.redis")\
    .option("table", "person")\
    .save()
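The fix above uses a batch write. A sketch of how the same session-level settings might carry over to the original `foreachBatch` job is shown below. The helper names `redis_session_conf` and `build_session` are hypothetical, and `mode("append")` is an assumption added so that later micro-batches do not fail on an existing table; none of these appear in the thread. One plausible reading of the fix, not confirmed here, is that the `spark.redis.*` keys are read from the Spark configuration and not from per-write options.

```python
def redis_session_conf(host, port, db=0):
    """Collect the session-level spark-redis settings used in the fix."""
    return {
        "spark.redis.host": host,
        "spark.redis.port": str(port),
        "spark.redis.db": str(db),
    }


def build_session(app_name, conf):
    """Create a SparkSession with every key in conf applied to the builder."""
    # Imported lazily so redis_session_conf can be used without PySpark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def to_redis(batch_df, batch_id):
    # No host/port options here: the connection settings come from the
    # session configuration set up in build_session.
    batch_df.write \
        .format("org.apache.spark.sql.redis") \
        .option("table", "output_test") \
        .mode("append") \
        .save()
```

With a streaming DataFrame `tb` as in the original post, the job would be started the same way as before: `tb.writeStream.foreachBatch(to_redis).start().awaitTermination()`, after creating the session with `build_session("xxx", redis_session_conf("apdev01", 9012, db=1))`.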