spring-projects / spring-statemachine

Spring Statemachine is a framework for application developers to use state machine concepts with Spring.
1.52k stars 598 forks source link

What is the reactive way to handle result of the SendEvent? #1088

Open sergproua opened 1 year ago

sergproua commented 1 year ago

I created a simple test machine with trigger-less transitions. The tests that use StateMachineTestPlan (shouldGetToTheFinishAndWinStepTest and shouldGetToTheFinishAndLooseStepTest) and the non-reactive tests that call sendEvent(...).subscribe() (shouldGetToTheFinishAndWinNonReactiveTest and shouldGetToTheFinishAndLooseNonReactiveTest) work as expected. However, taking the Flux returned by sendEvent and flat-mapping its results, as in shouldGetToTheFinishAndWinReactiveTest and shouldGetToTheFinishAndLooseReactiveTest, does not work. Is it a bug or a feature? What am I missing? Thanks in advance!

package ssm;

import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineEventResult;
import org.springframework.statemachine.config.StateMachineBuilder;
import org.springframework.statemachine.test.StateMachineTestPlan;
import org.springframework.statemachine.test.StateMachineTestPlanBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

/**
 * Tests a small "race" state machine driven three ways: through a
 * {@link StateMachineTestPlan}, through a fully reactive pipeline, and through
 * a fire-and-forget {@code subscribe()} followed by a fixed wait.
 *
 * <p>Only {@code START -> POINT1} is triggered by an event ({@link Events#GO});
 * the remaining transitions are configured without an event. The last one,
 * {@code FINISH -> PODIUM}, is protected by a guard that returns the
 * {@code shouldWin} flag, so every test expects {@code PODIUM} when
 * {@code shouldWin} is {@code true} and {@code FINISH} when it is {@code false}.
 */
@Log4j2
public class StateMachineTest {
    /** Checkpoints of the race, in the order the configured transitions visit them. */
    enum States {START, POINT1, POINT2, FINISH, PODIUM}

    /** The single event used by the machine; it triggers the first transition. */
    enum Events {GO}

    /** Delay applied by each transition action in every test. */
    private static final Duration RUNNER_SPEED = Duration.ofMillis(100);

