mtesseract / nakadi-client

Haskell Client Library for the Nakadi Event Broker

Implement foundation for different cursor commit strategies #86

Closed mtesseract closed 6 years ago

mtesseract commented 6 years ago

- New type: `CommitStrategy`
- Include it in `Config`
- The default strategy reflects the current behaviour: committing every cursor
- Can be extended with high-performance cursor-committing strategies

This is the first step towards #66.
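A minimal sketch of what such a type and its place in the config could look like (constructor and field names here are illustrative guesses, not necessarily the library's actual API):

```haskell
-- Illustrative sketch only; the real nakadi-client names may differ.
data CommitStrategy
  = CommitSync            -- commit every cursor and wait for the response
                          -- (reflects the current behaviour)
  | CommitAsyncUnbuffered -- commit every cursor without waiting
  | CommitAsyncBuffered   -- room for smarter, buffered strategies later
  deriving (Show, Eq)

-- The Config would carry the chosen strategy, defaulting to the
-- current behaviour of committing every cursor synchronously:
data Config = Config
  { configCommitStrategy :: CommitStrategy
  } deriving (Show)

defaultConfig :: Config
defaultConfig = Config { configCommitStrategy = CommitSync }

main :: IO ()
main = print (configCommitStrategy defaultConfig)
```

Keeping the strategy as a closed sum type in `Config` means new strategies can be added without touching the consumption API.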

@etorreborre @amrhassan

mtesseract commented 6 years ago

This needs some changes prior to merging.

The naming is not correct (SyncUnbuffered is in fact AsyncUnbuffered).

Will do this later.

etorreborre commented 6 years ago

That looks good to me; maybe the hardest part is going to be writing a test for this feature :-). Also, I was not aware that committing cursors could become a bottleneck. Did you have this experience in production?

mtesseract commented 6 years ago

Regarding tests: It shouldn't be too hard, I guess. We already have tests covering subscription consumption. We could run them for each cursor-committing strategy.

Regarding performance: Yes, if I remember correctly, it can have a significant impact for high-throughput subscriptions. Making an extra HTTP call for every batch and waiting for its response (potentially retrying) is relatively expensive. I believe this can be mitigated by tuning the batch size, though.
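A back-of-envelope calculation (with made-up throughput numbers, purely to illustrate the point) shows why a synchronous commit per batch gets expensive and why batch size mitigates it:

```haskell
-- Made-up numbers for illustration: if every batch is committed
-- synchronously, commit HTTP calls per second equals batches per
-- second, i.e. event throughput divided by batch size.
commitsPerSecond :: Double -> Double -> Double
commitsPerSecond eventsPerSecond batchSize = eventsPerSecond / batchSize

main :: IO ()
main = do
  print (commitsPerSecond 10000 10)   -- small batches: many commit calls
  print (commitsPerSecond 10000 500)  -- larger batches: far fewer calls
```

Each of those calls also adds its round-trip latency to the consumption loop, which is where a buffered strategy would help.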

We should have benchmarks comparing the numbers.

etorreborre commented 6 years ago

👍

mtesseract commented 6 years ago

```
nakadi-client Test Suite
  Subscriptions
    Processing
      SubscriptionProcessing/async/TimeBuffer:                  OK (3.49s)
      SubscriptionProcessing/sync:                              OK (8.42s)
      SubscriptionProcessing/async/NoBuffer:                    OK (5.68s)
```

mtesseract commented 6 years ago

Some updates on this.

Implementing this feature was pretty easy. But the test suite kept failing. :-( It turned out to be tricky for me to debug, in particular since I was not able to reproduce this problem on my machine. In the end, it looks like the following was the problem:

One of my assumptions about the behaviour of the dockerized Nakadi turned out to be wrong: I simply did not expect Nakadi to force clients (i.e., the test suite) to reconnect by simply ceasing to transmit non-empty batches. At some point the subscription-consumption tests stopped receiving batches containing events and received only keep-alive cursors. It looks like the correct client behaviour in such a situation is to let the connection terminate gracefully; on the following reconnect, Nakadi continues sending the events on the stream and the test suite is happy.

This PR not only implements the foundation for smarter committing strategies, it also implements a smart committing strategy, which shows good performance in my tests.

I consider this now to be review-ready. Thanks. :-) @etorreborre

etorreborre commented 6 years ago

This all looks good to me, even if I cannot tell why your last commit is better than the previous one :-).

mtesseract commented 6 years ago

Hi! There was a logical problem with the code before the last commit:

Using the smart strategy, committing happens on two occasions:

  1. When some fixed duration has elapsed without a commit.
  2. When at least one partition has crossed the configured event threshold.

On (2), the timer used for (1) was reset, and this is a per-stream timer, not a per-partition timer(!). Thus, the following could happen:

Say we have two partitions, one being high-throughput (A), the other very low-throughput (B) (an artificial example, yes). Partition (A) reaches the event threshold before the timer has elapsed. In that case (before the last commit!), only the cursor for partition (A) was committed. Furthermore, if (A) had a constantly high event throughput, always crossing the event threshold before the timer elapsed, then cursors for partition (B) would never be committed.

I see two solutions here:

(i) Use per-partition committers, including per-partition timers.

(ii) Simply commit all partitions when either the timer has elapsed or some partition has crossed the threshold.

While I think that (i) is, from a theoretical perspective, more elegant, I think (ii) is sufficient for nakadi-client and much simpler to implement (for (i), we would have to create and destroy timers dynamically, depending on how many partitions Nakadi has assigned to us). That's why I have quickly implemented (ii).
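To make (ii) concrete, here is a rough sketch of the flush condition, using hypothetical types and names (not nakadi-client's actual API). Because either trigger commits the cursors of *all* partitions, the low-throughput partition can no longer starve:

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical types for illustration only.
type Partition = String

data CommitBuffer = CommitBuffer
  { bufferedEvents :: Map.Map Partition Int -- events seen since last commit
  , elapsedMillis  :: Int                   -- time since last commit
  }

-- Solution (ii): one per-stream trigger. If the timer has elapsed OR any
-- single partition has crossed the event threshold, the cursors of ALL
-- partitions are committed, so no partition's cursor can starve.
shouldCommitAll :: Int -> Int -> CommitBuffer -> Bool
shouldCommitAll timeoutMillis eventThreshold buf =
     elapsedMillis buf >= timeoutMillis
  || any (>= eventThreshold) (Map.elems (bufferedEvents buf))

main :: IO ()
main = do
  -- Partition "A" is high-throughput, "B" is almost idle.
  let buf = CommitBuffer (Map.fromList [("A", 1000), ("B", 1)]) 100
  -- With a threshold of 500 events, "A" triggers a commit that also
  -- flushes the cursor for "B":
  print (shouldCommitAll 30000 500 buf)
```

The trade-off versus (i) is slightly more frequent commits for idle partitions, in exchange for a single timer and much simpler lifecycle management.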