apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0
2.64k stars 809 forks

interleave with memory prefetching: 2x faster? #6813

Open richox opened 4 days ago

richox commented 4 days ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Memory prefetching is widely used when array items are accessed at random, which makes it a good fit for some cases of the take/interleave kernels.

I ran a quick benchmark. For completely random input, interleave with prefetching is close to 2x faster than the current interleave implementation:

$ cargo run --release
disable prefetch time: 1.305 sec
 enable prefetch time: 0.695 sec
disable prefetch time: 1.331 sec
 enable prefetch time: 0.690 sec
disable prefetch time: 1.318 sec
 enable prefetch time: 0.724 sec
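For reference, the speedup implied by the three runs above can be worked out directly. This is only arithmetic on the printed timings; the helper name `speedups` is made up for illustration.

```rust
/// Ratio of "disable prefetch" time to "enable prefetch" time for each run.
fn speedups(runs: &[(f64, f64)]) -> Vec<f64> {
    runs.iter().map(|(without, with)| without / with).collect()
}

fn main() {
    // (disable prefetch, enable prefetch) in seconds, copied from the output above.
    let runs = [(1.305, 0.695), (1.331, 0.690), (1.318, 0.724)];
    let ratios = speedups(&runs);
    for (i, r) in ratios.iter().enumerate() {
        println!("run {}: {:.2}x", i + 1, r);
    }
    let mean = ratios.iter().sum::<f64>() / ratios.len() as f64;
    println!("mean: {:.2}x", mean);
}
```

Each run lands between 1.8x and 2x, so "2x" in the title is a slight round-up.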

benchmark code:

#![feature(core_intrinsics)]

use std::error::Error;
use std::intrinsics::prefetch_read_data;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::*;
use arrow::buffer::*;
use arrow::datatypes::*;
use arrow::error::ArrowError;

fn main() -> Result<(), Box<dyn Error>> {
    let generate_random_int_string = || format!("SomeTestStr={}", rand::random::<u64>());
    let mut arrays: Vec<ArrayRef> = vec![];
    for _ in 0..1000 {
        let str_array = (0..1000)
            .map(|_| generate_random_int_string())
            .collect::<Vec<_>>();
        arrays.push(Arc::new(StringArray::from_iter_values(str_array)));
    }

    let mut random_indices = vec![];
    for _ in 0..10000000 {
        random_indices.push((
            rand::random::<usize>() % arrays.len(),
            rand::random::<usize>() % arrays[0].len(),
        ));
    }
    let random_indices_len = random_indices.len();
    for i in 0..random_indices_len {
        random_indices.swap(i, rand::random::<usize>() % random_indices_len);
    }

    fn timer<T>(name: &str, f: impl FnOnce() -> T) -> T {
        let start_time = Instant::now();
        let ret = f();
        println!("{name} time: {:.3} sec", start_time.elapsed().as_secs_f64());
        ret
    }

    // warm up
    assert_eq!(
        &interleave_without_prefetch(&arrays, &random_indices)?,
        &interleave_with_prefetch(&arrays, &random_indices)?,
    );

    // benchmark
    for _ in 0..3 {
        let batch1 = timer("disable prefetch", || interleave_without_prefetch(&arrays, &random_indices))?;
        let batch2 = timer(" enable prefetch", || interleave_with_prefetch(&arrays, &random_indices))?;
        assert_eq!(&batch1, &batch2);
    }
    Ok(())
}

fn interleave_without_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
    arrow::compute::interleave(&values.iter().map(|v| v.as_ref()).collect::<Vec<_>>(), indices)
}

fn interleave_with_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {

    struct Interleave<'a, T> {
        arrays: Vec<&'a T>,
        nulls: Option<NullBuffer>,
    }

    impl<'a, T: Array + 'static> Interleave<'a, T> {
        fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
            let mut has_nulls = false;
            let arrays: Vec<&T> = values
                .iter()
                .map(|x| {
                    has_nulls = has_nulls || x.null_count() != 0;
                    x.as_any().downcast_ref().unwrap()
                })
                .collect();

            let nulls = match has_nulls {
                true => {
                    let mut builder = BooleanBufferBuilder::new(indices.len());
                    for (a, b) in indices {
                        let v = arrays[*a].is_valid(*b);
                        builder.append(v)
                    }
                    Some(NullBuffer::new(builder.finish()))
                }
                false => None,
            };

            Self { arrays, nulls }
        }
    }

    fn interleave_bytes<T: ByteArrayType>(
        values: &[&dyn Array],
        indices: &[(usize, usize)],
    ) -> Result<ArrayRef, ArrowError> {
        let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);

        let mut capacity = 0;
        let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
        offsets.append(T::Offset::from_usize(0).unwrap());
        for (a, b) in indices {
            let o = interleaved.arrays[*a].value_offsets();
            let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
            capacity += element_len;
            offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
        }

        let mut values = MutableBuffer::new(capacity);
        for (i, (a, b)) in indices.iter().enumerate() {
            ////////////////////////////////////////////////////////////
            // prefetch next values
            ////////////////////////////////////////////////////////////
            const PREFETCH_AHEAD: usize = 4;
            if i + PREFETCH_AHEAD < indices.len() {
                let (pa, pb) = indices[i + PREFETCH_AHEAD];
                unsafe {
                    let array = interleaved.arrays.get_unchecked(pa);
                    let start = *array.value_offsets().get_unchecked(pb);
                    let ptr = array.values().as_ptr().wrapping_add(start.as_usize());
                    prefetch_read_data(ptr, 3);
                }
            }
            values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
        }

        // Safety: safe by construction
        let array = unsafe {
            let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
            GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
        };
        Ok(Arc::new(array))
    }

    let values = values.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
    match values.first().map(|v| v.data_type()) {
        Some(DataType::Utf8) => interleave_bytes::<GenericStringType<i32>>(&values, indices),
        Some(DataType::Binary) => interleave_bytes::<GenericBinaryType<i32>>(&values, indices),
        _ => arrow::compute::interleave(&values, indices),
    }
}

Describe the solution you'd like

  1. I would like to introduce memory prefetching in arrow-rs. Since not all scenarios benefit from prefetching, I suggest adding a separate kernel function such as interleave_with_memory_prefetching so that the current implementation stays unchanged.
  2. prefetch_read_data is still unstable, and I'm not sure how we can use it in stable Rust.

Describe alternatives you've considered

Additional context

tustvold commented 4 days ago

I would have expected the compiler to be doing this for us. I wonder if the capacity checks in extend_from_slice are throwing this off. Given we've already determined the buffer capacity, perhaps we could use unchecked variants?

> i'm not sure how we can use it in stable rust.

I think this is probably a hard blocker; we really don't want to depend on unstable features if we can avoid it.
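To make the "unchecked variants" idea concrete, here is a minimal sketch that uses a plain `Vec<u8>` as a stand-in for the output buffer. It does not use the arrow `MutableBuffer` API, and the function name `concat_prereserved` is hypothetical. The total length is computed once, then each slice is copied without a per-append capacity check. Whether this recovers any of the prefetch speedup has not been measured here.

```rust
use std::ptr;

/// Concatenate `parts` into one buffer: reserve the total length up front,
/// then copy each part without re-checking capacity on every append.
fn concat_prereserved(parts: &[&[u8]]) -> Vec<u8> {
    // First pass: total output length, with overflow detection.
    let total = parts
        .iter()
        .try_fold(0usize, |acc, p| acc.checked_add(p.len()))
        .expect("total length overflows usize");

    let mut out: Vec<u8> = Vec::with_capacity(total);
    let mut written = 0usize;
    for part in parts {
        // SAFETY: `written + part.len() <= total <= out.capacity()`, and
        // `out` is a fresh allocation, so source and destination cannot overlap.
        unsafe {
            ptr::copy_nonoverlapping(part.as_ptr(), out.as_mut_ptr().add(written), part.len());
        }
        written += part.len();
    }
    // SAFETY: the first `written` bytes were initialized by the copies above.
    unsafe { out.set_len(written) };
    out
}

fn main() {
    let parts: [&[u8]; 3] = [b"Some", b"Test", b"Str"];
    let joined = concat_prereserved(&parts);
    println!("{}", String::from_utf8_lossy(&joined));
}
```

The safety argument rests entirely on the first pass: if the length computed there ever disagrees with what the second pass copies, the copy writes out of bounds.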

usamoi commented 3 days ago

prefetch_read_data is an exposed LLVM intrinsic and is not intended to be called by users directly, but prefetch_read_data(_, 3) could be implemented with arch-specific intrinsics.

pub fn prefetch_read_data_locality_3(data: *const ()) {
    #[cfg(target_arch = "x86")]
    unsafe {
        core::arch::x86::_mm_prefetch(data.cast(), core::arch::x86::_MM_HINT_T0);
    }
    #[cfg(target_arch = "x86_64")]
    unsafe {
        core::arch::x86_64::_mm_prefetch(data.cast(), core::arch::x86_64::_MM_HINT_T0);
    }
    // `core::arch::aarch64::_prefetch` is unstable,
    // tracked in https://github.com/rust-lang/rust/issues/117217.
    // Inline assembly may be used here.
}
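As a rough illustration of how such a wrapper might be used on stable Rust, the sketch below applies it to a simple gather loop over a `u64` slice rather than to Arrow arrays. The names `prefetch_read`, `gather_with_prefetch` and the distance of 4 (borrowed from the benchmark) are assumptions for this example. Only x86_64 issues a real prefetch; every other target falls back to a no-op.

```rust
/// Hint that the cache line at `ptr` will be read soon (x86_64 only).
#[inline(always)]
fn prefetch_read<T>(ptr: *const T) {
    #[cfg(target_arch = "x86_64")]
    unsafe {
        use core::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
        // A prefetch is only a hint and does not fault on a bad address.
        _mm_prefetch::<_MM_HINT_T0>(ptr as *const i8);
    }
    // Assumption: no stable prefetch on other targets, so do nothing.
    #[cfg(not(target_arch = "x86_64"))]
    let _ = ptr;
}

const PREFETCH_AHEAD: usize = 4;

/// Gather `values[indices[i]]`, prefetching the element needed a few
/// iterations later.
fn gather_with_prefetch(values: &[u64], indices: &[usize]) -> Vec<u64> {
    let mut out = Vec::with_capacity(indices.len());
    for (i, &idx) in indices.iter().enumerate() {
        if let Some(&next) = indices.get(i + PREFETCH_AHEAD) {
            // wrapping_add never dereferences, so an out-of-range `next`
            // is harmless here; the bounds check happens at the real read.
            prefetch_read(values.as_ptr().wrapping_add(next));
        }
        out.push(values[idx]);
    }
    out
}

fn main() {
    let values: Vec<u64> = (0..1_000_000u64).map(|x| x * x).collect();
    // Simple LCG so the example needs no external crates.
    let mut state = 12345u64;
    let indices: Vec<usize> = (0..100_000)
        .map(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
            (state >> 33) as usize % values.len()
        })
        .collect();
    let plain: Vec<u64> = indices.iter().map(|&i| values[i]).collect();
    assert_eq!(gather_with_prefetch(&values, &indices), plain);
    println!("results match");
}
```

The prefetch changes only timing, never results, which is why the output can be checked against a plain gather.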