Open 0x1997 opened 7 years ago
@edenhill It's application specific. Basically custom_hash(static_cast<State*>(msg_opaque)) % partition_cnt in C++.
Okay, what you'll need to do in the meantime is get the partition count (with GetMetadata()) for your topic(s) and then run your partitioner prior to calling Produce() and setting Message.Partition accordingly.
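A minimal sketch of that interim workaround, assuming the partition count has already been read from topic metadata. The `message` struct is a hypothetical stand-in for the client's message type, and `customHash` is a placeholder (djb2) for whatever application-specific hash is needed; neither is part of the Go client.

```go
package main

// message is a hypothetical stand-in for the client's message type;
// only the fields this sketch needs are modelled.
type message struct {
	Key       []byte
	Partition int32
}

// hashFunc is any application-specific hash over the routing bytes.
type hashFunc func([]byte) uint32

// customHash is a placeholder hash (djb2), used here only for illustration.
func customHash(b []byte) uint32 {
	h := uint32(5381)
	for _, c := range b {
		h = h*33 + uint32(c)
	}
	return h
}

// assignPartition picks a partition in [0, partitionCnt) before the message
// is handed to the producer. If the partition count is unknown (<= 0), the
// message is left untouched.
func assignPartition(m *message, routing []byte, partitionCnt int32, h hashFunc) {
	if partitionCnt <= 0 {
		return
	}
	m.Partition = int32(h(routing) % uint32(partitionCnt))
}
```

With the real client, the chosen value would be set on the message in place of PartitionAny before calling Produce(). The routing bytes do not have to be the message key, which is what makes this usable for application-specific state.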
The problem with implementing partitioner_cb support in high-level language bindings is that the partitioner callback may be called from an internal librdkafka thread, and this isn't trivial to handle in cgo, CPython, et al. This should be fixed in librdkafka rather than in the bindings, but that isn't trivial either, which is why this functionality is currently missing from our bindings.
But here's a dumb idea: what if you, as a Go app developer, implemented the partitioner in C (cgo) and the Go client provided an API to set the C partitioner_cb? You wouldn't be allowed to call any Go methods from this callback, but since partitioners are pretty minimal by design this might be okay.
It would look something like this:
/*
#include <librdkafka/rdkafka.h>
static int32_t my_partitioner (rd_kafka_topic_t *rkt, ..) {
..some custom hasher goes here..
return hash % partition_cnt;
}
*/
import "C"
...
conf := ConfigMap{..., "default.topic.config": &ConfigMap{"partitioner_cb": C.my_partitioner}}
p, err := NewProducer(conf)
...
I know it is ugly, but would it be a reasonable workaround for you until proper Go partitioners are supported?
I wouldn't even need an entirely custom partitioner; I would just need the same key to always go to the same partition. Perhaps it would be possible to bake in something like that, for example by adding another constant besides PartitionAny...?
@tchap the default partitioner is Consistent-Random, which maps the same key to the same partition, so you should be fine. https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1606
The next version of librdkafka (the underlying Kafka client) will expose the builtin partitioners as configuration properties, allowing you to change to an alternative builtin partitioner, such as the Java compatible murmur2_random partitioner.
Search for 'partitioner' here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Custom partitioners are not yet exposed in the Go client though.
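To illustrate why the default partitioner already keeps a given key on one partition, here is a rough sketch of the consistent-random idea: CRC32 of the key modulo the partition count, with empty keys spread randomly. The function name is hypothetical, and it is an assumption that Go's `crc32.ChecksumIEEE` yields the same values as librdkafka's CRC32, so do not rely on this to reproduce librdkafka's exact partition choices.

```go
package main

import (
	"hash/crc32"
	"math/rand"
)

// consistentRandom sketches a consistent-random partitioner: the same
// non-empty key always maps to the same partition, while empty or nil keys
// are assigned a random partition. It returns -1 if partitionCnt is not
// positive.
func consistentRandom(key []byte, partitionCnt int32) int32 {
	if partitionCnt <= 0 {
		return -1
	}
	if len(key) == 0 {
		return rand.Int31n(partitionCnt)
	}
	// Assumption: IEEE CRC32; librdkafka's variant may differ.
	return int32(crc32.ChecksumIEEE(key) % uint32(partitionCnt))
}
```

Note that the mapping only stays stable while the partition count is unchanged; adding partitions to a topic changes the modulo result for most keys.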
@edenhill "Custom partitioners are not yet exposed in the Go client though."
I have a use case which really needs this. I have been trying workarounds, and it does not seem to be easy. First, I tried the workaround mentioned here (https://github.com/confluentinc/confluent-kafka-go/issues/16#issuecomment-266298255), but I get the error:
Failed to create producer: Invalid value type unsafe.Pointer for key partitioner_cb (expected string,bool,int,ConfigMap)
Then I cast the pointer to a uintptr, then again to an int, and ran into a runtime error saying:
Failed to create producer: Property "partitioner_cb" must be set through dedicated ..set..() function
At this point I stopped trying. It looks like it wants me to use rd_kafka_topic_conf_set_partitioner_cb() through some more cgo magic, but I gave up since I didn't know what to pass as the first parameter to that function, rd_kafka_topic_conf_t *topic_conf, and I don't know how to get the rd_kafka_topic_conf_t in the context of my Go program.
Do you have any tips for completing this workaround? Thanks!
Due to the generic way configuration is passed from Go to C, it is a bit tricky to add a special case for set_partitioner_cb(), so as a proof of concept I suggest you insert a call to ..set_partitioner_cb() with a hardcoded C function callback here, right before rd_kafka_conf_set_default_topic_conf():
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/config.go#L149
Do note that this callback may be called from internal librdkafka threads and it is not clear to me how to safely trigger a Go call from such a thread.
Thank you, @edenhill! What I actually need is not a generic custom partitioner. Rather, I'm looking for a way to hash based on something other than the Kafka key, since I'm using the key for other purposes. It looks like hashing based on the msg_opaque would work for me.
const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque
That works as long as I can set the msg_opaque at the Go level, and it looks like I can (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/message.go#L79).
Would you accept a PR to add a new builtin partitioner to librdkafka, like this?
consistent_opaque - CRC32 hash of msg_opaque (Empty and NULL msg_opaque are mapped to single partition)
I'm not sure, though, whether the CRC32 should be applied to the pointer address msg_opaque or to the whole data behind the pointer. Since there is no corresponding length argument, msg_opaque would have to point to a C string so the CRC32 knows how far to read...
I strongly advise you to stick to the existing semantics of keys in Kafka: they are used for partitioning and compaction. If you need additional data with your message, you can either create a richer message payload (using, for example, Avro and Schema Registry) or use message headers to "tag along" arbitrary data to your liking.
We will not accept a PR that does partitioning on something other than the key, sorry.
Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.
"Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages."
Ah, that would certainly kill the idea.
"I strongly advise you to stick to the existing semantics of keys in Kafka: they are used for partitioning and compaction."
That ship sailed a while ago at my company. In the future, when we upgrade to Kafka 0.11, we'll probably stick this metadata in the 0.11+ headers. Additionally, we have no use for compaction and have it turned off for our use case.
"or use message headers to 'tag along' arbitrary data to your liking."
This is a possibility too. I will revisit that. It was my first option, but I ran into social problems :)
We would love to have this available in the producer, as we're moving from Sarama to confluent-kafka-go but still need to support Sarama's default partitioner, which uses the 32-bit FNV-1a hashing algorithm (part of hash/fnv in Go).
For now we will follow @edenhill's advice (i.e. get topic metadata, run a custom Go partitioner prior to Produce, and set Message.Partition), but it would still be nice to have custom partitioners supported in some form in the Producer API (either Go or C). Although, I may just try my hand at adding the FNV-1a algo to librdkafka...
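For reference, a sketch of what such a Go-side FNV-1a partitioner could look like, using hash/fnv. The function name is hypothetical. The signed modulo followed by negation is believed to mirror how Sarama's hash partitioner turns the 32-bit sum into a partition, but that is an assumption and should be checked against the Sarama version in use before relying on it for compatibility.

```go
package main

import "hash/fnv"

// fnv1aPartition hashes the key with 32-bit FNV-1a and maps the result to a
// partition in [0, numPartitions). It returns -1 if numPartitions is not
// positive.
func fnv1aPartition(key []byte, numPartitions int32) int32 {
	if numPartitions <= 0 {
		return -1
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error

	// Assumption: interpret the sum as a signed 32-bit value, take the
	// modulo, then flip the sign of negative results.
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}
```

The result would be assigned to the message's partition before calling Produce, with numPartitions taken from topic metadata.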
Yeah, the simplest alternative is to add it as a builtin partitioner to librdkafka. Looks like the fnv1a code is very simple, so it should be fairly straightforward.
Find all occurrences of murmur2_random in the librdkafka/src code and add fnv1a_random counterparts.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_msg.c#L870
Agree with @Manicben :) It would be great to have that included in the producer, rather than having to run a custom Go partitioner prior to Produce.
Is there a chance of seeing that feature in the future?
Btw @edenhill thanks for all the good work 👍
@edenhill Hello. My idea is that messages with key="a" are sent to partition 0, messages with key="b" to partition 1, and so on for the other messages. After debugging, it doesn't work. My partitioner_cb code:
static int32_t partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) {
    /* this is a simple example */
    int32_t partition = 0;
    if (keylen <= 0) return partition;
    if (strncmp(keydata, "a", 1) == 0) partition = 0;
    else if (strncmp(keydata, "b", 1) == 0) partition = 1;
    else if (strncmp(keydata, "c", 1) == 0) partition = 2;
    else if (strncmp(keydata, "d", 1) == 0) partition = 3;
    return partition;
}
int err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)buf, len, (void *)key, strlen(key), NULL);
Could you help me? Thanks.
Out of curiosity, can you share your custom partitioner? It might be generic enough to be integrated.