Refinitiv / Real-Time-SDK

OmmNiProvider connection recovery #33

Closed thomasthebaker closed 6 years ago

thomasthebaker commented 7 years ago

I have been running a few tests and I'm having trouble with the resiliency of the OmmNiProvider. It appears from the logs that after a connection failure, everything recovers nicely and the reactor channel underneath is being dispatched again. However, any subscription clients do not see the messages that are sent after a network failure. Here are the steps:

  1. Start NiProvider and subscribe to messages using a separate client
  2. Pause NiProvider for 30s (or bounce ADH) to cause connection failure
  3. Observe that ADH sees the reconnection and the dispatch loop in OmmBaseImpl is working again
  4. Subscriber sees outage but never receives the messages that are sent after the outage.
  5. Restarting the subscriber does not fix the issue.
  6. Bounce the provider and the subscriber receives the messages again

The ADH seems to recognise that the application is active again after the outage:

<rmds.1.adh.sourceSSLDistribution.sslDispatcher.EXT2: Info: Wed Aug 16  10:49:54 2017>
Server "EXT2" stopped for RSSL application "EXT2:2" from instance "LONMRT02S.46". The application terminated the session.
<END>

<rmds.1.adh.sourceSSLDistribution.sslDispatcher.channel: Warning: Wed Aug 16  10:50:10 2017>
Session disconnected (route "ripcSession.24", host "", port ). Channel initialization failed. Text:
<Impl/ripcsrvr.c:3335> Error: 1007 Unknown connection type.

<END>

<rmds.1.adh.sourceSSLDistribution.sslDispatcher.channel: Info: Wed Aug 16  10:50:10 2017>
Accepted source RSSL channel connection. SocketId: 24 Hostname: LONMRT02S IP: 10.200.31.216 Port: 53788
<END>

<rmds.1.adh.sourceSSLDistribution.sslDispatcher.EXT2: Info: Wed Aug 16  10:50:10 2017>
Server "EXT2" started for RSSL application "EXT2:0" from instance "LONMRT02S.48". The application sent a service up state.
<END>

The logs from the Reactor channel appear to show that everything has been set up correctly for the recovery:

2017-08-16 10:50:09.642 WARN  [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: ChannelDictionary| Severity: Warning| Text:    RDMDictionary stream state was changed to suspect with status message| streamId 3| Reason State: Open/Suspect/None - text: "channel down."| loggerMsgEnd| 

2017-08-16 10:50:09.642 WARN  [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: ChannelDictionary| Severity: Warning| Text:    RDMDictionary stream state was changed to suspect with status message| streamId 4| Reason State: Open/Suspect/None - text: "channel down."| loggerMsgEnd| 

2017-08-16 10:50:10,618 INFO  [publisher] ElektronClient - Published 22 messages so far.

2017-08-16 10:50:10.618 TRACE [pub_worker_0] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: AHL_NIPROVIDER_2| Severity: Trace| Text:    Received UpdateMsg with market domain; Handle = 1, user assigned streamId = 0.| loggerMsgEnd| 

2017-08-16 10:50:10.618 ERROR [pub_worker_0] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: AHL_NIPROVIDER_2| Severity: Error| Text:    Internal error: rsslChannel.submit() failed in OmmNiProviderImpl.submit(UpdateMsg)RsslChannel 0| Error Id -1| Internal sysError 0| Error Location Reactor.submitChannel| Error Text ReactorChannel is closed, aborting.| loggerMsgEnd| 

2017-08-16 10:50:10,619 DEBUG [pub_worker_0] InjectableEmaFactory - Failed to submit UpdateMsg. Reason: ReactorReturnCodes.FAILURE. Error text: ReactorChannel is closed, aborting.
2017-08-16 10:50:10.679 INFO  [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: ChannelCallbackClient| Severity: Info| Text:    Received ChannelUp event on channel Channel| Instance Name AHL_CONSUMER_CUSTOM_1| Component Version ads2.6.9.L1.linux.tis.rrg 64-bit| loggerMsgEnd| 

2017-08-16 10:50:10.680 INFO  [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: ChannelCallbackClient| Severity: Info| Text:    Received ChannelUp event on channel Channel| Instance Name AHL_NIPROVIDER_2| Component Version adh2.6.9.L1.linux.tis.rrg 64-bit| loggerMsgEnd| 

2017-08-16 10:50:10.686 TRACE [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: LoginCallbackClient| Severity: Trace| Text:    RDMLogin stream was open with refresh message| LoginRefresh: | streamId: 1| name: ****| nameType: 1| State: Open/Ok/None - text: "Login accepted by host rmds."| isSolicited: true| applicationId: | applicationName: ADS| position: 10.200.31.216| providePermissionProfile: 0| providePermissionExpressions: 1| singleOpen: 1| allowSuspectData: 1| supportBatchRequests: 1| supportBatchReissues: 1| supportBatchCloses: 1| supportOMMPost: 1| supportOptimizedPauseResume: 0| supportStandby: 0| supportViewRequests: 0| supportEnhancedSymbolList: 1| State: State: Open/Ok/None - text: "Login accepted by host rmds."| loggerMsgEnd| 

2017-08-16 10:50:10.688 TRACE [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: LoginCallbackClient| Severity: Trace| Text:    RDMLogin stream was open with refresh message| LoginRefresh: | streamId: 1| name: ****| nameType: 1| State: Open/Ok/None - text: "Login accepted by host rmds."| isSolicited: true| applicationId: | supportProviderDictionaryDownload: 1| State: State: Open/Ok/None - text: "Login accepted by host rmds."| loggerMsgEnd| 

2017-08-16 10:50:10,689 INFO  [ProviderDispatcher] PublishingWorkerFactory - LoginClient. Msg: RefreshMsg
    streamId="1"
    domain="Login Domain"
    solicited
    RefreshComplete
    state="Open / Ok / None / 'Login accepted by host rmds.'"
    itemGroup="00 00"
    name="****"
    nameType="1"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="ApplicationId" dataType="Ascii" value="(blank data)"
            ElementEntry name="SupportProviderDictionaryDownload" dataType="UInt" value="1"
        ElementListEnd
    AttribEnd
RefreshMsgEnd
, Event: com.thomsonreuters.ema.access.OmmEventImpl@1583c06b

2017-08-16 10:50:10.689 TRACE [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: ChannelCallbackClient| Severity: Trace| Text:    Received ChannelReady event on channel Channel| Instance Name AHL_NIPROVIDER_2| loggerMsgEnd| 
2017-08-16 10:50:10.689 TRACE [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: AHL_NIPROVIDER_2| Severity: Trace| Text:    Reload of user submitted source directories.| loggerMsgEnd| 

2017-08-16 10:50:10.689 TRACE [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: AHL_NIPROVIDER_2| Severity: Trace| Text:    User submitted source directoies were sent out on the wire after reconnect.| loggerMsgEnd| 

2017-08-16 10:50:10,690 INFO  [ProviderDispatcher] PublishingWorkerFactory - LoginClient. Msg: StatusMsg
    streamId="1"
    domain="Login Domain"
    state="Open / Ok / None / 'channel up'"
    name="****"
    nameType="1"
StatusMsgEnd
, Event: com.thomsonreuters.ema.access.OmmEventImpl@1583c06b

2017-08-16 10:50:24,372 INFO  [publisher] ElektronClient - Published 23 messages so far.

2017-08-16 10:50:24.374 WARN  [ProviderDispatcher] [EMA] access.OmmNiProviderImpl - loggerMsg| ClientName: ChannelCallbackClient| Severity: Warning| Text:    Received ChannelDownReconnecting event on channel Channel| RsslReactor @41867bf1| RsslChannel @69297e3| Error Id 0| Internal sysError 0| Error Location null| Error text SocketChannel.read returned -1 (end-of-stream)| loggerMsgEnd| 

2017-08-16 10:50:24.375 TRACE [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: DirectoryCallbackClient| Severity: Trace| Text:    Received RDMDirectory update message| loggerMsgEnd| 

2017-08-16 10:50:24.375 TRACE [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: ChannelCallbackClient| Severity: Trace| Text:    Received ChannelReady event on channel Channel| Instance Name AHL_CONSUMER_CUSTOM_1| loggerMsgEnd| 

2017-08-16 10:50:24.375 TRACE [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: DirectoryCallbackClient| Severity: Trace| Text:    Received RDMDirectory update message| loggerMsgEnd| 

2017-08-16 10:50:24.375 TRACE [ElektronConsumerDispatcher] [EMA] access.OmmConsumerImpl - loggerMsg| ClientName: DirectoryCallbackClient| Severity: Trace| Text:    Received Update action for RDMService| Service name EXT2| Service id 8027| loggerMsgEnd| 

2017-08-16 10:50:38,732 INFO  [ProviderDispatcher] PublishingWorkerFactory - Connection recovery!!

2017-08-16 10:50:38,732 INFO  [publisher] ElektronClient - Published 24 messages so far.

Given that the OmmNiProvider has logged in again and sent the Directory messages, should I expect the provider to be fully functional after an outage like this? At the moment I have to uninitialise the original provider and instantiate a new one to work around the issue.

Have I missed some configuration somewhere?

seba5 commented 7 years ago

Hi Tom,

Please take a look at the NiProvider example example360MarketPriceConnectionRecovery. I ran the example with the ADH and the ETA consumer example, then bounced the ADH. My consumer example started receiving updates again once the ADH came back up.

thomasthebaker commented 7 years ago

Hi seba5, thanks for the pointer. I got the example working and it behaves just as you described. I then dug deeper into our own application, which wasn't working, and found why the subscription streams do not recover after the NI Provider reconnects. The Directory message that the Elektron components send on reconnect (I think a cached version of the user's original Directory message) changes the service ID, so the ADH reports the connection-up event with a different service ID. If I set the service ID to '0' to match the retransmitted Directory message, the subscription stream recovers.

I am not using XML to configure the service because I want to switch the service name at run time. Perhaps this is a bug in Elektron's connection recovery feature: should it preserve the service ID that the user originally set? Do you know the relevance of the service ID outside of a single NI Provider application? It obviously has an effect on the subscription streams.

Here's our configuration code. It's the SERVICE_KEY value that gets ignored by Elektron on connection recovery:

    /**
     * Builds an MMT_DIRECTORY RefreshMsg used to register service information with an ADH when logging in.
     * @param publishServiceName the name of the service that will be published (e.g. STAT, EXT2...)
     * @return a fully populated RefreshMsg that can be used to perform directory registration with an ADH server.
     */
    private static RefreshMsg buildDirectoryMessage(String publishServiceName) {
        OmmArray capabilities = EmaFactory.createOmmArray();
        capabilities.add(EmaFactory.createOmmArrayEntry().uintValue(EmaRdm.MMT_MARKET_PRICE));

        OmmArray dictionaryUsed = EmaFactory.createOmmArray();
        dictionaryUsed.add(EmaFactory.createOmmArrayEntry().ascii(DictionaryClient.DictionaryType.RWFFld.name()));
        dictionaryUsed.add(EmaFactory.createOmmArrayEntry().ascii(DictionaryClient.DictionaryType.RWFEnum.name()));

        ElementList serviceInfoId = EmaFactory.createElementList();
        serviceInfoId.add(EmaFactory.createElementEntry().ascii(EmaRdm.ENAME_NAME, publishServiceName));
        serviceInfoId.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_IS_SOURCE, 0)); // service is consolidation of multiple sources into a single service. Affects connection recovery on outage.
        serviceInfoId.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_ACCEPTING_CONS_STATUS, 0)); // We do not process ADH messages relating to Hot/Standby status
        serviceInfoId.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_SUPPS_OOB_SNAPSHOTS, 0)); // Not supported
        serviceInfoId.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_SUPPS_QOS_RANGE, 0)); // Not supported
        serviceInfoId.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_CAPABILITIES, capabilities));
        serviceInfoId.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_DICTIONARYS_USED, dictionaryUsed));

        ElementList serviceStateId = EmaFactory.createElementList();
        serviceStateId.add( EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_SVC_STATE, EmaRdm.SERVICE_UP));

        // indicate that we can handle reissue on existing streams. Helpful on service outage.
        serviceStateId.add( EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_ACCEPTING_REQS, 1));

        // state to send to existing streams
        serviceStateId.add( EmaFactory.createElementEntry()
                .state("Status", OmmState.StreamState.OPEN, OmmState.DataState.OK, OmmState.StatusCode.NONE, "OK"));

        FilterList filterList = EmaFactory.createFilterList();
        filterList.add(EmaFactory.createFilterEntry().elementList(EmaRdm.SERVICE_INFO_ID, FilterEntry.FilterAction.SET, serviceInfoId) );
        filterList.add(EmaFactory.createFilterEntry().elementList(EmaRdm.SERVICE_STATE_ID, FilterEntry.FilterAction.SET, serviceStateId));

        Map map = EmaFactory.createMap();
        map.add( EmaFactory.createMapEntry().keyUInt(SERVICE_KEY, MapEntry.MapAction.ADD, filterList));

        return EmaFactory.createRefreshMsg()
                .domainType(EmaRdm.MMT_DIRECTORY)
                .filter(EmaRdm.SERVICE_INFO_FILTER | EmaRdm.SERVICE_STATE_FILTER)
                .payload(map);
    }

If I just pause the VM for 30s then the ADH logs the following information that shows the service ID is being changed:

<rmds.1.adh.sourceSSLDistribution.sslDispatcher.EXT2: Info: Thu Aug 17  17:37:00 2017>
Server "EXT2" started for RSSL application "EXT2:1" from instance "FOO.26". The application sent a service up state.
<rmds.1.adh.sourceSSLDistribution.sslDispatcher.EXT2: Info: Thu Aug 17  17:37:50 2017>
Server "EXT2" stopped for RSSL application "EXT2:1" from instance "FOO.26". The application terminated the session.
<rmds.1.adh.sourceSSLDistribution.sslDispatcher.EXT2: Info: Thu Aug 17  17:37:56 2017>
Server "EXT2" started for RSSL application "EXT2:0" from instance "FOO.28". The application sent a service up state.
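
To make the change easier to spot across many log lines, here is a minimal sketch that pulls the number after the colon out of the `RSSL application "EXT2:1"` field. It assumes, as this post does, that this number is the service ID; the class and method names (`AdhServiceIdParser`, `serviceIdOf`) are hypothetical and the pattern is only fitted to the three log lines shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AdhServiceIdParser {
    // Fitted to lines such as:
    //   Server "EXT2" started for RSSL application "EXT2:1" from instance "FOO.26".
    // Assumption: the digits after the colon are the service ID.
    private static final Pattern APPLICATION = Pattern.compile(
            "Server \"([^\"]+)\" (started|stopped) for RSSL application \"([^\":]+):(\\d+)\"");

    /** Returns the number after the colon, or -1 if the line does not match the pattern. */
    static int serviceIdOf(String logLine) {
        Matcher m = APPLICATION.matcher(logLine);
        return m.find() ? Integer.parseInt(m.group(4)) : -1;
    }

    public static void main(String[] args) {
        String before = "Server \"EXT2\" started for RSSL application \"EXT2:1\" from instance \"FOO.26\". The application sent a service up state.";
        String after = "Server \"EXT2\" started for RSSL application \"EXT2:0\" from instance \"FOO.28\". The application sent a service up state.";
        // Prints "1 -> 0": the ID reported after the reconnect differs from the original.
        System.out.println(serviceIdOf(before) + " -> " + serviceIdOf(after));
    }
}
```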

seba5 commented 7 years ago

Tom, thank you for providing more details. It looks like this is a known issue. During the connection recovery we are sending a Source Directory message with serviceId of 0. There is a workaround. You can modify your application to resend the Source Directory message once the connection comes up again. Please let me know if that helps, thank you.
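
A rough sketch of that workaround, written without the EMA classes so it stands alone: it resubmits the directory each time the login stream goes from not-OK back to OK. `DirectoryResender` and `onLoginState` are hypothetical names. The `IntConsumer` stands in for whatever the application does to send its own Source Directory (for example, submitting the message built by `buildDirectoryMessage` on the provider). It also assumes the login callback sees a non-OK data state while the channel is down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

final class DirectoryResender {
    private final int serviceId;
    // Stand-in for the application's own "send Source Directory" call (an assumption, not EMA API).
    private final IntConsumer submitDirectory;
    private boolean channelUp;

    DirectoryResender(int serviceId, IntConsumer submitDirectory) {
        this.serviceId = serviceId;
        this.submitDirectory = submitDirectory;
    }

    /**
     * Call from the login stream callback, passing whether the reported data state is OK.
     * The directory is submitted only on a down-to-up transition, so a login refresh
     * followed by a "channel up" status results in a single submit.
     */
    synchronized void onLoginState(boolean dataStateOk) {
        if (dataStateOk && !channelUp) {
            submitDirectory.accept(serviceId);
        }
        channelUp = dataStateOk;
    }

    public static void main(String[] args) {
        List<Integer> sent = new ArrayList<>();
        DirectoryResender resender = new DirectoryResender(11, sent::add);
        resender.onLoginState(true);   // initial login accepted: submit
        resender.onLoginState(true);   // "channel up" status: already up, no submit
        resender.onLoginState(false);  // channel down
        resender.onLoginState(true);   // reconnected: submit again with the original ID
        System.out.println("directory submitted " + sent.size() + " times with IDs " + sent);
    }
}
```

Keeping the service ID in the resender means the resubmitted directory carries the ID the application originally chose, whatever the library retransmits on its own.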

thomasthebaker commented 7 years ago

It all works if I always set the serviceId to 0. I'm not sure what the significance of the serviceId is or how it should be set. The example application uses a serviceId of '11'; is there any particular reason? I couldn't find anything specific in the documentation.

seba5 commented 7 years ago

Tom, the serviceId needs to be unique and is linked to the service name. If your application publishes only one service, you can set the serviceId to 0. The example uses a serviceId of 11 to show that it can be something other than 0; 11 is not a special number. Let me know if this answers your question. Thanks.
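
For an application that switches service names at run time, one way to keep that one-to-one link is a small lookup that refuses to bind an ID to two names or a name to two IDs. This is only a sketch: `ServiceIdRegistry` is a hypothetical helper, not part of EMA, and it assumes uniqueness only needs to hold within a single provider application.

```java
import java.util.HashMap;
import java.util.Map;

final class ServiceIdRegistry {
    private final Map<String, Integer> idByName = new HashMap<>();
    private final Map<Integer, String> nameById = new HashMap<>();

    /** Binds a service name to an ID, rejecting any binding that would break the one-to-one link. */
    void register(String serviceName, int serviceId) {
        String existingName = nameById.get(serviceId);
        if (existingName != null && !existingName.equals(serviceName)) {
            throw new IllegalArgumentException(
                    "serviceId " + serviceId + " is already used by " + existingName);
        }
        Integer existingId = idByName.get(serviceName);
        if (existingId != null && existingId != serviceId) {
            throw new IllegalArgumentException(
                    serviceName + " is already registered with serviceId " + existingId);
        }
        idByName.put(serviceName, serviceId);
        nameById.put(serviceId, serviceName);
    }

    /** Returns the ID bound to the name; throws if the name was never registered. */
    int idOf(String serviceName) {
        Integer id = idByName.get(serviceName);
        if (id == null) {
            throw new IllegalArgumentException("unknown service " + serviceName);
        }
        return id;
    }

    public static void main(String[] args) {
        ServiceIdRegistry registry = new ServiceIdRegistry();
        registry.register("EXT2", 11);
        registry.register("STAT", 0);
        System.out.println("EXT2 -> " + registry.idOf("EXT2"));
        System.out.println("STAT -> " + registry.idOf("STAT"));
    }
}
```

The value returned by `idOf` would then be used as the map entry key (SERVICE_KEY in the code above) when building the directory message.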