postmates / quantiles

Approximations to Histograms

benchmark: CKMS slow + excessive memory #32

Open ljw1004 opened 3 years ago

ljw1004 commented 3 years ago

I wrote a benchmark to test the performance of CKMS and GK.

Findings: CKMS with error=0.0001 delivers better and faster results than the error=0.001 suggested in its doc-comment. However, CKMS doesn't have any "sweet spot": over the entire range where CKMS is feasible, it's slower and more space-intensive than GK, and even than just blindly storing every single value. This is at odds with what I expected from the paper, and also with the claimed memory bounds, so I wonder if there's an implementation bug? (Also, if we can live with just P99, then "store the top 1% of values in a priority queue" is competitive up to 10M values - see TopValues in the benchmark source below!)

Method: The benchmark calls ckms.insert(value) / gk.insert(value) a number of times, then obtains quantiles. I measured wall-time using std::time::Instant::now() / .elapsed(), and heap memory with stats_alloc::Region::new(&GLOBAL) / .change(), computing bytes_allocated - bytes_deallocated + bytes_reallocated. I ran it with cargo run --release on my Macbook. I tried a normal distribution in the range -0.5 to 1.5, and a Pareto distribution in the range 5.0 to 20.0. As a baseline, I added another "algorithm", ALL, which keeps every single value in memory - this tells me the "perfect" expected values of min/P50/P99/max against which to judge how accurate GK/CKMS are, and there's no justification for taking more memory than this!
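
For reference, here's a minimal, self-contained sketch of that measurement pattern (the full harness is in the next comment). The vector fill is just a stand-in workload, not one of the algorithms under test:

use std::alloc::System;
use std::time::Instant;
use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM};

#[global_allocator]
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;

fn main() {
    // Track allocations from this point on.
    let region = Region::new(&GLOBAL);
    let start = Instant::now();
    let values: Vec<f64> = (0..1_000_000).map(|i| i as f64).collect();
    let elapsed = start.elapsed().as_secs_f64();
    let stats = region.change();
    // Net heap growth over the measured region.
    let heap = stats.bytes_allocated as isize
        - stats.bytes_deallocated as isize
        + stats.bytes_reallocated;
    println!("{} values: {:.3}s, {}k heap", values.len(), elapsed, heap / 1024);
}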

ljw1004 commented 3 years ago

Here's the benchmark source code (Cargo.toml, then main.rs).

# Cargo.toml
[package]
name = "rf"
version = "0.1.0"
edition = "2018"

[dependencies]
quantiles = "0.7.1"
rand = "0.8.4"
rand_distr = "0.4.2"
ordered-float = "2.8.0"
rand_pcg = "0.3.1"
stats_alloc = "0.1.8"
tdigest = "0.2.2"
thousands = "0.2.0"

// main.rs
use thousands::Separable; // brings separate_with_underscores() into scope

fn main() {
    test_counts();
    println!("\n\nHERE'S HOW WE SETTLED ON PARAMETERS");
    test_gk_and_ckms_params();
    test_digest_params();
}

#[allow(dead_code)]
fn test_counts() {
    let counts = vec![10_000, 100_000, 1_000_000, 10_000_000, 100_000_000];
    let ckms_error = 0.0001;
    let gk_error = 0.001;
    let tdigest_batch = 20_000;
    let tdigest_max_size = 200;
    let dn = rand_distr::Normal::new(0.5f64, 0.2f64).unwrap();
    let dp = rand_distr::Pareto::new(5f64, 10f64).unwrap();
    for count in counts {
        let mut a0 = NoAggregate::new();
        let mut am = MeanAggregate::new();
        let mut av = AllValues::new(count);
        let mut at = TopValues::new(count);
        let mut aq = QuantilesCKMS::new(ckms_error);
        let mut ag = QuantilesGK::new(gk_error);
        let mut ad = TDigestAg::new(tdigest_batch, tdigest_max_size);

        println!("\nCOUNT={}, GK_ERROR={}, CKMS_ERROR={}, TDIGEST_BATCH={}, TDIGEST_MAX_SIZE={}", count.separate_with_underscores(), gk_error, ckms_error, tdigest_batch.separate_with_underscores(), tdigest_max_size);
        println!("    NORMAL DISTRIBITION");
        test(count, &mut a0, dn);
        test(count, &mut am, dn);
        test(count, &mut av, dn);
        test(count, &mut at, dn);
        test(count, &mut ag, dn);
        if count < 50_000_000 {test(count, &mut aq, dn);}
        test(count, &mut ad, dn);
        println!("    PARETO DISTRIBUTION");
        test(count, &mut a0, dp);
        test(count, &mut am, dp);
        test(count, &mut av, dp);
        test(count, &mut at, dp);
        test(count, &mut ag, dp);
        if count < 50_000_000 {test(count, &mut aq, dp);}
        test(count, &mut ad, dp);
    }
}

#[allow(dead_code)]
fn test_digest_params() {
    let count = 10_000_000;
    let dp = rand_distr::Pareto::new(5f64, 10f64).unwrap();
    let dn = rand_distr::Normal::new(0.5f64, 0.2f64).unwrap();
    for max_size in [10, 100, 500, 1000, 5000] {
        let batch = 20_000;
        let mut av = AllValues::new(count);
        let mut at = TDigestAg::new(batch, max_size);
        println!("\nMAX_SIZE={}, BATCH={}, COUNT={}", max_size, batch.separate_with_underscores(), count.separate_with_underscores());
        println!("    NORMAL DISTRIBITION");
        test(count, &mut av, dn);
        test(count, &mut at, dn);
        println!("    PARETO DISTRIBUTION");
        test(count, &mut av, dp);
        test(count, &mut at, dp);
    }
    println!("");
    for batch in [100, 1000, 5000, 10_000, 20_000, 50_000, 100_000] {
        let max_size = 200;
        let mut av = AllValues::new(count);
        let mut at = TDigestAg::new(batch, max_size);
        println!("\nBATCH={}, MAX_SIZE={}, COUNT={}", batch.separate_with_underscores(), max_size, count.separate_with_underscores());
        println!("    NORMAL DISTRIBITION");
        test(count, &mut av, dn);
        test(count, &mut at, dn);
        println!("    PARETO DISTRIBUTION");
        test(count, &mut av, dp);
        test(count, &mut at, dp);
    }
}

#[allow(dead_code)]
fn test_gk_and_ckms_params() {
    let count = 1_000_000;
    let dp = rand_distr::Pareto::new(5f64, 10f64).unwrap();
    let dn = rand_distr::Normal::new(0.5f64, 0.2f64).unwrap();
    for error in [0.1, 0.01, 0.001, 0.0001, 0.000_01, 0.000_001] {
        let mut av = AllValues::new(count);
        let mut ag = QuantilesGK::new(error);
        let mut aq = QuantilesCKMS::new(error);
        println!("\nERROR={}, COUNT={}", error, count.separate_with_underscores());
        println!("    NORMAL DISTRIBITION");
        test(count, &mut av, dn);
        if error > 0.000005 {test(count, &mut ag, dn);}
        test(count, &mut aq, dn);
        println!("    PARETO DISTRIBUTION");
        test(count, &mut av, dp);
        if error > 0.000005 {test(count, &mut ag, dp);}
        test(count, &mut aq, dp);
    }
}

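// Common interface for the aggregators under test: anew() builds a fresh
// instance with the same parameters (called inside the measured region, so
// its allocations are counted), insert() feeds one sample, and render()
// reports the summary statistics.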
trait Aggregate {
    fn anew(&self) -> Self;
    fn insert(&mut self, value: f64);
    fn render(&mut self) -> String;
}

// INSTRUMENTED_SYSTEM is an instrumented instance of the system allocator
#[global_allocator]
static GLOBAL: &stats_alloc::StatsAlloc<std::alloc::System> = &stats_alloc::INSTRUMENTED_SYSTEM;

fn test<A: Aggregate, D: rand::distributions::Distribution<f64>>(
    count: usize,
    aggregate: &mut A,
    distribution: D,
) {
    let mut rng = rand_pcg::Pcg64::new(0xcafef00dd15ea5e5, 0xa02bdbf7bb3c0a7ac28fa16a64abf96);
    let start = std::time::Instant::now();
    let startmem = stats_alloc::Region::new(&GLOBAL);
    let mut aggregate = aggregate.anew();
    for _ in 0..count {
        let value = distribution.sample(&mut rng);
        aggregate.insert(value);
    }
    let insert_elapsed = start.elapsed().as_secs_f64();
    let start = std::time::Instant::now();
    let fmt = aggregate.render();
    let fmt_elapsed = start.elapsed().as_secs_f64();
    let mem = startmem.change();
    let bytes_change = mem.bytes_allocated as isize - mem.bytes_deallocated as isize + mem.bytes_reallocated;
    println!(
        "        {:.3}s+{:.3}s, {}k heap, {}k stack, {}",
        insert_elapsed,
        fmt_elapsed,
        bytes_change / 1024,
        std::mem::size_of_val(&aggregate) / 1024,
        fmt,
    );
}

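// Baseline that discards every sample: measures the cost of the loop and
// RNG sampling alone.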
struct NoAggregate {}
impl NoAggregate {
    fn new() -> Self {
        Self {}
    }
}
impl Aggregate for NoAggregate {
    fn insert(&mut self, _value: f64) {}
    fn anew(&self) -> NoAggregate { Self::new() }
    fn render(&mut self) -> String {
        format!("NoAggregate")
    }
}

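// Streaming min/max/mean/stdev via Welford's online algorithm:
// variance_sum accumulates the sum of squared deviations, so
// stdev = sqrt(variance_sum / count).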
#[derive(Default)]
struct MeanAggregate {
    min: Option<f64>,
    max: Option<f64>,
    mean: f64,
    variance_sum: f64,
    count: usize,
}
impl MeanAggregate {
    fn new() -> Self {
        std::default::Default::default()
    }
}
impl Aggregate for MeanAggregate {
    fn render(&mut self) -> String {
        format!(
            "MeanAggregate, min={:.4}, mean={:.4} (stdev {:.4}),  max={:.4}",
            self.min.unwrap(),
            self.mean,
            (self.variance_sum / self.count as f64).sqrt(),
            self.max.unwrap(),
        )
    }

    fn anew(&self) -> Self { MeanAggregate::new() }
    fn insert(&mut self, value: f64) {
        match self.min {
            None => self.min = Some(value),
            Some(min) if value < min => self.min = Some(value),
            _ => {}
        }
        match self.max {
            None => self.max = Some(value),
            Some(max) if value > max => self.max = Some(value),
            _ => {}
        }
        self.count += 1;
        let new_mean = self.mean + (value - self.mean) / self.count as f64;
        self.variance_sum += (value - self.mean) * (value - new_mean);
        self.mean = new_mean;
    }
}

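// The "ALL" baseline: stores every sample (as f32, to halve memory) and
// sorts at render time to read off exact quantiles.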
struct AllValues {
    values: Vec<f32>,
}
impl AllValues {
    fn new(count: usize) -> Self {
        let values = Vec::with_capacity(count);
        AllValues { values }
    }
}
impl Aggregate for AllValues {
    fn render(&mut self) -> String {
        self.values.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let len = self.values.len();
        format!(
            "AllValues, min={:.4}, P50={:.4}, P99={:.4}, max={:.4}",
            self.values[0],
            self.values[len / 2],
            self.values[len * 99 / 100],
            self.values[len - 1],
        )
    }

    fn anew(&self) -> Self { Self::new(self.values.capacity()) }
    fn insert(&mut self, value: f64) {
        self.values.push(value as f32);
    }
}

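// Keeps only the largest ~1% of samples in a min-heap (BinaryHeap over
// Reverse), so the heap's smallest element is the P99 boundary and its
// largest is the exact max. Answers nothing below the top tail.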
struct TopValues {
    count: usize,
    values: std::collections::BinaryHeap<std::cmp::Reverse<ordered_float::NotNan<f32>>>,
}
impl TopValues {
    fn new(count: usize) -> Self {
        let capacity = std::cmp::max(count / 100, 1);
        let values = std::collections::BinaryHeap::with_capacity(capacity);
        TopValues { count, values }
    }
}
impl Aggregate for TopValues {
    fn render(&mut self) -> String {
        let p99 = self.values.peek().unwrap().0;
        let max = self.values.drain().min().unwrap().0;
        format!("TopValues, p99={:.4}, max={:.4}", p99, max)
    }
    fn anew(&self) -> Self { Self::new(self.count) }
    fn insert(&mut self, value: f64) {
        let value = value as f32;
        let value = std::cmp::Reverse(unsafe { ordered_float::NotNan::new_unchecked(value) });

        if self.values.len() < self.values.capacity() {
            self.values.push(value);
        } else if self.values.peek().unwrap().0 < value.0 {
            self.values.pop();
            self.values.push(value);
        } else {
        }
    }
}

struct QuantilesCKMS {
    error: f64,
    q: quantiles::ckms::CKMS<f64>,
}
impl QuantilesCKMS {
    fn new(error: f64) -> Self {
        let q = quantiles::ckms::CKMS::new(error);
        QuantilesCKMS { error, q }
    }
}
impl Aggregate for QuantilesCKMS {
    fn render(&mut self) -> String {
        format!(
            "QuantilesCKMS, min={:.4}, mean={:.4}, p50={:.4}, P99={:.4}, max={:.4}",
            self.q.query(0.0).unwrap().1,
            self.q.cma().unwrap(),
            self.q.query(0.5).unwrap().1,
            self.q.query(0.99).unwrap().1,
            self.q.query(1.0).unwrap().1,
        )
    }
    fn anew(&self) -> Self {Self::new(self.error) }
    fn insert(&mut self, value: f64) {
        self.q.insert(value);
    }
}

struct QuantilesGK {
    error: f64,
    q: quantiles::greenwald_khanna::Stream<ordered_float::NotNan<f64>>,
}
impl QuantilesGK {
    fn new(error: f64) -> Self {
        let q = quantiles::greenwald_khanna::Stream::new(error);
        QuantilesGK { q, error }
    }
}
impl Aggregate for QuantilesGK {
    fn render(&mut self) -> String {
        format!(
            "QuantilesGK, min={:.4}, p50={:.4}, P99={:.4}, max={:.4}",
            self.q.quantile(0.0),
            self.q.quantile(0.5),
            self.q.quantile(0.99),
            self.q.quantile(1.0),
        )
    }
    fn anew(&self) -> Self {
        Self::new(self.error)
    }
    fn insert(&mut self, value: f64) {
        // Safe in practice: the distributions used here never produce NaN.
        let value = unsafe { ordered_float::NotNan::new_unchecked(value) };
        self.q.insert(value);
    }
}

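// Buffers samples and folds them into the digest in batches, because
// tdigest's merge_unsorted() consumes a whole Vec per call.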
struct TDigestAg {
    batch: Vec<f64>,
    t: tdigest::TDigest,
}
impl TDigestAg {
    fn new(batch: usize, max_size: usize) -> Self {
        let batch = Vec::with_capacity(batch);
        let t = tdigest::TDigest::new_with_size(max_size);
        TDigestAg { batch, t }
    }
    fn merge(&mut self) {
        // Swap the full batch for a fresh, empty one with the same capacity,
        // then fold the old batch into the digest.
        let capacity = self.batch.capacity();
        let prev = std::mem::replace(&mut self.batch, Vec::with_capacity(capacity));
        self.t = self.t.merge_unsorted(prev);
    }
}
impl Aggregate for TDigestAg {
    fn render(&mut self) -> String {
        self.merge();
        format!(
            "TDigest, min={:.4}, mean={:.4}, P50={:.4}, P99={:.4}, max={:.4}",
            self.t.min(),
            self.t.mean(),
            self.t.estimate_quantile(0.5),
            self.t.estimate_quantile(0.99),
            self.t.max(),
        )
    }
    fn anew(&self) -> Self { Self::new(self.batch.capacity(), self.t.max_size()) }
    fn insert(&mut self, value: f64) {
        if self.batch.len() == self.batch.capacity() {
            self.merge();
        }
        self.batch.push(value);
    }
}

ljw1004 commented 3 years ago

Here are the raw results from the benchmark on my Macbook.

How do the algorithms scale with number of values?

How did we settle on "error" parameter for GK/CKMS?

How did we settle on "batch" and "max-size" parameters for TDigest?