kafka4beam / brod

Apache Kafka client library for Erlang/Elixir
Apache License 2.0

brod:fold doesn't work when there have been transactions #588

Open indrekj opened 1 week ago

indrekj commented 1 week ago

A failing test:

t_fold_transactions(Config) when is_list(Config) ->
  Client = ?config(client),
  Topic = ?TOPIC,
  Partition = 0,
  Batch = [#{value => <<"one">>}, #{value => <<"two">>}],
  {ok, Tx} = brod:transaction(Client, <<"some_transaction">>, []),
  {ok, Offset} = brod:txn_produce(Tx, Topic, Partition, Batch),
  erlang:display("TXNPRODUCE"),
  erlang:display(Offset),
  ok = brod:commit(Tx),
  FoldF =
    fun F(#kafka_message{value = V}, Acc) -> {ok, F(V, Acc)};
        F(V, Acc) -> [V | Acc]
    end,
  FetchOpts = #{max_bytes => 100},
  ?assertMatch({_Result, O, reached_end_of_partition}
                when O =:= Offset + length(Batch) + 1,
    brod:fold(Client, Topic, Partition, Offset, FetchOpts, [], FoldF, #{})),
  ok.

Output:

%%% brod_consumer_SUITE ==> t_fold_transactions: FAILED
%%% brod_consumer_SUITE ==> {function_clause,
    [{lists,last,[[]],[{file,"lists.erl"},{line,228}]},
     {brod_utils,handle_fetch_rsp,7,
         [{file,"/home/indrek/gems/brod/src/brod_utils.erl"},{line,661}]},
     {brod_consumer_SUITE,t_fold_transactions,1,
         [{file,"/home/indrek/gems/brod/test/brod_consumer_SUITE.erl"},
          {line,443}]},
     {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]},
     {test_server,run_test_case_eval1,6,
         [{file,"test_server.erl"},{line,1292}]},
     {test_server,run_test_case_eval,9,
         [{file,"test_server.erl"},{line,1224}]}]}

This is the line that throws an exception: https://github.com/kafka4beam/brod/blob/636e448f7d0183dc54fef31776cbe34d5d6f243b/src/brod_utils.erl#L644

brod_utils:fetch (via flatten_batches) seems to be aware of control messages, but that information gets lost somewhere along the way: the trace above shows handle_fetch_rsp calling lists:last/1 on an empty list.

I dug pretty deep, but I'm unsure how to fix this.

indrekj commented 1 week ago

The LastOffset in the flatten_batches function seems to be the correct one. The fetch function even has it as NewBeginOffset0 + 1, but it does not return it. If it did, we wouldn't need the https://github.com/kafka4beam/brod/blob/636e448f7d0183dc54fef31776cbe34d5d6f243b/src/brod_utils.erl#L644 call.

We could improve brod_utils:fetch/4, but it's an exported function and is also used by brod:fetch. We could change the latter to keep backward compatibility, but I don't know whether brod_utils itself also counts as a public interface for compatibility purposes. @zmstone, what do you think?
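
To make the idea above concrete, here is a minimal sketch of the two ways to get the next offset. It is not brod code: the module and function names are hypothetical, messages are plain {Offset, Value} tuples instead of #kafka_message{} records, and it assumes the failing call takes the last fetched message to work out where to continue.

```erlang
-module(fold_offset_sketch).
-export([next_offset_from_msgs/1, next_offset_from_fetch/1]).

%% Hypothetical stand-in for the approach that crashes: take the last
%% fetched message and continue from the offset right after it.
%% With Msgs =:= [] (e.g. only a transaction marker was fetched),
%% lists:last/1 raises function_clause, as in the trace above.
next_offset_from_msgs(Msgs) ->
  {LastOffset, _Value} = lists:last(Msgs),
  LastOffset + 1.

%% Hypothetical alternative: the fetch result already carries the next
%% offset (control batches included), so an empty message list is fine.
next_offset_from_fetch({NextOffset, _Msgs}) ->
  NextOffset.
```

In the failing test the two messages sit at Offset and Offset + 1 and the commit marker takes Offset + 2, so a fetch that reports the next offset itself would give Offset + 3, which is the Offset + length(Batch) + 1 the test asserts.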

indrekj commented 1 week ago

Got something working, hopefully it makes sense: https://github.com/kafka4beam/brod/pull/589