pmem / pmemstream

Other
9 stars 13 forks source link

Failing tests with TSAN feature #235

Open KFilipek opened 2 years ago

KFilipek commented 2 years ago

ISSUE: Failing tests with TSAN feature

Environment Information

and possibly:

Please provide a reproduction of the bug:

Enable TSAN in CMake and run ctest -R "_none"

How often the bug is revealed:

always

Actual behavior:

         40 - concurrent_async_wait_0_none (Failed)
         43 - concurrent_iterate_0_none (Failed)
         44 - concurrent_iterate_with_append_0_none (Failed)
         50 - publish_append_async_0_none (Failed)
         51 - region_runtime_initialize_0_none (Failed)
         68 - singly_linked_list_pmreorder_negative_none (Failed)
         81 - example-05_timestamp_based_order_0_none (Failed)

Expected behavior:

Pika tests image

Details

Additional information about Priority and Help Requested:

Are you willing to submit a pull request with a proposed change? (Yes, No)

Requested priority: Low

igchor commented 2 years ago

@KFilipek Can you create a PR/branch with TSAN enabled on CI, so that those failures are visible in the logs?

igchor commented 2 years ago

After https://github.com/pmem/pmemstream/pull/236, there is only one problem remaining (which can cause some tests to fail).

miniasync does not annotate the store of the completion flag in data_mover_threads.c: https://github.com/pmem/miniasync/blob/master/src/data_mover_threads.c#L168. In addition, this load should be atomic: https://github.com/pmem/miniasync/blob/master/src/core/ringbuf.c#L228

All tests pass when the following patch is applied to miniasync (some of the annotations may not be necessary):

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef221cd..01482c3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -110,6 +110,8 @@ if(USE_UBSAN)
    add_sanitizer_flag(undefined)
 endif()

+add_sanitizer_flag(thread)
+
 if(COVERAGE)
        set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -coverage")
 endif()
diff --git a/src/core/ringbuf.c b/src/core/ringbuf.c
index d834d86..8167c21 100644
--- a/src/core/ringbuf.c
+++ b/src/core/ringbuf.c
@@ -149,8 +149,10 @@ ringbuf_enqueue_atomic(struct ringbuf *rbuf, void *data)
 {
    LOG(4, NULL);

+   __tsan_release(&rbuf->write_pos_padded.write_pos);
    size_t w = util_fetch_and_add64(&rbuf->write_pos_padded.write_pos, 1)
        & rbuf->len_mask;
+   __tsan_acquire(&rbuf->write_pos_padded.write_pos);

    ASSERT(rbuf->running);

@@ -162,6 +164,7 @@ ringbuf_enqueue_atomic(struct ringbuf *rbuf, void *data)
    while (!util_bool_compare_and_swap64(&rbuf->data[w], NULL, data))
        ;

+   __tsan_release(&rbuf->data[w]);
    VALGRIND_ANNOTATE_HAPPENS_BEFORE(&rbuf->data[w]);
 }

