rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/
Other
11.91k stars 3.9k forks source link

Ack on a quorum queue via Webstomp results in unexpected message exception #2603

Open meredrica opened 3 years ago

meredrica commented 3 years ago

Description

When quorum queues are used (in this case on a cluster of 3 nodes running in openshift and joined using the k8s plugin) messages cannot be ACK'd via webstomp. It works fine with single node setups or classic queues. AMQP works fine but for our use case we must support webstomp.

Steps to Reproduce

Versions

RabbitMQ: docker image 3.8.8-management-alpine

Listener java depenedencies:

<dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-websocket</artifactId>
      <version>5.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
      <version>5.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.glassfish.tyrus.bundles</groupId>
      <artifactId>tyrus-standalone-client-jdk</artifactId>
      <version>1.17</version>
    </dependency>

Stacktrace

[info] <0.20142.2296> Web STOMP: unexpected message {{'basic.deliver',<<"T_0">>,1,false,<<>>,<<"test-quorum-queue">>},{amqp_msg,{'P_basic',undefined,undefined,[],2,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},<<"{}">>}}
michaelklishin commented 3 years ago

@meredrica please provide at least some code that demonstrates what exactly your consumer looks like. We also need full logs from all nodes, a single line is not enough.

meredrica commented 3 years ago

@michaelklishin there are no other logs. Writing an example will take time, but I can try in python.

meredrica commented 3 years ago

Sorry, I never found time to implement the demo. We sadly had to roll back to non-quorum queues. The steps to reproduce are still valid. Join 3 nodes as Cluster, create a Quorum Queue and try to acknowledge any message.

wmichalak007 commented 2 years ago

Apologies for necromancy but I have exactly the same problem and can't seem to find the right solution. I use 3 node cluster based on docker rabbitmq:3-management and stomp-js for front end.

Queue is created using STOMP SUBSCRIBE:

var client = new StompJs.Client({
  brokerURL: 'ws://localhost:15676/ws',
  connectHeaders: {
    login: username,
    passcode: password,
    host: '/'
  },
  debug: function (str) {
    console.log(str);
  },
  reconnectDelay: 5000,
  heartbeatIncoming: 4000,
  heartbeatOutgoing: 4000,
  logRawCommunication: true,
});

var queue='/exchange/testEx/testpattern';
var queueHeaders = {'content-type': 'text/plain', 'durable':true, 'auto-delete':false, 'x-queue-name':'aaaaa', 'x-queue-type':'quorum', 'ack':'client'};

client.onConnect = function(frame) {
  var id = client.subscribe(queue, function(d) {
    console.log("Message consumed.");
    var node = document.createTextNode(d.body + '<br>');
    document.getElementById('chat').appendChild(node);
    d.ack();
  }, queueHeaders);

  if (!client.connected) {
    console.log("Broker disconnected, can't send message.");
  }
}

Messages are published using STOMP SEND:

var exchange='/exchange/testEx/testpattern';
var publishHeaders = {};

function sendMsg() {
  var msg = document.getElementById('msg').value;
  if (client.connected) {
    client.publish({destination: exchange, body: msg, headers:publishHeaders});
  }else{
    console.log("STOMP Not connected");
  }
}

This code can subscribe without problems, correct queue is created and bound to an existing exchange with given routing key. I can publish messages and they appear on the queue, I can get and republish them using management console. Problem appears on consumption, when client connects to the queue. Rabbit MQ logs this error:

2022-02-28 11:02:18.139 [info] <0.13836.0> Web STOMP: unexpected message {{'basic.deliver',<<"T_sub-0">>,1,true,<<"testEx">>,<<"testpattern">>},{amqp_msg,{'P_basic',undefined,undefined,[{<<"content-length">>,longstr,<<"4">>},{<<"x-delivery-count">>,long,42}],undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},<<"asda">>}}

Message would be then consumed if not for the manual ACK setting.

If I change the x-queue-type to classic everything works fine, I publish and consume messages without anny issues. @michaelklishin Let me know if I can provide anything extra to reproduce the issue.