bmatsuo / lmdb-go

Bindings for the LMDB C library
BSD 3-Clause "New" or "Revised" License
157 stars 59 forks source link

env.Update doesn't block another goroutine's env.Update? #84

Closed ghost closed 8 years ago

ghost commented 8 years ago

In my mind, a write txn is something like an exclusive lock: it should block other threads/processes from writing to the db. But in my code, before one write txn exits, another goroutine gets a write txn, and I don't know why. Below is my code:

// persistMessages stores msgs inside a single t.queueEnv.Update transaction.
//
// Within the transaction it reads the current offset via persistedOffset,
// hands that offset and msgs to persistToPartitionDB, and on success records
// the returned offset with updatePersistOffset. Progress is logged together
// with the calling goroutine's id (GoID).
//
// If Update fails with an error for which lmdb.IsMapFull reports true, the
// persist partition is rotated and the whole call is retried recursively with
// the same msgs. Any other error causes a panic.
func (t *lmdbTopic) persistMessages(msgs []*Message) {
	// persist is the body of the write transaction.
	persist := func(txn *lmdb.Txn) error {
		log.Printf("In persistMessages, gid: %d get Update Txn", GoID())
		pos := t.persistedOffset(txn)
		log.Printf("In persistMessages, gid: %d, offset: %d", GoID(), pos)

		pos, err := t.persistToPartitionDB(pos, msgs)
		if err != nil {
			// Returning the error makes Update report it to the caller below.
			return err
		}

		log.Printf("In persistMessages, gid: %d, update offset: %d", GoID(), pos)
		t.updatePersistOffset(txn, pos)
		log.Printf("In persistMessages, gid: %d will release Update Txn", GoID())
		return nil
	}

	err := t.queueEnv.Update(persist)
	if err == nil {
		return
	}
	if !lmdb.IsMapFull(err) {
		panic(err)
	}

	// The map is full: switch to a fresh partition and try again.
	log.Printf("In persistMessages, gid: %d will rotate persist partititon", GoID())
	t.rotatePersistPartition()
	t.persistMessages(msgs)
}

Below is my log:

2016/08/31 16:23:56 In persistMessages, gid: 152 get Update Txn, t.queueEnv's addr: 0xc420356100
2016/08/31 16:23:56 In persistMessages, gid: 152, offset: 27394
2016/08/31 16:23:56 In persistMessages, gid: 152, update offset: 27395
2016/08/31 16:23:56 In persistMessages, gid: 152 will release Update Txn
2016/08/31 16:23:56 In persistMessages, gid: 152 get Update Txn, t.queueEnv's addr: 0xc420356100
2016/08/31 16:23:56 In persistMessages, gid: 152, offset: 27395
2016/08/31 16:23:56 In persistMessages, gid: 40 get Update Txn, t.queueEnv's addr: 0xc420356110
2016/08/31 16:23:56 In persistMessages, gid: 152, update offset: 27396
2016/08/31 16:23:56 In persistMessages, gid: 40, offset: 27395
2016/08/31 16:23:56 In persistMessages, gid: 152 will release Update Txn
panic: mdb_put: MDB_KEYEXIST: Key/data pair already exists

From the log I can see that goroutine 40 enters queueEnv.Update before goroutine 152 has exited queueEnv.Update.

ghost commented 8 years ago

I wrote a sample C program:

#include "lmdb.h"
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

/* Per-thread bundle of LMDB handles used by run(). */
typedef struct {
  MDB_env *env;  /* environment handle, filled in by mdb_env_create */
  MDB_dbi dbi;   /* database handle, filled in by mdb_dbi_open */
  MDB_val key;   /* not referenced by the code in this sample */
  MDB_val val;   /* not referenced by the code in this sample */
  MDB_txn *txn;  /* write transaction, filled in by mdb_txn_begin */
} producer;

/*
 * run is the pthread entry point for one writer thread.
 *
 * It creates and opens its own LMDB environment on the file "__meta__",
 * begins a write transaction, opens the unnamed database, holds the
 * transaction for five seconds and then commits it. "get txn" / "close txn"
 * lines are written to stderr so the ordering between threads can be observed.
 *
 * arg: ignored. main passes a path, but the environment file name is
 *      hard-coded below.
 * Returns NULL in all cases; failures are reported on stderr and stop the
 * thread early after releasing whatever was acquired.
 *
 * NOTE(review): every thread opens the same environment file separately in
 * one process. The LMDB documentation lists opening an environment twice in
 * the same process as something to avoid -- confirm this is intended for the
 * experiment.
 */
