kelindar / column

High-performance, columnar, in-memory store with bitmap indexing in Go
MIT License

Sum functionality for Numeric columns in transactions #53

Closed. Dreeseaw closed this 2 years ago.

Dreeseaw commented 2 years ago

This PR introduces a simple sum aggregation for transactions. It adds three functions, txn.SumInt64(), txn.SumUint64(), and txn.SumFloat64(), which sum the given numeric column with respect to the txn's current index via txn_lock's rangeRead().

Since there are no prior built-in aggregation functions in the package, I understand if the goal is to let the user handle that functionality via a txn.Range() call. However, I believe more could be done to parallelize this operation (for example, reading and adding all chunks concurrently), and since that requires access to the column's internal data, building simple aggregations into column itself seems like the right place for them.
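
For illustration, a query using the proposed API might look like the following (a sketch assuming the signatures introduced in this PR, where each Sum function returns the total directly):

players.Query(func(txn *Txn) error {
    total := txn.SumFloat64("balance")
    fmt.Printf("total balance: %v\n", total)
    return nil
})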

Dreeseaw commented 2 years ago

@kelindar Hello, I'm working on fixing the data race, but would this functionality be something wanted here in general?

on99 commented 2 years ago

this is a great feature IMO

Dreeseaw commented 2 years ago

I've been looking into the test failure in the most recent commit (https://github.com/kelindar/column/runs/6029689995?check_suite_focus=true) and cannot replicate it using Docker to test with Ubuntu 20.04 (the GitHub Actions workflow uses Bullseye).

Test Dockerfile -

FROM golang:1.17-bullseye

WORKDIR /usr/

COPY go.mod go.sum ./
RUN go mod download && go mod verify

COPY . .
RUN go test -race -covermode atomic -coverprofile=profile.cov .

However, I did notice that the GitHub Actions workflow currently runs with Go 1.16 - https://github.com/kelindar/column/blob/main/.github/workflows/test.yml#L14. Should this be updated, given that go.mod indicates 1.17? I can't say this difference itself made the test non-deterministic, but it is something to note.

If anyone would like to take a look themselves, I'd greatly appreciate it.

kelindar commented 2 years ago

@Dreeseaw, first of all, thanks for the contribution. However, this one is a bit tricky; I need to better understand the use-case and the intent. The proposed approach seems to make the code more concise, but not necessarily faster.

Currently we can sum all of the cells of a column in the following manner:

var sum float64
players.Query(func(txn *Txn) error {
    balance := txn.Float64("balance")
    txn.Range(func(idx uint32) {
        v, _ := balance.Get()
        sum += v
    })
    return nil
})

Now, there are a few problems with this code:

  1. It's very verbose to write for this use-case of simply summing values
  2. It's not parallelised by default, as you'd need to write more code to create several goroutines
  3. It's not vectorised and cannot be since we do not have access to the raw data array

What this PR is proposing would solve problem (1) but not (2) or (3), which could potentially be added in future releases.

players.Query(func(txn *Txn) error {
    sum := txn.SumFloat64("balance")
    _ = sum // use the sum; kept so the snippet compiles
    return nil
})

There are a couple of problems with this approach as well. Once we open this Pandora's box, we'll also need to implement other aggregation functions such as Min(), Max(), Avg() and so on.

Now, I do agree that having aggregation functions in the library is useful; we just need to think a bit more about the right way of adding them. What we could do instead is add these functions onto the column readers themselves, such as float64Reader. To avoid code duplication we might need to look into generics for these readers, or simply generate the appropriate code as we currently do.

Ideally, our API should look relatively simple and in line with the current design:

players.Query(func(txn *Txn) error {
    sum := txn.Float64("balance").Sum()
    _ = sum // use the sum; kept so the snippet compiles
    return nil
})

This would solve problem (1) by making the code more concise. We would also be able to solve (3), since these readers have access to the raw data of the column, allowing us to use SIMD vectorisation to optimise certain aggregations such as sum. Initially, the implementation would rely on the txn.Range() function itself to maximise code reuse; I have already tried to optimise that function, and all future optimisation efforts can be made there.
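
As a minimal sketch of that direction (illustrative only; it assumes the reader keeps a reference to its transaction, which is not how the readers are structured today):

// Sum walks the transaction's current index via txn.Range and
// accumulates the column's values; a vectorised implementation
// could later replace this loop.
func (s float64Reader) Sum() float64 {
    var total float64
    s.txn.Range(func(idx uint32) {
        if v, ok := s.Get(); ok {
            total += v
        }
    })
    return total
}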

Now, the parallelisation issue is quite tricky, and I don't know how to properly solve it in the API design at the moment. I do think that if we vectorise these aggregations, there will not be a major need for parallelisation unless you operate on very large arrays. We could consider introducing something like RangeParallel() to explicitly iterate chunk by chunk in multiple goroutines, but it might bring other problems.
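
To make the trade-off concrete, here is a library-agnostic sketch of what a chunk-parallel sum involves; a hypothetical RangeParallel() would need to coordinate roughly this internally:

package main

import (
    "fmt"
    "sync"
)

// parallelSum splits data into fixed-size chunks, sums each chunk in
// its own goroutine, and merges the partial results at the end.
func parallelSum(data []float64, chunkSize int) float64 {
    var wg sync.WaitGroup
    var mu sync.Mutex
    var total float64

    for start := 0; start < len(data); start += chunkSize {
        end := start + chunkSize
        if end > len(data) {
            end = len(data)
        }
        wg.Add(1)
        go func(chunk []float64) {
            defer wg.Done()
            var partial float64
            for _, v := range chunk {
                partial += v
            }
            mu.Lock()
            total += partial
            mu.Unlock()
        }(data[start:end])
    }
    wg.Wait()
    return total
}

func main() {
    data := make([]float64, 100_000)
    for i := range data {
        data[i] = 1
    }
    fmt.Println(parallelSum(data, 16_384)) // 100000
}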

Dreeseaw commented 2 years ago

@kelindar Thanks for the response! I really appreciate hearing from you.

I agree that my original implementation wasn't well thought out. I've been playing around with an API structure more in line with what already exists, and hit a little snag trying to implement the functionality as you've defined above -

players.Query(func(txn *Txn) error {
    sum := txn.Float64("balance").Sum()
    _ = sum // use the sum; kept so the snippet compiles
    return nil
})

which would require a Sum method on the numberReader template (column_generate.go) -

func (s numberReader) Sum() number {
    // no access to txn.rangeRead here; the current readers operate
    // only at the *cursor position, not over the entire column
}

The only existing functionality similar to aggregation is filtering (which accesses the entire column with respect to the txn index), done via txn.FilterInt64 and friends, giving it access to rangeRead and the internal, optimizable columnReader functions. I agree that using a string parameter in the API here is confusing, since a string parameter normally signals reader or index creation.

To try to provide a similar-feeling API to the one you've shown above, I think adding two new generic functions is the best way to provide the functionality under the current API structure -

func (c *numberColumn) Sum(offset uint32, index bitmap.Bitmap) number {
        var total number

        // simple access for now, but more could be done in the future
        index.Range(func(idx uint32) {
                total += number(c.data[offset+idx])
        })
        return total
}

func (txn *Txn) Sum(s *numberWriter) number {
        var total number

        txn.rangeRead(func(offset uint32, index bitmap.Bitmap) {
                total += s.reader.Sum(offset, index)
        })
        return total
}
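
Usage would then look something like this (hypothetical, assuming txn.Float64 returns the writer that txn.Sum expects):

players.Query(func(txn *Txn) error {
    balance := txn.Float64("balance")
    total := txn.Sum(&balance)
    fmt.Printf("total balance: %v\n", total)
    return nil
})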

Let me know what you think of this approach.

Also, I'm having a little trouble using genny to generate new column_number.go files for testing. I understand the purpose, but how exactly do you run the tool?

kelindar commented 2 years ago

I finally got some time to clean up this mess: no more genny, and the numerical columns are refactored in #61. I've also added a Sum() aggregation function implementation in that PR (see https://github.com/kelindar/column/pull/61/files#diff-3c51c19b5c0318fcbd139f437fe0df252d2e178dc856ebb7009856f3d0c5d9a3R169-R177), so this can now be done generically for the moment, with room to improve in the future.
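
In the same spirit, a generics-based sum over raw column data could look roughly like the following (a sketch, not the actual code in #61):

// Number covers the numeric types the columns support.
type Number interface {
    ~int | ~int16 | ~int32 | ~int64 |
        ~uint | ~uint16 | ~uint32 | ~uint64 |
        ~float32 | ~float64
}

// Sum accumulates every element of a raw column chunk.
func Sum[T Number](data []T) (total T) {
    for _, v := range data {
        total += v
    }
    return total
}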

Dreeseaw commented 2 years ago

@kelindar I love the upgrades to generics and exposing the whole txn to column readers!

Since you got around to adding Sum(), I went to clean up the README and my original tests, and realized that sums are not stable within a single transaction. This could affect not only the situation in the test, but also the one laid out in my README changes (filtering on a value based on a sum). While I haven't run similar tests for updates and deletes yet, I'd expect them to suffer from the same problems as simultaneous inserts.

To combat this, I think a solid first approach is to hold a read lock on each shard accessed in Sum(), from the time the aggregation is called until the end of the transaction. This covers inserts, updates, and deletes while providing a consistent view for all aggregations and any functionality that depends on those values.

Let me know what you have in mind for fixing this. Also, thanks again for the generics work to make this possible!

kelindar commented 2 years ago

@Dreeseaw interesting point. What you're seeing in this transaction test is actually expected: it's a non-repeatable read anomaly, since two sums in the same transaction can return different results. Basically, the isolation level currently provided by the library is essentially read committed on a per-shard basis, so as soon as a separate transaction commits to a shard, you'll start seeing its data in ongoing transactions.
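
A stripped-down illustration of the anomaly, outside the library (a hypothetical shard protected by a plain mutex; each sum locks only for the duration of the read, just as each per-shard read does here):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    balances := []float64{10, 20, 30}

    // sum takes the lock only while reading, like a per-shard read.
    sum := func() (total float64) {
        mu.Lock()
        defer mu.Unlock()
        for _, v := range balances {
            total += v
        }
        return total
    }

    first := sum() // 60

    // A concurrent transaction commits a new row between the two reads.
    mu.Lock()
    balances = append(balances, 40)
    mu.Unlock()

    second := sum() // 100: a non-repeatable read within one logical txn
    fmt.Println(first, second)
}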

A full read lock on the transaction would be inefficient, as we would need to know in advance which shards to lock. This specific scenario could be avoided by implementing a repeatable read or snapshot isolation level, but that would similarly have a negative impact on performance, and it would also require a rather major redesign: each column would need to do something like a copy-on-write of each shard, and we'd need to maintain a linked list of modified shards for each transaction.

Dreeseaw commented 2 years ago

Got it, thanks for the explanation.

I've removed the unstable test and left the added documentation and benchmark for you to run locally on your next PR.

kelindar commented 2 years ago

@Dreeseaw to link up with this discussion: I've got vectorized aggregates working in https://github.com/kelindar/column/pull/67