tolitius / mount

managing Clojure and ClojureScript app state since (reset)
Eclipse Public License 1.0
1.22k stars 88 forks source link

Mount and core.async #84

Open fcabouat opened 7 years ago

fcabouat commented 7 years ago

Hi,

Sorry to post here instead of Slack (blocked by proxy), and it might be a bit of a noob question, but I've got trouble finding documentation on Mount regarding the stopping process / running concurrent processes.

When I have a starting order : state1 -> state2 -> state3 And the stopping order : state3 -> state2 -> state1

Can I block in the state3 function, waiting to clean up some running concurrent process ?

(For more context :

I've got quite a few channels and go-loops running, with dependencies between them (1 go loop giving tempo -> 3 thread I/O workers -> 1 test go-loop downstream-consumer).

I have the following states in my namespace :

(defstate workers-nbr :start 3)
(defstate close-chan  :start (a/chan))

(...)

(defstate heartbeat-chan :start (heartbeat-start) ; starts a go-loop alt!-ing between a timeout and close-chan, returns a chan
                         :stop  #(a/close! close-chan))

(...)

(defstate work-chan :start (workers-start) ; starts 3 long running threads doing IO while heartbeat-chan isn't closed, returns a chan
                    :stop #(a/close! heartbeat-chan))

(...)

(defstate work-test :start (work-test-start) ; starts a go-loop and returns the go-loop chan
                    :stop  #(a/close! work-chan))

And my "main" :

  (m/start)
  (.addShutdownHook (Runtime/getRuntime)
                    (Thread.
                      (fn []
                        (a/close! work-test)
                        (m/stop))))
  (<!! work-test))

I get these kinds of exceptions when I exit :

Exception in thread "async-thread-macro-3" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: mount.core.NotStartedState

I guess the states get stopped while my go-loops are still running meaning work-chan is stopped right after #(a/close! work-chan) got fired, but before the go-loop had time to pick up the closed signal, hence the go-loop trying a <! on an already closed state.

I'm thinking of blocking inside the stop functions on the go-loop / threads like :

(defstate work-test :start (work-test-start)
                    :stop  #(do (a/close! work-chan)
                                (<!!      work-test)

Will it work properly ? Are mount and core.async meant to be combined in this way ? Or is it bad design ?

Cheers,

François

tolitius commented 7 years ago

a couple observations:

configuration over public constants

(defstate workers-nbr :start 3)

this would usually come from configuration / program parameters rather than being defined as a state

stop functions

is there any reason your stop functions are anonymous functions:

:stop #(a/close! heartbeat-chan)

rather than just:

:stop (a/close! heartbeat-chan)

not stopping state on stop

most likely the root of your problem is that a single state is started and stopped in different states, whereas it should be started and stopped in/as the same state:

(defstate a :start (foo)
            :stop  b)

(defstate b :start (bar)
            :stop  a)

vs.

(defstate a :start (foo)
            :stop  a)

(defstate b :start (bar)
            :stop  b)
thurn commented 7 years ago

Hi, I'm having a very similar problem. In general, I think mount is not currently a good tool for managing channel state. My initial instinct was to do something like this:

(mount/defstate requests-channel
  :start (async/chan)
  :stop (async/close! requests-channel))

But this doesn't actually work. The basic problem is that asynchronous goroutines expect to see a channel in the "closed" state in order to terminate themselves, but mount replaces the channel object with a NotStartedState instance and causes them to crash. Eg. if you have standard goroutine code like this:

(async/go-loop []
    (when-let [request (async/<! requests/requests-channel)]
      ;; handle request
      (recur)))

Your program will randomly crash because requests-channel is gone.

Personally, I'm not really a fan of the whole (undocumented) NotStartedState thing. If it were up to me, stopped states would just get assigned the value you return from :stop, similar to how component works. That was actually how I thought it worked from reading the documentation.

tolitius commented 7 years ago

Personally, I'm not really a fan of the whole (undocumented) NotStartedState thing

why NotStartedState

The initial idea was to follow this succession of values:

NotStartedState => (start it) => (stop it) => NotStartedState

Reasoning:

  1. The state, once stopped returns to its initial value of NotStartedState which gives an immediate feedback in case it is used while it should not be: i.e. rather than chasing NullPointers, etc.

  2. There are states that do not require a :stop function, in which case, if they are stopped, should "no longer be started", and also should not be used: NotStartedState clearly communicates this (in case they are stopped).

stop value has "value"

I do not disagree it creates confusion in cases where other parts of an application rely on a stop value to control their flow. However I also think controlling flow based on mutation is a smell.

core.async made certain design choices, and returning nil from a closed channel is one of them. It works well in some cases, does not work well in others: for example when you depend on it to get out of a tight go-loop.

I usually do something like (pseudo code):

(async/go-loop []
    (when-let [request (alt!
                         request-channel ([msg] msg)
                         stop-channel ([_] nil))]
      ;; handle request
      (recur)))

and then in a :stop function(s) I send a message to a stop-channel which:

I am not saying my way is better, it is just something I feel more comfortable with.

However 47% of me agrees with you that it would be more expected (less surprising) if a state ended up bound to a "stop value" once a :stop function is called. Another 4% and I'll just do it, so I am more than willing to have a discussion about it.

thurn commented 7 years ago

Yeah, I see your point. NotStartedState obviously does have some error-detection benefits. The interaction with channels is just unfortunate, since the "loop until it returns nil" idiom is so widespread and convenient, especially for things like correctly cleaning up child channels created via sub or tap. Wish there was a good compromise solution.

tolitius commented 7 years ago

I don't think mount has limitations when it comes to core.async. Besides the solution above with a stop-channel you can do (pseudo code):

(defn listen-to-events []
  (let [req-chan (chan)]
    (async/go-loop []
       (when-let [request (<! req-chan)]
         ;; handle request
         (recur)))
    req-chan))

(defstate listener :start (listen-to-events)
                   :stop (async/close! listener))

It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.

fcabouat commented 7 years ago

Hi, sorry for my low participation after asking a question, changed a bit of focus at work and went on holidays.

You were obviously right for your first two remarks (external conf, using anonymous function instead of a simple function call block). Your third remark was a bit confusing to me since you display a cyclic dependency with a and b. My go-loop pipeline is acyclic, with a-fetching -> b-accumulating -> c-output. And I had in mind I would be able to manage dependant states/the dependency order with mount, starting automatically c, then b, then a, and stopping a, then b, then c.

I'll have to wrap my head around this a bit more, since it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.). I might still be doing pretty basic mistakes.

Anyway, I'm glad thurn stepped in and fueled the discussion a bit further.

Cheers,

tolitius commented 7 years ago

it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.) I might still be doing pretty basic mistakes.

These are not basic mistakes, different developers would come up with different designs and will stand their ground to prove their way is the best :)

It would depend on the application design and usage of course, but when it has to do with core.async I tend to wrap go-loops in mount states. Since a go-loop should be started and should be stopped and it is stateful.

Sometimes it makes sense to deem the whole pipeline as a stateful component: https://stackoverflow.com/a/40435030/211277

Things to consider when deciding when create a mount state and what should be "in it":

fcabouat commented 7 years ago

This SO answer is awesome / many food for thoughts, thanx !

fcabouat commented 7 years ago

Little update : I made sure states are responsible for stopping only themselves, but still since I have running go-loops /thread-loops and dependencies (state B loop depending on state A), I keep having some kind of race condition.

I tried with a stop-channel, it seems like state B gets his stop called, which would close its loop in the next cycle (when alt!!-ing on the stop channel)... and then A gets stopped...

But during this last cycle, the B loop is still running and calls A -> IllegalArgumentException: No implementation of method: :my-method! of protocol: #'my-ns/StateAProtocol found for class: mount.core.NotStartedState.

So as I get it, there are only two options here :

Regards,

buzzdan commented 5 years ago

I don't think mount has limitations when it comes to core.async. Besides the solution above with a stop-channel you can do (pseudo code):

(defn listen-to-events []
  (let [req-chan (chan)]
    (async/go-loop []
       (when-let [request (<! req-chan)]
         ;; handle request
         (recur)))
    req-chan))

(defstate listener :start (listen-to-events)
                   :stop (async/close! listener))

It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.

i just wanna put it here for the ones to come - this is working:

(defstate listener
          :start (listen-to-events)
          :stop (async/close! ^clojure.core.async.impl.protocols/Channel listener))

notice the type hint above --> ^clojure.core.async.impl.protocols/Channel this is what actually making the difference

Why ?

before using the type hint we get this message:

Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval5371$fn$G (protocols.clj:21).
No implementation of method: :close! of protocol: #'clojure.core.async.impl.protocols/Channel found for class: mount.core.DerefableState

it means that it gets mount.core.DerefableState as the type instead of a channel so helping it by hinting it to ^Channel helps!

Enjoy 😉