mkuthan / example-kafkastreams

Kafka Streams DSL vs Processor API

interested in minor changes for polymorphism? #2

Open phderome opened 6 years ago

phderome commented 6 years ago

With this and a few other minor changes:

daemonThread {
  sleep(3.seconds) // scalastyle:off
  startStreams(deduplicate(Serdes.String(), Serdes.String()))
}

I can change

Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  Serdes.String(),
  Serdes.String()
)

to

Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  sk, // variable of type Serde[K]
  sv  // variable of type Serde[V]
)

and make the example fully generic in K and V (though still instantiated with String, String).
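A minimal sketch of what the generic builder might look like, under some assumptions: it uses the Duration-based `Stores.persistentWindowStore` overload from newer Kafka Streams releases instead of the five-argument call quoted above, and `DedupStoreSketch`, `dedupStoreBuilder` and its parameter list are hypothetical names, not code from the repository.

```scala
import java.time.Duration

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.state.{StoreBuilder, Stores, WindowStore}

object DedupStoreSketch {
  // Hypothetical helper: the key and value serdes are passed in by the caller,
  // so nothing in here assumes String keys or values.
  def dedupStoreBuilder[K, V](storeName: String,
                              retention: Duration,
                              window: Duration,
                              retainDuplicates: Boolean,
                              sk: Serde[K],
                              sv: Serde[V]): StoreBuilder[WindowStore[K, V]] =
    Stores.windowStoreBuilder(
      // Assumption: Duration-based overload, not the older long/segments one.
      Stores.persistentWindowStore(storeName, retention, window, retainDuplicates),
      sk,
      sv
    )
}
```

At startup the caller would still pass `Serdes.String()` for both `sk` and `sv`, so the String choice lives in one place.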

We could also do the following, so that no values assumed to be String are injected and the choice of String is left at the edge of the program (startup). The AnyRef type bound is there so that I don't have to rework GenericProducer, which assumes AnyRef for keys and values.

def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                         keyGen: Int => K,
                                         valGen: Int => V): Unit
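To illustrate the idea, here is a self-contained sketch of how such a signature could be implemented and called. It is not the project's code: the `GenericProducer` trait below is a hypothetical stand-in that only assumes an AnyRef-typed `send`, and the topic name, record count, and `RecordingProducer` are invented for the example.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the project's GenericProducer;
// only an AnyRef-typed send method is assumed.
trait GenericProducer {
  def send(topic: String, key: AnyRef, value: AnyRef): Unit
}

// Test double that records what was sent instead of talking to Kafka.
class RecordingProducer extends GenericProducer {
  val sent: ListBuffer[(String, AnyRef, AnyRef)] = ListBuffer.empty
  def send(topic: String, key: AnyRef, value: AnyRef): Unit =
    sent += ((topic, key, value))
}

object DuplicatesSketch {
  // Sends each generated record twice. The "events" topic and the
  // count of 3 are illustrative assumptions.
  def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                           keyGen: Int => K,
                                           valGen: Int => V): Unit =
    for (i <- 1 to 3) {
      val key = keyGen(i)
      val value = valGen(i)
      producer.send("events", key, value) // original record
      producer.send("events", key, value) // deliberate duplicate
    }
}
```

The String choice then appears only at the call site, for example `DuplicatesSketch.duplicates(producer, i => s"key-$i", i => s"value-$i")`.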