
lock free ringbuffer can't work on --release mode #103916

Closed soloist-v closed 2 years ago

soloist-v commented 2 years ago

I tried this code:

#![feature(get_mut_unchecked)] // Arc::get_mut_unchecked is nightly-only

use std::sync::Arc;
use std::time::Duration;

#[derive(Debug)]
pub struct RingBuffer<T, const m_size: usize> {
    idx_head: usize,
    idx_tail: usize,
    m_data: [T; m_size],
}

pub trait Queue<T> {
    fn new_empty() -> Self;
    fn push(&mut self, value: T) -> bool;
    fn pop(&mut self) -> Option<&T>;
    fn is_full(&self) -> bool;
    fn is_empty(&self) -> bool;
}

pub trait Init<T, F: FnMut(usize) -> T> {
    fn new(initializer: F) -> Self;
}

impl<T, const Size: usize, F: FnMut(usize) -> T> Init<T, F> for RingBuffer<T, Size> {
    fn new(initializer: F) -> Self {
        RingBuffer::<T, Size> {
            idx_head: 0,
            idx_tail: 0,
            m_data: array_init::array_init(initializer),
        }
    }
}

impl<T, const Size: usize> Queue<T> for RingBuffer<T, Size>
{
    fn new_empty() -> Self {
        RingBuffer::<T, Size> {
            idx_head: 0,
            idx_tail: 0,
            m_data: array_init::array_init(|_| {
                unsafe {
                    std::mem::zeroed()
                }
            }),
        }
    }

    fn push(&mut self, value: T) -> bool {
        let mut head = self.idx_head + 1;
        if head == Size {
            head = 0;
        }
        if head == self.idx_tail {
            return false;
        }
        self.m_data[self.idx_head] = value;
        self.idx_head = head;
        return true;
    }

    fn pop(&mut self) -> Option<&T> {
        let mut tail = self.idx_tail;
        if self.idx_head == tail {
            return None;
        }
        let res = &self.m_data[tail];
        tail += 1;
        if tail == Size {
            tail = 0;
        }
        self.idx_tail = tail;
        return Some(res);
    }

    fn is_full(&self) -> bool {
        self.idx_tail == (self.idx_head + 1) % Size
    }

    fn is_empty(&self) -> bool {
        self.idx_head == self.idx_tail
    }
}

impl<T, const Size: usize> RingBuffer<T, Size> {
    pub fn clear(&mut self) {
        self.m_data = array_init::array_init(|_| {
            unsafe {
                std::mem::zeroed()
            }
        });
        self.idx_head = 0;
        self.idx_tail = 0;
    }
    pub fn front(&self) -> usize {
        self.idx_head
    }
    pub fn rear(&self) -> usize {
        self.idx_tail
    }
    pub fn size() -> usize {
        Size
    }
}

pub struct SharedRingBuffer<T, const m_size: usize> {
    pub ringbuffer: Arc<RingBuffer<T, m_size>>,
}

impl<T, const Size: usize> Clone for SharedRingBuffer<T, Size> {
    fn clone(&self) -> Self {
        // println!("clone before: {:p}", &(self.ringbuffer.idx_head));
        let res = Self {
            ringbuffer: self.ringbuffer.clone(),
        };
        // println!("clone after: {:p}", &res.ringbuffer.idx_head);
        res
    }
}

impl<T, const Size: usize, F: FnMut(usize) -> T> Init<T, F> for SharedRingBuffer<T, Size> {
    fn new(initializer: F) -> Self {
        Self {
            ringbuffer: Arc::new(RingBuffer::<T, Size>::new(initializer)),
        }
    }
}

impl<T, const Size: usize, > Queue<T> for SharedRingBuffer<T, Size> {
    fn new_empty() -> Self {
        Self {
            ringbuffer: Arc::new(RingBuffer::<T, Size>::new_empty()),
        }
    }

    fn push(&mut self, value: T) -> bool {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).push(value)
        }
    }

    fn pop(&mut self) -> Option<&T> {
        unsafe {
            (*Arc::get_mut_unchecked(&mut self.ringbuffer)).pop()
        }
    }

    fn is_full(&self) -> bool {
        self.ringbuffer.is_full()
    }

    fn is_empty(&self) -> bool {
        self.ringbuffer.is_empty()
    }
}
//////////////////////////////////TEST///////////////////////////////////////////
fn test_speed1() {
    let mut q: SharedRingBuffer<i32, 8> = SharedRingBuffer::new_empty();
    let mut t0 = std::time::SystemTime::now();
    let t = {
        let mut q = q.clone();
        std::thread::spawn(move || {
            loop {
                let t = match q.pop() {
                    None => {
                        // std::thread::sleep(Duration::from_millis(10)); // if enable this. Everything will be ok.
                        continue;
                    }
                    Some(res) => res
                };
                if *t == -1 {
                    break;
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            let now = std::time::SystemTime::now();
            println!("res: {}", now.duration_since(t0).unwrap().as_millis());
        })
    };
    for i in 0..99 {
        loop {
            if q.push(i) {
                // std::thread::sleep(Duration::from_millis(10)); // if enable this. Everything will be ok.
                break;
            }
        }
    }
    q.push(-1);
    t.join().unwrap();
}

In debug mode it works as expected, but in release mode it blocks forever.

I expected to see this happen: explanation

It should print `res: <delta time>`.

Instead, this happened: explanation

Actually, it blocks forever. Note: if you add any print statements or other time-consuming code, it works fine.

Meta

rustc --version --verbose:

rustc 1.67.0-nightly (95a3a7277 2022-10-31)
binary: rustc
commit-hash: 95a3a7277b44bbd2dd3485703d9a05f64652b60e
commit-date: 2022-10-31
host: x86_64-pc-windows-msvc
release: 1.67.0-nightly
LLVM version: 15.0.4
Backtrace


Noratrieb commented 2 years ago

Your code contains undefined behavior (a data race): you mutate your ring buffer from several threads concurrently without synchronization, breaking the invariants of `Arc::get_mut_unchecked`. Since this is undefined behavior, there are no restrictions on the behavior of the program, so it hanging is not a surprise (and is arguably a positive outcome; it could cause memory corruption instead, which would be worse).

As a sidenote, using `mem::zeroed` to create an arbitrary `T` is unsound as well.
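
For context (not part of the original thread's code): a zeroed bit pattern is invalid for many types (references, `Box`, most enums), so `mem::zeroed::<T>()` for a generic `T` can be instant undefined behavior. A sketch of sound alternatives, with illustrative function names:

```rust
use std::mem::MaybeUninit;

// Sound option 1: require T: Default and build real values.
pub fn empty_slots_default<T: Default, const N: usize>() -> [T; N] {
    std::array::from_fn(|_| T::default())
}

// Sound option 2: store Option<T>, so "empty" is represented explicitly.
pub fn empty_slots_option<T, const N: usize>() -> [Option<T>; N] {
    std::array::from_fn(|_| None)
}

// Sound option 3: keep the memory genuinely uninitialized and track
// initialization manually (what production ring buffers usually do).
pub fn uninit_slots<T, const N: usize>() -> [MaybeUninit<T>; N] {
    // Each individual MaybeUninit slot may be left uninitialized.
    std::array::from_fn(|_| MaybeUninit::uninit())
}
```

The third option matches what the ring buffer really wants (slots with no value yet), at the price of tracking initialization by hand.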

soloist-v commented 2 years ago

I think I understand. But how can I modify a structure from two threads at the same time, if not with Arc?

Noratrieb commented 2 years ago

You can use atomics for that.
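
A minimal sketch of the Release/Acquire pairing being suggested here (the function name is illustrative, not from the thread):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Spawn a thread that publishes a value through an AtomicUsize, then
// read it back; the Release store and Acquire load establish a
// happens-before relationship between the two threads.
pub fn publish_then_read() -> usize {
    let head = Arc::new(AtomicUsize::new(0));
    let h = Arc::clone(&head);

    let producer = thread::spawn(move || {
        // Release: all writes made by this thread before the store are
        // visible to any thread that Acquire-loads the stored value.
        h.store(1, Ordering::Release);
    });
    producer.join().unwrap();

    // Acquire pairs with the Release store above.
    head.load(Ordering::Acquire)
}
```

In the ring buffer, the same pattern would apply to `idx_head` and `idx_tail` instead of plain `usize` fields.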

Rageking8 commented 2 years ago

Just a gentle reminder that such generic questions should not be asked in this repo's issue tracker.

Please ask in one of the following places: https://discord.gg/rust-lang/ https://discord.gg/rust-lang-community/ https://users.rust-lang.org/ https://rust-lang.zulipchat.com/

Thanks.

soloist-v commented 2 years ago

You can use atomics for that.

Atomics have a cost, but a ring buffer does not need any atomic operations. When only one thread writes and another thread reads, the ring buffer is safe by construction. Therefore, I want the simplest and most direct implementation of a ring buffer.

Noratrieb commented 2 years ago

Atomics are fairly cheap, and using them is required here. You cannot concurrently read/write the same memory without synchronization; it's not allowed and leads to problems like the one in this issue.

soloist-v commented 2 years ago

This is actually caused by the CPU cache. The solution is as follows:

fn push(&mut self, value: T) -> bool {
    let mut head = unsafe {
        std::ptr::read_volatile(&self.idx_head) + 1
    };
    let tail = unsafe {
        std::ptr::read_volatile(&self.idx_tail)
    };
    if head == Size {
        head = 0;
    }
    if head == tail {
        return false;
    }
    self.m_data[self.idx_head] = value;
    unsafe {
        std::ptr::write_volatile(&mut self.idx_head, head);
    }
    return true;
}

fn pop(&mut self) -> Option<&T> {
    let mut tail = unsafe {
        std::ptr::read_volatile(&self.idx_tail)
    };
    let head = unsafe {
        std::ptr::read_volatile(&self.idx_head)
    };
    if head == tail {
        return None;
    }
    let res = &self.m_data[tail];
    tail += 1;
    if tail == Size {
        tail = 0;
    }
    unsafe {
        std::ptr::write_volatile(&mut self.idx_tail, tail);
    }
    return Some(res);
}

Noratrieb commented 2 years ago

This is still undefined behavior. You have to use atomics instead of volatile. While this code will "work" on x86 CPUs, it may break on CPUs with weaker memory orderings like ARM. Or it could also break just because the compiler decides to have a fun day and compile it in weird broken ways, as it's totally allowed to do anything to code with undefined behavior.

Also if you benchmark it with atomics you should find out that there isn't really much overhead on x86, probably no overhead at all.
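
For illustration, the volatile index handling above could be expressed with atomics instead. This is only a sketch of the index logic under an SPSC assumption (exactly one pushing thread and one popping thread); slot storage and the `Sync` wrapper a real queue needs are deliberately elided, and the type name is made up:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Index bookkeeping for a SIZE-slot SPSC ring buffer. The atomic
// loads/stores replace the volatile reads/writes in the code above.
pub struct SpscIndices<const SIZE: usize> {
    head: AtomicUsize, // written only by the producer
    tail: AtomicUsize, // written only by the consumer
}

impl<const SIZE: usize> SpscIndices<SIZE> {
    pub fn new() -> Self {
        Self { head: AtomicUsize::new(0), tail: AtomicUsize::new(0) }
    }

    /// Producer side: returns the slot index to write, or None if full.
    pub fn try_reserve_push(&self) -> Option<usize> {
        let head = self.head.load(Ordering::Relaxed); // own index
        if (head + 1) % SIZE == self.tail.load(Ordering::Acquire) {
            return None; // full (one slot is kept empty)
        }
        Some(head)
    }

    /// Producer side: publish the slot just written at `head`.
    pub fn commit_push(&self) {
        let head = self.head.load(Ordering::Relaxed);
        // Release pairs with the consumer's Acquire load of `head`,
        // making the slot write visible before the index update.
        self.head.store((head + 1) % SIZE, Ordering::Release);
    }

    /// Consumer side: returns the slot index to read, or None if empty.
    pub fn try_reserve_pop(&self) -> Option<usize> {
        let tail = self.tail.load(Ordering::Relaxed); // own index
        if tail == self.head.load(Ordering::Acquire) {
            return None; // empty
        }
        Some(tail)
    }

    /// Consumer side: release the slot just read at `tail`.
    pub fn commit_pop(&self) {
        let tail = self.tail.load(Ordering::Relaxed);
        self.tail.store((tail + 1) % SIZE, Ordering::Release);
    }
}
```

Each index is written by only one thread, so the writer can use a Relaxed load for its own index; the cross-thread reads use Acquire paired with the other side's Release store, which on x86 compiles to plain loads and stores.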

thomcc commented 2 years ago

There is a cost to atomic

In Rust, atomics are just a way to communicate to the compiler that you are performing concurrent operations on a variable. It is not true that there is always a cost to them -- in particular, you are likely thinking of read-modify-write atomic operations, which are costly and are not needed in many single-producer/single-consumer (SPSC) algorithms. This is not (necessarily) what people are talking about when they tell you to use atomics.

Concretely, depending on your choice of `Ordering` and the architecture you run on, atomic loads/stores can cost no more than normal loads/stores.

If you choose a strict ordering, there may be a memory fence, which has a cost, but to implement SPSC algorithms you may not need this. On the x86 family, for example, there is no cost to `atom.load(Ordering::Acquire)` or `atom.store(v, Ordering::Release)`, which may be enough to implement your ring buffer (if it's SPSC, this is plausible).

but ringbuffer does not need any atomic operation

Fundamentally, this is not true, as you've found. What may be true is that you may not need any read-modify-write atomic operations, or fences. If that is the case, there would be no cost to using `AtomicUsize` with `load(Ordering::Acquire)` and `store(Ordering::Release)`.

But to be clear, I have not looked at your algorithm, so this may not be sufficient.

Note that many classic algorithms from textbooks are hopelessly broken on modern CPUs and compilers without the addition of very strong, expensive fences, and a great deal of old code is broken in similar ways.

soloist-v commented 2 years ago

Thanks for your professional answer, I will reimplement using atomics.