Closed safinn closed 7 years ago
@AlexBHarley FWIW, I ran into this as well last week (v2.0.6). Figured you could only have one client per thread (no pool support - is there?) so I simply created new client on each of the threads and it worked fine. I ran a test with 200 active threads running concurrently on my Mac receiving data-sync from another publisher thread/client - worked like a charm - purred without spikes or hiccups wrt mem and cpu.
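For readers following along, here is a minimal JDK-only sketch of the "one client per thread" workaround described above. `FakeClient` is a hypothetical stand-in, not the deepstream client, and `ThreadLocal` is just one assumed way to give each thread its own instance; the comment above only says a new client was created on each thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadClientSketch {
    // Hypothetical stand-in for a real client; it only records its own id.
    static final class FakeClient {
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        final int id = NEXT_ID.incrementAndGet();
    }

    // Each thread lazily creates, then reuses, its own FakeClient.
    private static final ThreadLocal<FakeClient> CLIENT =
            ThreadLocal.withInitial(FakeClient::new);

    // Starts `threads` workers and returns how many distinct clients they used.
    public static int distinctClients(int threads) {
        Set<Integer> ids = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                ids.add(CLIENT.get().id); // first call creates this thread's client
                ids.add(CLIENT.get().id); // second call reuses the same client
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctClients(5)); // prints 5
    }
}
```

With a real client, each instance would presumably open its own connection, so the cost grows with the number of threads.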
Thanks for the information! Just to be clear, do you mean getting an instance of the client inside the thread, like this:
new Thread(new Runnable() {
    @Override
    public void run() {
        DeepstreamFactory deepstreamFactory = DeepstreamFactory.getInstance();
        DeepstreamClient deepstreamClient = deepstreamFactory.getClient();
        Record userRecord = deepstreamClient.record.getRecord("user");
        userRecord.set("room", "topic");
        userRecord.discard();
    }
}).start();
Re your initial post above: the client is created outside the thread.
In another activity I try and use Deepstream:
DeepstreamFactory deepstreamFactory = DeepstreamFactory.getInstance();
deepstreamClient = deepstreamFactory.getClient(); // obtained outside the thread
new Thread(new Runnable() {
    @Override
    public void run() {
        Record userRecord = deepstreamClient.record.getRecord("user");
        userRecord.set("room", "topic");
        userRecord.discard();
    }
}).start();
Sorry, I'm not able to follow. Could you give me an example?
This line from your original post: deepstreamClient = deepstreamFactory.getClient(); is not inside the thread run().
Is the example in my second post correct?
Right idea. You're missing a login(..) as well.
Right, I see. So you have to login() every time?
Moving getClient().login() inside the thread's run() worked for me.
public class TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads {
    public static void main(String[] args) {
        System.out.println("TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads Main Method Start .....");
        // Start the subscriber threads; each one creates and logs in its own client.
        int numWorkers = 200;
        for (int tNum = 1; tNum <= numWorkers; tNum++) {
            String threadId = "deepstream-subscriber-client-" + tNum;
            DeepstreamClientSubscriberDaemonThread t = new DeepstreamClientSubscriberDaemonThread(threadId);
            t.setName( threadId );
            t.setDaemon(true);
            t.start();
        }
        // Start the single publisher thread.
        String threadId = "deepstream-publisher-client-1";
        DeepstreamClientPublisherDaemonThread t = new DeepstreamClientPublisherDaemonThread(threadId);
        t.setName( threadId );
        t.setDaemon(true);
        t.start();
        // Let the daemon threads run for 60 seconds, then exit.
        try {
            Thread.sleep(60 * 1000);
        } catch (InterruptedException x) {
        }
        System.out.println(".... TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads Main Method Exit");
    }
}
Yes, inside each thread class's run() method you call getClient().login(<usr/pwd>).
Example:
// Imports as used in the full listing later in this thread.
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.deepstream.DeepstreamClient;
import io.deepstream.DeepstreamFactory;
import io.deepstream.LoginResult;
import io.deepstream.Record;
import io.deepstream.RecordChangedCallback;
import io.deepstream.RecordPathChangedCallback;
import org.apache.commons.lang3.StringUtils;
import org.rapidoid.log.Log;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class DeepstreamClientSubscriberDaemonThread extends Thread {
    private String threadId;

    public DeepstreamClientSubscriberDaemonThread(String threadId){
        this.threadId = threadId;
    }

    @Override
    public void run() {
        Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread: run() Method Entry ....");
        try {
            Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread .... In run Method: currentThread() - " + Thread.currentThread());
            try {
                String tixTextPath = "value.tickets.ticketTexts";
                String eventKey = "PHX|ERVD2";
                // Create the client inside run(), so this thread has its own instance.
                DeepstreamClient dsClient = DeepstreamFactory.getInstance().getClient("localhost:6020");
                String hostname = System.getenv("HOSTNAME");
                if (StringUtils.isEmpty(hostname)) {
                    try {
                        InetAddress address = InetAddress.getLocalHost();
                        hostname = address.getCanonicalHostName();
                    } catch (UnknownHostException e) {
                        Log.warn("Could not determine a canonical hostname for this instance: " + e.getMessage());
                        hostname = "unknown";
                    }
                }
                // Log in from this thread as well.
                JsonObject auth = new JsonObject();
                auth.addProperty("username", threadId + ":" + hostname);
                LoginResult result = dsClient.login(auth);
                if (result.loggedIn()) {
                    Log.info(threadId + " .... Deepstream: Login succeeded!");
                } else {
                    Log.error(threadId + " .... ERROR Deepstream: login: " + result.getErrorEvent().toString());
                }
                Log.info(threadId + " .... GET Deepstream Event (" + eventKey + ") for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for " + tixTextPath);
                Record dsEventRec = dsClient.record.getRecord("events/" + eventKey);
                // define callback support
                RecordChangedCallback recordChangedCallback = new RecordChangedCallback() {
                    boolean trigger = true;
                    @Override
                    public void onRecordChanged(String name, JsonElement data) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Event Record Changed Subscription UPDATE Received (" + name + ")================================== is initial value: " + trigger);
                        //sb.append("\n... NAME arg ............................. >> " + name);
                        //sb.append("\n... DATA arg (data.toString() ............ >> " + data.toString());
                        sb.append("\n... Deepstream Event Record Version # .... >> " + dsEventRec.version());
                        sb.append("\n");
                        Log.info(threadId + " .... " + sb.toString());
                        trigger = false; // every later callback is no longer the initial value
                    }
                };
                RecordPathChangedCallback recordPathChangedCallback = new RecordPathChangedCallback() {
                    boolean trigger = true;
                    @Override
                    public void onRecordPathChanged(String name, String path, JsonElement data) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Event Path/PropId Changed Subscription UPDATE Received for (" + name + " : " + path + ") ------------------------ is initial value: " + trigger);
                        //sb.append("\n... NAME arg ............................. >> " + name);
                        //sb.append("\n... PATH arg (propId) .................... >> " + path);
                        sb.append("\n... DATA arg (data.toString() ............ >> " + data.toString());
                        sb.append("\n... Deepstream Event Record Version # .... >> " + dsEventRec.version());
                        sb.append("\n");
                        Log.info(threadId + sb.toString());
                        trigger = false; // every later callback is no longer the initial value
                    }
                };
                // SUBSCRIBE to deepstream event rec chgs and rec propid chgs callbacks
                Log.info( threadId + " .... SUBSCRIBING to any Event Record change updates for Event = " + eventKey);
                dsEventRec.subscribe(recordChangedCallback, true);
                Log.info( threadId + " .... SUBSCRIBING to Event Path/PropId change updates for Event = " + eventKey + " and Path/PropId = " + tixTextPath);
                dsEventRec.subscribe(tixTextPath, recordPathChangedCallback, true);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // Keep this thread alive; updates arrive through the callbacks above.
            while (true) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException x) {
                }
                //Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread in run() method .... " + Thread.currentThread());
            }
        } finally {
            Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread: run() Method Exit");
        }
    }

    @Override
    public String toString(){
        return "dsSubscriber-ThreadId ==> " + this.threadId;
    }
}
To complete the test classes, here is the publisher thread (only one instance needed):
// Imports as used in the full listing later in this thread.
import com.google.gson.JsonObject;
import io.deepstream.DeepstreamClient;
import io.deepstream.DeepstreamFactory;
import io.deepstream.LoginResult;
import io.deepstream.Record;
import org.apache.commons.lang3.StringUtils;
import org.rapidoid.log.Log;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class DeepstreamClientPublisherDaemonThread extends Thread {
    private String threadId;
    private int updateCnt = 1;

    public DeepstreamClientPublisherDaemonThread(String threadId){
        this.threadId = threadId;
    }

    @Override
    public void run() {
        Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread: run() Method Entry ....");
        try {
            Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread .... In run Method: currentThread() - " + Thread.currentThread());
            String tixTextPath = "value.tickets.ticketTexts";
            String eventKey = "PHX|ERVD2";
            // Create the client inside run(), so this thread has its own instance.
            DeepstreamClient dsClient = DeepstreamFactory.getInstance().getClient("localhost:6020");
            String hostname = System.getenv("HOSTNAME");
            if (StringUtils.isEmpty(hostname)) {
                try {
                    InetAddress address = InetAddress.getLocalHost();
                    hostname = address.getCanonicalHostName();
                } catch (UnknownHostException e) {
                    Log.warn("Could not determine a canonical hostname for this instance: " + e.getMessage());
                    hostname = "unknown";
                }
            }
            // Log in from this thread as well.
            JsonObject auth = new JsonObject();
            auth.addProperty("username", threadId + ":" + hostname);
            LoginResult result = dsClient.login(auth);
            if (result.loggedIn()) {
                Log.info(threadId + " .... Deepstream: Login succeeded!");
            } else {
                Log.error(threadId + " .... ERROR Deepstream: login: " + result.getErrorEvent().toString());
            }
            Log.info(threadId + " .... GET Deepstream Event (" + eventKey + ") to publish updates to " + tixTextPath);
            Record dsEventRec = dsClient.record.getRecord("events/" + eventKey);
            // Publish a new revision of the ticket texts once per second.
            while (true) {
                try {
                    JsonObject newTixTextObj = new JsonObject();
                    newTixTextObj.addProperty("1","tt-1-rev-" + updateCnt);
                    newTixTextObj.addProperty("2","tt-2-rev-" + updateCnt);
                    newTixTextObj.addProperty("3","tt-3-rev-" + updateCnt);
                    newTixTextObj.addProperty("4","tt-4-rev-" + updateCnt);
                    newTixTextObj.addProperty("5","tt-5-rev-" + updateCnt);
                    newTixTextObj.addProperty("6","tt-6-rev-" + updateCnt);
                    Log.info( threadId + " .... PUBLISHING Deepstream record updates for Event = " + eventKey + " for path: " + tixTextPath + " and update interval value: " + updateCnt);
                    dsEventRec.set(tixTextPath, newTixTextObj);
                    updateCnt++;
                    Thread.sleep(1000);
                } catch (InterruptedException x) {
                }
                //Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread in run() method .... " + Thread.currentThread());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread: run() Method Exit");
        }
    }

    @Override
    public String toString(){
        return "dsPublisher-ThreadId ==> " + this.threadId;
    }
}
@honorcode
"Figured you could only have one client per thread (no pool support - is there?) so I simply created new client on each of the threads and it worked fine."
It uses threads under the hood. Whenever you get a record, it creates a CountDownLatch that releases the calling thread once the request is complete. Creating a client per thread is quite expensive on the server as well as the client, but seeing the rest of your examples, I don't think that is what you meant!
Can you clarify? I'm interested in any suggestions for how we can improve threading!
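As a rough illustration of the latch mechanism described above, here is a minimal JDK-only sketch. It is not the client's actual code: `blockingGet`, the `fake-network` thread, and the timeout are all assumptions made for the example. The caller blocks on a `CountDownLatch` until a callback on another thread delivers the reply.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchSketch {
    // Hypothetical stand-in for the thread that delivers server replies.
    private static final ExecutorService NETWORK = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-network");
        t.setDaemon(true);
        return t;
    });

    // Blocks the caller until the simulated reply arrives or the timeout expires.
    public static String blockingGet(String name, long timeoutMs) {
        CountDownLatch ready = new CountDownLatch(1);
        String[] reply = new String[1];
        NETWORK.execute(() -> {
            reply[0] = "data-for-" + name; // simulated server response
            ready.countDown();             // releases the waiting caller
        });
        try {
            if (!ready.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for " + name);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reply[0];
    }

    public static void main(String[] args) {
        System.out.println(blockingGet("user", 1000)); // prints data-for-user
    }
}
```

In a pattern like this, a reply that never arrives leaves an un-timed `await()` blocked forever. Whether that is what happens in the hang reported here is not established in this thread.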
So then @yasserf, is the behavior @safinn and I observed a bug? Does trying to use the same client instance across threads hang?
Do you use a single client-side keep-alive ws connection under the hood? A pool of client-side ws connections? Is the client ws used async or sync with the server? Is it blocking?
I have followed the Android Chat example (https://deepstreamhub.com/tutorials/example-apps/android-chat-app/) and in my first activity I create a client and perform the login:
DeepstreamFactory factory = DeepstreamFactory.getInstance();
try {
    DeepstreamClient deepstreamClient = factory.getClient("wss://...");
    deepstreamClient.login();
} catch (URISyntaxException uriSyntaxException) {
}
// Start new Activity
In the new activity, which I start after logging in with Deepstream, I get the client without logging in again, store it in a class variable, and re-use it in different threads performing different actions:
try {
    deepstreamClient = DeepstreamFactory.getInstance().getClient("wss://...");
} catch (URISyntaxException uriSyntaxException) {
    uriSyntaxException.printStackTrace();
}
It seems to work well the first time I enter the Activity, but when I move between activities and get the client again, I get this error:
FATAL EXCEPTION: pool-1-thread-1
java.lang.NullPointerException: Attempt to invoke virtual method 'boolean io.deepstream.Event.equals(java.lang.Object)' on a null object reference
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
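The message above says `equals` was invoked on a null `Event` reference. The source at `DeepstreamClientAbstract.java:48` is not shown in this thread, so the following is only a generic sketch of that failure class, using a hypothetical `Event` enum rather than the library's own type. It shows why calling `equals` on a possibly-null value throws, and a null-safe alternative.

```java
public class NullSafeEventSketch {
    // Hypothetical event type; not the library's actual definition.
    public enum Event { CONNECTION_ERROR, IS_CLOSED }

    // Throws NullPointerException when event is null, because equals is
    // invoked on the null reference itself.
    public static boolean unsafeIsClosed(Event event) {
        return event.equals(Event.IS_CLOSED);
    }

    // Null-safe: comparing enum constants with == never dereferences event.
    public static boolean safeIsClosed(Event event) {
        return event == Event.IS_CLOSED;
    }

    // Helper for the demo: reports whether the unsafe variant throws on null.
    public static boolean unsafeThrowsOnNull() {
        try {
            unsafeIsClosed(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(safeIsClosed(null));   // prints false
        System.out.println(unsafeThrowsOnNull()); // prints true
    }
}
```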
@honorcode It is one non-blocking websocket connection under the hood, all with async callbacks. Having multiple connections actually won't speed anything up (unless you have EXTREME throughput), as network reads and writes are usually the most expensive aspect.
The behavior you and safinn experienced seems to be a bug. I'm reopening this issue because of that.
@safinn is there a bigger stack trace by any chance? The onError callback is used everywhere!
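To make the "one non-blocking connection with async callbacks" model above concrete, here is a small JDK-only sketch. It is not the deepstream client: `SharedConnectionSketch`, `send`, and the single `fake-connection` thread are assumptions for illustration. Several caller threads share one object, every request is handled on the same I/O thread, and each caller gets its result through a `CompletableFuture`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedConnectionSketch {
    // One I/O thread stands in for the single websocket connection.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-connection");
        t.setDaemon(true);
        return t;
    });
    private final Set<String> ioThreadNames = ConcurrentHashMap.newKeySet();

    // Returns immediately; the future completes later on the I/O thread.
    public CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> {
            ioThreadNames.add(Thread.currentThread().getName());
            return "ack:" + message;
        }, io);
    }

    // Lets `callers` threads share one connection; returns how many I/O threads ran.
    public static int ioThreadsUsed(int callers) {
        SharedConnectionSketch connection = new SharedConnectionSketch();
        Thread[] workers = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            String message = "msg-" + i;
            workers[i] = new Thread(() -> connection.send(message).join());
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return connection.ioThreadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(ioThreadsUsed(3)); // prints 1
    }
}
```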
The following is a successful effort to reproduce this state.
Tested with deepstream 2.1.6, java-client 2.0.6, localhost:6020, cache:redis:localhost, rethinkdb:localhost.
3 files:
1) test main - set num workers = threads = subscribers (only one publisher thread) (currently set to 3)
2) above ... runs ... single publisher daemon thread
3) above ... runs ... <num config'd subscribers> subscriber daemon threads
Files listed separately below:
package com.ticketmaster.bedrock.rapidoid.iris;

import com.google.gson.JsonObject;
import io.deepstream.DeepstreamClient;
import io.deepstream.DeepstreamFactory;
import io.deepstream.LoginResult;
import org.apache.commons.lang3.StringUtils;
import org.rapidoid.log.Log;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;

public class TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 {
    public static void main(String[] args) {
        System.out.println("TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Start .....");
        // Create and log in ONE client here, then share it with every thread below.
        DeepstreamClient dsClient = null;
        try {
            dsClient = DeepstreamFactory.getInstance().getClient("localhost:6020");
            String hostname = System.getenv("HOSTNAME");
            if (StringUtils.isEmpty(hostname)) {
                try {
                    InetAddress address = InetAddress.getLocalHost();
                    hostname = address.getCanonicalHostName();
                } catch (UnknownHostException e) {
                    Log.warn("Could not determine a canonical hostname for this instance: " + e.getMessage());
                    hostname = "unknown";
                }
            }
            JsonObject auth = new JsonObject();
            auth.addProperty("username", ":" + hostname);
            LoginResult result = dsClient.login(auth);
            if (result.loggedIn()) {
                Log.info(" .... Deepstream: Login succeeded!");
            } else {
                Log.error(" .... ERROR Deepstream: login: " + result.getErrorEvent().toString());
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // Start the subscriber threads, all using the shared client.
        int numWorkers = 3;
        for (int tNum = 1; tNum <= numWorkers; tNum++) {
            String threadId = "deepstream-subscriber-client-" + tNum;
            DeepstreamClientSubscriberDaemonThread2 t = new DeepstreamClientSubscriberDaemonThread2(threadId, dsClient);
            t.setName( threadId );
            t.setDaemon(true);
            t.start();
        }
        // Start the single publisher thread, also using the shared client.
        String threadId = "deepstream-publisher-client-1";
        DeepstreamClientPublisherDaemonThread2 t = new DeepstreamClientPublisherDaemonThread2(threadId, dsClient);
        t.setName( threadId );
        t.setDaemon(true);
        t.start();
        // Let the daemon threads run for 60 seconds, then exit.
        try {
            Thread.sleep(60 * 1000);
        } catch (InterruptedException x) {
        }
        System.out.println(".... TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Exit");
    }
}
package com.ticketmaster.bedrock.rapidoid.iris;

import com.google.gson.JsonObject;
import io.deepstream.DeepstreamClient;
import io.deepstream.Record;
import org.rapidoid.log.Log;

public class DeepstreamClientPublisherDaemonThread2 extends Thread {
    private String threadId;
    private DeepstreamClient dsClient;
    private int updateCnt = 1;

    public DeepstreamClientPublisherDaemonThread2(String threadId, DeepstreamClient dsClient){
        this.dsClient = dsClient;
        this.threadId = threadId;
    }

    public void run() {
        Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread2: run() Method Entry ....");
        try {
            Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread2 .... In run Method: currentThread() - " + Thread.currentThread());
            String tixTextPath = "value.tickets.ticketTexts";
            String eventKey = "PHX|ERVD2";
            Log.info(threadId + " .... GET Deepstream Event (" + eventKey + ") to publish updates to " + tixTextPath);
            Record dsEventRec = dsClient.record.getRecord("events/" + eventKey);
            while (true) {
                try {
                    JsonObject newTixTextObj = new JsonObject();
                    newTixTextObj.addProperty("1","tt-1-rev-" + updateCnt);
                    newTixTextObj.addProperty("2","tt-2-rev-" + updateCnt);
                    newTixTextObj.addProperty("3","tt-3-rev-" + updateCnt);
                    newTixTextObj.addProperty("4","tt-4-rev-" + updateCnt);
                    newTixTextObj.addProperty("5","tt-5-rev-" + updateCnt);
                    newTixTextObj.addProperty("6","tt-6-rev-" + updateCnt);
                    Log.info( threadId + " .... PUBLISHING Deepstream record updates for Event = " + eventKey + " for path: " + tixTextPath + " and update interval value: " + updateCnt);
                    dsEventRec.set(tixTextPath, newTixTextObj);
                    updateCnt++;
                    Thread.sleep(1000);
                } catch (InterruptedException x) {
                }
                //Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread2 in run() method .... " + Thread.currentThread());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            Log.info(threadId + " .... DeepstreamClientPublisherDaemonThread2: run() Method Exit");
        }
    }

    @Override
    public String toString(){
        return "dsPublisher-ThreadId ==> " + this.threadId;
    }
}
package com.ticketmaster.bedrock.rapidoid.iris;

import com.google.gson.JsonElement;
import io.deepstream.DeepstreamClient;
import io.deepstream.Record;
import io.deepstream.RecordChangedCallback;
import io.deepstream.RecordPathChangedCallback;
import org.rapidoid.log.Log;

public class DeepstreamClientSubscriberDaemonThread2 extends Thread {
    private String threadId;
    private DeepstreamClient dsClient;

    public DeepstreamClientSubscriberDaemonThread2(String threadId, DeepstreamClient dsClient){
        this.dsClient = dsClient;
        this.threadId = threadId;
    }

    public void run() {
        Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....");
        try {
            Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - " + Thread.currentThread());
            try {
                String tixTextPath = "value.tickets.ticketTexts";
                String eventKey = "PHX|ERVD2";
                Log.info(threadId + " .... GET Deepstream Event (" + eventKey + ") for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for " + tixTextPath);
                Record dsEventRec = dsClient.record.getRecord("events/" + eventKey);
                // define callback support
                RecordChangedCallback recordChangedCallback = new RecordChangedCallback() {
                    boolean trigger = true;
                    @Override
                    public void onRecordChanged(String name, JsonElement data) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Event Record Changed Subscription UPDATE Received (" + name + ")================================== is initial value: " + trigger);
                        //sb.append("\n... NAME arg ............................. >> " + name);
                        //sb.append("\n... DATA arg (data.toString() ............ >> " + data.toString());
                        sb.append("\n... Deepstream Event Record Version # .... >> " + dsEventRec.version());
                        sb.append("\n");
                        Log.info(threadId + " .... " + sb.toString());
                        trigger = false;
                    }
                };
                RecordPathChangedCallback recordPathChangedCallback = new RecordPathChangedCallback() {
                    boolean trigger = true;
                    @Override
                    public void onRecordPathChanged(String name, String path, JsonElement data) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Event Path/PropId Changed Subscription UPDATE Received for (" + name + " : " + path + ") ------------------------ is initial value: " + trigger);
                        //sb.append("\n... NAME arg ............................. >> " + name);
                        //sb.append("\n... PATH arg (propId) .................... >> " + path);
                        sb.append("\n... DATA arg (data.toString() ............ >> " + data.toString());
                        sb.append("\n... Deepstream Event Record Version # .... >> " + dsEventRec.version());
                        sb.append("\n");
                        Log.info(threadId + sb.toString());
                        trigger = false;
                    }
                };
                // SUBSCRIBE to deepstream event rec chgs and rec propid chgs callbacks
                Log.info( threadId + " .... SUBSCRIBING to any Event Record change updates for Event = " + eventKey);
                dsEventRec.subscribe(recordChangedCallback, true);
                Log.info( threadId + " .... SUBSCRIBING to Event Path/PropId change updates for Event = " + eventKey + " and Path/PropId = " + tixTextPath);
                dsEventRec.subscribe(tixTextPath, recordPathChangedCallback, true);
            } catch (Exception e) {
                e.printStackTrace();
            }
            while (true) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException x) {
                }
                //Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread2 in run() method .... " + Thread.currentThread());
            }
        } finally {
            Log.info(threadId + " .... DeepstreamClientSubscriberDaemonThread2: run() Method Exit");
        }
    }

    @Override
    public String toString(){
        return "dsSubscriber-ThreadId ==> " + this.threadId;
    }
}
Output:
/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=61882:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/tools.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/j
dk1.8.0_72.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Users/rogervandusen/dev/rapidoid/rapidoid-demo-web-client-server-iris/target/test-classes:/Users/rogervandusen/dev/rapidoid/rapidoid-demo-web-client-server-iris/target/classes:/Users/rogervandusen/.m2/repository/io/deepstream/deepstream.io-client-java/2.0.6/deepstream.io-client-java-2.0.6.jar:/Users/rogervandusen/.m2/repository/org/java-websocket/Java-WebSocket/1.3.0/Java-WebSocket-1.3.0.jar:/Users/rogervandusen/.m2/repository/com/google/j2objc/j2objc-annotations/1.1/j2objc-annotations-1.1.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-web/5.1.9/rapidoid-web-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-gui/5.1.9/rapidoid-gui-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-html/5.1.9/rapidoid-html-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-render/5.1.9/rapidoid-render-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-jpa/5.1.9/rapidoid-jpa-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-server/5.1.9/rapidoid-http-server-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-fast/5.1.9/rapidoid-http-fast-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-net/5.1.9/rapidoid-net-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-buffer/5.1.9/rapidoid-buffer-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/ra
pidoid-inject/5.1.9/rapidoid-inject-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-client/5.1.9/rapidoid-http-client-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-commons/5.1.9/rapidoid-commons-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/javassist/javassist/3.20.0-GA/javassist-3.20.0-GA.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/module/jackson-module-afterburner/2.6.6/jackson-module-afterburner-2.6.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.6.6/jackson-dataformat-yaml-2.6.6.jar:/Users/rogervandusen/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpasyncclient/4.1/httpasyncclient-4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpcore/4.4.1/httpcore-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpcore-nio/4.4.1/httpcore-nio-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpclient/4.4.1/httpclient-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpmime/4.5/httpmime-4.5.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-watch/5.1.9/rapidoid-watch-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-integrate/5.1.9/rapidoid-integrate-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-oauth/5.1.9/rapidoid-oauth-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/apache/oltu/oauth2/org.apache.oltu.oauth2.client/1.0.1/org.apache.oltu.oauth2.client-1.0.1.jar:/Users/rogervandusen/.m2/repository/org/apache/oltu/oauth2/org.apache.oltu.oauth2.common/1.0.1/org.apache.oltu.oauth2.common-1.0.1.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-fluent/5.1.9/rapidoid-fluent-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-essentials/5.1.9/rapidoid-essentials-5.1.9.jar:/Users/rogervandusen/.m2/repository/juni
t/junit/4.12/junit-4.12.jar:/Users/rogervandusen/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-datagrid-client/1.6.0-rvd-deepstream-SNAPSHOT/iris-datagrid-client-1.6.0-rvd-deepstream-SNAPSHOT.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-rxbus-rxjava-eventbus/1.0.0-SNAPSHOT/iris-rxbus-rxjava-eventbus-1.0.0-SNAPSHOT.jar:/Users/rogervandusen/.m2/repository/io/reactivex/rxjava2/rxjava/2.0.8/rxjava-2.0.8.jar:/Users/rogervandusen/.m2/repository/org/reactivestreams/reactive-streams/1.0.0/reactive-streams-1.0.0.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-core/1.1.0/geode-core-1.1.0.jar:/Users/rogervandusen/.m2/repository/com/github/stephenc/findbugs/findbugs-annotations/1.3.9-1/findbugs-annotations-1.3.9-1.jar:/Users/rogervandusen/.m2/repository/org/jgroups/jgroups/3.6.10.Final/jgroups-3.6.10.Final.jar:/Users/rogervandusen/.m2/repository/antlr/antlr/2.7.7/antlr-2.7.7.jar:/Users/rogervandusen/.m2/repository/commons-io/commons-io/2.3/commons-io-2.3.jar:/Users/rogervandusen/.m2/repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/rogervandusen/.m2/repository/it/unimi/dsi/fastutil/7.0.2/fastutil-7.0.2.jar:/Users/rogervandusen/.m2/repository/javax/resource/javax.resource-api/1.7/javax.resource-api-1.7.jar:/Users/rogervandusen/.m2/repository/javax/transaction/javax.transaction-api/1.2/javax.transaction-api-1.2.jar:/Users/rogervandusen/.m2/repository/net/java/dev/jna/jna/4.0.0/jna-4.0.0.jar:/Users/rogervandusen/.m2/repository/net/sf/jopt-simple/jopt-simple/5.0.1/jopt-simple-5.0.1.jar:/Users/rogervandusen/.m2/repository/org/apache/logging/log4j/log4j-api/2.6.1/log4j-api-2.6.1.jar:/Users/rogervandusen/.m2/repository/org/apache/logging/log4j/log4j-core/2.6.1/log4j-core-2.6.1.jar:/Users/rogervandusen/.m2/repository/org/apache/shiro/shiro-core/1.3.1/shiro-core-1.3.1.jar:/Users/rogervandusen/.m2/repository/commons-beanutils/commons-beanutils
/1.8.3/commons-beanutils-1.8.3.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-common/1.1.0/geode-common-1.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-json/1.1.0/geode-json-1.1.0.jar:/Users/rogervandusen/.m2/repository/com/google/guava/guava/21.0/guava-21.0.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.2.RELEASE/spring-boot-starter-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot/1.4.2.RELEASE/spring-boot-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-configuration-processor/1.4.2.RELEASE/spring-boot-configuration-processor-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/json/json/20140107/json-20140107.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.2.RELEASE/spring-boot-autoconfigure-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/hateoas/spring-hateoas/0.20.0.RELEASE/spring-hateoas-0.20.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-webmvc/4.1.7.RELEASE/spring-webmvc-4.1.7.RELEASE.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/bhs-model/1.2.5/bhs-model-1.2.5.jar:/Users/rogervandusen/.m2/repository/org/projectlombok/lombok/1.16.6/lombok-1.16.6.jar:/Users/rogervandusen/.m2/repository/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/platform/metrics/tmps-metrics-core/2.6.7/tmps-metrics-core-2.6.7.jar:/Users/rogervandusen/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.1/jackson-core-asl-1.8.1.jar:/Users/rogervandusen/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.1/jackson-mapper-asl-1.8.1.jar:/Users/rogervandusen/.m2/repository/com/codahale/metrics/metrics-jvm/3.0.0/metrics-jvm-3.0.0.jar:/Users/rogervandusen/.m2/repository/
com/codahale/metrics/metrics-core/3.0.0/metrics-core-3.0.0.jar:/Users/rogervandusen/.m2/repository/org/hamcrest/hamcrest-library/1.3/hamcrest-library-1.3.jar:/Users/rogervandusen/.m2/repository/joda-time/joda-time/2.2/joda-time-2.2.jar:/Users/rogervandusen/.m2/repository/org/aspectj/aspectjrt/1.8.0/aspectjrt-1.8.0.jar:/Users/rogervandusen/.m2/repository/org/aspectj/aspectjweaver/1.8.0/aspectjweaver-1.8.0.jar:/Users/rogervandusen/.m2/repository/cglib/cglib-nodep/2.2.2/cglib-nodep-2.2.2.jar:/Users/rogervandusen/.m2/repository/javax/measure/jsr-275/0.9.1/jsr-275-0.9.1.jar:/Users/rogervandusen/.m2/repository/commons-collections/commons-collections/20040616/commons-collections-20040616.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-beans/4.2.6.RELEASE/spring-beans-4.2.6.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-context-support/4.2.6.RELEASE/spring-context-support-4.2.6.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/kafka_2.11/0.10.1.0/kafka_2.11-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/kafka-clients/0.10.1.0/kafka-clients-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar:/Users/rogervandusen/.m2/repository/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar:/Users/rogervandusen/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/rogervandusen/.m2/repository/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar:/Users/rogervandusen/.m2/repository/com/101tec/zkclient/0.9/zkclient-0.9.jar:/Users/rogervandusen/.m2/repository/org/apache/zookeeper/zookeeper/3.4.8/zookeeper-3.4.8.jar:/Users/rogervandusen/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/rogervandusen/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/Users/rogervandusen/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/Users/rogervandusen/.m2/rep
ository/org/apache/kafka/kafka-streams/0.10.1.0/kafka-streams-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/connect-json/0.10.1.0/connect-json-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/connect-api/0.10.1.0/connect-api-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/rocksdb/rocksdbjni/4.9.0/rocksdbjni-4.9.0.jar:/Users/rogervandusen/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/rogervandusen/.m2/repository/commons-codec/commons-codec/1.10/commons-codec-1.10.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.7.3/jackson-datatype-jsr310-2.7.3.jar:/Users/rogervandusen/.m2/repository/org/apache/commons/commons-lang3/3.4/commons-lang3-3.4.jar:/Users/rogervandusen/.m2/repository/org/apache/commons/commons-collections4/4.1/commons-collections4-4.1.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-model2/2.24.1-SNAPSHOT/iris-model2-2.24.1-20170327.190038-3.jar:/Users/rogervandusen/.m2/repository/com/wordnik/swagger-annotations/1.3.7/swagger-annotations-1.3.7.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-metadata/1.7.1-SNAPSHOT/iris-metadata-1.7.1-20170328.184214-2.jar:/Users/rogervandusen/.m2/repository/com/github/wnameless/json-flattener/0.3.0/json-flattener-0.3.0.jar:/Users/rogervandusen/.m2/repository/com/eclipsesource/minimal-json/minimal-json/0.9.4/minimal-json-0.9.4.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-model/1.13.0/iris-model-1.13.0.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/host/ttp-protocol/1.34.0/ttp-protocol-1.34.0.jar:/Users/rogervandusen/.m2/repository/io/netty/netty-all/4.1.5.Final/netty-all-4.1.5.Final.jar:/Users/rogervandusen/.m2/repository/org/reflections/reflections/0.9.11/reflections-0.9.11.jar:/Users/rogervandusen/.m2/repository/org/javatuples/javatuples/1.2/javatuples-1.2.jar:/Users/rogervandusen/.m2/repository/ch/qos/logback/logback-classic/1.2.1/logback-classi
c-1.2.1.jar:/Users/rogervandusen/.m2/repository/ch/qos/logback/logback-core/1.2.1/logback-core-1.2.1.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-ws/1.4.2.RELEASE/spring-boot-starter-ws-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-web/1.4.2.RELEASE/spring-boot-starter-web-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/hibernate/hibernate-validator/5.2.4.Final/hibernate-validator-5.2.4.Final.jar:/Users/rogervandusen/.m2/repository/javax/validation/validation-api/1.1.0.Final/validation-api-1.1.0.Final.jar:/Users/rogervandusen/.m2/repository/org/jboss/logging/jboss-logging/3.2.1.Final/jboss-logging-3.2.1.Final.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/classmate/1.1.0/classmate-1.1.0.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-oxm/4.3.4.RELEASE/spring-oxm-4.3.4.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/ws/spring-ws-core/2.3.1.RELEASE/spring-ws-core-2.3.1.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/ws/spring-xml/2.3.1.RELEASE/spring-xml-2.3.1.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/1.4.2.RELEASE/spring-boot-starter-tomcat-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/8.5.6/tomcat-embed-core-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/8.5.6/tomcat-embed-el-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/8.5.6/tomcat-embed-websocket-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-actuator/1.4.2.RELEASE/spring-boot-starter-actuator-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-actuator/1.4.2.RELEASE/spring-boot-actuator-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/spring-cl
oud-config-client/1.2.2.RELEASE/spring-cloud-config-client-1.2.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/spring-cloud-commons/1.1.5.RELEASE/spring-cloud-commons-1.1.5.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/spring-cloud-context/1.1.5.RELEASE/spring-cloud-context-1.1.5.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/security/spring-security-rsa/1.0.3.RELEASE/spring-security-rsa-1.0.3.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/security/spring-security-crypto/3.2.8.RELEASE/spring-security-crypto-3.2.8.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/bouncycastle/bcpkix-jdk15on/1.55/bcpkix-jdk15on-1.55.jar:/Users/rogervandusen/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.55/bcprov-jdk15on-1.55.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.2.RELEASE/spring-boot-starter-logging-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-web/4.3.0.RELEASE/spring-web-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-aop/4.3.0.RELEASE/spring-aop-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-context/4.3.0.RELEASE/spring-context-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-expression/4.3.0.RELEASE/spring-expression-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-core/4.3.0.RELEASE/spring-core-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jackson-core/
2.8.6/jackson-core-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.8.6/jackson-annotations-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.8.6/jackson-databind-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/google/code/gson/gson/2.6.2/gson-2.6.2.jar com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2
objc[96274]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/bin/java and /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/libinstrument.dylib. One of the two will be used. Which one is undefined.
TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Start .....
12:44:29.235 [main] INFO com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 - .... Deepstream: Login succeeded!
12:44:29.240 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
12:44:29.240 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
12:44:29.240 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
12:44:29.240 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-1
12:44:29.240 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-3
12:44:29.240 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-2
12:44:29.240 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
12:44:29.240 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
12:44:29.240 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
12:44:29.240 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2: run() Method Entry ....
12:44:29.240 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2 .... In run Method: currentThread() - dsPublisher-ThreadId ==> deepstream-publisher-client-1
12:44:29.241 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... GET Deepstream Event (PHX|ERVD2) to publish updates to value.tickets.ticketTexts
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
12:44:29.274 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... SUBSCRIBING to any Event Record change updates for Event = PHX|ERVD2
Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:629)
at io.deepstream.Record.applyUpdate(Record.java:589)
at io.deepstream.Record.onMessage(Record.java:520)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-3" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-4" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:629)
at io.deepstream.Record.applyUpdate(Record.java:589)
at io.deepstream.Record.onMessage(Record.java:520)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-5" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-6" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:629)
at io.deepstream.Record.applyUpdate(Record.java:589)
at io.deepstream.Record.onMessage(Record.java:520)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
12:44:29.280 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$1 - deepstream-subscriber-client-3 .... Event Record Changed Subscription UPDATE Received (events/PHX|ERVD2)================================== is initial value: true
... Deepstream Event Record Version # .... >> 7188
12:44:29.280 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... SUBSCRIBING to Event Path/PropId change updates for Event = PHX|ERVD2 and Path/PropId = value.tickets.ticketTexts
12:44:29.280 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$2 - deepstream-subscriber-client-3Event Path/PropId Changed Subscription UPDATE Received for (events/PHX|ERVD2 : value.tickets.ticketTexts) ------------------------ is initial value: true
... DATA arg (data.toString() ............ >> {"1":"tt-1-rev-7","2":"tt-2-rev-7","3":"tt-3-rev-7","4":"tt-4-rev-7","5":"tt-5-rev-7","6":"tt-6-rev-7"}
... Deepstream Event Record Version # .... >> 7188
This seems to be a duplicate of #72
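For comparison with the workaround mentioned earlier in the thread (one client per thread, created and logged in inside run()), here is a minimal sketch of that pattern. It is only an illustration: `StubClient` and `runWorkers` are hypothetical stand-ins and not part of the deepstream Java client, so the snippet runs without a server. Whether a single shared client is what triggers the NullPointerExceptions above is an assumption, not something the traces prove.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class PerThreadClientSketch {

    // Hypothetical stand-in for a real client that owns its own connection and login.
    static final class StubClient {
        final String owner;

        StubClient(String owner) {
            this.owner = owner;
        }
    }

    // Starts the given number of daemon threads. Each thread builds its own
    // client inside run() instead of sharing one created on the main thread.
    // Returns how many distinct client instances were created.
    static int runWorkers(int workers) {
        Set<StubClient> created = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(workers);

        for (int i = 1; i <= workers; i++) {
            String threadId = "deepstream-subscriber-client-" + i;
            Thread t = new Thread(() -> {
                try {
                    // With the real library, getClient() and login() would go here.
                    StubClient client = new StubClient(Thread.currentThread().getName());
                    created.add(client);
                } finally {
                    done.countDown();
                }
            }, threadId);
            t.setDaemon(true);
            t.start();
        }

        try {
            done.await(); // wait until every worker has created its client
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return created.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct clients: " + runWorkers(3));
    }
}
```

The point of the sketch is only the placement: client creation happens inside the worker's run(), so no client instance is touched by more than one application thread.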
Let me try again after upgrading to the 2.0.7 Java client.
Still getting errors with the v2.0.7 Java client. Output below:
/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=55119:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/lib/tools.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/j
dk1.8.0_72.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Users/rogervandusen/dev/rapidoid/rapidoid-demo-web-client-server-iris/target/test-classes:/Users/rogervandusen/dev/rapidoid/rapidoid-demo-web-client-server-iris/target/classes:/Users/rogervandusen/.m2/repository/io/deepstream/deepstream.io-client-java/2.0.7/deepstream.io-client-java-2.0.7.jar:/Users/rogervandusen/.m2/repository/org/java-websocket/Java-WebSocket/1.3.0/Java-WebSocket-1.3.0.jar:/Users/rogervandusen/.m2/repository/com/google/j2objc/j2objc-annotations/1.1/j2objc-annotations-1.1.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-web/5.1.9/rapidoid-web-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-gui/5.1.9/rapidoid-gui-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-html/5.1.9/rapidoid-html-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-render/5.1.9/rapidoid-render-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-jpa/5.1.9/rapidoid-jpa-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-server/5.1.9/rapidoid-http-server-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-fast/5.1.9/rapidoid-http-fast-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-net/5.1.9/rapidoid-net-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-buffer/5.1.9/rapidoid-buffer-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/ra
pidoid-inject/5.1.9/rapidoid-inject-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-http-client/5.1.9/rapidoid-http-client-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-commons/5.1.9/rapidoid-commons-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/javassist/javassist/3.20.0-GA/javassist-3.20.0-GA.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/module/jackson-module-afterburner/2.6.6/jackson-module-afterburner-2.6.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.6.6/jackson-dataformat-yaml-2.6.6.jar:/Users/rogervandusen/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpasyncclient/4.1/httpasyncclient-4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpcore/4.4.1/httpcore-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpcore-nio/4.4.1/httpcore-nio-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpclient/4.4.1/httpclient-4.4.1.jar:/Users/rogervandusen/.m2/repository/org/apache/httpcomponents/httpmime/4.5/httpmime-4.5.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-watch/5.1.9/rapidoid-watch-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-integrate/5.1.9/rapidoid-integrate-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-oauth/5.1.9/rapidoid-oauth-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/apache/oltu/oauth2/org.apache.oltu.oauth2.client/1.0.1/org.apache.oltu.oauth2.client-1.0.1.jar:/Users/rogervandusen/.m2/repository/org/apache/oltu/oauth2/org.apache.oltu.oauth2.common/1.0.1/org.apache.oltu.oauth2.common-1.0.1.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-fluent/5.1.9/rapidoid-fluent-5.1.9.jar:/Users/rogervandusen/.m2/repository/org/rapidoid/rapidoid-essentials/5.1.9/rapidoid-essentials-5.1.9.jar:/Users/rogervandusen/.m2/repository/juni
t/junit/4.12/junit-4.12.jar:/Users/rogervandusen/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-datagrid-client/1.6.0-rvd-deepstream-SNAPSHOT/iris-datagrid-client-1.6.0-rvd-deepstream-20170403.020031-9.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-rxbus-rxjava-eventbus/1.0.0-SNAPSHOT/iris-rxbus-rxjava-eventbus-1.0.0-SNAPSHOT.jar:/Users/rogervandusen/.m2/repository/io/reactivex/rxjava2/rxjava/2.0.8/rxjava-2.0.8.jar:/Users/rogervandusen/.m2/repository/org/reactivestreams/reactive-streams/1.0.0/reactive-streams-1.0.0.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-core/1.1.0/geode-core-1.1.0.jar:/Users/rogervandusen/.m2/repository/com/github/stephenc/findbugs/findbugs-annotations/1.3.9-1/findbugs-annotations-1.3.9-1.jar:/Users/rogervandusen/.m2/repository/org/jgroups/jgroups/3.6.10.Final/jgroups-3.6.10.Final.jar:/Users/rogervandusen/.m2/repository/antlr/antlr/2.7.7/antlr-2.7.7.jar:/Users/rogervandusen/.m2/repository/commons-io/commons-io/2.3/commons-io-2.3.jar:/Users/rogervandusen/.m2/repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/rogervandusen/.m2/repository/it/unimi/dsi/fastutil/7.0.2/fastutil-7.0.2.jar:/Users/rogervandusen/.m2/repository/javax/resource/javax.resource-api/1.7/javax.resource-api-1.7.jar:/Users/rogervandusen/.m2/repository/javax/transaction/javax.transaction-api/1.2/javax.transaction-api-1.2.jar:/Users/rogervandusen/.m2/repository/net/java/dev/jna/jna/4.0.0/jna-4.0.0.jar:/Users/rogervandusen/.m2/repository/net/sf/jopt-simple/jopt-simple/5.0.1/jopt-simple-5.0.1.jar:/Users/rogervandusen/.m2/repository/org/apache/logging/log4j/log4j-api/2.6.1/log4j-api-2.6.1.jar:/Users/rogervandusen/.m2/repository/org/apache/logging/log4j/log4j-core/2.6.1/log4j-core-2.6.1.jar:/Users/rogervandusen/.m2/repository/org/apache/shiro/shiro-core/1.3.1/shiro-core-1.3.1.jar:/Users/rogervandusen/.m2/repository/commons-beanutils/commons-
beanutils/1.8.3/commons-beanutils-1.8.3.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-common/1.1.0/geode-common-1.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/geode/geode-json/1.1.0/geode-json-1.1.0.jar:/Users/rogervandusen/.m2/repository/com/google/guava/guava/21.0/guava-21.0.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.2.RELEASE/spring-boot-starter-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot/1.4.2.RELEASE/spring-boot-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-configuration-processor/1.4.2.RELEASE/spring-boot-configuration-processor-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/json/json/20140107/json-20140107.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.2.RELEASE/spring-boot-autoconfigure-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/hateoas/spring-hateoas/0.20.0.RELEASE/spring-hateoas-0.20.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-webmvc/4.1.7.RELEASE/spring-webmvc-4.1.7.RELEASE.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/bhs-model/1.2.5/bhs-model-1.2.5.jar:/Users/rogervandusen/.m2/repository/org/projectlombok/lombok/1.16.6/lombok-1.16.6.jar:/Users/rogervandusen/.m2/repository/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/platform/metrics/tmps-metrics-core/2.6.7/tmps-metrics-core-2.6.7.jar:/Users/rogervandusen/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.1/jackson-core-asl-1.8.1.jar:/Users/rogervandusen/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.1/jackson-mapper-asl-1.8.1.jar:/Users/rogervandusen/.m2/repository/com/codahale/metrics/metrics-jvm/3.0.0/metrics-jvm-3.0.0.jar:/Users/rogervandusen/.m2/re
pository/com/codahale/metrics/metrics-core/3.0.0/metrics-core-3.0.0.jar:/Users/rogervandusen/.m2/repository/org/hamcrest/hamcrest-library/1.3/hamcrest-library-1.3.jar:/Users/rogervandusen/.m2/repository/joda-time/joda-time/2.2/joda-time-2.2.jar:/Users/rogervandusen/.m2/repository/org/aspectj/aspectjrt/1.8.0/aspectjrt-1.8.0.jar:/Users/rogervandusen/.m2/repository/org/aspectj/aspectjweaver/1.8.0/aspectjweaver-1.8.0.jar:/Users/rogervandusen/.m2/repository/cglib/cglib-nodep/2.2.2/cglib-nodep-2.2.2.jar:/Users/rogervandusen/.m2/repository/javax/measure/jsr-275/0.9.1/jsr-275-0.9.1.jar:/Users/rogervandusen/.m2/repository/commons-collections/commons-collections/20040616/commons-collections-20040616.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-beans/4.2.6.RELEASE/spring-beans-4.2.6.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-context-support/4.2.6.RELEASE/spring-context-support-4.2.6.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/kafka_2.11/0.10.1.0/kafka_2.11-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/kafka-clients/0.10.1.0/kafka-clients-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar:/Users/rogervandusen/.m2/repository/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar:/Users/rogervandusen/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/rogervandusen/.m2/repository/org/scala-lang/scala-library/2.11.8/scala-library-2.11.8.jar:/Users/rogervandusen/.m2/repository/com/101tec/zkclient/0.9/zkclient-0.9.jar:/Users/rogervandusen/.m2/repository/org/apache/zookeeper/zookeeper/3.4.8/zookeeper-3.4.8.jar:/Users/rogervandusen/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/rogervandusen/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/Users/rogervandusen/.m2/repository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/Users/rogervanduse
n/.m2/repository/org/apache/kafka/kafka-streams/0.10.1.0/kafka-streams-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/connect-json/0.10.1.0/connect-json-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/apache/kafka/connect-api/0.10.1.0/connect-api-0.10.1.0.jar:/Users/rogervandusen/.m2/repository/org/rocksdb/rocksdbjni/4.9.0/rocksdbjni-4.9.0.jar:/Users/rogervandusen/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/rogervandusen/.m2/repository/commons-codec/commons-codec/1.10/commons-codec-1.10.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.7.3/jackson-datatype-jsr310-2.7.3.jar:/Users/rogervandusen/.m2/repository/org/apache/commons/commons-lang3/3.4/commons-lang3-3.4.jar:/Users/rogervandusen/.m2/repository/org/apache/commons/commons-collections4/4.1/commons-collections4-4.1.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-model2/2.24.1-SNAPSHOT/iris-model2-2.24.1-20170327.190038-3.jar:/Users/rogervandusen/.m2/repository/com/wordnik/swagger-annotations/1.3.7/swagger-annotations-1.3.7.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-metadata/1.7.1-SNAPSHOT/iris-metadata-1.7.1-20170328.184214-2.jar:/Users/rogervandusen/.m2/repository/com/github/wnameless/json-flattener/0.3.0/json-flattener-0.3.0.jar:/Users/rogervandusen/.m2/repository/com/eclipsesource/minimal-json/minimal-json/0.9.4/minimal-json-0.9.4.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/bedrock/iris-model/1.13.0/iris-model-1.13.0.jar:/Users/rogervandusen/.m2/repository/com/ticketmaster/host/ttp-protocol/1.34.0/ttp-protocol-1.34.0.jar:/Users/rogervandusen/.m2/repository/io/netty/netty-all/4.1.5.Final/netty-all-4.1.5.Final.jar:/Users/rogervandusen/.m2/repository/org/reflections/reflections/0.9.11/reflections-0.9.11.jar:/Users/rogervandusen/.m2/repository/org/javatuples/javatuples/1.2/javatuples-1.2.jar:/Users/rogervandusen/.m2/repository/ch/qos/logback/logback-classic/1.2.1/logba
ck-classic-1.2.1.jar:/Users/rogervandusen/.m2/repository/ch/qos/logback/logback-core/1.2.1/logback-core-1.2.1.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-ws/1.4.2.RELEASE/spring-boot-starter-ws-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-web/1.4.2.RELEASE/spring-boot-starter-web-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/hibernate/hibernate-validator/5.2.4.Final/hibernate-validator-5.2.4.Final.jar:/Users/rogervandusen/.m2/repository/javax/validation/validation-api/1.1.0.Final/validation-api-1.1.0.Final.jar:/Users/rogervandusen/.m2/repository/org/jboss/logging/jboss-logging/3.2.1.Final/jboss-logging-3.2.1.Final.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/classmate/1.1.0/classmate-1.1.0.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-oxm/4.3.4.RELEASE/spring-oxm-4.3.4.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/ws/spring-ws-core/2.3.1.RELEASE/spring-ws-core-2.3.1.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/ws/spring-xml/2.3.1.RELEASE/spring-xml-2.3.1.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/1.4.2.RELEASE/spring-boot-starter-tomcat-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/8.5.6/tomcat-embed-core-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/8.5.6/tomcat-embed-el-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/8.5.6/tomcat-embed-websocket-8.5.6.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-actuator/1.4.2.RELEASE/spring-boot-starter-actuator-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-actuator/1.4.2.RELEASE/spring-boot-actuator-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/
spring-cloud-config-client/1.2.2.RELEASE/spring-cloud-config-client-1.2.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/spring-cloud-commons/1.1.5.RELEASE/spring-cloud-commons-1.1.5.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/cloud/spring-cloud-context/1.1.5.RELEASE/spring-cloud-context-1.1.5.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/security/spring-security-rsa/1.0.3.RELEASE/spring-security-rsa-1.0.3.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/security/spring-security-crypto/3.2.8.RELEASE/spring-security-crypto-3.2.8.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/bouncycastle/bcpkix-jdk15on/1.55/bcpkix-jdk15on-1.55.jar:/Users/rogervandusen/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.55/bcprov-jdk15on-1.55.jar:/Users/rogervandusen/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.2.RELEASE/spring-boot-starter-logging-1.4.2.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-web/4.3.0.RELEASE/spring-web-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-aop/4.3.0.RELEASE/spring-aop-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-context/4.3.0.RELEASE/spring-context-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-expression/4.3.0.RELEASE/spring-expression-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/org/springframework/spring-core/4.3.0.RELEASE/spring-core-4.3.0.RELEASE.jar:/Users/rogervandusen/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jack
son-core/2.8.6/jackson-core-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.8.6/jackson-annotations-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.8.6/jackson-databind-2.8.6.jar:/Users/rogervandusen/.m2/repository/com/google/code/gson/gson/2.6.2/gson-2.6.2.jar com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2
objc[99605]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/bin/java and /Library/Java/JavaVirtualMachines/jdk1.8.0_72.jdk/Contents/Home/jre/lib/libinstrument.dylib. One of the two will be used. Which one is undefined.
TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Start .....
13:38:10.443 [main] INFO com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 - .... Deepstream: Login succeeded!
13:38:10.449 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
13:38:10.449 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
13:38:10.449 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-1
13:38:10.449 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-2
13:38:10.449 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
13:38:10.449 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
13:38:10.449 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
13:38:10.449 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-3
13:38:10.450 [deepstream-subscriber-client-3] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-3 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
13:38:10.451 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2: run() Method Entry ....
13:38:10.451 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2 .... In run Method: currentThread() - dsPublisher-ThreadId ==> deepstream-publisher-client-1
13:38:10.451 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... GET Deepstream Event (PHX|ERVD2) to publish updates to value.tickets.ticketTexts
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
13:38:10.498 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... SUBSCRIBING to any Event Record change updates for Event = PHX|ERVD2
Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-3" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-4" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-5" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
13:38:10.507 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$1 - deepstream-subscriber-client-1 .... Event Record Changed Subscription UPDATE Received (events/PHX|ERVD2)================================== is initial value: true
... Deepstream Event Record Version # .... >> 7188
13:38:10.507 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... SUBSCRIBING to Event Path/PropId change updates for Event = PHX|ERVD2 and Path/PropId = value.tickets.ticketTexts
13:38:10.508 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$2 - deepstream-subscriber-client-1Event Path/PropId Changed Subscription UPDATE Received for (events/PHX|ERVD2 : value.tickets.ticketTexts) ------------------------ is initial value: true
... DATA arg (data.toString() ............ >> {"1":"tt-1-rev-7","2":"tt-2-rev-7","3":"tt-3-rev-7","4":"tt-4-rev-7","5":"tt-5-rev-7","6":"tt-6-rev-7"}
... Deepstream Event Record Version # .... >> 7188
Exception in thread "pool-1-thread-6" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Re: #72 - this test only has one producer thread doing a set, no conflicts possible. Let me clear the old test data and try from scratch.
Still exception errors thrown. Note: the "...Version# .... >> 0" (all data and state was cleared/reset). Deepstream console output showed 3 subscriptions:
MULTIPLE_SUBSCRIPTIONS | repeat supscription to "events/PHX|ERVD2" by :USSCORVA1-ML
MULTIPLE_SUBSCRIPTIONS | repeat supscription to "events/PHX|ERVD2" by :USSCORVA1-ML
MULTIPLE_SUBSCRIPTIONS | repeat supscription to "events/PHX|ERVD2" by :USSCORVA1-ML
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-3" Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-5" Exception in thread "pool-1-thread-4" Exception in thread "pool-1-thread-6" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
13:52:09.058 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... SUBSCRIBING to any Event Record change updates for Event = PHX|ERVD2
13:52:09.059 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$1 - deepstream-subscriber-client-2 .... Event Record Changed Subscription UPDATE Received (events/PHX|ERVD2)================================== is initial value: true
... Deepstream Event Record Version # .... >> 0
13:52:09.059 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-2 .... SUBSCRIBING to Event Path/PropId change updates for Event = PHX|ERVD2 and Path/PropId = value.tickets.ticketTexts
13:52:09.060 [deepstream-subscriber-client-2] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$2 - deepstream-subscriber-client-2Event Path/PropId Changed Subscription UPDATE Received for (events/PHX|ERVD2 : value.tickets.ticketTexts) ------------------------ is initial value: true
... DATA arg (data.toString() ............ >> null
... Deepstream Event Record Version # .... >> 0
If I set the number of subscriber threads to 1 (with the single publisher thread), both using the same DS client, I get a single exception but the publisher thread continues to run (different from the previous test with more than one subscriber thread). SEE OUTPUT in next comment.
I killed the test after it produced the value 22 (last line below); at that point it was still actively running and publishing new incremental values.
TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Start .....
13:57:11.940 [main] INFO com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 - .... Deepstream: Login succeeded!
13:57:11.956 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
13:57:11.956 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-1
13:57:11.956 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
13:57:11.960 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2: run() Method Entry ....
13:57:11.960 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2 .... In run Method: currentThread() - dsPublisher-ThreadId ==> deepstream-publisher-client-1
13:57:11.960 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... GET Deepstream Event (PHX|ERVD2) to publish updates to value.tickets.ticketTexts
Exception in thread "pool-1-thread-1" java.lang.NullPointerException
13:57:11.994 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 1
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
13:57:13.001 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 2
13:57:14.006 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 3
13:57:15.010 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 4
13:57:16.015 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 5
13:57:17.019 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 6
13:57:18.024 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 7
13:57:19.029 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 8
13:57:20.034 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 9
13:57:21.040 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 10
13:57:22.045 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 11
13:57:23.046 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 12
13:57:24.052 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 13
13:57:25.055 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 14
13:57:26.059 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 15
13:57:27.064 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 16
13:57:28.068 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 17
13:57:29.073 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 18
13:57:30.077 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 19
13:57:31.083 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 20
13:57:32.087 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 21
13:57:33.093 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... PUBLISHING Deepstream record updates for Event = PHX|ERVD2 for path: value.tickets.ticketTexts and update interval value: 22
The "pool-1-thread-2" thread that the exception was thrown from would be the MAIN thread of the test app, where .getClient(...).login() was run. The DS client is then passed as an argument to the constructors of both the publisher and subscriber worker/thread classes.
Output of the same test as above but with the thread start order reversed, running the publisher thread first followed by the subscribers: there is still an EXCEPTION, but with a slightly different stack trace, and with only one subscriber the publisher does not continue to actively publish as it did in the last test case above. The only difference is the startup order of the threads (subs -> pub above); the test above also used only 1 subscriber thread.
TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 Main Method Start .....
14:08:45.289 [main] INFO com.ticketmaster.bedrock.rapidoid.iris.TestPubSubScalingWithConcurrentDeepstreamClientsDaemonThreads2 - .... Deepstream: Login succeeded!
14:08:45.292 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2: run() Method Entry ....
14:08:45.292 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... DeepstreamClientPublisherDaemonThread2 .... In run Method: currentThread() - dsPublisher-ThreadId ==> deepstream-publisher-client-1
14:08:45.292 [deepstream-publisher-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientPublisherDaemonThread2 - deepstream-publisher-client-1 .... GET Deepstream Event (PHX|ERVD2) to publish updates to value.tickets.ticketTexts
14:08:45.293 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2: run() Method Entry ....
14:08:45.293 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... DeepstreamClientSubscriberDaemonThread2 .... In run Method: currentThread() - dsSubscriber-ThreadId ==> deepstream-subscriber-client-1
14:08:45.293 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... GET Deepstream Event (PHX|ERVD2) for SUBSCRIBING to REAL-TIME updates onRecChg and onRecPropIdChg for value.tickets.ticketTexts
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.NullPointerException
at io.deepstream.Record.recoverRecord(Record.java:636)
at io.deepstream.Record.applyUpdate(Record.java:596)
at io.deepstream.Record.onMessage(Record.java:527)
at io.deepstream.RecordHandler.handle(RecordHandler.java:297)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14:08:45.330 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... SUBSCRIBING to any Event Record change updates for Event = PHX|ERVD2
14:08:45.331 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$1 - deepstream-subscriber-client-1 .... Event Record Changed Subscription UPDATE Received (events/PHX|ERVD2)================================== is initial value: true
... Deepstream Event Record Version # .... >> 22
14:08:45.331 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2 - deepstream-subscriber-client-1 .... SUBSCRIBING to Event Path/PropId change updates for Event = PHX|ERVD2 and Path/PropId = value.tickets.ticketTexts
14:08:45.331 [deepstream-subscriber-client-1] INFO com.ticketmaster.bedrock.rapidoid.iris.DeepstreamClientSubscriberDaemonThread2$2 - deepstream-subscriber-client-1Event Path/PropId Changed Subscription UPDATE Received for (events/PHX|ERVD2 : value.tickets.ticketTexts) ------------------------ is initial value: true
... DATA arg (data.toString() ............ >> {"1":"tt-1-rev-22","2":"tt-2-rev-22","3":"tt-3-rev-22","4":"tt-4-rev-22","5":"tt-5-rev-22","6":"tt-6-rev-22"}
... Deepstream Event Record Version # .... >> 22
Hope all this helps you figure this out. @WolframHempel We really need to be able to maximize resource use at scale. A DS client per thread works, but I quickly run out of memory when scaling up thread counts (test case: 200 max). From what @yasserf says, we should be able to scale to many threads concurrently publishing and receiving updates, in the same process, with the same SINGLE DS client instance. That would be a big WIN!
@honorcode The fix was only pushed into master two days ago; it will be in the next release (early this week).
In terms of the scaling-many-threads part, what I meant is that we have zero blocking code, since under the hood everything is async.
Also, if you have two threads with two clients setting the same record at the same time, you would expect this behavior (minus the fix that needs to be released). Having multiple clients update the same record in the same process at the same time means you either:
a- Don't split your records with separation of concerns in mind
b- Have duplicated update paths in your application
Oh, thx @yasserf, I missed that, just glanced at commits and saw new 2.0.7.
This example and our use case will only have a single dedicated PUBLISHER per record, but the data streams will be partitioned, which guarantees both update order and serialized updates for records: no concurrent publisher clients competing to change the same records (no merge strategy conflict resolution necessary). But many publishers (clients) update many different records at the same time.
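To make the "one writer per record" guarantee concrete, here is a minimal in-process sketch. It is not the Kafka setup described above and it does not touch the Deepstream client at all; `PartitionedRecordWriter`, `laneFor` and `submit` are hypothetical names invented for illustration. The assumption is only that updates for the same record name are always routed to the same single-threaded lane, so they run one at a time and in submission order, while different records can still be written concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: routes every update for a record name to one fixed lane. */
final class PartitionedRecordWriter {
    private final List<ExecutorService> lanes = new ArrayList<>();

    PartitionedRecordWriter(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            // One thread per lane, so work queued on a lane runs strictly in order.
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** The same record name always maps to the same lane index. */
    int laneFor(String recordName) {
        return Math.floorMod(recordName.hashCode(), lanes.size());
    }

    /** Queues an update; updates for one record run in submission order. */
    Future<?> submit(String recordName, Runnable update) {
        return lanes.get(laneFor(recordName)).submit(update);
    }

    /** Stops accepting work and waits briefly for queued updates to finish. */
    void shutdown() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a real worker the `Runnable` would wrap the actual record update; whether that call is safe on a shared client is exactly the question in this thread, so the sketch only covers ordering.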
Any other thoughts on this?
Updating different records at the same time is fine using different clients; the problem is pretty much only when it's the same record.
If you only have a single client that updates a record, you should never get a record conflict unless the connection drops and reconnects. If you're 100% sure that you will always have that guarantee, I would recommend setting a custom merge strategy that logs whenever a merge conflict occurs. Also log the connection state to ensure the connection never randomly drops.
Those are the best two ways I can guarantee that. Out of curiosity, are you using listening in this use case?
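A rough sketch of the two diagnostics suggested above (log every merge conflict, log every connection state change). The `MergeStrategy` and `ConnectionListener` interfaces here are hypothetical stand-ins defined inside the example, not the Java client's own types or signatures; the assumption is that the real client lets you register callbacks of a similar shape, and the wrapper would need adapting to whatever those callbacks actually look like.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/** Hypothetical diagnostics helper; the nested interfaces are stand-ins, not the client API. */
final class ConflictAudit {
    /** Stand-in for a merge-strategy callback that picks the value to keep. */
    interface MergeStrategy {
        String merge(String recordName, String localValue, int localVersion,
                     String remoteValue, int remoteVersion);
    }

    /** Stand-in for a connection-state callback. */
    interface ConnectionListener {
        void onStateChanged(String newState);
    }

    private static final Logger LOG = Logger.getLogger(ConflictAudit.class.getName());
    private final AtomicInteger conflicts = new AtomicInteger();
    private final AtomicInteger stateChanges = new AtomicInteger();

    /** Wraps a strategy so each conflict is counted and logged before it is resolved. */
    MergeStrategy wrap(MergeStrategy delegate) {
        return (name, localValue, localVersion, remoteValue, remoteVersion) -> {
            conflicts.incrementAndGet();
            LOG.warning(String.format("merge conflict on %s: local v%d, remote v%d",
                    name, localVersion, remoteVersion));
            return delegate.merge(name, localValue, localVersion, remoteValue, remoteVersion);
        };
    }

    /** Listener that counts and logs every connection state change. */
    ConnectionListener connectionListener() {
        return state -> {
            stateChanges.incrementAndGet();
            LOG.info("connection state changed to " + state);
        };
    }

    int conflictCount() { return conflicts.get(); }

    int stateChangeCount() { return stateChanges.get(); }
}
```

If the single-writer guarantee really holds, `conflictCount()` should stay at 0 for the whole run; any non-zero value, or a state change in the middle of a test, would point at a dropped connection or a second writer.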
Thanks for the feedback. No, I explored that but it doesn't quite fit for this use case - yet!
So you don't see a problem with the original design (after the bugfix is deployed in the next version) where we use, say, a single Spring bean DS client shared across multiple threads, all writing concurrently but never to the same record (NOT a DS client per thread)?
@yasserf is the "fix" you mentioned above in the server (2.2.0 version) or the java client (2.0.7 version)?
@yasserf @WolframHempel @AlexBHarley I currently cannot share a single DS client instance across threads with the most recent releases: server 2.2.0 with Java client 2.0.7 (built locally from repo).
The "fix" @yasserf mentioned above in latest code commits is still not working for my test code above and the exceptions are reproducible.
This is kind of a big deal. My current work refactoring our Java client is supposed to expose a singleton DS client bean, the way @yasserf described above, which is shared across multiple threads to concurrently update records (never the same record; no possible merge conflicts, because Kafka update-topic partitioning gives a per-thread ordering guarantee by key).
I can build the client with a different design, instantiating a DS client instance per thread, but based on my understanding of @yasserf's description/explanation above I shouldn't have to: one DS client should be able to handle the work on all the threads.
Is this the case? Can this be fixed/released quickly? Or do you recommend I take the latter approach and give each thread/worker its own DS client for now?
This is a blocker for now. I would rather not have to do and redo this work. Hoping for a quick reply/feedback on path to take here. I can go either way, I just want to go the optimal way and not have to refactor it all over again soon.
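One way to avoid doing the work twice is to hide the choice behind a supplier, so workers never care whether they get the shared instance or their own. The sketch below is only an illustration under that assumption: `FakeClient` is a hypothetical placeholder for a connected, logged-in client, and nothing here calls the real Deepstream API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical sketch: switch between a shared client and one client per thread. */
final class ClientSharing {
    /** Placeholder for a connected, logged-in client (not the real client class). */
    static final class FakeClient { }

    /** Design A: one instance, created once and handed to every worker. */
    static Supplier<FakeClient> shared() {
        FakeClient single = new FakeClient();
        return () -> single;
    }

    /** Design B: each thread lazily creates its own instance and then reuses it. */
    static Supplier<FakeClient> perThread() {
        ThreadLocal<FakeClient> local = ThreadLocal.withInitial(FakeClient::new);
        return local::get;
    }

    /** Runs the given number of workers and returns how many distinct clients they used. */
    static int distinctClients(Supplier<FakeClient> source, int threads) {
        Set<FakeClient> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> seen.add(source.get()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }
}
```

With this shape, moving from a client per thread to the shared singleton (or back) is a one-line change where the supplier is created, rather than a refactor of every worker.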
@AlexBHarley raised a PR.
FYI - I re-ran my tests, both versions (one client shared by all threads, one client per thread) passed with no exceptions running locally with 200 threads.
Btw, I was/am still thinking of integrating our own RxJava (2.0) EventBus component to handle all the DS client work, that should ensure we don't run into any more of these race conditions/state collisions in the DS java client.
Any thoughts? @AlexBHarley @yasserf @WolframHempel
It might seem like it's unnecessary given what I think I understand about how DS client is implemented, but we've had success with our lightweight RxJava component managing multi-threaded async workflow in other similar use-cases.
Thanks for identifying this race-condition and fixing it @AlexBHarley 👍
@yasserf
This is the full trace:
FATAL EXCEPTION: pool-1-thread-1
java.lang.NullPointerException: Attempt to invoke virtual method 'boolean io.deepstream.Event.equals(java.lang.Object)' on a null object reference
at io.deepstream.DeepstreamClientAbstract.onError(DeepstreamClientAbstract.java:48)
at io.deepstream.RecordHandler.handle(RecordHandler.java:263)
at io.deepstream.Connection$4.run(Connection.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
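For context on what this message means: `equals` was invoked on an `Event` reference that was null. The snippet below is not the client's source (I have not checked what `DeepstreamClientAbstract.java:48` actually contains); it is a generic illustration with a hypothetical `Event` enum, showing how that kind of call fails and how a null-tolerant comparison behaves instead.

```java
/** Generic illustration only; the enum and methods are hypothetical, not client code. */
final class NullSafeEventCheck {
    /** Hypothetical subset of event types. */
    enum Event { ACK_TIMEOUT, VERSION_EXISTS }

    /** Dereferences the argument, so a null event throws NullPointerException. */
    static boolean unsafeIsVersionExists(Event event) {
        return event.equals(Event.VERSION_EXISTS);
    }

    /** Identity comparison never dereferences the argument, so null yields false. */
    static boolean isVersionExists(Event event) {
        return event == Event.VERSION_EXISTS;
    }
}
```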
In my initial Activity I set up Deepstream as shown below:
In another activity I try and use Deepstream:
My problem is that it never seems to get past the getRecord() line and the data does not show up in Deepstream hub. It just seems to hang on that line. @AlexBHarley