rabbitmq / discussions

Please use RabbitMQ mailing list for questions. Issues that are questions, discussions or lack details necessary to investigate them are moved to this repository.
3 stars 4 forks source link

Delayed message exchange: messages are returned with error code NO_ROUTE #106

Closed grpubr closed 4 years ago

grpubr commented 4 years ago

Hi,

I'm working with spring-boot-starter-amqp(2.1.3),which has dependcy with spring-amqp(2.1.4).I have found that even when a delayed message is already sent to consumer, the publisher-returns callback is called with argument( 312,NO_ROUTE).

my rabbitMQ server is a docker image with the version 3.8.3-management, below is my dockerfile

FROM  rabbitmq:3.8.3-management
COPY ./rabbitmq_delayed_message_exchange-20191008-3.8.x.ez /opt/rabbitmq/plugins/
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

RabbitMQ plugins

root@rabbitmq-0:/# rabbitmq-plugins list 
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@rabbitmq-0.rabbitmq.infrastructure.svc.cluster.local
 |/
[  ] rabbitmq_amqp1_0                  3.8.3
[  ] rabbitmq_auth_backend_cache       3.8.3
[  ] rabbitmq_auth_backend_http        3.8.3
[  ] rabbitmq_auth_backend_ldap        3.8.3
[  ] rabbitmq_auth_backend_oauth2      3.8.3
[  ] rabbitmq_auth_mechanism_ssl       3.8.3
[  ] rabbitmq_consistent_hash_exchange 3.8.3
[E*] rabbitmq_delayed_message_exchange 20191008-3.8.x
[  ] rabbitmq_event_exchange           3.8.3
[  ] rabbitmq_federation               3.8.3
[  ] rabbitmq_federation_management    3.8.3
[  ] rabbitmq_jms_topic_exchange       3.8.3
[E*] rabbitmq_management               3.8.3
[e*] rabbitmq_management_agent         3.8.3
[  ] rabbitmq_mqtt                     3.8.3
[  ] rabbitmq_peer_discovery_aws       3.8.3
[e*] rabbitmq_peer_discovery_common    3.8.3
[  ] rabbitmq_peer_discovery_consul    3.8.3
[  ] rabbitmq_peer_discovery_etcd      3.8.3
[E*] rabbitmq_peer_discovery_k8s       3.8.3
[  ] rabbitmq_prometheus               3.8.3
[  ] rabbitmq_random_exchange          3.8.3
[  ] rabbitmq_recent_history_exchange  3.8.3
[  ] rabbitmq_sharding                 3.8.3
[  ] rabbitmq_shovel                   3.8.3
[  ] rabbitmq_shovel_management        3.8.3
[  ] rabbitmq_stomp                    3.8.3
[  ] rabbitmq_top                      3.8.3
[  ] rabbitmq_tracing                  3.8.3
[  ] rabbitmq_trust_store              3.8.3
[e*] rabbitmq_web_dispatch             3.8.3
[  ] rabbitmq_web_mqtt                 3.8.3
[  ] rabbitmq_web_mqtt_examples        3.8.3
[  ] rabbitmq_web_stomp                3.8.3
[  ] rabbitmq_web_stomp_examples       3.8.3

RabbitMQ server logs seems no useful log even the log level is set to debug

2020-05-29 10:00:13.297 [debug] <0.1202.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:17.659 [debug] <0.1213.0> Plugins discovery: ignoring accept, not a RabbitMQ plugin
2020-05-29 10:00:17.659 [debug] <0.1213.0> Plugins discovery: ignoring base64url, not a RabbitMQ plugin
2020-05-29 10:00:17.659 [debug] <0.1213.0> Plugins discovery: ignoring syslog, not a RabbitMQ plugin
2020-05-29 10:00:18.298 [debug] <0.1214.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:19.427 [debug] <0.594.0> Peer discovery: checking for partitioned nodes to clean up.
2020-05-29 10:00:19.428 [debug] <0.594.0> Peer discovery: all known cluster nodes are up.
2020-05-29 10:00:20.958 [debug] <0.492.0> Asked to [re-]register this node (rabbit@rabbitmq-2.rabbitmq.infrastructure.svc.cluster.local) with epmd...
2020-05-29 10:00:21.404 [debug] <0.492.0> [Re-]registered this node (rabbit@rabbitmq-2.rabbitmq.infrastructure.svc.cluster.local) with epmd at port 25672
2020-05-29 10:00:23.300 [debug] <0.1222.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:28.299 [debug] <0.1228.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:33.299 [debug] <0.1234.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:36.401 [debug] <0.1243.0> Plugins discovery: ignoring accept, not a RabbitMQ plugin
2020-05-29 10:00:36.401 [debug] <0.1243.0> Plugins discovery: ignoring base64url, not a RabbitMQ plugin
2020-05-29 10:00:36.402 [debug] <0.1243.0> Plugins discovery: ignoring syslog, not a RabbitMQ plugin
2020-05-29 10:00:38.343 [debug] <0.1249.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:38.368 [debug] <0.1251.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2020-05-29 10:00:38.391 [debug] <0.1253.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal

