rsocket / rsocket-cpp

C++ implementation of RSocket
http://rsocket.io
Apache License 2.0
253 stars 99 forks source link

Prevent crash if onComplete is called on a cancelled subscriber #837

Closed phoad closed 6 years ago

phoad commented 6 years ago

It's possible that users of our library to call onComplete() or onNext() of a Subscriber which has an already cancelled Subscription.

I have observed that the app is crashing for such instances and pointing exactly this line. I have noticed that cancellation etc is causing consumingSubscriber_ object to be assigned to nullptr. So, we need to check if this object is nullptr before using it.

I wonder if I should keep a local reference to the subscriber before calling onNext on it, but I did not want to be over protective as this fixed the failure already.

E1205 11:19:50.777717 792147 BenchmarkHandler.h:133] CALLING ON COMPLETE!
*** Aborted at 1512501590 (Unix time, try 'date -d @1512501590') ***
*** Signal 11 (SIGSEGV) (0x0) received by PID 786509 (pthread TID 0x7fdeadb17700) (linux TID 786633) (maybe from PID 0, UID 0) (code: address not mapped to object), stack trace: ***
    @ 00007fdebb0a090d folly::symbolizer::(anonymous namespace)::innerSignalHandler(int, siginfo_t*, void*)
                       ./folly/experimental/symbolizer/SignalHandler.cpp:414
    @ 00007fdebb09f8d7 folly::symbolizer::(anonymous namespace)::signalHandler(int, siginfo_t*, void*)
                       ./folly/experimental/symbolizer/SignalHandler.cpp:424
    @ 00007fdebdab8acf (unknown)
    @ 00007fdec531421f rsocket::ConsumerBase::processPayload(rsocket::Payload&&, bool)
                       ./rsocket/dev/rsocket/statemachine/ConsumerBase.cpp:69
    @ 00007fdec5311f4a rsocket::ChannelResponder::onNextPayloadFrame(unsigned int, rsocket::Payload&&, bool, bool)
                       ./rsocket/dev/rsocket/statemachine/ChannelResponder.cpp:81
    @ 00007fdec5312063 rsocket::ChannelResponder::handlePayload(rsocket::Payload&&, bool, bool)
                       ./rsocket/dev/rsocket/statemachine/ChannelResponder.cpp:72
    @ 00007fdec532c6c4 rsocket::RSocketStateMachine::handleStreamFrame(unsigned int, rsocket::FrameType, std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >)
                       ./rsocket/dev/rsocket/statemachine/RSocketStateMachine.cpp:661
    @ 00007fdec5326e42 rsocket::RSocketStateMachine::processFrame(std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >)
                       ./rsocket/dev/rsocket/statemachine/RSocketStateMachine.cpp:479
    @ 00007fdec52a371b rsocket::FrameTransportImpl::onNext(std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >)
                       ./rsocket/dev/rsocket/framing/FrameTransportImpl.cpp:112
    @ 00007fdec52b0a40 rsocket::FramedReader::parseFrames()
                       ./rsocket/dev/rsocket/framing/FramedReader.cpp:126
    @ 00007fdec52af379 rsocket::FramedReader::onNext(std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >)
                       ./rsocket/dev/rsocket/framing/FramedReader.cpp:78
    @ 00007fdebbbcede5 rsocket::TcpReaderWriter::readBufferAvailable(std::unique_ptr<folly::IOBuf, std::default_delete<folly::IOBuf> >)
                       ./rsocket/dev/rsocket/transports/tcp/TcpDuplexConnection.cpp:161
    @ 00007fdebbbce83e rsocket::TcpReaderWriter::readDataAvailable(unsigned long)