Refinitiv / Real-Time-SDK

Other
180 stars 128 forks source link

EMA C++ - lock in OmmConsumerImpl::registerClient (or race condition over pRsslChannel) in RTSDK 2.1.2 #253

Closed mikel-gc closed 6 months ago

mikel-gc commented 8 months ago

We use EMA C++ consumer which is connecting to a slow producer and tries to subscribe to ~10k items and receive all the updates.

Since producer is slow C++ consumer gets a lot of "Request timed out." (I can see that in the log files) and it is possibly being disconnected from producer (I can't see that in the log).

In the end C++ consumer stops receiving any new messages (without any error or exception in the log file) and the stack trace points to a lock:

Thread 8 (Thread 0x7fb40affd700 (LWP 16493) "DelayedFeed"):
#0  0x00007fb41c18e54d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007fb41c189eb6 in _L_lock_941 () from /lib64/libpthread.so.0
#2  0x00007fb41c189daf in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x0000000000b18bea in refinitiv::ema::access::Mutex::lock (this=0x2253540) at /home/michal.lesiak/.conan/data/RTSDK/2.1.2/_/_/build/a2294e22ddbc6c95c41fccb0f10e4e43c3ec7890/Real-Time-SDK-Real-Time-SDK-2.1.2/Cpp-C/Ema/Src/Access/Impl/Mutex.cpp:53
#4  0x0000000000b3d5d3 in refinitiv::ema::access::OmmConsumerImpl::registerClient (this=0x22534a0, reqMsg=..., ommConsClient=..., closure=0x0, parentHandle=0) at /home/michal.lesiak/.conan/data/RTSDK/2.1.2/_/_/build/a2294e22ddbc6c95c41fccb0f10e4e43c3ec7890/Real-Time-SDK-Real-Time-SDK-2.1.2/Cpp-C/Ema/Src/Access/Impl/OmmConsumerImpl.cpp:406
#5  0x0000000000b2fe63 in refinitiv::ema::access::OmmConsumer::registerClient (this=0x2251a20, reqMsg=..., client=..., closure=0x0, parentHandle=0) at /home/michal.lesiak/.conan/data/RTSDK/2.1.2/_/_/build/a2294e22ddbc6c95c41fccb0f10e4e43c3ec7890/Real-Time-SDK-Real-Time-SDK-2.1.2/Cpp-C/Ema/Src/Access/Impl/OmmConsumer.cpp:142
#6  0x000000000097fb26 in gc::pricing::trep::consumer::ElektronMarketDataConsumer::startQuotes (this=0x2258c80, serviceName="STX_PRICES", symbol="FCEM8", foiId=18446744073709551615, snapshotOnly=false) at /home/michal.lesiak/.conan/data/GCTrepClient/1.4.100/_/_/build/7630912b0dbe73c21c150dad4b1edc0f67e913c8/GCTREPClientLib/source/ElektronMarketDataConsumer.cpp:90
#7  0x00000000009a4c6f in gc::pricing::trep::service::Service::startQuotes (this=0x2247980, symbol=std::shared_ptr<gc::pricing::trep::model::Symbol> (use count 3, weak count 0) = {...}, foiId=18446744073709551615, snapshotOnly=false, originatedInChain=false) at /home/michal.lesiak/.conan/data/GCTrepClient/1.4.100/_/_/build/7630912b0dbe73c21c150dad4b1edc0f67e913c8/GCTREPClientLib/source/Service.cpp:370
#8  0x00000000009a4997 in gc::pricing::trep::service::Service::subscribe (this=0x2247980, subscription=...) at /home/michal.lesiak/.conan/data/GCTrepClient/1.4.100/_/_/build/7630912b0dbe73c21c150dad4b1edc0f67e913c8/GCTREPClientLib/source/Service.cpp:335
(...)

which is weird, because there is no other lock in the pstack: pstack.2.1.2.txt

However I managed to run helgrind (valgrind tool) which points to the data race between rsslReactorSubmitMsg (rsslReactor.c:3494) and _reactorWorkerReconnectAfterCredentialUpdate (rsslReactorWorker.c:919). It sounds related to the above issue and might be the cause.

Is it possible that there is a race condition over this data: reactorChannel.pRsslChannel accessed by 2 different threads?

Full helgrind output: helgrind.txt

Thanks, Michal

MitchellKato commented 8 months ago

Can you please provide the following information:

  1. What platform you're using
  2. Just to confirm, you're using the Real-Time-SDK 1.2.1.L1 release, correct?
  3. How is the connection set up? Is this a direct connection to a TREP, a custom RTSDK provider, or is this a connection to RTO?
  4. What is the configuration for the reconnection timeouts?
  5. Can you please give a code snippet that shows the behavior? Or if this isn't possible, a description of what the application is doing, including things like what kind of data you're requesting(snapshots or streaming especially), what sort of request strategy the application is doing, etc.

The data race is somewhat of a red herring. This pointer is set internally in _reactorWorkerReconnectAfterCredentialUpdate because that's where the Reactor Worker thread runs rsslConnect(). In reactorSubmitMsg, the channel pointer isn't ever used in that stack of calls.

In addition, if you set the Logger LoggerSeverity to LoggerSeverity::Verbose, you will get more connection information. This isn't recommended for production, but may help in diagnosing exactly what's going on here.

Thanks, Mitchell

mikel-gc commented 8 months ago
  1. What platform you're using

This is reproducible on Red Hat Enterprise Linux release 8.6 and CentOS Linux release 7.9.2009

  1. Just to confirm, you're using the Real-Time-SDK 1.2.1.L1 release, correct?

No, I have reproduced this issue on Real-Time-SDK-2.0.8.L1 and Real-Time-SDK-2.1.2.L1.

  1. How is the connection set up? Is this a direct connection to a TREP, a custom RTSDK provider, or is this a connection to RTO?

The connections looks like this: TREP <-> TREP proxy where the lock happens <-> TREP client TREP proxy is hanging on lock, while TREP client stops receiving market updates.

  1. What is the configuration for the reconnection timeouts?

For every channel we have following configuration: .addEnum("ChannelType", 0) .addUInt("ConnectionPingTimeout", 50000) .addUInt("TcpNodelay", 0)

and the rest is default.

  1. Can you please give a code snippet that shows the behavior? Or if this isn't possible, a description of what the application is doing, including things like what kind of data you're requesting(snapshots or streaming especially), what sort of request strategy the application is doing, etc.

It will be hard to give a code snippet, but let me try to describe the architecture and behavior:

TREP <-> TREP proxy where the lock happens <-> TREP client

TREP proxy basically subscribes only when TREP clients subscribes to some item and unsubscribes when TREP client unsubscribes. Basically it is forwarding subscriptions, item status messages and item updates.

Scenario: TREP client connects to TREP proxy and subscribes to around 10k items at once.

TREP proxy received most of them and send subscriptions to TREP, however it has received

I don't see any indication that connection to TREP went down.

Thanks for the tip with Verbose, I will run and attach more details if needed.

mikel-gc commented 8 months ago

I have added verbose severity, so there are 2 log files from our TREP proxy, one named EMA_Client_STX_PRICES_21153.log (connection directly to TREP) and one from connection to requester (EMA_Publisher_STX_PRICES_T1_21153.log).

I think the main issue is here in EMA_Client_STX_PRICES_21153.log:

loggerMsg
    TimeStamp: 11:52:28.529
    ClientName: Consumer_1_1
    Severity: Error
    Text:    EMA Exception Handler

Application: EMA Application
Process Id: 0x21153X

File: /home/michal.lesiak/.conan/data/RTSDK/2.1.2/_/_/build/a2294e22ddbc6c95c41fccb0f10e4e43c3ec7890/Real-Time-SDK-Real-Time-SDK-2.1.2/Cpp-C/Ema/Src/Access/Impl/Thread.cpp
Line: 71
Exception occured
loggerMsgEnd

EMA_Client_STX_PRICES_21153.log EMA_Publisher_STX_PRICES_T1_21153.log

mikel-gc commented 8 months ago

I modified the Thread.cpp code slightly to catch the OmmException. The exception which kills this thread is refinitiv::ema::access::OmmInvalidUsageExceptionImpl.

Text in the exception is "Failed to convert to UTF8 in RmtesBufferImpl::toString(). Reason: RSSL_RET_FAILURE".

As a result userLock is still locked and next thread (OmmConsumerImpl::registerClient) trying to lock userLock is hanging.

MitchellKato commented 8 months ago

The application is using API dispatch mode, correct? If so, please make sure that in all callbacks, all exceptions are getting caught. This appears to be an uncaught exception that's getting raised up through the dispatch to the thread function, and ending up with the exception handler, which is then killing the thread. Since this interrupts the dispatch run, the lock is never released.

mikel-gc commented 8 months ago

Correct, thanks Mitchell for help. We have been catching only std::exception, not OmmException.

Additionally adding following code to Thread.cpp:71 helped me to track down where is the issue:

    void *Thread::runThread( void* arg)
    {
        try {
            ((Thread*)arg)->run();
        }
        catch (const OmmException& ex)
        {
            std::cerr << "Exception: " << ex.toString();
        }
        catch (const std::exception& om)
        {
            std::cerr << "Exception: " << om.what();
        }
        catch ( ... )
        {
            ((Thread*)arg)->runLog( NULL, __FILE__, __LINE__ );
            if (((Thread*)arg)->_handleException)
            {
                ((Thread*)arg)->cleanUp();
                return (void*)-1;
            }
            else
              throw;
        }

        return 0;
    }
}

so that application is not hanging in a "locked" state - it simply exits. Does it make sense to add such changes so that user is presented with specific error and application is not hanging?

MitchellKato commented 8 months ago

Unfortunately, by design, RTSDK does not output anything to the console, both for standard out and for standard error streams.

In this case, there's not much that we can do with an uncaught exception. The underlying ETA Reactor code is C code, not C++, so there isn't any way for the API to catch and handle the exception, so it goes back to the thread call, and the thread exits. We can look into a way possibly to get this logged if error logging is turned on.