tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Add manual assignment of TopicPartitions to Consumer #373

Open JaapRood opened 5 years ago

JaapRood commented 5 years ago

In using Kafka for stream processing it's common practice to have to manually assign topic-partitions for consumption, rather than subscribing and receiving an assignment through the group.

For example, when processing messages from a partition the consumer subscribes to, you might have to keep some state. To make sure this state is durable, it's often replicated to a changelog, also implemented as a Kafka topic. That way, when the consumer is assigned another partition or crashes, the state can be restored. To make sure the right processing state is restored before processing continues, the changelog is written to the same partition number as the one being processed, a practice called copartitioning. By simply consuming the state-replication topic from the same partition for which you're processing messages, you're guaranteed to restore the right state.

In the above example, subscribing to a ConsumerGroup won't work: you've already been assigned a partition for the input topic and you need exactly that same partition number. There are other examples, too, like replicating a changelog as a local cache on every node, ready to be queried through HTTP. In that case, you want to consume all partitions, rather than just being assigned a couple.
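The restore step described above can be sketched in memory, without a broker. Everything in this sketch is hypothetical: `ChangelogRecord`, `restoreState` and the nested array standing in for a copartitioned changelog topic are illustrative names, not part of KafkaJS.

```typescript
// Hypothetical stand-in for a changelog topic: one array of records per partition.
type ChangelogRecord = { key: string; value: number };

// Replay a single changelog partition from the start; later records for the
// same key overwrite earlier ones, so the result is the latest state.
function restoreState(
  changelog: ChangelogRecord[][],
  partition: number
): Record<string, number> {
  const state: Record<string, number> = {};
  for (const { key, value } of changelog[partition] ?? []) {
    state[key] = value;
  }
  return state;
}

const changelog: ChangelogRecord[][] = [
  [{ key: "a", value: 1 }, { key: "a", value: 2 }], // changelog partition 0
  [{ key: "b", value: 5 }, { key: "c", value: 7 }], // changelog partition 1
];

// A consumer processing input partition 1 restores from changelog partition 1,
// which only works if it can pick that exact partition number itself.
const restored = restoreState(changelog, 1);
```

Because the partition number is chosen by the caller, this is the part that needs manual assignment rather than a group-provided assignment.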

The documentation of the official Java client describes it as well:

In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group. However, in some cases you may need finer control over the specific partitions that are assigned. For example:

  • If the process is maintaining some kind of local state associated with that partition (like a local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
  • If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine.

Proposed solution

Like the Java KafkaConsumer does, allow calling consumer.assign with an array of topic partitions instead of consumer.subscribe.

To make this practical, implementing consumer.assignments() might be necessary too, returning the list of topic partitions assigned to the consumer (through either subscription or manual assignment).

JaapRood commented 5 years ago

I'm pretty happy to take this one on, but won't be able to start for a while. We're still right in the middle of our switch to using KafkaJS for all our stream processing and don't need manual assignment until we get to the more complicated tasks (where, as described above, we use Kafka for the changelog of stateful processing).

tulios commented 5 years ago

Nice, let's spec this change and have a project to track the progress.

patrykwegrzyn commented 5 years ago

This is something I would be interested in too. I'm working on a project similar to the one described by @JaapRood, and I see a lot of use cases for implementing stream processing, ktable or http < kafka proxy.
I would love to use KafkaJS on my topic UI / browser project, but for that I would need fine control of what is assigned to the consumer. Right now I'm forced to use an alternative lib ;/

AlexKonstantinov1991 commented 5 years ago

Hello guys! Is that in progress?

JaapRood commented 5 years ago

Forces beyond my control (client work) keep pushing this forward on my schedule, so I thought I'd at least share the approach @tulios has helped to outline.

To allow us to ship initial support within a reasonable time frame, the biggest constraint we should add is that assignments have to be made before consumer.run and can't be changed while running.

The API would look as follows:

const consumer = kafka.consumer({ groupId: 'my-group' })
await consumer.connect()
await consumer.assign({
  topic: 'test-topic',
  partitions: [
    { partition: 0, offset: '30' },
    { partition: 1, offset: '8' },
  ]
})

await consumer.run({ /* define eachMessage or eachBatch */ })

For the use cases I've described in this issue and have seen others describe, being able to change assignments while a Consumer is running is definitely a major convenience, but the assumption that assignments can't change runs throughout Consumer, Runner and ConsumerGroup. Several major changes would be required to make that work. However, if we assume assignments are static while the consumer runs, a couple of changes should do it:

There are plenty of details along the way that should be hashed out (for example what to do with the GROUP_JOIN instrumentation event), but I imagine more of that would become clear as we work towards a first implementation.

While assignments can't be changed while the consumer is running, nothing should prevent a user from stopping the consumer, changing assignments and starting again:

/* assume running consumer */

await consumer.stop()

await consumer.assign({
  topic: 'test-topic',
  partitions: [
    { partition: 1, offset: '8' },
    { partition: 4, offset: '299' },
  ]
})

await consumer.run({ /* define eachMessage or eachBatch */ })
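
The "assign before run, stop before reassigning" constraint can be sketched as plain bookkeeping. This is a hypothetical illustration, not KafkaJS code: the `StaticAssignments` class and its method names are made up, it talks to no broker, and it assumes that a second `assign` for the same topic replaces the earlier one.

```typescript
type PartitionOffset = { partition: number; offset: string };
type Assignment = { topic: string; partitions: PartitionOffset[] };

// Hypothetical tracker that only enforces the "static while running" rule.
class StaticAssignments {
  private running = false;
  private current: Assignment[] = [];

  // Replace the assignment for one topic; rejected while running.
  assign(assignment: Assignment): void {
    if (this.running) {
      throw new Error("Assignments can't change while running; call stop() first");
    }
    this.current = this.current
      .filter((a) => a.topic !== assignment.topic)
      .concat([assignment]);
  }

  // Flat list of topic-partitions, in the spirit of the proposed consumer.assignments().
  assignments(): { topic: string; partition: number }[] {
    const result: { topic: string; partition: number }[] = [];
    for (const { topic, partitions } of this.current) {
      for (const { partition } of partitions) {
        result.push({ topic, partition });
      }
    }
    return result;
  }

  run(): void {
    this.running = true;
  }

  stop(): void {
    this.running = false;
  }
}

const tracker = new StaticAssignments();
tracker.assign({
  topic: "test-topic",
  partitions: [{ partition: 0, offset: "30" }, { partition: 1, offset: "8" }],
});
tracker.run();

// Reassigning while running is rejected...
let rejected = false;
try {
  tracker.assign({ topic: "test-topic", partitions: [{ partition: 4, offset: "299" }] });
} catch (err) {
  rejected = true;
}

// ...but stop, assign, run again is allowed.
tracker.stop();
tracker.assign({
  topic: "test-topic",
  partitions: [{ partition: 1, offset: "8" }, { partition: 4, offset: "299" }],
});
tracker.run();
```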

jaysequr commented 4 years ago

Any progress?

JaapRood commented 4 years ago

Nope, you @jaysequr?

anonimitoraf commented 4 years ago

I really like how this library is written in general. I think the API is very clean and straightforward so it'd be such a waste for this to be a dealbreaker for us. I'm happy to take this on, @JaapRood if you're backing out

JaapRood commented 4 years ago

Go for it @anonimitoraf, with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

sOfekS commented 4 years ago

Hey @anonimitoraf (or anyone else), was there any progress made on this? Is there maybe an ETA for this feature?

anonimitoraf commented 4 years ago

Hey, sorry didn't/haven't gotten around to doing this (simply because life got too busy). Do you want to give it a shot @sOfekS ?

pimpelsang commented 4 years ago

Just to clarify - will this new API (assign instead of subscribe) support creating consumers without specifying "group.id" in the consumer constructor? I'd like to prevent a consumer group from being registered at all.

Question from this related issue: https://github.com/tulios/kafkajs/issues/853

JaapRood commented 4 years ago

@pimpelsang that's right. Rather than relying on the ConsumerGroup mechanism to provide which partitions are being consumed, the Consumer would read from the topic+partition manually provided directly. Each Consumer configured in this way should run irrespective of others like it, so using a group would not make sense.

shaniMaayan commented 3 years ago

@JaapRood hi it's been a while, any new updates?

JaapRood commented 3 years ago

Nothing has changed since June 2020:

Go for it @anonimitoraf, with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

dwinrick-lever commented 2 years ago

Anyone taking this on? I see we have partitionAssigners but the API seems very unclear, not sure if it's useful for this use case.

Nevon commented 2 years ago

No, partition assigners is the mechanism by which assignments are done within a consumer group. For this use case, there would be no consumer group.

dwinrick-lever commented 2 years ago

I think what was confusing for me about the API today was that memberAssignment is a Buffer, maybe it should be a more self documenting type alias like EncodedMemberAssignment, thoughts?

export type GroupMemberAssignment = { memberId: string; memberAssignment: Buffer }

Also it's confusing as to what userData is...

For those wondering, here's what my single-partition assigner looks like:

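import { AssignerProtocol, PartitionAssigner } from 'kafkajs'

// `topic` and `partition` are assumed to be defined in the enclosing scope.
const SinglePartitionAssigner: PartitionAssigner = () => ({
  name: 'SinglePartitionAssigner',
  version: 1,
  // Hand the same single partition to every member of the group, keyed by
  // each member's real memberId (a hard-coded ID would match no member).
  async assign({ members }) {
    return members.map(({ memberId }) => ({
      memberId,
      memberAssignment: AssignerProtocol.MemberAssignment.encode({
        version: this.version,
        assignment: {
          [topic]: [partition]
        },
        userData: Buffer.from([]) // no idea what this is for
      })
    }))
  },
  // Advertise this assigner and the subscribed topics to the group.
  protocol({ topics }) {
    return {
      name: this.name,
      metadata: AssignerProtocol.MemberMetadata.encode({
        version: this.version,
        topics,
        userData: Buffer.from([]) // no idea what this is for
      }),
    }
  }
})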

paulovitorweb commented 2 years ago

The unsubscribe method would really be very useful for a number of use cases. Without it, you need to instantiate a new consumer and it's quite difficult to manage that.

vertho commented 2 years ago

Hi everyone. Has there been any progress here? As it stands it is only possible to publish with a partition key, but on the consumption side you have to specify a partition number.

lamweili commented 2 years ago

Hi everyone. Has there been any progress here? As it stands it is only possible to publish with a partition key, but on the consumption side you have to specify a partition number.


For publishing, it is possible to publish with a partition key or partition number (refer to message structure):

  • If a partition is specified in the message, use it
  • If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key
  • If no partition or key is present choose a partition in a round-robin fashion
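
The selection rule quoted above can be sketched as a small function. This is an illustration only: `createPartitioner`, `OutgoingMessage` and `standInHash` are hypothetical names, and the hash is a simple 31-based rolling hash used as a stand-in for murmur2, so the partition numbers it produces will not match a real producer.

```typescript
type OutgoingMessage = { key?: string; partition?: number; value: string };

// Stand-in hash (NOT murmur2): a 31-based rolling hash kept in unsigned 32-bit range.
function standInHash(key: string): number {
  let hash = 7;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash;
}

function createPartitioner(partitionCount: number) {
  let counter = 0; // round-robin position for messages without key or partition
  return (message: OutgoingMessage): number => {
    // 1. An explicit partition always wins.
    if (message.partition !== undefined) return message.partition;
    // 2. Otherwise a key is hashed, so equal keys land on the same partition.
    if (message.key !== undefined) return standInHash(message.key) % partitionCount;
    // 3. Otherwise rotate through the partitions.
    return counter++ % partitionCount;
  };
}

const choose = createPartitioner(3);
const explicit = choose({ partition: 2, value: "a" });
const keyedFirst = choose({ key: "user-1", value: "b" });
const keyedAgain = choose({ key: "user-1", value: "c" });
const roundRobin = [
  choose({ value: "d" }),
  choose({ value: "e" }),
  choose({ value: "f" }),
  choose({ value: "g" }),
];
```

The keyed case is what makes copartitioning work on the producing side: the same key maps to the same partition number as long as the partition count and hash are the same.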


For consuming, at the moment, there is no way to specify a partition number. Currently, KafkaJS does not support manual assignment via assign (which is the rationale for this issue).

JaapRood commented 2 years ago

Nothing has changed since June 2020, waiting for someone willing to invest the time to take this on:

[...] with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

tbaggaley commented 2 years ago

In case it's of any use to others seeking to subscribe multiple group members to all partitions within a given topic, here's my solution using partitionAssigners which seems to work. Thanks @dwinrick-lever for the single-partition example above!

The partition assigner generated by the generateAllPartitionAssigner function (given your topic name) will get the metadata for your topic + pull all the partition IDs, then assign all group members to all partitions.

import { Kafka, PartitionAssigner, AssignerProtocol } from "kafkajs";

const generateAllPartitionAssigner =
  (topic: string): PartitionAssigner =>
  ({ cluster }) => ({
    name: "AllPartitionAssigner",
    version: 1,
    protocol({ topics }) {
      return {
        name: this.name,
        metadata: AssignerProtocol.MemberMetadata.encode({
          version: this.version,
          topics,
          userData: Buffer.from([]),
        }),
      };
    },
    assign: async ({ members }) => {
      await cluster.connect();
      await cluster.refreshMetadata();
      const partitionMetadata = cluster.findTopicPartitionMetadata(topic);
      const availablePartitions = partitionMetadata.map((pm) => pm.partitionId);

      return members.map((member) => ({
        memberId: member.memberId,
        memberAssignment: AssignerProtocol.MemberAssignment.encode({
          version: 1,
          assignment: { [topic]: availablePartitions },
          userData: Buffer.from([]),
        }),
      }));
    },
  });

const consumer = kafka.consumer({
  groupId: "my.super.special.group.name",
  partitionAssigners: [generateAllPartitionAssigner("YOUR.TOPIC.NAME.HERE")],
});

etlr commented 2 years ago

Hey y'all, any update on this one? I'm writing a command line tool to consume messages from a given topic for analysis and automation purposes. This tool would only need to read messages coming in, and I want to avoid being assigned to a group and committing an offset so as not to interrupt the services consuming these messages.

Thank you!

lazyboson commented 1 year ago

I think what was confusing for me about the API today was that memberAssignment is a Buffer, maybe it should be a more self documenting type alias like EncodedMemberAssignment, thoughts?

export type GroupMemberAssignment = { memberId: string; memberAssignment: Buffer }

Also it's confusing as to what userData is...

For those wondering, here's what my single-partition assigner looks like:

const SinglePartitionAssigner: PartitionAssigner = () => ({
            name: 'SinglePartitionAssigner',
            version: 1,
            async assign() {
                return [
                    {
                        memberId: 'what',
                        memberAssignment: AssignerProtocol.MemberAssignment.encode({
                            version: this.version,
                            assignment: {
                                [topic]: [partition]
                            },
                            userData: Buffer.from([]) // no idea what this is for
                        })
                    }
                ]
            },
            protocol({topics}) {
                return {
                    name: this.name,
                    metadata: AssignerProtocol.MemberMetadata.encode({
                        version: this.version,
                        topics,
                        userData: Buffer.from([]) // no idea what this is for
                    }),
                }
            }
        });

No Luck.

Sigoloh commented 11 months ago

Hello @lazyboson any progress in the issue? I'm really willing to take this forward. It would be great to have this functionality in one of my projects

izakdvlpr commented 5 months ago

1+

fabioroger commented 5 months ago

+1
