zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Is pollFrequency of the RunLoop redundant with the fetch.min.bytes setting of KafkaConsumer? #656

Closed tnielens closed 1 year ago

tnielens commented 1 year ago

Here are the first lines of the zio-kafka RunLoop:

def run: ZIO[Scope, Nothing, Fiber.Runtime[Throwable, Unit]] =
  ZStream
    .mergeAll(3, 1)(
      ZStream(Command.Poll()).repeat(Schedule.spaced(pollFrequency)),
      ZStream.fromQueue(requestQueue).mapChunks(c => Chunk.single(Command.Requests(c))),
      ZStream.fromQueue(commitQueue)
    )
    ...

I don't understand the purpose of spacing out the consumer.poll() calls to the KafkaConsumer. The example in the javadoc polls the consumer in a loop without waiting between calls:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Isn't that potentially harming latency? If calls to consumer.poll are spaced out in time, messages that reach the broker right after a consumer.poll call returns take a latency hit due to the pollFrequency spacing. For example, take instants t1 < t2 < t1 + pollTimeout < t3 < t2 + pollFrequency and a blocking consumer.poll() call started at t1. When a message arrives on the broker at t2, the poll call returns. The RunLoop then waits until t2 + pollFrequency before polling again, so a message arriving at t3 takes a latency hit.
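To put numbers on the scenario in this comment, here is a minimal sketch of the arithmetic. It assumes, as the comment does, that the scheduled Poll command is the only thing that triggers consumer.poll. The class and method names are hypothetical and are not part of zio-kafka or the Kafka client.

```java
import java.time.Duration;

// Hypothetical helper for illustration only.
final class SpacedPollLatency {
    // All arguments are offsets from a common origin.
    // previousPollReturn: when the previous poll returned (t2 in the comment)
    // pollFrequency:      spacing enforced before the next poll starts
    // arrival:            when the next record reaches the broker (t3)
    // Returns how long that record waits before a poll can fetch it.
    static Duration extraWait(Duration previousPollReturn, Duration pollFrequency, Duration arrival) {
        Duration nextPoll = previousPollReturn.plus(pollFrequency);
        Duration wait = nextPoll.minus(arrival);
        // A record arriving after the next poll has started waits for no spacing at all.
        return wait.isNegative() ? Duration.ZERO : wait;
    }
}
```

With t2 = 20 ms, pollFrequency = 100 ms and t3 = 60 ms, extraWait returns 60 ms: the record sits on the broker until the next poll starts at 120 ms.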

Maybe it was done to let batches grow on the broker instead of fetching small ones? If so, it seems redundant with settings KafkaConsumer already provides. See fetch.min.bytes and fetch.max.wait.ms. These settings let record batches grow up to a certain threshold before returning the response to the caller. By default, the KafkaConsumer is configured for lowest latency. In order to increase throughput, and at the cost of latency, fetch.min.bytes can be increased.
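As a rough illustration of the two settings named above, the sketch below builds consumer properties that favor larger fetches. The property keys are the real Kafka consumer configuration names. The class, the method and the example values are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class FetchTuning {
    // Builds properties that let the broker hold a fetch response until at least
    // minBytes are available or maxWaitMs has elapsed, whichever comes first.
    static Properties throughputOriented(int minBytes, int maxWaitMs) {
        Properties props = new Properties();
        props.setProperty("fetch.min.bytes", Integer.toString(minBytes));
        props.setProperty("fetch.max.wait.ms", Integer.toString(maxWaitMs));
        return props;
    }
}
```

For example, throughputOriented(65536, 500) asks the broker to wait for 64 KiB of data for up to 500 ms. The client default of fetch.min.bytes = 1 returns as soon as any data is available.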

tnielens commented 1 year ago

I focused on the Poll command and overlooked the requestQueue. My points in the description are erroneous as the requests from the queue are processed as soon as the RunLoop is ready. Sorry for the trouble, keep up the nice work 🙏

guizmaii commented 1 year ago

Interestingly, your issue made me think about this code and may have helped me find a bug in it: https://github.com/zio/zio-kafka/pull/661

Thanks 🙂

svroonland commented 1 year ago

@tnielens Thanks for examining this functionality in detail, it never hurts.

Polling serves two needs: fetching new data and keeping up a heartbeat with the broker. The spaced polling serves the second goal. For fetching data, we indeed want to get the records with as little latency as possible, but we also need to take backpressure into account (when downstream cannot process records fast enough). Because of this, we poll as soon as we have a request in the queue and pause partitions for which there is no request. On top of that sits a buffering mechanism that drives extra requests, which helps to increase throughput.

I suppose we could document this a bit more clearly, together with the settings that affect it.
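The backpressure rule described above (pause every partition that has no pending request) can be sketched as a pure set computation. This is not the zio-kafka implementation. The class name is hypothetical, and partitions are represented as plain strings such as "foo-0" instead of Kafka's TopicPartition. In a real consumer the results would be passed to KafkaConsumer.pause and KafkaConsumer.resume before calling poll.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only.
final class PausePlan {
    // Assigned partitions that no downstream stream is currently asking for.
    static Set<String> toPause(Set<String> assigned, Set<String> requested) {
        Set<String> result = new TreeSet<>(assigned);
        result.removeAll(requested);
        return result;
    }

    // Partitions that are both assigned and requested, so the next poll should fetch them.
    static Set<String> toResume(Set<String> assigned, Set<String> requested) {
        Set<String> result = new TreeSet<>(requested);
        result.retainAll(assigned);
        return result;
    }
}
```

With assigned partitions foo-0, foo-1 and bar-0 and a single request for foo-1, toPause yields bar-0 and foo-0, and toResume yields foo-1.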

tnielens commented 1 year ago

Thanks for the comments 🙏.

Some more questions on the topic

svroonland commented 1 year ago

About point 1, I think you're right. We could immediately poll again in that case. Could you create a new issue for that?

Point 2: not quite. The bufferedRecords in State are records for partitions for which there was no Request at the time. The only situation I know of where this can happen is when new partitions are assigned after rebalancing, in which case there was no chance yet to pause them. To be honest, I'm not exactly sure of other circumstances in which this occurs, since partitions without a request are always paused.

This also made me think about the case when there's a small poll interval and a large lag / high volume of messages to be processed. In that case both the Requests and the scheduled poll commands are driving the polling. I'm not sure if that is efficient, since if there's not a pending request for every call to poll, it would lead to frequent pausing & resuming. It also relates to this https://github.com/zio/zio-kafka/issues/428#issuecomment-1033741109

Stuff to be investigated after #590, I suppose.

tnielens commented 1 year ago

About point 1, I think you're right. We could immediately poll again in that case. Could you create a new issue for that?

Will do.

Point 2: not quite. The bufferedRecords in State are records for partitions for which there was no Request at the time. The only situation I know of where this can happen is when new partitions are assigned after rebalancing, in which case there was no chance yet to pause them. To be honest, I'm not exactly sure of other circumstances in which this occurs, since partitions without a request are always paused.

It would be nice to document that on the State.bufferedRecords field. The purpose might not be evident to outside readers of the code.

This also made me think about the case when there's a small poll interval and a large lag / high volume of messages to be processed. In that case both the Requests and the scheduled poll commands are driving the polling. I'm not sure if that is efficient, since if there's not a pending request for every call to poll, it would lead to frequent pausing & resuming. It also relates to this https://github.com/zio/zio-kafka/issues/428#issuecomment-1033741109

Maybe Poll could be renamed to LivenessPoll and would trigger only if no consumer.poll call has happened for a certain time. I also think that not polling for "liveness" at all would be good behavior. Kafka users are normally aware of the max.poll.interval.ms setting, and it is good behavior for the broker to unassign partitions from unresponsive consumers.
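The LivenessPoll idea proposed here could look roughly like the sketch below: a timer that reports a poll as due only when no consumer.poll call has happened for a configured idle period. The class, its methods and the maxIdle parameter are hypothetical. Nothing in the thread says zio-kafka implements it this way.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only.
final class LivenessPollTimer {
    private final Duration maxIdle;
    private Instant lastPoll;

    LivenessPollTimer(Duration maxIdle, Instant start) {
        this.maxIdle = maxIdle;
        this.lastPoll = start;
    }

    // Call after every consumer.poll, whether request-driven or liveness-driven.
    void recordPoll(Instant now) {
        lastPoll = now;
    }

    // True once at least maxIdle has passed since the last recorded poll.
    boolean isDue(Instant now) {
        return !now.isBefore(lastPoll.plus(maxIdle));
    }
}
```

A run loop would call recordPoll after each request-driven poll, so the liveness poll fires only on an otherwise idle consumer. To keep the consumer in the group, maxIdle would have to stay well below max.poll.interval.ms.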

guizmaii commented 1 year ago

Thanks guys for this very interesting discussion 🙂

Sorry if my question is stupid. About:

Polling serves two needs: fetching new data and keeping up a heartbeat with the broker. The spaced polling serves the second goal.

Why do we need to keep a heartbeat?

AFAIK, the Kafka clients already do this automatically (behind our back, giving us very little, if any, control over it). I know for sure that AdminClient instances do this. I'm not 100% sure about the Consumer and the Producer, but I'd expect so. We had some issues in our app because of that (not a typical Kafka app, we're making a UI on top of Kafka, see https://www.conduktor.io/explorer, so we have very specific issues that any "normal app using Kafka" wouldn't have).

tnielens commented 1 year ago

Why do we need to keep a heartbeat?

As per section "Detecting Consumer Failures" in the KafkaConsumer javadoc, two different processes signal liveness to the broker, see session.timeout.ms and max.poll.interval.ms. If the opened streams of zio-kafka keep triggering consumer.poll calls (for that #664 must be fixed), I think there should not be any need for a liveness Poll command at the zio-kafka RunLoop level. All that said, mind that there must be one initial consumer.poll invocation for the consumer to be assigned partitions at the beginning.
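For reference, the two liveness settings mentioned above can be set as shown in this sketch. The property keys are the real Kafka consumer configuration names. The class, the method and the example values are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper for illustration only.
final class LivenessSettings {
    // maxPollInterval: longest allowed gap between two consumer.poll calls
    // sessionTimeout:  how long the group waits for heartbeats before declaring the consumer dead
    static Properties of(Duration maxPollInterval, Duration sessionTimeout) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollInterval.toMillis()));
        props.setProperty("session.timeout.ms", Long.toString(sessionTimeout.toMillis()));
        return props;
    }
}
```

For example, LivenessSettings.of(Duration.ofMinutes(5), Duration.ofSeconds(45)) sets max.poll.interval.ms to 300000, which is also the client's documented default. The 45 second session timeout is only an example value.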

erikvanoosten commented 1 year ago

From this line (from the same KafkaConsumer javadoc):

Basically if you don't call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions.

I deduce we do need to continue calling poll for a liveness signal. If we didn't, the broker would revoke all partitions assigned to this client after max.poll.interval.ms.

svroonland commented 1 year ago

Let's continue the discussion in #664