apache / brpc

brpc is an Industrial-grade RPC framework using C++ Language, which is often used in high performance system such as Search, Storage, Machine learning, Advertisement, Recommendation etc. "brpc" means "better RPC".
https://brpc.apache.org
Apache License 2.0
16.56k stars 3.98k forks source link

是否支持创建 "bthread 线程池" #2736

Closed utopia9527 closed 3 months ago

utopia9527 commented 3 months ago

Describe the bug (描述bug) 在使用brpc redis 时, 创建了一个 "bthread 线程池" 用于执行redis 读取操作, 当小流量时一切正常,如果加大并发量,最终都会卡死。 加大bthread 线程池的数量时,并发量会增大,但是最终还是会卡死。请问下是不支持 "bthread 线程池" 这种模式么

To Reproduce (复现方法)

Expected behavior (期望行为)

Versions (各种版本) OS: Compiler: brpc: protobuf:

Additional context/screenshots (更多上下文/截图)

yanglimingcn commented 3 months ago

https://github.com/apache/brpc/blob/master/docs/cn/bthread_tagged_task_group.md 这个能否满足?

utopia9527 commented 3 months ago

https://github.com/apache/brpc/blob/master/docs/cn/bthread_tagged_task_group.md 这个能否满足?

似乎不太一样。 顺带请问教在上述这种当并发量过大,bthread_worker_usage 打满后为什么恢复不了

yanglimingcn commented 3 months ago

打满了就是cpu用满了,扩展线程数?你这个场景打满了符合预期吗?

utopia9527 commented 3 months ago

打满了就是cpu用满了,扩展线程数?你这个场景打满了符合预期吗?

cpu没满,工作线程数满了,疑惑的是停止请求后,服务并不能恢复,还是直接卡死

yanglimingcn commented 3 months ago

那我听上去是应用方面的bug呢。否则这问题并发大了,大家都应该会触发到。

chenBright commented 3 months ago

可以用gdb看看worker线程都卡在哪

utopia9527 commented 3 months ago

可以用gdb看看worker线程都卡在哪 都卡在了 CallMethod这里。 下述线程池中的bthread锁和条件变量都换成 std锁、条件变量之后不会发生这种问题。

使用池子里的bthread 去进行redis rpc 调用,如果一开始给一个较小的并发调用,不会卡死,并发量增加时,直接卡死在这个方法(redis_channel->CallMethod(NULL, &cntl, &request, &response, NULL);) 不太清楚是否和使用了 std::future 有关

调试信息

#0  0x00007f2309b6952d in syscall () from /lib64/libc.so.6
#1  0x00007f230a491841 in std::__atomic_futex_unsigned_base::_M_futex_wait_until(unsigned int*, unsigned int, bool, std::chrono::duration<long, std::ratio<1l, 1l> >, std::chrono::duration<long, std::ratio<1l, 1000000000l> >) () from /lib64/libstdc++.so.6
#2  0x00000000004be95e in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test_until (this=0x7f22d05cb020, __assumed=0, __operand=1, __equal=true, __mo=std::memory_order_acquire, __has_timeout=false, __s=..., __ns=...) at /usr/include/c++/8/bits/atomic_futex.h:102
#3  0x00000000004aee85 in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test (this=0x7f22d05cb020, __assumed=0, __operand=1, __equal=true, __mo=std::memory_order_acquire) at /usr/include/c++/8/bits/atomic_futex.h:122
#4  0x000000000048f2c1 in std::__atomic_futex_unsigned<2147483648u>::_M_load_when_equal (__mo=std::memory_order_acquire, __val=1, this=0x7f22d05cb020) at /usr/include/c++/8/bits/atomic_futex.h:162
#5  std::__future_base::_State_baseV2::wait (this=0x7f22d05cb010) at /usr/include/c++/8/future:337
#6  0x00000000005a3577 in std::__basic_future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >::_M_get_result (this=0x7f22d0028b80) at /usr/include/c++/8/future:717
#7  0x00000000005a272a in std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >::get (this=0x7f22d0028b80) at /usr/include/c++/8/future:796

代码

