RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

Spark-Redis XADD writes to Redis are slow and data is missing #190

Open Hacker0912 opened 5 years ago

Hacker0912 commented 5 years ago

Hello,

We are testing this Redis connector for Spark streaming in our project. However, we found that there is currently no API support for writing the output of a Spark streaming job directly (as a stream) to Redis. We tried the following workaround, adapted from examples in your documentation:

```scala
queryDf.writeStream
  .outputMode("append")
  .foreachBatch { (batchDf: DataFrame, _: Long) =>
    batchDf.foreachPartition { partition =>
      withConnection(redisConfig.connectionForKey(key = scala.util.Random.nextString(100))) { conn =>
        partition.foreach { row =>
          println(String.valueOf(row.get(row.fieldIndex("v1"))))
          outputStreamKeys.foreach { outputStreamKey =>
            conn.xadd(
              outputStreamKey,
              StreamEntryID.NEW_ENTRY,
              queryOutputSchemaFieldNames.map { name =>
                name -> String.valueOf(row.get(row.fieldIndex(name)))
              }.toMap.asJava
            )
          }
        }
      }
    }
  }
  .start()
```

It works. However, the throughput is extremely low: if we simply save the output as Parquet files instead, we get roughly 5x higher throughput. We are not sure whether this is expected or we are missing something. Could you help take a look at this?

fe2s commented 5 years ago

Hi @Hacker0912 , I assume you are running a single Redis node; otherwise, why would you use a random key to create a connection? It looks like you save the output to several Redis streams. When you compare it with Parquet, do you also save it multiple times? You can also optimize the data conversion: convert each row only once and reuse the result for all keys in outputStreamKeys.foreach, as in the sketch below.
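A minimal sketch of that optimization, reusing the names from your snippet above (conn, outputStreamKeys, queryOutputSchemaFieldNames):

```scala
// Sketch only: hoist the row-to-map conversion out of the inner loop, so each
// row is converted once and reused for every output stream key.
partition.foreach { row =>
  val fields = queryOutputSchemaFieldNames.map { name =>
    name -> String.valueOf(row.get(row.fieldIndex(name)))
  }.toMap.asJava
  outputStreamKeys.foreach { outputStreamKey =>
    conn.xadd(outputStreamKey, StreamEntryID.NEW_ENTRY, fields)
  }
}
```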

When you compare it with Parquet, what is your setup?

Hacker0912 commented 5 years ago

Hi @fe2s,

Thanks for the swift response. We run the Redis server on a single node, but we run the Spark jobs on a multi-node cluster with multiple executors. Since each executor operates on a different data partition, we thought a separate connection object should be created for each executor so that they can write data to Redis in parallel. Otherwise, there might be contention, since multiple executors might compete with each other for the same connection object (correct me if I am wrong). There were actually only 2 entries in outputStreamKeys (even with only one entry, the behavior did not seem to change), so we were really just writing the same row to two different Redis streams. It seems that the same stream cannot be shared by two downstream queries due to the offset mechanism: if one entry in a stream is consumed by one query, the other query will not be able to re-read it.

When writing data to Parquet files, the setup is almost the same as when data was written to Redis streams. The only difference is that the data was written at the batch level rather than the partition level. The Redis API does not seem to support batch-level writing, since the Jedis connection object is not serializable and can only be created and used at the partition level, roughly as follows.
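For illustration, a minimal sketch of that constraint (redisConfig and the key are modeled on the example above; this is the standard Spark per-partition pattern, not a dedicated spark-redis API):

```scala
// Sketch only: a Jedis connection cannot be serialized on the driver and
// shipped to executors, so it must be obtained inside foreachPartition,
// i.e. once per partition on the executor side.
batchDf.foreachPartition { partition: Iterator[Row] =>
  val conn = redisConfig.connectionForKey("some-key") // created on the executor
  try {
    partition.foreach { row =>
      // conn.xadd(...) per row, as in the original example
    }
  } finally {
    conn.close() // with spark-redis pooling, this returns the connection to the pool
  }
}
```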

We may have misunderstood something, since we have only a limited understanding of the whole spark-redis API, especially when using it to write data to Redis streams. We would appreciate it a lot if you could help us understand what might actually be happening and whether there are ways for us to improve the performance.

FYI: we also tried running the Spark job on a single executor with only one core, and we still observed that the throughput is much lower than when writing the output as Parquet files.

Thanks

fe2s commented 5 years ago

Hi @Hacker0912 ,

Thank you for the detailed description.

We run the Redis server on a single node, but we run the Spark jobs on a multi-node cluster with multiple executors. Since each executor operates on a different data partition, we thought a separate connection object should be created for each executor so that they can write data to Redis in parallel. Otherwise, there might be contention, since multiple executors might compete with each other for the same connection object (correct me if I am wrong).

redisConfig.connectionForKey(key) returns a connection (from a connection pool, see ConnectionPool.scala) to the Redis node that contains the given key. Since you are running a single Redis node, there is no point in passing a random key, though it shouldn't do any harm.
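In other words (a minimal sketch; "any-key" is just a placeholder), any constant key would do, since every key resolves to the same single node:

```scala
// Sketch only: with one Redis node, every key maps to the same endpoint,
// so a fixed key yields the same connection as a random one.
withConnection(redisConfig.connectionForKey("any-key")) { conn =>
  // write rows with conn.xadd(...) as in the original example
}
```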

You are getting a Jedis instance per Spark partition / Java thread, so that looks good. This is actually the recommended approach for using Jedis in a multi-threaded environment, see the Jedis docs. The maxTotal parameter of the pool is set to 250, so as long as you have fewer than 250 partitions per executor, it should create new connections and shouldn't block. Obviously, there may still be some low-level / network contention when maintaining a lot of connections.
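Roughly, the pooling looks like this (a sketch of the idea, not the exact ConnectionPool.scala source; host and port are placeholders):

```scala
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Sketch only: one pool per Redis endpoint, capped at 250 connections,
// mirroring the maxTotal value mentioned above.
val poolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(250)
val pool = new JedisPool(poolConfig, "localhost", 6379)

val jedis = pool.getResource // blocks only when all 250 connections are in use
try {
  jedis.ping()
} finally {
  jedis.close() // returns the connection to the pool
}
```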

There were actually only 2 entries in outputStreamKeys (even with only one entry, the behavior did not seem to change), so we were really just writing the same row to two different Redis streams. It seems that the same stream cannot be shared by two downstream queries due to the offset mechanism: if one entry in a stream is consumed by one query, the other query will not be able to re-read it.

Do you use the spark-redis spark.readStream.format("redis") API to consume from these streams?
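For reference, consuming a stream with spark-redis looks roughly like this (a sketch based on the spark-redis docs; the stream key and schema here are placeholders, not taken from this issue):

```scala
import org.apache.spark.sql.types._

// Sketch only: read a Redis stream as a streaming DataFrame.
val streamDf = spark.readStream
  .format("redis")
  .option("stream.keys", "my-stream")
  .schema(StructType(Array(
    StructField("v1", StringType)
  )))
  .load()
```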

We may have misunderstood something, since we have only a limited understanding of the whole spark-redis API, especially when using it to write data to Redis streams. We would appreciate it a lot if you could help us understand what might actually be happening and whether there are ways for us to improve the performance.

Actually, you are not using much of the spark-redis API; the only thing from spark-redis in your example is the connection management. The .foreachBatch is a Spark API, not spark-redis.

It's hard to tell why writing to Parquet files appears to be faster in your test. Most likely the built-in file sink doesn't go through .foreachBatch internally and instead has an optimized code path.
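For comparison, the built-in file sink in question is plain Spark API (a sketch; the paths are placeholders):

```scala
// Sketch only: the built-in Parquet sink the throughput was compared against.
queryDf.writeStream
  .format("parquet")
  .option("path", "/tmp/output")                   // placeholder output path
  .option("checkpointLocation", "/tmp/checkpoint") // placeholder checkpoint path
  .outputMode("append")
  .start()
```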

Hacker0912 commented 5 years ago

Hi @fe2s ,

Thanks for the response.

Do you use the spark-redis spark.readStream.format("redis") API to consume from these streams?

We did use spark.readStream.format("redis") to consume from these streams, and we observed that the number of tuples being consumed was far lower than expected. That is why we went back to the stream-writing part and started investigating the results written to Redis.

It's hard to tell why writing to Parquet files appears to be faster in your test. Most likely the built-in file sink doesn't go through .foreachBatch internally and instead has an optimized code path.

This is what we thought as well. We also noticed that there is currently no API in Spark-Redis for writing a whole DataFrame to Redis as streams (which is why we came up with the workaround described initially). We are wondering whether there is any near-future plan for the Spark-Redis team to add such an API? Writing the whole DataFrame at once might boost performance a lot compared to writing many small partitions.

fe2s commented 5 years ago

Hi @Hacker0912 ,

We did use spark.readStream.format("redis") to consume from these streams, and we observed that the number of tuples being consumed was far lower than expected. That is why we went back to the stream-writing part and started investigating the results written to Redis.

Do you have an idea how to reproduce it? If so, please open a new ticket.

This is what we thought as well. We also noticed that there is currently no API in Spark-Redis for writing a whole DataFrame to Redis as streams (which is why we came up with the workaround described initially). We are wondering whether there is any near-future plan for the Spark-Redis team to add such an API? Writing the whole DataFrame at once might boost performance a lot compared to writing many small partitions.

There is no near-future plan to implement a Sink for structured streaming. The initial idea was to reuse the existing DataFrame persistence API in combination with .foreachBatch(). We might implement a Sink in a future version, but I don't have an ETA at the moment.
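A minimal sketch of that idea, using the hash-based DataFrame persistence from the spark-redis docs (the table name "output" is a placeholder; note this writes Redis hashes, not streams):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: reuse the batch DataFrame write API inside foreachBatch.
queryDf.writeStream
  .outputMode("append")
  .foreachBatch { (batchDf: DataFrame, _: Long) =>
    batchDf.write
      .format("org.apache.spark.sql.redis")
      .option("table", "output") // placeholder table name
      .mode(SaveMode.Append)
      .save()
  }
  .start()
```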