phderome opened 7 years ago
One step forward: `Topology` could also be extended with an implicit class to handle `NamedProcessorSupplier` directly, but that is more a workaround than a desired design. I don't really like string-based wiring between sources, sinks, processors, and state stores.
Let's imagine the following DSL inspired by Akka Streams with explicit wiring:
```scala
val pvSource = (...) => Source[ClientKey, Pv]
val evSource = (...) => Source[ClientKey, Ev]
val evPvSink = (...) => Sink[ClientKey, EvPv]
val pvStore = (...) => WindowStore[PvKey, Pv]
val evPvStore = (...) => WindowStore[EvPvKey, EvPv]
val pvWindowProcessor = (pvStore) => Processor[ClientKey, Pv]
val evJoinProcessor = (pvStore, evPvStore) => Processor[ClientKey, Ev, Pv]
val evPvMapProcessor = (...) => Processor[ClientKey, EvPv]

evSource ~> evJoinProcessor ~> evPvMapProcessor ~> evPvSink
pvSource ~> pvWindowProcessor ~> evJoinProcessor
```
With a little help from generic types, it should be feasible to get compile-time errors for:

```scala
evJoinProcessor ~> pvWindowProcessor
evSource ~> evPvSink
```
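To make those compile-time guarantees concrete, here is a minimal, self-contained sketch of how such a `~>` DSL could be typed. Everything here (`TypedWiring`, the phantom-typed `Source`/`Processor`/`Sink` case classes) is an illustrative stand-in, not the Kafka Streams API: a processor carries both the types it consumes (`In`) and emits (`Out`), which is exactly what makes the two mis-wirings above fail to type-check.

```scala
// Illustrative sketch only: Source/Processor/Sink are phantom-typed
// stand-ins, not Kafka Streams classes.
object TypedWiring {
  final case class Source[K, Out](name: String)        // emits (K, Out)
  final case class Processor[K, In, Out](name: String) // consumes In, emits Out
  final case class Sink[K, In](name: String)           // consumes (K, In)

  // Edges are recorded so they can later be replayed, in one place,
  // against the string-based Topology API.
  val edges = scala.collection.mutable.ListBuffer.empty[(String, String)]

  implicit class SourceOps[K, Out](s: Source[K, Out]) {
    // A source may only feed a processor that consumes what it emits.
    def ~>[Out2](p: Processor[K, Out, Out2]): Processor[K, Out, Out2] = {
      edges += ((s.name, p.name)); p
    }
  }
  implicit class ProcessorOps[K, In, Out](p: Processor[K, In, Out]) {
    // Downstream processors must consume this processor's output type...
    def ~>[Out2](next: Processor[K, Out, Out2]): Processor[K, Out, Out2] = {
      edges += ((p.name, next.name)); next
    }
    // ...and so must sinks.
    def ~>(sink: Sink[K, Out]): Sink[K, Out] = {
      edges += ((p.name, sink.name)); sink
    }
  }
}

object TypedWiringDemo extends App {
  import TypedWiring._
  final case class ClientKey(id: String)
  sealed trait Pv; sealed trait Ev; sealed trait EvPv

  val evSource        = Source[ClientKey, Ev]("ev-source")
  val evJoinProcessor = Processor[ClientKey, Ev, EvPv]("ev-join")
  val evPvSink        = Sink[ClientKey, EvPv]("evpv-sink")

  evSource ~> evJoinProcessor ~> evPvSink // compiles: the types line up
  // evSource ~> evPvSink                 // rejected: a Source can only feed a Processor
  println(edges.toList)
}
```

With this encoding, `evJoinProcessor ~> pvWindowProcessor` is rejected because the join emits `EvPv` while the window processor consumes `Pv`, and `evSource ~> evPvSink` is rejected because no `~>` from a source to a sink exists at all.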
Let me know if you'd like extension methods on `Topology` with a "clean" API for name constraints (on a feature branch); that seems easy enough for me.
I might look at your ultimate solution at some point, but don't count on it: my time is rather limited, and so is my experience with generic types and Shapeless (odds are about 10% or less).
If you know the functional Scala database-access library Quill, it has recently been extended to support the structured Spark API (DataFrames and Datasets), and I'd guess it uses type-dependent techniques. Someone has even suggested providing an interface between KSQL and Quill, but that looks like a pet wish-list item for now.
On a side note, it seems the solution you have for lazy serialization of `KafkaProducer` within Spark is no longer necessary as of Spark 2.2, which integrates very naturally with Kafka (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html).
Hiding Strings in distinct types that we'd use internally would help. In code we use `evpv-store` and `pv-store` only once as Strings and immediately hide them in distinct classes. We also force each `ProcessorSupplier` to be named by construction instead of waiting until we pass it to the Kafka Java API (the `NamedProcessorSupplier` case class). Ideally, we'd define our own `addProcessor` and `addStateStore` methods as extension methods (a Scala or C# concept) on `Topology` so that we specify fewer parameters (i.e. we would not provide the name and the supplier as two distinct parameters, as I still do below). `evPvStoreBuilder` returns the name (not as a String); ideally it should return a single object as a case class (a `NamedStore`, just like the `NamedProcessorSupplier` idea). The code below should perhaps return a case class rather than a Tuple (which is less safe).
Similarly
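As a self-contained sketch of that naming idea (the `StringTopology` class below is a hypothetical stand-in for Kafka's string-based `Topology`, so the example runs on its own; in real code the extension methods would wrap the actual Kafka Java API): the raw `pv-store` / `evpv-store` Strings appear exactly once, at construction, and everything downstream works only with the distinct wrapper types.

```scala
// StringTopology is a stand-in for Kafka's string-based Topology,
// used here only so the sketch is self-contained and runnable.
class StringTopology {
  // Record (kind, name) pairs so we can see what was registered.
  val calls = scala.collection.mutable.ListBuffer.empty[(String, String)]
  def addProcessor(name: String, parents: String*): StringTopology = {
    calls += (("processor", name)); this
  }
  def addStateStore(name: String, processors: String*): StringTopology = {
    calls += (("store", name)); this
  }
}

object NamedWiring {
  // Distinct wrapper types: the raw String is supplied once, at construction,
  // and is never passed around bare afterwards.
  final case class NamedProcessorSupplier(name: String)
  final case class NamedStore(name: String)

  // Extension methods take the wrappers, not bare Strings, so a name and
  // its supplier/store can no longer be specified as two separate arguments.
  implicit class TopologyOps(t: StringTopology) {
    def addProcessor(p: NamedProcessorSupplier, parents: NamedProcessorSupplier*): StringTopology =
      t.addProcessor(p.name, parents.map(_.name): _*)
    def addStateStore(s: NamedStore, processors: NamedProcessorSupplier*): StringTopology =
      t.addStateStore(s.name, processors.map(_.name): _*)
  }
}

object NamedWiringDemo extends App {
  import NamedWiring._
  // The only places the store-name Strings appear:
  val pvStore   = NamedStore("pv-store")
  val evPvStore = NamedStore("evpv-store")
  val evJoin    = NamedProcessorSupplier("ev-join")

  val topology = new StringTopology
  topology.addProcessor(evJoin)
  topology.addStateStore(pvStore, evJoin)
  topology.addStateStore(evPvStore, evJoin)
  println(topology.calls.toList)
}
```

Because the wrapper-taking overloads don't accept bare Strings, a caller can't accidentally register a store under a name that differs from the one its builder carries.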