bakdata / kafka-key-value-store

Queryable Kafka Topics with Kafka Streams.
https://medium.com/bakdata/queryable-kafka-topics-with-kafka-streams-8d2cca9de33f
MIT License

How to implement DefaultPartitioner on HAProxy / nginx side? #1

Open Grabber opened 4 years ago

Grabber commented 4 years ago

@rs22

Do you have any insight into how to implement the DefaultPartitioner from "/processors" on HAProxy or nginx?

Is it dynamic?

rs22 commented 4 years ago

I haven't implemented this myself, but I think it should be possible with the OpenResty variant of nginx and its built-in Lua scripting capabilities.

According to here and here, you would mainly need a murmur2 function to derive the partition ID from the hash of your serialized keys.

There is even such a hash function built into nginx, which you could access using a small wrapper. Unfortunately, it does not accept a seed, which the DefaultPartitioner uses, so you will probably have to re-implement it in Lua.
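For reference, here is a minimal Java sketch of the computation a Lua port would need to reproduce. It assumes string keys serialized as UTF-8 (as Kafka's StringSerializer does) and follows the keyed-record path of the Java client: the seeded murmur2 hash is masked to a non-negative value and taken modulo the partition count. The class and method names (`PartitionSketch`, `partitionFor`) are hypothetical and not part of this repository.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the repository.
final class PartitionSketch {

    // Seed and mixing constants used by murmur2 in Kafka's Java client.
    private static final int SEED = 0x9747b28c;
    private static final int M = 0x5bd1e995;
    private static final int R = 24;

    // 32-bit murmur2 over the serialized key bytes (little-endian words).
    static int murmur2(byte[] data) {
        int length = data.length;
        int h = SEED ^ length;

        // Mix four bytes at a time.
        int fullWords = length / 4;
        for (int i = 0; i < fullWords; i++) {
            int o = i * 4;
            int k = (data[o] & 0xff)
                    | ((data[o + 1] & 0xff) << 8)
                    | ((data[o + 2] & 0xff) << 16)
                    | ((data[o + 3] & 0xff) << 24);
            k *= M;
            k ^= k >>> R;
            k *= M;
            h *= M;
            h ^= k;
        }

        // Mix the remaining one to three bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff; h *= M;
            default: break;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= M;
        h ^= h >>> 15;
        return h;
    }

    // Assumption: keys are strings serialized as UTF-8.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("foobar", 3));
    }
}
```

A Lua port has to emulate the 32-bit overflow of the multiplications and the unsigned right shifts, for example with LuaJIT's `bit` library; plain Lua number arithmetic would give different hashes.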

There might be alternatives to Lua scripting as well, but the ability to make HTTP requests, parse JSON, and cache the mapping of partitions to stream processors in Redis, all from within Lua, seems well suited to this use case.

torbsto commented 3 years ago

Hi @Grabber, you can find an example implementation in our follow-up blog post and the corresponding repository.

Grabber commented 3 years ago

@torbsto, thank you so much for sharing!

How do you update the partitions variable as soon as a new partition becomes available, without having to re-initialize nginx frequently?

torbsto commented 3 years ago

Instead of scheduling the initialization with ngx.timer.at, you could use ngx.timer.every to continuously update the mapping.

I haven't looked into it in detail, but there is also the option of a push-based approach. By implementing the StateListener interface, the applications can notify the proxy about changes whenever their state transitions from REBALANCING to RUNNING.