zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Replace pending requests by single boolean #1063

Closed erikvanoosten closed 11 months ago

erikvanoosten commented 1 year ago

Since streams now wait for data by listening to their own queue, we no longer need to keep track of pending requests inside runloop. Instead:

Runloop still needs to know which streams need data. For this def awaitingData was added to the stream.

The advantage of this approach is:

Also:

svroonland commented 1 year ago

I don't have a concrete example, but I have a feeling that there's a race condition in here somewhere, with morePollsNeeded being toggled by Poll commands and handlePoll. We need to be sure that when a partition stream makes a request, that stream's request is fulfilled, rather than another partition's fulfillment resetting morePollsNeeded.

Perhaps a Set[TopicPartition] is a good middle ground between boolean and a Chunk[Request], so we can keep track of individual TPs that are requesting data..?
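To make the suggestion concrete, here is a minimal sketch of tracking pending requests as a set of partitions. It is not code from this PR: `PendingRequests`, `request`, and `fulfill` are hypothetical names, and the local `TopicPartition` case class is only a stand-in for Kafka's `org.apache.kafka.common.TopicPartition`.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical model: remember which partitions still wait for data,
// instead of a single boolean or a Chunk[Request].
final case class PendingRequests(awaiting: Set[TopicPartition]) {
  // A partition stream asks for data.
  def request(tp: TopicPartition): PendingRequests =
    copy(awaiting = awaiting + tp)

  // A poll returned data for some partitions; only those are cleared.
  def fulfill(received: Set[TopicPartition]): PendingRequests =
    copy(awaiting = awaiting -- received)

  // Another poll is needed as long as any partition is still waiting.
  def morePollsNeeded: Boolean = awaiting.nonEmpty
}

object PendingRequestsDemo {
  val tp0: TopicPartition = TopicPartition("events", 0)
  val tp1: TopicPartition = TopicPartition("events", 1)

  // Both partitions request data, but the poll only returns data for tp0.
  val afterRequests: PendingRequests =
    PendingRequests(Set.empty).request(tp0).request(tp1)
  val afterPoll: PendingRequests = afterRequests.fulfill(Set(tp0))
}
```

In this model, data arriving for one partition cannot clear the request of another, which is the property the comment above asks for.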

erikvanoosten commented 1 year ago

I don't have a concrete example, but I have a feeling that there's a race condition in here somewhere, with morePollsNeeded being toggled by Poll commands and handlePoll.

I am sympathetic to such feelings; they are powerful indicators. Even so, I think we are solid. There are no race conditions because the runloop runs sequentially. Streams run concurrently with the runloop, so we could have multiple Poll commands arriving in one loop run, but this is okay. When one or more Poll commands arrive in the run loop, they will be handled. Either in this loop run, or in the next, or the next...

When a Poll command is being sent, we need to make sure that handlePoll is invoked inside the Runloop's run method (line 467 in this PR). This is why the Poll command sets the morePollsNeeded flag to true.

Then once handlePoll runs, we use the fetchStrategy to select the partitions that need data. (This is slightly tricky because this PR assumes that the fetch strategy will always select partitions that have no data. We may need to make this requirement more clear in the FetchStrategy scaladocs, or we can wrap the fetch strategy to make sure this happens.)

Finally, based on which partitions want data and which actually got data, we set the morePollsNeeded flag again (determined in line 259). This time it is used to determine whether we should send another Poll command or not (line 470 in this PR). So in case no data was received for some partition, the loop immediately starts again to give it another chance.
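The three steps above can be sketched as a small, purely sequential model. This is a simplification under stated assumptions, not the Runloop code from this PR: `State`, `handleCommands`, `handlePoll`, and `runOnce` are hypothetical signatures, partitions are plain `Int`s, and the fetch strategy is reduced to "every awaiting partition is selected".

```scala
object RunloopSketch {
  sealed trait Command
  case object Poll extends Command

  // Hypothetical, reduced runloop state: only the flag discussed above.
  final case class State(morePollsNeeded: Boolean)

  // Step 1: any Poll command in this loop run sets the flag, so that
  // handlePoll is guaranteed to run. Several Poll commands collapse into one.
  def handleCommands(state: State, commands: List[Command]): State =
    if (commands.contains(Poll)) state.copy(morePollsNeeded = true) else state

  // Steps 2 and 3: after polling, the flag is recomputed from the partitions
  // that wanted data and the partitions that actually received data.
  def handlePoll(awaiting: Set[Int], received: Set[Int]): State =
    State(morePollsNeeded = (awaiting -- received).nonEmpty)

  // One sequential loop run: commands first, then (maybe) a poll.
  def runOnce(
    state: State,
    commands: List[Command],
    awaiting: Set[Int],
    received: Set[Int]
  ): State = {
    val afterCommands = handleCommands(state, commands)
    if (afterCommands.morePollsNeeded) handlePoll(awaiting, received)
    else afterCommands
  }
}
```

For example, `runOnce(State(false), List(Poll, Poll), Set(0, 1), Set(0))` leaves the flag set because partition 1 received nothing, so in this model the loop would poll again without waiting for a new command.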

erikvanoosten commented 1 year ago

Okay, so there is a race condition 😄

// PartitionStreamControl.scala
ZStream.repeatZIOChunk {
  // First try to take all records that are available right now.
  // When no data is available, request more data and await its arrival.
  dataQueue.takeAll.flatMap(data =>
    if (data.isEmpty) requestAndAwaitData // <--- Poll command sent
    else ZIO.succeed(data)
  )
}.flattenTake
  .chunksWith(_.tap(records => queueSize.update(_ - records.size))) // <--- queueSize updated
  .interruptWhen(interruptionPromise)

This PR only works correctly when queueSize is set to 0 strictly before the Poll command is sent. That is currently not the case.
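A deterministic sketch of the ordering problem, as I read the comment above. It assumes that the runloop treats a stream as awaiting data exactly when its queueSize is 0; `StreamView` and `runloopFetches` are hypothetical names, and the real code uses concurrent ZIO primitives rather than plain values.

```scala
object QueueSizeRaceSketch {
  // Hypothetical snapshot of what the runloop can observe about one stream.
  final case class StreamView(queueSize: Int, pollSent: Boolean) {
    // Assumption: a stream counts as awaiting data only when queueSize is 0.
    def awaitingData: Boolean = queueSize == 0
  }

  // The runloop has offered 3 records to the stream's queue.
  val offered: StreamView = StreamView(queueSize = 3, pollSent = false)

  // Problematic order: the Poll command is sent while the downstream
  // queueSize update has not run yet, so queueSize still reads 3.
  val pollBeforeUpdate: StreamView = offered.copy(pollSent = true)

  // Required order: queueSize reaches 0 strictly before the Poll is sent.
  val updateBeforePoll: StreamView =
    offered.copy(queueSize = 0).copy(pollSent = true)

  // The runloop only fetches for a stream it considers to be awaiting data.
  def runloopFetches(view: StreamView): Boolean =
    view.pollSent && view.awaitingData
}
```

In the first ordering the Poll command is handled but the stream is not recognized as needing data, so in this model its request goes unanswered. In the second ordering it is served.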

erikvanoosten commented 1 year ago

so we can keep track of individual TPs that are requesting data..?

@svroonland Forgot to write this earlier: we already track this. The streams that have a queueSize of 0 are also the streams that requested data. (With the caveat that we have a race condition, see previous comment.)

svroonland commented 1 year ago

Forgot to write this earlier: we already track this. The streams that have a queueSize of 0 are also the streams that requested data. (With the caveat that we have a race condition, see previous comment.)

Yes, but that condition should somehow be a guarantee that there is also at least one Poll command on the queue, otherwise it would not lead to any code checking that condition (in handlePoll).

erikvanoosten commented 1 year ago

Yes, but that condition should somehow be a guarantee that there is also at least one Poll command on the queue, otherwise it would not lead to any code checking that condition (in handlePoll).

Indeed. That part is covered by what I wrote in https://github.com/zio/zio-kafka/pull/1063#issuecomment-1742195179

erikvanoosten commented 1 year ago

I removed the race condition. Also, we no longer depend on the FetchStrategy to do the right thing.

erikvanoosten commented 1 year ago

So in the end this PR makes the state tracked by Runloop smaller, and in turn it extends the state tracked by PartitionStreamControl. Some compute-intensive operations were removed, but some others were introduced. A bit less data is passed around inside Runloop.

This PR should have a small positive effect on the garbage collector (heap allocations are used for a shorter time) but overall complexity is about the same.

yarosman commented 12 months ago

It's strange, but this PR shows diffs from the already merged PR #1065

erikvanoosten commented 12 months ago

It's strange, but this PR shows diffs from the already merged PR #1065

I extracted the uncontroversial parts from this PR into #1065. I didn't rebase it yet. Depending on @svroonland's further comments I may not even bother and just close this PR.

erikvanoosten commented 12 months ago

@svroonland I am beginning to believe that we should not merge this PR. We may be able to improve this in other ways. Moving data to the streams causes more coordination points (more ZIOs). Keeping this data in the run loop State avoids this.

guizmaii commented 11 months ago

@erikvanoosten Can you rebase your PR, please?

erikvanoosten commented 11 months ago

Let's try another way to tackle this.