RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

How does spark-redis write to Redis in Structured Streaming #208

Closed Litchilitchy closed 4 years ago

Litchilitchy commented 4 years ago

I am using spark-redis to write a DataFrame to Redis with the following code (sensors is a readStream from a Redis stream):

val query = sensors
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    val resultDF = batchDF // ... some operations here
    var canNotWrite = true
    while (canNotWrite) {
      try {
        resultDF.write
          .format("org.apache.spark.sql.redis")
          .option("table", "output")
          .mode(SaveMode.Append)
          .save()
        println("******") // line 1
        canNotWrite = false
      } catch {
        case e: Exception =>
          println("------") // line 2
          canNotWrite = true
      }
    }
  }
  .start()

However, there are two things I cannot understand.

  1. Suppose the batchDF size is 2500. When I check the Redis keys output:* periodically, the records are appended in chunks of 1000, 1000 and 500 instead of all 2500 records at once.

  2. I set the Redis maxmemory to limit the memory size, so the save method should fail with an error once it can no longer write to Redis. However, it does not raise the error right away; it seems to repeatedly process the same batchDF and print line 1, yet no new data appears in Redis.

Why? And by the way, would my code work?
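For reference, a bounded-retry variant of the loop above might look like the sketch below; the maxRetries name and the 3-attempt limit are illustrative, not part of my actual code.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: retry the Redis write a limited number of times instead of looping forever.
def writeWithRetry(df: DataFrame, maxRetries: Int = 3): Unit = {
  var attempt = 0
  var written = false
  while (!written && attempt < maxRetries) {
    try {
      df.write
        .format("org.apache.spark.sql.redis")
        .option("table", "output")
        .mode(SaveMode.Append)
        .save()
      written = true
    } catch {
      case e: Exception =>
        attempt += 1
        println(s"write attempt $attempt failed: ${e.getMessage}")
    }
  }
  if (!written) throw new RuntimeException(s"giving up after $maxRetries failed attempts")
}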

fe2s commented 4 years ago

Hi @Litchilitchy ,

  1. It splits the batch into smaller groups before writing, see https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala#L137 . This is done to avoid out-of-memory problems. You can control the group size with the "iterator.grouping.size" option; the default is 1000. A small illustration follows after this list.
  2. I'm not sure I understand the second question. I will try to reproduce your code and get back to you.
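To make point 1 concrete, here is a minimal standalone sketch (not the connector's actual code) of how grouping an iterator by 1000 turns a 2500-row batch into writes of 1000, 1000 and 500:

// Standalone illustration of grouped writes; groupingSize mirrors the
// iterator.grouping.size default, and the println stands in for a Redis write.
val rows = (1 to 2500).iterator
val groupingSize = 1000
rows.grouped(groupingSize).foreach { group =>
  println(s"writing ${group.size} records") // prints 1000, 1000, 500
}
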
Litchilitchy commented 4 years ago

The second question may be caused by a bug in our own code, so let me double-check. Thanks for your kind help and for the answer to question 1.

Litchilitchy commented 4 years ago

How do I set iterator.grouping.size in code? @fe2s

fe2s commented 4 years ago

@Litchilitchy through a write option:

resultDF.write
        .format("org.apache.spark.sql.redis")
        .option("table", "output")
        .option("iterator.grouping.size", 5000)
        .mode(SaveMode.Append)
        .save()
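
With iterator.grouping.size set to 5000 (larger than the 2500-row batch in your example), the whole batch should be written as a single group rather than three.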