ra1u / redis-dart

fast redis protocol parser and client
MIT License
84 stars 35 forks source link

Slow performance when reading from pub sub and adding to Redis Streams #93

Closed escamoteur closed 10 months ago

escamoteur commented 10 months ago

Hi,

I was quite surprised that I couldn't get to more than 3800 PUBLISH commands from one Dart client to a local Redis in Docker. Running multiples of those clients increased the throughput linearly so as if the bottleneck is somehow the amount that a single dart process can send over the Redis connection.
Pure reading from pubs sub was better, I almost got the rate that multiple clients inserted into the pub sub channel (11k/s). When trying to insert the received Data from pub-sub to a RedisStream with LPUSH the throughput dropped again into the range of 3500/s. Any idea what the reason could be?

  void connectToPubSub(String channel) async {
    pubSub!.subscribe([channel]);
    final stream = pubSub!.getStream();
    var streamWithoutErrors = stream.handleError((e) => print("error $e"));
    await for (var msg in streamWithoutErrors) {
      var kind = msg[0];
      var jsonPayload = msg[2];
      if (kind == "message") {
        // final payload = jsonDecode(jsonPayload);
        // await redisCommand?.send_object(
        //     ['XADD', 'soketi-stream', '*', 'payload', jsonPayload]);
        await redisCommand
            ?.send_object(['LPUSH', 'soketi-queue', jsonPayload.toString()]);
      } else {
        print("received non-message ${msg}");
      }
    }
  }
ra1u commented 10 months ago

When you call LPUSH you wait for response to be received from server before advancing with second command.

Take a look in Readme when we present fast example. What we do here is that using send().then() approach, we are sending and receiving in asynchronous order - we send multiple "INCR" commands, before we process one.

I am not sure how you can take this approach from fast example with async, because after each async send() you wait for server to respond and that seems to be a bottleneck.

This library does not have requirements that we need to wait for response after each send. What is required is that each send() has exactly one (1) response from redis server.

escamoteur commented 10 months ago

How does the library then report errors if I don't await them? I have seen 8n your examples that you use the then() instead of await, but that should not really make a difference or what do I miss here? Am 7. Dez. 2023, 18:16 +0100 schrieb Luka Rahne @.***>:

When you call LPUSH you wait for response to be received from server before advancing with second command. Take a look in Readme when we present fast example. What we do here is that using send().then() approach, we are sending and receiving in asynchronous order - we send multiple "INCR" commands, before we process one. I am not sure how you can take this approach from fast example with async, because after each async send() you wait for server to respond and that seems to be a bottleneck. This library does not have requirements that we need to wait for response after each send. What is required is that each send() has exactly one (1) response from redis server. — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

escamoteur commented 10 months ago

Ah, I now understand, you fire a lot of reads with just a then. Will try that Am 7. Dez. 2023, 18:16 +0100 schrieb Luka Rahne @.***>:

When you call LPUSH you wait for response to be received from server before advancing with second command. Take a look in Readme when we present fast example. What we do here is that using send().then() approach, we are sending and receiving in asynchronous order - we send multiple "INCR" commands, before we process one. I am not sure how you can take this approach from fast example with async, because after each async send() you wait for server to respond and that seems to be a bottleneck. This library does not have requirements that we need to wait for response after each send. What is required is that each send() has exactly one (1) response from redis server. — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

ra1u commented 10 months ago

Simple way you can validate this is something where you modify just the line that sends LPUSH

  void connectToPubSub(String channel) async {
    pubSub!.subscribe([channel]);
    final stream = pubSub!.getStream();
    var streamWithoutErrors = stream.handleError((e) => print("error $e"));
    await for (var msg in streamWithoutErrors) {
      var kind = msg[0];
      var jsonPayload = msg[2];
      if (kind == "message") {
        // final payload = jsonDecode(jsonPayload);
        // await redisCommand?.send_object(
        //     ['XADD', 'soketi-stream', '*', 'payload', jsonPayload]);
        // no await here
        redisCommand
            ?.send_object(['LPUSH', 'soketi-queue', jsonPayload.toString()]).then((_) { /* we dont care what is received */ })
      } else {
        print("received non-message ${msg}");
      }
    }
  }
escamoteur commented 10 months ago

Hey, I now get up to 35k/s essages forwarded, thanks for the help