coders51 / rabbitmq-stream-js-client


declareSuperStreamConsumer problem in more than 1 partition #216

Open oneleg opened 1 week ago

oneleg commented 1 week ago

Hello, I tried to create 5 consumers using declareSuperStreamConsumer, one for each of my 5 partitions,

but every consumer receives the messages from all partitions.

This is my consumer script (I deploy it 5 times using Docker):

```js
consumerConfig = { superStream: superStreamName, offset: rabbit.Offset.next() }

consumer = await client.declareSuperStreamConsumer(
  consumerConfig,
  (message) => {
    // Each message body is expected to be a JSON document
    const receivedMessage = JSON.parse(message.content.toString());
    console.log(`Received message from user ${receivedMessage.telegram_id}:`, receivedMessage);
  }
);
```

tunniclm commented 1 week ago

@oneleg I just hit the same issue with my application and I think I worked out the problem.

The library always enables single-active-consumer semantics on super streams. By default, however, it generates a different consumer name for each process by combining the stream name with a UUID. You can see this in the RabbitMQ management interface when you view the stream: the name argument is different for each consumer, and all of them are marked active.

To place these separate consumers in the same group, you need to supply your own name by passing a consumerRef in the configuration when declaring the consumer.

If you do this, you should see in the management interface that both consumers have the same name argument and that only one of them is active.
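
A minimal sketch of that suggestion, in case it helps. The helper names (`buildSuperStreamConsumerConfig`, `startGroupConsumer`) are made up for illustration and are not part of the library; `client` is assumed to be an already connected client and `offset` a value such as `rabbit.Offset.next()` from the original script.

```javascript
// Hypothetical helper: builds the configuration object passed to
// declareSuperStreamConsumer. Every process that should join the same
// single-active-consumer group has to use the same consumerRef.
function buildSuperStreamConsumerConfig(superStreamName, consumerRef, offset) {
  if (!consumerRef) {
    throw new Error("consumerRef is required to share a consumer group");
  }
  return { superStream: superStreamName, consumerRef, offset };
}

// Hypothetical wrapper: `client` is assumed to expose
// declareSuperStreamConsumer(config, handler) as used in the original script.
async function startGroupConsumer(client, superStreamName, consumerRef, offset, onMessage) {
  const config = buildSuperStreamConsumerConfig(superStreamName, consumerRef, offset);
  return client.declareSuperStreamConsumer(config, onMessage);
}
```

Each container would then call something like `startGroupConsumer(client, superStreamName, "my-app", rabbit.Offset.next(), handler)` with the same group name.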

Sidenote: It was useful when working this out to also look at the output from these commands running in my docker container (named rabbitmq):

docker exec rabbitmq rabbitmq-streams list_stream_consumer_groups

and

docker exec rabbitmq rabbitmq-streams list_stream_consumers

Example output from list_stream_consumer_groups when not supplying consumerRef:

Listing stream consumer groups ...
┌─────────────────┬────────────────────────────────────────────────────┬─────────────────┬───────────┐
│ stream          │ reference                                          │ partition_index │ consumers │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-0 │ mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c │ -1              │ 1         │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-0 │ mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede │ -1              │ 1         │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-1 │ mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c │ -1              │ 1         │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-1 │ mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede │ -1              │ 1         │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-2 │ mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c │ -1              │ 1         │
├─────────────────┼────────────────────────────────────────────────────┼─────────────────┼───────────┤
│ mysuperstream-2 │ mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede │ -1              │ 1         │
└─────────────────┴────────────────────────────────────────────────────┴─────────────────┴───────────┘

and list_stream_consumers:

Listing stream consumers ...
┌─────────────────┬─────────────────┬───────────────────┬────────┬────────────┬─────────┬────────┬─────────────────┬─────────────────────────────────────────────────────────────────┐
│ subscription_id │ stream          │ messages_consumed │ offset │ offset_lag │ credits │ active │ activity_status │ properties                                                      │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-1 │ 2                 │ 1      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-0 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-0 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-2 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-2 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼─────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-1 │ 2                 │ 1      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> =>                                                 │
│                 │                 │                   │        │            │         │        │                 │       <<"mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c">>, │
│                 │                 │                   │        │            │         │        │                 │   <<"single-active-consumer">> => <<"true">>}                   │
└─────────────────┴─────────────────┴───────────────────┴────────┴────────────┴─────────┴────────┴─────────────────┴─────────────────────────────────────────────────────────────────┘

Example output from list_stream_consumer_groups when supplying consumerRef: 'my-app':

Listing stream consumer groups ...
┌─────────────────┬───────────┬─────────────────┬───────────┐
│ stream          │ reference │ partition_index │ consumers │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-0 │ my-app    │ -1              │ 2         │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-1 │ my-app    │ -1              │ 2         │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-2 │ my-app    │ -1              │ 2         │
└─────────────────┴───────────┴─────────────────┴───────────┘

and list_stream_consumers:

Listing stream consumers ...
┌─────────────────┬─────────────────┬───────────────────┬────────┬────────────┬─────────┬────────┬─────────────────┬──────────────────────────────────────────────────────────────────────────┐
│ subscription_id │ stream          │ messages_consumed │ offset │ offset_lag │ credits │ active │ activity_status │ properties                                                               │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-0 │ 0                 │ 0      │ 0          │ 2       │ false  │ waiting         │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-1 │ 2                 │ 1      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-2 │ 0                 │ 0      │ 0          │ 2       │ false  │ waiting         │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-1 │ 0                 │ 0      │ 0          │ 2       │ false  │ waiting         │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-0 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
├─────────────────┼─────────────────┼───────────────────┼────────┼────────────┼─────────┼────────┼─────────────────┼──────────────────────────────────────────────────────────────────────────┤
│ 0               │ mysuperstream-2 │ 0                 │ 0      │ 0          │ 2       │ true   │ single_active   │ #{<<"name">> => <<"my-app">>,<<"single-active-consumer">> => <<"true">>} │
└─────────────────┴─────────────────┴───────────────────┴────────┴────────────┴─────────┴────────┴─────────────────┴──────────────────────────────────────────────────────────────────────────┘
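
To make the difference between the two outputs explicit, here is a rough sketch that groups the list_stream_consumers rows by stream and name and counts how many consumers are active in each group. The function `summarizeGroups` and the row objects are my own illustration; the values are transcribed from the tables above.

```javascript
// Counts consumers and active consumers per (stream, name) pair.
function summarizeGroups(consumers) {
  const groups = new Map();
  for (const { stream, name, active } of consumers) {
    const key = `${stream}/${name}`;
    const entry = groups.get(key) ?? { consumers: 0, active: 0 };
    entry.consumers += 1;
    if (active) entry.active += 1;
    groups.set(key, entry);
  }
  return groups;
}

// Generated names from the output without consumerRef.
const nameA = "mysuperstream-41310db8-a7fc-40b6-b01a-1619528d621c";
const nameB = "mysuperstream-dd465990-6e6e-4169-87fc-05676a91eede";

// Rows from the output without consumerRef: every consumer is active.
const withoutRef = [
  { stream: "mysuperstream-1", name: nameB, active: true },
  { stream: "mysuperstream-0", name: nameB, active: true },
  { stream: "mysuperstream-0", name: nameA, active: true },
  { stream: "mysuperstream-2", name: nameA, active: true },
  { stream: "mysuperstream-2", name: nameB, active: true },
  { stream: "mysuperstream-1", name: nameA, active: true },
];

// Rows from the output with consumerRef: 'my-app'.
const withSharedRef = [
  { stream: "mysuperstream-0", name: "my-app", active: false },
  { stream: "mysuperstream-1", name: "my-app", active: true },
  { stream: "mysuperstream-2", name: "my-app", active: false },
  { stream: "mysuperstream-1", name: "my-app", active: false },
  { stream: "mysuperstream-0", name: "my-app", active: true },
  { stream: "mysuperstream-2", name: "my-app", active: true },
];

for (const [key, { consumers, active }] of summarizeGroups(withSharedRef)) {
  console.log(`${key}: ${consumers} consumers, ${active} active`);
}
```

Without consumerRef this yields six groups of one consumer each, all active; with the shared name it yields three groups of two consumers with one active consumer per stream.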


tunniclm commented 1 week ago

I've been comparing my application to the Java-based reference example (https://github.com/acogoluegnes/rabbitmq-stream-single-active-consumer#super-streams) and I'm seeing two issues with my current super-stream / single-active-consumer setup. They make me think something is missing or wrong in the way I configured my consumers in my previous comment:

  1. rabbitmq-streams list_stream_consumer_groups shows -1 for the partition_index, unlike the Java implementation. According to the docs, that means the streams are not part of a super stream.
  2. When I start a new instance of my application, the single active consumers are not rebalanced as they are in the Java example.

I'm not sure if these are related, but at the moment this means I do not get any scaling - the messages are distributed across the streams of the super stream, but all the messages go to the same client.

Notes:

  • I suspect the super-stream consumer property is involved in partition_index being -1: I see evidence that the individual stream consumers created by the Java client have this property set to the super stream name, but I couldn't find any reference to it in the JS client.

tunniclm commented 1 week ago

I tried modifying rabbitmq-stream-js-client to pass the super-stream consumer property, and that seemed to resolve both the partition_index issue and the rebalancing of single active consumers.
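
For illustration only, this is roughly the shape of the subscription properties I mean. The helper `buildSubscriptionProperties` is hypothetical and is not the library's actual code; the `name` and `single-active-consumer` keys are the ones visible in the list_stream_consumers output, and `super-stream` is the additional property discussed here.

```javascript
// Hypothetical helper, not taken from rabbitmq-stream-js-client.
// Builds the properties sent when subscribing to one partition stream.
function buildSubscriptionProperties({ consumerRef, superStream }) {
  const properties = {
    name: consumerRef,
    "single-active-consumer": "true",
  };
  // Assumption: the property is only added for super stream consumers and
  // carries the super stream name, as observed with the Java client.
  if (superStream) {
    properties["super-stream"] = superStream;
  }
  return properties;
}

const properties = buildSubscriptionProperties({
  consumerRef: "my-app",
  superStream: "mysuperstream",
});
console.log(properties);
```

The same properties would be used for every partition stream (mysuperstream-0, mysuperstream-1, and so on) of the super stream.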

When the second application starts, I now immediately see it become the active consumer on half of the streams.


Instance 1 client logger output:

info: new socket for connection 0611675a-e750-4214-951c-6606103b8d2b: [ true, true, undefined, undefined, 'opening' ]
info: Connected to RabbitMQ localhost:5552
info: Returned superstream partitions for superstream mysuperstream
info: Returned stream metadata for streams with names mysuperstream-0
info: Returned stream metadata for streams with names mysuperstream-1
info: new socket for connection 00b7f7fa-7066-4b68-9281-9bcb106dc058: [ true, true, undefined, undefined, 'opening' ]
info: new socket for connection b117368e-cf0a-4473-9527-226d62164389: [ true, true, undefined, undefined, 'opening' ]
info: Returned stream metadata for streams with names mysuperstream-2
info: Returned stream metadata for streams with names mysuperstream-3
info: new socket for connection e9529f2e-1f14-4bc6-8cba-0627f65bfd23: [ true, true, undefined, undefined, 'opening' ]
info: new socket for connection c70952f6-108d-4c44-b181-4c869ea5f5d6: [ true, true, undefined, undefined, 'opening' ]
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: New consumer created with stream name mysuperstream-2, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-0, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-1, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-3, consumer id 0 and offset next
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
// --- instance 2 starts --- //
info: {  size: 10,  key: 26,  version: 1,  correlationId: 1,  subscriptionId: 0,  active: 0 }
info: {  size: 10,  key: 26,  version: 1,  correlationId: 1,  subscriptionId: 0,  active: 0 }

Instance 2 client logger output:

info: new socket for connection 9d1cb4b6-ae04-4075-a6d5-30d6294dee07: [ true, true, undefined, undefined, 'opening' ]
info: Connected to RabbitMQ localhost:5552
info: Returned stream metadata for streams with names mysuperstream-0
info: Returned stream metadata for streams with names mysuperstream-1
info: Returned stream metadata for streams with names mysuperstream-2
info: Returned stream metadata for streams with names mysuperstream-3
info: new socket for connection a402562e-e0a2-40d1-9c9f-5cd7e9d5351d: [ true, true, undefined, undefined, 'opening' ]
info: new socket for connection 349b5b0f-6ea6-423d-94bd-a6f4dd8c6e26: [ true, true, undefined, undefined, 'opening' ]
info: new socket for connection a86a2c95-23bf-437c-934d-17bceb5ea3d1: [ true, true, undefined, undefined, 'opening' ]
info: new socket for connection 84542835-4d8a-41bb-81b7-468576922e5a: [ true, true, undefined, undefined, 'opening' ]
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: Connected to RabbitMQ localhost:5552
info: New consumer created with stream name mysuperstream-1, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-2, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-0, consumer id 0 and offset next
info: New consumer created with stream name mysuperstream-3, consumer id 0 and offset next
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
info: {  size: 10,  key: 26,  version: 1,  correlationId: 0,  subscriptionId: 0,  active: 1 }
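
A quick way to read the two logs is to count the net number of active subscriptions per instance. This sketch assumes the `key: 26` lines are the broker's consumer-update notifications and that an `active: 0` line always deactivates a subscription that was previously reported active; neither assumption is confirmed elsewhere in this thread. The `update` helper just rebuilds the log lines shown above.

```javascript
// Rebuilds a log line in the same shape as the client logger output above.
const update = (correlationId, active) =>
  `info: {  size: 10,  key: 26,  version: 1,  correlationId: ${correlationId},  subscriptionId: 0,  active: ${active} }`;

// Net number of active subscriptions implied by a sequence of log lines:
// +1 for each "active: 1" update, -1 for each "active: 0" update.
function netActive(logLines) {
  let active = 0;
  for (const line of logLines) {
    const match = line.match(/key: 26,.*active: ([01])/);
    if (!match) continue; // ignore connection and metadata lines
    active += match[1] === "1" ? 1 : -1;
  }
  return active;
}

// Instance 1: four activations, then two deactivations once instance 2 starts.
const instance1 = [
  update(0, 1), update(0, 1), update(0, 1), update(0, 1),
  update(1, 0), update(1, 0),
];

// Instance 2: two activations.
const instance2 = [
  "info: Connected to RabbitMQ localhost:5552",
  update(0, 1), update(0, 1),
];

console.log(netActive(instance1), netActive(instance2));
```

Under those assumptions each instance ends up active on two of the four partitions, which matches the rebalancing described above.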

rabbitmq-streams list_stream_consumer_groups output:

Listing stream consumer groups ...
┌─────────────────┬───────────┬─────────────────┬───────────┐
│ stream          │ reference │ partition_index │ consumers │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-0 │ my-app    │ 0               │ 2         │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-1 │ my-app    │ 1               │ 2         │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-2 │ my-app    │ 2               │ 2         │
├─────────────────┼───────────┼─────────────────┼───────────┤
│ mysuperstream-3 │ my-app    │ 3               │ 2         │
└─────────────────┴───────────┴─────────────────┴───────────┘

In conclusion, @oneleg: your original problem does seem to be that you need to pass consumerRef, but it looks like there might also be another bug here with super streams and single-active-consumer.