spring-projects / spring-statemachine

Spring Statemachine is a framework for application developers to use state machine concepts with Spring.

Wrong nested initial state on composite state repeatable entrance #958

Open azhuchkov opened 3 years ago

azhuchkov commented 3 years ago

I have a bunch of long-running jobs that are managed using state machines.

Here is the diagram describing the process:

[image: state diagram of the job process]

I use StateMachineRuntimePersister to persist the job state into an RDBMS; in particular, I use its preStateChange method to prevent the state transition in case of failure.

The first weird thing I encountered is that after the transition from TASK_FAILED (triggerless or by timer), the states become composite, e.g. getIds()=[PREPARATION, URL_RESOLVING] (on the first pass I see getIds()=[PREPARATION]).

What is worse, after this transition I get getIds()=[EXECUTION, TASK_FAILED], as if the internal state of the EXECUTION superstate had not been reset to its initial state. Nevertheless, the next state change is to TASK_PROPOSED (which is impossible to reach directly from TASK_FAILED), so it looks like the FSM still works despite all the weird things the interceptor sees.

Version used: 3.0.0

Parameters passed to interceptor's preStateChange() on TASK_ERROR message and further:

state = ObjectState [getIds()=[TASK_FAILED], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=1354772651, toString()=AbstractState [id=TASK_FAILED, pseudoState=null, deferred=[], entryActions=[], exitActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@2c757071], stateActions=[], regions=[], submachine=null]], message = GenericMessage [payload=TASK_ERROR, headers={id=541eec7a-2cb9-8c4d-2dc4-0ad34ee5c522, timestamp=1620894700561}], transition = AbstractTransition [source=ObjectState [getIds()=[WORKING], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=1064701466, toString()=AbstractState [id=WORKING, pseudoState=null, deferred=[], entryActions=[], exitActions=[], stateActions=[], regions=[], submachine=null]], target=ObjectState [getIds()=[TASK_FAILED], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=1354772651, toString()=AbstractState [id=TASK_FAILED, pseudoState=null, deferred=[], entryActions=[], exitActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@2c757071], stateActions=[], regions=[], submachine=null]], kind=EXTERNAL, guard=null], stateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,WORKER_ACQUIRED,WORKING / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16, rootStateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,WORKER_ACQUIRED,WORKING / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16
state = StateMachineState [getIds()=[PREPARATION, URL_RESOLVING], toString()=AbstractState [id=PREPARATION, pseudoState=null, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@4097d932], exitActions=[], stateActions=[], regions=[], submachine=WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  /  / uuid=f3cc4010-5292-4824-a10c-7a3d05f2aca9 / id=16], getClass()=class org.springframework.statemachine.state.StateMachineState], message = GenericMessage [payload=TASK_ERROR, headers={id=541eec7a-2cb9-8c4d-2dc4-0ad34ee5c522, timestamp=1620894700561}], transition = AbstractTransition [source=ObjectState [getIds()=[TASK_FAILED], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=1354772651, toString()=AbstractState [id=TASK_FAILED, pseudoState=null, deferred=[], entryActions=[], exitActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@2c757071], stateActions=[], regions=[], submachine=null]], target=StateMachineState [getIds()=[PREPARATION, URL_RESOLVING], toString()=AbstractState [id=PREPARATION, pseudoState=null, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@4097d932], exitActions=[], stateActions=[], regions=[], submachine=WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  /  / uuid=f3cc4010-5292-4824-a10c-7a3d05f2aca9 / id=16], getClass()=class org.springframework.statemachine.state.StateMachineState], kind=EXTERNAL, guard=null], stateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,TASK_FAILED / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16, rootStateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,TASK_FAILED / 
uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16
state = StateMachineState [getIds()=[EXECUTION, TASK_FAILED], toString()=AbstractState [id=EXECUTION, pseudoState=null, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@3433d121], exitActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@4db692c0], stateActions=[], regions=[], submachine=TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER  /  / uuid=b46ebcbd-e25a-4795-ac6e-e7de4c677f77 / id=16], getClass()=class org.springframework.statemachine.state.StateMachineState], message = GenericMessage [payload=URL_RESULT, headers={payload=http://10.10.10.10:2222/cc968bbb4, id=125c7499-1b85-a81a-bca7-9ab1252b5f88, timestamp=1620894700618}], transition = AbstractTransition [source=ObjectState [getIds()=[URL_RESOLVING], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=634233927, toString()=AbstractState [id=URL_RESOLVING, pseudoState=org.springframework.statemachine.state.DefaultPseudoState@4520207d, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@507e0129], exitActions=[], stateActions=[], regions=[], submachine=null]], target=ObjectState [getIds()=[URL_RESULT_CHECK], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=235163342, toString()=AbstractState [id=URL_RESULT_CHECK, pseudoState=org.springframework.statemachine.state.ChoicePseudoState@77d8567f, deferred=[], entryActions=[], exitActions=[], stateActions=[], regions=[], submachine=null]], kind=EXTERNAL, guard=null], stateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / PREPARATION,URL_RESOLVING / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16, rootStateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED 
CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / PREPARATION,URL_RESOLVING / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16
state = ObjectState [getIds()=[TASK_PROPOSED], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=734022314, toString()=AbstractState [id=TASK_PROPOSED, pseudoState=null, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@175541c9], exitActions=[], stateActions=[], regions=[], submachine=null]], message = GenericMessage [payload=WORKER_READY, headers={payload=amq.gen-B6-qHxX8z9hs9INvPbqzuA, id=f36335a5-744d-050a-6baf-964ca3feedc8, timestamp=1620894700649}], transition = AbstractTransition [source=ObjectState [getIds()=[WAIT_FOR_WORKER], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=279597205, toString()=AbstractState [id=WAIT_FOR_WORKER, pseudoState=org.springframework.statemachine.state.DefaultPseudoState@3a192050, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@259b2bd0], exitActions=[], stateActions=[], regions=[], submachine=null]], target=ObjectState [getIds()=[TASK_PROPOSED], getClass()=class org.springframework.statemachine.state.ObjectState, hashCode()=734022314, toString()=AbstractState [id=TASK_PROPOSED, pseudoState=null, deferred=[], entryActions=[org.springframework.statemachine.action.Actions$$Lambda$1053/87936157@175541c9], exitActions=[], stateActions=[], regions=[], submachine=null]], kind=EXTERNAL, guard=null], stateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,WAIT_FOR_WORKER / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16, rootStateMachine = ERROR EXECUTION TASK_PROPOSED TASK_FAILED WORKER_ACQUIRED WAIT_FOR_START WORKING WAIT_FOR_WORKER ACCEPTED CANCELLED PREPARATION WAIT_FOR_SERVICE URL_RESOLVING URL_RESULT_CHECK  / EXECUTION,WAIT_FOR_WORKER / uuid=40c89ec7-17e8-4ac3-8e9b-b6faebf76c9b / id=16

azhuchkov commented 3 years ago

It seems that recreating the state machine and resetting it to the persisted state solves the issue. Just resetting the existing instance didn't help.

In any case, the FSM dumps (toString()) seem to show a sane current state; the problem is that the interceptor starts receiving weird states.

BTW, I use DefaultStateMachineService for managing state machines and do not call release.

azhuchkov commented 3 years ago

One more observation: provisioning the FSM every time a new event arrives, and stopping it afterwards, creates a circular dependency if some action needs to send an event: [eventService <-- action <-- stateMachineFactory <-- eventService]. Technically it can be solved by introducing some buffer, like a BlockingQueue, but that does not look good enough.
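
To make the buffer idea concrete, here is a minimal sketch in plain Java. It does not use any Spring Statemachine API: the EventBuffer class and its publish and drainTo methods are hypothetical names made up for illustration. Actions would depend only on the buffer, and the event service would drain it after each dispatch, so the action no longer needs a reference back to the event service.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring Statemachine.
// Actions publish into the buffer; the event service drains it.
class EventBuffer<E> {

    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();

    // Called from actions: only enqueues, never touches a state machine.
    public void publish(E event) {
        queue.add(event);
    }

    // Called by the event service: delivers queued events in FIFO order,
    // including events that the handler itself publishes while draining.
    // Returns the number of events delivered.
    public int drainTo(Consumer<E> handler) {
        int delivered = 0;
        E event;
        while ((event = queue.poll()) != null) {
            handler.accept(event);
            delivered++;
        }
        return delivered;
    }
}
```

With this in place the dependency chain would become [eventService --> buffer <-- action], assuming the handler passed to drainTo is the code that provisions the FSM, sends the event, and stops it again. Whether this is acceptable under concurrent load is a separate question that the sketch does not address.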

So, what is the recommended way to implement long-running task management using an FSM with persistence and timers?