void* run(void *arg) {
  (void)arg;

  /* calloc takes (count, size); also guard against allocation failure. */
  producer *p = calloc(1, sizeof(*p));
  if (p == NULL) {
    fprintf(stderr, "calloc failed\n");
    return NULL;
  }

  int err = mdb_env_create(&p->env);
  if (err != 0) {
    fprintf(stderr, "mdb_env_create failed: %s\n", mdb_strerror(err));
    goto free_producer;
  }
  err = mdb_env_set_maxreaders(p->env, 100);
  if (err != 0) {
    fprintf(stderr, "mdb_env_set_maxreaders failed: %s\n", mdb_strerror(err));
    goto close_env;
  }
  err = mdb_env_set_mapsize(p->env, 1048576000);
  if (err != 0) {
    fprintf(stderr, "mdb_env_set_mapsize failed: %s\n", mdb_strerror(err));
    goto close_env;
  }
  err = mdb_env_open(p->env, "__meta__", MDB_NOSYNC|MDB_NOSUBDIR, 0644);
  if (err != 0) {
    fprintf(stderr, "mdb_env_open failed: %s\n", mdb_strerror(err));
    goto close_env;
  }

  /* Only announce the transaction once it has actually been obtained. */
  err = mdb_txn_begin(p->env, NULL, 0, &p->txn);
  if (err != 0) {
    fprintf(stderr, "mdb_txn_begin failed: %s\n", mdb_strerror(err));
    goto close_env;
  }
  /* The (int) cast of pthread_self() is kept from the original for the log format. */
  fprintf(stderr, "tid: %d get txn\n", (int)pthread_self());

  /* Capture the result: previously the check below tested a stale err. */
  err = mdb_dbi_open(p->txn, NULL, 0, &p->dbi);
  if (err != 0) {
    fprintf(stderr, "mdb_dbi_open failed: %s\n", mdb_strerror(err));
    mdb_txn_abort(p->txn);
    goto close_env;
  }

  /* Keep the write transaction open for a while before committing. */
  sleep(5);

  err = mdb_txn_commit(p->txn);
  if (err != 0) {
    fprintf(stderr, "mdb_txn_commit failed: %s\n", mdb_strerror(err));
    goto close_env;
  }
  fprintf(stderr, "tid: %d close txn\n", (int)pthread_self());

close_env:
  mdb_env_close(p->env);
free_producer:
  free(p);
  return NULL;
}

/*
 * main starts NUM_THREADS threads running run(), then waits for each thread
 * that was actually created.
 *
 * Returns 0 when every thread was created and joined, EXIT_FAILURE otherwise.
 */
int main(void) {
  enum { NUM_THREADS = 4 };
  pthread_t tids[NUM_THREADS];
  int started[NUM_THREADS] = {0};  /* 1 where pthread_create succeeded */
  int status = 0;
  int i;

  for (i = 0; i < NUM_THREADS; i++) {
    int err = pthread_create(&tids[i], NULL, run, (void *)"/Users/zwb/workspace/CProjects/test-lmdb");
    if (err != 0) {
      /* mdb_strerror also handles plain errno values (superset of strerror). */
      fprintf(stderr, "pthread_create failed: %s\n", mdb_strerror(err));
      status = EXIT_FAILURE;
      continue;
    }
    started[i] = 1;
  }

  for (i = 0; i < NUM_THREADS; i++) {
    /* tids[i] is indeterminate if creation failed; joining it would be invalid. */
    if (!started[i]) {
      continue;
    }
    int err = pthread_join(tids[i], NULL);
    if (err != 0) {
      fprintf(stderr, "pthread_join failed: %s\n", mdb_strerror(err));
      status = EXIT_FAILURE;
    }
  }

  return status;
}

Below is the output:

tid: 37556224 get txn
tid: 37556224 close txn
tid: 37019648 get txn
tid: 37019648 close txn
tid: 38629376 get txn
tid: 38629376 close txn
tid: 38092800 get txn
tid: 38092800 close txn

This looks OK. Is there anything wrong with my Go code?