rh-messaging / cli-java

cli-java is a collection of commandline messaging clients suitable for interacting with Message Oriented Middleware such as ActiveMQ Artemis broker or Qpid Dispatch router.
Apache License 2.0

Redirect support for cli qpid-jms #293

Closed: lenosi closed this issue 3 years ago

lenosi commented 3 years ago

Feature request: Add support for redirect in cli qpid-jms client

➜  podman run --rm --env PN_TRACE_FRM=1 quay.io/rhmessagingqe/cli-java:ubi8-openjdk11 cli-qpid-receiver -b amqp://artemis-broker-A:6113 -a 'loadbalance_test_q' 
[779962960:0] -> SASL
[779962960:0] <- SASL
[779962960:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[779962960:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=, hostname='artemis-broker-A'}
[779962960:0] <- SaslOutcome{_code=OK, _additionalData=null}
[779962960:0] -> AMQP
[779962960:0] -> Open{ containerId='ID:d465c75a-fd5a-41ef-9c30-6c73936ba252:1', hostname='artemis-broker-A', maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=1.1.0, platform=JVM: 11.0.12, 11.0.12+7-LTS, Red Hat, Inc., OS: Linux, 5.13.13-zen1-1-zen, amd64}}
[779962960:0] <- AMQP
[779962960:0] <- Open{ containerId='', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[779962960:0] <- Close{error=Error{condition=amqp:connection:redirect, description='Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element', info={port=6111, network-host=artemis-broker-B}}}
[779962960:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[779962960:0] -> Close{error=null}
07:49:34,111 ERROR Error while creating session! Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
org.apache.qpid.jms.JmsConnectionRemotelyClosedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
        at org.apache.qpid.jms.provider.exceptions.ProviderConnectionRemotelyClosedException.toJMSException(ProviderConnectionRemotelyClosedException.java:37)
        at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
        at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
        at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
        at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:682)
        at org.apache.qpid.jms.JmsConnection.createJmsConnection(JmsConnection.java:593)
        at org.apache.qpid.jms.JmsConnection.createSession(JmsConnection.java:314)
        at com.redhat.mqe.lib.CoreClient.createSession(CoreClient.java:133)
        at com.redhat.mqe.lib.ReceiverClient.consumeMessage(ReceiverClient.java:173)
        at com.redhat.mqe.lib.ReceiverClient.startClient(ReceiverClient.java:149)
        at com.redhat.mqe.lib.Main.main(Main.java:46)
        at com.redhat.mqe.jms.Main.main(Main.java:80)
        at com.redhat.mqe.jms.Main.main(Main.java:84)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderConnectionRedirectedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.createRedirectException(AmqpSupport.java:241)
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:132)
        at org.apache.qpid.jms.provider.amqp.AmqpConnection.processRemoteClose(AmqpConnection.java:148)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:962)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:872)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:829)
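
For reference, the redirect target is carried in the info map of the Close frame above (network-host and port). Below is a minimal sketch of turning that map, as printed in the frame trace, into a broker URI. The class and method names are hypothetical and are not part of cli-java or qpid-jms; the amqp scheme is assumed to stay the same as in the original connection.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of cli-java or qpid-jms.
final class RedirectInfo {

    // Parses a map as printed in the frame trace,
    // e.g. "{port=6111, network-host=artemis-broker-B}".
    public static Map<String, String> parseInfo(String text) {
        Map<String, String> info = new LinkedHashMap<>();
        String body = text.trim();
        if (body.startsWith("{") && body.endsWith("}")) {
            body = body.substring(1, body.length() - 1);
        }
        for (String pair : body.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length == 2) {
                info.put(kv[0].trim(), kv[1].trim());
            }
        }
        return info;
    }

    // Builds scheme://host:port from the network-host and port entries.
    public static String redirectUri(String scheme, Map<String, String> info) {
        String host = info.get("network-host");
        String port = info.get("port");
        if (host == null || port == null) {
            throw new IllegalArgumentException("redirect info lacks network-host or port");
        }
        return scheme + "://" + host + ":" + Integer.parseInt(port);
    }

    public static void main(String[] args) {
        Map<String, String> info = parseInfo("{port=6111, network-host=artemis-broker-B}");
        System.out.println(redirectUri("amqp", info));
    }
}
```

In the trace above the client does not act on this information: it reports the redirect as an error and closes the connection.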
jiridanek commented 3 years ago

Turning on failover (the failover: URI prefix) does not make the client follow the redirect:

podman run --rm --env PN_TRACE_FRM=1 quay.io/rhmessagingqe/cli-java:ubi8-openjdk11 cli-qpid-receiver -b 'failover:(amqp://artemis-broker-A:6113)' -a 'loadbalance_test_q'
[602975070:0] -> SASL
[602975070:0] <- SASL
[602975070:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[602975070:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=, hostname='artemis-broker-A'}
[602975070:0] <- SaslOutcome{_code=OK, _additionalData=null}
[602975070:0] -> AMQP
[602975070:0] <- AMQP
[602975070:0] -> Open{ containerId='ID:349df389-32cc-497a-a901-9c6cfab2f382:1', hostname='artemis-broker-A', maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=1.1.0, platform=JVM: 11.0.12, 11.0.12+7-LTS, Red Hat, Inc., OS: Linux, 5.13.13-zen1-1-zen, amd64}}
[602975070:0] <- Open{ containerId='', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[602975070:0] <- Close{error=Error{condition=amqp:connection:redirect, description='Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element', info={port=6111, network-host=artemis-broker-B}}}
[602975070:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[602975070:0] -> Close{error=null}
08:35:23,929 ERROR Error while creating session! Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
org.apache.qpid.jms.JmsConnectionRemotelyClosedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
        at org.apache.qpid.jms.provider.exceptions.ProviderConnectionRemotelyClosedException.toJMSException(ProviderConnectionRemotelyClosedException.java:37)
        at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
        at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
        at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
        at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:682)
        at org.apache.qpid.jms.JmsConnection.createJmsConnection(JmsConnection.java:593)
        at org.apache.qpid.jms.JmsConnection.createSession(JmsConnection.java:314)
        at com.redhat.mqe.lib.CoreClient.createSession(CoreClient.java:133)
        at com.redhat.mqe.lib.ReceiverClient.consumeMessage(ReceiverClient.java:173)
        at com.redhat.mqe.lib.ReceiverClient.startClient(ReceiverClient.java:149)
        at com.redhat.mqe.lib.Main.main(Main.java:46)
        at com.redhat.mqe.jms.Main.main(Main.java:80)
        at com.redhat.mqe.jms.Main.main(Main.java:84)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderConnectionRedirectedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.createRedirectException(AmqpSupport.java:241)
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:132)
        at org.apache.qpid.jms.provider.amqp.AmqpConnection.processRemoteClose(AmqpConnection.java:148)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:962)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:872)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:829)
08:35:23,930 ERROR ExceptionListener error detected!
Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
{}
org.apache.qpid.jms.provider.exceptions.ProviderConnectionRedirectedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.createRedirectException(AmqpSupport.java:241)
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:132)
        at org.apache.qpid.jms.provider.amqp.AmqpConnection.processRemoteClose(AmqpConnection.java:148)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:962)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:872)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
        at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:829)
org.apache.qpid.jms.JmsConnectionRemotelyClosedException: Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-first-element [condition = amqp:connection:redirect]
lenosi commented 3 years ago

The issue occurs when -b is used instead of --broker-uri. With --broker-uri and a failover: URI, the client follows the redirect to artemis-broker-B:

podman run --rm --env PN_TRACE_FRM=1 quay.io/rhmessagingqe/cli-java:ubi8-openjdk11 cli-qpid-receiver --broker-uri 'failover:(amqp://artemis-broker-A:6114)' -a 'loadbalance_test_q' -c 1                  
[1246082397:0] -> SASL
[1246082397:0] <- SASL
[1246082397:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1246082397:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=, hostname='artemis-broker-A'}
[1246082397:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1246082397:0] <- AMQP
[1246082397:0] -> AMQP
[1246082397:0] -> Open{ containerId='ID:5d8d7cad-4f43-486e-83f8-9d1b1a810bb9:1', hostname='artemis-broker-A', maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=1.1.0, platform=JVM: 11.0.12, 11.0.12+7-LTS, Red Hat, Inc., OS: Linux, 5.13.13-zen1-1-zen, amd64}}
[1246082397:0] <- Open{ containerId='', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1246082397:0] <- Close{error=Error{condition=amqp:connection:redirect, description='Connection redirected to artemis-broker-B:6111 by broker balancer balancer-A-round-robin', info={port=6111, network-host=artemis-broker-B}}}
[1246082397:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1246082397:0] -> Close{error=null}
[1628128436:0] -> SASL
[1628128436:0] <- SASL
[1628128436:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1628128436:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=, hostname='artemis-broker-B'}
[1628128436:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1628128436:0] <- AMQP
[1628128436:0] -> AMQP
[1628128436:0] -> Open{ containerId='ID:5d8d7cad-4f43-486e-83f8-9d1b1a810bb9:1', hostname='artemis-broker-B', maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=1.1.0, platform=JVM: 11.0.12, 11.0.12+7-LTS, Red Hat, Inc., OS: Linux, 5.13.13-zen1-1-zen, amd64}}
[1628128436:0] <- Open{ containerId='artemis-broker-B', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.18.0.redhat-00002}}
[1628128436:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:1] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:1] -> Attach{name='qpid-jms:receiver:ID:f786cf99-0ab8-40fa-95ca-91c57e1aae5a:1:1:1:loadbalance_test_q', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='loadbalance_test_q', durable=NONE, expiryPolicy=LINK_DETACH, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[queue]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:1] <- Attach{name='qpid-jms:receiver:ID:f786cf99-0ab8-40fa-95ca-91c57e1aae5a:1:1:1:loadbalance_test_q', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='loadbalance_test_q', durable=NONE, expiryPolicy=LINK_DETACH, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=null, messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=[queue]}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1628128436:1] -> Flow{nextIncomingId=1, incomingWindow=2047, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
[1628128436:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (158) "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00Y\x00\x00\x00\x0a\xa1/ID:4ec54855-20f5-4d48-a015-9a532b81bd0d:1:1:1-1@\xa1\x12loadbalance_test_q@@@@@@\x83\x00\x00\x01{\xc9\xd75\x98\x00Sw\xa1\x03ABC"
[1628128436:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false}
[1628128436:1] -> Detach{handle=0, closed=true, error=null}
[1628128436:1] <- Detach{handle=0, closed=true, error=null}
[1628128436:1] -> End{error=null}
[1628128436:1] <- End{error=null}
[1628128436:0] -> Close{error=null}
[1628128436:0] <- Close{error=null}
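
Since the redirect was followed only when the broker address was given as a failover: URI, a test harness could wrap plain broker URIs before passing them to --broker-uri. The following is a rough sketch under that assumption; the class and method names are hypothetical and are not part of cli-java.

```java
// Hypothetical helper, for illustration only; not part of cli-java or qpid-jms.
final class FailoverUri {

    // Wraps a plain broker URI in failover:(...); URIs that already use
    // the failover: prefix are returned unchanged (apart from trimming).
    public static String withFailover(String brokerUri) {
        String uri = brokerUri.trim();
        if (uri.startsWith("failover:")) {
            return uri;
        }
        return "failover:(" + uri + ")";
    }

    public static void main(String[] args) {
        // Same broker address as in the command above.
        System.out.println(withFailover("amqp://artemis-broker-A:6114"));
    }
}
```

The resulting value has the same shape as the URI used in the command above.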

Thanks, @jiridanek and @gemmellr, for solving this.