apache / rocketmq-client-cpp

Apache RocketMQ cpp client
https://rocketmq.apache.org/
Apache License 2.0

DefaultMQProducer crashes when a 3 s network delay is introduced while sending messages #472

[Open] HUHANK opened this issue 2 months ago

HUHANK commented 2 months ago

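System: CentOS 7. The screenshot below shows the test code:
[screenshot: test code]
The screenshot below shows the contents of rocketmq_client.log:
[screenshot: rocketmq_client.log]
The screenshot below shows the call stack from the core dump:
[screenshot: core dump call stack]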

Could someone experienced please take a look and help resolve this? Thanks!

HUHANK commented 2 months ago

I added the 3 s delay on the RocketMQ server side with this command: tc qdisc add dev ens192 root netem delay 3000ms

HUHANK commented 2 months ago

@ifplusor Could you please take a look? Thanks.

HUHANK commented 2 months ago

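When the network delay kicks in, an exception is thrown, but the process also dumps core. Could you help find the cause of the core dump? Thanks. @jonnxu @vongosling @ifplusor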

ifplusor commented 1 month ago

@HUHANK Which version are you using? Also, please check the stacks of the other threads.

HUHANK commented 1 month ago

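"@HUHANK Which version are you using? Also, please check the stacks of the other threads."
The version is 2.2.0. Below are the call stacks of all threads: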

(gdb) i threads
Id   Target Id         Frame 
11   Thread 0x7f27ad681700 (LWP 20472) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
10   Thread 0x7f27a6ffd700 (LWP 20476) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
9    Thread 0x7f27ace80700 (LWP 20473) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
8    Thread 0x7f27a7fff700 (LWP 20474) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
7    Thread 0x7f27ade82700 (LWP 20471) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
6    Thread 0x7f27a77fe700 (LWP 20475) 0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
5    Thread 0x7f27ae683700 (LWP 20486) 0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
4    Thread 0x7f27a5ffb700 (LWP 20478) 0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
3    Thread 0x7f27b612bac0 (LWP 20469) 0x00007f27b5414017 in pthread_join () from /lib64/libpthread.so.0
2    Thread 0x7f27a4ff9700 (LWP 20485) 0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
* 1    Thread 0x7f27a67fc700 (LWP 20477) 0x00007f27afe4c6a6 in __memcpy_ssse3_back () from /lib64/libc.so.6
(gdb) thread apply all bt

Thread 11 (Thread 0x7f27ad681700 (LWP 20472)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58490b3 in boost::asio::io_context::run() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 10 (Thread 0x7f27a6ffd700 (LWP 20476)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58bc18e in rocketmq::TcpRemotingClient::boost_asio_work() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 9 (Thread 0x7f27ace80700 (LWP 20473)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58490b3 in boost::asio::io_context::run() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 8 (Thread 0x7f27a7fff700 (LWP 20474)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58490b3 in boost::asio::io_context::run() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 7 (Thread 0x7f27ade82700 (LWP 20471)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58490b3 in boost::asio::io_context::run() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 6 (Thread 0x7f27a77fe700 (LWP 20475)):
#0  0x00007f27b5416a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f27b581c8ac in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58490b3 in boost::asio::io_context::run() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#4  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#5  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 5 (Thread 0x7f27ae683700 (LWP 20486)):
#0  0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
#1  0x00007f27b58154d8 in boost::asio::detail::epoll_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b581c799 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#3  0x00007f27b5825000 in rocketmq::TopicPublishInfo::boost_asio_work() () from /home/hank/lib/librocketmq.so
#4  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#5  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#6  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 4 (Thread 0x7f27a5ffb700 (LWP 20478)):
#0  0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
#1  0x00007f27b5908fca in epoll_dispatch () from /home/hank/lib/librocketmq.so
#2  0x00007f27b58f93ed in event_base_loop () from /home/hank/lib/librocketmq.so
#3  0x00007f27b58f8d81 in event_base_dispatch () from /home/hank/lib/librocketmq.so
#4  0x00007f27b58b7769 in rocketmq::EventLoop::runLoop() () from /home/hank/lib/librocketmq.so
#5  0x00007f27b5b94c9f in execute_native_thread_routine () from /home/hank/lib/librocketmq.so
#6  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#7  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 3 (Thread 0x7f27b612bac0 (LWP 20469)):
#0  0x00007f27b5414017 in pthread_join () from /lib64/libpthread.so.0
#1  0x00007f27b06910f7 in std::thread::join() () from /lib64/libstdc++.so.6
#2  0x00007f27b58b79e9 in rocketmq::EventLoop::stop() () from /home/hank/lib/librocketmq.so
#3  0x00007f27b58b7a37 in rocketmq::EventLoop::~EventLoop() () from /home/hank/lib/librocketmq.so
#4  0x00007f27afd2fce9 in __run_exit_handlers () from /lib64/libc.so.6
#5  0x00007f27afd2fd37 in exit () from /lib64/libc.so.6
#6  0x00007f27afd1855c in __libc_start_main () from /lib64/libc.so.6
#7  0x000000000040d1f7 in _start ()

Thread 2 (Thread 0x7f27a4ff9700 (LWP 20485)):
#0  0x00007f27afdf50e3 in epoll_wait () from /lib64/libc.so.6
#1  0x00007f27b58154d8 in boost::asio::detail::epoll_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&) () from /home/hank/lib/librocketmq.so
#2  0x00007f27b581c799 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#3  0x00007f27b5825000 in rocketmq::TopicPublishInfo::boost_asio_work() () from /home/hank/lib/librocketmq.so
#4  0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#5  0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#6  0x00007f27afdf4b0d in clone () from /lib64/libc.so.6

Thread 1 (Thread 0x7f27a67fc700 (LWP 20477)):
#0  0x00007f27afe4c6a6 in __memcpy_ssse3_back () from /lib64/libc.so.6
#1  0x000000000040f89a in std::char_traits<char>::copy (__s1=0x7f278f206028 "", __s2=0xdef3f8 "ALIYUN", __n=14651856) at /opt/rh/devtoolset-10/root/usr/include/c++/10/bits/char_traits.h:402
#2  0x00000000004118ee in std::string::_M_copy (__d=0x7f278f206028 "", __s=0xdef3f8 "ALIYUN", __n=14651856) at /opt/rh/devtoolset-10/root/usr/include/c++/10/bits/basic_string.h:3429
#3  0x0000000000411a3a in std::string::_Rep::_M_clone (this=0xdef3e0, __alloc=..., __res=0) at /opt/rh/devtoolset-10/root/usr/include/c++/10/bits/basic_string.tcc:1076
#4  0x0000000000410fe0 in std::string::_Rep::_M_grab (this=0xdef3e0, __alloc1=..., __alloc2=...) at /opt/rh/devtoolset-10/root/usr/include/c++/10/bits/basic_string.h:3289
#5  0x000000000041613a in std::string::assign (this=0x7f27a67f04e0, __str=...) at /opt/rh/devtoolset-10/root/usr/include/c++/10/bits/basic_string.tcc:693
#6  0x00007f27b580a19a in rocketmq::MQClientFactory::getSessionCredentialFromProducerTable(rocketmq::SessionCredentials&) () from /home/hank/lib/librocketmq.so
#7  0x00007f27b580c991 in rocketmq::MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(rocketmq::SessionCredentials&) () from /home/hank/lib/librocketmq.so
#8  0x00007f27b58127b8 in rocketmq::MQClientFactory::sendHeartbeatToAllBroker() () from /home/hank/lib/librocketmq.so
#9  0x00007f27b5812e0a in rocketmq::MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code&, boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> >) () from /home/hank/lib/librocketmq.so
#10 0x00007f27b5809d90 in void boost::_bi::list3<boost::_bi::value<rocketmq::MQClientFactory*>, boost::_bi::value<boost::system::error_code>, boost::_bi::value<boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> > > >::operator()<boost::_mfi::mf2<void, rocketmq::MQClientFactory, boost::system::error_code&, boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> > >, boost::_bi::rrlist1<boost::system::error_code const&> >(boost::_bi::type<void>, boost::_mfi::mf2<void, rocketmq::MQClientFactory, boost::system::error_code&, boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> > >&, boost::_bi::rrlist1<boost::system::error_code const&>&, int) [clone .isra.983] () from /home/hank/lib/librocketmq.so
#11 0x00007f27b581a6fd in boost::asio::detail::wait_handler<boost::_bi::bind_t<void, boost::_mfi::mf2<void, rocketmq::MQClientFactory, boost::system::error_code&, boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> > >, boost::_bi::list3<boost::_bi::value<rocketmq::MQClientFactory*>, boost::_bi::value<boost::system::error_code>, boost::_bi::value<boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime, boost::asio::time_traits<boost::posix_time::ptime>, boost::asio::any_io_executor> > > > >, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) () from /home/hank/lib/librocketmq.so
#12 0x00007f27b581cb07 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from /home/hank/lib/librocketmq.so
#13 0x00007f27b58135a2 in rocketmq::MQClientFactory::startScheduledTask(bool) () from /home/hank/lib/librocketmq.so
#14 0x00007f27b590e1df in thread_proxy () from /home/hank/lib/librocketmq.so
#15 0x00007f27b5412ea5 in start_thread () from /lib64/libpthread.so.0
#16 0x00007f27afdf4b0d in clone () from /lib64/libc.so.6
(gdb) 
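Reading two of these stacks together: thread 3 (the main thread) is already inside exit(), running static destructors (EventLoop::~EventLoop), while thread 1, the MQClientFactory scheduled-task thread, is still sending heartbeats and copying a std::string out of the producer table (getSessionCredentialFromProducerTable). The copy length __n=14651856 is far larger than the visible "ALIYUN", which suggests the string object being copied lives in memory that has already been freed or overwritten. One hypothesis consistent with this, not confirmed in this thread, is that the DefaultMQProducer on test1()'s stack was destroyed without shutdown() while the heartbeat timer still held a pointer to it. The sketch below models that pattern with hypothetical ProducerTable and Producer types (they are not rocketmq-client-cpp internals): the reader takes the same mutex as unregistration, and each producer unregisters itself in its destructor, so the table never keeps a dangling pointer.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a client factory's producer table; NOT the real
// rocketmq-client-cpp classes.
class ProducerTable;

class Producer {
 public:
  Producer(ProducerTable& table, std::string group, std::string authChannel);
  ~Producer();  // unregisters, so the table never keeps a dangling pointer
  Producer(const Producer&) = delete;
  Producer& operator=(const Producer&) = delete;
  const std::string& authChannel() const { return authChannel_; }

 private:
  ProducerTable& table_;
  std::string group_;
  std::string authChannel_;
};

class ProducerTable {
 public:
  void add(const std::string& group, const Producer* p) {
    std::lock_guard<std::mutex> lock(mu_);
    table_[group] = p;
  }
  void remove(const std::string& group) {
    std::lock_guard<std::mutex> lock(mu_);
    table_.erase(group);
  }
  // What a heartbeat timer would call: copies credentials while holding the
  // lock, so a Producer whose destructor is running blocks in remove() and
  // its members cannot be torn down in the middle of the copy.
  std::vector<std::string> snapshotAuthChannels() const {
    std::lock_guard<std::mutex> lock(mu_);
    std::vector<std::string> out;
    for (const auto& kv : table_) out.push_back(kv.second->authChannel());
    return out;
  }

 private:
  mutable std::mutex mu_;
  std::map<std::string, const Producer*> table_;
};

Producer::Producer(ProducerTable& table, std::string group, std::string authChannel)
    : table_(table), group_(std::move(group)), authChannel_(std::move(authChannel)) {
  table_.add(group_, this);
}

Producer::~Producer() { table_.remove(group_); }
```

The design point is that registration and unregistration are tied to the object's lifetime; a table that is only cleaned up by an explicit shutdown() call is exposed whenever that call is skipped.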
ifplusor commented 1 month ago

@HUHANK At which message did it crash?

HUHANK commented 1 month ago

"@HUHANK At which message did it crash?"

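While messages are being sent, the producer crashes as soon as I apply the delay on the RocketMQ server.
[screenshot]
Messages up to "Send: 11" were sent with no delay. After "Send: 11" I applied the 3 s delay; the producer reported an error and then dumped core.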

HUHANK commented 1 month ago

Here is the test code:

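#include <chrono>
#include <exception>
#include <iostream>
#include <string>
#include <thread>

#include "DefaultMQProducer.h"  // rocketmq-client-cpp 2.x public header

using namespace std;
using namespace std::chrono_literals;  // needed for the 500ms literal
using namespace rocketmq;

void test1()
{
    try
    {
        DefaultMQProducer producer("A-Hank-TEST_GROUP_P");
        producer.setNamesrvAddr("192.168.28.235:9876");
        producer.setInstanceName("A-Hank-TEST_GROUP_P_INS");
        producer.setSendMsgTimeout(1000);  // 1 s, shorter than the injected 3 s delay
        producer.setRetryTimes(6);
        producer.setRetryTimes4Async(6);
        producer.start();

        // MQMessageQueue mQueue;
        // mQueue.setTopic("Hank-Test");
        // mQueue.setQueueId(0);
        // mQueue.setBrokerName("broker-b");
        for (int i = 0; i < 1000; i++)
        {
            // topic, tags, keys, body
            MQMessage msg("Hank-Test", "", "", "Hello RocketMQ " + to_string(i));
            producer.send(msg);  // throws once sends start timing out
            // producer.sendOneway(msg, mQueue);
            cout << "Send: " << i << endl;
            this_thread::sleep_for(500ms);
        }

        // Only reached if every send() succeeds; an exception skips it.
        producer.shutdown();
    }
    catch (exception &ex)
    {
        cout << "ERROR: " << ex.what() << endl;
    }
}

int main()
{
    test1();
    return 0;
}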
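In this test code, an exception from producer.send() jumps straight to the catch block, so producer.shutdown() is skipped and the producer is destroyed while the client's background threads may still be running. One application-side mitigation, not verified against 2.2.0 in this thread, is to make shutdown() run on every exit path. A minimal sketch using a small RAII guard follows; ShutdownGuard, FakeProducer and runLoop are hypothetical names, and FakeProducer only stands in for DefaultMQProducer (which has start() and shutdown()) so the sketch compiles without the library.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Calls shutdown() on the wrapped object when the guard leaves scope,
// whether by normal completion or by stack unwinding from an exception.
template <typename T>
class ShutdownGuard {
 public:
  explicit ShutdownGuard(T& obj) : obj_(obj) {}
  ~ShutdownGuard() {
    try {
      obj_.shutdown();
    } catch (...) {
      // never let an exception escape a destructor
    }
  }
  ShutdownGuard(const ShutdownGuard&) = delete;
  ShutdownGuard& operator=(const ShutdownGuard&) = delete;

 private:
  T& obj_;
};

// Hypothetical stand-in for DefaultMQProducer, used only to exercise the guard.
struct FakeProducer {
  bool started = false;
  bool stopped = false;
  int sent = 0;
  void start() { started = true; }
  void shutdown() { stopped = true; }
  void send(int i) {
    if (i == 12) throw std::runtime_error("send timeout");  // simulate the delay kicking in
    ++sent;
  }
};

// Same shape as test1(). In test1() itself the guard would be declared right
// after producer.start(); locals are destroyed in reverse order, so shutdown()
// would run before ~DefaultMQProducer().
template <typename Producer>
std::string runLoop(Producer& producer, int count) {
  try {
    producer.start();
    ShutdownGuard<Producer> guard(producer);
    for (int i = 0; i < count; ++i) producer.send(i);
    return "ok";
  } catch (const std::exception& ex) {
    return std::string("ERROR: ") + ex.what();
  }
}
```

With the guard in place, the explicit producer.shutdown() at the end of the try block would be dropped so shutdown() is not called twice; whether a second shutdown() is harmless in 2.2.0 is not established here.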

Command used to add the delay on the server: tc qdisc add dev ens192 root netem delay 3000ms

The RocketMQ service and the producer program are deployed on different servers.

HUHANK commented 1 month ago

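Scenario 1: apply the 3 s delay on the RocketMQ server first, then start the producer test program. The program reports an error and exits normally.
Scenario 2: with no delay on the RocketMQ server, start the producer test program. It starts successfully and sends messages to the RocketMQ server normally. If the 3 s delay is then applied to the server while it is running, the producer fails as described above and dumps core.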