@@ -214,8 +217,10 @@ ringbuf_dequeue_atomic(struct ringbuf *rbuf)
 {
    LOG(4, NULL);

+   __tsan_release(&rbuf->read_pos_padded.read_pos);
    size_t r = util_fetch_and_add64(&rbuf->read_pos_padded.read_pos, 1)
        & rbuf->len_mask;
+   __tsan_acquire(&rbuf->read_pos_padded.read_pos);
    /*
     * Again, in most cases, there won't be even a single loop, but if one
     * thread stalls while others perform work, it might happen that two
@@ -223,10 +228,13 @@ ringbuf_dequeue_atomic(struct ringbuf *rbuf)
     */
    void *data = NULL;

+   __tsan_acquire(&rbuf->data[r]);
    VALGRIND_ANNOTATE_HAPPENS_AFTER(&rbuf->data[r]);
    do {
-       while ((data = rbuf->data[r]) == NULL)
+       while (data == NULL) {
+           util_atomic_load_explicit64(&rbuf->data[r], &data, __ATOMIC_RELAXED);
            __sync_synchronize();
+       }
    } while (!util_bool_compare_and_swap64(&rbuf->data[r], data, NULL));

    return data;
diff --git a/src/data_mover_threads.c b/src/data_mover_threads.c
index 5350ce6..e4fea11 100644
--- a/src/data_mover_threads.c
+++ b/src/data_mover_threads.c
@@ -18,6 +18,8 @@
 #define DATA_MOVER_THREADS_DEFAULT_NTHREADS 12
 #define DATA_MOVER_THREADS_DEFAULT_RINGBUF_SIZE 128

+#include <sanitizer/tsan_interface.h>
+
 #define SUPPORTED_FLAGS 0

 struct data_mover_threads_op_fns {
@@ -126,6 +128,7 @@ data_mover_threads_do_operation(struct data_mover_threads_data *data,
    if (data->desired_notifier == FUTURE_NOTIFIER_WAKER) {
        FUTURE_WAKER_WAKE(&data->notifier.waker);
    }
+   __tsan_release(&data->complete);
    util_atomic_store_explicit64(&data->complete, 1, memory_order_release);
 }

@@ -165,6 +168,7 @@ data_mover_threads_operation_check(void *data,
    struct data_mover_threads_data *tdata = data;

    uint64_t complete;
+   __tsan_acquire(&tdata->complete);
    util_atomic_load_explicit64(&tdata->complete,
        &complete, memory_order_acquire);
    if (complete)
KFilipek commented 2 years ago

Added the missing header include (<sanitizer/tsan_interface.h>) to src/core/ringbuf.c; without it, the __tsan_* calls are implicit function declarations:

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef221cd..01482c3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -110,6 +110,8 @@ if(USE_UBSAN)
    add_sanitizer_flag(undefined)
 endif()

+add_sanitizer_flag(thread)
+
 if(COVERAGE)
        set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -coverage")
 endif()
diff --git a/src/core/ringbuf.c b/src/core/ringbuf.c
index d834d86..8f8d0d0 100644
--- a/src/core/ringbuf.c
+++ b/src/core/ringbuf.c
@@ -12,6 +12,7 @@
 #pragma warning(disable : 4127)
 #endif

+#include <sanitizer/tsan_interface.h>
 #include "core/valgrind_internal.h"

 #include "ringbuf.h"
@@ -149,8 +150,10 @@ ringbuf_enqueue_atomic(struct ringbuf *rbuf, void *data)
 {
    LOG(4, NULL);

+   __tsan_release(&rbuf->write_pos_padded.write_pos);
    size_t w = util_fetch_and_add64(&rbuf->write_pos_padded.write_pos, 1)
        & rbuf->len_mask;
+   __tsan_acquire(&rbuf->write_pos_padded.write_pos);

    ASSERT(rbuf->running);

@@ -162,6 +165,7 @@ ringbuf_enqueue_atomic(struct ringbuf *rbuf, void *data)
    while (!util_bool_compare_and_swap64(&rbuf->data[w], NULL, data))
        ;

+   __tsan_release(&rbuf->data[w]);
    VALGRIND_ANNOTATE_HAPPENS_BEFORE(&rbuf->data[w]);
 }

@@ -214,8 +218,10 @@ ringbuf_dequeue_atomic(struct ringbuf *rbuf)
 {
    LOG(4, NULL);

+   __tsan_release(&rbuf->read_pos_padded.read_pos);
    size_t r = util_fetch_and_add64(&rbuf->read_pos_padded.read_pos, 1)
        & rbuf->len_mask;
+   __tsan_acquire(&rbuf->read_pos_padded.read_pos);
    /*
     * Again, in most cases, there won't be even a single loop, but if one
     * thread stalls while others perform work, it might happen that two
@@ -223,10 +229,13 @@ ringbuf_dequeue_atomic(struct ringbuf *rbuf)
     */
    void *data = NULL;

+   __tsan_acquire(&rbuf->data[r]);
    VALGRIND_ANNOTATE_HAPPENS_AFTER(&rbuf->data[r]);
    do {
-       while ((data = rbuf->data[r]) == NULL)
+       while (data == NULL) {
+           util_atomic_load_explicit64(&rbuf->data[r], &data, __ATOMIC_RELAXED);
            __sync_synchronize();
+       }
    } while (!util_bool_compare_and_swap64(&rbuf->data[r], data, NULL));

    return data;
diff --git a/src/data_mover_threads.c b/src/data_mover_threads.c
index 5350ce6..e4fea11 100644
--- a/src/data_mover_threads.c
+++ b/src/data_mover_threads.c
@@ -18,6 +18,8 @@
 #define DATA_MOVER_THREADS_DEFAULT_NTHREADS 12
 #define DATA_MOVER_THREADS_DEFAULT_RINGBUF_SIZE 128

+#include <sanitizer/tsan_interface.h>
+
 #define SUPPORTED_FLAGS 0

 struct data_mover_threads_op_fns {
@@ -126,6 +128,7 @@ data_mover_threads_do_operation(struct data_mover_threads_data *data,
    if (data->desired_notifier == FUTURE_NOTIFIER_WAKER) {
        FUTURE_WAKER_WAKE(&data->notifier.waker);
    }
+   __tsan_release(&data->complete);
    util_atomic_store_explicit64(&data->complete, 1, memory_order_release);
 }

@@ -165,6 +168,7 @@ data_mover_threads_operation_check(void *data,
    struct data_mover_threads_data *tdata = data;

    uint64_t complete;
+   __tsan_acquire(&tdata->complete);
    util_atomic_load_explicit64(&tdata->complete,
        &complete, memory_order_acquire);
    if (complete)