monix / monix-kafka

Monix integration with Kafka
Apache License 2.0

Commit after all connected multicasted observables are finished processing a record #42

Open yeenow123 opened 5 years ago

yeenow123 commented 5 years ago

I am creating multiple observables (one per Kafka topic):

val kafkaConsumerConfig = KafkaConsumerConfig.default.copy(
  bootstrapServers = kConfig.bootstrapServers,
  groupId = kConfig.groupId,
  enableAutoCommit = false,
  observableCommitOrder = ObservableCommitOrder.AfterAck,
  observableCommitType = ObservableCommitType.Sync,
  autoOffsetReset = AutoOffsetReset.Latest
)

val observable = Observable.merge(
  topics.map(provider => KafkaStream.creatConsumer(provider)): _*
)

val multiCast = observable.multicast(Pipe.publish[ConsumerRecord[A, B]])

... // multiple subscribers subscribe separately to the multiCast

multiCast.connect()

It doesn't seem like the commit back to Kafka is issued only after all subscribers have finished processing a record (I am using a combination of .subscribe() and .foreach to subscribe). Instead, it seems to commit as soon as any one of them completes. Is committing after all of them possible, or am I doing something incorrectly?

yeenow123 commented 5 years ago

The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics of the two operators are different, one being unordered and the other ordered.

EDIT: Spoke too soon. When I use .concat, only one of the observables seems to actually start up.

Avasil commented 5 years ago

@yeenow123

The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics of the two operators are different, one being unordered and the other ordered.

That's correct. concat waits for the first observable to complete before subscribing to the second one, while merge subscribes to all of them at once and emits elements as they arrive.
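To illustrate the difference without a Kafka broker, here is a minimal sketch. The `ConcatVsMerge` object and its `endless` and `finite` sources are made up for the example: `endless` stands in for a consumer stream that never completes, and the code assumes Monix 3.x with the global scheduler.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

// Hypothetical demo object, not part of monix-kafka.
object ConcatVsMerge {
  // Stand-in for a Kafka consumer stream: keeps emitting and never completes.
  def endless: Observable[String] =
    Observable.intervalAtFixedRate(50.millis).map(i => s"a$i")

  // A second source that emits two elements and then completes.
  def finite: Observable[String] = Observable("b0", "b1")

  // concat subscribes to `finite` only after `endless` completes,
  // which never happens, so every element taken comes from `endless`.
  def concatenated(): List[String] =
    Observable.concat(endless, finite).take(3).toListL.runSyncUnsafe(5.seconds)

  // merge subscribes to both sources right away, so elements from
  // `finite` show up interleaved with those from `endless`.
  def merged(): List[String] =
    Observable.merge(endless, finite).take(3).toListL.runSyncUnsafe(5.seconds)

  def main(args: Array[String]): Unit = {
    println(s"concat: ${concatenated()}")
    println(s"merge:  ${merged()}")
  }
}
```

With concat, the "b" elements should never appear, however many elements are taken. With merge, they should appear early, although the exact interleaving depends on timing.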

How do you commit, and where do you use kafkaConsumerConfig? I don't recognize KafkaStream.creatConsumer(provider).

yeenow123 commented 5 years ago

@Avasil

I copied partial code and made some typos while changing names. The kafkaConsumerConfig is passed into the KafkaConsumerObservable constructor.

As noted in the EDIT on my previous comment, concat only seems to start the first of the observables passed into it.

Avasil commented 5 years ago

Probably the first Observable never completes, so concat never moves on to the next one. I'll see if I can reproduce your issue.