SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

How to use the partitionerType parameter in kafka.Producer? #1094

Open mittaus opened 6 years ago

mittaus commented 6 years ago

Could someone give some detailed information on how each value of the partitionerType parameter affects where the messages sent by the producer are stored?

var kafka = require('kafka-node')
var client = new kafka.Client()

const options = {
    // Configuration for when to consider a message as acknowledged, default 1
    requireAcks: 1,
    // The amount of time in milliseconds to wait for all acks before considered, default 100ms
    ackTimeoutMs: 100,
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 2
}

var producer = new kafka.Producer(client, options)

Examples:

partitionerType (possible values):

0: default
1: random
2: cyclic
3: keyed
4: custom (how do I specify a custom implementation?)

aikar commented 6 years ago

0 = bad. Shouldn't be named default; it always puts messages in the first partition.

1 = random partition for every payload.

2 = the first payload goes into partition 1, the 2nd goes into partition 2, and so on until the end, then it loops back around, evenly dividing messages across all partitions.

3 = requires use of keyed messages. When you produce a keyed message, the value passed for the key field is used to determine which partition to put it in, so every message with the same key ends up in the same partition.

4 = used with the 3rd parameter of the producer. You pass a function that's called for each payload, and you return a partition to assign it to.

The problem with 4 is it doesn't support promises :/

I'm actually writing a custom partitioner myself at the moment, but doing it before I call the client send method (you can preset the partition before sending payloads and skip partitioners altogether), as in the sketch below.

I needed async, and I'm hesitant about trying to adapt the partitioners to support dual return types.
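A minimal sketch of that preset-partition approach (the broker address, topic name and the choosePartition() helper are made up; any async logic that resolves to a partition id would do):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new kafka.Producer(client, { requireAcks: 1 });

// Hypothetical async partition lookup -- a DB call, a hash service, etc.
async function choosePartition (key) {
    return key.length % 3; // e.g. spread across 3 partitions
}

producer.on('ready', async () => {
    const partition = await choosePartition('some-key');

    // Presetting `partition` on the payload makes the producer skip its
    // partitioner entirely for this message.
    producer.send([
        { topic: 'my-topic', messages: 'hello', partition: partition }
    ], (err, result) => {
        if (err) console.error(err);
        else console.log(result);
    });
});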

mittaus commented 6 years ago

Thanks @aikar for giving your time and helping me.

I suggest adding a note to the Producer section of the README.md file, as shown below:

Producer(client, [options], [customPartitioner])

{
    // Configuration for when to consider a message as acknowledged, default 1
    requireAcks: 1,
    // The amount of time in milliseconds to wait for all acks before considered, default 100ms
    ackTimeoutMs: 100,
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 2
}

Note:

Possible values for the requireAcks option:

0: No ack is required for each message. This improves performance significantly, but messages can be lost.
1: An ack from the leader broker that receives the message is required. This ensures the leader received the message, but does not guarantee that all the replicas have it.
all: An ack from the leader broker and from all the replicas is required. This ensures that both the leader and all the replicas received the message.

Possible values for the partitionerType option:

0: Always puts messages in the first partition.
1: Random partition for every payload.
2: The first payload goes into partition 1, the 2nd into partition 2, and so on until the end, then it loops back around, evenly dividing messages across all partitions.
3: Requires use of keyed messages. When you produce a keyed message, the value passed for the key field is used to determine which partition to put it in, so every message with the same key ends up in the same partition (see the keyed-message sketch after the example below).
4: Used with the 3rd parameter of the producer. You pass a function that's called for each payload, and you return a partition to assign it to.

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client(),
    producer = new Producer(client);
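And, to make partitionerType: 3 concrete, a minimal keyed-message sketch (the broker address, topic name and key below are placeholders, not from the README); note that it is the payload-level key field that the keyed partitioner consults:

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new kafka.Producer(client, {
    requireAcks: 1,
    ackTimeoutMs: 100,
    partitionerType: 3 // keyed partitioner
});

producer.on('ready', () => {
    const key = 'user-42'; // placeholder key
    producer.send([{
        topic: 'my-topic',
        // the keyed partitioner hashes this payload-level key field
        key: key,
        messages: new kafka.KeyedMessage(key, 'some value')
    }], (err, result) => {
        if (err) console.error(err);
        // messages sent with the same key should land in the same partition
        else console.log(result);
    });
});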
Knove commented 5 years ago

I think some of the parameter descriptions in README.md are too vague. We should restructure README.md.

s50600822 commented 4 years ago

3: Requires use of keyed messages. when you produce a keyed message, the value passed for the key field then is used to determine which partition to put it in, so every message with same key ends up in same partition

Pretty sure it doesn't work

const uuid = require('uuid/v4');
const kafkaProducer = new kafka.Producer(kClient,{
    // Configuration for when to consider a message as acknowledged, default 1
    requireAcks: 1,
    // The amount of time in milliseconds to wait for all acks before considered, default 100ms
    ackTimeoutMs: 1000,
    partitionerType: 3
});
let payload = [{
    topic: 'hoa',
    messages: new kafka.KeyedMessage(uuid(), JSON.stringify(geneEvent))
}];

This will send everything to the same topic every time, regardless of the random key:

(screenshot: Screen Shot 2020-05-03 at 4.31.31 am)

Actually, the below works (but then KeyedMessage is a bit misleading if its key is not used for load-balancing purposes):

const mkey = uuid()
const payload = [{
    topic: 'hoa',
    key: mkey,
    messages: new kafka.KeyedMessage(mkey, JSON.stringify(geneEvent))
}];
adihat commented 4 years ago

@s50600822 There is a difference between a topic and a partition. A single topic can have multiple partitions. This ensures one can read in parallel from a single topic.

kafka.KeyedMessage() deals with partitions and NOT topics.

Would recommend this article for more details as the illustrations will make things clear for you.
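To illustrate that distinction, here is a rough consumer-side sketch (the broker address, topic name and partition numbers are placeholders): each fetch entry targets one partition of the same topic, so the partitions can be read in parallel:

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });

// One topic, two partitions: each entry subscribes to a single partition,
// so the partitions of 'my-topic' can be consumed in parallel.
const consumer = new kafka.Consumer(client, [
    { topic: 'my-topic', partition: 0 },
    { topic: 'my-topic', partition: 1 }
], { autoCommit: true });

consumer.on('message', (message) => {
    // message.partition reports which partition the message came from
    console.log(message.partition, message.key, message.value);
});

consumer.on('error', (err) => console.error(err));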

aniketkalamkar commented 4 years ago


@aikar do you have any example of a custom partitioner function definition and its usage?

aikar commented 4 years ago

I had this: https://github.com/aikar/kafka-node/commit/ca7979ab94024b0c95c29d182c1025c9123a727f

We used it for fairness, so one client would be locked to a range of partitions and other clients would use other ranges, to ensure one client can't hog every partition.
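For reference, a rough sketch of what such a range-locking partitioner could look like. It assumes the custom partitioner is called with the list of available partition ids and the payload key, and returns a partition id; the range boundaries and the hash are made up:

const kafka = require('kafka-node');

// Hypothetical range assigned to this client, e.g. the first 4 partitions.
const RANGE_START = 0;
const RANGE_SIZE = 4;

// Custom partitioner: assumed to be called with the available partition ids
// and the payload key; returns the partition id to use.
function rangePartitioner (partitions, key) {
    const myPartitions = partitions.slice(RANGE_START, RANGE_START + RANGE_SIZE);
    // Simple string hash so the same key always maps to the same partition
    // within this client's slice.
    let hash = 0;
    const s = String(key);
    for (let i = 0; i < s.length; i++) {
        hash = (hash * 31 + s.charCodeAt(i)) | 0;
    }
    return myPartitions[Math.abs(hash) % myPartitions.length];
}

const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
// partitionerType: 4 tells the producer to use the custom partitioner
// passed as the 3rd constructor argument.
const producer = new kafka.Producer(client, { partitionerType: 4 }, rangePartitioner);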