spring-cloud / spring-cloud-netflix

Integration with Netflix OSS components
http://cloud.spring.io/spring-cloud-netflix/
Apache License 2.0
4.86k stars 2.44k forks source link

HystrixCommands.toMono() not working properly #2708

Closed Delorien84 closed 5 years ago

Delorien84 commented 6 years ago

Hi, I have discovered strange behavior when using Flux stream with Hystrix (using Observable command via org.springframework.cloud.netflix.hystrix.HystrixCommands.

I have following usecase (in real web service is call is used via WebClient, but this works too):

Flux
                .just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofMillis(500))
                .flatMap(i -> HystrixCommands
                        .from(Mono.just(i)
                                .map(x -> x * 10)
                                .delayElement(Duration.ofMillis(200)))
                        .commandName("command")
                        .toMono()) // THIS IS IMPORTANT
                .delayElements(Duration.ofMilis(200)) // simulate other WS call
                .subscribe(System.out::println);

Problem with this code, is that Hystrix does not count successful events. This cause incorrect metric, and cause to circuit never to closed.

I have look deeper and found possible root cause.

In Mono.from(Publisher) called in HystrixCommands.toMono(), it creates reactor.core.publisher.MonoFromPublisher, that subscribe reactor.core.publisher.MonoNext.NextSubscriber. There is invocation of method onComplete() inside method onNext(T). This cause the subscription complete and unsubscribe this mono. Because of that parent RxJava Observable (inside hystrix) is unsubscribed and then onComplete() call is never fired (and then calculation of successfully event).

I have found workaround to use following call: .toFlux().singleOrEmpty() instead of call .toMono(). Following flux -> mono conversion does not call onComplete() on onNext() but rather wait for parent onComplete() propagation.

I do not known, if this should be fixed in this project. But at least the workaround may be implemented here.

Note: Additional example, that may be more easily debuged:

Flux
                .just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofMillis(500))
                .flatMap(i -> {
                    HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("group");
                    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("command");
                    Setter setterToUse = Setter.withGroupKey(groupKey).andCommandKey(commandKey);
                    Observable<Integer> res = new HystrixObservableCommand<Integer>(setterToUse) {

                        @Override
                        protected Observable<Integer> construct() {
                            return Observable.just(i * 10);
                        }

                    }.toObservable();
                    return Mono.from(RxReactiveStreams.toPublisher(res));
                })
                .subscribe(System.out::println);

Then see rx.internal.producers.SingleProducer.request(long) on line 71, c.isUnsubscribed() return true, then line 75 c.onCompleted() is never reach. When using Flux.from(RxReactiveStreams.toPublisher(res)).singleOrEmpty(), the line 75 is reach and c.onCompleted() is called. Causing hystrix hooks called and properly calculated successful event.

Spring version: 2.0.0.RC1 Spring Cloud version: Finchley.M5 Spring Cloud Hystrix version: 2.0.0.M5 Java version:

java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
spencergibb commented 6 years ago

@Delorien84 can you explain the specific problem, what is the expected outcome vs what is actually happening?

Delorien84 commented 6 years ago

Sorry for that.

It cause that Hystrix onSuccess hook is not called, that cause several problems.

Expectation is, that Hystrix onSuccess hook is called in all case , without using workaround described above ( .toFlux().singleOrEmpty())

Additional notes: When Hystrix command return empty Observable (empty Mono), it works without workaround.

Delorien84 commented 6 years ago

See https://github.com/Delorien84/hystrix-observable-flux-bug for test case.

spencergibb commented 6 years ago

Hmm, why are you trying to convert a Flux to a Mono?

Delorien84 commented 6 years ago

@spencergibb Because I call web service via WebClient that return Mono as result. As i understand with talk in 1069. What I am trying to do, is the following chain WebClient(Mono) -> Hystrix(Observable) -> Return(Mono) (Hystrix does not use Single, only Observable and there is a API, that allows me to do). The problem is conversion from Observable to Mono via Mono.from(RxReactiveStreams.toPublisher(observable)). When first item arrived to Mono it cause cancellation of Observable (see explanation in referenced bug). Canceled Observable does not trigger onSuccess handlers and Hystrix does not correctly count success event.

So for me it is confusing the org.springframework.cloud.netflix.hystrix.HystrixCommands API, that has toMono() method, but it does not work as user expect. WebClient return Mono, so I naturally call HystrixCommands.toMono(). I would recommend one of the following:

spencergibb commented 6 years ago

@smaldini any ideas?

spencergibb commented 5 years ago

This module has entered maintenance mode. This means that the Spring Cloud team will no longer be adding new features to the module. We will fix blocker bugs and security issues, and we will also consider and review small pull requests from the community.