annidy / notes

0 stars 0 forks source link

内存乱序 #297

Open annidy opened 1 month ago

annidy commented 1 month ago

在并行开发时,由于每一个CPU core都有一个自己的cache,在修改内存地址时,CPU core倾向于先写到自己的cache中,然后再同步到内存里。和所有的缓存一样,都可能遇到缓存失效问题,CPU内部有一套复杂的同步机制,前提是使用了正确的指令。一般情况下,CPU只保证在当前core里,代码是按顺序执行的,多线程看到代码执行顺序可能会发生了变化。

http://preshing.com/20120515/memory-reordering-caught-in-the-act/ 给了一个能跑的例子:

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

// Set either of these to 1 to prevent CPU reordering
#define USE_CPU_FENCE              0
#define USE_SINGLE_HW_THREAD       0  // Supported on Linux, but not Cygwin or PS3

#if USE_SINGLE_HW_THREAD
#include <sched.h>
#endif

//-------------------------------------
//  MersenneTwister
//  A thread-safe random number generator with good randomness
//  in a small number of instructions. We'll use it to introduce
//  random timing delays.
//-------------------------------------
#define MT_IA  397
#define MT_LEN 624

class MersenneTwister
{
    unsigned int m_buffer[MT_LEN];
    int m_index;

public:
    MersenneTwister(unsigned int seed);
    // Declare noinline so that the function call acts as a compiler barrier:
    unsigned int integer() __attribute__((noinline));
};

MersenneTwister::MersenneTwister(unsigned int seed)
{
    // Initialize by filling with the seed, then iterating
    // the algorithm a bunch of times to shuffle things up.
    for (int i = 0; i < MT_LEN; i++)
        m_buffer[i] = seed;
    m_index = 0;
    for (int i = 0; i < MT_LEN * 100; i++)
        integer();
}

unsigned int MersenneTwister::integer()
{
    // Indices
    int i = m_index;
    int i2 = m_index + 1; if (i2 >= MT_LEN) i2 = 0; // wrap-around
    int j = m_index + MT_IA; if (j >= MT_LEN) j -= MT_LEN; // wrap-around

    // Twist
    unsigned int s = (m_buffer[i] & 0x80000000) | (m_buffer[i2] & 0x7fffffff);
    unsigned int r = m_buffer[j] ^ (s >> 1) ^ ((s & 1) * 0x9908B0DF);
    m_buffer[m_index] = r;
    m_index = i2;

    // Swizzle
    r ^= (r >> 11);
    r ^= (r << 7) & 0x9d2c5680UL;
    r ^= (r << 15) & 0xefc60000UL;
    r ^= (r >> 18);
    return r;
}

//-------------------------------------
//  Main program, as decribed in the post
//-------------------------------------
sem_t beginSema1;
sem_t beginSema2;
sem_t endSema;

int X, Y;
int r1, r2;

void *thread1Func(void *param)
{
    MersenneTwister random(1);
    for (;;)
    {
        sem_wait(&beginSema1);  // Wait for signal
        while (random.integer() % 8 != 0) {}  // Random delay

        // ----- THE TRANSACTION! -----
        X = 1;
#if USE_CPU_FENCE
        asm volatile("mfence" ::: "memory");  // Prevent CPU reordering
#else
        asm volatile("" ::: "memory");  // Prevent compiler reordering
#endif
        r1 = Y;

        sem_post(&endSema);  // Notify transaction complete
    }
    return NULL;  // Never returns
};

void *thread2Func(void *param)
{
    MersenneTwister random(2);
    for (;;)
    {
        sem_wait(&beginSema2);  // Wait for signal
        while (random.integer() % 8 != 0) {}  // Random delay

        // ----- THE TRANSACTION! -----
        Y = 1;
#if USE_CPU_FENCE
        asm volatile("mfence" ::: "memory");  // Prevent CPU reordering
#else
        asm volatile("" ::: "memory");  // Prevent compiler reordering
#endif
        r2 = X;

        sem_post(&endSema);  // Notify transaction complete
    }
    return NULL;  // Never returns
};

int main()
{
    // Initialize the semaphores
    sem_init(&beginSema1, 0, 0);
    sem_init(&beginSema2, 0, 0);
    sem_init(&endSema, 0, 0);

    // Spawn the threads
    pthread_t thread1, thread2;
    pthread_create(&thread1, NULL, thread1Func, NULL);
    pthread_create(&thread2, NULL, thread2Func, NULL);

#if USE_SINGLE_HW_THREAD
    // Force thread affinities to the same cpu core.
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(0, &cpus);
    pthread_setaffinity_np(thread1, sizeof(cpu_set_t), &cpus);
    pthread_setaffinity_np(thread2, sizeof(cpu_set_t), &cpus);
#endif

    // Repeat the experiment ad infinitum
    int detected = 0;
    for (int iterations = 1; ; iterations++)
    {
        // Reset X and Y
        X = 0;
        Y = 0;
        // Signal both threads
        sem_post(&beginSema1);
        sem_post(&beginSema2);
        // Wait for both threads
        sem_wait(&endSema);
        sem_wait(&endSema);
        // Check if there was a simultaneous reorder
        if (r1 == 0 && r2 == 0)
        {
            detected++;
            printf("%d reorders detected after %d iterations\n", detected, iterations);
        }
    }
    return 0;  // Never returns
}

printf的打印非常频繁,也就是说乱序出现的概率非常高。

这段代码可以在macOS上编译,但不能运行。原因是macOS不支持匿名信号量sem_init,必须用sem_open代替

1 reorders detected after 1296 iterations
2 reorders detected after 8094 iterations
3 reorders detected after 8987 iterations
4 reorders detected after 9346 iterations
5 reorders detected after 10332 iterations
6 reorders detected after 15332 iterations
7 reorders detected after 19589 iterations
8 reorders detected after 22191 iterations

当开启USE_CPU_FENCE后,会在修改r1、r2前,设置一个内存屏障,这样就能读到真实的值了。

现代C++原子操作

上面的问题使用atomic库也可解决


std::atomic<int> X, Y;
int r1, r2;  // 在这个case里,r1和r2可以不原子

原先

X = 1; 
r1 = Y;

反汇编后的指令是

mov        eax, 0x1
xchg       dword [_X], eax ; _X
mov        eax, dword [_Y] ; _Y
mov        dword [_r1], eax ; _r1

对比不使用atomic,生成的指令

mov        dword [_X], 0x1 ; _X
mov        eax, dword [_Y] ; _Y
mov        dword [_r1], eax ; _r1

仅有一处不同。xchg指令看似能绕过core的缓存,直接操作内存

使用锁也可以解决上面的问题


std::mutex mtx;
线程内部
mtx.lock();
X = 1;
r1 = Y;
mtx.unlock();

反汇编后的代码

call       imp___stubs___ZNSt3__15mutex4lockEv ; std::__1::mutex::lock()
mov        dword [_X], 0x1 ; _X
mov        eax, dword [_Y] ; _Y
mov        dword [_r1], eax ; _r1
mov        rdi, r15
call       imp___stubs___ZNSt3__15mutex6unlockEv ; std::__1::mutex::unlock()

虽然还是mov指令,但是这个系统调用会产生一个内存屏障,等效于最初的方案。