ReactiveX / RxGo

Reactive Extensions for the Go language.
MIT License

Sometimes First().Get() not returning a context canceled error #372

Closed: dayaftereh closed this issue 2 years ago

dayaftereh commented 2 years ago

I use Filter() with First() and Get() to get the first rxgo.Item that matches my filter function. But when the context passed via rxgo.WithContext is canceled, Get() sometimes does not return the context canceled error.

The following test performs 10 runs. Each run cancels the context after 100 ms, and the test counts how often Get() returns an error (context canceled). When the test is run multiple times, the ok and not ok counts change randomly.

package main

import (
    "context"
    "testing"
    "time"

    "github.com/reactivex/rxgo/v2"
)

var item rxgo.Item

func run(t *testing.T, run int) bool {
    // create a context and cancel it after 100 ms
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        time.Sleep(time.Millisecond * 100)
        cancel()
    }()

    // make an observable from a channel
    ch := make(chan rxgo.Item)
    observable := rxgo.FromChannel(ch, rxgo.WithContext(ctx))

    // use first and get to get the first matching item
    i, err := observable.
        Filter(func(i interface{}) bool {
            return false
        }, rxgo.WithContext(ctx)).
        First(rxgo.WithContext(ctx)).
        Get(rxgo.WithContext(ctx))

    // use to keep the item
    item = i

    // the error should be non-nil, because the context was canceled
    return err != nil
}

func TestContextCancelRxGO(t *testing.T) {
    runs := 10
    okRuns := 0
    for i := 0; i < runs; i++ {
        // try multiple times
        ok := run(t, i)
        // count the ok runs
        if ok {
            okRuns++
        }
    }

    t.Logf("ok runs: %d", okRuns)
    notOk := runs - okRuns
    t.Logf("not ok runs: %d", notOk)

    if notOk > 0 {
        t.Fail()
    }
}

The expected result is that every run returns a context canceled error and the notOk count is zero.

I think this is an issue with First() and Get() in combination with Filter().

Tested with: github.com/reactivex/rxgo/v2 v2.5.0
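
One possible explanation for the flaky counts, which nothing in this thread confirms, is the way Go's select statement behaves when several cases are ready at once: it picks one of them pseudo-randomly. The sketch below uses only the standard library. Its `get` function is a hypothetical stand-in for a consumer that waits on both the context and a stream, not RxGo's actual Get() implementation. It assumes that the stream is closed at about the same moment the context is canceled.

```go
package main

import (
	"context"
	"fmt"
)

// get is a hypothetical stand-in for a blocking consumer. It waits on
// either the context or the stream, whichever is ready first.
func get(ctx context.Context, stream <-chan int) (int, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case v, ok := <-stream:
		if !ok {
			// The stream was closed without a value: no error is reported.
			return 0, nil
		}
		return v, nil
	}
}

// countOutcomes calls get n times with the context already canceled and
// the stream already closed, so both select cases are ready every time.
func countOutcomes(n int) (withErr, withoutErr int) {
	for i := 0; i < n; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		stream := make(chan int)
		cancel()
		close(stream)
		if _, err := get(ctx, stream); err != nil {
			withErr++
		} else {
			withoutErr++
		}
	}
	return withErr, withoutErr
}

func main() {
	withErr, withoutErr := countOutcomes(1000)
	fmt.Printf("error returned: %d, no error: %d\n", withErr, withoutErr)
}
```

The two counts vary from run to run, which resembles the changing ok and not ok counts reported above. Whether RxGo v2.5.0 actually follows this pattern would need to be checked against its source.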

si3nloong commented 2 years ago

You shouldn't pass the context to every function, because by default the context is propagated to every child. Instead, you should do it like this:

func run(t *testing.T, run int) bool {
    // create a context and cancel it after 100 ms
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        time.Sleep(time.Millisecond * 100)
        cancel()
    }()

    // make an observable from a channel
    ch := make(chan rxgo.Item)
    observable := rxgo.FromChannel(ch, rxgo.WithContext(ctx))
    // use first and get to get the first matching item
    i, err := observable.
        Filter(func(i interface{}) bool {
            return false
        }).
        First().
        Get()

    // OR, as an alternative that replaces the block above, pass the context only to Get():
    observable := rxgo.FromChannel(ch)
    // use first and get to get the first matching item
    i, err := observable.
        Filter(func(i interface{}) bool {
            return false
        }).
        First().
        Get(rxgo.WithContext(ctx))

    // use to keep the item
    item = i

    // the error should be non-nil, because the context was canceled
    return err != nil
}

dayaftereh commented 2 years ago

@si3nloong I tested both versions of your code with github.com/reactivex/rxgo/v2 v2.5.0, but the issue still exists.