Refinitiv / Real-Time-SDK

Other
180 stars 128 forks source link

Incorrect OmmConsumer closure object returned to OmmConsumerClient #26

Closed thomasthebaker closed 7 years ago

thomasthebaker commented 7 years ago

I'm seeing a regression between Elektron 1.0.8 and 1.1.0. I am using the closure mechanism to track the subject used to make subscriptions. When the OmmConsumerClient is called back after making a series of requests the wrong closure is passed to the onRefreshMsg method.

The specific scenario where this fails is when I am snapshotting a RIC chain. I make a series of snapshot requests each with different closure objects:

Here is how the request is made:

void snapshot(SubscriptionContext context) {
        RmdsSubject subject = context.getSubject();
        LOGGER.info("closure:{}, item:{}", context, subject.getItem());
        consumer.registerClient(EmaFactory.createReqMsg()
                .serviceName(subject.getService())
                .name(subject.getOMMItem())
                .interestAfterRefresh(false), dispatchingClient, context);
    }

Here is the OmmConsumerClient callback:

 @Override
    public void onRefreshMsg(RefreshMsg msg, OmmConsumerEvent consumerEvent) {
        if (consumerEvent.closure() instanceof SubscriptionContext) {
            SubscriptionContext context = (SubscriptionContext) consumerEvent.closure();
            if (!msg.name().equals(context.getSubject().getItem())) {
                LOGGER.warn("Bad context: Msg.name: {}, closure:{}, item:{}", msg.name(), consumerEvent.closure(), ((SubscriptionContext) consumerEvent.closure()).getSubject());
            }
            handleMarketPrice(context,
                   MessageType.IMAGE,
                   msg,
                   msg.hasSeqNum() ? msg.seqNum() : 0);
        } else {
            LOGGER.info("Got refresh: {}", msg);
        }
    }

And here are the logs that show that the closure object is being used for the wrong messages:

2017-05-10 17:30:57.362 [INFO ] [EventWorker_3] elektron.ElektronSubscriber - closure:com.mi.ahl.pubsub.elektron.ChainSnapshotContext$LinkContext@41d8484, item:JGB1555R7
...
2017-05-10 17:30:57.402 [WARN ] [pool-11-thread-1] client.DispatchingClient - Bad context: Msg.name: JGB1555S7, closure:com.mi.ahl.pubsub.elektron.ChainSnapshotContext$LinkContext@41d8484, item:RSF|JGB1555R7

Out of ~150 items there are almost 50% with the wrong context, it seems to change run by run. If I move back to Elektron 1.0.8 there are no problems.

This is a major regression, it makes 1.1.0 unusable as I cannot trust that the correct closures are being sent back in my callback methods.

maydu commented 7 years ago

After looking at your test case and snap code, I built a similar test app by creating a Closure object which also contains an item name for a request msg. Then verified the closure returned from the refreshmsg, and don’t see the mismatching issue between msg and closure.

In order to help us to identify the issue as soon as possible, can you provide us some answers for the following questions? 1> EMA internally only holds a reference on closure object. After sending an item request with registerClient() call by passing the closure, is there any chance the application modifies the memory or content of the closure object that was passed in before receiving the refreshMsg? 2> When you said you move back to 1.0.8 version, you mean that you use the same exact test app by changing classpath point to old version Elektron 1.0.8? 3> One thing I noticed in your code is that when receiving refreshmsg from ema callback, the app uses: if (!msg.name().equals(context.getSubject(). getItem()))

But when the app sends request, the app uses: consumer.registerClient(EmaFactory.createReqMsg() .serviceName(subject.getService()) .name(subject.getOMMItem()) .interestAfterRefresh(false), dispatchingClient, context);

msg.name() set from subject.getOMMItem() compares with context.getSubject().getItem(). Is this intentional and are these two methods expected to provide the same string back?

4> Can you take a look at the test app I created, try to run in your environment to see if there is an similar issue exist? If you can point out the difference between our test app doing and your app doing, it may help us to identify the issue quickly.

Thank you very much for your support and feedback.

Consumer.java.txt

thomasthebaker commented 7 years ago

Hi Maydu, Thanks for looking into this. My replies below:

1) The classes used for the closures are immutable. com.mi.ahl.pubsub.elektron.ChainSnapshotContext$LinkContext holds a final reference to the RmdsSubject and RmdsSubject is immutable. So they are not being changed.

2) Yes, I used the same test application and just switched the Elektron version in my maven pom.xml. I've deployed the Elektron artifacts to our internal maven repository. Incidentally, it would be nice if you could use maven for Elektron SDK.

  1. getItem() and getOMMItem() provide the same value when there isn't an exchange (which there isn't in this case). I only added the diff lines to find the bug, you are right that they should be the same, but in this instance they are equivalent.

4) I've updated your code to send requests in different threads. This is now showing that the wrong closures are being sent back. If I switch Elektron back to 1.0.8 the closures are fine.

2017-05-11 17:20:34.933 [INFO ] [main] access.OmmConsumerImpl - loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text:    Received ChannelUp event on channel Channel_1
    Instance Name AHLConsumer_1
    Component Version ads2.6.9.L1.linux.tis.rrg 64-bit
loggerMsgEnd

