linkedin / venice

Venice, Derived Data Platform for Planet-Scale Workloads.
https://venicedb.org
BSD 2-Clause "Simplified" License

[server][common] Fixed bug in AA/WC parallel processing support #1252

Closed. gaojieliu closed this 5 days ago

gaojieliu commented 2 weeks ago


This PR fixes the following issues:

  1. AASIT should pass a non-null KeyLevelLocksManager to IngestionBatchProcessor; otherwise a race condition can occur.
  2. Fixed the locking order in IngestionBatchProcessor to avoid deadlocks (a rough sketch of the lock-ordering idea follows this list).
  3. Updated SparseConcurrentList#computeIfAbsent to skip adjusting the list size when the computed result is null (also sketched below).
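
For illustration, a minimal sketch of the deadlock-avoidance idea in item 2: acquire the key-level locks of a batch in one globally consistent order (for example, sorted by key bytes), so two batches that share keys can never wait on each other in a cycle. All names below are hypothetical and simplified; this is not the actual KeyLevelLocksManager or IngestionBatchProcessor code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

final class KeyLockOrderingSketch {
  // Hypothetical lock provider; the real KeyLevelLocksManager API may differ.
  interface KeyLocks {
    ReentrantLock lockFor(byte[] key);
  }

  static void processBatch(List<byte[]> keys, KeyLocks locks, Runnable work) {
    // Sort keys so every thread acquires locks in the same order.
    List<byte[]> ordered = new ArrayList<>(keys);
    ordered.sort(Arrays::compare);
    ordered.forEach(k -> locks.lockFor(k).lock());
    try {
      work.run();
    } finally {
      // Release in reverse acquisition order.
      for (int i = ordered.size() - 1; i >= 0; i--) {
        locks.lockFor(ordered.get(i)).unlock();
      }
    }
  }
}
```

Item 3 follows the usual computeIfAbsent contract for a null result: nothing is stored and the structure is not grown. A simplified stand-in (not the real SparseConcurrentList code) might look like:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class SparseListSketch<T> {
  private final List<T> values = new ArrayList<>();

  synchronized T computeIfAbsent(int index, IntFunction<T> mappingFunction) {
    T existing = index < values.size() ? values.get(index) : null;
    if (existing != null) {
      return existing;
    }
    T computed = mappingFunction.apply(index);
    if (computed == null) {
      // Skip adjusting the list size when the computed result is null.
      return null;
    }
    while (values.size() <= index) {
      values.add(null); // pad sparse slots
    }
    values.set(index, computed);
    return computed;
  }
}
```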

    Does this PR introduce any user-facing changes?

    • [x] No. You can skip the rest of this section.
    • [ ] Yes. Make sure to explain your proposed changes and call out the behavior change.
gaojieliu commented 2 weeks ago

Sorry, I am doing mostly white line commenting in this round of review. I should have reviewed the first round of implementation of this functionality, but didn't get a chance, so I'm coming to the party a bit late. I just want to provide the following feedback:

The part of produceToStoreBufferServiceOrKafkaInBatch which splits the records coming from Kafka into "mini batches" does not seem ideal to me, and I feel we should get rid of it. To be clear, I am not saying the current code cannot work, just that it is more complex than it needs to be. I will list below some issues/concerns I see with it; each can be fixed individually, but I still think getting rid of that part of the algorithm would be better overall:

  1. The batch size comes from serverConfig.getAAWCWorkloadParallelProcessingThreadPoolSize(), which does not seem right. The default value of that config is 8, which is likely too small; the intent was probably to have a separate config for the batch size.
  2. If the Kafka batch is smaller than the configured batch size, we still instantiate an ongoingBatch of the configured size, which is wasteful in terms of memory allocation (easy to fix, but is it needed?).
  3. I expect the batches coming from Kafka are not too large to handle, so there is no point in splitting them. If anything, there may be cases where the batch from Kafka is too small, and the optimal thing may be to accumulate records for some time and then create a mega batch (the opposite of a mini batch!).
  4. One of the benefits we hope to get from this functionality is to process updates to the same key together, so that we do fewer RocksDB reads and writes in those cases. Mini batches reduce this effect (a rough sketch of the per-key grouping follows this list).
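
As a rough illustration of the per-key de-duplication mentioned in point 4 (names made up for the example; this is not the Venice code): if all updates to a key land in the same batch, they can be grouped so the key is read from and written to RocksDB once instead of once per record. Smaller mini batches make it less likely that repeated updates to the same key end up in the same group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PerKeyGroupingSketch {
  // Group one batch's records by key so each key is looked up in RocksDB once
  // and the merged result is written back once.
  static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> batch) {
    Map<String, List<String>> byKey = new LinkedHashMap<>();
    for (Map.Entry<String, String> record : batch) {
      byKey.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
    }
    return byKey;
  }
}
```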

So my recommendation would be to simplify this code by deleting the mini batching altogether and just passing the Kafka batch directly into the next step. No need to implement the mega batching yet; we can make that call later.

The biggest reason for mini-batching is to make the key-level lock usage deterministic. If we always pass whatever we received straight to IngestionBatchProcessor, the KeyLevelLocksManager would need to account for the max possible batch size, which is 100 IIRC. 4) is not implemented yet, so the smaller batch size in theory shouldn't affect performance much. When working on 4), we will need a separate config to control the batch size, so that we can tune the memory usage against the performance benefits gained by de-duping.
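
To make the sizing argument concrete, a hedged sketch (the pool and method names are invented; the real KeyLevelLocksManager may work differently): if each worker only ever holds locks for one bounded mini batch, the lock pool can be sized from that bound instead of from the maximum Kafka poll size.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class BoundedLockPoolSketch {
  private final ReentrantLock[] locks;

  BoundedLockPoolSketch(int maxBatchSize, int workerCount) {
    // Worst case: every worker holds locks for a full mini batch of distinct keys,
    // so the pool size is bounded by maxBatchSize * workerCount rather than by
    // the largest batch Kafka could ever return.
    this.locks = new ReentrantLock[maxBatchSize * workerCount];
    for (int i = 0; i < locks.length; i++) {
      locks[i] = new ReentrantLock();
    }
  }

  ReentrantLock lockFor(byte[] key) {
    // Map a key onto the fixed pool; collisions reduce concurrency, not correctness.
    int idx = Math.floorMod(Arrays.hashCode(key), locks.length);
    return locks[idx];
  }
}
```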

For 2), today's interface is Iterable, so we can't get the size without iterating it; to avoid the unnecessary copy, we would need to change the interface to something like List (see the sketch below).
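
A tiny sketch of the interface difference being described (hypothetical method names, not the Venice API): with a List the consumer can size its own structures up front via size(), whereas with only an Iterable it either over-allocates or makes an extra pass/copy.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizingSketch {
  // With a List, size() is O(1), so downstream buffers can be allocated exactly.
  static <T> List<T> copyExactlySized(List<T> kafkaBatch) {
    return new ArrayList<>(kafkaBatch);
  }

  // With only an Iterable, the copy has to grow as it iterates.
  static <T> List<T> copyFromIterable(Iterable<T> kafkaBatch) {
    List<T> copy = new ArrayList<>();
    kafkaBatch.forEach(copy::add);
    return copy;
  }
}
```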