clj-commons / manifold

A compatibility layer for event-driven abstractions

Shorthand for "put this stuff on stream, tell me when done" #142

Closed lvh closed 3 years ago

lvh commented 6 years ago

I have a handful of CLI-ish tools (that is: the process runs based on some event, say a cron job, does its thing and exits). They talk to a manifold stream to send data off into the ether, e.g. via unsiemly to send it to a SIEM (think stream processing endpoint if you're unfamiliar with the term) or unclogged to send it directly to syslog.

For this use case, you end up with the following pattern: put some stuff in a stream, and wait until that resulting stream is drained (i.e. processing has finished). I'm not sure I can use consume for this, because consume tells me when the source is finished, not when all processing is done. However, I think this doesn't matter in practice right now because the threads on which consume callbacks are executed aren't daemonized; so they'll keep the process alive for as long as there is processing to do (unlike clock-related threads, see #61). Is that part of the API contract, or just an incidental thing?

Is there a general way to spell "put everything on this stream, and tell me when it's totally done"? Is the answer really just (assuming you want blocking):

@(ms/put-all! stream msgs)
(System/exit 0)

... or do I need to do something involving close! and on-drained? I feel like the "correct" thing to do here is to rely on backpressure, but I also don't want to break if I allow e.g. a buffer of 10 messages and you're only sending 3 (all puts will succeed ~immediately, but they're clearly not done processing).
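
A minimal sketch of the buffer concern above, assuming a stream with a buffer of 10 and no consumer attached (the names `buffered`, `accepted?` and `first-pending` are made up for illustration). It shows that the deferred from put-all! is realized as soon as the buffer accepts the messages, before anything has processed them:

```clojure
(require '[manifold.stream :as ms])

;; A stream with room for 10 buffered messages and, on purpose, no consumer.
(def buffered (ms/stream 10))

;; All three puts fit in the buffer, so the deferred is realized right away.
(def accepted? (deref (ms/put-all! buffered [:a :b :c]) 1000 :timed-out))

;; The messages are still sitting in the buffer: the first take returns :a.
(def first-pending (deref (ms/take! buffered) 1000 :timed-out))

(println "puts accepted:" accepted? "first unprocessed message:" first-pending)
```

So a realized put-all! only says the stream accepted the messages, not that a consumer has acted on them.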

cc @irinarenteria @derwolfe because I'm sure y'all have had this problem too.

ztellman commented 6 years ago

What does stream represent here? If you're talking about writing to System/out, byte-streams/transfer will block until everything is flushed, but unless the thing you're writing to is actually at the boundary of your process, waiting for put! to succeed doesn't mean anything's been made visible outside that process.

lvh commented 6 years ago

stream is a manifold stream that has some Clojure maps on it. And yes, the second half of your comment hits the nail on the head re: what I'm trying to accomplish: I want to know that everything I've put on the stream has been fully consumed.

ztellman commented 6 years ago

Just waiting for on-drained isn't enough, because the consumer may have the message, but hasn't necessarily acted on it. One way to do this is to have the producer send an end-of-stream signal, and then wait for the consumer to close the stream once it has completely processed it. Another way is to create a deferred via which the consumer can signal the producer. Streams by themselves cannot solve this, as they purposefully create indirection between production and consumption.
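
A rough sketch that combines the two ideas above, assuming a single producer and a synchronous consume callback. The helper `process-all!` and the `::end` sentinel are hypothetical names, not part of manifold. The producer appends the sentinel, and the consumer closes the stream and completes a deferred once it sees it:

```clojure
(require '[manifold.stream :as ms]
         '[manifold.deferred :as md])

(defn process-all!
  "Hypothetical helper: runs handler on every message in msgs and returns a
  deferred that is realized only after the consumer has seen the ::end marker."
  [handler msgs]
  (let [s    (ms/stream)
        done (md/deferred)]
    (ms/consume
      (fn [msg]
        (if (= msg ::end)
          ;; The consumer, not the producer, closes the stream and signals back.
          (do (ms/close! s)
              (md/success! done :processed))
          (handler msg)))
      s)
    ;; Assumption: one producer, so the sentinel really is the last message.
    (ms/put-all! s (concat msgs [::end]))
    done))

(def seen (atom []))
(def result (process-all! #(swap! seen conj %) [1 2 3]))
(println (deref result 1000 :timed-out) @seen)
```

With several producers writing to the same stream, a single sentinel no longer marks the end of everyone's input, so this sketch would need a different signal.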

lvh commented 6 years ago

Gotcha. Is that purely an API-level argument, or is it also true in practice? In particular, if a synchronous consumer callback (via manifold.stream/consume) is on the other end of the stream, will the non-daemon executor thread that callback is called on prevent the process from exiting?

ztellman commented 6 years ago

Well, a non-daemon executor thread will keep the process from shutting down even if it's idle. You'd need to actually shut down the thread pool, which again leads to some potential race conditions in terms of having completed your work.
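
To illustrate the point about idle non-daemon threads, here is a sketch that uses a plain JDK thread pool rather than manifold's own executors (that substitution is an assumption made to keep the example self-contained). The pool's threads would keep the JVM alive until the pool is shut down explicitly:

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; Executors/newFixedThreadPool uses non-daemon threads by default.
(def ^ExecutorService pool (Executors/newFixedThreadPool 2))

(def results (atom []))

;; Hand one unit of work to the pool.
(.execute pool (fn [] (swap! results conj :work)))

;; shutdown stops accepting new tasks but lets already-submitted ones finish.
(.shutdown pool)

;; Wait (up to 5 seconds) for the submitted work to complete.
(def finished? (.awaitTermination pool 5 TimeUnit/SECONDS))

(println "finished?" finished? "results:" @results)
```

The race mentioned above still applies: shutdown only covers work that has already been handed to the pool, so anything submitted later is rejected.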

lvh commented 6 years ago

OK, that makes sense. Thanks for the clarification :)

lvh commented 6 years ago

Is there some reference code that implements this? I'm having a heck of a time getting it to work. (See linked ticket for experiments.) I think I sort of have the poison-value approach working, but that breaks if there are multiple producers writing to the same stream, right? (Or rather, it means that a successful put doesn't really mean anything -- it might still get ignored.)

consume and consume-async both return deferreds, but those fire when the stream is exhausted, not when processing is done, even though those functions know about processing (e.g. with consume-async, the next call of the callback doesn't happen until the deferred from the last one has fired).

In particular, it appears that you can't check if an input stream is drained inside a consume(-async) callback. You can use the deferred from consume(-async), but that's not quite what you want because of the aforementioned reason.

ztellman commented 6 years ago

The easiest way to do this is with a duplex stream. Send a message, wait for a response that it was handled. You can wrap around this so that the entire operation returns a deferred that is realized once the response comes back.
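
One possible reading of this suggestion, sketched with manifold.stream/splice. The names `requests`, `responses` and `send-and-confirm!` are invented for illustration, and the sketch assumes only one message is in flight at a time so that a response can be matched to its request:

```clojure
(require '[manifold.stream :as ms]
         '[manifold.deferred :as md])

(def requests  (ms/stream))   ; producer -> consumer
(def responses (ms/stream))   ; consumer -> producer

;; The producer's view: put! goes to requests, take! comes from responses.
(def duplex (ms/splice requests responses))

(def handled (atom []))

;; Consumer: do the work first, then acknowledge it on the response side.
(ms/consume
  (fn [msg]
    (swap! handled conj msg)
    (ms/put! responses [:handled msg]))
  requests)

(defn send-and-confirm!
  "Hypothetical helper: sends msg and returns a deferred that is realized with
  the consumer's acknowledgement, or ::rejected if the put failed."
  [duplex msg]
  (md/chain (ms/put! duplex msg)
            (fn [accepted?]
              (if accepted?
                (ms/take! duplex)
                ::rejected))))

(def reply (deref (send-and-confirm! duplex 42) 1000 :timed-out))
(println "reply:" reply "handled:" @handled)
```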

lvh commented 6 years ago

I don't think I follow how a duplex stream would work here. The caller hands me a stream of messages to process (the signature is something like (->siem! some-config a-stream-of-msgs)). I think I get the idea of half-closing (at least if it works like TCP sockets do, i.e. I promise to not send any more stuff, but I can still read messages until you say you're done). Presumably, that ->siem! function builds the feedback stream -- but why duplex it (which I guess means use manifold.stream/splice) instead of just returning the feedback stream? (Also, once we're doing that, why is that better than just returning a callback?)

I started writing this and ended up with (synchronous cb):

(require '[manifold.stream :as ms]
         '[manifold.deferred :as md])

(defn my-cb [vs] (println "cb saw" vs))

(defn setup!
  ([source feedback-cb]
   (->> source
        (ms/batch 3 100)
        (ms/consume (fn [vs]
                      (my-cb vs)
                      (feedback-cb vs)))))
  ([source]
   (setup! source (constantly nil))))

(defn do-all!
  [inputs]
  (let [s (ms/stream)
        d (md/deferred)
        caboose (last inputs)]
    (setup! s (fn [vs]
                 (when (identical? (last vs) caboose)
                   (println "closing, firing d")
                   (ms/close! s)
                   (md/success! d nil))))
    (ms/put-all! s inputs)
    d))

(md/chain (do-all! [1 2 3 4 5]) #(println "done" %))

... which ostensibly does what I want. Once I had that, I was able to port it to something with a feedback stream:

(defn my-cb [vs] (println "cb saw" vs))

(defn setup!
  ([source feedback-stream]
   (->> source
        (ms/batch 3 100)
        (ms/consume (fn [vs]
                      (my-cb vs)
                      (ms/put! feedback-stream vs)))))
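  ([source]
   ;; Default: a feedback stream with a no-op consumer attached.
   (let [feedback-stream (ms/stream)]
     (ms/consume (constantly nil) feedback-stream)
     (setup! source feedback-stream))))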

(defn do-all!
  [inputs]
  (let [s (ms/stream)
        feedback-stream (ms/stream)
        all-done (md/deferred)
        caboose (last inputs)]
    (setup! s feedback-stream)
    (ms/consume
     (fn [vs]
       (when (identical? (last vs) caboose)
         (println "closing, firing d")
         (ms/close! s)
         (md/success! all-done nil)))
     feedback-stream)

    (ms/put-all! s inputs)

    all-done))

(md/chain (do-all! [1 2 3 4 5]) #(println "done" %))

The default feedback stream gets a no-op consumer so that there aren't a bunch of put!s lying around that will never succeed. Is this sorta OK? I feel like I'm missing the point re: the duplex stream.

lvh commented 6 years ago

Here's a version that allows the inputs to be a stream as well (I think):

(defn my-cb [vs] (println "cb saw" vs))

(defn setup!
  ([source feedback-stream]
   (->> source
        (ms/batch 3 100)
        (ms/consume (fn [vs]
                      (my-cb vs)
                      (ms/put! feedback-stream vs)))))
  ([source]
   (->> source
        (ms/batch 3 100)
        (ms/consume my-cb))))

(defn do-all!
  [inputs]
  (let [inputs (ms/->source inputs)
        feedback-stream (ms/stream)
        last-v (volatile! ::unset)
        all-done (md/deferred)]
    (setup!
     (ms/map (fn [v] (vreset! last-v v) v) inputs)
     feedback-stream)
    (ms/consume
     (fn [vs]
       (when (and (ms/drained? inputs)
                  (identical? (last vs) @last-v))
         (println "closing, firing d")
         (md/success! all-done nil)))
     feedback-stream)
    all-done))

(md/chain (do-all! [1 2 3 4 5]) #(println "done" %))

This makes the assumption that the vreset! callback will always run before the callback on the feedback stream, which seems true.

dm3 commented 6 years ago

Do I understand correctly that you want to inform the producer when the consumer is done processing all items? I've been doing the same by attaching deferreds to every input item like so:

(require '[manifold.stream :as s])
(require '[manifold.deferred :as d])
(defn send! [dst inputs]
  (let [items (map #(vector % (d/deferred)) inputs)]
    (-> (s/put-all! dst items)
        (d/chain
          (fn [result]
            (when result
              (apply d/zip (map last items))))
          (fn [_] :done)))))

(defn consume! [cb src]
  (->> src
       (s/batch 3 100)
       (s/consume
         (fn [vs]
           (cb (map first vs))
           (doseq [d (map second vs)]
             (d/success! d :done))))))

(def s (s/stream))
(def res (send! s [1 2 3 4 5]))
(d/chain res #(println "DONE" %))
(consume! #(println "got" %) s)
> got (1 2 3)
> got (4 5)
> DONE :done

KingMob commented 3 years ago

Closing, as the conversation has been dead for 4 years, but feel free to reopen if necessary.