Send Request: Item Name: JGB1460F7   Closure : 1218384854
Send Request: Item Name: 0#JGB+   Closure : 1263469024
###########Wrong closure received####################
Received Refresh: Item Name: JGB1460F7  Closure : 1263469024   Closure item Name: 0#JGB+
Received Refresh: Item Name: 0#JGB+  Closure : 1263469024   Closure item Name: 0#JGB+
total refresh count: 1  total status count: 0   invalid refresh count: 1
total refresh count: 1  total status count: 0   invalid refresh count: 1
total refresh count: 1  total status count: 0   invalid refresh count: 1
Send Request: Item Name: 0#JGB+   Closure : 1459736983
Send Request: Item Name: JGB1460F7   Closure : 1745116157
Received Refresh: Item Name: JGB1460F7  Closure : 1745116157   Closure item Name: JGB1460F7
###########Wrong closure received####################
Received Refresh: Item Name: 0#JGB+  Closure : 1745116157   Closure item Name: JGB1460F7
total refresh count: 2  total status count: 0   invalid refresh count: 2
total refresh count: 2  total status count: 0   invalid refresh count: 2
total refresh count: 2  total status count: 0   invalid refresh count: 2
Send Request: Item Name: 0#JGB+   Closure : 93698541
Send Request: Item Name: JGB1460F7   Closure : 410398520
Received Refresh: Item Name: JGB1460F7  Closure : 410398520   Closure item Name: JGB1460F7
###########Wrong closure received####################
Received Refresh: Item Name: 0#JGB+  Closure : 410398520   Closure item Name: JGB1460F7
total refresh count: 3  total status count: 0   invalid refresh count: 3
total refresh count: 3  total status count: 0   invalid refresh count: 3

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

Using 1.0.8 shows no errors after running for a few minutes:

total refresh count: 134    total status count: 0   invalid refresh count: 0

The key thing that is happening in my application is I am registering the client and receiving the message in different threads. You can see the thread names in the log lines I posted ([EventWorker_3] and [pool-11-thread-1]). Just to stress again, it is fine in 1.0.8 and breaks as soon as I move to 1.1.0. This regression showed up in one of my integration tests which have been reliable with 1.0.8 Elektron.

Updated test code: ElektronBug.java.txt

maydu commented 7 years ago

Hi Tom, We can reproduce the issue with the code you sent. We noticed it only happens during multiple threads calling Ema consumer registerClient(). Internally Ema tries to reuse some objects to avoid GC, and there is a thread lock issue on one of these reused objects. There is no problem when using only one thread on registerClient() call.

We created an issue ticket, and will work on to resolve this issue.

If your application design requires to make call on registerClient() by multiple threads, here is one workaround you can try to avoid the issue. But this workaround will create extra objects for GC. Go to Java\Ema\Src\main\java\impl\com\thomsonreuters\ema\access folder to replace one file "ItemCallbackClient.java" with the attached file. Inside this file, you can see the one line we changed for a temporary workaround by searching "Elektron_Bug".

Try it, see if it works for your case. Thank you again for your help and quick responses.

ItemCallbackClient.java.txt

thomasthebaker commented 7 years ago

Hi Maydu, I'm glad that you can reproduce.

I'm concerned that there weren't any tests that exposed this sort of threading issue in the EMA library. None of the test code seems to be in the github repository, it would be good if ThomsonReuters could add the unit and integration tests to github.

I don't want to patch the source code and run a custom version of Elektron, the ant build doesn't integrate well with our maven setup so I will wait on version 1.0.8 until this has been fixed. This project would really benefit from using maven to build it! I could then easily build and deploy a branch and access the documentation and source code in my IDE.

Thanks for your help. Tom

BrianSandri commented 7 years ago

@thomasthebaker Thank you for your patience on this issue. We do have tests that exercise EMA across thread boundaries but none that were pushing it in the specific way you were. We have incorporated this into our regression test suite and appreciate your clear instructions on reproduction. The team is working on a solution and we will commit it to the repo after it is through the QA and performance testing process.

Regarding your comments on sharing our unit and integration tests - we plan to do this and you should begin seeing these start to appear over the next few months. We have been working to better align our internal development git repositories so they are in synch with our github presence. Our hope is that we can have much better transparency as a result. We have to rework some of our tests to function within the structure of our github layout and we will be adding them into the repository as they become available.

With respect to Maven, we have received several requests for this and the team is planning to investigate. No projected timeframes as of yet, but stay tuned.

maydu commented 7 years ago

Tom, We fixed the issue, and generated a Preview branch for you use. We are still working on QA fully regression testing, and will migrate the fix later into the master branch.

Thank you again for helping us to improve our product.

May

maydu commented 7 years ago

Tom, We just integrated our fix into the master. Please pull and try it out. Thanks.

thomasthebaker commented 7 years ago

Hi May, One of the other big things that would be good for you guys to tackle is to mavenise the build. It's a manual process for me to build and upload the jars, repackaging them into maven format. So I don't generally test anything but the main releases. If you guys were using a maven build I could easily integrate it into our environment.... :)

Thanks Tom

On 21 June 2017 at 18:37, may.du notifications@github.com wrote:

Tom, We just integrated our fix into the master. Please pull and try it out. Thanks.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/thomsonreuters/Elektron-SDK/issues/26#issuecomment-310151858, or mute the thread https://github.com/notifications/unsubscribe-auth/ABkrymf_Iusz3E9CIUpV2JCWsVHCtN1Fks5sGVTVgaJpZM4NW7VR .