kubemq-io / kubemq-Java

Java client library for KubeMQ server
https://kubemq.io
MIT License

This repository is no longer active. The new Java SDK is here: https://github.com/kubemq-io/kubemq-java-v2

Java

The KubeMQ SDK for Java enables Java developers to communicate with KubeMQ server.

Install KubeMQ Community Edition

Please visit KubeMQ Community for installation steps.

General SDK description

The SDK implements all communication patterns available through the KubeMQ server: Event, EventStore, Command, Query, and Queue.

Prerequisites

KubeMQ-SDK-Java works with JDK 8+

Installing

The recommended way to use the SDK for Java in your project is to consume it from Maven:

https://oss.sonatype.org/service/local/repositories/releases/content/io/kubemq/sdk/kubemq-sdk-Java/1.0.6/kubemq-sdk-Java-1.0.6.jar

To build with Gradle, add the dependency below to your build.gradle file.

compile group: 'io.kubemq.sdk', name: 'kubemq-sdk-Java', version: '1.0.6'

Configurations

The only required configuration setting is the KubeMQ server address.

Configuration can be set by using one of the following:

Configuration via Environment Variable

Set KubeMQServerAddress to the KubeMQ Server Address

Configuration via Java Property

Set the KubeMQServerAddress property by passing the -DKubeMQServerAddress= option to the JVM.

Configuration via code

When setting the KubeMQ server address within the code, simply pass the address as a parameter to the various constructors. See exactly how in the code examples in this document.
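
The three configuration sources above can be combined in a small lookup helper. The sketch below is illustrative only: the class name `ServerAddressResolver` is hypothetical, and the precedence it applies (explicit argument, then JVM property, then environment variable) is an assumption, since this document does not state which source wins.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper (not part of the SDK) that picks the KubeMQ server address. */
final class ServerAddressResolver {

    static final String KEY = "KubeMQServerAddress";

    // Assumed precedence: explicit argument, then JVM property, then environment variable.
    static String resolve(String explicit, Properties jvmProps, Map<String, String> env) {
        if (explicit != null && !explicit.trim().isEmpty()) {
            return explicit.trim();
        }
        String fromProperty = jvmProps.getProperty(KEY);
        if (fromProperty != null && !fromProperty.trim().isEmpty()) {
            return fromProperty.trim();
        }
        String fromEnv = env.get(KEY);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        throw new IllegalStateException("KubeMQ server address is not configured");
    }

    public static void main(String[] args) {
        String explicit = args.length > 0 ? args[0] : null;
        String address;
        try {
            // Reads the real JVM properties and process environment.
            address = resolve(explicit, System.getProperties(), System.getenv());
        } catch (IllegalStateException e) {
            // Demo fallback only; the address used throughout this document's examples.
            address = "localhost:50000";
        }
        System.out.println("KubeMQ server address: " + address);
    }
}
```

Running it with `-DKubeMQServerAddress=localhost:50000` on the JVM command line exercises the property path; passing an address as the first program argument exercises the code path.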

Generating Documentation

Javadoc is used for documentation. You can generate HTML locally with the following:

./gradlew javadoc

Running the examples

The examples are standalone projects that showcase the usage of the SDK.

To run the examples, you need to have a running instance of KubeMQ.

You can use the Gradle tasks to run the examples:

./gradlew commandQueryChannel
./gradlew commandQueryInitiator
./gradlew commandQueryResponder
./gradlew commandQueryResponderAsync
./gradlew eventChannel
./gradlew eventSender
./gradlew eventSubscriber

Building from source

Once you check out the code from GitHub, you can build it using Gradle.

./gradlew build

Running the tests

To run the automated tests for this system execute:

./gradlew test

Main Concepts

Event/EventStore/Command/Query

Queue

QueueMessageAttributes (proto struct)

  message QueueMessageAttributes {
      int64               Timestamp                   =1;
      uint64              Sequence                    =2;
      string              MD5OfBody                   =3;
      int32               ReceiveCount                =4;
      bool                ReRouted                    =5;
      string              ReRoutedFromQueue           =6;
      int64               ExpirationAt                =7;
      int64               DelayedTo                   =8;

  }
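
To make the MD5OfBody field more concrete, here is a minimal sketch of computing an MD5 digest over a message body with the JDK. The class name `Md5OfBody` is hypothetical, and rendering the digest as lowercase hex is an assumption; this document does not say which encoding the server uses for the field.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: MD5 digest of a message body, rendered as lowercase hex (assumed format). */
final class Md5OfBody {

    static String hex(byte[] body) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(body);
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to an unsigned value so negative bytes format as two hex digits.
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "some-simple_queue-queue-message".getBytes(StandardCharsets.UTF_8);
        System.out.println("MD5 of body: " + hex(body));
    }
}
```

A receiver could compare such a locally computed value against the attribute to detect a corrupted body, provided the encodings match.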

Event/EventStore/Command/Query SubscribeRequest Object:

A struct that is used to initialize SubscribeToEvents/SubscribeToRequests. The SubscribeRequest contains the following:

Queue

KubeMQ supports distributed durable FIFO based queues with the following core features:

Send Message to a Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  SendMessageResult resSend = queue.SendQueueMessage(new Message()
          .setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
          .setMetadata("someMeta"));
  if (resSend.getIsError()) {
      System.out.printf("Message enqueue error, error: %s", resSend.getError());
  }

Send Message to a Queue with Expiration

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  SendMessageResult resSend = queue
          .SendQueueMessage(new Message()
          .setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
          .setMetadata("someMeta")
          .setExpiration(5));
  if (resSend.getIsError()) {
      System.out.printf("Message enqueue error, error: %s", resSend.getError());
  }

Send Message to a Queue with Delay

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  SendMessageResult resSend = queue.SendQueueMessage(new Message()
          .setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
          .setMetadata("someMeta")
          .setDelay(3));
  if (resSend.getIsError()) {
      System.out.printf("Message enqueue error, error: %s", resSend.getError());
  }

Send Message to a Queue with Dead-letter Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  SendMessageResult resSend = queue
          .SendQueueMessage(new Message()
          .setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
          .setMetadata("someMeta")
          .setMaxReciveCount(3)
          .setMaxReciveQueue("DeadLetterQueue"));
  if (resSend.getIsError()) {
      System.out.printf("Message enqueue error, error: %s", resSend.getError());
  }
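
The routing rule behind the example above can be pictured with a small in-memory model. This is a sketch of the idea only, not the server's implementation: the class `DeadLetterModel` and its methods are hypothetical, and it assumes a message moves to the dead-letter queue once its receive count reaches the configured maximum without an ack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of max-receive-count routing; illustrative only. */
final class DeadLetterModel {

    static final class Msg {
        final String body;
        final int maxReceiveCount;
        int receiveCount;

        Msg(String body, int maxReceiveCount) {
            this.body = body;
            this.maxReceiveCount = maxReceiveCount;
        }
    }

    final Deque<Msg> mainQueue = new ArrayDeque<>();
    final Deque<Msg> deadLetterQueue = new ArrayDeque<>();

    void send(String body, int maxReceiveCount) {
        mainQueue.addLast(new Msg(body, maxReceiveCount));
    }

    /** Takes the head message and counts the delivery; returns null when the queue is empty. */
    Msg receive() {
        Msg m = mainQueue.pollFirst();
        if (m != null) {
            m.receiveCount++;
        }
        return m;
    }

    /** Consumer did not ack: re-queue, or route to the dead-letter queue at the limit (assumed rule). */
    void reject(Msg m) {
        if (m.receiveCount >= m.maxReceiveCount) {
            deadLetterQueue.addLast(m);
        } else {
            mainQueue.addFirst(m);
        }
    }

    public static void main(String[] args) {
        DeadLetterModel queue = new DeadLetterModel();
        queue.send("some-simple_queue-queue-message", 3);
        for (int attempt = 1; attempt <= 3; attempt++) {
            queue.reject(queue.receive());
            System.out.printf("attempt %d: main=%d, deadLetter=%d%n",
                    attempt, queue.mainQueue.size(), queue.deadLetterQueue.size());
        }
    }
}
```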

Send Batch Messages

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Collection<Message> batch = new ArrayList<Message>();

  for (int i = 0; i < 10; i++) {
      batch.add(new Message()
      .setBody(Converter.ToByteArray("Batch Message " + i)));
  }

  SendBatchMessageResult resBatch = queue.SendQueueMessagesBatch(batch);
  if (resBatch.getHaveErrors()) {
      System.out.print("Message sent batch has errors");
  }
  for (SendMessageResult resSend : resBatch.getResults()) {
      if (resSend.getIsError()) {
          System.out.printf("Message enqueue error, error: %s", resSend.getError());
      } else {
          System.out.printf("Send to Queue Result: MessageID: %s, Sent At:%s", resSend.getMessageID(),
                Converter.FromUnixTime(resSend.getSentAt()).toString());

      }

  }

Receive Messages from a Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  ReceiveMessagesResponse resRec = queue.ReceiveQueueMessages(10, 1);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("Received Messages %s:", resRec.getMessagesReceived());
  for (Message msg : resRec.getMessages()) {
      System.out.printf("MessageID: %s, Body:%s", msg.getMessageID(), Converter.FromByteArray(msg.getBody()));
  }

Peek Messages from a Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  ReceiveMessagesResponse resPek = queue.PeekQueueMessage(10, 1);
  if (resPek.getIsError()) {
      System.out.printf("Message peek error, error: %s", resPek.getError());
      return;
  }
  System.out.printf("Received Messages: %s", resPek.getMessagesReceived());
  for (Message msg : resPek.getMessages()) {
      System.out.printf("MessageID: %s, Body: %s", msg.getMessageID(), Converter.FromByteArray(msg.getBody()));
  }        

Ack All Messages In a Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  AckAllMessagesResponse resAck = queue.AckAllQueueMessages();
  if (resAck.getIsError()) {
      System.out.printf("AckAllQueueMessagesResponse error, error: %s", resAck.getError());
      return;
  }
  System.out.printf("Ack All Messages: %d completed", resAck.getAffectedMessages());

Transactional Queue - Ack

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Transaction tran = queue.CreateTransaction();

  TransactionMessagesResponse resRec = tran.Receive(10, 10);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("MessageID: %s, Body: %s", resRec.getMessage().getMessageID(),
          Converter.FromByteArray(resRec.getMessage().getBody()));
  System.out.println("Doing some work.....");

  Thread.sleep(1000);
  System.out.println("Done, ack the message");
  TransactionMessagesResponse resAck = tran.AckMessage();
  if (resAck.getIsError()) {
      System.out.printf("Ack message error: %s", resAck.getError());
  }

Transactional Queue - Reject

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Transaction tran = queue.CreateTransaction();
  TransactionMessagesResponse resRec = tran.Receive(10, 10);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("MessageID: %s, Body: %s", resRec.getMessage().getMessageID(),
          Converter.FromByteArray(resRec.getMessage().getBody()));
  System.out.println("Reject message");
  TransactionMessagesResponse resRej = tran.RejectMessage();
  if (resRej.getIsError()) {
      System.out.printf("Message reject error, error: %s", resRej.getError());
      return;
  }

Transactional Queue - Extend Visibility

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Transaction tran = queue.CreateTransaction();
  TransactionMessagesResponse resRec = tran.Receive(5, 10);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("MessageID: %s, Body: %s", resRec.getMessage().getMessageID(),
          Converter.FromByteArray(resRec.getMessage().getBody()));
  System.out.println("Work for 1 second");
  Thread.sleep(1000);
  System.out.println("Need more time to process, extend visibility for 3 more seconds");
  TransactionMessagesResponse resExt = tran.ExtendVisibility(3);
  if (resExt.getIsError()) {
      System.out.printf("Extend visibility error, error: %s", resExt.getError());
      return;
  }
  System.out.println("Approved. Work for 2.5 seconds");
  Thread.sleep(2500);
  System.out.println("Work done... ack the message");
  TransactionMessagesResponse resAck = tran.AckMessage();
  if (resAck.getIsError()) {
      System.out.printf("Ack message error: %s", resAck.getError());
  }
  System.out.println("Ack done");
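
The timing in the example above can be checked with a small deadline model. It is a sketch under stated assumptions, not SDK code: the class `VisibilityWindow` is hypothetical, the first argument of `Receive(5, 10)` is taken to be the visibility in seconds, and extending is assumed to restart the window from the moment of the call instead of adding to the old deadline.

```java
/** Tracks a visibility deadline for one received message; illustrative model only. */
final class VisibilityWindow {

    private long deadlineMillis;

    VisibilityWindow(long receivedAtMillis, int visibilitySeconds) {
        this.deadlineMillis = receivedAtMillis + visibilitySeconds * 1000L;
    }

    /** Assumption: extending restarts the window from "now". */
    void extend(long nowMillis, int visibilitySeconds) {
        if (isExpired(nowMillis)) {
            throw new IllegalStateException("visibility already expired");
        }
        deadlineMillis = nowMillis + visibilitySeconds * 1000L;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= deadlineMillis;
    }

    long remainingMillis(long nowMillis) {
        return Math.max(0L, deadlineMillis - nowMillis);
    }

    public static void main(String[] args) {
        // Mirrors the example: receive with 5 s visibility, work 1 s, extend by 3 s, work 2.5 s.
        VisibilityWindow window = new VisibilityWindow(0L, 5);
        window.extend(1000L, 3);
        long ackTime = 1000L + 2500L;
        System.out.println("expired before ack: " + window.isExpired(ackTime));
        System.out.println("milliseconds to spare: " + window.remainingMillis(ackTime));
    }
}
```

Under these assumptions the ack in the example arrives before the extended deadline, so the message is not returned to the queue.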

Transactional Queue - Resend Modified Message

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Transaction tran = queue.CreateTransaction();
  TransactionMessagesResponse resRec = tran.Receive(500, 10);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("MessageID: %s, Body: %s", resRec.getMessage().getMessageID(),
          Converter.FromByteArray(resRec.getMessage().getBody()));
  // Change the target queue and metadata, then resend the modified message.
  TransactionMessagesResponse resMod = tran
          .Modify(resRec.getMessage().setQueue("receiverB").setMetadata("new metadata"));
  if (resMod.getIsError()) {
      System.out.printf("Message Modify error, error: %s", resMod.getError());
      return;
  }

Transactional Queue - Resend to New Queue

  Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
  Transaction tran = queue.CreateTransaction();
  TransactionMessagesResponse resRec = tran.Receive(5, 10);
  if (resRec.getIsError()) {
      System.out.printf("Message dequeue error, error: %s", resRec.getError());
      return;
  }
  System.out.printf("MessageID: %s, Body: %s", resRec.getMessage().getMessageID(),
          Converter.FromByteArray(resRec.getMessage().getBody()));

  System.out.println("Resend to new queue");
  TransactionMessagesResponse resResend = tran.ReSend("new-queue");
  if (resResend.getIsError()) {
      System.out.printf("Message resend error, error: %s", resResend.getError());
      return;
  }
  System.out.println("Done");

Event

Sending Events

Single event

  String ChannelName = "testing_event_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";

  io.kubemq.sdk.event.Channel channel = new io.kubemq.sdk.event.Channel(ChannelName, ClientID, false,
          KubeMQServerAddress);
  Event event = new Event();
  event.setBody(Converter.ToByteArray("hello kubemq - sending single event"));
  Result result;
  try {
      result = channel.SendEvent(event);
      if (!result.isSent()) {
          System.out.println("Could not send single message");
          return;
      }
  } catch (SSLException | ServerAddressNotSuppliedException e) {
      System.out.printf("Could not send single message: %s", e.getMessage());
      e.printStackTrace();
  }

Stream Events

  String ChannelName = "testing_event_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";

  io.kubemq.sdk.event.Channel channel = new io.kubemq.sdk.event.Channel(ChannelName, ClientID, false,
          KubeMQServerAddress);
  Event event = new Event();
  event.setBody(Converter.ToByteArray("hello kubemq - sending stream event"));

  StreamObserver<Result> streamResObserver = new StreamObserver<Result>() {

      @Override
      public void onNext(Result value) {
          System.out.printf("Sent event: %s", value.getEventId());
      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("Could not stream event: %s", t.getMessage());
      }

      @Override
      public void onCompleted() {

      }
  };

  StreamObserver<Event> stream = channel.StreamEvent(streamResObserver);
  stream.onNext(event);

Receiving Events

  String ChannelName = "testing_event_channel", ClientID = "hello-world-subscriber",
          KubeMQServerAddress = "localhost:50000";
  Subscriber subscriber = new Subscriber(KubeMQServerAddress);
  SubscribeRequest subscribeRequest = new SubscribeRequest();
  subscribeRequest.setChannel(ChannelName);
  subscribeRequest.setClientID(ClientID);
  subscribeRequest.setSubscribeType(SubscribeType.Events);

  StreamObserver<EventReceive> streamObserver = new StreamObserver<EventReceive>() {

      @Override
      public void onNext(EventReceive value) {
          try {
              System.out.printf("Event Received: EventID: %s, Channel: %s, Metadata: %s, Body: %s",
                      value.getEventId(), value.getChannel(), value.getMetadata(),
                      Converter.FromByteArray(value.getBody()));
          } catch (ClassNotFoundException e) {
              System.out.printf("ClassNotFoundException: %s", e.getMessage());
              e.printStackTrace();
          } catch (IOException e) {
              System.out.printf("IOException:  %s", e.getMessage());
              e.printStackTrace();
          }

      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("Event Received Error: %s", t.toString());
      }

      @Override
      public void onCompleted() {

      }
  };
  subscriber.SubscribeToEvents(subscribeRequest, streamObserver);

Event Store

Subscription Options

KubeMQ supports 6 types of subscriptions:

Single Event Store

  String ChannelName = "testing_event_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";

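  // The third constructor argument is the store flag; true persists events for the Event Store pattern.
  io.kubemq.sdk.event.Channel channel = new io.kubemq.sdk.event.Channel(ChannelName, ClientID, true,
          KubeMQServerAddress);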
  for (int i = 0; i < 10; i++) {
      Event event = new Event();
      event.setBody(Converter.ToByteArray("hello kubemq - sending single event"));
      event.setEventId("event-Store-" + i);
      try {
          channel.SendEvent(event);
      } catch (SSLException e) {
          System.out.printf("SSLException: %s", e.getMessage());
          e.printStackTrace();
      } catch (ServerAddressNotSuppliedException e) {
          System.out.printf("ServerAddressNotSuppliedException: %s", e.getMessage());
          e.printStackTrace();
      }
  }

Stream Events Store

  String ChannelName = "testing_event_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";

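  // The third constructor argument is the store flag; true persists events for the Event Store pattern.
  io.kubemq.sdk.event.Channel channel = new io.kubemq.sdk.event.Channel(ChannelName, ClientID, true,
          KubeMQServerAddress);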

  StreamObserver<Result> streamResObserver = new StreamObserver<Result>() {

      @Override
      public void onNext(Result value) {
          System.out.printf("Stream event: %s", value.getEventId());
      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("Could not send single message");
      }

      @Override
      public void onCompleted() {

      }
  };

  StreamObserver<Event> stream = channel.StreamEvent(streamResObserver);
  for (int i = 0; i < 10; i++) {
      Event event = new Event();
      event.setBody(Converter.ToByteArray("hello kubemq - sending single event"));
      event.setEventId("event-Store-" + i);
      stream.onNext(event);
  }

Receiving Events Store

  String ChannelName = "testing_event_channel", ClientID = "hello-world-subscriber",
          KubeMQServerAddress = "localhost:50000";
  Subscriber subscriber = new Subscriber(KubeMQServerAddress);
  SubscribeRequest subscribeRequest = new SubscribeRequest();
  subscribeRequest.setChannel(ChannelName);
  subscribeRequest.setClientID(ClientID);
  subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
  subscribeRequest.setEventsStoreType(EventsStoreType.StartAtSequence);

  StreamObserver<EventReceive> streamObserver = new StreamObserver<EventReceive>() {

      @Override
      public void onNext(EventReceive value) {
          try {
              System.out.printf("Event Received: EventID: %s, Channel: %s, Metadata: %s, Body: %s",
                      value.getEventId(), value.getChannel(), value.getMetadata(),
                      Converter.FromByteArray(value.getBody()));
          } catch (ClassNotFoundException e) {
              System.out.printf("ClassNotFoundException: %s", e.getMessage());
              e.printStackTrace();
          } catch (IOException e) {
              System.out.printf("IOException: %s", e.getMessage());
              e.printStackTrace();
          }
      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("onError:  %s", t.getMessage());
      }

      @Override
      public void onCompleted() {

      }

  };
  subscriber.SubscribeToEvents(subscribeRequest, streamObserver);

Commands

Concept

Commands implement a synchronous messaging pattern in which the sender sends a request and waits a specific amount of time for a response.
The response can be successful or not. It is the responsibility of the responder to return the result of the command within the time the sender set in the request.
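
The request-with-deadline behaviour described above can be sketched with plain JDK concurrency classes, without a KubeMQ server. Everything here is hypothetical and for illustration: `CommandTimeoutSketch`, `sendCommand`, and the three outcome strings are not SDK names, and the responder is modelled as a local `Callable` instead of a remote subscriber.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Local model of a command: the sender waits up to a timeout for the responder's result. */
final class CommandTimeoutSketch {

    /** Returns "executed", "failed", or "timeout". */
    static String sendCommand(Callable<Boolean> responder, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> reply = pool.submit(responder);
            try {
                return reply.get(timeoutMillis, TimeUnit.MILLISECONDS) ? "executed" : "failed";
            } catch (TimeoutException e) {
                // The responder missed the deadline set by the sender.
                reply.cancel(true);
                return "timeout";
            } catch (ExecutionException e) {
                // The responder threw instead of returning a result.
                return "failed";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "failed";
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendCommand(() -> true, 1000));
        System.out.println(sendCommand(() -> {
            Thread.sleep(500);
            return true;
        }, 50));
    }
}
```

The second call shows why the sender's timeout has to be chosen with the responder's processing time in mind: a correct but slow responder still produces a failed command from the sender's point of view.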

Receiving Commands Requests

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  Responder.RequestResponseObserver HandleIncomingRequests;
  Responder responder = new Responder(KubeMQServerAddress);
  HandleIncomingRequests = request -> {

      Response response = new Response(request);
      response.setCacheHit(false);
      response.setError("None");
      response.setClientID(ClientID);
      response.setBody("OK".getBytes());
      response.setExecuted(true);
      response.setMetadata("OK");
      response.setTimestamp(LocalDateTime.now());
      return response;
  };
  SubscribeRequest subscribeRequest = new SubscribeRequest();
  subscribeRequest.setChannel(ChannelName);
  subscribeRequest.setClientID(ClientID);
  subscribeRequest.setSubscribeType(SubscribeType.Commands);

  new Thread() {
      public void run() {
          try {
              responder.SubscribeToRequests(subscribeRequest, HandleIncomingRequests);
          } catch (SSLException e) {
              System.out.printf("SSLException:%s", e.getMessage());
              e.printStackTrace();
          } catch (ServerAddressNotSuppliedException e) {
              System.out.printf("ServerAddressNotSuppliedException:%s", e.getMessage());
              e.printStackTrace();
          }
      }
  }.start();

Sending Command Request

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  io.kubemq.sdk.commandquery.ChannelParameters channelParameters = new io.kubemq.sdk.commandquery.ChannelParameters();
  channelParameters.setChannelName(ChannelName);
  channelParameters.setClientID(ClientID);
  channelParameters.setKubeMQAddress(KubeMQServerAddress);
  channelParameters.setRequestType(RequestType.Command);
  channelParameters.setTimeout(10000);
  io.kubemq.sdk.commandquery.Channel channel = new io.kubemq.sdk.commandquery.Channel(channelParameters);
  Request request = new Request();
  request.setBody(Converter.ToByteArray("hello kubemq - sending a command, please reply"));
  Response result = channel.SendRequest(request);
  if (!result.isExecuted()) {
      System.out.printf("Response error: %s", result.getError());
      return;
  }
  System.out.printf("Response Received: %s, ExecutedAt: %s", result.getRequestID(), result.getTimestamp());

Sending Command Request Async

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  io.kubemq.sdk.commandquery.ChannelParameters channelParameters = new io.kubemq.sdk.commandquery.ChannelParameters();
  channelParameters.setChannelName(ChannelName);
  channelParameters.setClientID(ClientID);
  channelParameters.setKubeMQAddress(KubeMQServerAddress);
  channelParameters.setRequestType(RequestType.Command);
  channelParameters.setTimeout(1000);
  io.kubemq.sdk.commandquery.Channel channel = new io.kubemq.sdk.commandquery.Channel(channelParameters);
  Request request = new Request();
  request.setBody(Converter.ToByteArray("hello kubemq - sending a command, please reply"));
  StreamObserver<Response> response = new StreamObserver<Response>() {

      @Override
      public void onNext(Response value) {
          if (!value.isExecuted()) {
              System.out.printf("Response error: %s", value.getError());
          }
      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("RPC Error: %s", t.getMessage());
      }

      @Override
      public void onCompleted() {

      }
  };
  channel.SendRequestAsync(request, response);

Queries

Concept

Queries implement a synchronous messaging pattern in which the sender sends a request and waits a specific amount of time for a response.

The response must include metadata or a body, together with an indication of whether the operation succeeded. It is the responsibility of the responder to return the result of the query within the time the sender set in the request.
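
The rule that a query response needs a payload in addition to a success flag can be expressed as a small check. This is a sketch of the rule as worded above, not SDK behaviour: the class `QueryReplyCheck`, the method `describe`, and its result strings are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical check of the query-response rule: an outcome flag plus metadata or a body. */
final class QueryReplyCheck {

    static String describe(boolean executed, String error, String metadata, byte[] body) {
        if (!executed) {
            boolean hasError = error != null && !error.isEmpty();
            return "failed: " + (hasError ? error : "unknown error");
        }
        boolean hasMetadata = metadata != null && !metadata.isEmpty();
        boolean hasBody = body != null && body.length > 0;
        if (!hasMetadata && !hasBody) {
            return "invalid: executed but carries neither metadata nor body";
        }
        return "ok";
    }

    public static void main(String[] args) {
        byte[] body = "got your query".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(true, null, "this is a response", body));
        System.out.println(describe(true, null, "", new byte[0]));
        System.out.println(describe(false, "responder unavailable", null, null));
    }
}
```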

Receiving Query Requests

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  Responder.RequestResponseObserver HandleIncomingRequests;
  Responder responder = new Responder(KubeMQServerAddress);
  HandleIncomingRequests = request -> {

      Response response = new Response(request);
      response.setCacheHit(false);
      response.setError("None");
      response.setClientID(ClientID);
      response.setBody("got your query, you are good to go".getBytes());
      response.setExecuted(true);
      response.setMetadata("this is a response");
      response.setTimestamp(LocalDateTime.now());
      return response;
  };
  SubscribeRequest subscribeRequest = new SubscribeRequest();
  subscribeRequest.setChannel(ChannelName);
  subscribeRequest.setClientID(ClientID);
  subscribeRequest.setSubscribeType(SubscribeType.Queries);

  new Thread() {
      public void run() {

          try {
              responder.SubscribeToRequests(subscribeRequest, HandleIncomingRequests);
          } catch (SSLException e) {
              System.out.printf("SSLException: %s", e.getMessage());
              e.printStackTrace();
          } catch (ServerAddressNotSuppliedException e) {
              System.out.printf("ServerAddressNotSuppliedException: %s", e.getMessage());
              e.printStackTrace();
          }
      }
  }.start();

Sending Query Requests

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  io.kubemq.sdk.commandquery.ChannelParameters channelParameters = new io.kubemq.sdk.commandquery.ChannelParameters();
  channelParameters.setChannelName(ChannelName);
  channelParameters.setClientID(ClientID);
  channelParameters.setKubeMQAddress(KubeMQServerAddress);
  channelParameters.setRequestType(RequestType.Query);
  channelParameters.setTimeout(1000);
  io.kubemq.sdk.commandquery.Channel channel = new io.kubemq.sdk.commandquery.Channel(channelParameters);
  Request request = new Request();
  request.setBody(Converter.ToByteArray("hello kubemq - sending a query, please reply"));
  Response result = channel.SendRequest(request);
  if (!result.isExecuted()) {

      System.out.printf("Response error: %s", result.getError());
      return;
  }
  System.out.printf("Response Received: %s, ExecutedAt: %s", result.getRequestID(), result.getTimestamp());

Sending Query Requests Async

  String ChannelName = "testing_Command_channel", ClientID = "hello-world-sender",
          KubeMQServerAddress = "localhost:50000";
  io.kubemq.sdk.commandquery.ChannelParameters channelParameters = new io.kubemq.sdk.commandquery.ChannelParameters();
  channelParameters.setChannelName(ChannelName);
  channelParameters.setClientID(ClientID);
  channelParameters.setKubeMQAddress(KubeMQServerAddress);
  channelParameters.setRequestType(RequestType.Query);
  channelParameters.setTimeout(1000);
  io.kubemq.sdk.commandquery.Channel channel = new io.kubemq.sdk.commandquery.Channel(channelParameters);
  Request request = new Request();
  request.setBody(Converter.ToByteArray("hello kubemq - sending a query, please reply"));
  StreamObserver<Response> response = new StreamObserver<Response>() {

      @Override
      public void onNext(Response value) {
          if (!value.isExecuted()) {
              System.out.printf("Response error: %s", value.getError());
              return;
          }
          // Report the successful response only when the query was executed.
          System.out.printf("Response Received: %s, ExecutedAt: %s", value.getRequestID(),
                  value.getTimestamp());
      }

      @Override
      public void onError(Throwable t) {
          System.out.printf("onError: %s", t.getMessage());
      }

      @Override
      public void onCompleted() {

      }
  };
  channel.SendRequestAsync(request, response);