RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

Does spark-redis support writing to redis streams? #254

Closed agentdalecoper closed 4 years ago

agentdalecoper commented 4 years ago

Does spark-redis support writing to Redis streams? Judging from the guide, it supports reading from Redis, but the only thing it can write back is hashes.

fe2s commented 4 years ago

Hi @feitgraph, no, it doesn't support writing to Redis streams at the moment.

agentdalecoper commented 4 years ago

@fe2s Thanks for the quick response! This isn't obvious from the guide, which states that Redis streams are supported. The docs may need an update on this point.

fe2s commented 4 years ago

@feitgraph, thanks for the feedback. BTW, what kind of data do you want to write to Redis streams? Is it a streaming DataFrame (Structured Streaming)?

agentdalecoper commented 4 years ago

@fe2s One possible use case is a Structured Streaming DataFrame holding time-series data, where you want to sink new events from the DataFrame into Redis streams.

fe2s commented 4 years ago

@feitgraph One option is to implement a custom write function. Here is an example (I didn't test it):

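import org.apache.spark.sql.{DataFrame, Row}
import com.redislabs.provider.redis.RedisConfig
import com.redislabs.provider.redis.util.ConnectionUtils._
import com.redislabs.provider.redis.util.PipelineUtils._

// Writes one micro-batch to Redis streams, pipelining the XADD calls per stream key.
def writeToRedis(batchDf: DataFrame): Unit = {
  val sc = batchDf.sparkSession.sparkContext
  // Made implicit so the spark-redis helpers below can pick it up. Depending on the
  // spark-redis version, foreachWithPipeline may also require an implicit ReadWriteConfig.
  implicit val redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf)
  // The explicit Iterator[Row] type avoids the ambiguous foreachPartition overload on Scala 2.12.
  batchDf.foreachPartition { (partition: Iterator[Row]) =>
    // Group the partition's rows by target stream ("your_stream_key" is a placeholder column name).
    partition.toSeq.groupBy(_.getAs[String]("your_stream_key")).foreach { case (streamKey, rows) =>
      withConnection(streamKey) { conn =>
        foreachWithPipeline(conn, rows) { case (pipeline, row) =>
          // Placeholders: the entry ID and the field/value map built from `row`.
          pipeline.xadd(streamKey, ???, ???)
        }
      }
    }
  }
}

val sensors: DataFrame = ???

// Hand every micro-batch of the streaming DataFrame to the custom writer.
val query = sensors
  .writeStream
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    writeToRedis(batchDF)
  }
  .start()

query.awaitTermination()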
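
The two `???` placeholders in `pipeline.xadd` stand for the stream entry ID and the entry's field/value map. One possible way to build the map is sketched below. The `StreamFields` object and its `toStreamFields` helper are hypothetical names introduced for illustration, not part of spark-redis, and the sketch assumes that every column can be stored as its `toString` value and that null columns should simply be skipped.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical helper (not part of spark-redis): turns parallel sequences of
// column names and column values into the java.util.Map that XADD expects.
object StreamFields {
  def toStreamFields(names: Seq[String], values: Seq[Any]): JMap[String, String] = {
    require(names.length == values.length, "names and values must have the same length")
    // LinkedHashMap keeps the fields in column order.
    val fields = new JLinkedHashMap[String, String]()
    names.zip(values).foreach {
      case (_, null)     => // assumption: null columns are dropped rather than written
      case (name, value) => fields.put(name, value.toString)
    }
    fields
  }
}
```

Assuming Jedis 3.x is the client on the classpath, the call inside `foreachWithPipeline` could then read `pipeline.xadd(streamKey, StreamEntryID.NEW_ENTRY, StreamFields.toStreamFields(row.schema.fieldNames, row.toSeq))`, where `StreamEntryID.NEW_ENTRY` asks Redis to generate the entry ID. Like the example above, this has not been tested against a live Redis instance.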

agentdalecoper commented 4 years ago

@fe2s Thanks for your explanation!