apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

Pluggable Key-Value Store for Upserts #11658

Open · Aravind-Suresh opened this issue 11 months ago

Aravind-Suresh commented 11 months ago

Pinot upserts require an in-memory map from each primary key to the location of the record that currently holds it. This map is used to check whether a record with that primary key already exists and, if so, to merge it with the incoming record. For tables with a large number of primary keys, this leads to very high memory consumption, because the map lives on the JVM heap.
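As a rough illustration of the on-heap map described above, here is a minimal Java sketch. It assumes simple full-upsert semantics where the version with the larger comparison value (e.g. an event timestamp) wins. InMemoryUpsertIndex and this RecordLocation are hypothetical simplifications, not Pinot's actual classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified record location: which segment/doc holds the latest version
// of a key, plus the comparison value used to decide which version wins.
record RecordLocation(String segmentName, int docId, long comparisonValue) {}

// Hypothetical on-heap upsert index; memory grows with the number of distinct primary keys.
class InMemoryUpsertIndex {
  private final ConcurrentMap<String, RecordLocation> primaryKeyToLocation = new ConcurrentHashMap<>();

  /** Returns true if the incoming record becomes the latest version of its primary key. */
  boolean addRecord(String primaryKey, RecordLocation incoming) {
    boolean[] accepted = new boolean[1];
    // ConcurrentHashMap.compute() runs the function atomically, once per call.
    primaryKeyToLocation.compute(primaryKey, (key, current) -> {
      if (current == null || incoming.comparisonValue() >= current.comparisonValue()) {
        accepted[0] = true;
        return incoming; // new key or newer version: point the key at the incoming record
      }
      return current; // an older version arrived late: keep the existing location
    });
    return accepted[0];
  }

  RecordLocation locate(String primaryKey) {
    return primaryKeyToLocation.get(primaryKey);
  }

  int size() {
    return primaryKeyToLocation.size();
  }
}
```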

In certain use cases, we came across tables that required a longer retention period and strict correctness, so we explored replacing this in-memory map with a disk-backed map. However, the current implementation (see ConcurrentMapPartitionUpsertMetadataManager) is tightly coupled to the in-memory map (Java’s ConcurrentHashMap), which makes it hard for Pinot adopters to swap in their own map implementation.

Creating this issue to discuss whether we can extract an interface from this class to make the "Map" pluggable, and to gather the community's feedback.

This write-up describes the idea in detail.

cc @tibrewalpratik17

Jackie-Jiang commented 11 months ago

It is already designed this way: TableUpsertMetadataManager is the pluggable interface, TableUpsertMetadataManagerFactory is the factory for plugging in a custom implementation, and BaseTableUpsertMetadataManager and BasePartitionUpsertMetadataManager contain the general upsert-handling logic, independent of the backing storage. At StarTree we already have a pluggable off-heap upsert metadata manager, which proves that the current model works.
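A generic sketch of the factory pattern described here: a configured class name selects the implementation, and a default is used when nothing is configured. The interface, the class names, and the config key upsert.metadataManagerClass are all hypothetical; Pinot's actual factory and config property may differ.

```java
import java.util.Map;

// Hypothetical stand-in for a pluggable upsert metadata manager interface.
interface UpsertMetadataManager {
  String describe();
}

// Hypothetical default implementation (on-heap).
class DefaultHeapManager implements UpsertMetadataManager {
  public String describe() { return "on-heap ConcurrentHashMap"; }
}

// Hypothetical custom implementation a user might plug in.
class OffHeapManager implements UpsertMetadataManager {
  public String describe() { return "off-heap key-value store"; }
}

final class UpsertMetadataManagerFactory {
  // Hypothetical config key, used only for this illustration.
  static final String MANAGER_CLASS_KEY = "upsert.metadataManagerClass";

  static UpsertMetadataManager create(Map<String, String> config) {
    String className = config.get(MANAGER_CLASS_KEY);
    if (className == null) {
      return new DefaultHeapManager(); // nothing configured: fall back to the default
    }
    try {
      // Load the configured implementation by name; it needs an accessible no-arg constructor.
      return (UpsertMetadataManager) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }
}
```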

tibrewalpratik17 commented 11 months ago

@Jackie-Jiang we see that this exists, but while trying to use other storage engines, we had to copy-paste a lot of code from ConcurrentMapPartitionUpsertMetadataManager just to change the storage layer.

We realised that a storage-level interface would be more flexible: we would only need to implement the ConcurrentMap interface, without the maintenance overhead of tracking every change made to ConcurrentMapPartitionUpsertMetadataManager. We want the addRecord and updateRecord logic to stay the same in our case as well.

What we are interested in is changing the underlying storage behind the _primaryKeyToRecordLocationMap.put, _primaryKeyToRecordLocationMap.remove... calls; rewriting the entire ConcurrentMapPartitionUpsertMetadataManager just for that seems like overkill.
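A minimal sketch of this proposal, assuming the upsert logic is written once against the ConcurrentMap interface and the concrete map is injected through a Supplier. PluggablePartitionIndex is hypothetical, and it tracks only a comparison value per key rather than a full record location.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical partition-level upsert index: the add/remove logic is written once against
// the ConcurrentMap interface, and the caller supplies the concrete map implementation.
class PluggablePartitionIndex {
  private final ConcurrentMap<String, Long> primaryKeyToComparisonValue;

  PluggablePartitionIndex(Supplier<? extends ConcurrentMap<String, Long>> mapSupplier) {
    this.primaryKeyToComparisonValue = mapSupplier.get();
  }

  /** Keeps the larger comparison value per key; returns true if the incoming value won. */
  boolean addRecord(String primaryKey, long comparisonValue) {
    long winner = primaryKeyToComparisonValue.merge(primaryKey, comparisonValue, Long::max);
    return winner == comparisonValue;
  }

  void removeRecord(String primaryKey) {
    primaryKeyToComparisonValue.remove(primaryKey);
  }

  Long comparisonValueOf(String primaryKey) {
    return primaryKeyToComparisonValue.get(primaryKey);
  }
}
```

Swapping storage is then a constructor argument, e.g. `new PluggablePartitionIndex(ConcurrentHashMap::new)` or `new PluggablePartitionIndex(ConcurrentSkipListMap::new)`. Because the Long::max merge function has no side effects, this stays correct even if a map implementation invokes it more than once under contention; upsert logic that updates other state inside the function would not be so forgiving.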

Jackie-Jiang commented 11 months ago

If you are considering replacing the concurrent hash map with an off-heap KV store, you won't be able to reuse the implementation in ConcurrentMapPartitionUpsertMetadataManager, because it stores the segment reference directly. I'm in favor of extracting more shareable methods, so if you find anything useful, you may pull it up into BasePartitionUpsertMetadataManager.

Aravind-Suresh commented 11 months ago

Yes, you are right @Jackie-Jiang. For our experiments, we did a small refactor so that RecordLocation holds the segment name instead of the IndexSegment, and derived the IndexSegment from _trackedSegments (currently a set, but it can be changed to a map).

Once this change is made, there are no serialisation problems, and we were able to replace the ConcurrentHashMap with other implementations.

So our suggestion is to change RecordLocation and memoize the segmentName -> IndexSegment mapping inside the ConcurrentMapPartitionUpsertMetadataManager class itself. After that, we can switch the map implementation to anything compatible with ConcurrentMap.

What do you think? @Jackie-Jiang
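A sketch of the refactor described in this comment, assuming a record location is just (segment name, doc id, comparison value). Segment stands in for IndexSegment, and TrackedSegments replaces the tracked-segment set with a name-keyed map; all of these names and the byte layout are hypothetical. Because the location no longer holds an object reference, it can be encoded to bytes for an off-heap or on-disk store.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for IndexSegment; only the name matters here.
record Segment(String name) {}

// Location that refers to its segment by name, so it can be serialized.
record RecordLocation(String segmentName, int docId, long comparisonValue) {
  // Hypothetical layout: [name length][name UTF-8 bytes][docId][comparisonValue]
  byte[] toBytes() {
    byte[] name = segmentName.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + name.length + Integer.BYTES + Long.BYTES)
        .putInt(name.length).put(name).putInt(docId).putLong(comparisonValue).array();
  }

  static RecordLocation fromBytes(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    byte[] name = new byte[buf.getInt()];
    buf.get(name);
    // Arguments are evaluated left to right, matching the write order.
    return new RecordLocation(new String(name, StandardCharsets.UTF_8), buf.getInt(), buf.getLong());
  }
}

// Name-keyed replacement for a set of tracked segments.
class TrackedSegments {
  private final Map<String, Segment> byName = new ConcurrentHashMap<>();

  void track(Segment segment) { byName.put(segment.name(), segment); }

  void untrack(String segmentName) { byName.remove(segmentName); }

  // Returns null if the segment has been dropped since the location was written.
  Segment resolve(RecordLocation location) { return byName.get(location.segmentName()); }
}
```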

Jackie-Jiang commented 11 months ago

In that case, how much code do you think can be reused? Currently ConcurrentMapPartitionUpsertMetadataManager relies heavily on ConcurrentHashMap.compute(), and TBH I don't think many implementations provide the same concurrency guarantee.
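To make the guarantee concrete: ConcurrentHashMap documents that the entire compute() invocation is performed atomically, so the remapping function runs once per call. The default ConcurrentMap.compute(), by contrast, may retry and call the function more than once under contention, and ConcurrentSkipListMap documents that its function is not guaranteed to be applied once atomically. The hypothetical demo here counts a side effect performed inside compute(), similar in spirit to invalidating the previous version of a key; with ConcurrentHashMap both counts come out exact.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo, not Pinot code.
class ComputeAtomicityDemo {
  /**
   * Runs threads x updatesPerThread updates of one key through ConcurrentHashMap.compute().
   * The function has a side effect (counting "invalidations" of the previous version).
   * Returns {final version, side-effect count}.
   */
  static int[] run(int threads, int updatesPerThread) {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    AtomicInteger invalidations = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < updatesPerThread; i++) {
          map.compute("pk", (key, version) -> {
            if (version != null) {
              invalidations.incrementAndGet(); // side effect: "invalidate" the previous version
            }
            return version == null ? 1 : version + 1;
          });
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return new int[] {map.get("pk"), invalidations.get()};
  }
}
```

With ConcurrentHashMap, run(8, 10000) yields a final version of 80000 and exactly 79999 invalidations, one per update after the first. A map whose compute() may re-invoke the function could over-count the side effect even when the final value is correct.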

We've tried all 3 libraries you mentioned, and none of them provides the same guarantee as ConcurrentHashMap. If you really want to extract some common code, you may add a new implementation with a K-V store abstraction.
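One possible shape for the K-V store abstraction mentioned here, sketched under stated assumptions: the store exposes only single-key get/put/delete, and a striped-lock wrapper adds a compute() that serializes updates per key, so the function runs exactly once per call. KeyValueStore, InMemoryStore (a stand-in for an off-heap or on-disk store), and StripedComputeStore are hypothetical names. This does not address durability or crash consistency.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;

// Hypothetical minimal K-V store abstraction: single-key operations only, no atomic compute.
interface KeyValueStore<K, V> {
  V get(K key);
  void put(K key, V value);
  void delete(K key);
}

// In-memory stand-in for an off-heap or on-disk store; each single-key op is thread-safe.
class InMemoryStore<K, V> implements KeyValueStore<K, V> {
  private final Map<K, V> data = new ConcurrentHashMap<>();
  public V get(K key) { return data.get(key); }
  public void put(K key, V value) { data.put(key, value); }
  public void delete(K key) { data.remove(key); }
}

// Adds a compute() on top of any KeyValueStore using striped locks: all updates to the
// same key take the same lock, so the read-modify-write runs once and is not interleaved.
class StripedComputeStore<K, V> {
  private final KeyValueStore<K, V> store;
  private final ReentrantLock[] stripes;

  StripedComputeStore(KeyValueStore<K, V> store, int stripeCount) {
    this.store = store;
    this.stripes = new ReentrantLock[stripeCount];
    for (int i = 0; i < stripeCount; i++) {
      stripes[i] = new ReentrantLock();
    }
  }

  private ReentrantLock lockFor(K key) {
    return stripes[Math.floorMod(key.hashCode(), stripes.length)];
  }

  V compute(K key, BiFunction<? super K, ? super V, ? extends V> fn) {
    ReentrantLock lock = lockFor(key);
    lock.lock();
    try {
      V updated = fn.apply(key, store.get(key));
      if (updated == null) {
        store.delete(key); // same contract as Map.compute(): null removes the mapping
      } else {
        store.put(key, updated);
      }
      return updated;
    } finally {
      lock.unlock();
    }
  }

  V get(K key) { return store.get(key); }
}
```

The stripe count trades memory for contention: keys that hash to the same stripe serialize even when they differ, and all updates must go through compute() for the guarantee to hold.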