zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

FromJavaConsumer: how to synchronize access to the java consumer #1276

Open tnielens opened 3 weeks ago

tnielens commented 3 weeks ago

#694 introduced a fromJavaConsumer constructor, but it isn't clear how to synchronize access to the consumer once it has been handed to zio-kafka. My use case is the same as the one reported in #694: I have some logic working on the Java consumer that checks the consumer lag, health and other metrics. These checks run periodically, and I need to coordinate their access to the consumer with zio-kafka. As far as I understand, zio-kafka doesn't provide that ability, and I'll face concurrency issues if my checks and zio-kafka run tasks against the consumer at the same time.

svroonland commented 3 weeks ago

zio-kafka protects against concurrent access using a Semaphore. We could perhaps allow the user to pass their own Semaphore in fromJavaConsumer. It would need to have at most 1 permit, something we can't guarantee, but it seems reasonable to document this and expect undefined behavior when it's more than 1 permit.
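A minimal sketch of the caller's side of this proposal, assuming ZIO 2: the health check wraps every call to the Java consumer in withPermit on a single-permit Semaphore. Handing that same semaphore to fromJavaConsumer is the hypothetical part; per this thread, zio-kafka does not accept one today. GuardedConsumer, make and run are illustrative names, not zio-kafka API.

```scala
import org.apache.kafka.clients.consumer.Consumer
import zio._

// Hypothetical helper: funnels every call to the Java consumer through one
// semaphore. Under the proposal, zio-kafka would hold this same semaphore.
final case class GuardedConsumer[K, V](consumer: Consumer[K, V], semaphore: Semaphore) {

  // Runs one blocking consumer call on ZIO's blocking pool while holding a permit.
  // With a single-permit semaphore, no two guarded calls can overlap.
  def run[A](f: Consumer[K, V] => A): Task[A] =
    semaphore.withPermit(ZIO.attemptBlocking(f(consumer)))
}

object GuardedConsumer {
  // Creates the shared semaphore with exactly one permit, as the proposal requires.
  def make[K, V](consumer: Consumer[K, V]): UIO[GuardedConsumer[K, V]] =
    Semaphore.make(1).map(GuardedConsumer(consumer, _))
}
```

The single permit is what turns the semaphore into a mutex; with more permits, two guarded calls could run concurrently, which is why the comment above calls that case undefined behavior.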

erikvanoosten commented 3 weeks ago

@tnielens What is your use case? Would you be helped if all operations of the java-kafka consumer are available on the zio-kafka consumer? @svroonland, another option would be to return an implementation of org.apache.kafka.clients.consumer.Consumer which wraps the given java consumer and uses the zio-kafka semaphore to synchronize access.

tnielens commented 3 weeks ago

My use case is passing the Kafka consumer to a HealthCheckService of mine, which is maintained separately and works against the standard Consumer interface. It checks several things, among them metrics like last-poll-seconds-ago, the consumer's current lag, whether the lag is decreasing, etc.
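To make concrete why such a health check collides with zio-kafka, here is a hedged sketch of two checks of this kind written against the standard Consumer interface. ConsumerHealth and its methods are hypothetical (not the commenter's actual service), and "lag" is assumed here to mean log end offset minus current position; committed offsets would be another valid definition.

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Hypothetical checks in the spirit of the HealthCheckService described above.
// Every call here touches the (non-thread-safe) Java consumer, so in practice
// they must not overlap with zio-kafka's own calls.
object ConsumerHealth {

  // Lag per assigned partition: log end offset minus the consumer's current position.
  def lagPerPartition[K, V](consumer: Consumer[K, V]): Map[TopicPartition, Long] = {
    val assigned   = consumer.assignment()
    val endOffsets = consumer.endOffsets(assigned).asScala
    assigned.asScala.iterator.map { tp =>
      tp -> math.max(0L, endOffsets(tp).longValue - consumer.position(tp))
    }.toMap
  }

  // Reads the consumer's "last-poll-seconds-ago" metric, if the client reports it.
  def lastPollSecondsAgo[K, V](consumer: Consumer[K, V]): Option[Double] =
    consumer.metrics().asScala.collectFirst {
      case (name, metric) if name.group == "consumer-metrics" && name.name == "last-poll-seconds-ago" =>
        metric.metricValue()
    }.collect { case d: java.lang.Double => d.doubleValue }
}
```

assignment(), endOffsets(), position() and metrics() are all ordinary Consumer calls, so without coordination they can race with the poll loop that zio-kafka runs on the same consumer.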

One workaround I'm trying now is a class SynchronizedConsumer[K, V](delegate: Consumer[K, V]) extends Consumer[K, V] that wraps every delegate call in synchronized { ... }, except for the wakeup() call. It's not ideal, as zio-kafka's Consumer might not expect every call to the standard Consumer to potentially block.
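Writing out every Consumer method by hand is lengthy. A more compact variant of the same idea, sketched here with a swapped-in technique (a java.lang.reflect.Proxy, not the commenter's hand-written class), takes one lock around every call except wakeup(). The KafkaConsumer documentation names wakeup() as the one method that is safe to call from another thread, and it has to bypass the lock so it can interrupt a poll() that is currently holding it.

```scala
import java.lang.reflect.{InvocationHandler, InvocationTargetException, Proxy}
import org.apache.kafka.clients.consumer.Consumer

object SynchronizedConsumer {
  // Wraps `delegate` so every Consumer method runs while holding one shared lock,
  // except wakeup(), which must stay callable while poll() holds the lock.
  def apply[K, V](delegate: Consumer[K, V]): Consumer[K, V] = {
    val lock = new Object
    val handler: InvocationHandler = (_, method, args) => {
      // The proxy passes null instead of an empty array for no-arg methods.
      val actualArgs: Seq[AnyRef] = if (args == null) Seq.empty else args.toSeq
      def call(): AnyRef =
        try method.invoke(delegate, actualArgs: _*)
        catch { case e: InvocationTargetException => throw e.getCause } // rethrow the delegate's own exception
      if (method.getName == "wakeup") call()
      else lock.synchronized(call())
    }
    Proxy
      .newProxyInstance(
        classOf[Consumer[_, _]].getClassLoader,
        Array[Class[_]](classOf[Consumer[_, _]]),
        handler
      )
      .asInstanceOf[Consumer[K, V]]
  }
}
```

Under this assumption, both fromJavaConsumer and the health check would receive the wrapper rather than the raw consumer, so they share one lock. A health-check call then simply waits for any in-flight poll() to return.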

erikvanoosten commented 3 weeks ago

> Not ideal as zio-kafka's Consumer might not expect all calls to the standard Consumer to potentially block.

I think that is okay! The zio-kafka consumer already runs on its own thread because almost every call to the standard consumer blocks! If it blocks a little bit longer, nobody will notice!

We could improve on this somewhat by moving this logic into zio-kafka, because then all access would go through a single, fair semaphore. But to be honest, since you have a perfectly good workaround, I am not sure it's worth putting this into the library.