streamnative / aop

AMQP on Pulsar protocol handler
Apache License 2.0
114 stars 47 forks source link

[BUG] Error when using proxy #872

Open aliamaza opened 1 year ago

aliamaza commented 1 year ago

When I try to use the Proxy I receive this error in the client.

Exception in thread "main" java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:423)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1110)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1067)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1025)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1187)
        at com.gm.pulsarpoc.tutorial.AMQP.amqp_poc_consumer.main(amqp_poc_consumer.java:37)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
        ... 8 more
Caused by: java.io.EOFException
        at java.base/java.io.DataInputStream.readUnsignedByte(DataInputStream.java:296)
        at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
        at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:645)
        at java.base/java.lang.Thread.run(Thread.java:833)

Looking in the broker logs I can see this:

2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] ERROR io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - Lookup broker failed; may retry.
io.streamnative.pulsar.handlers.amqp.proxy.ProxyException: Unable to locate metadata for the broker of the topic: persistent://public/default/__lookup__
        at io.streamnative.pulsar.handlers.amqp.proxy.PulsarServiceLookupHandler.lambda$findBroker$1(PulsarServiceLookupHandler.java:75) ~[cIYN-EeoapXWiBkI2xvGAQ/:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
        at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$findBroker$1(BinaryProtoLookupService.java:164) ~[org.apache.pulsar-pulsar-client-original-2.9.4.jar:2.9.4]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
        at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:586) ~[org.apache.pulsar-pulsar-client-original-2.9.4.jar:2.9.4]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:140) ~[org.apache.pulsar-pulsar-common-2.9.4.jar:2.9.4]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[io.netty-netty-codec-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[io.netty-netty-transport-classes-epoll-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] INFO  io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - handle connect residue retryTimes: 0
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] WARN  io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - Handle connect retryTimes is 0.
2023-04-05T07:57:41,902-0400 [pulsar-io-4-2] INFO  io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - ProxyConnection close.
2023-04-05T07:57:41,902-0400 [amqp-redirect-io-56-4] INFO  io.streamnative.pulsar.handlers.amqp.proxy.ProxyConnection - ProxyConnection close.

I am using this config:

messagingProtocols=amqp protocolHandlerDirectory=./protocols amqpListeners=amqp://127.0.0.1:5672

amqpProxyEnable=true amqpProxyPort=5682

casuallc commented 1 year ago

Should create tenant public and namespace default, and the number of namespace bundles should be 1.

aliamaza commented 1 year ago

The tenant and namespace have been created like so

$PULSAR_HOME/bin/pulsar-admin namespaces create -b 1 public/default
$PULSAR_HOME/bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
casuallc commented 1 year ago

Should connect proxyPort 5682.