FundingCircle / jackdaw

A Clojure library for the Apache Kafka distributed streaming platform.
https://fundingcircle.github.io/jackdaw/
BSD 3-Clause "New" or "Revised" License
369 stars, 80 forks

Transducers are Coming #200

Open: creese opened this pull request 5 years ago

creese commented 5 years ago

This has been a long time coming, but I think we're finally here. This proposal composes with the existing Jackdaw Streams DSL. Just define your transducers and apply them with transduce-kstream:

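(require '[jackdaw.streams :as j])

(defn transduce-kstream
  "Takes a kstream and xf and transduces the stream."
  [kstream xf]
  ;; `transformer` (defined alongside this fn) wraps xf in a Kafka Streams
  ;; Transformer with access to the "transducer" state store; it emits
  ;; records whose value is the seq of [k v] pairs produced by xf.
  (-> kstream
      (j/transform (fn [] (transformer xf)) ["transducer"])
      ;; Flatten each seq of [k v] pairs back into individual records.
      (j/flat-map (fn [[_ v]] v))))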

It turns out that KStream::transform followed by KStream::flatMap is equivalent to clojure.core/transduce with concat as the reducing function. That means we can test our business logic in pure Clojure, without Kafka Streams. Matthias pioneered this approach a while ago; the difference now is that we're adding state.
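A minimal sketch of that equivalence on plain data, using only clojure.core. The transducer `xf-split` is hypothetical (not from the PR): like the transformer feeding `flat-map`, it turns each [k v] input into a seq of [k v] pairs. With `concat` as the reducing function, `transduce` flattens those seqs one level, which gives the same result as `mapcat`.

```clojure
;; Hypothetical transducer: each input [k v] becomes a seq of two [k v] pairs,
;; mirroring what the transformer hands to flat-map.
(def xf-split
  (map (fn [[k v]] [[k (inc v)] [k (dec v)]])))

(def input [[:a 1] [:b 10]])

;; transduce with concat: the init value is (concat) => (), and each step
;; concats the emitted seq of pairs onto the accumulator (one level of flattening).
(def via-transduce (transduce xf-split concat input))

;; The same result via mapcat, i.e. "map, then flatten" like transform + flat-map.
(def via-mapcat (mapcat (fn [[k v]] [[k (inc v)] [k (dec v)]]) input))

(println via-transduce) ;; prints ([:a 2] [:a 0] [:b 11] [:b 9])
```

Repeatedly concatenating onto the accumulator builds nested lazy seqs, so this is best kept to small test fixtures; for large inputs, `(into [] (comp xf-split cat) input)` gives the same pairs without that nesting.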

Here is how to test your transducers:

(def coll
  [[nil {:debit-account "tech"
         :credit-account "cash"
         :amount 1000}]
   [nil {:debit-account "cash"
         :credit-account "sales"
         :amount 2000}]])

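;; xf-split-entries and xf-running-balances come from the Simple Ledger example.
;; Each emits seqs of [k v] pairs; concat flattens them, as flat-map does.
(->> coll
     (transduce (xf-split-entries nil nil) concat)
     (transduce (xf-running-balances (atom {}) swap!) concat))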

The function xf-running-balances takes two arguments, a "store" and a function that "behaves like clojure.core/swap!", and returns a transducer. When developing your transducers, you can pass an atom and swap!.
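To make the store/swap contract concrete, here is a sketch with a hypothetical `xf-running-totals` transducer (not the PR's `xf-running-balances`). Its only assumption is that `swap-fn` behaves like `swap!`: `(swap-fn store f & args)` applies `f` to the current state and returns the new state. During development, an atom and `swap!` satisfy that.

```clojure
;; Hypothetical stateful transducer: keeps per-account totals in `store` and
;; touches that state only through `swap-fn`, which must behave like swap!.
(defn xf-running-totals
  [store swap-fn]
  (map (fn [[_ {:keys [account amount]}]]
         (let [totals (swap-fn store update account (fnil + 0) amount)]
           ;; Emit a seq of [k v] pairs, ready for concat / flat-map.
           [[account {:account account :total (get totals account)}]]))))

;; During development the store is an atom and the swap fn is swap! itself.
(def ledger-store (atom {}))

(def result
  (->> [[nil {:account "cash" :amount 1000}]
        [nil {:account "cash" :amount 2000}]
        [nil {:account "tech" :amount 500}]]
       (transduce (xf-running-totals ledger-store swap!) concat)))
```

After the run, `result` holds one [account {:account .. :total ..}] pair per input with the running total, and `@ledger-store` holds the final totals per account.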

When running your transducers in Kafka Streams, no changes to them are needed; you just supply different arguments. The examples show how to provide a state store and a helper function defined in jackdaw.streams.xform. If that doesn't work for you, you can write your own.
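As a sketch of writing your own: the hypothetical `kv-swap!` below satisfies the same swap!-like contract on top of a mutable key-value store, with a `java.util.HashMap` standing in for a real state store (this is not the jackdaw.streams.xform helper). The hypothetical word-count transducer runs unchanged against either backend; only the arguments differ.

```clojure
;; Hypothetical swap!-like fn for a mutable key-value store. A java.util.HashMap
;; stands in for a state store; the whole state lives under a single key.
;; It reads the current state, applies f, writes the result back and returns it.
(defn kv-swap!
  [^java.util.Map store f & args]
  (let [new-state (apply f (.get store "state") args)]
    (.put store "state" new-state)
    new-state))

;; Hypothetical word-count transducer written against the store/swap contract.
(defn xf-word-counts
  [store swap-fn]
  (comp
   (mapcat (fn [[_ line]] (re-seq #"\w+" line)))
   (map (fn [word]
          (let [counts (swap-fn store update word (fnil inc 0))]
            [[word (get counts word)]])))))

(def lines [[nil "the cat"] [nil "the dog"]])

;; The same transducer, two different backends.
(def with-atom (transduce (xf-word-counts (atom {}) swap!) concat lines))
(def with-kv (transduce (xf-word-counts (java.util.HashMap.) kv-swap!) concat lines))
```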

Here is the topology:

(require '[jackdaw.streams :as j])
(require '[jackdaw.streams.xform :as jxf])

(defn topology-builder
  [{:keys [entry-requested transaction-pending transaction-added] :as topics} xforms]
  (fn [builder]
    ;; Register the state store that the transducers read and write.
    (jxf/add-state-store! builder)
    (-> (j/kstream builder entry-requested)
        (jxf/transduce-kstream (::xf-split-entries xforms))
        ;; Write to, and read back from, an intermediate topic.
        (j/through transaction-pending)
        (jxf/transduce-kstream (::xf-running-balances xforms))
        (j/to transaction-added))
    builder))

This PR contains examples for Word Count and the Simple Ledger.

codecov[bot] commented 5 years ago

Codecov Report

No coverage uploaded for pull request base (master@cee3aba). The diff coverage is 15.62%.


@@            Coverage Diff            @@
##             master     #200   +/-   ##
=========================================
  Coverage          ?   78.18%           
=========================================
  Files             ?       43           
  Lines             ?     2530           
  Branches          ?      151           
=========================================
  Hits              ?     1978           
  Misses            ?      401           
  Partials          ?      151

Impacted Files                         Coverage Δ
src/jackdaw/streams/xform.clj          11.53% <11.53%> (ø)
src/jackdaw/streams/xform/fakes.clj    33.33% <33.33%> (ø)


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update cee3aba...9efe7b7.

kidpollo commented 5 years ago

Looking good! I would add examples of unit tests for the actual word count transducer. Also, what happens to the Simple Ledger tests?

DaveWM commented 4 years ago

I spoke to @blak3mill3r (the author of Noah) yesterday about how he's implemented stateful transducers in Noah. He came up with a broadly similar solution to what we have here, and it seems like the use of volatile! within the transducer code is a real sticking point. The main difference between our solutions is that in Noah, all the transducers use a single state store rather than each transducer having its own. We weren't sure what the performance implications of that would be, but it's worth bearing in mind in case we run into performance issues in the future.
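For context on the volatile! point: stateful transducers in clojure.core, such as `dedupe`, keep their state in a private volatile! that starts fresh each time the transducer is applied. The sketch below uses only clojure.core; splitting the input across two runs loosely models state being lost on a restart or rebalance when it isn't persisted.

```clojure
;; One continuous run: dedupe remembers the previous value throughout.
(def continuous (into [] (dedupe) [1 1 1 2 2]))

;; The same records split across two runs: the second run starts with a fresh
;; volatile!, so the repeated 1 at the boundary is not recognised as a duplicate.
(def split-runs (concat (into [] (dedupe) [1 1])
                        (into [] (dedupe) [1 2 2])))

(println continuous) ;; prints [1 2]
(println split-runs) ;; prints (1 1 2)
```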

We also discussed starting a shared library for all the core transducers, rewritten to support persisting their state, so that they can be used with both Jackdaw and Noah. Blake's going to set this up, and then I thought we could potentially pull it into Jackdaw. There are some open questions around this, though, such as what to do about other popular transducer libraries like xforms.

blak3mill3r commented 4 years ago

Here is that shared library which reimplements (the transducer arity of) all of the functions in clojure.core that return a stateful transducer:

https://github.com/blak3mill3r/coddled-super-centaurs

Noah then binds that function twice to instrument the transducer state and tie it into a StateStore:

https://github.com/blak3mill3r/noah/blob/5803dd5/src/noah/transduce.clj#L34-L35
https://github.com/blak3mill3r/noah/blob/5803dd5/src/noah/transduce.clj#L85-L86

Also, @DaveWM ... I checked, and as far as I can tell, there aren't any stateful transducers in xforms or kixi.stats. They have interesting higher-order transducers and reducing fns, and (I think...) these should work fine composed with these instrumented stateful transducers.

blak3mill3r commented 4 years ago

Also, I want to clarify this point: "all the transducers use a single state store rather than each transducer having its own"

Each time you transduce a KStream, if that transduction needs state, you must provide a store. That transducer can of course be a composition of several transducers, any of which can be stateful, and all of the states for these composed transducers will be stored together in a clojure vector as the record in the state store. To transduce multiple KStreams, you would use multiple state stores.
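A rough sketch of the single-vector idea, assuming nothing about Noah's or coddled-super-centaurs' actual code: `dedupe-in-slot` and `count-in-slot` are hypothetical stateful transducers that each own one slot of a vector held in a single store (an atom here). Because no state hides in a volatile!, a later run against the same store resumes where the previous one stopped.

```clojure
;; Hypothetical: like clojure.core/dedupe, but the last-seen value lives in
;; slot i of the vector held by store rather than in a private volatile!.
(defn dedupe-in-slot [store i]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (let [prior (nth @store i)]
         (swap! store assoc i input)
         (if (= prior input)
           result
           (rf result input)))))))

;; Hypothetical: tags each input with a running count kept in slot i.
(defn count-in-slot [store i]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (let [n (nth (swap! store update i inc) i)]
         (rf result [input n]))))))

;; One store, one vector: slot 0 for dedupe, slot 1 for the counter.
(def shared-store (atom [::none 0]))
(def xf (comp (dedupe-in-slot shared-store 0) (count-in-slot shared-store 1)))

(def run-1 (into [] xf [:a :a :b :b :b :a]))
(def run-2 (into [] xf [:a :c])) ; resumes from the stored state
```

Giving each stateful transducer in the composition a fixed slot index is what lets a single store record hold the state of the whole composition: `run-1` is `[[:a 1] [:b 2] [:a 3]]`, and `run-2` skips the leading `:a` as a duplicate and yields `[[:c 4]]`.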

kidpollo commented 4 years ago

Plz merge this already 😛 !!

kidpollo commented 5 months ago

Bump!