redis / go-redis

Redis Go client
https://redis.uptrace.dev
BSD 2-Clause "Simplified" License

Blocking XGroupCreateMkStream does not interrupt on context cancellation #2276

Open jgirtakovskis opened 2 years ago

jgirtakovskis commented 2 years ago

When XGroupCreateMkStream is called in blocking mode (Block = 0), the call is not interrupted by cancelling the context.

Expected Behavior

The blocking call is interrupted when the context is cancelled.

Current Behavior

The call continues to block after the context is cancelled.

Possible Solution

Unsure yet

Steps to Reproduce

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/go-redis/redis/v9"
    "github.com/google/uuid"
)

func main() {
    rdb := redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs:    []string{"localhost:6379"},
        Password: "", // no password set
        DB:       0,  // use default DB
    })

    defer rdb.Close()

    ctx, cancelFn := context.WithCancel(context.Background())

    // Cancel the context after ~5 seconds; the blocked XReadGroup below is
    // expected to be interrupted at that point.
    go func() {
        for idx := 0; idx < 5; idx++ {
            fmt.Printf("Waiting %v...\n", idx)
            time.Sleep(time.Second)
        }
        cancelFn()
        fmt.Printf("Cancelled context and now expect blocking XGroupCreateMkStream to be interrupted...\n")
    }()

    name := "blag"
    streamName := name
    groupName := name + "-blah"

    // Create the stream and consumer group if they do not already exist.
    _, err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Result()
    fmt.Printf("%v\n", err)

    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        objs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: uuid.NewString(),
            Streams:  []string{streamName, ">"},
            Count:    100,
            Block:    0, // 0 = block indefinitely; cancelling ctx should interrupt this
        }).Result()
        fmt.Printf("%v, %v\n", err, objs)
    }()

    wg.Wait()
    fmt.Printf("Done.\n")
}

Context (Environment)

I have two goroutines concurrently performing XREADGROUP and XADD, with the read in blocking mode. XADD is triggered by external events and is not guaranteed to add items to the stream at any particular cadence or pattern. Shutting down the reading goroutine is not possible because of the blocking call, which is not interrupted by context cancellation.

Detailed Description

Blocking calls should be interrupted when the context is cancelled and the connection is closed.
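
To make the expectation concrete (same client and names as the reproduction above; this shows the desired behavior, not what happens today), the blocked call should return shortly after cancellation with an error the caller can act on:

// Expected once fixed: cancelling ctx makes the blocking read return promptly.
objs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
    Group:    groupName,
    Consumer: uuid.NewString(),
    Streams:  []string{streamName, ">"},
    Block:    0, // still "block forever", but interruptible via ctx
}).Result()
if err != nil && ctx.Err() != nil {
    return // context was cancelled: shut the consumer goroutine down cleanly
}
_ = objs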

Possible Implementation

N/A

berndverst commented 1 year ago

Dapr maintainer here. I also need this fixed.

Alger7w commented 1 year ago

I have the same problem. Version: v9.0.0-rc.2

latolukasz commented 1 year ago

Same problem here, and it is quite a blocker for me :(

brettmorien commented 1 year ago

We've built a workaround for this using samber/lo, but we agree that this should really be handled at the library level.

❗❗ Turns out this leaks a lot of goroutines and shouldn't be used. ❗❗

var cmd *redis.XStreamSliceCmd

// lo.Async runs the blocking XReadGroup in a new goroutine and delivers its
// result on a channel, so we can select on it together with ctx.Done().
select {
case cmd = <-lo.Async(func() *redis.XStreamSliceCmd {
    return c.Client.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "group",
        Consumer: "consumerID",
        Streams:  []string{"stream", ">"},
        Count:    1,
    })
}):
case <-ctx.Done():
    // Give up waiting; the goroutine started by lo.Async keeps blocking on
    // XReadGroup, which is the goroutine leak noted above.
    return ctx.Err()
}

streams, err := cmd.Result()

armsnyder commented 1 year ago

Note that the workaround above will leak goroutines if you run the code repeatedly, so it's really only viable for handling app shutdown.
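
A leak-free, application-level alternative (a common pattern, not a fix inside go-redis) is to give up on Block: 0 and poll with a finite block time, re-checking the context between reads. The sketch below assumes the same imports as the reproduction at the top; stream, group, and consumer names are placeholders.

// consumeLoop polls the stream with a bounded Block so the goroutine can
// observe ctx between reads instead of blocking forever.
func consumeLoop(ctx context.Context, rdb redis.UniversalClient, stream, group, consumer string) error {
    for {
        if err := ctx.Err(); err != nil {
            return err // ctx cancelled: exit between reads
        }
        res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    group,
            Consumer: consumer,
            Streams:  []string{stream, ">"},
            Count:    100,
            Block:    5 * time.Second, // finite block: returns redis.Nil when nothing arrives in time
        }).Result()
        if err == redis.Nil {
            continue // nothing new in this window; re-check ctx and block again
        }
        if err != nil {
            return err
        }
        for _, s := range res {
            for _, msg := range s.Messages {
                fmt.Println(msg.ID, msg.Values) // handle the message
            }
        }
    }
}

The trade-off is a worst-case shutdown latency of one Block interval instead of an immediate interrupt.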

armsnyder commented 1 year ago

I was able to contribute test cases in #2432. However, I'm less confident about providing a fix. The code would need to safely dispose of the connection if the context is canceled, for example by removing it from the connection pool. It would also need to avoid interfering with the expected behavior of ContextTimeoutEnabled (#2243).

monkey92t commented 1 year ago

This feels complicated: net.Conn is hard to control with a ctx, because net.Conn uses deadlines instead of ctx...

var conn net.Conn // go-redis pool.Conn
ctx := context.Background()

processBlockCmd := func() <-chan *redis.XStreamSliceCmd {
    ch := make(chan *redis.XStreamSliceCmd)
    go func() {
        cmd := &redis.XStreamSliceCmd{}
        // write the command, then block reading the reply...
        if _, err := conn.Read(nil); err != nil {
            // conn timeout? only report it if ctx was actually cancelled
            if err.Error() == "i/o timeout" && errors.Is(ctx.Err(), context.Canceled) {
                cmd.SetErr(err)
            }
        }
        ch <- cmd
        close(ch)
    }()
    return ch
}

ch := processBlockCmd()
select {
case cmd := <-ch:
    return cmd
case <-ctx.Done():
    // expire the deadline so the pending Read returns, then collect the result
    conn.SetDeadline(time.Now())
    return <-ch
}

armsnyder commented 1 year ago

@monkey92t Right, deadlines on net.Conn are best used when you know the timeout ahead of time. The code you shared makes sense, and it is very similar to the change I just proposed in #2433 at a library level. I believe this belongs in the library since I would prefer the redis client to manage the connection for me. If users would prefer not to cancel redis commands with a context, then they can pass context.Background() to redis commands.

EDIT: One change between your code and mine is that you used SetDeadline whereas I closed the connection. Is there a meaningful difference there?
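
For reference, a minimal sketch of the standard-library semantics only (assuming imports errors, net, os; this is not go-redis code): what the goroutine stuck in conn.Read sees under each strategy.

// readBlocked shows how the two interruption strategies surface to a pending Read.
func readBlocked(conn net.Conn, buf []byte) error {
    _, err := conn.Read(buf)
    switch {
    case errors.Is(err, os.ErrDeadlineExceeded):
        // SetDeadline(time.Now()) was used: the connection is still open and can
        // be reused after clearing the deadline with conn.SetDeadline(time.Time{}).
    case errors.Is(err, net.ErrClosed):
        // Close() was used: the connection is gone and must be discarded by the pool.
    }
    return err
}

So SetDeadline leaves the connection reusable once the deadline is cleared, while Close forces the pool to discard it. Whether the connection is still safe to reuse at the Redis protocol level (a reply may still be in flight) is a separate question, which is presumably why removing it from the pool comes up either way.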

monkey92t commented 1 year ago

@armsnyder We still need to think about this more. If a goroutine is spawned every time a command is executed, it will have an impact on performance. I haven't thought of a good solution yet.

armsnyder commented 1 year ago

Here's a benchstat comparing master with my PR.

https://gist.github.com/armsnyder/40aca6ea480bf53434d1e41c663e1550

We could optimize by running a goroutine per connection rather than per command. The connection goroutine would handle all I/O, with the command communicating to it over a channel.
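
A rough sketch of that per-connection idea (made-up names — request, connOwner, do — over a plain net.Conn rather than go-redis internals; assumes imports context, net, time):

// request asks the connection-owner goroutine to perform one round trip and
// deliver the raw reply (or error) on done.
type request struct {
    payload []byte
    done    chan result // buffered so the owner never blocks on delivery
}

type result struct {
    reply []byte
    err   error
}

// connOwner is started once per connection and serialises all reads/writes;
// commands never touch the conn directly.
func connOwner(conn net.Conn, requests <-chan request) {
    buf := make([]byte, 4096)
    for req := range requests {
        _, err := conn.Write(req.payload)
        n := 0
        if err == nil {
            n, err = conn.Read(buf)
        }
        req.done <- result{reply: append([]byte(nil), buf[:n]...), err: err}
    }
}

// do submits one command and waits for the reply or for ctx cancellation.
// On cancellation it expires the deadline so the owner's pending Read returns,
// then waits for the owner's answer so nothing is leaked.
func do(ctx context.Context, conn net.Conn, requests chan<- request, payload []byte) ([]byte, error) {
    req := request{payload: payload, done: make(chan result, 1)}
    requests <- req
    select {
    case res := <-req.done:
        return res.reply, res.err
    case <-ctx.Done():
        _ = conn.SetDeadline(time.Now())
        <-req.done // owner reports the timeout error; discard it
        return nil, ctx.Err()
    }
}

The per-command cost then becomes a channel round trip instead of a goroutine spawn, at the price of one owner goroutine living for the lifetime of each connection.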

brettmorien commented 1 year ago

Hi folks (@monkey92t)!

I'm checking in to see if this is still on the radar. Not being able to cancel out of a blocking call is a deal breaker for gracefully shutting down our apps. We are currently operating on a July 2022 beta of this library until we can upgrade to a properly working release version.

monkey92t commented 1 year ago

> Hi folks (@monkey92t)!
>
> I'm checking in to see if this is still on the radar. Not being able to cancel out of a blocking call is a deal breaker for gracefully shutting down our apps. We are currently operating on a July 2022 beta of this library until we can upgrade to a properly working release version.

Thank you for your attention... I've done related tests, and it's a hard choice:

  1. With the chan+goroutine approach (such as @armsnyder's example), we pay a significant cost for every command executed.
  2. On <-ctx.Done() we have to close the network connection, because we can no longer trust its state, and that causes a chain reaction (#2046).

No matter how we do it, listening to the context causes a lot of side effects, and I haven't found a better solution. A similar approach is also used in the net package (*netFD).

I'm trying more solutions and benchmark tests, such as letting users choose whether to pay for listening to ctx, like #2243.
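
For illustration of that opt-in idea in plain Go (not the actual #2243 change; withConnInterrupt is a made-up helper, assuming imports context, net, time): callers that pass a non-cancellable context pay nothing, because Done() returns nil for them.

// withConnInterrupt runs fn and, only if ctx is cancellable, starts a watcher
// that expires the connection deadline when ctx is done.
func withConnInterrupt(ctx context.Context, conn net.Conn, fn func() error) error {
    if ctx.Done() == nil {
        // context.Background()/context.TODO(): cancellation is impossible, no extra cost.
        return fn()
    }
    stop := make(chan struct{})
    defer close(stop)
    go func() {
        select {
        case <-ctx.Done():
            _ = conn.SetDeadline(time.Now()) // unblock the pending read/write
        case <-stop:
            // fn finished first; nothing to do.
        }
    }()
    return fn()
}

This mirrors the trade-off above: only cancellable contexts pay for the extra goroutine.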

berndverst commented 1 year ago

> Thank you for your attention... I've done related tests, and it's a hard choice:
>
>   1. With the chan+goroutine approach (such as @armsnyder's example), we pay a significant cost for every command executed.
>   2. On <-ctx.Done() we have to close the network connection, because we can no longer trust its state, and that causes a chain reaction (Constantly Reestablishing Connections to AWS ElastiCache Redis in Cluster Mode (Continued) #2046).

With respect to (1), I am concerned about goroutines being leaked, or about GC overhead. This would be a deal breaker for our use in Dapr (Distributed Application Runtime - github.com/dapr/dapr). We are very performance conscious, as our project runs on a variety of targets, including embedded systems.

I can't speak in favor of (2), but my vote is against (1).

monkey92t commented 1 year ago

@berndverst What do you think of #2455?

berndverst commented 1 year ago

> @berndverst What do you think of #2455?

Let me loop in one of my co-maintainers. @italypaleale, thoughts on https://github.com/redis/go-redis/pull/2455 for addressing the issue discussed here?

brettmorien commented 1 year ago

Hi folks. Any movement on this issue?

wk8 commented 1 year ago

This is a blocker for us; could you please look at #2455? Thank you!

kkkbird commented 1 year ago

We really need this fix. Any update?