darkautism / lfqueue

Minimize lock-free queue ever!
Do What The F*ck You Want To Public License
131 stars 27 forks source link

having memory crash while multiple dequeue #9

Closed femady closed 6 years ago

femady commented 6 years ago
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include <stdio.h>
#include "lfq.h"

/* Forward declarations for the test scenario and the two thread bodies. */
void one_enq_and_multi_deq(pthread_t *threads);
void *worker_s(void *arg);
void *worker_c(void *arg);

/* Wall-clock samples taken by main() before and after each test round. */
struct timeval tv1;
struct timeval tv2;

/* Base number of elements; workers multiply it by the factor they receive. */
#define total_put 50000

/* Consumer threads spawned per round.
 * Alternative on Linux: sysconf(_SC_NPROCESSORS_ONLN). */
int nthreads = 4;

/* Multiplier handed to each consumer thread (one share of total_put). */
int one_thread = 1;

/* Count of consumer threads that finished; bumped atomically by worker_c. */
int nthreads_exited = 0;

/* The queue under test; allocated and initialised in main(). */
struct lfq_ctx *myq;

/**
 * Consumer thread body.
 *
 * Dequeues and frees total_put * (*arg) elements from myq, spinning whenever
 * lfq_dequeue() returns 0, then atomically increments nthreads_exited so the
 * waiting thread can see that this consumer has finished.
 *
 * @param arg  pointer to an int multiplier applied to total_put
 * @return     always 0
 */
void *worker_c(void *arg) {
    const int quota = total_put * (*(int*)arg);
    int taken;

    for (taken = 0; taken < quota; taken++) {
        int *item;

        /* Busy-wait until the queue yields an element. */
        do {
            item = lfq_dequeue(myq);
        } while (item == 0);

        free(item);
    }

    __sync_add_and_fetch(&nthreads_exited, 1);
    return 0;
}

/**
 * Producer body: keeps enqueueing onto myq without backing off.
 *
 * Allocates total_put * (*arg) ints on the heap, stores the 1-based sequence
 * number in each, and enqueues the pointer. Ownership of each allocation
 * passes to whoever dequeues it. Unlike worker_c, this function does not
 * increment nthreads_exited.
 *
 * @param arg  pointer to an int multiplier applied to total_put
 * @return     always NULL
 */
void*  worker_s(void *arg)
{
    int i = 0;
    int total_loop = total_put * (*(int*)arg);

    while (i++ < total_loop) {
        int *int_data = malloc(sizeof *int_data);

        /* Explicit check instead of assert(): assert() compiles away under
         * NDEBUG, which would let a failed allocation be dereferenced. */
        if (int_data == NULL) {
            perror("malloc");
            abort();
        }
        *int_data = i;

        /* Retry for as long as lfq_enqueue() reports a nonzero result. */
        while (lfq_enqueue(myq, int_data)) {
            /* spin */
        }
    }
    return NULL;
}

/*
 * Detach every thread in `threads`, then poll every 2 ms until
 * nthreads_exited reaches nthreads. If the queue still reports a nonzero
 * count afterwards, wait once more and print the count.
 *
 * Expands in the caller's scope and needs an `int i` and a `threads` array
 * (or pointer) of pthread_t there.
 *
 * Wrapped in do { } while (0) so the expansion is a single statement and is
 * safe inside an unbraced if/else. The exit counter is read with an atomic
 * builtin (add 0) because the worker threads update it concurrently with
 * __sync_add_and_fetch; a plain read would be a data race.
 */
#define detach_thread_and_loop \
do {\
    for (i = 0; i < nthreads; i++)\
        pthread_detach(threads[i]);\
    while ( __sync_add_and_fetch(&nthreads_exited, 0) < nthreads ) \
        usleep(2000);\
    if(myq->count != 0){\
        usleep(2000);\
        printf("current size= %d\n", myq->count);\
    }\
} while (0)

/**
 * Scenario: one producer (the calling thread) feeding nthreads consumers.
 *
 * Starts nthreads worker_c threads, each consuming one share of total_put,
 * then runs worker_s on the calling thread to produce nthreads shares, and
 * finally detaches the consumers and waits for all of them to finish.
 *
 * @param threads  caller-provided array with room for nthreads pthread_t
 */
void one_enq_and_multi_deq(pthread_t *threads) {
    printf("-----------%s---------------\n", "one_enq_and_multi_deq");
    int i;
    for (i = 0; i < nthreads; i++) {
        /* pthread_create returns an error number rather than setting errno.
         * Bail out on failure: otherwise an uninitialised pthread_t would be
         * detached and the wait loop below would never terminate. */
        int rc = pthread_create(threads + i, NULL, worker_c, &one_thread);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    worker_s(&nthreads);

    /* NOTE(review): presumably the pragma silences the implicit-declaration
     * warning for usleep() used inside the macro under a strict -std= mode;
     * confirm before removing. */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wimplicit-function-declaration"
    detach_thread_and_loop;
#pragma GCC diagnostic pop
}

/**
 * Test driver: runs the one-producer / multi-consumer scenario 1000 times
 * against a single queue instance, printing the elapsed wall-clock time of
 * each round and asserting that the queue is empty afterwards.
 *
 * @return 0 on success, EXIT_FAILURE if the queue context cannot be allocated
 */
int main(void)
{
    int n;

    /* Check the allocation before handing it to lfq_init(); the original
     * passed a possibly-NULL pointer straight through. */
    myq = malloc(sizeof *myq);
    if (myq == NULL) {
        perror("malloc");
        return EXIT_FAILURE;
    }
    /* NOTE(review): the literal 4 equals the initial value of nthreads; if
     * this argument is a thread limit it should probably track nthreads.
     * Confirm against lfq.h. */
    lfq_init(myq, 4);

    for (n = 0; n < 1000; n++) {
        printf("Current running at =%d, ", n);
        /* Reset the completion counter before any worker of this round starts. */
        nthreads_exited = 0;

        /* Spawn threads. */
        pthread_t threads[nthreads];
        printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? "" : "s");
        printf("Total requests %d \n", total_put);
        gettimeofday(&tv1, NULL);

        one_enq_and_multi_deq(threads);

        gettimeofday(&tv2, NULL);
        /* The microsecond difference may be negative; adding it to the
         * seconds difference still yields the correct elapsed time. */
        printf ("Total time = %f seconds\n",
                (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 +
                (double) (tv2.tv_sec - tv1.tv_sec));

        /* Short pause before verifying that the round drained the queue. */
        usleep(1000);
        assert ( 0 == myq->count && "Error, all queue should be consumed but not");
    }
    lfq_clean(myq);
    free(myq);
    return 0;
}
darkautism commented 6 years ago

Thanks for your report. @pcordes's code seems to have resolved this bug.

femady commented 6 years ago

image

The issue still exists. By the time your hazard-pointer scan has failed to find the pointer and you go on to free it, the same pointer has concurrently been put into a hazard pointer.

darkautism commented 6 years ago

Your screenshot does not look like the new code.

Newest lfq_dequeue_tid:

void * lfq_dequeue_tid(struct lfq_ctx *ctx, int tid ) {
    //int cn_runtimes = 0;
    volatile struct lfq_node *old_head, *new_head;
#if 1  // HP[tid] stuff is necessary for deallocation.  (but it's still not safe).
    do {

I ran this test code against the latest code and still could not hit the crash. It only crashes with the old test script. https://github.com/darkautism/lfqueue/pull/7#issuecomment-419326061

Would you mind if this test code were added to this repository?

femady commented 6 years ago

Thanks for mentioning that. I re-cloned your newest project and ran it. Issue solved.