arichiardi / fonda

An async pipeline approach to functional core - imperative shell.
Apache License 2.0

Dynamically add steps to execute from a processor #28

Closed arichiardi closed 5 years ago

arichiardi commented 5 years ago

Hi :smile:

I have the following use case:

  1. Execute fonda with some steps
  2. One of those steps queries an http endpoint
  3. Based on what's returned, and it can be multiple values, I need to query another http endpoint

At the moment fonda does not expose any way to add a step at runtime, which is something the usual interceptor pattern allows. I guess this need is quite common, and fonda should support it as well in some form.

ElChache commented 5 years ago

I can think of two different approaches:

  1. Give access to the FondaContext to each step
  2. Provide a set of functions to each step to add, remove and maybe even replace steps

arichiardi commented 5 years ago

I think we will need to rethink our context merging strategy a bit... it seems like isolating the returned data puts us in an uncomfortable position; see also #25.

It seems that we naively thought the only operation a processor needed was to merge a key into the context, but there are already two things challenging that assumption.

arichiardi commented 5 years ago

This is the equivalent in sieppari:

(defn trigger-policy
  [ctx]
  (-> (policy/trigger! (select-keys ctx [:config :logger :trace-id])
                       (:policy-name ctx)
                       (get-in ctx [:request :event]))
      (.then #(update-in ctx [:request :policies] conj %)
             #(assoc ctx :error %))))

(defn trigger-projection
  [ctx]
  (-> (projection/trigger! (select-keys ctx [:config :logger :trace-id])
                           (:projection-name ctx)
                           (get-in ctx [:request :event]))
      (.then #(update-in ctx [:request :projections] conj %)
             #(assoc ctx :error %))))

(defn enque-triggers
  [{:keys [available-policies available-projections] :as ctx}]
  (-> ctx
      (sieppari-ctx/inject (mapv (fn [projection-name]
                                   {:enter #(trigger-projection (assoc % :projection-name projection-name))
                                    :error log-error})
                                 available-projections))
      (sieppari-ctx/inject (mapv (fn [policy-name]
                                   {:enter #(trigger-policy (assoc % :policy-name policy-name))
                                    :error log-error})
                                 available-policies))))

Just finished the implementation.

arichiardi commented 5 years ago

After discussion with @ElChache we agreed on a new step similar to this:

{:queue (fn [ctx queue] ( ... return the new queue ))}

This would definitely enable the above use case and it would not expose the whole FondaContext but only the queue.

arichiardi commented 5 years ago

After a conversation with @ElChache, it seems that exposing the full queue might be overkill; we should instead only allow adding steps to it until we find a use case that needs full control of it.

ElChache commented 5 years ago

I'd like a declarative solution where a step declares what steps to inject. Something like

{:processor (fn []...)
 :steps (fn [processor-res] <added-steps>)}

Where <added-steps> is a collection of the steps to be injected just after the current step.

arichiardi commented 5 years ago

That would not work unfortunately... in my use case I need to generate steps from the result of one or more of the preceding steps. That is why I needed to access the context above (I have two http calls).

For that to happen we need a new step type I think.

What about:

{:enqueue (fn [ctx] ( ... return vector of steps to enqueue....))}

ElChache commented 5 years ago

I didn't think of that type of situation. Yes, it makes so much sense to do it this way. I am sold on this solution :)

arichiardi commented 5 years ago

I am just still not sold on the name of the step: :enqueue? :add? :concat?

It is a step type, and we picked a noun for all the other types (:processor and :tap).

ElChache commented 5 years ago

Yeah, about :processor, I'd like to change it to :fn, or accept both, but I guess that's another story, another issue... The problem I see with :enqueue is that if you think of the steps as a queue, and you are enqueueing new steps, I'd expect those steps to be added at the end of the queue, not right after this step. I wouldn't worry about verbs or nouns, as :tap can be a verb as well, as in

the telephones were tapped by the state security police.

I don't like :add as it is too generic. In sieppari they use inject; I think it makes more sense than enqueue.
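
The naming concern comes down to where the new steps land relative to the steps still waiting to run. A minimal sketch of the two readings, using hypothetical helper names (`enqueue-steps` and `inject-steps` are not part of fonda or sieppari) and plain vectors in place of a real step queue:

```clojure
;; Hypothetical helpers for illustration only; not fonda or sieppari API.

(defn enqueue-steps
  "Appends new-steps after everything still waiting to run."
  [remaining new-steps]
  (into (vec remaining) new-steps))

(defn inject-steps
  "Places new-steps before everything still waiting to run,
  so they execute right after the current step."
  [remaining new-steps]
  (into (vec new-steps) remaining))

(enqueue-steps [:c :d] [:a :b]) ;; => [:c :d :a :b]
(inject-steps [:c :d] [:a :b])  ;; => [:a :b :c :d]
```

Under this reading, the behaviour wanted in this issue corresponds to the second helper.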

arichiardi commented 5 years ago

Yeah, I like :inject as well. Let's see what it looks like:

(defn policy-processor
  [policy-name ctx]
  (policy/trigger! (select-keys ctx [:config :logger :trace-id])
                   policy-name
                   (get-in ctx [:request :event])))

(defn projection-processor
  [projection-name ctx]
  (projection/trigger! (select-keys ctx [:config :logger :trace-id])
                       projection-name
                       (get-in ctx [:request :event])))

{:inject (fn [{:keys [available-policies
                      available-projections]}]

           ;; we should accept any sequence
           (concat (mapv (fn [projection-name]
                           ;; partially apply the two-argument processor defined above
                           {:processor (partial projection-processor projection-name)
                            :path [:projections]})
                         available-projections)

                   (mapv (fn [policy-name]
                           {:processor (partial policy-processor policy-name)
                            :path [:policies]})
                         available-policies)))}

Apart from the index problem already described in #25 it looks good.
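
To make the intended semantics concrete, here is a toy, synchronous sketch of how a runner might treat such a step. It is not fonda's implementation (fonda is asynchronous); `run-steps` and the sample data are made up for illustration, and only the `:inject`, `:processor` and `:path` keys come from the discussion above.

```clojure
;; Toy runner for illustration only; assumes synchronous processors.
(defn run-steps
  "Runs steps in order against ctx. An :inject step is called with the
  current ctx and the steps it returns run immediately after it. A
  :processor step has its result stored in ctx at :path."
  [ctx steps]
  (loop [ctx ctx
         queue steps]
    (if-let [[step & more] (seq queue)]
      (if-let [inject (:inject step)]
        ;; put the generated steps in front of the remaining ones
        (recur ctx (concat (inject ctx) more))
        (let [{:keys [processor path]} step]
          (recur (assoc-in ctx path (processor ctx)) more)))
      ctx)))

(def result
  (run-steps {:names ["a" "b"]}
             [{:inject (fn [{:keys [names]}]
                         (map (fn [n]
                                {:processor (fn [_ctx] (str "triggered-" n))
                                 :path [:results n]})
                              names))}
              ;; runs after the injected steps, so it sees their results
              {:processor (fn [ctx] (count (:results ctx)))
               :path [:total]}]))

;; result
;; => {:names ["a" "b"], :results {"a" "triggered-a", "b" "triggered-b"}, :total 2}
```

The last step sees both injected results, which shows that the generated steps ran right after the `:inject` step rather than at the end of the queue.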

ElChache commented 5 years ago

I like it! I guess your paths would have an index appended in this case, right?

(map-indexed 
  (fn [idx projection-name]
     {:processor (partial projection-processor projection-name)
      :path [:projections idx]})
  available-projections)

And the ctx initialized to {:projections []}

I don't see #25 as a big problem
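
The initialization to `{:projections []}` matters because of how `assoc-in` treats a missing key. A small sketch with plain Clojure functions and placeholder values (`:p0`, `:p1`), assuming each step's result is merged with something equivalent to `assoc-in` at its `:path`:

```clojure
(defn merge-indexed
  "Stores each value under [:projections idx], in order, starting from ctx."
  [ctx values]
  (reduce (fn [acc [idx v]]
            (assoc-in acc [:projections idx] v))
          ctx
          (map-indexed vector values)))

;; With the vector in place, each index extends the vector.
(def with-vector (merge-indexed {:projections []} [:p0 :p1]))
;; => {:projections [:p0 :p1]}

;; Without it, assoc-in creates a map keyed by the indexes instead.
(def without-init (merge-indexed {} [:p0 :p1]))
;; => {:projections {0 :p0, 1 :p1}}
```

Note that `assoc` on a vector only accepts indexes up to its current count, so the vector variant relies on the results being written in index order.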

arichiardi commented 5 years ago

Added with #34