spring-projects / spring-statemachine

Spring Statemachine is a framework for application developers to use state machine concepts with Spring.
1.54k stars 604 forks source link

State machine state not getting changed quickly enough #951

Open hmnshgpt455 opened 3 years ago

hmnshgpt455 commented 3 years ago

Please consider below sequence of events (This all is happening inside a microservice M1):

  1. I have sent an event E from a bean which transitions state from state S to T. For E there is an action A configured. Let's say I send the event E with a message containing an order id in the header.

  2. I am sending a JMS message in A. That message is being listened by another microservice M2, which will return back another message. That message is being listened by a JMS listener in M1 and it will call a method inside of a bean.

  3. After action is executed, there is an interceptor (let's name it I) where preStateChange method is overriden. Inside interceptor, I am updating and persisting the order entity (using JPA) in DB with the new target state T.

What's happening is that, A is called before the preStateChange method inside I. A sends the JMS message to microservice M2, listener inside M1 gets the response for that message from M2, calls the bean method. In the bean method I am sending another event which transitions the state machine and order both from T to U. The problem here is that M2 is replying back quickly enough such that the control reaches the bean method before the state is actually transitioned to T. So, when I send second event to transition T to U, state machine does not accept it because the state is still S and the state machine terminates execution as there is no further handling for this.

All the above is happening from inside a task scheduled method which calls step 1 to 3 every 10 seconds. Note : For every task, the state machine is fetched with new UUID from the state factory. So, everytime there a brand new reference at the start of every step 1. Another thing is, this problem is randomly for some orders (tasks). Some are getting updated before M2 replies back and for them the execution is successful while for some state machine state is not getting updated quickly enough.

Please help how to tackle this situation. Please let me know if anything is not clear with the issue, I can provide the github link for the repo as well, if required.

` @Service @RequiredArgsConstructor @Slf4j public class BeerOrderManagerImpl implements BeerOrderManager {

public static final String BEER_ORDER_ID_HEADER = "BEER_ORDER_ID";

private final StateMachineFactory<BeerOrderStatusEnum, BeerOrderEventEnum> stateMachineFactory;
private final BeerOrderRepository beerOrderRepository;
private final BeerOrderStateChangeInterceptor beerOrderStateChangeInterceptor;

@Override
//THis method is called by the scheduler every 10 seconds
public BeerOrder newBeerOrder(BeerOrder beerOrder) {
    beerOrder.setId(null);
    beerOrder.setOrderStatus(BeerOrderStatusEnum.NEW);
    BeerOrder savedBeerOrder = beerOrderRepository.saveAndFlush(beerOrder);
    sendEvent(savedBeerOrder, BeerOrderEventEnum.VALIDATE_ORDER);
    return savedBeerOrder;
}

@Override
//This is the method that will be called once M2 replies back to the message being sent in A. The problem is happening in this method.
public void handleBeerOrderValidationResult(Boolean isValidBeerOrder, UUID beerOrderId) {

    /**
     * If i don't call this method, then for some orders the state remains as new and the below sendEvent is not accepted.
     * This was resolved with this WA method. Wanted to know, if there is anything in SM, which can be used to handle this situation
     */
    //awaitForStatus(beerOrderId, BeerOrderStatusEnum.PENDING_VALIDATION);

    Optional<BeerOrder> beerOrderOptional = beerOrderRepository.findById(beerOrderId);

    beerOrderOptional.ifPresentOrElse(beerOrder -> {
        if (isValidBeerOrder) {
            log.debug("Order status right now : " + beerOrder.getOrderStatus());
            //This event is not getting accepted, because it's source state is PENDING_VALIDATION state
            Boolean isEventAccepted = sendEvent(beerOrder, BeerOrderEventEnum.VALIDATION_PASSED);
            if (!isEventAccepted) log.debug("Event from pending validation to validation passed not accepted for order with id " + beerOrderId);

            //awaitForStatus(beerOrderId, BeerOrderStatusEnum.VALIDATED);
            BeerOrder validatedOrder = beerOrderRepository.findById(beerOrderId).get();
            isEventAccepted = sendEvent(validatedOrder, BeerOrderEventEnum.ALLOCATE_INVENTORY_TO_ORDER);
            if (!isEventAccepted) log.debug("Event from validation passed to pending inventory not accepted for order with id " + beerOrderId);
        } else {
            sendEvent(beerOrder, BeerOrderEventEnum.VALIDATION_FAILED);
        }
    }, () -> log.error("Order not found with id : " + beerOrderId));

}

private Boolean sendEvent(BeerOrder beerOrder, BeerOrderEventEnum eventEnum) {
    StateMachine<BeerOrderStatusEnum, BeerOrderEventEnum> stateMachine = buildSM(beerOrder);

    Message message = MessageBuilder
                        .withPayload(eventEnum)
                        .setHeader(BEER_ORDER_ID_HEADER, beerOrder.getId().toString())
                        .build();

    return stateMachine.sendEvent(message);

}

private StateMachine<BeerOrderStatusEnum, BeerOrderEventEnum> buildSM(BeerOrder beerOrder) {
    StateMachine<BeerOrderStatusEnum, BeerOrderEventEnum> stateMachine =
            stateMachineFactory.getStateMachine(beerOrder.getId());

    stateMachine.stop();

    stateMachine.getStateMachineAccessor()
            .doWithAllRegions(sma -> {
                sma.addStateMachineInterceptor(beerOrderStateChangeInterceptor);
                sma.resetStateMachine(new DefaultStateMachineContext<>(
                                beerOrder.getOrderStatus(), null, null, null));
            });
    stateMachine.start();

    return stateMachine;
}

private void awaitForStatus(UUID beerOrderId, BeerOrderStatusEnum statusEnum) {

    AtomicBoolean found = new AtomicBoolean(false);
    AtomicInteger loopCount = new AtomicInteger(0);

    while (!found.get()) {
        if (loopCount.incrementAndGet() > 40) {
            found.set(true);
            log.debug("Loop Retries exceeded");
        }

        beerOrderRepository.findById(beerOrderId).ifPresentOrElse(beerOrder -> {
            if (beerOrder.getOrderStatus().equals(statusEnum)) {
                found.set(true);
                log.debug("Order Found");
            } else {
                //log.debug("Order Status Not Equal. Expected: " + statusEnum.name() + " Found: " + beerOrder.getOrderStatus().name());
            }
        }, () -> {
            log.debug("Order Id Not Found");
        });

        if (!found.get()) {
            try {
                //log.debug("Sleeping for retry");
                Thread.sleep(100);
            } catch (Exception e) {
                // do nothing
            }
        }
    }
}

} @Slf4j @Component @RequiredArgsConstructor public class BeerOrderStateChangeInterceptor extends StateMachineInterceptorAdapter<BeerOrderStatusEnum, BeerOrderEventEnum> {

private final BeerOrderRepository beerOrderRepository;
private final StateMachinesHelper stateMachinesHelper;

@Override
public void preStateChange(State<BeerOrderStatusEnum, BeerOrderEventEnum> state, Message<BeerOrderEventEnum> message,
                           Transition<BeerOrderStatusEnum, BeerOrderEventEnum> transition, StateMachine<BeerOrderStatusEnum, BeerOrderEventEnum> stateMachine,
                           StateMachine<BeerOrderStatusEnum, BeerOrderEventEnum> rootStateMachine) {

    Optional<BeerOrder> beerOrderOptional = stateMachinesHelper.extractBeerOrderFromMessage(message);
    beerOrderOptional.ifPresentOrElse(beerOrder -> {
        log.debug("Saving state for order with ID : " + beerOrder.getId() + " new status : " + state.getId());
        beerOrder.setOrderStatus(state.getId());
        beerOrderRepository.saveAndFlush(beerOrder);
    }, () -> log.debug(" Inside interceptor order id not found with id " + (String) message.getHeaders().get(BeerOrderManagerImpl.BEER_ORDER_ID_HEADER)));

}

} ` Above is the interceptor I.

`@Component @RequiredArgsConstructor @Slf4j public class ValidateOrderAction implements Action<BeerOrderStatusEnum, BeerOrderEventEnum> {

private final JmsTemplate jmsTemplate;
private final BeerOrderMapper beerOrderMapper;
private final StateMachinesHelper stateMachinesHelper;
private final BeerOrderRepository beerOrderRepository;

@Override
public void execute(StateContext<BeerOrderStatusEnum, BeerOrderEventEnum> context) {

    Optional<BeerOrder> beerOrderOptional = stateMachinesHelper.extractBeerOrderFromMessage(context.getMessage());
    beerOrderOptional.ifPresentOrElse(beerOrder -> {
        BeerOrderDto beerOrderDto = beerOrderMapper.beerOrderToDto(beerOrder);
        jmsTemplate.convertAndSend(JmsConfig.VALIDATE_ORDER_QUEUE, new ValidateBeerOrderRequest(beerOrderDto));
        log.debug("Sent validation request to queue for order id : " + beerOrderDto.getId());
    },() -> log.debug(" Inside ValidateOrderAction order id not found with id " + (String) context.getMessage().getHeaders().get(BeerOrderManagerImpl.BEER_ORDER_ID_HEADER)));

}

}`

Above is the action A.

Below is the SM config :

`@Configuration @EnableStateMachineFactory @AllArgsConstructor public class BeerOrderStateMachineConfig extends StateMachineConfigurerAdapter<BeerOrderStatusEnum, BeerOrderEventEnum> {

private final Action<BeerOrderStatusEnum, BeerOrderEventEnum> validateOrderAction;
private final Action<BeerOrderStatusEnum, BeerOrderEventEnum> allocateOrderAction;

@Override
public void configure(StateMachineStateConfigurer<BeerOrderStatusEnum, BeerOrderEventEnum> states) throws Exception {
    states.withStates()
            .initial(BeerOrderStatusEnum.NEW)
            .states(EnumSet.allOf(BeerOrderStatusEnum.class))
            .end(BeerOrderStatusEnum.DELIVERED)
            .end(BeerOrderStatusEnum.VALIDATION_FAILED)
            .end(BeerOrderStatusEnum.CANCELLED)
            .end(BeerOrderStatusEnum.PICKED_UP)
            .end(BeerOrderStatusEnum.DELIVERY_FAILED);
}

@Override
public void configure(StateMachineTransitionConfigurer<BeerOrderStatusEnum, BeerOrderEventEnum> transitions) throws Exception {
    transitions.withExternal()
            .source(BeerOrderStatusEnum.NEW).target(BeerOrderStatusEnum.PENDING_VALIDATION)
            .action(validateOrderAction)
            .event(BeerOrderEventEnum.VALIDATE_ORDER)
        .and().withExternal()
            .source(BeerOrderStatusEnum.PENDING_VALIDATION).target(BeerOrderStatusEnum.VALIDATED)
            .event(BeerOrderEventEnum.VALIDATION_PASSED)
        .and().withExternal()
            .source(BeerOrderStatusEnum.PENDING_VALIDATION).target(BeerOrderStatusEnum.VALIDATION_FAILED)
            .event(BeerOrderEventEnum.VALIDATION_FAILED)
        .and().withExternal()
            .source(BeerOrderStatusEnum.VALIDATED).target(BeerOrderStatusEnum.PENDING_INVENTORY_ALLOCATION)
            .event(BeerOrderEventEnum.ALLOCATE_INVENTORY_TO_ORDER)
            .action(allocateOrderAction)
        .and().withExternal()
            .source(BeerOrderStatusEnum.PENDING_INVENTORY_ALLOCATION).target(BeerOrderStatusEnum.INVENTORY_ALLOCATION_FAILED_EXCEPTION)
            .event(BeerOrderEventEnum.INVENTORY_ALLOCATION_FAILURE_EXCEPTION)
        .and().withExternal()
            .source(BeerOrderStatusEnum.PENDING_INVENTORY_ALLOCATION).target(BeerOrderStatusEnum.INVENTORY_ALLOCATED)
            .event(BeerOrderEventEnum.INVENTORY_ALLOCATION_SUCCESS)
        .and().withExternal()
            .source(BeerOrderStatusEnum.PENDING_INVENTORY_ALLOCATION).target(BeerOrderStatusEnum.INVENTORY_ALLOCATION_FAILED_PENDING_INVENTORY)
            .event(BeerOrderEventEnum.INVENTORY_ALLOCATION_FAILURE_NO_INVENTORY);
}

}`