KStateMachine / kstatemachine

KStateMachine is a powerful Kotlin Multiplatform library with clean DSL syntax for creating complex state machines and statecharts driven by Kotlin Coroutines.
https://kstatemachine.github.io/kstatemachine/
Boost Software License 1.0
358 stars 21 forks source link

app crash when sending multiple events at once(through for loop) #87

Closed preetb123 closed 7 months ago

preetb123 commented 8 months ago

App crash. How to avoid this while still processing every single event? @nsk90

checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
adding StartGroupCallEvent to queue size: 1
nextEventAndArgument() called with StartGroupCallEvent
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
adding UpdateUserStatusEvent to queue size: 1
adding UpdateUserStatusEvent to queue size: 2
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
adding UpdateUserStatusEvent to queue size: 1
nextEventAndArgument() called with UpdateUserStatusEvent
adding UpdateUserStatusEvent to queue size: 1
adding UpdateUserStatusEvent to queue size: 2
adding UpdateUserStatusEvent to queue size: 3
nextEventAndArgument() called with UpdateUserStatusEvent
adding UpdateUserStatusEvent to queue size: 3
nextEventAndArgument() called with UpdateUserStatusEvent
adding UpdateUserStatusEvent to queue size: 3
adding UpdateUserStatusEvent to queue size: 4
adding UpdateUserStatusEvent to queue size: 5
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
adding UpdateUserStatusEvent to queue size: 1
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
adding UpdateUserStatusEvent to queue size: 2
adding UpdateUserStatusEvent to queue size: 3
adding UpdateUserStatusEvent to queue size: 1
adding UpdateUserStatusEvent to queue size: 6
adding UpdateUserStatusEvent to queue size: 6
adding UpdateUserStatusEvent to queue size: 6
adding UpdateUserStatusEvent to queue size: 7
adding UpdateUserStatusEvent to queue size: 8
adding UpdateUserStatusEvent to queue size: 10
adding UpdateUserStatusEvent to queue size: 10
adding UpdateUserStatusEvent to queue size: 11
nextEventAndArgument() called with UpdateUserStatusEvent
adding UpdateUserStatusEvent to queue size: 11
adding UpdateUserStatusEvent to queue size: 12
adding UpdateUserStatusEvent to queue size: 13
adding UpdateUserStatusEvent to queue size: 15
adding UpdateUserStatusEvent to queue size: 15
adding UpdateUserStatusEvent to queue size: 17
adding UpdateUserStatusEvent to queue size: 18
adding UpdateUserStatusEvent to queue size: 19
adding UpdateUserStatusEvent to queue size: 20
adding UpdateUserStatusEvent to queue size: 21
adding UpdateUserStatusEvent to queue size: 22
adding UpdateUserStatusEvent to queue size: 24
adding UpdateUserStatusEvent to queue size: 24
adding UpdateUserStatusEvent to queue size: 25
adding UpdateUserStatusEvent to queue size: 26
adding UpdateUserStatusEvent to queue size: 16
nextEventAndArgument() called with UpdateUserStatusEvent
adding UpdateUserStatusEvent to queue size: 28
adding UpdateUserStatusEvent to queue size: 29
adding UpdateUserStatusEvent to queue size: 30
adding UpdateUserStatusEvent to queue size: 31
adding UpdateUserStatusEvent to queue size: 32
adding UpdateUserStatusEvent to queue size: 32
adding UpdateUserStatusEvent to queue size: 33
adding UpdateUserStatusEvent to queue size: 34
adding UpdateUserStatusEvent to queue size: 35
adding UpdateUserStatusEvent to queue size: 37
adding UpdateUserStatusEvent to queue size: 37
adding UpdateUserStatusEvent to queue size: 27
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with UpdateUserStatusEvent
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
nextEventAndArgument() called with null
checking if queue is empty, queue size: 0
checking if queue is empty, queue size: 0
adding UpdateUserChannelOperationEvent to queue size: 1
nextEventAndArgument() called with UpdateUserChannelOperationEvent
nextEventAndArgument() called with null
FATAL EXCEPTION: DefaultDispatcher-worker-1
Process: com.sample.testapp, PID: 15260
java.lang.IllegalStateException: Event queue is not empty, internal error
    at com.sample.testapp.statemachine.MyPendingEventHandler.checkEmpty(MyPendingEventHandler.kt:13)
    at ru.nsk.kstatemachine.StateMachineImpl.eventProcessingScope(StateMachineImpl.kt:174)
    at ru.nsk.kstatemachine.StateMachineImpl.access$eventProcessingScope(StateMachineImpl.kt:17)
    at ru.nsk.kstatemachine.StateMachineImpl$processEvent$2.invokeSuspend(StateMachineImpl.kt:128)
    at ru.nsk.kstatemachine.StateMachineImpl$processEvent$2.invoke(Unknown Source:8)
    at ru.nsk.kstatemachine.StateMachineImpl$processEvent$2.invoke(Unknown Source:2)
    at ru.nsk.kstatemachine.CoroutinesLibCoroutineAbstraction$withContext$2.invokeSuspend(CoroutinesLibCoroutineAbstraction.kt:14)
    at ru.nsk.kstatemachine.CoroutinesLibCoroutineAbstraction$withContext$2.invoke(Unknown Source:8)
    at ru.nsk.kstatemachine.CoroutinesLibCoroutineAbstraction$withContext$2.invoke(Unknown Source:4)
    at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
    at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:159)
    at kotlinx.coroutines.BuildersKt.withContext(Unknown Source:1)
    at ru.nsk.kstatemachine.CoroutinesLibCoroutineAbstraction.withContext(CoroutinesLibCoroutineAbstraction.kt:14)
    at ru.nsk.kstatemachine.StateMachineImpl.processEvent(StateMachineImpl.kt:114)
    at ru.nsk.kstatemachine.StateMachine$DefaultImpls.processEvent$default(StateMachine.kt:63)
    at com.sample.testapp.statemachine.AppModel$sendEvent$1$1.invokeSuspend(AppModel.kt:26)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
    Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@168ccc2, Dispatchers.Default]
