tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Multiple Kafka consumers with the same groupId and different topics #1040

Open radinail opened 3 years ago

radinail commented 3 years ago

Describe the bug

I'm trying to run 3 consumers with the same groupId and 3 topics, with each consumer subscribed to one topic. When I use the same groupId, I only see one consumer in the output of the "describe --all-groups" command. When I use different groupIds, I can see all three consumers.

My question is: why does using the same groupId for all the consumers not work?

To Reproduce

import { Kafka } from 'kafkajs';

// Three separate clients, all joining the same consumer group.
const readble1 = new Kafka({ brokers: ['localhost:9092'] }).consumer({ groupId: 'GROUP_TEST' });

const readble2 = new Kafka({ brokers: ['localhost:9092'] }).consumer({ groupId: 'GROUP_TEST' });

const readble3 = new Kafka({ brokers: ['localhost:9092'] }).consumer({ groupId: 'GROUP_TEST' });

const init = async (consumer, topic) => {
  await consumer.connect();

  await consumer.subscribe({ topic });
  console.log('subscribe ok to topic = ', topic);
  await consumer.run({
    eachMessage: async (payload) => {
      consumer.pause([{ topic: payload?.topic }]);
      try {
        if (!payload?.message) {
          throw new Error('empty message');
        }
      } catch (error) {
        console.log('error = ', error);
      }
      consumer.resume([{ topic: payload?.topic }]);
    },
  });
};

// Each consumer subscribes to a different topic.
init(readble1, 'TOPIC1');
init(readble2, 'TOPIC2');
init(readble3, 'TOPIC3');

Nevon commented 3 years ago

You cannot have different consumers in the same consumer group be subscribed to different topics. If they are subscribing to different topics, they should not be in the same consumer group.

Either use different group ids for different consumer groups (if you want each consumer group to only consume from one topic) or have a single consumer group consume from all three topics.

827
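The two options above can be sketched roughly as follows. This is a minimal illustration, not code from KafkaJS itself: `groupIdFor`, `startOneGroupPerTopic` and `startSingleGroup` are hypothetical helper names, and `kafka` is assumed to be a KafkaJS `Kafka` instance created elsewhere.

```javascript
// Sketch only: `kafka` is assumed to be a KafkaJS `Kafka` instance.
// All function names below are hypothetical helpers for illustration.

// Option 1: one consumer group per topic, derived from a shared prefix.
const groupIdFor = (baseGroupId, topic) => `${baseGroupId}-${topic}`;

async function startOneGroupPerTopic(kafka, baseGroupId, topics, eachMessage) {
  const consumers = [];
  for (const topic of topics) {
    const consumer = kafka.consumer({ groupId: groupIdFor(baseGroupId, topic) });
    await consumer.connect();
    await consumer.subscribe({ topic });
    await consumer.run({ eachMessage });
    consumers.push(consumer);
  }
  return consumers;
}

// Option 2: a single group whose consumer subscribes to every topic.
async function startSingleGroup(kafka, groupId, topics, eachMessage) {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  for (const topic of topics) {
    await consumer.subscribe({ topic });
  }
  await consumer.run({ eachMessage });
  return consumer;
}
```

With option 1, each new groupId starts without the offsets committed under the old shared group, so where consumption resumes depends on the consumer's offset reset settings.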

javigaralva commented 2 years ago

Is this behavior a limitation of kafkaJS or does the kafka protocol behave like this? Thanks!

Nevon commented 2 years ago

This is how Kafka consumer groups work.

kpr9991 commented 2 years ago

Do we have any proof from documentation??

15096819 commented 2 years ago

This is not true. Using the Kafka Java client or "node-rdkafka", I can get what @radinail needs: two consumers in the same group, each subscribed to a different topic. [image]

javigaralva commented 2 years ago

So it seems like a limitation of KafkaJS. Perhaps it could be added as a feature request, don't you think?

Thanks.

Nevon commented 2 years ago

So I looked into this a bit more, to try to understand how they have achieved this, and I think I understand now how they've done this in the Java client. It is indeed possible to do.

  1. In the JoinGroup request, each member encodes the topics it is subscribed to as part of the protocol metadata. In the Kafka protocol, this is just an opaque chunk of bytes, so the assigner protocol can put whatever it wants in there. For these purposes, each member puts its subscribed topics in there.
  2. When the group leader gets the JoinGroup response, it includes all the members of the group along with the protocol metadata per member. Once the group leader has selected an assigner protocol to use from the common ones, it can use that assigner to decode the member metadata (the assigner was the one that encoded the member metadata, so it knows how to decode it). Now it knows which members are subscribed to which topics.
  3. It can then use this information in the assigner to ensure that partitions for a given topic are only assigned to members that are subscribed to that topic.

What confused me was that the consumer group itself always acts on the superset of all subscribed topics. The core difference is in assignment: the partitions of a topic are not spread across the entire consumer group, only across the members subscribed to that topic. Each assigner needs to implement support for this, rather than it being something that the consumer itself does.
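To make the assignment step concrete, here is a minimal sketch of a subscription-aware round-robin over plain data. It is not the Java client's or KafkaJS's implementation: `assignBySubscription` is a hypothetical name, and the input shapes (`members` as `{ memberId, topics }` objects, `partitionsByTopic` as a map from topic to partition ids) are assumptions made for illustration.

```javascript
// Sketch only: plain data in, plain data out. No KafkaJS types are involved.
// `members` is [{ memberId, topics }]; `partitionsByTopic` maps topic -> partition ids.
function assignBySubscription(members, partitionsByTopic) {
  const assignment = {};
  for (const { memberId } of members) assignment[memberId] = {};

  for (const topic of Object.keys(partitionsByTopic).sort()) {
    // Only members subscribed to this topic are candidates for its partitions.
    const subscribers = members
      .filter((m) => m.topics.includes(topic))
      .map((m) => m.memberId)
      .sort();
    if (subscribers.length === 0) continue; // no member wants this topic

    // Round-robin the topic's partitions over its subscribers.
    partitionsByTopic[topic].forEach((partitionId, i) => {
      const assignee = subscribers[i % subscribers.length];
      if (!assignment[assignee][topic]) assignment[assignee][topic] = [];
      assignment[assignee][topic].push(partitionId);
    });
  }
  return assignment;
}

const result = assignBySubscription(
  [
    { memberId: 'a', topics: ['TOPIC1'] },
    { memberId: 'b', topics: ['TOPIC2'] },
    { memberId: 'c', topics: ['TOPIC1', 'TOPIC2'] },
  ],
  { TOPIC1: [0, 1, 2], TOPIC2: [0, 1] },
);
console.log(result);
// { a: { TOPIC1: [0, 2] }, b: { TOPIC2: [0] }, c: { TOPIC1: [1], TOPIC2: [1] } }
```

Member `a` never receives a TOPIC2 partition and `b` never receives a TOPIC1 partition, even though the group as a whole covers both topics.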

Nevon commented 2 years ago

Took a stab this morning just to see if this might be an easy thing to implement, and it's almost easy. Thought I'd jot down what I learned before I forget.

The assigner logic requires just a small change: decode the member metadata using MemberMetadata.decode to get the subscribed topics per member, and then, when assigning partitions, skip a member if it isn't subscribed to the topic.

The only thing that prevents it from being an almost trivial change is that the leader needs to have metadata loaded for all topics that the group is subscribed to, even if it itself is not subscribed to those topics. The current design is that the consumer only ever fetches metadata for topics it is subscribed to, so that needs to change a bit so that it gets metadata for all topics that the whole group is subscribed to, before handing off to the assigner.
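As a rough illustration of that gap, the leader would need metadata for every topic in the group's combined subscription that it does not already track. The helper below is a hypothetical sketch over plain arrays, not KafkaJS code; `subscriptions` is assumed to hold one topic list per group member, as decoded from the member metadata.

```javascript
// Sketch only: `topicsMissingFromLeader` is a hypothetical helper.
// `leaderTopics` are the topics the leader itself subscribes to;
// `subscriptions` is an array of topic lists, one per group member.
function topicsMissingFromLeader(leaderTopics, subscriptions) {
  const groupTopics = new Set(subscriptions.flat());
  const known = new Set(leaderTopics);
  // Topics some member wants but the leader has no metadata for yet.
  return [...groupTopics].filter((topic) => !known.has(topic)).sort();
}

const missing = topicsMissingFromLeader(
  ['TOPIC1'],
  [['TOPIC1'], ['TOPIC2'], ['TOPIC3', 'TOPIC1']],
);
console.log(missing); // [ 'TOPIC2', 'TOPIC3' ]
```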

javigaralva commented 1 year ago

Any plan to implement this feature in a short time? 😅😅😅 Thanks.

skv-commits commented 1 year ago

We have the same issue: three instances hit the same broker, one topic per instance, and it doesn't work when we use the same groupId on all instances.
topic A - a.broker.com - running on instance 1
topic B - a.broker.com - running on instance 2
topic C - a.broker.com - running on instance 3
They follow separate deployment patterns.
Is there any workaround for it? Please advise.

MartinBurian commented 1 year ago

The case when different instances (replicas) in one consumer group consume different sets of topics also arises every time one applies a rolling update which adds another topic to be consumed. If the application starts to consume a topic, but the group leader is elected from the old replicas without it, the partitions of the topic are not assigned. Since we monitor the partition assignment in our apps, this failure prevented the rolling upgrade. We added a fix for the issue, but did not really have the time to contribute it back upstream properly. At least I can drop the fixed assigner here, it may help... It may have issues, but it seems to work for us.

// Excerpt: the assign() method of a custom PartitionAssigner. `cluster` is the object
// KafkaJS passes to the assigner factory; `uniq` and `flatten` are assumed to be lodash helpers.
import { AssignerProtocol, GroupMember } from 'kafkajs';
import { flatten, uniq } from 'lodash';

// { PartitionAssigner
  async assign({ members }: { members: GroupMember[]; topics: string[] }) {
    // memberId -> topic -> assigned partition ids
    const assignment: Record<string, Record<string, number[]>> = {};

    // Decode each member's metadata to learn which topics it subscribes to.
    const memberTopics: Record<string, string[]> = {};

    members.forEach((member) => {
      const meta = AssignerProtocol.MemberMetadata.decode(member.memberMetadata)!;
      memberTopics[member.memberId] = meta.topics;
    });

    // Invert the mapping: topic -> sorted list of subscribed member ids.
    const topicMembers = Object.keys(memberTopics).reduce((acc: Record<string, string[]>, memberId: string) => {
      memberTopics[memberId].forEach((topic) => {
        if (acc[topic]) {
          acc[topic].push(memberId);
        } else {
          acc[topic] = [memberId];
        }
      });

      return acc;
    }, {});
    Object.values(topicMembers).forEach((v) => v.sort());

    // The leader needs metadata for every topic consumed by the group, not only its own.
    const consumedTopics = uniq(flatten(Object.values(memberTopics)));
    for (const topic of consumedTopics) {
      await cluster.addTargetTopic(topic);
    }

    const topicsPartitionArrays = consumedTopics.map((topic) => {
      const partitionMetadata = cluster.findTopicPartitionMetadata(topic);

      return partitionMetadata.map((m) => ({ topic: topic, partitionId: m.partitionId }));
    });
    const topicsPartitions = flatten(topicsPartitionArrays);

    // Round-robin each partition over the members subscribed to its topic.
    // `i` runs over all partitions of all topics, so the spread may be uneven.
    topicsPartitions.forEach(({ topic, partitionId }, i) => {
      const assignee = topicMembers[topic][i % topicMembers[topic].length];

      if (!assignment[assignee]) {
        assignment[assignee] = Object.create(null);
      }

      if (!assignment[assignee][topic]) {
        assignment[assignee][topic] = [];
      }

      assignment[assignee][topic].push(partitionId);
    });

    return Object.keys(assignment).map((memberId) => ({
      memberId,
      memberAssignment: AssignerProtocol.MemberAssignment.encode({
        version: this.version,
        assignment: assignment[memberId],
        userData: Buffer.of(),
      }),
    }));
  }
// }

maranice88 commented 1 year ago

We have the same issue on our side. Any update or plans to fix this?

UglyBarnacle commented 1 year ago

Same problem here on our side. We were suffering from low performance and failing messages in our applications, so I had a look and discovered just a single consumer (and group) subscribed to multiple topics, which slowly filled up while processing all the messages. So I was thinking, hey, in our Java-based applications it works too: same group with multiple consumers, each one subscribed to a different topic. Quickly updated the code and... no topics assigned? At all? Trying to implement one group per topic at the moment, but having mind-bending issues about losing offsets and causing mayhem if everything gets processed again. A quick status update here would be nice, thanks!

dmytro-kuchura commented 1 year ago

We ran into this issue today. Is there any news, or any plans for a fix or a quick workaround?

slice-krrish commented 1 year ago

Any update on this? @Nevon Any comment on solution proposed by @MartinBurian?

MartinBurian commented 1 year ago

The assigner implementation I posted above can simply be plugged into the Kafka consumer:

const kafka = new Kafka(...);
const consumer = kafka.consumer({
    // ...
    partitionAssigners: [ myPartitionAssigner ]
});

Just note that all members of a consumer group must agree on a common partition assigner name during rebalance, so make sure the assigner name is the same as the default, or update all replicas in the same CG at once instead of a rolling update. I should also mention that the implementation may not distribute the partitions optimally when the CG members have different subscriptions, but I consider that to be a transient state and am not really concerned with it.
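The constraint on assigner names can be pictured as a set intersection: only a name offered by every member is usable by the group. The sketch below is a hypothetical illustration of that rule over plain arrays, not broker or KafkaJS code, and the assigner names in it are made up.

```javascript
// Sketch only: `commonAssignerNames` is a hypothetical helper.
// Each inner array lists the assigner names one group member offers when joining.
function commonAssignerNames(offeredByMember) {
  if (offeredByMember.length === 0) return [];
  const [first, ...rest] = offeredByMember;
  // Keep only the names that every other member offers as well.
  return first.filter((name) => rest.every((names) => names.includes(name)));
}

// Rolling update where new replicas offer only a renamed assigner: nothing in common.
const duringRollout = commonAssignerNames([
  ['DefaultAssigner'],
  ['SubscriptionAwareAssigner'],
]);
console.log(duringRollout); // []

// Same rollout, but the custom assigner reuses the existing name.
const sameName = commonAssignerNames([['DefaultAssigner'], ['DefaultAssigner']]);
console.log(sameName); // [ 'DefaultAssigner' ]
```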

It would be nice if it was a part of the library, I just really don't have the time to contribute it properly with tests and stuff, sorry :family:.

slice-krrish commented 1 year ago

Thanks @MartinBurian .

anuvrat-rastogi commented 10 months ago

Bumped into this one today after scratching all the configuration options. Will try the custom partition assigner for this as the delivery is near. Any hopes of having this in standard? More importantly, I am looking for the exact behaviour like Java Client.

icecat20 commented 6 months ago

Subscribing a single consumer to an array of topics works, but when I want to configure a separate consumer for each topic so that they operate independently, it still doesn't work.

carloseduardosx commented 1 month ago

+1 on this. I'm having the same problem.

okisetiawan0101 commented 2 weeks ago

+1 on this. I'm having the same problem.

Is there any update about this?