client log

[DEBUG] [2020-05-29 19:25:57] [] o.s.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$768/1606860322 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@10.168.136.193:32076/,2), conn: Proxy@55a01b1b Shared Rabbit Connection: SimpleConnection@4f89ae4a [delegate=amqp://guest@10.168.136.193:32076/, localPort= 56013] 
[DEBUG] [2020-05-29 19:25:57] [] o.s.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'212507529' MessageProperties [headers={x-delay=6000}, contentType=text/plain, contentEncoding=UTF-8, contentLength=9, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [Ex.LazyExchange], routingKey = [lazy.boot] 
[INFO ] [2020-05-29 19:25:57] [] c.cmos.qrcode.task.rabbitmq.MQSender - return exchange: Ex.LazyExchange, routingKey: lazy.boot, replyCode: 312, replyText: NO_ROUTE 
1234
[DEBUG] [2020-05-29 19:25:57] [] o.s.a.r.c.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@10.168.136.193:32076/,2) PC:Ack:4:false 
[DEBUG] [2020-05-29 19:25:57] [] o.s.a.r.c.PublisherCallbackChannelImpl - Sending confirm PendingConfirm [correlationData=CorrelationData [id=time-1590751557282]] 
[INFO ] [2020-05-29 19:25:57] [] c.cmos.qrcode.task.rabbitmq.MQSender - correlationData: CorrelationData [id=time-1590751557282] 
[INFO ] [2020-05-29 19:25:57] [] c.cmos.qrcode.task.rabbitmq.MQSender - ack: true 

A code example or terminal transcript that can be used to reproduce

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange(){
        TopicExchange exchange =
                new TopicExchange(LAZY_EXCHANGE, true, false);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue(){
        Queue queue= new Queue(LAZY_QUEUE, true);
        return queue;
    }

    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback
            confirmCallback= (correlationData, ack, cause) -> {
        log.info("correlationData: " + correlationData);
        log.info("ack: " + ack);
        if(!ack){
            log.info("error");
        }
    };

    final RabbitTemplate.ReturnCallback
            returnCallback = (message, replyCode, replyText, exchange, routingKey) -> {
        log.info("return exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);

    };

    public void sendLazy(Object message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData("time-"+new Date().getTime());

        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                message1 -> {
message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//every thing works fine if the following line is commented
                    message1.getMessageProperties().setHeader("x-delay", "6000");
                  // message1.getMessageProperties().setDelay(10000);
                    log.info("processor: " + new String(message1.getBody()));
                    return message1;
                }, correlationData);
    }
}

Full exception stack traces (a single line message is not enough!) no exception

rabbitmqctl report and rabbitmqctl environment output

see attachment file

Other relevant details about the environment and workload, e.g. a traffic capture rabbitmqctl_report.txt

rabbitmqctl_environment.txt

traffic .zip

michaelklishin commented 4 years ago

Thank you for your time.

Team RabbitMQ uses GitHub issues for specific actionable items engineers can work on. GitHub issues are not used for questions, investigations, root cause analysis, discussions of potential issues, etc (as defined by this team).

We get at least a dozen of questions through various venues every single day, often light on details. At that rate GitHub issues can very quickly turn into a something impossible to navigate and make sense of even for our team. Because GitHub is a tool our team uses heavily nearly every day, the signal/noise ratio of issues is something we care about a lot.

Please post this to rabbitmq-users.

Thank you.

michaelklishin commented 4 years ago

There seems to be some confusion around what NO_ROUTE means and how it may relate to consumers. Long story short: there is no connection. Publishers are entirely unaware of consumers in AMQP 0-9-1. A NO_ROUTE means that a message published as mandatory could not be routed to any queues.

Delivery to consumers happens after routing and enqueueing by queue processes. There is no connection to the original publishing channel at that point. So the "delivered to consumers" part is very likely a red herring.

Your code example uses mandatory publishing. From Limitations in the delayed message exchange plugin's README:

Closely related to the above, the mandatory flag is not supported by this exchange: we cannot be sure that at the future publishing point in time.

Since publishing by this plugin is delayed, and potentially by minutes or hours, there is no guarantee that the original channel on which we must send a return is still around. Therefore there is no sensible way to support this flag. Exchanges used for delayed publishing often have no bindings of their own (why would they?), so mandatory publishing will always result in a return.