helpshift / ekaf

A minimal, high-performance Kafka client in Erlang.
https://engineering.helpshift.com

produce_async_batched #16

Closed gaynetdinov closed 9 years ago

gaynetdinov commented 9 years ago

Hello.

I'm a bit confused by the documentation about how to produce messages asynchronously and in batches.

The README says the following

%% sync
{buffered, _Partition, _BufferSize} =
    ekaf:produce_async_batched(
        Topic,
        [ekaf_utils:itob(X) || X<- lists:seq(1,1000) ]
    ).

%% async
{buffered, _Partition, _BufferIndex} =
    ekaf:produce_async_batched(
        Topic,
        [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">> ]
    ).

It seems odd to me that both the sync and async ways to send messages in batches use the same function, produce_async_batched. Is that intended, or is it a typo?

Also, in the example for sending messages asynchronously in batches, I don't quite understand the values that are used. What is <<"foo">>? As I understand it, <<"key">> is the routing key for the message, which is represented by <<"value">> here, right? And what is <<"back_to_binary">>? If I open ekaf_tests.erl, I see produce_async_batched called as follows:

Sent = [ <<(ekaf_utils:itob(X))/binary,".async multi batch">> || X<- lists:seq(9,19)],
Response  = ekaf:produce_async_batched(?TEST_TOPIC, Sent ),

or

Sent = <<"20.async in batch">>,
Response  = ekaf:produce_async_batched(?TEST_TOPIC, Sent),

As far as I can see in the examples from the test file, there are no <<"foo">>, <<"key">>, <<"value">> or <<"back_to_binary">>; there is just a topic name and a message (or a list of messages), that's it. Could you please clarify how I should send messages asynchronously in batches?

Thank you so much!

bosky101 commented 9 years ago

Yes, that is a typo; thanks for bringing it to my attention.

ekaf:produce_async_batched(
    Topic,
    [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">> ]
)

adds 3 messages to be queued (to the batch): the first is foo, the second is value, and the third is back_to_binary.

ekaf:produce_async_batched(
    Topic,
    <<"bar">>
)

adds 1 message to be queued (to the batch).
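
To make that concrete, here is a rough sketch of both call shapes, matching the {buffered, Partition, BufferIndex} reply shown in the README (<<"some_topic">> is just a placeholder topic name):

%% a rough sketch; <<"some_topic">> is a placeholder topic name
Topic = <<"some_topic">>,

%% list form: queues three messages in the batch
%% (per the reply above, the {<<"key">>, <<"value">>} tuple queues the message <<"value">>)
{buffered, _Partition1, _BufferIndex1} =
    ekaf:produce_async_batched(
        Topic,
        [<<"foo">>, {<<"key">>, <<"value">>}, <<"back_to_binary">>]
    ),

%% single-binary form: queues one message in the batch
{buffered, _Partition2, _BufferIndex2} =
    ekaf:produce_async_batched(Topic, <<"bar">>).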

The Kafka protocol leaves it up to the client to decide which partition to send to. ekaf does not have a way to provide a partition-picking function (semantic partitioning, as in the Kafka docs). Currently this is done via strategies: sticky_round_robin, strict_round_robin, or random.

In your sys.config, I recommend using strict_round_robin so that load is split nicely between all your brokers and partitions for the topic.

{ekaf, [
    {ekaf_partition_strategy, strict_round_robin}
]}

or, for different topics:

{ekaf, [
    {ekaf_partition_strategy, [
        {<<"small_load">>, random},
        {<<"other_topic">>, sticky_round_robin},
        {ekaf_partition_strategy, strict_round_robin}  %% default
    ]}
]}

Here is something that we use in production (also check ekaf_demo and ekaf_tests):

  {ekaf,[
    {ekaf_max_buffer_size, [{<<"some_topic">>,1000},
                            {ekaf_max_buffer_size,100}]}, % default
    {ekaf_per_partition_workers, [{<<"tightly_packed_topic">>, 1},
                                  {ekaf_per_partition_workers, 100}]},  % default
    {ekaf_partition_strategy, strict_round_robin},
    {ekaf_bootstrap_topics, [ <<"some_topic">> ]}, % can be ignored
    {ekaf_bootstrap_broker, { {{kafka_host}} , {{kafka_port}} } }
]},

or dynamically in your code.

application:load(ekaf), ... % see ekaf_tests.erl for required apps
application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),

Also, you need not call prepare, metadata, or any bootstrap function. Just call the produce function from anywhere you want; ekaf will fetch the metadata, set up the connection pool on the fly, queue the unsent messages, and send them once it knows where they should go.
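
For instance, a minimal sketch (assuming ekaf and its dependency apps are already started, as in ekaf_tests.erl, a broker is configured via ekaf_bootstrap_broker, and <<"some_topic">> is just a placeholder):

%% no prepare/metadata/bootstrap call needed; ekaf discovers the topic's
%% metadata and sets up its worker pool on the first produce
ekaf:produce_async_batched(<<"some_topic">>, <<"hello world">>).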

NOTE: when we say produce_async_batched, it queues up messages for a maximum of t seconds or ekaf_max_buffer_size messages, whichever comes first (both configurable). It then sends those ekaf_max_buffer_size messages to Kafka. A recent commit also added a feature for sending the batch as 1 event containing the ekaf_max_buffer_size items as a list. In most cases you just want to send the ekaf_max_buffer_size items individually, but there may be times where you want the batch to go as 1 message so that the consumer can handle many events at once. Do make sure your Kafka settings allow a large enough max message size to support this, though, e.g. a couple of MB or whatever fits your use case.
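
As a rough sketch of those buffering knobs (I'm assuming ekaf_buffer_ttl as the name of the flush-interval key here; check the README of the ekaf version you are on for the exact options):

{ekaf, [
    %% flush the batch after this many queued messages...
    {ekaf_max_buffer_size, 100},
    %% ...or after this interval, whichever comes first
    %% (key name and units assumed; verify against your version's README)
    {ekaf_buffer_ttl, 5000}
]}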

Let me know if this solves your queries; I will close the issue when I fix the typo. Thanks again. ~B

gaynetdinov commented 9 years ago

Thank you for such a great answer!