Closed RichardHightower closed 4 years ago
package io.nats.bridge.integration.a.mq;
import io.nats.bridge.MessageBridge;
import io.nats.bridge.MessageBus;
import io.nats.bridge.TestUtils;
import io.nats.bridge.messages.Message;
import io.nats.bridge.messages.MessageBuilder;
import io.nats.bridge.support.MessageBridgeBuilder;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
/**
 * Integration test that publishes a message carrying an explicit delivery time and expiration
 * time on a NATS bus, lets a {@link MessageBridge} forward it to an IBM MQ bus, and verifies that
 * the reply produced by a small in-test "server" loop reaches the client.
 *
 * <p>The server loop records the expiration and delivery time it observes on the forwarded
 * message. Those values are intentionally not asserted; see the comment in {@link #test()}.
 */
public class NatsToIBM_MQForwardExpirationDeliveryTime272Test {

    /** Shared shutdown flag read by the server loop, the client loop and (via TestUtils) the bridge loop. */
    private final AtomicBoolean stop = new AtomicBoolean(false);
    /** Body of the reply received by the client loop, or {@code null} until one arrives. */
    private final AtomicReference<String> responseFromServer = new AtomicReference<>();
    /** Expiration time observed by the server loop on the forwarded message. */
    private final AtomicReference<Long> expiration = new AtomicReference<>();
    /** Delivery time observed by the server loop on the forwarded message. */
    private final AtomicReference<Long> deliveryTime = new AtomicReference<>();

    private CountDownLatch resultSignal;
    private CountDownLatch serverStopped;
    private CountDownLatch bridgeStopped;

    private MessageBus serverMessageBus;
    private MessageBus clientMessageBus;
    private MessageBus bridgeMessageBusSource;
    private MessageBus bridgeMessageBusDestination;
    private MessageBus responseBusServer;
    private MessageBus responseBusClient;
    private MessageBridge messageBridge;

    /**
     * Starts a background thread that polls {@code serverMessageBus} until {@code stop} is set,
     * answering every received message with {@code "Hello " + body} on {@code responseBusServer}.
     *
     * <p>When the loop ends (stop requested, thread interrupted, or an exception escaping the
     * loop) the server bus is closed and {@code serverStopped} is counted down, so a caller
     * waiting on the latch is always released.
     *
     * @param stop              flag that ends the loop once it becomes {@code true}
     * @param serverMessageBus  bus the server receives requests from; closed when the loop ends
     * @param responseBusServer bus the replies are published to
     * @param serverStopped     latch counted down after the loop has finished
     */
    public void runServerLoop(final AtomicBoolean stop, final MessageBus serverMessageBus, final MessageBus responseBusServer,
                              final CountDownLatch serverStopped) {
        final Thread thread = new Thread(() -> {
            try {
                while (!stop.get()) {
                    serverMessageBus.receive().ifPresent(message -> recordAndReply(message, responseBusServer));
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt and leave the loop instead of spinning on it.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    serverMessageBus.process();
                }
            } finally {
                try {
                    serverMessageBus.close();
                } finally {
                    serverStopped.countDown();
                }
            }
        });
        thread.start();
    }

    /**
     * Records the expiration/delivery time seen on {@code message} and publishes the greeting
     * reply, echoing the request's correlation ID when it has one.
     */
    private void recordAndReply(final Message message, final MessageBus responseBusServer) {
        System.out.println("Handle message " + message.bodyAsString() + "....................");
        expiration.set(message.expirationTime());
        deliveryTime.set(message.deliveryTime());

        final MessageBuilder reply = MessageBuilder.builder();
        reply.withBody("Hello " + new String(message.getBodyBytes(), StandardCharsets.UTF_8));
        // Only propagate a correlation ID that actually exists (the original check was inverted).
        if (message.correlationID() != null) {
            reply.withCorrelationID(message.correlationID());
        }
        responseBusServer.publish(reply.build());
    }

