guycipher / k4

High-performance, open-source, durable, transactional embedded storage engine designed for low latency and optimized read and write efficiency.
https://pkg.go.dev/github.com/guycipher/k4/v2
BSD 3-Clause "New" or "Revised" License

Multithreaded compaction #8

Closed: guycipher closed this issue 3 weeks ago

guycipher commented 3 weeks ago

During compaction, we could pair up SSTables and merge each pair in its own goroutine. Every pairwise merge within a compaction would then run in parallel, which should pay off especially on an SSD, since SSDs handle many concurrent I/O requests well.

guycipher commented 3 weeks ago

Pretty easy to implement. We simply add the SSTable merge routines to the K4 WaitGroup. That way, on shutdown we still wait for any running compaction to finish.
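A minimal sketch of tracking merge routines on one shared WaitGroup so shutdown waits for them. The engine type, startMerges, and Close here are hypothetical stand-ins, not K4's actual fields or methods; the sleep is a placeholder for real merge work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// engine is a hypothetical stand-in for the K4 struct: one WaitGroup
// tracks every background goroutine, merge routines included.
type engine struct {
	wg     sync.WaitGroup
	mu     sync.Mutex
	merged int // number of finished merges, guarded by mu
}

// startMerges launches one goroutine per pair on the shared WaitGroup
// and returns without waiting for them.
func (e *engine) startMerges(pairs int) {
	for p := 0; p < pairs; p++ {
		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			time.Sleep(10 * time.Millisecond) // placeholder for the merge work
			e.mu.Lock()
			e.merged++
			e.mu.Unlock()
		}()
	}
}

// Close blocks until every tracked goroutine has finished, so shutdown
// never cuts a compaction off halfway.
func (e *engine) Close() {
	e.wg.Wait()
}

func main() {
	e := &engine{}
	e.startMerges(4)
	e.Close()
	fmt.Println("merges finished before shutdown:", e.merged) // 4
}
```

One caveat from the sync package rules: an Add that raises the counter from zero must happen before Wait is called, so new compactions should not be started once shutdown has begun waiting.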

guycipher commented 3 weeks ago

Rough, but this is the idea:

func (k4 *K4) compact() error {
    k4.sstablesLock.Lock()
    defer k4.sstablesLock.Unlock()

    k4.printLog("Starting compaction")

    // We pair sstables starting from the oldest: (0,1), (2,3), ...
    // Each pair is merged into a new sstable by its own goroutine; this is
    // multi-threaded compaction. With an odd number of sstables, the newest
    // one is left as is.
    pairs := len(k4.sstables) / 2

    // Each goroutine writes only to its own index, so these slices need no
    // extra locking. k4.sstables is not modified until every merge is done,
    // which keeps the indices stable while the goroutines read from it.
    merged := make([]*SSTable, pairs)
    errs := make([]error, pairs)

    wg := &sync.WaitGroup{} // wait group for the merge routines

    for p := 0; p < pairs; p++ {
        wg.Add(1)

        // p is passed as an argument so every goroutine works on its own
        // pair (before Go 1.22, all closures would share one loop variable).
        go func(p int) {
            defer wg.Done()

            sstable1 := k4.sstables[2*p]
            sstable2 := k4.sstables[2*p+1]

            // Create the sstable this pair is merged into. createSSTable is
            // assumed to be safe for concurrent use and to give each new
            // sstable its own file.
            newSstable, err := k4.createSSTable()
            if err != nil {
                errs[p] = fmt.Errorf("failed to create sstable: %w", err)
                return
            }

            // The bloom filter sits on the initial pages of the sstable,
            // so the keys of both inputs are added to it first.
            bf := bloomfilter.NewBloomFilter(1000000, 8)
            for _, src := range []*SSTable{sstable1, sstable2} {
                it := newSSTableIterator(src.pager, k4.compress)
                for it.next() {
                    bf.Add(it.currentKey())
                }
            }

            bfData, err := bf.Serialize()
            if err != nil {
                errs[p] = fmt.Errorf("failed to serialize bloom filter: %w", err)
                return
            }

            if _, err := newSstable.pager.Write(bfData); err != nil {
                errs[p] = fmt.Errorf("failed to write to sstable: %w", err)
                return
            }

            // Then copy the key-value pairs, oldest sstable first.
            for _, src := range []*SSTable{sstable1, sstable2} {
                it := newSSTableIterator(src.pager, k4.compress)
                for it.next() {
                    key, value := it.current()

                    // Re-compress if compression is enabled
                    if k4.compress {
                        key, value, err = compressKeyValue(key, value)
                        if err != nil {
                            errs[p] = fmt.Errorf("failed to compress key-value pair: %w", err)
                            return
                        }
                    }

                    if _, err := newSstable.pager.Write(serializeKv(key, value)); err != nil {
                        errs[p] = fmt.Errorf("failed to write to sstable: %w", err)
                        return
                    }
                }
            }

            merged[p] = newSstable
        }(p)
    }

    wg.Wait() // wait for all the merge routines to finish

    // If any merge failed, keep the original sstables so no data is lost.
    var firstErr error
    for _, err := range errs {
        if err != nil {
            k4.printLog(err.Error())
            if firstErr == nil {
                firstErr = err
            }
        }
    }
    if firstErr != nil {
        return firstErr
    }

    // All merges succeeded: close and delete the paired sstables. File names
    // are assumed to follow the sstables' positions, as sstableFilename implies.
    for i := 0; i < 2*pairs; i++ {
        if err := k4.sstables[i].pager.Close(); err != nil {
            return fmt.Errorf("failed to close sstable: %w", err)
        }
        if err := os.Remove(k4.directory + string(os.PathSeparator) + sstableFilename(i)); err != nil {
            return fmt.Errorf("failed to remove sstable: %w", err)
        }
    }

    // Rebuild the list: merged sstables in pair order (oldest first),
    // followed by the leftover sstable, if any.
    k4.sstables = append(merged, k4.sstables[2*pairs:]...)

    k4.printLog("Compaction completed")

    return nil
}
guycipher commented 3 weeks ago

I actually did something similar for TidesDB's multithreaded compaction: https://github.com/tidesdb/tidesdb/blob/master/libtidesdb.cpp

guycipher commented 3 weeks ago

Wow! The multithreaded compaction works REALLY well. I'll still write more tests before committing it for v1.6.0, just to be sure about this new enhancement.

guycipher commented 3 weeks ago

Completed. https://github.com/guycipher/k4/releases/tag/v1.6.0