FundingCircle / jackdaw

A Clojure library for the Apache Kafka distributed streaming platform.
https://fundingcircle.github.io/jackdaw/
BSD 3-Clause "New" or "Revised" License
369 stars 80 forks source link

Add flat-transform wrappers for Kafka Streams #245

Closed 99-not-out closed 2 years ago

99-not-out commented 4 years ago

Spotted flatTransform for a KStream and it looks worth adding. Also added a few of the common bits of wrapper / helper / sugar code which pop up from uses of transformers.

With flatTransformValues note that if you return an empty list or nil, it halts processing. With transformValues, if you return nil processing is not halted and the next processor in the stream sees [k nil]. This difference can be useful if you meant to halt processing, as it avoids a subsequent filter.

@creese We really need to get your transducers changes merged too!

99-not-out commented 4 years ago

Coverage doesn't seem to like jackdaw/streams/protocols.clj anymore 😞

codecov[bot] commented 4 years ago

Codecov Report

Merging #245 into master will increase coverage by 0.70%. The diff coverage is 90.12%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #245      +/-   ##
==========================================
+ Coverage   80.82%   81.53%   +0.70%     
==========================================
  Files          41       41              
  Lines        2519     2583      +64     
  Branches      149      149              
==========================================
+ Hits         2036     2106      +70     
+ Misses        334      328       -6     
  Partials      149      149              
Impacted Files Coverage Δ
src/jackdaw/streams/configured.clj 69.23% <ø> (ø)
src/jackdaw/streams/interop.clj 80.28% <ø> (ø)
src/jackdaw/streams/mock.clj 74.24% <25.00%> (-3.18%) :arrow_down:
src/jackdaw/streams.clj 83.33% <85.71%> (+1.07%) :arrow_up:
src/jackdaw/streams/specs.clj 97.04% <91.30%> (+0.15%) :arrow_up:
src/jackdaw/streams/extras.clj 35.29% <95.45%> (+28.77%) :arrow_up:
src/jackdaw/streams/lambdas.clj 75.71% <100.00%> (+7.78%) :arrow_up:
src/jackdaw/streams/protocols.clj 100.00% <100.00%> (ø)
src/jackdaw/serdes/edn2.clj 90.90% <0.00%> (+45.45%) :arrow_up:

Continue to review full report at Codecov.

Legend - Click here to learn more Δ = absolute <relative> (impact), ø = not affected, ? = missing data Powered by Codecov. Last update d53da3f...0d7f9e1. Read the comment docs.

kidpollo commented 2 years ago

This PR and the #305 seem to have related/similar changes. @99-not-out Is there any insight on which will win hahaha or if both would be merged. I kinda want some of these stuff. Would love this merged and a new release cut sometime soon :P

kidpollo commented 2 years ago

@99-not-out would it make sense to add a simple interop to set message headers?

Right now one would have to do something like:

...

(defn set-headers [context headers]
  (let [record-headers (.headers context)]
    (doseq [[k v] headers]
      (.add record-headers
            k
            (.getBytes v))))
  context)
...
(-> stream
          (j/transform-values
           (jl/value-transformer-with-ctx
            (fn [ctx v]
              (set-headers ctx {"foo" "bar"})
              v)))
99-not-out commented 2 years ago

Going to close this PR in favour of https://github.com/FundingCircle/jackdaw/pull/305 (which takes the core parts of this PR and will hopefully me merged soon!)

99-not-out commented 2 years ago

This PR and the #305 seem to have related/similar changes. @99-not-out Is there any insight on which will win hahaha or if both would be merged. I kinda want some of these stuff. Would love this merged and a new release cut sometime soon :P

305 will win :-)

99-not-out commented 2 years ago

@99-not-out would it make sense to add a simple interop to set message headers?

Right now one would have to do something like:

...

(defn set-headers [context headers]
  (let [record-headers (.headers context)]
    (doseq [[k v] headers]
      (.add record-headers
            k
            (.getBytes v))))
  context)
...
(-> stream
          (j/transform-values
           (jl/value-transformer-with-ctx
            (fn [ctx v]
              (set-headers ctx {"foo" "bar"})
              v)))

I did have a bit of a think about headers a while a go - there is even a PR somewhere I think to add support in test machine.

Lets start a new PR specifically for headers @kidpollo - there are some nuances in there (like supporting encodings, so the interop isn't all byte[] based ...). Will tag you wen we start one (or feel free to start one yourself!)

gphilipp commented 2 years ago

Superseded by #305.