kafka4beam / brod

Apache Kafka client library for Erlang/Elixir
Apache License 2.0

key in batch does not seem to work #429

Closed: alotela closed this issue 3 years ago

alotela commented 3 years ago

Hi,

I'm trying to produce a batch with a specific key for each message. I would like each message in the batch to be sent to the partition derived from its own key (the key field of each map in the Value list).

:brod.produce(:my_client, "my_topic", 0, "", [%{key: "key1", value: "value1"}, %{key: "key2", value: "value2"}, %{key: "key3", value: "value3"}...])

Whether I pass a partition number or a partitioner function as the third argument, the whole batch always goes to the same partition. How can I produce a batch so that each message goes to its own partition based on its key (key1, key2, key3...)?

Thanks

zmstone commented 3 years ago

Hi @alotela, brod spawns per-partition workers, and produce requests are always sent per partition. The batch API is intended to reduce message passing between processes, not merely to reduce the number of calls. If a single call involved multiple processes, error handling would become complicated.

If your messages are meant for different partitions, you'll have to partition the batch on the caller side.
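A minimal sketch of partitioning the batch on the caller side, assuming messages are maps with a key field as in the example above and that hashing the key with erlang:phash2/2 is acceptable. The module name batch_split is hypothetical, and brod's built-in hash partitioner is not guaranteed to choose the same partitions.

```erlang
-module(batch_split).
-export([group_by_partition/2]).

%% Split a list of #{key := Key, value := Value} messages into
%% per-partition batches. Assumption: the partition is chosen by hashing
%% the key with erlang:phash2/2; any deterministic scheme works as long as
%% every producer of the topic uses the same one.
-spec group_by_partition([map()], pos_integer()) ->
          #{non_neg_integer() => [map()]}.
group_by_partition(Msgs, PartitionCount) ->
    Grouped =
        lists:foldl(
          fun(#{key := Key} = Msg, Acc) ->
                  Partition = erlang:phash2(Key, PartitionCount),
                  %% Prepend for O(1) insertion; order is restored below.
                  maps:update_with(Partition, fun(Batch) -> [Msg | Batch] end,
                                   [Msg], Acc)
          end, #{}, Msgs),
    %% Restore the original relative order inside each partition batch.
    maps:map(fun(_Partition, Batch) -> lists:reverse(Batch) end, Grouped).
```

Each resulting batch could then be sent with one brod:produce_sync(Client, Topic, Partition, <<>>, Batch) call per partition, where the partition count comes from brod:get_partitions_count(Client, Topic).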

alotela commented 3 years ago

Hey @zmstone, thank you.

So if I send a batch to a partition and brod returns ok, does that mean ALL the messages are in Kafka on that topic/partition?

zmstone commented 3 years ago

That is correct. The produce API was originally designed to send one message at a time. Later it was extended to accept a batch of messages for the same partition. If the third argument is a partitioner function, the fourth argument (Key) is its input, not the keys of the messages in the batch.
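To illustrate the point about the fourth argument, here is a sketch of a partitioner fun. Its shape, fun((Topic, PartitionCount, Key, Value) -> {ok, Partition}), is assumed to match brod's partition_fun() type, and the module name key_partitioner is hypothetical. Per the explanation above, the fun is consulted once per produce call, so a whole batch lands in the single partition derived from Key.

```erlang
-module(key_partitioner).
-export([partitioner/0]).

%% Returns a partitioner fun (assumed brod partition_fun() shape):
%%   fun((Topic, PartitionCount, Key, Value) -> {ok, Partition})
%% Key is the 4th argument of brod:produce/5; Value is the whole 5th
%% argument, so for a batch the keys inside the message maps are ignored.
partitioner() ->
    fun(_Topic, PartitionCount, Key, _Value) ->
            {ok, erlang:phash2(Key, PartitionCount)}
    end.
```

Passing it as brod:produce(Client, Topic, key_partitioner:partitioner(), Key, Batch) would therefore route the whole Batch to one partition.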

alotela commented 3 years ago

OK. So when is the key used? It seems it is just sent in the kpro_req record (https://github.com/klarna/kafka_protocol/blob/master/src/kpro_req_lib.erl#L2666), but this structure (#msg) also carries the partition, and everything is sent by the topic/partition producer chosen at the beginning (https://github.com/klarna/brod/blob/master/src/brod.erl#L514). So to me it looks like the key serves no purpose. Am I wrong?

zmstone commented 3 years ago

In this case, keys are never 'used' by brod; they are sent to Kafka as data. You are right that brod lacks a partitioning utility for message batches; the behaviour of this API is a design decision.

We could perhaps add partitioning support for batch produces as a new API. However, the result would not be as straightforward, because it would have to return a list of per-partition results, and probably a list of the failed messages too (so the caller can retry). The point being, it's rather challenging to make it generic enough for everybody.
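A sketch of what such an API's return shape might look like. This is hypothetical, not an existing brod function: the module name multi_produce is invented, and sending is delegated to an injected SendFun(Partition, Batch) -> ok | {error, Reason}, which in real use might wrap brod:produce_sync/5.

```erlang
-module(multi_produce).
-export([produce/3]).

%% Hypothetical batch-produce API with caller-side partitioning. Returns
%% {PerPartitionResults, FailedMessages} so the caller can retry the failures.
-spec produce(fun((non_neg_integer(), [map()]) -> ok | {error, term()}),
              [map()], pos_integer()) ->
          {[{non_neg_integer(), ok | {error, term()}}], [map()]}.
produce(SendFun, Msgs, PartitionCount) ->
    %% Send each per-partition batch and keep the batch next to its result.
    Sent = [{P, Batch, SendFun(P, Batch)} || {P, Batch} <- group(Msgs, PartitionCount)],
    Results = [{P, Result} || {P, _Batch, Result} <- Sent],
    %% Messages from every partition whose send returned an error.
    Failed = lists:append([Batch || {_P, Batch, {error, _}} <- Sent]),
    {Results, Failed}.

%% Group messages by a hash of their key (an assumption), keeping input order
%% within each partition; batches come out sorted by partition number.
group(Msgs, PartitionCount) ->
    Tagged = [{erlang:phash2(Key, PartitionCount), M} || #{key := Key} = M <- Msgs],
    Partitions = lists:usort([P || {P, _} <- Tagged]),
    [{P, [M || {P1, M} <- Tagged, P1 =:= P]} || P <- Partitions].
```

Even this small version has to decide whether to stop at the first failure (it does not) and what a "failure" is, which is the genericity problem mentioned above.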

alotela commented 3 years ago

How would that work? I need to look at the Kafka protocol to understand it better, but when you send messages to Kafka, do you send the whole payload at once, like an atomic transaction? Or message by message? Also, if brod returned a per-partition result, would that result come from Kafka itself or from this "new API" in the brod client?

zmstone commented 3 years ago

It's a LIST of per-partition BATCHes of MESSAGEs encoded in one produce request to Kafka. A Kafka broker (a single member of the cluster) accepts the whole LIST as long as it is the leader of all the partitions in it; otherwise the result is a partial failure.

brod always sends a one-element LIST (a single BATCH of MESSAGEs for one partition), so there can never be a partial success or failure in brod.
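A toy model of the point above, showing why a one-element list rules out partial results. It is not the real wire format (real requests also carry topics, acks and timeouts, and brokers report a per-partition error code); the module name and the not_leader_for_partition atom are illustrative.

```erlang
-module(produce_request_model).
-export([broker_handle/2]).

%% A produce request modelled as a LIST of per-partition BATCHes:
%% [{Partition, [Msg]}]. The broker appends a batch only if it currently
%% leads that partition, so a multi-partition request can partially fail,
%% while each individual batch succeeds or fails as a whole.
-spec broker_handle([non_neg_integer()], [{non_neg_integer(), [term()]}]) ->
          [{non_neg_integer(), ok | {error, not_leader_for_partition}}].
broker_handle(LocalLeaders, Request) ->
    [{Partition, partition_result(lists:member(Partition, LocalLeaders))}
     || {Partition, _Batch} <- Request].

partition_result(true)  -> ok;
partition_result(false) -> {error, not_leader_for_partition}.
```

With a one-element request, as brod sends, the result list always has exactly one entry, so there is nothing to be partial about.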

alotela commented 3 years ago

Oh, OK (thank you for taking the time to explain). You mean that since brod sends a list of one batch of messages for a single partition, it can't fail, right?

It also means that, no matter how the client is built on top of the Kafka API, if I want to send a kind of "transaction" to Kafka (the sequence of messages produced by a single job in my code, like multiple DB inserts or updates) and these messages go to different topics/partitions, the client has to find the leader for each partition and send the job in multiple requests? So it is no longer a transaction, and, as you said in your first message, the client has to manage this whole process itself? And it also means that, in the end, if there are failures, some of the messages will be in Kafka and others won't, right? So my state in Kafka would no longer be consistent with my database...

zmstone commented 3 years ago

"You mean that since brod sends a list of one batch of messages for a single partition, it can't fail, right?"

Yes, brod sends one batch to a single partition. It may still fail, but with all-or-nothing atomicity.

If you are after cross-topic-partition atomicity, Kafka does have transactional APIs, and kafka_protocol provides some primitive support for them on the BEAM, but we never had the chance to build a higher-level abstraction in brod.

Compared with distributed databases, I think the biggest difference is that Kafka pushes all the complicated exception flows to the client side. For example, if a partition leader moves from one node to another, the broker sends an error back to the client, and the client is responsible for finding the new leader and retrying. In other words, Kafka does not provide anything like what a distributed database may provide: delegation, proxying, handoff, etc.
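A sketch of the client-side flow described above: send to the current leader and, on a not-leader error, look the leader up again and retry. LookupFun(Partition) -> {ok, Broker} and SendFun(Broker, Partition, Batch) -> ok | {error, Reason} are hypothetical stand-ins for a metadata lookup and a produce call; the module name leader_retry is also hypothetical.

```erlang
-module(leader_retry).
-export([send/5]).

%% Client-side recovery from a leader move. Only the not-leader error is
%% retried here; any other error is returned to the caller unchanged.
send(LookupFun, SendFun, Partition, Batch, RetriesLeft) ->
    {ok, Broker} = LookupFun(Partition),
    case SendFun(Broker, Partition, Batch) of
        ok ->
            ok;
        {error, not_leader_for_partition} when RetriesLeft > 0 ->
            %% Leadership moved: look the leader up again and retry.
            send(LookupFun, SendFun, Partition, Batch, RetriesLeft - 1);
        {error, Reason} ->
            {error, Reason}
    end.
```

A real client would also back off between attempts; brod's producer has a retry_backoff_ms setting for this.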

alotela commented 3 years ago

ok.

For the transactional API, is it "usable" in kafka_protocol? Or are there any missing blocking parts? And about idempotence, do you support it?

zmstone commented 3 years ago

They are usable, but not seriously tested, and they work at a very primitive level. The so-called idempotent producers/consumers use the same underlying 'transactional' features of the Kafka protocol.

alotela commented 3 years ago

OK, a good exercise for when I have time to contribute to this project ;) Do you expect to finish this work soon, or is it not a priority?

I may have found a solution to my problem, but I would need to be able to stop the client when its connection is closed. As far as I can see, there is currently no way to "know" that the client closed the connection and opened a new one. Am I wrong? Maybe I could add a config option to stop the client when the connection is closed, instead of opening a new one (https://github.com/klarna/brod/blob/master/src/brod_client.erl#L682), or to call a callback?

To stop the client, is this all I need to do?

:brod_group_subscriber_v2.stop(groupProcessId)
:brod.stop_client(clientPid)

zmstone commented 3 years ago

Sorry, I am not quite following what your problem is, or why it has anything to do with the underlying TCP connection.

alotela commented 3 years ago

Oh OK, sorry ;) I'm trying to find the best way to produce from a PG WAL (Write-Ahead Log: the log of all modifications on a selection of tables; here we take just one table) to Kafka with epgsql. Since the PG WAL replication slot sends me all inserts/updates/deletes as a stream (with a WAL offset for each one), at first I thought I could take all the messages in the queue, build a batch, send it synchronously to Kafka, and then advance the replication slot's WAL offset to the last offset sent in the batch. Easy... but as you explained, it's not that simple, since one batch cannot span multiple partitions.
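One way the plan above could be adapted, sketched under assumptions: each log is #{lsn := LSN, key := Key, value := Value} with an integer LSN; SendFun(Partition, Batch) -> ok | {error, Reason} wraps a synchronous produce such as brod:produce_sync/5; and AckFun(LSN) is a hypothetical hook that confirms the position to the replication slot. The module name wal_flush and the "lsn" header name are also hypothetical.

```erlang
-module(wal_flush).
-export([flush/4]).

%% Split the pending logs per partition, send each batch synchronously, and
%% advance the WAL position only if every partition batch succeeded.
flush([], _PartitionCount, _SendFun, _AckFun) ->
    ok;
flush(Logs, PartitionCount, SendFun, AckFun) ->
    Tagged = [{erlang:phash2(Key, PartitionCount), to_msg(Log)}
              || #{key := Key} = Log <- Logs],
    Partitions = lists:usort([P || {P, _} <- Tagged]),
    Results = [SendFun(P, [Msg || {P1, Msg} <- Tagged, P1 =:= P]) || P <- Partitions],
    case [E || {error, _} = E <- Results] of
        [] ->
            %% Every partition accepted its batch: confirm the highest LSN.
            AckFun(lists:max([LSN || #{lsn := LSN} <- Logs])),
            ok;
        [FirstError | _] ->
            %% No ack: these logs are expected to be delivered and sent again.
            FirstError
    end.

%% Carry the WAL offset in a header so consumers can restore WAL order.
to_msg(#{lsn := LSN, key := Key, value := Value}) ->
    #{key => Key, value => Value, headers => [{<<"lsn">>, integer_to_binary(LSN)}]}.
```

Batches that succeeded before a failure will be sent again on the retry, so this gives at-least-once delivery and consumers must tolerate duplicates.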

Now, let's call a "log" one modification of the database (an insert, an update, or a delete) received from PG through the WAL stream.

I need to find another solution, which could be the following:

A friend of mine told me that, as long as we stay on the same connection, a brod topic/partition producer can produce asynchronously to Kafka without waiting for responses. If one of the messages fails, then all the messages before it should already be stored in Kafka (since Kafka writes all messages on a partition in order). So I could send all the messages for a partition in multiple async calls, and if there is an error, I could be sure, as long as I'm on the same connection, that all previous messages are in Kafka. If this is correct, I can send asynchronously on a partition, even log by log, but I need to be able to stop the process as soon as the client connection goes down. If it is not correct, I must send all the logs in the producer queue as synchronous batches, which is slower...

I hope this makes it clearer. Could you confirm whether the async approach can work like this or not?

Thank you

zmstone commented 3 years ago

Thanks for the clarification. There are two levels of async-ness that I need to make clear before going into the details. The brod:produce APIs (except the ones whose names end with _sync) are actually synchronous calls in the sense that they only return after the produce request that includes the message(s) of the call has been sent to the TCP socket. They are asynchronous in the sense that they do not wait for Kafka's produce-response message.

brod_producer tries to recover from connection failures (socket errors returned by send calls) and from errors returned by Kafka, e.g. when the request is sent to a broker that is not the leader. However, it has a retry limit; if the limit is exceeded, the caller gets an error in a brod_produce_reply message.

The easiest design, IMO, would be for the caller(s) to collect batches and call the produce_sync API, retrying when it returns an error. Otherwise, the callers can call the produce API, but they must keep the messages in a buffer in order to correlate asynchronous error replies.
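A minimal sketch of the "produce_sync with a retry on error return" design. SendFun() -> ok | {error, Reason} is assumed to wrap something like brod:produce_sync(Client, Topic, Partition, Key, Batch); the module name sync_retry and the backoff parameter are hypothetical.

```erlang
-module(sync_retry).
-export([produce_with_retry/3]).

%% Retry the whole batch until it is accepted or MaxAttempts is used up.
%% A single-partition batch is all-or-nothing, so resending it whole is safe
%% in the at-least-once sense; it can still create duplicates if an earlier
%% attempt was in fact written but its reply was lost.
produce_with_retry(SendFun, MaxAttempts, BackoffMs) when MaxAttempts >= 1 ->
    case SendFun() of
        ok ->
            ok;
        {error, _Reason} when MaxAttempts > 1 ->
            timer:sleep(BackoffMs),
            produce_with_retry(SendFun, MaxAttempts - 1, BackoffMs);
        {error, Reason} ->
            {error, Reason}
    end.
```

Because the caller blocks until the batch is acknowledged, ordering across consecutive batches is preserved without any extra bookkeeping.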

Regarding your specific use case, I am not sure how the PG WAL could be distributed across multiple partitions, mostly because of concerns about atomicity and message ordering. If the consumer side can use the WAL offset to reassemble the original stream, at-least-once delivery should work just fine; that is, there is no need for transactions towards Kafka.
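A sketch of the consumer-side reassembly suggested above, assuming every consumed message carries its WAL offset (LSN) and has been decoded to {LSN, Payload}. The module name wal_reassemble is hypothetical. Messages from several partitions are merged back into WAL order, and duplicates produced by at-least-once retries are dropped.

```erlang
-module(wal_reassemble).
-export([reassemble/1]).

%% Merge per-partition streams of {LSN, Payload} back into WAL order.
%% lists:ukeysort/2 sorts on the LSN and keeps only the first tuple for
%% each LSN, which removes duplicates from at-least-once delivery.
-spec reassemble([[{integer(), term()}]]) -> [{integer(), term()}].
reassemble(PartitionStreams) ->
    lists:ukeysort(1, lists:append(PartitionStreams)).
```

This works on a finite snapshot; a live consumer would additionally need a rule for when it is safe to emit an LSN (i.e. when no smaller LSN can still arrive from another partition), which is left out here.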

"OK, a good exercise for when I have time to contribute to this project ;) Do you expect to finish this work soon, or is it not a priority?"

I myself do not have the time for this, and I can't speak for Klarna. One thing is for sure, though: contributions are always welcome.

alotela commented 3 years ago

Hi, thanks. I'm building the system with produce_sync for now, but async would be great. So for produce (async), if I send TWO batches with two sequential calls (same connection, same process...), will they be in that order in Kafka? If Kafka fails to write the first batch to the partition, will the second fail too? Or is it possible that Kafka fails the first batch but writes the second one to the partition?

zmstone commented 3 years ago

There is a producer config named partition_onwire_limit (default 1). If you keep the default, there is only ever one in-flight request per partition, and the producer re-queues a failed batch at the head of its internal send queue for retry.

If it is set to 2 or more, a later request may succeed even if earlier ones failed.
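For reference, a producer config fragment with the setting discussed above. partition_onwire_limit = 1 is the default quoted in this thread; the max_retries and retry_backoff_ms values are illustrative only, so check the brod_producer documentation for the actual defaults.

```erlang
%% Producer config sketch. partition_onwire_limit = 1 keeps a single
%% in-flight request per partition, so a failed batch is retried before
%% anything queued behind it is sent (illustrative retry values below).
ProducerConfig = [ {partition_onwire_limit, 1}
                 , {max_retries, 3}
                 , {retry_backoff_ms, 500}
                 ].
```

Such a list can be passed to brod:start_producer(Client, Topic, ProducerConfig), or given as {default_producer_config, ProducerConfig} in the client config passed to brod:start_client/3.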

alotela commented 3 years ago

ok, haha sorry, I should read the producer's doc a little more ;)

So if I use this setting (= 1), I understand that the producer waits for the ack from Kafka before sending another request/batch. Then what is the difference between produce_sync and produce with partition_onwire_limit = 1?

zmstone commented 3 years ago

Multiple caller processes can call the produce_sync API towards the same partition-producer process. The partition-producer batches the pending calls into one produce request, sends it, waits for the response, and finally replies to the callers. Each caller may send a single message or a batch of messages, but the actual batch sent to Kafka may include multiple calls.

It's the same flow with the async API; the only difference is that the caller gets the result in a message sent back from the producer process (rather than as the return value).

zmstone commented 3 years ago

If you have a single caller process and it uses produce_sync, the on-wire limit config doesn't really matter.

alotela commented 3 years ago

OK, I should be more precise: in my case, I have only one process calling produce() on a partition-producer, because I need a message-ordering guarantee from my PG log to the Kafka partition. So no parallelism is possible on a partition. I therefore think sync and async are the same in my case: I need to wait for the ack from Kafka before sending a new request. The only difference seems to be that, with async, when the ack is received from Kafka, the next batch is already waiting in the producer's queue/buffer.

But now I don't really understand the difference between a batch I build myself, like this (your example):

brod:produce(
  _Client    = brod_client_1,
  _Topic     = <<"brod-test-topic-1">>,
  _Partition = MyPartitionerFun,
  %% Key is the input to MyPartitionerFun; the keys inside Value are only sent as data
  _Key       = KeyUsedForPartitioning,
  _Value     = [ #{key => "k1", value => "v1", headers => [{"foo", "bar"}]}
               , #{key => "k2", value => "v2"}
               ]).

and the notion of batch vs request vs "the actual batch sent to Kafka may include multiple calls" sent to Kafka by the brod partition-producer. I understand that if I call produce() multiple times on a partition-producer with messages or batches, it queues them in its buffer and then sends the whole buffer/queue in one request? (And this is also governed by max_linger_ms/max_linger_count.) So it "rebuilds a batch" from the messages and batches stored in its queue/buffer? If so, why would I call produce() with a batch at all, instead of once per message? Would the result be the same, just slower?

zmstone commented 3 years ago

I think the overuse of the term 'batch' all over the code base and in this discussion is what is causing the confusion; sorry about that. Let's say the caller provides a msg-list in the API call. The producer builds up a backlog of msg-lists to be sent. Each produce request carries a batch of messages, which is several msg-lists flattened out, but not necessarily the entire backlog.

max_linger_ms and max_linger_count control the timing and size of the next request to be sent.

alotela commented 3 years ago

OK, so to be pragmatic:
1/ caller produces a batch of messages [{"", value1}, {"", value2}] => producer queue is [{"", value1}, {"", value2}]
2/ caller produces a message value3 => producer queue is [{"", value1}, {"", value2}, value3]
3/ caller produces a batch of messages [{"", value4}, {"", value5}] => producer queue is [{"", value1}, {"", value2}, value3, {"", value4}, {"", value5}]

If max_linger_count = 4, will the next request send [{"", value1}, {"", value2}, value3, {"", value4}] to Kafka, and the rest only after the ack is received, when using produce_sync or partition_onwire_limit = 1?

(again: thank you for your time...)

zmstone commented 3 years ago

My bad, I was not quite accurate regarding the max_linger configs. Let me start over:

The producer keeps track of the size (in bytes) of each call, and produce requests have an upper size limit in bytes, max_batch_size.

If the total number of bytes in the entire backlog is less than max_batch_size, we call the backlog 'starving'. The max_linger_xx configs let a 'starving' backlog wait for more calls. Since you have a single synchronous caller, the default value 0 should suit you just fine; that is, the producer always sends immediately, with no backlogging or buffering.
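To make the terminology above concrete, here is a simplified model of the backlog, not brod's actual implementation: calls are msg-lists of binaries, sizes are just byte_size of the values, and the next request takes whole calls from the head of the backlog while they fit in max_batch_size (always at least one). The module name backlog_model and the whole-call rule are assumptions of this sketch.

```erlang
-module(backlog_model).
-export([starving/2, next_request/2]).

%% Size of one call (one caller's msg-list), counting only value bytes.
call_bytes(MsgList) -> lists:sum([byte_size(M) || M <- MsgList]).

%% The backlog is 'starving' while its total size is below MaxBatchSize;
%% only then would max_linger_ms / max_linger_count make the producer wait.
starving(Backlog, MaxBatchSize) ->
    lists:sum([call_bytes(C) || C <- Backlog]) < MaxBatchSize.

%% Build the next request from a non-empty backlog: take whole calls while
%% they fit in MaxBatchSize (at least one) and flatten their msg-lists into
%% the batch sent to Kafka. Returns {Batch, RemainingBacklog}.
next_request([First | Rest], MaxBatchSize) ->
    take(Rest, MaxBatchSize, call_bytes(First), [First]).

take([Call | Rest], Max, Size, Acc) ->
    case Size + call_bytes(Call) =< Max of
        true  -> take(Rest, Max, Size + call_bytes(Call), [Call | Acc]);
        false -> {lists:append(lists:reverse(Acc)), [Call | Rest]}
    end;
take([], _Max, _Size, Acc) ->
    {lists:append(lists:reverse(Acc)), []}.
```

With a single synchronous caller the backlog never holds more than one call, so each request is exactly the msg-list of that call.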

zmstone commented 3 years ago

1/ caller produces a batch of messages [{"", value1}, {"", value2}] => producer queue is [{"", value1}, {"", value2}]

Since you have a single caller that calls the sync API, the caller is blocked until {ok, ...} or {error, ...} is returned, and by then the messages should have been removed from the producer's backlog/queue/buffer.