Open mophite opened 3 years ago
// Open the request stream with a payload built from req.Bytes() and
// context.Marshal(c).
f = cli.client.RequestStream(payload.New(req.Bytes(), context.Marshal(c)))

// cc is handed to Subscribe below; cancel is invoked from the DoFinally
// callback once the stream finishes.
cc, cancel := context.WithTimeout(context.TODO(), timeout)

f.
	SubscribeOn(scheduler.Parallel()).
	DoFinally(func(s rx.SignalType) {
		// TODO: handle rx.SignalType
		// The comment above must stay on its own line: when the chain is
		// collapsed onto a single line it swallows the cleanup calls below
		// and the rest of the Subscribe call.
		cancel()
		close(rsp)
		close(errs)
	}).
	Subscribe(
		cc,
		rx.OnNext(func(p payload.Payload) error {
			// Send the data of a cloned payload to rsp.
			rsp <- payload.Clone(p).Data()
			return nil
		}),
		rx.OnError(func(e error) {
			errs <- e
		}),
	)
Sorry for the late reply, I will check the timeout logic once I have time.
The current Flux Processor doesn't support the Context API yet; support will be added in the near future.
Flux Processor
// Open the request stream with a payload built from req.Bytes() and
// context.Marshal(c).
f = cli.client.RequestStream(payload.New(req.Bytes(), context.Marshal(c)))

// cc is handed to Subscribe below; cancel is invoked from the DoFinally
// callback once the stream finishes.
cc, cancel := context.WithTimeout(context.TODO(), timeout)

f.
	SubscribeOn(scheduler.Parallel()).
	DoFinally(func(s rx.SignalType) {
		// TODO: handle rx.SignalType
		// The comment above must stay on its own line: when the chain is
		// collapsed onto a single line it swallows the cleanup calls below
		// and the rest of the Subscribe call.
		cancel()
		close(rsp)
		close(errs)
	}).
	Subscribe(
		cc,
		rx.OnNext(func(p payload.Payload) error {
			// Send the data of a cloned payload to rsp.
			rsp <- payload.Clone(p).Data()
			return nil
		}),
		rx.OnError(func(e error) {
			errs <- e
		}),
	)