ReactiveX / RxGo

Reactive Extensions for the Go language.
MIT License

mapOperator drops res when the "map" function returns an error #406

Open feng7208485 opened 9 months ago

feng7208485 commented 9 months ago

Here is my code:

func main() {
    supplier := make([]rxgo.Producer, 0)
    supplier = append(supplier, GenCheckFieldTypeFuture(context.Background(), AAA{Foo: "111"}))
    observable := rxgo.Create(supplier).Map(func(_ context.Context, i interface{}) (interface{}, error) {
        if v, ok := i.(CheckFieldTypeParam); ok {
            println(fmt.Sprintf("v type is : %v", reflect.TypeOf(v)))
            return v, errors.New("asasfqwfq")
        }
        return i, nil
    }, rxgo.WithPool(1))

    for item := range observable.Observe() {
        println(fmt.Sprintf("item.V type is:%v", reflect.TypeOf(item.V)))
        println(fmt.Sprintf("item.E type is:%v", reflect.TypeOf(item.E)))
    }
}

The output is:

v type is : main.CheckFieldTypeParam
item.V type is:<nil>
item.E type is:*errors.errorString

Expected behavior: I expect item.V to be non-nil. The framework should not drop the result even if the map function returns an error.

feng7208485 commented 9 months ago

This bug could be fixed with the following code:

func (op *mapOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
    res, err := op.apply(ctx, item.V)
    if err != nil {
        Of(res).Error(err).SendContext(ctx, dst)
        operatorOptions.stop()
        return
    }
    Of(res).SendContext(ctx, dst)
}

Just add "Of(res)" at the start of line 4.