    /** Upper bound for blocking on a reactive pipeline, so a stuck machine fails the test instead of hanging it. */
    private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);

    /**
     * Builds (but does not start) the race state machine.
     *
     * @param runnerSpeed delay performed by the action of each of the first three transitions
     * @param shouldWin   value returned by the guard on {@code FINISH -> PODIUM}
     * @return a new machine with auto-startup disabled
     * @throws Exception if the builder fails to configure or build the machine
     */
    StateMachine<States, Events> createStateMachine(Duration runnerSpeed, boolean shouldWin) throws Exception {
        StateMachineBuilder.Builder<States, Events> builder = StateMachineBuilder.builder();
        builder.configureStates()
                .withStates()
                .initial(States.START)
                .states(EnumSet.allOf(States.class));

        builder.configureTransitions()
                // Only this first transition is bound to an event.
                .withExternal().source(States.START).target(States.POINT1).event(Events.GO)
                .actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Point1 - check")).then())
                .and()
                .withExternal().source(States.POINT1).target(States.POINT2)
                .actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Point2 - check")).then())
                .and()
                .withExternal().source(States.POINT2).target(States.FINISH)
                .actionFunction(ctx -> Mono.delay(runnerSpeed).doOnNext(aLong -> log.info("Finish!")).then())
                .and()
                .withExternal().source(States.FINISH).target(States.PODIUM)
                // Blocking guard: logs the outcome and lets the transition through only when shouldWin is true.
                // NOTE(review): the author was looking for a reactive guard variant
                // (.guardFunction(...) / ReactiveGuard) on this fluent API and did not find one.
                .guard(stateContext -> {
                    log.info(shouldWin ? "Winner!" : "Too Slow!");
                    return shouldWin;
                });

        builder.configureConfiguration()
                .withConfiguration()
                .autoStartup(false)
                .machineId(this.getClass().getSimpleName());

        return builder.build();
    }

    /**
     * Starts a fresh machine, sends {@link Events#GO} and blocks until every
     * event result reports completion.
     *
     * @param shouldWin guard outcome passed to {@link #createStateMachine(Duration, boolean)}
     * @return the machine after the pipeline has terminated
     * @throws Exception if the machine cannot be built
     */
    private StateMachine<States, Events> runReactively(boolean shouldWin) throws Exception {
        AtomicReference<StateMachine<States, Events>> machine = new AtomicReference<>();

        Mono.just(createStateMachine(RUNNER_SPEED, shouldWin))
                .flatMap(ssm -> ssm.startReactively().thenReturn(ssm))
                .doOnNext(machine::set)
                .flatMapMany(ssm -> ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build())))
                // NOTE(review): relies on complete() also covering the event-less follow-up
                // transitions - confirm against the Spring Statemachine reference.
                .flatMap(StateMachineEventResult::complete)
                .blockLast(BLOCK_TIMEOUT);

        return machine.get();
    }

    /** Test plan: with a passing guard the machine changes state four times and ends on {@code PODIUM}. */
    @Test
    public void shouldGetToTheFinishAndWinStepTest() throws Exception {
        var ssm = createStateMachine(RUNNER_SPEED, true);

        StateMachineTestPlan<States, Events> plan =
                StateMachineTestPlanBuilder.<States, Events>builder()
                        .defaultAwaitTime(1)
                        .stateMachine(ssm, this.getClass().getSimpleName())
                        .step()
                        .expectStateMachineStarted(1)
                        .expectState(States.START)
                        .and()
                        .step()
                        .sendEvent(Events.GO)
                        .expectStateChanged(4)
                        .expectStates(States.PODIUM)
                        .and()
                        .build();

        plan.test();
    }

    /** Test plan: with a failing guard the machine changes state three times and stops on {@code FINISH}. */
    @Test
    public void shouldGetToTheFinishAndLooseStepTest() throws Exception {
        var ssm = createStateMachine(RUNNER_SPEED, false);

        StateMachineTestPlan<States, Events> plan =
                StateMachineTestPlanBuilder.<States, Events>builder()
                        .defaultAwaitTime(1)
                        .stateMachine(ssm, this.getClass().getSimpleName())
                        .step()
                        .expectStateMachineStarted(1)
                        .expectState(States.START)
                        .and()
                        .step()
                        .sendEvent(Events.GO)
                        .expectStateChanged(3)
                        .expectStates(States.FINISH)
                        .and()
                        .build();

        plan.test();
    }

    /** Reactive pipeline: a passing guard must end on {@code PODIUM}. */
    @Test
    public void shouldGetToTheFinishAndWinReactiveTest() throws Exception {
        // The guard must pass (shouldWin = true) for PODIUM to be reachable.
        var ssm = runReactively(true);

        assertThat(ssm.getState().getId(), is(States.PODIUM));
    }

    /** Reactive pipeline: a failing guard must leave the machine on {@code FINISH}. */
    @Test
    public void shouldGetToTheFinishAndLooseReactiveTest() throws Exception {
        // The guard must fail (shouldWin = false) for the machine to stay on FINISH.
        var ssm = runReactively(false);

        assertThat(ssm.getState().getId(), is(States.FINISH));
    }

    /** Fire-and-forget subscribe: a passing guard must end on {@code PODIUM}. */
    @Test
    public void shouldGetToTheFinishAndWinNonReactiveTest() throws Exception {
        var ssm = createStateMachine(RUNNER_SPEED, true);

        ssm.startReactively().block();

        ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build()))
                .subscribe(r -> log.info("event {}, {}", r.getResultType(), ssm.getState()));

        // Nothing is awaited here, so give the three 100 ms actions time to run.
        Thread.sleep(1000);

        assertThat(ssm.getState().getId(), is(States.PODIUM));
    }

    /** Fire-and-forget subscribe on another scheduler: a failing guard must leave the machine on {@code FINISH}. */
    @Test
    public void shouldGetToTheFinishAndLooseNonReactiveTest() throws Exception {
        var ssm = createStateMachine(RUNNER_SPEED, false);

        ssm.startReactively().block();

        ssm.sendEvent(Mono.just(MessageBuilder.withPayload(Events.GO).build()))
                .publishOn(Schedulers.boundedElastic())
                .subscribe(r -> log.info("event {}, {}", r.getResultType(), ssm.getState()));

        // Nothing is awaited here, so give the three 100 ms actions time to run.
        Thread.sleep(1000);

        assertThat(ssm.getState().getId(), is(States.FINISH));
    }

}
jayChrono commented 5 months ago

I struggled with this a bit but found a workaround. Simply cast the StateMachineTransitionConfigurer (in your case, the result of builder.configureTransitions()) to StateMachineTransitionBuilder to access additional configuration methods. Then use the StateMachineTransitionBuilder.addTransition method to include your ReactiveGuard.