spring-projects / spring-statemachine

Spring Statemachine is a framework for application developers to use state machine concepts with Spring.

Scheduler Stack Overflow Error #1056

Open atenneypevco opened 2 years ago

atenneypevco commented 2 years ago
<spring-statemachine-core.version>3.0.1</spring-statemachine-core.version>
<spring.version>5.0.6.RELEASE</spring.version>

We implemented the Spring StateMachine originally with 2.5.0. Everything seemed to run well except the number of threads would increase over time (and remain in a "parked" state), as many machines would be created and completed with our service-style application running continuously. We upgraded to Spring StateMachine 3.0.1, changing over to the reactor-core implementation. The significant change to our implementation was:

old:

machine.sendEvent(msg);

new:

machine.sendEvent(Mono.just(msg))
    .doOnComplete(() -> {
        log.debug(uuid + " Entering State " + machine.getState().getId());
    })
    .subscribe();

This solved the thread count increase issue. However, we frequently get the following error, and I cannot pinpoint any specific trigger in our code:

13:47:18,201 [parallel-3] ERROR XactMachineFactory - Exception happened:
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
    at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
    at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
    at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56)
    at org.springframework.statemachine.support.ReactiveStateMachineExecutor$1.lambda$null$0(ReactiveStateMachineExecutor.java:461)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:215)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:268)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    ... 18 more

We catch and ignore via:

Hooks.onErrorDropped(error -> {
    // log.error("Exception happened:", error); // Fills up log too fast
});

The application then seems to run well for a long time. However, after several weeks of running, the following error appears in the log4j file almost continuously:

09:57:28,695 [parallel-4] ERROR Schedulers - Scheduler worker in group main failed with an uncaught exception
java.lang.StackOverflowError
    at java.util.function.Consumer.lambda$andThen$0(Consumer.java:65)
    at java.util.function.Consumer.lambda$andThen$0(Consumer.java:65)
    ... (1025 lines total exactly as above)

The application is still running and seems to be processing correctly, except the log file is filling up with this error. I have not been able to get this to happen in debug; this only occurs after weeks of running. Any idea what could be causing this? What kinds of things can we check to figure this out?
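
One thing that may be worth checking, given the repeated Consumer.lambda$andThen$0 frames in the StackOverflowError: Reactor documents Hooks.onErrorDropped as cumulative, so every call chains another consumer after the ones already registered. If the hook is installed more than once, for example each time a machine is created (an assumption; the post does not show where the call lives), the chain grows until invoking it exhausts the stack. The sketch below reproduces that pattern with plain java.util.function.Consumer and no Reactor dependency. HookChainDemo and overflowsAfter are hypothetical names, and the 256 KB thread stack is only there so the overflow is reached quickly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical demo class: mimics a cumulative hook built with Consumer.andThen.
final class HookChainDemo {

    /**
     * Chains the same no-op consumer `registrations` times (must be >= 1),
     * invokes the chain once on a thread with a small stack, and reports
     * whether that single invocation ended in a StackOverflowError.
     */
    static boolean overflowsAfter(int registrations) {
        Consumer<Throwable> ignore = error -> { };
        Consumer<Throwable> chain = null;
        for (int i = 0; i < registrations; i++) {
            // Each "registration" wraps the previous chain one level deeper.
            chain = (chain == null) ? ignore : chain.andThen(ignore);
        }
        final Consumer<Throwable> hook = chain;
        AtomicBoolean overflowed = new AtomicBoolean(false);

        // The stack size argument is a hint to the JVM; 256 KB is an arbitrary choice.
        Thread runner = new Thread(null, () -> {
            try {
                hook.accept(new IllegalStateException("dropped"));
            } catch (StackOverflowError e) {
                overflowed.set(true);
            }
        }, "hook-chain-demo", 256 * 1024);

        runner.start();
        try {
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return overflowed.get();
    }

    public static void main(String[] args) {
        System.out.println("10 registrations overflow: " + overflowsAfter(10));
        System.out.println("100000 registrations overflow: " + overflowsAfter(100_000));
    }
}
```

If this matches the application, registering the hook once at startup (or calling Hooks.resetOnErrorDropped() before registering again) should keep the chain at a fixed length.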

buehlerjochen commented 1 year ago

I would also be interested in a solution for this issue. We get the same error in the logs (Retries exhausted: 10/10) after switching from version 2.x to 3.2.0.

The message "Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially." suggests that access to the statemachine needs to be serialized somehow. I tried subscribeOn with Schedulers.single() for all sendEvent calls, but that did not change anything.

Removing all timer and timerOnce calls in the statemachine transitions seemed to prevent the problem, but after adding new events and actions it is back again.
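
For reference, a minimal sketch of what serializing caller-side access could look like: every event goes through one single-threaded executor, so deliveries never overlap even when many threads call send. SerializedEventGateway and its delivery callback are hypothetical; the callback stands in for whatever call hands the event to the machine. This only orders events submitted by application code. It would not cover triggers the machine fires on its own scheduler, such as timer or timerOnce, which may be why serializing sendEvent alone did not help.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: funnels events from any thread through one worker thread.
final class SerializedEventGateway {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<String> delivery; // stand-in for the call into the machine

    SerializedEventGateway(Consumer<String> delivery) {
        this.delivery = delivery;
    }

    /** Safe to call from any thread; deliveries run one at a time on the worker. */
    void send(String event) {
        worker.execute(() -> delivery.accept(event));
    }

    /** Stops accepting events and waits for the queued deliveries to finish. */
    void shutdownAndWait() {
        worker.shutdown();
        awaitQuietly(worker);
    }

    private static void awaitQuietly(ExecutorService executor) {
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Sends events from several threads into a non-thread-safe list and returns its size. */
    static int demo(int callers, int eventsPerCaller) {
        List<String> received = new ArrayList<>(); // deliberately not synchronized
        SerializedEventGateway gateway = new SerializedEventGateway(received::add);
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        for (int c = 0; c < callers; c++) {
            final int caller = c;
            pool.execute(() -> {
                for (int i = 0; i < eventsPerCaller; i++) {
                    gateway.send("caller-" + caller + "-event-" + i);
                }
            });
        }
        pool.shutdown();
        awaitQuietly(pool);          // every send() has been submitted
        gateway.shutdownAndWait();   // every delivery has run
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 500));
    }
}
```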

A solution or a hint would be highly welcome. How are the events supposed to be given to the statemachine? Any hint in the documentation would be really appreciated.

vlp commented 1 year ago

I can get the same error buehlerjochen describes reliably on 3.2.0 with the following state machine (that works on 2.5.0):

package poc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;

@Configuration
@EnableStateMachine
public class XXX extends StateMachineConfigurerAdapter<String, String> {

    @Override
    public void configure(StateMachineConfigurationConfigurer<String, String> config) throws Exception {
        config.withConfiguration()
            .autoStartup(true)
            ;
    }

    @Override
    public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
        states.withStates()
            .initial("S1")
            .state("S1", new Action<String, String>() {
                @Override
                public void execute(StateContext<String, String> context) {
                    CompletableFuture.delayedExecutor(0, TimeUnit.MILLISECONDS).execute( () -> {
                        context.getStateMachine().sendEvent("AAA");
                    });
                }
            }, null)
            .state("S2")
            .end("S3");
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
        transitions
            .withExternal()
                .source("S1").target("S2")
                .event("AAA")
                .and()
            .withExternal()
                .source("S2").target("S1")
                .timerOnce(0)
                ;
    }

}

joemarner commented 1 year ago

I also get the same issue with timerOnce() :-( (there was no problem with 2.5). Any hints on how to solve this?

states87 commented 6 months ago

Hello, I have the same issue. How can it be solved? Thanks

Robin-dotcom-ca commented 1 month ago

Hello, I have the same issue after upgrading from 2.x to 4.0.0. I can confirm the issue first appeared in 3.x, when the reactive implementation was introduced.

The issue happens only when the statemachine is configured with multiple timer or timerOnce transitions.