
Hash Index for multiple copy #2938

benjaminwinger opened this issue 8 months ago

benjaminwinger commented 8 months ago

The current HashIndexBuilder assumes that we are creating the index from scratch, and the persistent HashIndex is not well-optimized for inserting a large number of entries at once. To support multiple COPY statements, one of these things needs to change, and it seems simplest to combine the two into a single HashIndex that is as fast as the HashIndexBuilder when there is no existing data (and ideally not much slower on subsequent copies).

After looking at doing some sort of disk array caching to reduce the WAL reads and writes that would currently go through the BaseDiskArray if we were to use the current HashIndex, I think the solution is to do as much work as possible in the hash index's in-memory HashIndexLocalStorage and then write the data in a single pass. By caching the page being written, all 16 slots in a page (or however many are modified) can be written together, with no need to read or write the same page again afterwards. For existing overflow slots this may still require reading/writing a page as many as 16 times, since overflow slots aren't necessarily grouped the way primary slots are, but that should still be much better than one write per insert.

This should be very similar to what the HashIndexBuilder is already doing, except that the layout is different in the in-memory part, and our Slots aren't created until the data is flushed.

Optimizing HashIndexLocalStorage for fast updates

The HashIndexLocalStorage could be replaced with a structure that groups elements to be inserted (and deleted, but that's not relevant to copy) by their primary slot ID, with the number of primary slots calculated using the total number of entries in both local storage and on-disk (and re-hashing as necessary to keep the number of primary slots equal to what will be needed when writing to disk).

I suspect that a vector of vectors will perform best, though it does have significantly more overhead than the current unordered_map (one 24-byte vector per primary slot, plus small allocations for each vector's buffer when keys are inserted). It might be interesting to compare this to the HashIndexBuilder, which could be adapted for use as a purely in-memory index and already has the benefit of using the exact same slot mechanism and supporting rehashing, but which wouldn't be well-optimized for doing a small number of inserts into a large existing index (since we'd need to allocate space for more or less the entire index in memory).
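As a rough sketch of the slot-grouped structure described above (all names, the placeholder hash function, and the load-factor constant are illustrative, not the actual Kuzu classes):

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical sketch: pending insertions grouped by primary slot ID, so a
// flush can walk the slots in order and touch each on-disk page once.
static uint64_t hashKey(uint64_t key) { return key; /* placeholder hash */ }

class SlotGroupedLocalStorage {
public:
    using Entry = std::pair<uint64_t /*key*/, uint64_t /*offset*/>;

    explicit SlotGroupedLocalStorage(size_t numPrimarySlots) : slots(numPrimarySlots) {}

    void insert(uint64_t key, uint64_t offset) {
        slots[hashKey(key) % slots.size()].emplace_back(key, offset);
        // Keep the slot count equal to what the on-disk index will need after
        // the merge (based on the total of local plus on-disk entries).
        if (++numEntries > slots.size() * AVG_ENTRIES_PER_SLOT) rehash(slots.size() * 2);
    }

private:
    void rehash(size_t newNumSlots) {
        std::vector<std::vector<Entry>> newSlots(newNumSlots);
        for (auto& bucket : slots)
            for (auto& e : bucket) newSlots[hashKey(e.first) % newNumSlots].push_back(e);
        slots = std::move(newSlots);
    }

    static constexpr size_t AVG_ENTRIES_PER_SLOT = 14; // illustrative load factor
    std::vector<std::vector<Entry>> slots; // one 24-byte vector per primary slot
    uint64_t numEntries = 0;
};
```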

Disk Array

To let the HashIndex read and update slots one page at a time (unless we change the slot size to one page), and to bypass the WAL file when writing new pages, there will need to be a new version of the disk array, or maybe just some new functions added to the existing BaseDiskArray.
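For illustration, a minimal sketch of what a page-granular write path could look like (DiskArrayWriteIterator and everything inside it is hypothetical, with an in-memory vector standing in for the buffer manager; the real BaseDiskArray API differs):

```cpp
#include <cstdint>
#include <vector>

// Sketch of a page-granular write iterator for a disk array: pin the page
// containing a slot once, update every modified slot on it, and write the
// page back a single time instead of once per slot through the WAL.
constexpr uint64_t PAGE_SIZE = 4096;
constexpr uint64_t SLOT_SIZE = 256;
constexpr uint64_t SLOTS_PER_PAGE = PAGE_SIZE / SLOT_SIZE; // 16

struct Page { uint8_t bytes[PAGE_SIZE]; };

class DiskArrayWriteIterator {
public:
    explicit DiskArrayWriteIterator(std::vector<Page>& file) : file(file) {}

    // Returns a pointer to the slot's bytes, flushing the previously pinned
    // page only when crossing a page boundary.
    uint8_t* updateSlot(uint64_t slotId) {
        uint64_t pageIdx = slotId / SLOTS_PER_PAGE;
        if (pageIdx != pinnedPageIdx) {
            flush();
            pinnedPageIdx = pageIdx; // stand-in for a buffer manager pin
        }
        dirty = true;
        return file[pageIdx].bytes + (slotId % SLOTS_PER_PAGE) * SLOT_SIZE;
    }
    void flush() {
        if (dirty) { /* write file[pinnedPageIdx] back; one write per page */ dirty = false; }
    }
    ~DiskArrayWriteIterator() { flush(); }

private:
    std::vector<Page>& file;
    uint64_t pinnedPageIdx = UINT64_MAX;
    bool dirty = false;
};
```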

Copy Pipeline optimizations

The copy pipeline currently waits until all data is read before beginning to build the hash index. We should instead process data one chunk at a time as soon as it is available. This should help avoid keeping too many copies of the data in memory, and processing one chunk at a time gives the compiler the opportunity to auto-vectorize, or lets us use SIMD instructions manually. Edit: the data is already consumed every time a buffer is filled, by the thread that finished filling it (unless it's still being consumed by another thread); see IndexBuilder::insert -> IndexBuilderLocalBuffers::insert -> IndexBuilderGlobalQueues::insert -> IndexBuilderGlobalQueues::maybeConsumeIndex. Incomplete buffers are consumed at the end.

Strings

The DiskOverflowFile used for handling string writes does not currently support parallel writes. In addition to coordinating writes to separate pages the way the HashIndexBuilder+InMemFile does, it would be helpful to avoid losing track of the write cursors after each COPY. Currently there is an overflowFileNextBytePosRecord WAL record which records the position to start writing at, but that only works for a single writer and is not set during COPY, which instead just starts from the end of the file. To support 256 writers without losing track of any space, the overflow file could include a header, written via the WAL file once writes are finished, which tracks the 256 write cursors and the page counter.
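A sketch of what such a header might contain (the field layout and sizes here are assumptions, not Kuzu's current on-disk format):

```cpp
#include <cstdint>

// Sketch of an overflow file header, written through the WAL once all writes
// are finished. It would let up to 256 concurrent writers resume their
// partially filled pages on the next COPY instead of starting from the end
// of the file and losing the remaining space in those pages.
struct OverflowFileHeader {
    struct WriteCursor {
        uint32_t pageIdx;   // page this writer is currently filling
        uint32_t posInPage; // next free byte within that page
    };
    WriteCursor cursors[256]; // one cursor per writer
    uint64_t numPages;        // page counter: next page index to allocate
};
static_assert(sizeof(OverflowFileHeader) <= 4096, "header fits in one page");
```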

Multi-threaded prepareCommit

Compared to flushing in the current HashIndexBuilder, the HashIndex's prepareCommit is relatively expensive. Like HashIndexBuilder::flush, it can be done in parallel without issues; it's just that it isn't called until after the parallel processing stage of the pipeline has completed. As long as changes to existing data go through the WAL file, I don't think it should be a problem to do this early, but I'm not very familiar with the execution pipeline and am not sure what the best way to handle it would be.

Slot Size

I'm not sure if this deserves its own issue, but it might be worthwhile to look into the size of the hash index slots. We're currently using 256-byte slots. I'm not sure exactly why that was chosen, other than that it matches the size in the Dash paper (see #2287); however, that size was chosen to match the block size of Intel Optane memory (which is what Dash is optimized for) and may not be particularly relevant to us.

I recently tried some quick benchmarks increasing the slot size to 512 and 4096 bytes, and while I don't have the exact results to hand, the gist was that lookups slowed down (more entries to iterate over) and inserts sped up (from a reduced need for overflow slots, and possibly from appending larger units when rehashing).

I think that using SIMD instructions to speed up finding matching fingerprints will help significantly with lookups, and this should scale well to larger slot sizes: 512-bit SIMD can compare 64 one-byte fingerprints in a single instruction, which would correspond to a slot size of 1104 bytes (if we want a power-of-2 size we'd need to hold fewer entries in practice), plus the cost of scanning the resulting bitmask. However, I'm not sure it's worthwhile looking into this until after some of the other changes have been done (though I did briefly play around with optimizing the fingerprint comparison to make use of auto-vectorization here, but that still only works with additional compiler flags).
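As an illustration of the fingerprint-matching idea, here is a sketch assuming one-byte fingerprints and an AVX2 target (32 entries per compare; an AVX-512 version would double that to the 64 mentioned above):

```cpp
#include <cstdint>
#include <immintrin.h> // AVX2 intrinsics; requires -mavx2 or equivalent

// Compare 32 one-byte fingerprints against a needle in one instruction and
// return a bitmask of matches (bit i set => entry i is a candidate).
inline uint32_t matchingFingerprints(const uint8_t* fingerprints /*32 bytes*/,
                                     uint8_t needle) {
    __m256i fps = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(fingerprints));
    __m256i eq = _mm256_cmpeq_epi8(fps, _mm256_set1_epi8(static_cast<char>(needle)));
    return static_cast<uint32_t>(_mm256_movemask_epi8(eq));
}

// Usage: scan the set bits of the returned mask; each one is a candidate
// entry whose full key still needs to be compared, since different keys can
// share a fingerprint.
```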

However, if we are able to increase the slot size to equal the page size without reducing lookup performance too much, then the hash index could do a single pass write even for overflow slots, since each overflow slot would also take up a full page.

Changes

I think these should be done roughly in this order, but there is some flexibility as they are not all related to each other.

benjaminwinger commented 7 months ago

Progress of the Hash Index Rewrite

Baseline

Note: some of these numbers are subject to change as I'm writing this from memory; I'll reproduce the results and update this later to be more accurate.

The benchmark I've been using is a copy of 60,000,000 consecutive integers from a CSV file (produced by seq 60000000) into a table containing a single INT64 primary key column. The data is stored in /tmp (tmpfs) and written to a database also in /tmp, to remove disk performance as a variable.

On the current master this takes roughly ~~2.2~~ 3.8 seconds (release mode) on my 6-core/12-thread Ryzen 5500 machine.

Baseline Flamegraph ![flamegraph-baseline](https://github.com/kuzudb/kuzu/assets/8703943/7ce9dd59-6e9d-453f-886c-77c74719cc9b)

Flamegraphs unfortunately don't visualize well via GitHub here, and many names are truncated. Downloading the SVG file and opening it in a browser (Firefox at least; not sure about others) will let you zoom in to different sections.

The flamegraph of the copy operation (using a RelWithDebInfo build) shows that the bulk of the work is done in the copy from CSV and in HashIndexBuilder::append (both parallel; note that the profile includes total time, so parallel tasks are over-represented in comparison to serial ones). Flushing takes almost no time (0.18%; see note above about this being done in tmpfs).

Initial fairly naive implementation

Naive Implementation Flamegraph ![flamegraph](https://github.com/kuzudb/kuzu/assets/8703943/6dcdf9f8-72bb-4a59-a510-96bf122b94e6)

(Edit: this is taking roughly 18 seconds in release mode)

I've started off by implementing a version which breaks down the hash index insertion into three parts.

  1. Entries are inserted into the HashIndexBuilder, as in the baseline. The HashIndexBuilder is kept at a size that will match the eventual size of the on-disk index so that the primary slots are identical.
  2. Instead of flushing the HashIndexBuilder directly to disk, the in-memory data is merged with the on-disk data:
     1. The on-disk index is resized to fit the new total number of entries.
     2. Each primary slot is updated one by one with the corresponding entries from the builder.

This really just has one optimization compared to using the existing insertIntoPersistentIndex function: it inserts entries in a single pass, writing once per slot instead of once per entry.
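A sketch of that single-pass merge loop (the types here are simplified stand-ins for the real disk-array-backed structures, and the in-memory index is assumed to already have the same number of primary slots as the resized on-disk index, so slot IDs line up one-to-one):

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Simplified stand-in for an on-disk slot and the slot disk array.
struct Slot { std::vector<std::pair<uint64_t, uint64_t>> entries; };

struct OnDiskIndex {
    std::vector<Slot> primarySlots;
    // Step 2.1: one resize/split up front for the new total entry count.
    void resizeToFit(size_t numSlots) { primarySlots.resize(numSlots); }
};

// Step 2.2: write each primary slot exactly once with all of its pending
// entries, instead of doing one write per entry.
void mergeBulkInserts(OnDiskIndex& disk, std::vector<Slot>& inMem) {
    disk.resizeToFit(inMem.size());
    for (size_t slotId = 0; slotId < inMem.size(); slotId++) {
        auto& diskSlot = disk.primarySlots[slotId];
        for (auto& entry : inMem[slotId].entries) {
            diskSlot.entries.push_back(entry); // real code spills to overflow slots
        }
    }
}
```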

Compared to the baseline, there are two significant new sections in the flamegraph:

Optimizations

The bulk of the work in HashIndex::mergeBulkInserts appears to be writes through the buffer manager. By my calculation the current number of writes is 33 per page when resizing (the header and the page are each updated once for each of the 16 slots in the page, and when the page is added the PIP is updated once: 16 + 16 + 1), and 16 per page when updating slots (one per slot). This also adds to the cost of clearing the eviction queue, since an entry is added for each write. It should only be necessary to write, on average, slightly more than once per page: the data gets written once, the PIPs get cached and written only when full (or at the end), and the header gets cached and written at the end.
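A sketch of that intended write pattern (names and the PIP capacity here are illustrative, not the actual disk array code):

```cpp
#include <cstdint>
#include <vector>

// Hypothetical sketch of batching metadata writes during a bulk append:
// instead of updating the header and PIP through the buffer manager for
// every appended element (16 + 16 + 1 = 33 writes per page), the header and
// the current PIP are cached and written when full or at the end.
struct DiskArrayBulkAppender {
    struct Header { uint64_t numElements = 0; } header; // cached in memory
    static constexpr size_t ENTRIES_PER_PIP = 512;      // illustrative capacity
    std::vector<uint64_t> currentPip;                   // cached page index page

    void appendPage(uint64_t physicalPageIdx) {
        currentPip.push_back(physicalPageIdx);
        if (currentPip.size() == ENTRIES_PER_PIP) flushPip(); // one write per full PIP
    }
    void finish() {
        if (!currentPip.empty()) flushPip();
        writeHeader(); // single header write for the whole bulk insert
    }
    void flushPip() { /* write currentPip to its page */ currentPip.clear(); }
    void writeHeader() { /* write header to its page */ }
};
```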

The initial work on this is promising, even if it hasn't changed much yet:

Flamegraph after caching pages when updating ![flamegraph2](https://github.com/kuzudb/kuzu/assets/8703943/35606d68-f4d4-42c3-b4d6-f808c746a4f9)

The above flamegraph is from some WIP changes which cache the page from the primary slot disk array when iterating over and updating the slots, with the result being that the cost of reserve and appendOverflowSlot are now the dominant parts of HashIndex::mergeBulkInserts.

Currently working on (in various stages of completeness; these should hopefully make the initial copy comparable to the old implementation):

TODO:

Will be updated when I've gotten further optimizations fully implemented

WAL File Replaying

It does appear that with some optimizations the majority of the time is now being spent clearing the WAL and/or flushing the buffer manager. In its current state, the copy takes about 10 seconds (fourth item in the table below): the first ~2 seconds are spent creating the in-memory HashIndexBuilder, the next 1.5s (timed) doing PrimaryKeyIndex::prepareCommit, and the remaining ~6 seconds are presumably flushing the buffer manager and clearing the WAL (measured at 4.5s). I don't think this is represented very well in the flamegraphs. Presumably bypassing the WAL file will help (for the first copy, anyway).

I've also opened #3137, which reduces the cost of clearing the WAL in this benchmark to 3.2s.

Performance Summary

This will probably get a little messy since I'm listing the changes cumulatively, and some things like #3109 are being merged first. See the commits for details.

| Changes (cumulative) | Copy time in tmpfs for 60 million INT64 from CSV | Second copy of 60 million INT64 |
| --- | --- | --- |
| Baseline (4d21128a) | 3.8s | - |
| Naive resize + single-pass update (e8bb1f9ad) | ~17.1s | - |
| With a write iterator caching the page being updated (6b4d51089) | ~11.5s | - |
| Caching the last DiskArray APPageIdx when appending (caused an unnecessary read from the WAL file for each slot; it should be possible to cache the last PIP when writing, so this may end up being redundant) (e16b1e01b6b) | ~10.6s | - |
| With WAL replayer FileInfo cache (5eb42af2c) | ~9.2s | - |
| After rebase (including #3109, though the write iterator mostly already handled that) (fed83baad39d38d7) | ~8.8s | - |
| With WAL file bypassing (e092969603dd) | ~5.0s | - |
| Caching PIP updates in the disk array (e45ef549f0e0e2c) | ~4.8s | - |
| A quick test of mergeBulkInserts using OpenMP; a new intermediate operator that takes the hash indexes with the bulk inserts stored in memory and flushes them in parallel before adding the COPY TABLE record to the WAL should work as a more permanent solution (not included below) | ~4.0s | - |
| With multi-copy (95943678304e7; slight regression, or maybe just random variation) | ~4.9s | ~14.5s |
benjaminwinger commented 6 months ago

Updated TODO List, roughly in order of priority:

benjaminwinger commented 6 months ago

Moved this out of the previous TODO-list comment. I was thinking about this in the context of what was originally the last bullet point there (changing splitSlots to avoid re-hashing the same entries multiple times). It's not the highest priority yet, mostly because it will be complicated, but I think this might be reasonable to implement.

Possible implementation of single-pass split and merge

When the hash index is non-empty and the copy is much larger than the current size of the index, HashIndex::splitSlots will currently rehash the same slots multiple times. It might be possible to rewrite it to do just one pass over the slots. Further, it should be possible to combine this with the mergeBulkInserts function to do merging and splitting in a single pass.

Entries that get moved to a new slot could be moved into the InMemHashIndex. There would be roughly as many entries removed during rehashing as there are inserted from the InMemHashIndex for any given slot. E.g. (assuming the hash is just the modulus operator):

In Memory

| Slot 0 | Slot 1 |
| --- | --- |
| 4 | 5 |
| 6 | 7 |

On Disk (note that in this version of the merging algorithm, slots 2 and 3 get added when they are filled, not ahead of time like the current algorithm):

| Slot 0 | Slot 1 | Slot 2 | Slot 3 |
| --- | --- | --- | --- |
| 0 | 1 | | |
| 2 | 3 | | |
When processing disk slot 0, entry 0 is left alone, entry 2 is kicked out and replaced with entry 4, and entry 2 can take entry 4's place in the in-memory index. Later, entry 2 and entry 6 will both be added to slot 2. In this case, though, it would actually make sense to process slot 0 and slot 2 simultaneously, like the splitSlots method in #3325, which would remove the need to move anything from disk into memory; but in some cases it will still be necessary to do so. E.g.:

In Memory

| Slot 0 | Slot 1 | Slot 2 | Slot 3 |
| --- | --- | --- | --- |
| 8 | 9 | 6 | 7 |
| 12 | 13 | 10 | 11 |
| 16 | 17 | 14 | 15 |

On Disk (note that in the desired final configuration for 6 slots, the nextSplitSlotId would be 2, and slots would be hashed modulo 4 with anything ending up in 0 or 1 being re-hashed modulo 8):

| Slot 0 | Slot 1 | Slot 2 | Slot 3 | Slot 4 | Slot 5 |
| --- | --- | --- | --- | --- | --- |
| 0 | 1 | | | | |
| 2 | 3 | | | | |
| 4 | 5 | | | | |

In this case, slot 0 and slot 2 could still be processed together, allowing entry 2 to be written to slot 2 while entries 8 and 16 go into slot 0; however, entry 4 needs to be moved out of slot 0 on disk into slot 0 in memory, as it will later need to be moved into slot 4 on disk (along with entry 12). Handling two slots at a time (when slots are being split) should reduce the likelihood that the in-memory index will need to be resized, as otherwise I think you end up moving more entries from disk into memory than the other way around.

Third example for the case where the on-disk index is larger and we start with a slot that doesn't need to be split:

In Memory

| Slot 0 | Slot 1 |
| --- | --- |
| 10 | 11 |
| 12 | 13 |
| 14 | 15 |

On Disk

| Slot 0 | Slot 1 | Slot 2 | Slot 3 | Slot 4 | Slot 5 |
| --- | --- | --- | --- | --- | --- |
| 0 | 1 | 2 | | | |
| 4 | 3 | | | | |
| 8 | 5 | 6 | | | |
| | 7 | | | | |
| | 9 | | | | |

In this case, slot 0 was already split into slot 2, and slot 1 is over-full by comparison. Slot 0 gets processed first, by itself, and 4 gets moved into memory; 10 and 14 will later be stored in slot 2, and 4 will later be stored in slot 4. Then slot 1 and slot 3 get processed: 3, 7, 11 and 15 get moved into slot 3 on disk, 1 and 9 stay put, and 5 gets moved into memory and will later go into slot 5 with 13.

Maybe it would be possible to choose a second slot to process along with the already-split slot 0, so that we can avoid having to increase the size of the in-memory slot 0 (since 4 gets moved into memory and nothing gets moved to disk when processing slot 0). Slot 2 would be the obvious choice, since slots 1 and 3 will be processed next; so I think the second slot would be roughly (1 << currentLevel) + slotId (and once slotId == nextSplitSlotId, the second slot would be a new empty slot).
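A sketch of the slot computation this implies (variable names are illustrative; this is the standard linear-hashing lookup matching the examples above):

```cpp
#include <cstdint>

// Slot lookup for the linear-hashing scheme described above. With 6 slots,
// currentLevel == 2 and nextSplitSlotId == 2: hashes are taken modulo 4, and
// anything landing below nextSplitSlotId has already been split, so it is
// re-hashed modulo 8.
uint64_t primarySlotId(uint64_t hash, uint64_t currentLevel, uint64_t nextSplitSlotId) {
    uint64_t slotId = hash & ((1ull << currentLevel) - 1);      // hash % 4 in the example
    if (slotId < nextSplitSlotId) {
        slotId = hash & ((1ull << (currentLevel + 1)) - 1);     // hash % 8 in the example
    }
    return slotId;
}

// When slot `slotId` is split, the entries that move out of it go to its
// buddy slot at (1 << currentLevel) + slotId, which matches the pairings in
// the examples above (slot 0 with slot 2 when splitting from 2 base slots,
// slot 0 with slot 4 when splitting from 4).
```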