bitbouncer / kspp

A high performance/ real-time C++ Kafka streams framework (C++17)
Boost Software License 1.0
112 stars 24 forks source link

fix "pure virtual method called" in kafka_source #34

Open vyskubov opened 3 years ago

vyskubov commented 3 years ago

Sometimes there happens a call to kafka_source_base::parse() which is pure virtual.

This patch moves initialization of kafka_source::_thread from kafka_source_base initializer list to a call kafka_source_base::start() when kafka_source::thread_f() is available.

skarlsson commented 3 years ago

the "while(!_started) std::this_thread::sleep_for(std::chrono::milliseconds(100)); " part in thread_f() is supposed to protect about this and I don't think the suggested patch changes anything (for better or worse) . When do you see this error?

vyskubov commented 3 years ago

The error "pure virtual method called" happens on 146 line.

And now I caught it on patched version of kspp.

So, it may be that we are calling a parse() method when the object is about to be destructed and kafka_source::parse() deleted from virtual table at that moment. And a call to kafka_source_base::parse() is performed.

In my program I create and destroy the topology in a loop. So constructors and destructors a called pretty frequently.

Here is a backtrace (frame 6):

#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
#1  0x00007ffff76c7859 in __GI_abort () at abort.c:79
#2  0x00007ffff794f911 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x00007ffff795b38c in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#4  0x00007ffff795b3f7 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#5  0x00007ffff795c155 in __cxa_pure_virtual () from /lib/x86_64-linux-gnu/libstdc++.so.6
#6  0x000055555558a19f in kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f (this=0x555555e38ac0) at /usr/include/kspp/sources/kafka_source.h:146
#7  0x000055555558fadb in std::__invoke_impl<void, void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> (
    __f=@0x555555e39040: (void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(class kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes> * const)) 0x555555589e4e <kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f()>, __t=@0x555555e39038: 0x555555e38ac0) at /usr/include/c++/9/bits/invoke.h:73
#8  0x000055555558f737 in std::__invoke<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> (
    __fn=@0x555555e39040: (void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(class kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes> * const)) 0x555555589e4e <kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f()>) at /usr/include/c++/9/bits/invoke.h:95
#9  0x000055555558f093 in std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> >::_M_invoke<0ul, 1ul>
    (this=0x555555e39038) at /usr/include/c++/9/thread:244
#10 0x000055555558e49c in std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> >::operator() (
    this=0x555555e39038) at /usr/include/c++/9/thread:251
#11 0x000055555558ca6e in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> > >::_M_run (this=0x555555e39030) at /usr/include/c++/9/thread:195
#12 0x00007ffff7987de4 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#13 0x00007ffff7e7a609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#14 0x00007ffff77c4293 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
vyskubov commented 3 years ago

That will do. Now segfault'd gone.

Should I create new PR without mistakes or you squash these commits?

Or you want to see more deeply why the error happens?