streamnative / aop

AMQP on Pulsar protocol handler
Apache License 2.0

[BUG] Error receiving message with aop. #276

Open yasu1209 opened 3 years ago

yasu1209 commented 3 years ago

Describe the bug I'm using pulsar 2.8.0 and aop 2.8.0-rc-202106071430, with the sample code from README.MD to publish and subscribe to messages. No messages are returned.

To Reproduce Steps to reproduce the behavior:

  1. Run Pulsar on a server and add the aop configuration to standalone.conf:
    messagingProtocols=mqtt,amqp,kafka
    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=10.191.5.110
  2. Run Pulsar in standalone mode.
  3. Use the following Java code for testing.

    @Test
    public void test04() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");
        connectionFactory.setHost("10.191.5.110");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
    
        String exchange = "ex";
        String queue = "qu";
    
        // exchange declare
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
    
        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, "");
    
        // publish some messages
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(exchange, "", null, ("hello-" + i).getBytes());
        }
    
        // consume messages
        CountDownLatch countDownLatch = new CountDownLatch(100);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("receive msg: " + new String(body));
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    
        // release resource
        channel.close();
        connection.close();
    }
  4. Run the test. No messages are returned.
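
Because the test in step 3 calls countDownLatch.await() with no timeout, it blocks forever when nothing is delivered. A bounded wait makes that failure visible. The following is a minimal sketch using only java.util.concurrent; the class name BoundedLatchWait and the helper awaitDeliveries are hypothetical and not part of the README sample, and the latch in main stands in for the 100 expected deliveries without talking to a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: wait for deliveries with a timeout instead of blocking forever. */
final class BoundedLatchWait {

    /**
     * Hypothetical helper. Waits up to timeoutSeconds for the latch to reach zero.
     * Returns true if every expected delivery arrived, false if the wait timed out.
     */
    static boolean awaitDeliveries(CountDownLatch latch, long timeoutSeconds)
            throws InterruptedException {
        return latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the 100 expected messages. Nothing counts this latch down,
        // which mimics the "no messages returned" behavior from the report.
        CountDownLatch latch = new CountDownLatch(100);

        boolean complete = awaitDeliveries(latch, 1);
        System.out.println("all delivered: " + complete
                + ", still missing: " + latch.getCount());
    }
}
```

In the original test, the same idea would replace countDownLatch.await() with a timed await whose result is asserted, so that a missing delivery fails the test rather than hanging it.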

When publishing messages to Pulsar, the following warning logs are printed.

14:18:09.771 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator - Route message failed.
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:261) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
        at io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange$1.readProcess(PersistentExchange.java:87) ~[?:?]
        at io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator.readEntriesComplete(AmqpExchangeReplicator.java:197) ~[?:?]
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
14:18:09.773 [BookKeeperClientWorker-OrderedExecutor-0-0] WARN  io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator - Route message failed.
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
        at org.apache.pulsar.client.impl.MessageImpl.deserialize(MessageImpl.java:261) ~[org.apache.pulsar-pulsar-client-original-2.8.0.jar:2.8.0]
        at io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange$1.readProcess(PersistentExchange.java:87) ~[?:?]
        at io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator.readEntriesComplete(AmqpExchangeReplicator.java:197) ~[?:?]
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_282]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_282]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
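
For reading the exception message: in the protobuf wire format, every field begins with a tag whose low three bits are the wire type and whose remaining bits are the field number. Only wire types 0 through 5 are defined, so "tag type: 6" suggests (as an interpretation, not something confirmed in this thread) that the bytes handed to MessageMetadata.parseFrom were not valid protobuf-encoded metadata. The sketch below decodes a tag by hand; the class ProtoWireTag and its methods are hypothetical illustrations and are not the LightProtoCodec implementation.

```java
/** Sketch: split a protobuf field tag into field number and wire type. */
final class ProtoWireTag {

    /** The field number is stored in the bits above the low three. */
    static int fieldNumber(int tag) {
        return tag >>> 3;
    }

    /** The wire type is stored in the low three bits. */
    static int wireType(int tag) {
        return tag & 0x7;
    }

    /** Names of the wire types defined by the protobuf encoding; 6 and 7 are undefined. */
    static String describe(int wireType) {
        switch (wireType) {
            case 0: return "VARINT";
            case 1: return "FIXED64";
            case 2: return "LENGTH_DELIMITED";
            case 3: return "START_GROUP";
            case 4: return "END_GROUP";
            case 5: return "FIXED32";
            default: return "UNDEFINED";
        }
    }

    public static void main(String[] args) {
        // 0x08 and 0x12 are well-formed tags; 0x0E carries wire type 6.
        int[] tags = {0x08, 0x12, 0x0E};
        for (int tag : tags) {
            System.out.println("tag=" + tag
                    + " field=" + fieldNumber(tag)
                    + " wireType=" + wireType(tag)
                    + " (" + describe(wireType(tag)) + ")");
        }
    }
}
```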

I can receive the messages when using a Pulsar consumer.

Screenshots In pulsar-manager the subscription type is none.

I may have done something wrong. Please let me know.

yasu1209 commented 3 years ago

I've tried pulsar 2.7.3 and aop 2.7.2.9. The error during publishing is gone, but still no messages are returned.

WrathLi commented 3 years ago

I have the same problem. I've tried both pulsar 2.7.0 with aop 2.7.0.3 and pulsar 2.8.0 with aop 2.8.1.0. No messages are returned, and there are no errors in the logs.

kuangye098 commented 3 years ago

I encountered the same problem. While testing yesterday I found the cause: the internal development version of AOP is not compatible with the current Apache Pulsar 2.8.0 release, because the ServiceConfigurationUtils.getAppliedAdvertisedAddress method was modified, which results in an error when loading the plug-in. At present, Apache Pulsar 2.8.0 is only compatible with AOP 2.8.0.3 or earlier; AOP 2.8.0.4 and later need to wait for the official release of Apache Pulsar 2.8.1.
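
The version rule described in this comment can be sketched as a small check. This is only an illustration of the rule as stated here: the class AopVersionCheck and the methods compareVersions and needsPulsar281 are hypothetical, and the comparison handles plain dotted numeric versions only (not strings such as 2.8.0-rc-202106071430).

```java
/** Sketch: compare dotted numeric versions and apply the rule stated in the comment. */
final class AopVersionCheck {

    /** Compares versions such as "2.8.0.3" segment by segment; missing segments count as 0. */
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    /** Per the comment: AOP 2.8.0.4 and later need Pulsar 2.8.1 rather than 2.8.0. */
    static boolean needsPulsar281(String aopVersion) {
        return compareVersions(aopVersion, "2.8.0.4") >= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.8.0.3", "2.8.0.4", "2.8.1.0"}) {
            System.out.println("aop " + v + " needs Pulsar 2.8.1: " + needsPulsar281(v));
        }
    }
}
```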

yasu1209 commented 3 years ago

Thank you for the information. I'll give it a try. Did you try pulsar 2.7? The combination of pulsar 2.7.3 and aop 2.7.2.9 does not seem to work either.

kuangye098 commented 3 years ago

I am using Apache Pulsar 2.8.0 and aop 2.8.0.3, and the official test examples pass normally. Can you try Apache Pulsar 2.7.3 with aop 2.8.0.3, or 2.7.2 with 2.7.2.9?

yasu1209 commented 3 years ago

I am using apache pulsar 2.8.0 and aop 2.8.0.3 . The official test examples can pass normally. You can try apache pulsar 2.7.3 and aop 2.8.0.3 or 2.7.2 and 2.7.2.9?