apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Support Redis Cluster when reading with RedisIO #21124

Open damccorm opened 2 years ago

damccorm commented 2 years ago

I am trying to use the RedisIO connector with a Redis Cluster, but it looks like the Jedis client that RedisIO uses only works with a standalone Redis server, not with a cluster. I get this error when trying to read from Redis:


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 15000 172.16.2.3:6379
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at com.oracle.quanta.RedisToAtp.run(RedisToAtp.java:196)
        at com.oracle.quanta.RedisToAtp.main(RedisToAtp.java:54)
Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 15000 172.16.2.3:6379
        at redis.clients.jedis.Protocol.processError(Protocol.java:116)
        at redis.clients.jedis.Protocol.process(Protocol.java:166)
        at redis.clients.jedis.Protocol.read(Protocol.java:220)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:278)
        at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:230)
        at redis.clients.jedis.Connection.getMultiBulkReply(Connection.java:224)
        at redis.clients.jedis.Jedis.mget(Jedis.java:474)
        at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.fetchAndFlush(RedisIO.java:517)
        at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.finishBundle(RedisIO.java:500)

This is the code that I use:


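        PCollection<Event> events =
                pipeline
                        /*
                         * Step #1: Read from Redis.
                         * RedisIO.read() produces a PCollection<KV<String, String>>;
                         * the later steps that turn it into Event objects are not shown.
                         */
                        .apply("Read Redis KV Store", RedisIO.read()
                                .withEndpoint(redisHost, 6379)
                                .withKeyPattern(redisKeyPattern))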

Is there a way to configure RedisIO to work with a cluster? I would have expected it to use JedisCluster when working with Redis in cluster mode, but from https://github.com/apache/beam/blob/master/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java it appears that it only uses the standalone Jedis client.

Source: https://lists.apache.org/thread.html/rb2826a1b081055ed64ca56b7c8985fbe121488fea5bd5830a38a38f7%40%3Cuser.beam.apache.org%3E

Imported from Jira BEAM-13065. Original Jira may contain additional context. Reported by: lcwik.
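Some background on the MOVED 15000 172.16.2.3:6379 error in the report above: Redis Cluster maps every key to one of 16384 hash slots, and a node that receives a command for a key in a slot it does not own answers with MOVED <slot> <host:port>. A cluster-aware client such as JedisCluster follows that redirect; the single-node Jedis client used by RedisIO's ReadFn surfaces it as JedisMovedDataException instead. Below is a minimal, standalone sketch of the slot function described in the Redis Cluster specification (CRC16, XMODEM variant, of the key or of its {hash tag}, modulo 16384). The class and method names are hypothetical, and the code does no networking.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: Redis Cluster key-to-slot mapping, following the Redis Cluster spec. */
public final class RedisKeySlot {
  private static final int SLOT_COUNT = 16384;

  private RedisKeySlot() {}

  /** CRC16 (XMODEM): polynomial 0x1021, initial value 0, no reflection, over bytes[from, to). */
  public static int crc16(byte[] bytes, int from, int to) {
    int crc = 0;
    for (int i = from; i < to; i++) {
      crc ^= (bytes[i] & 0xFF) << 8;
      for (int bit = 0; bit < 8; bit++) {
        crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
        crc &= 0xFFFF; // keep the register at 16 bits
      }
    }
    return crc;
  }

  /** Returns the hash slot (0..16383) for a key, honoring {hash tags}. */
  public static int slot(String key) {
    byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
    int start = 0;
    int end = bytes.length;
    // If the key contains '{' and a later '}' with at least one byte in between,
    // only the bytes between the first '{' and the first following '}' are hashed.
    int open = indexOf(bytes, (byte) '{', 0);
    if (open >= 0) {
      int close = indexOf(bytes, (byte) '}', open + 1);
      if (close > open + 1) {
        start = open + 1;
        end = close;
      }
    }
    return crc16(bytes, start, end) % SLOT_COUNT;
  }

  private static int indexOf(byte[] bytes, byte target, int from) {
    for (int i = from; i < bytes.length; i++) {
      if (bytes[i] == target) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Keys that share a hash tag map to the same slot.
    System.out.println(slot("{user1000}.following") == slot("{user1000}.followers")); // prints true
  }
}
```

As in the specification's examples, {user1000}.following and {user1000}.followers hash only user1000 and therefore land in the same slot, which is what allows a multi-key command such as MGET on both keys in a cluster.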

rfan-debug commented 2 years ago

Here are my two cents:

  1. We need not only to implement RedisIO on top of a client library that is compatible with Redis Cluster, but also to consider making MULTI (transactional) writes swappable with plain pipelined writes when writing data into the cluster. Some use cases only require backfilling the Redis cache layer with idempotent writes, which do not need transactions.

  2. Should we consider adding a new RedisIO variant backed by Lettuce? Its cluster-aware command routing may help reduce CROSSSLOT errors. In general, Jedis has fewer features than Lettuce.
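Point 2 above mentions CROSSSLOT errors: in a cluster, a multi-key command (MGET, which RedisIO's ReadFn issues in the trace above, or a MULTI/EXEC block) is rejected when its keys belong to different hash slots. Whichever client the connector ends up using, one way to avoid that is to partition each batch of keys by slot and send one command per group. The sketch below assumes the caller supplies the key-to-slot function (for instance a CRC16-based implementation, or a slot helper from the client library if it exposes one); the class name is hypothetical, and the slot function in main is a toy used only to show the grouping.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

/** Hypothetical helper: split a batch of keys so that no multi-key command spans two hash slots. */
public final class SlotGrouping {
  private SlotGrouping() {}

  /**
   * Groups keys by the slot reported by slotOf, preserving input order within each group.
   * Each resulting list could be sent as one MGET (or one MULTI/EXEC block) without CROSSSLOT.
   */
  public static Map<Integer, List<String>> groupBySlot(
      Collection<String> keys, ToIntFunction<String> slotOf) {
    Map<Integer, List<String>> groups = new TreeMap<>(); // sorted by slot for deterministic order
    for (String key : keys) {
      groups.computeIfAbsent(slotOf.applyAsInt(key), slot -> new ArrayList<>()).add(key);
    }
    return groups;
  }

  public static void main(String[] args) {
    // Toy slot function for illustration only; a real connector must use the cluster's CRC16 slot.
    ToIntFunction<String> toySlot = key -> key.length() % 3;
    System.out.println(groupBySlot(List.of("a", "bb", "ccc", "dd", "e"), toySlot));
    // prints {0=[ccc], 1=[a, e], 2=[bb, dd]}
  }
}
```

Grouping by slot is the strictest safe unit, and it is required for MGET and MULTI/EXEC. A plain pipeline only needs its commands to target the node that owns their keys, so grouping by node usually gives fewer, larger batches.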

sigalite commented 1 year ago

@damccorm I am facing the same issue, so I created a RedisIO that works with Redis Cluster, but I'm afraid its performance is really poor. Are there any new thoughts or updates on this issue?

damccorm commented 1 year ago

Unfortunately, I just copied this issue over from Jira when we migrated to GitHub issues, so I don't have context. @johnjcasey may have thoughts here.

johnjcasey commented 1 year ago

I don't have context on this, unfortunately. I wouldn't be surprised if we needed to re-implement it on a new client.

sigalite commented 1 year ago

Hi, thanks for your response. I have implemented a new connector in Java that uses the corresponding Redis cluster client (JedisCluster, to be more specific). JedisCluster does not support Redis transactions, so each write is done separately. I'm not sure this is the cause, but the connector's performance is really low. Are there important performance considerations I should take into account when writing such a high-volume connector?

Thanks
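On the write-performance question in the last comment: when every write pays its own network round trip, latency often limits throughput more than Redis itself does, so a common remedy is to buffer writes and send them in batches (for a Beam connector, typically buffering in processElement and flushing when the buffer is full and again in finishBundle). The sketch below shows only that buffering logic in plain Java; BatchingWriter and the Consumer that stands in for "send one pipelined batch" are hypothetical placeholders, not RedisIO or JedisCluster APIs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: accumulate key/value writes and hand them off in fixed-size batches,
 * so the sink can send each batch in one round trip (e.g. as a pipeline) instead of one per write.
 */
public final class BatchingWriter {
  private final int batchSize;
  private final Consumer<List<Map.Entry<String, String>>> sink; // placeholder for "send one batch"
  private List<Map.Entry<String, String>> buffer = new ArrayList<>();

  public BatchingWriter(int batchSize, Consumer<List<Map.Entry<String, String>>> sink) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.sink = sink;
  }

  /** Buffers one write and flushes once the batch is full (the processElement side). */
  public void write(String key, String value) {
    buffer.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Sends whatever is buffered; intended to be called at the end of a bundle (finishBundle). */
  public void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<Map.Entry<String, String>> batch = buffer;
    buffer = new ArrayList<>();
    sink.accept(batch);
  }

  public static void main(String[] args) {
    List<Integer> batchSizes = new ArrayList<>();
    BatchingWriter writer = new BatchingWriter(2, batch -> batchSizes.add(batch.size()));
    for (int i = 0; i < 5; i++) {
      writer.write("key:" + i, "value:" + i);
    }
    writer.flush();
    System.out.println(batchSizes); // prints [2, 2, 1]
  }
}
```

In a cluster, each batch handed to the sink would still have to be split by the node (or slot) that owns its keys, since a pipeline is bound to one connection. For the idempotent backfill writes mentioned in point 1 earlier in the thread, plain pipelining without MULTI/EXEC is typically sufficient.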