// 往池子里加任务
template <class F, class... Args>
auto BThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::invoke_result<F, Args...>::type> {
  using return_type = typename std::invoke_result<F, Args...>::type;
  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  std::future<return_type> res = task->get_future();
  {
    std::lock_guard<bthread_mutex_t> lock(queue_mutex_);
    if (stop_) throw std::runtime_error("enqueue on stop_ped ThreadPool");
    tasks.emplace([task]() { (*task)(); });
  }
  bthread_cond_signal(&condition_);
  return res;
}

// 从池子取出任务
void* BThreadPool::TaskFunction(void* args) {
  auto* tp = reinterpret_cast<BThreadPool*>(args);
  while (true) {
    std::function<void()> task;
    {
      // Already specialize "std::lock_guard" and "std::unique_lock" for
      // bthread_mutex_t
      std::lock_guard<bthread_mutex_t> lock(tp->queue_mutex_);
      while (tp->tasks.empty()) {
        if (tp->stop_) break;
        bthread_cond_wait(&tp->condition_, &tp->queue_mutex_);
      }
      if (tp->stop_ && tp->tasks.empty()) {
        return nullptr;
      }
      task = std::move(tp->tasks.front());
      tp->tasks.pop();
    }
    task();
  }
}
utopia9527 commented 3 months ago

那我听上去是应用方面的bug呢。否则这问题并发大了,大家都应该会触发到。

是的,看上去是用法不对

ivanallen commented 3 months ago

enqueue 返回 std::future ,外部在这个 future 上 wait 会阻塞线程。 你可能需要单独写个适配 bthread 的 future,而不是直接使用标准库的 future。

chenBright commented 3 months ago

enqueue 返回 std::future ,外部在这个 future 上 wait 会阻塞线程。 你可能需要单独写个适配 bthread 的 future,而不是直接使用标准库的 future。

std::future::get会阻塞worker线程,当std::future::get的并发数大于worker数,就会阻塞全部worker线程了,这时候没法调度其他bthread了。除非在其他独立线程上std::promise::set_value,但是这样只是不会”卡死“,长尾可能已经导致服务不可用了。

可以用baidu最近开源的babylon,有支持bthread的future

utopia9527 commented 3 months ago

enqueue 返回 std::future ,外部在这个 future 上 wait 会阻塞线程。 你可能需要单独写个适配 bthread 的 future,而不是直接使用标准库的 future。

std::future::get会阻塞worker线程,当std::future::get的并发数大于worker数,就会阻塞全部worker线程了,这时候没法调度其他bthread了。除非在其他独立线程上std::promise::set_value,但是这样只是不会”卡死“,长尾可能已经导致服务不可用了。

可以用baidu最近开源的babylon,有支持bthread的future

有个小问题, 线程池里面如果使用std:mutex的话不会出现这种现象。 是说可能加mutex锁的时候让当前worker的其它bthread 被"偷过去"了么

chenBright commented 3 months ago

使用std:mutex的话,会阻塞worker线程直到取出任务为止,是不是std::promise::set_value所在bthread会一直占有worker线程呢?

可能只是概率小一些而已。当std::future::get的并发数大于worker数,就会阻塞全部worker线程了,这是潜在的问题,跟std:mutex还是bthread_mutex已经没啥关系了。

utopia9527 commented 3 months ago

mutex已经没啥关系了。 确实也会卡住。 如果去掉池子,每次都创建多个bthread 来进行rpc 调用。 请教下这块频繁创建和销毁 bthread 带来的性能消耗有相关数值和参考资料么

chenBright commented 3 months ago

没有。相对于rpc来说,创建和销毁bthread的性能开销不大吧,bthread meta和栈是从资源池或者对象池获取的,有复用机制,应该很快的。前面”bthread池“会有同步的消耗,不一定比创建性能高吧。你可以测一下,对比两者的性能。

chenBright commented 3 months ago

发异步rpc应该可以减少bthread数量吧。

utopia9527 commented 3 months ago

没有。相对于rpc来说,创建和销毁bthread的性能开销不大吧,bthread meta和栈是从资源池或者对象池获取的,有复用机制,应该很快的。前面”bthread池“会有同步的消耗,不一定比创建性能高吧。你可以测一下,对比两者的性能。

多谢