    /** Creates the client, server, bridge and response buses, the latches, and the bridge under test. */
    @Before
    public void setUp() throws Exception {
        final String busName = "MessagesOnly_A_255";
        final String responseName = "RESPONSE_A_255";

        clientMessageBus = TestUtils.getMessageBusNats("CLIENT", busName);
        serverMessageBus = TestUtils.getMessageBusIbmMQCopyHeader("SERVER", true);

        resultSignal = new CountDownLatch(1);
        serverStopped = new CountDownLatch(1);
        bridgeStopped = new CountDownLatch(1);

        bridgeMessageBusSource = TestUtils.getMessageBusNats("BRIDGE_SOURCE", busName);
        bridgeMessageBusDestination = TestUtils.getMessageBusIbmMQCopyHeader("BRIDGE_DEST", false);

        responseBusServer = TestUtils.getMessageBusJms("SERVER_RESPONSE", responseName);
        responseBusClient = TestUtils.getMessageBusJms("CLIENT_RESPONSE", responseName);

        messageBridge = MessageBridgeBuilder.builder().withDestinationBus(bridgeMessageBusDestination)
                .withSourceBus(bridgeMessageBusSource).withRequestReply(false).withName("NatsToIBM_MQForwardCopyHeaders254Test").build();
    }

    /**
     * Publishes "Rick" with a delivery time and an expiration time and expects "Hello Rick" back.
     * The loops are always stopped, even when the assertion fails.
     */
    @Test
    public void test() throws Exception {
        TestUtils.drainBus(serverMessageBus);
        drainClientLoop();
        runServerLoop();
        runBridgeLoop();
        runClientLoop();

        try {
            final long requestedDeliveryTime = System.currentTimeMillis();
            final long requestedExpirationTime = System.currentTimeMillis() + 100_000;

            final Message message = MessageBuilder.builder()
                    .withDeliveryTime(requestedDeliveryTime).withExpirationTime(requestedExpirationTime)
                    .withBody("Rick").build();
            clientMessageBus.publish(message);

            // The client loop sets responseFromServer before counting the latch down, so a single
            // bounded wait covers the same 30 seconds the original polling loop allowed.
            resultSignal.await(30, TimeUnit.SECONDS);

            assertEquals("Hello Rick", responseFromServer.get());

            /*
             * Looks like you can't set the delivery time directly. It has to be set on the producer
             * in JMS, so the behavior is expected, and we have unit tests showing that we are
             * setting the properties. They are just not passed on because of the IBM MQ
             * implementation.
             * https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q119200_.htm
             * Same with the expiration time: the queue has to be configured to accept it.
             * https://www.ibm.com/support/pages/how-specify-expiration-mq-message-and-when-expired-messages-are-removed-queue
             *
             * For that reason the following assertions stay disabled:
             *   assertEquals(requestedExpirationTime, (long) expiration.get());
             *   assertEquals(requestedDeliveryTime, (long) this.deliveryTime.get());
             *
             * State of the JMS message after conversion, before publish:
             *   JMSMessage class: jms_bytes
             *   JMSType:          null
             *   JMSDeliveryMode:  2
             *   JMSDeliveryDelay: 0
             *   JMSDeliveryTime:  0
             *   JMSExpiration:    1601055758654
             *   JMSPriority:      4
             *   JMSMessageID:     null
             *   JMSTimestamp:     0
             *   JMSCorrelationID: null
             *   JMSDestination:   null
             *   JMSReplyTo:       null
             *   JMSRedelivered:   false
             * The JMSExpiration is getting set but is never delivered to the service from JMS.
             * Setting the delivery time is ignored.
             */
        } finally {
            stopServerAndBridgeLoops();
        }
    }

