G-Research / tdigest-rs

Simple Python package to compute TDigests, implemented in Rust
Apache License 2.0
4 stars 2 forks source link

Using T-Digest while streaming data #18

Open shubhanshu02 opened 2 weeks ago

shubhanshu02 commented 2 weeks ago

I am working on a streaming system where I need to calculate the statistics of a metric like percentiles and medians. While the data I am getting is a stream, I want to query the percentiles at certain intervals.

Similar to the Python t-digest libray (https://github.com/CamDavidsonPilon/tdigest) which provides an option to update the t-digest with digest.update(value) function, does this library expose any function to add the data as it gets available?

Below is one way I can think of for archiving this. Is there any better way of doing this?

import numpy as np
from tdigest_rs import TDigest

stream = [1.0, 2.0, 3, 0.3]

tdigest = None
for data in stream:
    current_digest = TDigest.from_array(np.array([data], dtype=np.float32))

    if tdigest is None:
        tdigest = current_digest
    else:
        tdigest = tdigest.merge(current_digest)

print(tdigest.quantile(0.25))
dannyfriar commented 1 week ago

Yeah that is the correct way to use it when streaming data and this what I currently do.

We can add an update method to wrap this behaviour if that's useful or if you would like to contribute then I'm happy to review a PR.

shubhanshu02 commented 1 week ago

Great, thanks for confirming. Sure, I can make a pull request.

There is one another form of this algorithm mentioned in the t-digest paper where we first buffer the data points for some time and then merge the two t-digests. (Page 4, first paragraph)

One version keeps a buffer of incoming samples. When the buffer fills, the contents are sorted and merged with the centroids computed from previous samples. This merging form of the t-digest algorithm has the virtue of allowing all memory structures to be allocated statically. On an amortized basis, this buffer-and-merge algorithm can be very fast especially if the input buffer is large.

This results in better control over accuracy, speed and memory if we buffer-and-merge with different compression factors (stratified merge on Page 10).

Here, we can have the caller specify these values while keeping some default values:

  1. max buffer size: Maximum data points to collect in buffer before merging.
  2. delta: compression factor to use for creating t-digest objects
  3. merge delta: compression factor to merge t-digest objects

1 and 3 will only be required when the caller uses the update method. Otherwise, they will not be required. What do you think about this?

dannyfriar commented 1 week ago

I think it would be best to keep the update method simple and leave it to the user to buffer the incoming data prior to calling update. Having the distinction between the delta and merge_delta makes sense here though I think.

Generally we're keen to keep this library small and focused but if there are other useful building blocks needed for this alternative approach we can consider adding them.

shubhanshu02 commented 3 days ago

So, if I understood properly, then this is API we are trying to provide here:

function update(arr: numpy array):
    // takes the buffer of numbers and merges them with the current t-digest
    ...

Does this look good to you?

dannyfriar commented 3 days ago

Yeah I think that's all we need. Then just expose the delta and merge_delta parameters as you suggested.

def update(self, array: np.ndarray, delta: float, merge_delta: float)

shubhanshu02 commented 3 days ago

Got it. Thanks.