stomp-js / rx-stomp

STOMP adaptor for RxJS
Apache License 2.0
111 stars 21 forks source link

Angular 13 connection with queues of type stream #471

Closed tycyssg closed 1 year ago

tycyssg commented 1 year ago

Hello sir,

I try to use rx-stomp library to connect on a rabbitmq queue of type stream. As you can observe in the picture after the connection immediately the connection drops and it tries to connect again. If I change the type of the queue in classic, everything works perfectly.

image

I'm doing something wrong or there is a problem regarding this type of queue?

Thank you.

kum-deepak commented 1 year ago

Some users have reported that it works. They did point out that they needed additional headers -ack and prefetch-count. Please check https://www.rabbitmq.com/stomp.html#stream-support

tycyssg commented 1 year ago

Thank you so much for pointing that. Yes if you set that out it's working. It still drops the connection but now the messages are received.

private headers = {
    'ack':'client',
    'prefetch-count':'10',
    'x-queue-type':'stream',
    'x-max-length-bytes': '200000000',
    'x-stream-max-segment-size-bytes': '100000000',
    'x-stream-offset': 'first'
  }
  private receivedMessages: string[] = [];

  constructor(
    private readonly rxStompService: RxStompService
  ) { }

  ngOnInit(): void {
    this.rxStompService.watch('test', this.headers).subscribe( (message) => {
     this.receivedMessages.push(message.body);
    })
  }

I let here a snipped of code in case someone does don't know how to configure it.

tycyssg commented 1 year ago

One more thing I would like to specify is the fact that prefetch-count and x-stream-offset are dependent on each other. I'm not entirely sure how it suppose to work but at this moment with the offset set to first, the maximum number of messages read at the start is equal to the number set in the prefetch.

E.g: "If in the stream queue are 15 messages, you set the offset to first and prefetch-count is set to 10, only 10 messages are received not all 15.

I'm testing the same scenario with a spring boot consumer, this behavior does not happen there.

I'm the only one with this issue? Maybe I'm doing something wrong somewhere in the config.

Thank you.

kum-deepak commented 1 year ago

The behavior of the broker is correct.

The concept of prefetch is to limit the number of unacknowledged messages a client can have. 'ack':'client' indicates that the client will acknowledge messages. So, once you acknowledge those, you would receive more messages. See https://stomp-js.github.io/guide/stompjs/using-stompjs-v5.html#acknowledgment

tycyssg commented 1 year ago

Is there any way to bypass that? I don't want to acknowledge anything, I just want to receive the messages. But if I take one of these headers out (prefetch or ack) the connection keeps dropping out and I do not receive any messages.

kum-deepak commented 1 year ago

It is actually a safety mechanism implemented by RabbitMQ to help the client. These streams can have lots of messages (say hundreds of thousands) which can easily overwhelm the client.

Acknowledging is easily achieved - add message.ack(); above this.receivedMessages.push(message.body);.

In case you are not happy with RabbitMQ mandating both ack and prefetch for this case, you would need to open a ticket with RabbitMQ.

tycyssg commented 1 year ago

Cool, thank you for clarification. I will try this.

I think I'm happy with your answer. Sorry if i though something is wrong with the library behavior.