aseigneurin / kafka-streams-scala

Thin Scala wrapper for the Kafka Streams API
Apache License 2.0

Wrapper classes necessary? #4

Open lewismj opened 6 years ago

lewismj commented 6 years ago

If you define a set of implicit conversions for the Kafka Streams function interfaces such as Reducer, Initializer and Aggregator, e.g.

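  import scala.language.implicitConversions
  import org.apache.kafka.streams.kstream.{Aggregator, Initializer, Reducer}

  // The lambdas on the right-hand side rely on SAM conversion (Scala 2.12+).
  implicit def toReducer[V](f: (V,V) => V): Reducer[V] = (v1,v2) => f(v1,v2)
  implicit def toInitializer[VA](f: () => VA): Initializer[VA] = () => f()
  implicit def toAggregator[K,V,VA](f: (K,V,VA) => VA): Aggregator[K,V,VA] = (k,v,va) => f(k,v,va)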
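To show how such a conversion gets picked up at a call site, here is a minimal sketch that compiles without the Kafka dependency. The `Reducer` trait below is a local stand-in that assumes the same single `apply(v1, v2)` method as Kafka's interface, and `reduceAll` is a hypothetical helper that plays the role of an API expecting a `Reducer`.

```scala
import scala.language.implicitConversions

object ImplicitReducerSketch {
  // Local stand-in for org.apache.kafka.streams.kstream.Reducer (assumption:
  // one abstract method `apply(v1, v2)`), so the sketch needs no Kafka jar.
  trait Reducer[V] { def apply(v1: V, v2: V): V }

  // Same shape as toReducer above, written with an anonymous class so it
  // does not depend on SAM conversion.
  implicit def toReducer[V](f: (V, V) => V): Reducer[V] =
    new Reducer[V] { def apply(v1: V, v2: V): V = f(v1, v2) }

  // Hypothetical API that expects a Reducer rather than a Scala function.
  def reduceAll[V](values: Seq[V])(reducer: Reducer[V]): V =
    values.reduceLeft((a, b) => reducer.apply(a, b))

  def main(args: Array[String]): Unit = {
    val sum: (Int, Int) => Int = _ + _
    // `sum` is a plain Scala function; toReducer converts it implicitly.
    println(reduceAll(Seq(1, 2, 3))(sum)) // prints 6
  }
}
```

The caller never mentions `Reducer`; the compiler inserts `toReducer(sum)` because a `Reducer[Int]` is expected where a function was supplied.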

Then you can just define Rich equivalents:

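 implicit class RichTable[K,V](kt: KTable[K,V]) {
    // f has to be convertible to a StreamPartitioner[K,V], e.g. through a further implicit conversion.
    def to(topic: String, f: (K,V,Int) => Int)(implicit ks: Serde[K], vs: Serde[V]): Unit = kt.to(ks,vs,f,topic)
    // Pass the topic through as well; the original call dropped it.
    def through(topic: String, storeName: String)(implicit ks: Serde[K], vs: Serde[V]): KTable[K,V] =
      kt.through(ks,vs,topic,storeName)
  }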

I don't think you would have to redefine all the functions provided by KTable, KStream, etc. The Rich classes could contain just those functions for which you want to make the passing of the Serde implicit. There wouldn't be a need to introduce ...S objects.
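As a rough illustration of the idea, the sketch below enriches a single method and leaves the rest of the class untouched. `Serde` and `Table` here are simplified local stand-ins, not the Kafka types, and the `describe` method and the serde labels are invented purely so the call can be traced.

```scala
object RichTableSketch {
  // Stand-in for Kafka's Serde; it only carries a label for tracing.
  final case class Serde[T](label: String)

  // Stand-in for a Java-style table API that takes serdes explicitly.
  final case class Table[K, V](calls: List[String] = Nil) {
    def through(ks: Serde[K], vs: Serde[V], topic: String): Table[K, V] =
      Table[K, V](calls :+ s"through($topic) using ${ks.label}/${vs.label}")
    // Needs no serdes, so it needs no enriched counterpart.
    def describe: String = calls.mkString("; ")
  }

  // Only the method whose serdes should become implicit is added here.
  implicit class RichTable[K, V](table: Table[K, V]) {
    def through(topic: String)(implicit ks: Serde[K], vs: Serde[V]): Table[K, V] =
      table.through(ks, vs, topic)
  }

  implicit val stringSerde: Serde[String] = Serde[String]("string")
  implicit val longSerde: Serde[Long] = Serde[Long]("long")

  def main(args: Array[String]): Unit = {
    val table = Table[String, Long]()
    // No one-argument `through` exists on Table, so the compiler falls back
    // to RichTable.through and fills in the serdes from implicit scope.
    println(table.through("counts").describe)
    // prints: through(counts) using string/long
  }
}
```

Methods such as `describe` remain callable directly on the underlying type, which is why no full wrapper class is required in this sketch.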