    /**
     * Starts a background thread that polls the client response bus until a reply arrives, then
     * stores its body in {@link #responseFromServer} and releases {@link #resultSignal}.
     *
     * <p>The loop also ends when {@link #stop} is set or the thread is interrupted, so it cannot
     * outlive a failed test. NOTE(review): this assumes TestUtils.stopServerAndBridgeLoops sets
     * {@code stop} to true - confirm.
     */
    private void runClientLoop() {
        final Thread th = new Thread(() -> {
            while (!stop.get()) {
                final Optional<Message> receive = responseBusClient.receive();
                if (receive.isPresent()) {
                    responseFromServer.set(receive.get().bodyAsString());
                    resultSignal.countDown();
                    return;
                }
                System.out.println("No Client Message");
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        th.start();
    }

    /** Drains the client response bus via {@code TestUtils.drainBus} before the test publishes. */
    private void drainClientLoop() throws Exception {
        TestUtils.drainBus(responseBusClient);
    }

    /** Starts the bridge loop via {@code TestUtils.runBridgeLoop}. */
    private void runBridgeLoop() {
        TestUtils.runBridgeLoop(messageBridge, stop, bridgeStopped);
    }

    /** Delegates shutdown of the server and bridge loops to {@code TestUtils.stopServerAndBridgeLoops}. */
    private void stopServerAndBridgeLoops() throws Exception {
        TestUtils.stopServerAndBridgeLoops(stop, serverStopped, bridgeStopped);
    }

    /** Starts the server loop with this test's buses, stop flag and latch. */
    private void runServerLoop() {
        runServerLoop(stop, serverMessageBus, responseBusServer, serverStopped);
    }
}
The conversion code does copy the expiration and delivery time:
package io.nats.bridge.jms.support;
import io.nats.bridge.messages.Message;
import io.nats.bridge.messages.MessageBuilder;
import io.nats.bridge.mock.jms.JMSBinaryMessage;
import io.nats.bridge.util.SupplierWithException;
import org.junit.Before;
import org.junit.Test;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Checks that {@link ConvertBridgeMessageToJmsMessageWithHeaders} copies the bridge message's
 * standard fields (correlation ID, priority, expiration, delivery time, redelivered flag,
 * timestamp) and its custom headers onto the JMS message it produces.
 */
public class ConvertBridgeMessageToJmsMessageWithHeadersTest {

    /** Bridge-side message handed to the converter. */
    private Message messageBusMessage;
    /** Converter under test. */
    private ConvertBridgeMessageToJmsMessageWithHeaders messageConverter;

    /**
     * Wires the converter to a supplier that always hands back the same
     * {@link JMSBinaryMessage}, then builds the bridge message used as input.
     */
    @Before
    public void setUp() throws Exception {
        final BytesMessage cannedJmsMessage = new JMSBinaryMessage("Hello Mom");
        messageConverter = new ConvertBridgeMessageToJmsMessageWithHeaders(new SupplierWithException<BytesMessage>() {
            @Override
            public BytesMessage get() throws Exception {
                return cannedJmsMessage;
            }
        });
        messageBusMessage = newBridgeMessage();
    }

    /** Builds a bridge message with every field the test later asserts on. */
    private Message newBridgeMessage() {
        final MessageBuilder fixture = MessageBuilder.builder();
        fixture.withDeliveryMode(1);
        fixture.withCorrelationID("foo");
        fixture.withPriority(1);
        fixture.withExpirationTime(2L);
        fixture.withDeliveryTime(3L);
        fixture.withRedelivered(true);
        fixture.withTimestamp(4L);
        fixture.withHeader("someprop", true);
        fixture.withHeader("replyTo", "somequeuename");
        return fixture.build();
    }

    /** Converts the bridge message and verifies each copied JMS header and property. */
    @Test
    public void test() throws Exception {
        final javax.jms.Message converted = messageConverter.apply(messageBusMessage);

        // Standard JMS headers.
        assertEquals("foo", converted.getJMSCorrelationID());
        assertEquals(1, converted.getJMSPriority());
        assertEquals(2L, converted.getJMSExpiration());
        assertEquals(3L, converted.getJMSDeliveryTime());
        assertTrue(converted.getJMSRedelivered());
        assertEquals(4L, converted.getJMSTimestamp());

        // Custom headers become JMS properties.
        assertTrue(converted.getBooleanProperty("someprop"));
        assertEquals("somequeuename", converted.getStringProperty("replyTo"));
    }
}
The serialization for messages works for expiration time and delivery time.
package io.nats.bridge.messages;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Serialization round-trip tests for {@link BaseMessageWithHeaders}: each test builds a message,
 * serializes it with {@code getMessageAsBytes()}, rebuilds it with
 * {@code MessageBuilder.buildFromBytes(...)} and expects the copy to equal the original.
 *
 * <p>{@link #smallMessage()} and {@link #bigMessage()} instead feed only the body bytes to
 * {@code buildFromBytes} and expect a plain {@link BytesMessage} back.
 */
public class BaseMessageWithHeadersTest {

    /** Body shared by the tests that only vary a single header field. */
    private static final String BODY = "Hello Cruel World";

    /** Length used for the "big" string header/body values (longer than 512 characters). */
    private static final int BIG_STRING_LENGTH = 513;

    /** Returns a fresh builder whose body is already set to {@link #BODY}. */
    private static MessageBuilder builderWithBody() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withBody(BODY.getBytes(StandardCharsets.UTF_8));
        return builder;
    }

    /** Builds the message and casts it to the header-carrying implementation under test. */
    private static BaseMessageWithHeaders build(final MessageBuilder builder) {
        return (BaseMessageWithHeaders) builder.build();
    }

    /** Serializes {@code message} and rebuilds it from the resulting bytes. */
    private static BaseMessageWithHeaders roundTrip(final BaseMessageWithHeaders message) {
        final byte[] bytes = message.getMessageAsBytes();
        return (BaseMessageWithHeaders) MessageBuilder.builder().buildFromBytes(bytes);
    }

    /** Returns a string of {@link #BIG_STRING_LENGTH} {@code 'a'} characters. */
    private static String bigString() {
        final StringBuilder text = new StringBuilder(BIG_STRING_LENGTH);
        for (int index = 0; index < BIG_STRING_LENGTH; index++) {
            text.append('a');
        }
        return text.toString();
    }

    /** Round-trips a message with every standard field set and one header of each value type. */
    @Test
    public void createMessage() {
        final long time = System.currentTimeMillis();
        final long expirationTime = time + 30_000;

        final MessageBuilder builder = builderWithBody();
        builder.withPriority(1);
        builder.withExpirationTime(expirationTime).withTimestamp(time);
        builder.withDeliveryMode(3);
        builder.withRedelivered(true);
        builder.withDeliveryTime(time + 1);
        builder.withHeader("header1", 1);
        builder.withHeader("header2", true);
        builder.withHeader("header3", 1L);
        builder.withHeader("header4", "hello");
        builder.withHeader("header5", 1.1);
        builder.withHeader("header6", 12f);
        builder.withHeader("header7", (short) 12);
        // Distinct key: reusing "header7" replaced the short header, so it was never round-tripped.
        builder.withHeader("header8", (byte) 12);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message where only the priority is set. */
    @Test
    public void createMessageJustPriority() {
        final MessageBuilder builder = builderWithBody();
        builder.withPriority(1);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message where only the correlation ID is set. */
    @Test
    public void createMessageJustCorrelationId() {
        final MessageBuilder builder = builderWithBody();
        builder.withCorrelationID("abc");

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals("abc", message1.correlationID());
        assertEquals("abc", message2.correlationID());
    }

    /** Round-trips a message where only the type is set. */
    @Test
    public void createMessageJustMessageType() {
        final MessageBuilder builder = builderWithBody();
        builder.withType("abc");

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals("abc", message1.type());
        assertEquals("abc", message2.type());
    }

    /** Round-trips a message where only the timestamp is set. */
    @Test
    public void createMessageJustTimestamp() {
        final MessageBuilder builder = builderWithBody();
        builder.withTimestamp(999L);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals(999L, message1.timestamp());
        assertEquals(999L, message2.timestamp());
    }

    /** Round-trips a message where only the delivery mode is set. */
    @Test
    public void createMessageJustDeliveryMode() {
        final MessageBuilder builder = builderWithBody();
        builder.withDeliveryMode(1);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals(1, message1.deliveryMode());
        assertEquals(1, message2.deliveryMode());
    }

    /** Round-trips a message where only the expiration time is set. */
    @Test
    public void createMessageExpirationTime() {
        final MessageBuilder builder = builderWithBody();
        builder.withExpirationTime(777L);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals(777L, message1.expirationTime());
        assertEquals(777L, message2.expirationTime());
    }

    /** Round-trips a message where only the delivery time is set. */
    @Test
    public void createMessageDeliveryTime() {
        final MessageBuilder builder = builderWithBody();
        builder.withDeliveryTime(111L);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertEquals(111L, message1.deliveryTime());
        assertEquals(111L, message2.deliveryTime());
    }

    /** Round-trips a message where only the redelivered flag is set. */
    @Test
    public void createMessageJustRedelivered() {
        final MessageBuilder builder = builderWithBody();
        builder.withRedelivered(true);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
        assertTrue(message1.redelivered());
        assertTrue(message2.redelivered());
    }

    /**
     * Round-trips a body-less message with a type, standard fields, a string header longer than
     * 512 characters, a negative short header and a byte header.
     */
    @Test
    public void createMessageWithType() {
        final long time = System.currentTimeMillis();

        final MessageBuilder builder = MessageBuilder.builder();
        builder.withPriority(1);
        builder.withType("TYPE_MESSAGE");
        builder.withTimestamp(7L);
        builder.withExpirationTime(99L);
        builder.withDeliveryMode(3);
        builder.withDeliveryTime(time + 1);
        builder.withHeader("header1", 700);
        builder.withHeader("header2", false);
        builder.withHeader("header3", 1L);
        builder.withHeader("header4", bigString());
        builder.withHeader("header5", 1.1);
        builder.withHeader("header6", 12f);
        builder.withHeader("header7-short", (short) -5);
        builder.withHeader("header8", (byte) 12);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a short header. */
    @Test
    public void justShort() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header7-short", (short) 500);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a positive int header. */
    @Test
    public void justInt500() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-int", 500);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a negative int header. */
    @Test
    public void justIntNeg() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-int", -1);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a short string header. */
    @Test
    public void justString() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-str", "foo");

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a string header longer than 512 characters. */
    @Test
    public void justBigString() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-str", bigString());

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Round-trips a message whose only content is a byte header. */
    @Test
    public void justByte() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-byte", (byte) 10);

        final BaseMessageWithHeaders message1 = build(builder);
        final BaseMessageWithHeaders message2 = roundTrip(message1);
        assertEquals(message1, message2);
    }

    /** Feeding only the body bytes of a small message to buildFromBytes yields a BytesMessage. */
    @Test
    public void smallMessage() {
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withBody("hi");
        final Message message = builder.build();

        // Body bytes only, not the serialized message.
        final byte[] bytes = message.getBodyBytes();
        final Message message2 = MessageBuilder.builder().buildFromBytes(bytes);

        assertTrue(message2 instanceof BytesMessage);
        assertEquals("hi", message2.bodyAsString());
    }

    /** Same as {@link #smallMessage()} but with a body (and header) longer than 512 characters. */
    @Test
    public void bigMessage() {
        final String longMessage = bigString();
        final MessageBuilder builder = MessageBuilder.builder();
        builder.withHeader("header-str", longMessage);
        builder.withBody(longMessage);
        final Message message = builder.build();

        // Body bytes only, so the header set above is not part of the input.
        final byte[] bytes = message.getBodyBytes();
        final Message message2 = MessageBuilder.builder().buildFromBytes(bytes);

        assertTrue(message2 instanceof BytesMessage);
        assertEquals(longMessage, message2.bodyAsString());
    }
}
The JMS expiration and delivery time are not getting copied.
They are copied by the bridge; the values are ignored by the IBM MQ JMS implementation.