nsk90 commented 8 months ago

Hi, could you provide a code sample facing this issue?

nsk90 commented 8 months ago

Is the falling StateMachine called from multiple threads? Pease specify with which scope do you call createStateMachine function.

preetb123 commented 8 months ago

Is the falling StateMachine called from multiple threads? Pease specify with which scope do you call createStateMachine function.

@nsk90 , thank you very much for replying. I called createStateMachine from GlobalScope.launch{}.

preetb123 commented 8 months ago

@nsk90 for now I have made following modification and so far it did not crash. But not sure if it is the right way to do

  private val SINGLE_THREAD = newSingleThreadContext("mvi")

  fun sendEvent(event: AppEvent): Unit = intent {
    sendEffect(ModelEffect.ControlEventSent(event))
    val machine = NysnoApplication.getApp().machine
    GlobalScope.launch {
      withContext(SINGLE_THREAD){
        machine?.processEvent(event)
      }
    }
  }
nsk90 commented 8 months ago

I think the crash is related to GlobalScope usage. As it uses Default coroutine dispatcher internally (correct?). This dispatcher uses thread pool, not a single thread. So processEvent may be executed concurrenly, which is wrong. You should pass single threaded coroutine context into createStateMachine function. Try it. If this does not help, I will try to reproduce your issue locally to debug.

preetb123 commented 8 months ago

@nsk90 is this fine?

GlobalScope.launch {
  withContext(singleThread){
    machine = createAndStartStateMachine()
    machine?.start(null)
    printStateMachine()  
  }
}
nsk90 commented 8 months ago

yeah this looks better, please inline createAndStartStateMachine() body, so is can see pure library methods, to give exact answer.

I think you don't need GlobalScope at all. Having separate single thread you can create your own CoroutineScope from it, and use the latter as createStateMachine argument.

This is not critical and does not change the way how it should work, both cases are effectively equivalent.

Something like this:

CoroutineScope(newSingleThreadContext("your context"))
// or
CoroutineScope(Dispatchers.Default.limitedParallelism(1))
preetb123 commented 8 months ago

this is how createAndStartStateMachine looks. Actually it took lot of time to create if I pass start = true while creating and I got machine variable not initialized error in sendEvent function while accessing machine, so had to change it this way.

suspend fun createAndStartStateMachine(): StateMachine {
  return createStateMachine(GlobalScope, "MyAppState", ChildMode.PARALLEL, start = false) {
    logger = StateMachine.Logger {
      Timber.tag("StateMachine").d(it())
    }

    state("AppState") {
      addInitialState(AppState.LoginState) {
        ....
      }
      ...
    }
  }
}
preetb123 commented 8 months ago

CoroutineScope(Dispatchers.Default.limitedParallelism(1))

you mean like this?

val applicationScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
applicationScope.launch {
  machine = createAndStartStateMachine()
  machine?.start(null)
  printStateMachine()
}
nsk90 commented 8 months ago

not, actually

GlobalScope.launch { // any scope/context, it does not matter

      val machineScope = CoroutineScope(newSingleThreadContext("your context")) // must be single threaded, you should cancel it when you complete working with the machine.

      val machine = createStateMachine(machineScope, ....) {...} // this scope/context matters
      //...

      // the library will execute all suspendable functions from machineScope context
      machine.processEvent(...) // will switch to machineScope internally, so you can call it from any thread and context.
      // please not that it is only about suspendable library functions. all other non-suspendable api cannot be used in 
      // multithreaded environment without proper synchronization.
} 
preetb123 commented 8 months ago

not, actually

GlobalScope.launch { // any scope/context, it does not matter

      val machineScope = CoroutineScope(newSingleThreadContext("your context")) // must be single threaded, you should cancel it when you complete working with the machine.

      val machine = createStateMachine(machineScope, ....) {...} // this scope/context matters
      //...

      // the library will execute all suspendable functions from machineScope context
      machine.processEvent(...) // will switch to machineScope internally, so you can call it from any thread and context.
      // please not that it is only about suspendable library functions. all other non-suspendable api cannot be used in 
      // multithreaded environment without proper synchronization.
} 

thank you very much @nsk90