Open svroonland opened 6 months ago
Hmm, should we instead of this:
Consumer.runWithGracefulShutdown(Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)) {
stream => ...
}
offer this:
Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) {
(stream, _) => stream.flatMapPar(...)
}
The second parameter would be the SubscriptionStreamControl, which you could always manually call stop on. Or would that prevent certain use cases.. 🤔
If I understand it correctly, the proposal allows for more use cases; with it you can also call stop for any condition you want. Is it true that after stopping, you can start consuming again?
Well, I mean compared to just the partitionedStreamWithControl method. In both cases you would need to do something with the stream that ultimately reduces to a ZIO of Any, so I don't think the partitionedStreamWithGracefulShutdown is limiting in that regard.
stop currently doesn't support that, since the stream would then be finished. We could probably build pause and resume like in #941.
If resume after stop is not supported (and never will be), then I like the first proposal better where you don't need to call stop. What would you do after calling stop?
Well, in both proposals you can call stop.
I don't think you want to do anything after stop, but it would give you more explicit control when to stop, instead of when the scope ends.
We probably need to decide if we want to add pause/resume in the future. If we do, we should add the control parameter like in the partitionedStreamWithGracefulShutdown example for future compatibility. If we don't, we can drop it altogether and make SubscriptionStreamControl a purely internal concept (if at all).
Hey :)
Thanks for the great work!
Here's some initial feedback:
I'm not a big fan of the SubscriptionStreamControl implementation.
To me, functions/methods returning it should return a Tuple (stream, control).
It avoids adding one more concept for our users to understand and learn (Kafka already has a lot of concepts)
It also simplifies the interface of the control type, the current one with the [S <: ZStream[_, _, _]] being complex
It also simplifies the return type of our functions/methods, avoiding this kind of type:
SubscriptionStreamControl[Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]]
in favor of:
(Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]], SubscriptionStreamControl)
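To make the difference between the two return shapes concrete, here is a minimal sketch in plain Scala. All names in it (RecordStream, WrappedControl, Control, ReturnShapes) are hypothetical stand-ins invented for illustration; they are not the zio-kafka or ZIO types, and the real signatures may differ.

```scala
// Hypothetical stand-ins; none of these are the real zio-kafka or ZIO types.
final case class RecordStream[A](elements: List[A])

// Shape 1: a wrapper that is generic in the stream type it carries.
final case class WrappedControl[S](stream: S, stop: () => Unit)

// Shape 2: a non-generic control, returned next to the stream as a tuple.
final case class Control(stop: () => Unit)

object ReturnShapes {
  def wrapped(records: List[String]): WrappedControl[RecordStream[String]] =
    WrappedControl(RecordStream(records), () => ())

  def tupled(records: List[String]): (RecordStream[String], Control) =
    (RecordStream(records), Control(() => ()))

  def main(args: Array[String]): Unit = {
    // The wrapper is unpacked through its fields, and its type nests the stream type.
    val w = wrapped(List("a", "b"))
    println(w.stream.elements)

    // The tuple destructures directly, and Control needs no type parameter.
    val (stream, control) = tupled(List("a", "b"))
    control.stop()
    println(stream.elements)
  }
}
```

In this toy version the control in the tuple shape carries no type parameter, which is the simplification argued for above.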
Made the change in a PR to show/study how, to me, it simplifies things: https://github.com/zio/zio-kafka/pull/1207/files
Didn't finish my review yet. I still have some parts of the code to explore/understand, but I have to go. I'll finish it later 🙂
Thanks for the feedback Jules. Agreed about the extra concept that would be unwanted. Check out my latest interface proposal where there is only a plainStreamWithGracefulShutdown method and SubscriptionStreamControl remains hidden.
I understand now that when graceful shutdown starts we're ending the subscribed streams. That should work nicely. Let's work out what will happen next to the runloop. The runloop would still be happily fetching records for that stream. When those are offered to the stream, PartitionStreamControl.offerRecords will probably append those records to the queue (even though it now also contains an 'end' token). Because of the 'end' token that is already in that queue, these new records will never be taken out. Back pressure will kick in (depending on the fetch strategy) and the partitions will be paused. Once we're unsubscribed, 15 seconds later, the queue will be garbage collected. So far so good.
We can do slightly better though. We're fetching and storing all these records in the queue for nothing, even potentially causing an OOM for systems that are tuned for the case where processing happens almost immediately.
My proposal is to:
PartitionStreamControl.offerRecords when the queue was ended
Runloop.handlePoll only pass running streams to fetchStrategy.selectPartitionsToFetch so that partitions for ended streams are immediately paused
If you want, I can extend this PR with that proposal (or create a separate PR).
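The two ideas in this proposal can be modelled roughly as follows. This is a simplified sketch in plain Scala, not the zio-kafka implementation: PartitionQueue, GracefulStopModel and partitionsToFetch are hypothetical names, and it assumes the first item means that offerRecords stops storing records once the queue was ended.

```scala
import scala.collection.mutable

// Hypothetical, simplified model; not the zio-kafka implementation.
final class PartitionQueue(val partition: Int) {
  private val records = mutable.Queue.empty[String]
  private var ended   = false

  def isRunning: Boolean = !ended
  def size: Int          = records.size

  // Marks the stream as ended; stands in for the 'end' token.
  def end(): Unit = { ended = true }

  // Assumed first idea: once ended, do not store records that will never be taken.
  def offerRecords(data: Seq[String]): Unit =
    if (!ended) records ++= data
}

object GracefulStopModel {
  // Second idea: only running streams are candidates for fetching.
  // Partitions that are not returned here would be paused.
  def partitionsToFetch(streams: Seq[PartitionQueue]): Set[Int] =
    streams.filter(_.isRunning).map(_.partition).toSet

  def main(args: Array[String]): Unit = {
    val p0 = new PartitionQueue(0)
    val p1 = new PartitionQueue(1)
    p0.offerRecords(Seq("a", "b"))
    p1.end()
    p1.offerRecords(Seq("c")) // dropped, because the stream already ended
    println(p0.size)
    println(p1.size)
    println(partitionsToFetch(Seq(p0, p1)))
  }
}
```

With both changes in place, an ended stream neither accumulates records nor keeps its partition in the fetch set, which is what avoids the memory build-up described above.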
@erikvanoosten If you have some time to implement those two things, by all means.
@svroonland Done in commit https://github.com/zio/zio-kafka/pull/1201/commits/1218204f9924281f15da9ccec06438975d03fdb4.
Now I am wondering, how can we test this?
Change looks good. Totally forgot to implement this part.
Depends on https://github.com/zio/zio/issues/8804.
Implements functionality for gracefully stopping a stream for a single subscription: stop fetching records for the assigned topic-partitions but keep being subscribed so that offsets can still be committed. Intended to replace stopConsumption, which did not support multiple-subscription use cases.
Implements some of #941.
We should deprecate stopConsumption before releasing.