clj-commons / manifold

A compatibility layer for event-driven abstractions

Questions: best way to monitor stream consumption #150

Open bhurlow opened 6 years ago

bhurlow commented 6 years ago

Hey Zach,

I have a somewhat simple manifold question: how can I monitor the progress of a stream being consumed over time? I have an incoming HTTP request input-stream which I'm "piping" to a sink (in this case S3). I know the total content length, so I want to create another stream which can calculate the progress of the stream being drained and print this elsewhere, say a websocket. Bonus points if I can get that monitoring stream to emit events less frequently than the input one.

Is there a simple way to handle this in manifold?

Thank you!

ztellman commented 6 years ago

There are a few ways you could do this, but probably the easiest is to just do something like:

;; `input` stands for the incoming manifold stream; `s` is manifold.stream
(let [bytes (AtomicLong. 0)]
  [(s/map #(do (.addAndGet bytes (num-bytes %)) %) input)
   (s/periodically 1000 #(.getAndSet bytes 0))])

This will return a 2-tuple of an instrumented stream, and a stream that will emit the number of bytes that have passed through in the last second. If your incoming stream is just a manifold stream of byte-arrays, replace num-bytes with count.
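
To report progress against the known content length rather than bytes per second, the same idea can be adapted roughly as follows. This is only a sketch: `with-progress`, `total-bytes`, and `period-ms` are made-up names, and it assumes every chunk is a byte-array (so `count` gives its size) and that `total-bytes` is positive.

```clojure
(require '[manifold.stream :as s])
(import 'java.util.concurrent.atomic.AtomicLong)

(defn with-progress
  "Hypothetical helper, not part of manifold. Returns a 2-tuple of
  [instrumented-stream progress-stream]. The progress stream emits the
  fraction of `total-bytes` seen so far, once every `period-ms`."
  [source total-bytes period-ms]
  (let [seen         (AtomicLong. 0)
        ;; Count each chunk as it passes through, then hand it on unchanged.
        instrumented (s/map (fn [chunk]
                              (.addAndGet seen (count chunk))
                              chunk)
                            source)
        ;; Sample the running total on a timer instead of once per chunk,
        ;; so progress events are less frequent than input events.
        progress     (s/periodically period-ms
                                     #(/ (double (.get seen)) total-bytes))]
    ;; Stop the timer once the instrumented stream has been fully consumed.
    (s/on-drained instrumented #(s/close! progress))
    [instrumented progress]))
```

Usage might look like `(let [[in progress] (with-progress body content-length 500)] (s/connect in s3-sink) (s/consume send-to-websocket progress))`, where `body`, `content-length`, `s3-sink`, and `send-to-websocket` are placeholders for your own values. Because the progress stream is closed as soon as the upload drains, a final 1.0 tick is not guaranteed; a consumer can treat the stream closing as completion.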

bhurlow commented 6 years ago

Awesome thank you