IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Crc32 Partitioning Scheme Incompatible with librdkafka #2430

Closed: csm8118 closed this issue 1 year ago

csm8118 commented 1 year ago
Versions


| Sarama | Kafka | Go |
| --- | --- | --- |
| v1.29.0 | | 1.15 |

Problem Description

When using the CRC32 hash, sarama assigns keys to different partitions than librdkafka (and ruby-kafka) do. This appears to have been reported some time ago, but nothing came of it (https://github.com/Shopify/sarama/issues/1213). The root cause is that librdkafka and ruby-kafka treat the CRC32 hash as an unsigned 32-bit value, whereas sarama casts it to a signed value. However, when using other hashing algorithms, the signed cast is necessary.

Here are some sample inputs that show the difference:

```ruby
# ruby: key of "SheetJS", num partitions: 100 -> partition 26
DEV [8] pry(main)> Zlib.crc32('SheetJS') % 100
=> 26
```
```go
package main

import (
	"fmt"
	"hash/crc32"

	"github.com/IBM/sarama"
)

func main() {
	msg := &sarama.ProducerMessage{
		Key: sarama.StringEncoder("SheetJS"),
	}

	// go: key of "SheetJS", num partitions: 100 -> partition 70
	sc := sarama.NewConfig()
	sc.Producer.Partitioner = sarama.NewCustomHashPartitioner(crc32.NewIEEE)
	partitioner := sc.Producer.Partitioner("test")
	p, _ := partitioner.Partition(msg, 100)
	fmt.Println(p) // prints 70

	// Using the "reference abs" partitioner: same key -> partition 78
	sc = sarama.NewConfig()
	sc.Producer.Partitioner = sarama.NewCustomPartitioner(
		sarama.WithAbsFirst(),
		sarama.WithCustomHashFunction(crc32.NewIEEE),
	)
	partitioner = sc.Producer.Partitioner("test")
	p, _ = partitioner.Partition(msg, 100)
	fmt.Println(p) // prints 78
}
```

I created a custom partitioner (and tests) that replicates the ruby/librdkafka behavior. The relevant snippet of that code is below.

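```go
package partitioner

import (
	"hash"
	"hash/crc32"

	"github.com/IBM/sarama"
)

// rubyKafkaCompatiblePartitioner hashes keys with CRC32 and treats the sum
// as unsigned, like ruby-kafka and librdkafka.
type rubyKafkaCompatiblePartitioner struct {
	random sarama.Partitioner // fallback for messages without a key
	hasher hash.Hash32
}

func NewRubyKafkaCompatiblePartitioner(topic string) sarama.Partitioner {
	p := new(rubyKafkaCompatiblePartitioner)
	p.random = sarama.NewRandomPartitioner(topic)
	p.hasher = crc32.NewIEEE()
	return p
}

func (p *rubyKafkaCompatiblePartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	bytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}
	p.hasher.Reset()
	_, err = p.hasher.Write(bytes)
	if err != nil {
		return -1, err
	}

	// Unsigned modulo: the sum is never cast to int32, so no sign handling is needed.
	partition := p.hasher.Sum32() % uint32(numPartitions)

	return int32(partition), nil
}

// RequiresConsistency is part of the sarama.Partitioner interface; returning
// true signals that a given key must always map to the same partition.
func (p *rubyKafkaCompatiblePartitioner) RequiresConsistency() bool {
	return true
}
```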

Sample testing output:

```go
sc := sarama.NewConfig()
sc.Producer.Partitioner = NewRubyKafkaCompatiblePartitioner
partitioner := sc.Producer.Partitioner("test")

msg := &sarama.ProducerMessage{
	Key: sarama.StringEncoder("SheetJS"),
}
p, _ := partitioner.Partition(msg, 100)
fmt.Println(p) // prints 26
```

I would like to contribute this code to the sarama codebase, but I wanted to open this issue and have a discussion about it prior to creating a pull request.

github-actions[bot] commented 1 year ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

dnwe commented 1 year ago

@csm8118 thanks for raising this issue. Is it something you’re still interested in contributing? Presumably we’d have to retain the old (wrong) method, with a config option to switch to the compatible version.

csm8118 commented 1 year ago

@dnwe I'd be happy to contribute this. My initial thinking was to introduce something like NewRubyKafkaCompatiblePartitioner as I described in the comment. If that sounds good to you, I'll get a PR going with those changes.

dnwe commented 1 year ago

I quite like the names that librdkafka assigns to its available partitioners:

Partitioner:
- random - random distribution
- consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition)
- consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned)
- murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition)
- murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.)
- fnv1a - FNV-1a hash of key (NULL keys are mapped to single partition)
- fnv1a_random - FNV-1a hash of key (NULL keys are randomly partitioned)

Type: string

This is especially interesting because I hadn’t realised before that we use FNV-1a by default, whereas librdkafka and its brethren all default to CRC32 and Java defaults to murmur2.

csm8118 commented 1 year ago

I created https://github.com/IBM/sarama/pull/2560 to implement this change. I tried to pick good names for things. Let me know what you think of the changes!

csm8118 commented 1 year ago

Thanks @dnwe! Closing this issue out.