sky-uk / kafka-topic-loader

Reads the contents of provided Kafka topics
BSD 3-Clause "New" or "Revised" License

Add partitioned load and run source #32

Open lacarvalho91 opened 5 years ago

lacarvalho91 commented 5 years ago

Add a source that emits one inner source per partition of the given topics. It should work the same way as loadAndRun, except per partition, leveraging Alpakka Kafka's source-per-partition support.

  /**
   * Same as [[TopicLoader.loadAndRun]], but with one stream per partition.
   * See [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for an
   * explanation of how the outer Source works.
   */
  def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
      topics: NonEmptyList[String],
  )(implicit system: ActorSystem): Source[(TopicPartition, Source[ConsumerRecord[K, V], Future[Done]]), Future[Consumer.Control]] = ???
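
To illustrate how a caller might consume the nested shape above (an outer source of `(TopicPartition, inner source)` pairs), here is a minimal sketch. It is not the library's implementation: `inMemoryPartitioned` and `countPerPartition` are hypothetical names, the records come from an in-memory list instead of Kafka, and the materialized values are simplified to `NotUsed`. It assumes Akka 2.6+, where an implicit `ActorSystem` can supply the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future

object PartitionedShapeSketch {

  // Simplified stand-in for the proposed return type (materialized values reduced to NotUsed).
  type Partitioned[K, V] =
    Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), NotUsed]

  // Hypothetical in-memory substitute for partitionedLoadAndRun:
  // groups records by partition and emits one inner source per partition.
  def inMemoryPartitioned(records: List[ConsumerRecord[String, String]]): Partitioned[String, String] =
    Source(
      records
        .groupBy(r => new TopicPartition(r.topic, r.partition))
        .map { case (tp, rs) => tp -> Source(rs) }
        .toList
    )

  // Runs each inner source as its own stream, up to `parallelism` at a time,
  // and collects the number of records seen per partition.
  def countPerPartition(source: Partitioned[String, String], parallelism: Int)(
      implicit system: ActorSystem
  ): Future[Map[TopicPartition, Int]] = {
    import system.dispatcher
    source
      .mapAsyncUnordered(parallelism) { case (tp, inner) =>
        inner.runFold(0)((n, _) => n + 1).map(tp -> _)
      }
      .runWith(Sink.seq)
      .map(_.toMap)
  }
}
```

With a real partitioned source, the parallelism passed to `mapAsyncUnordered` would need to be at least the number of partitions that can be assigned at once; otherwise some inner sources would wait until others complete.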

bcarter97 commented 2 years ago

I'm not really sure of the use case for this method. Source per partition supports automatic partition assignment from Kafka, but take, for example, an application that loads from Kafka to re-create state: why would kafka-topic-loader need to be the one getting the assignment from Kafka? Would it not make more sense for the application to use the partitioned source itself, and for kafka-topic-loader just to be given the partitions to read from?

Something like

  /** Same as [[TopicLoader.loadAndRun]], but for the specified partitions. See
    * [[akka.kafka.scaladsl.Consumer.plainPartitionedSource]] for how to get a partition assignment from Kafka.
    */
  def partitionedLoadAndRun[K : Deserializer, V : Deserializer](
      partitions: NonEmptyList[TopicPartition]
  )(implicit system: ActorSystem): Source[ConsumerRecord[K, V], (Future[Done], Future[Consumer.Control])] = ???