apache / brpc

brpc is an Industrial-grade RPC framework using C++ Language, which is often used in high performance system such as Search, Storage, Machine learning, Advertisement, Recommendation etc. "brpc" means "better RPC".
https://brpc.apache.org
Apache License 2.0
16.54k stars 3.97k forks source link

在bthread_start_background(...)创建bthread后,在内部起pthread是定义良好的行为吗,pthread执行中使用CountdownEvent涉及butex_wait是走pthread还是bthread的逻辑呢? #2818

Open imdouyu opened 6 days ago

imdouyu commented 6 days ago

Describe the bug (描述bug)


boost::lockfree::queue queue;

class Producer {
public:
  Producer(int worker_size) {
    if (bthread_start_background(&bid_, NULL, call_back, nullptr) != 0) {
      cout << "Fail to create bthread" << endl;
    }
    // init worker
  }

  void *call_back() {
    foo();
    return nullptr;
  }

  void foo() {
    boost::asio::thread_pool pool(3);
    while (!bthread_stopped(bid_)) {
      DataPtr data;
      data = std::move(kafka->consume());
      boost::asio::post(pool, [data]() { bar(data); });
    }
    pool.join();
  }

  void bar(DataPtr data) {
    auto idx = rand();
    while (true) {
      auto &worker = workers[idx % workers.size()];
      if (worker->Submit(data)) {
        break;
      } else {
        idx++;
      }
    }
  }
  bthread_t bid_;
  vector<WorkerPtr> workers;
};

class Worker {
public:
  Worker(int batch_size) {
    // init member
  }
  bool Submit(DataPtr data) {
    auto idx = index.fetch_add(1);
    if (idx < batch_size) {
      extract(data, idx);
      return true;
    } else if (idx == batch_size) {
      batch_count_down_event.wait();
      queue.push(std::move(batch_data));
      batch_data.resize(batch_size);
      index = 0;
      batch_count_down_event.reset(batch_size);
      return false;
    }
    return false;
  }

  void extractData(DataPtr data, int idx) {
    batch_data[idx] = do_sth(data);
    batch_count_down_event.signal(1);
  }

  int batch_size;
  vector<DataProcessed> batch_data;
  bthread::CountdownEvent batch_count_down_event;
  std::atomic_uint64_t index{0};
};

int main() { producer = new Producer(); }

To Reproduce (复现方法)

Expected behavior (期望行为)

Versions (各种版本) OS: Compiler: brpc: protobuf:

Additional context/screenshots (更多上下文/截图)

Huixxi commented 5 days ago

辛苦贴下具体代码

imdouyu commented 3 days ago

辛苦贴下具体代码

已更新

chenBright commented 3 days ago
  1. bthread内可以起pthread。但是pthread_join会阻塞worker线程,慎用。
  2. bthread同步原语支持bthread和pthread。在pthread执行的是pthread的同步逻辑。
imdouyu commented 2 days ago
  1. bthread内可以起pthread。但是pthread_join会阻塞worker线程,慎用。
  2. bthread同步原语支持bthread和pthread。在pthread执行的是pthread的同步逻辑。

这里说的worker线程指bthread运行的线程吗

chenBright commented 2 days ago

是的