Closed hikui closed 2 years ago
@hikui Yes, you are right. I will fix this soon.
Can you test: https://github.com/silviucpp/erlkaf/pull/37 ?
Hi @silviucpp, I tried that branch, but strangely I didn't get any headers in either the delivery report or the consumer. I checked in the dashboard and the headers were sent properly. I'm not sure if I did something wrong.
@hikui,
I'm not sure what you are doing, but it works for me:
%% Demo module that produces messages to a single topic and consumes them
%% back, printing every consumer callback and producer delivery report.
-module(test_consume_produce).
-include("erlkaf.hrl").
% topic used by both the consumer group and the producer
-define(TOPIC, <<"test_consume_produce">>).
% client id under which the producer is created
-define(PRODUCER_CLIENT, erlkaf_producer).
-export([
% api
create/0,
create/1,
produce/2,
produce/3,
produce_multiple/4,
% consumer callbacks
init/4,
handle_message/2,
% producer callbacks
delivery_report/2,
stats_callback/2
]).
-behaviour(erlkaf_consumer_callbacks).
% stats_callback is defined in both erlkaf_consumer_callbacks and erlkaf_producer_callbacks
% -behaviour(erlkaf_producer_callbacks).
%% @doc Sets up the demo consumer and producer without adding a
%% `bootstrap_servers' entry to the client configs.
%% Shorthand for `create(undefined)'.
create() ->
    create(undefined).
%% @doc Starts erlkaf, then creates the consumer group and the producer
%% used by this demo.
%%
%% `BootstrapServers' is either `undefined' (the client configs get no
%% `bootstrap_servers' entry) or a value that is stored under
%% `bootstrap_servers' in both the consumer and the producer config.
%%
%% Returns `ok'. Every erlkaf setup call is matched against `ok', so any
%% other result crashes the caller with `badmatch'.
create(BootstrapServers) ->
    erlkaf:start(),

    % consumer: one group named after this module, with this module as the
    % callback module for ?TOPIC
    Group = atom_to_binary(?MODULE, latin1),
    ConsumerCfg = append_bootstrap(BootstrapServers, []),
    DefaultTopicCfg = [{auto_offset_reset, smallest}],
    Subscriptions = [{?TOPIC, [{callback_module, ?MODULE}]}],
    ok = erlkaf:create_consumer_group(
        client_consumer, Group, Subscriptions, ConsumerCfg, DefaultTopicCfg
    ),

    % producer: delivery_report_only_error is switched off and reports are
    % routed to delivery_report/2 of this module
    ReportOpts = [
        {delivery_report_only_error, false},
        {delivery_report_callback, ?MODULE}
    ],
    ProducerCfg = append_bootstrap(BootstrapServers, ReportOpts),
    ok = erlkaf:create_producer(?PRODUCER_CLIENT, ProducerCfg),
    ok = erlkaf:create_topic(?PRODUCER_CLIENT, ?TOPIC, [{request_required_acks, 1}]).
%% @doc Produces one message with `Key' and `Value' to ?TOPIC through
%% ?PRODUCER_CLIENT, without headers.
%% Returns `ok'; crashes with `badmatch' if erlkaf returns anything else.
produce(Key, Value) ->
    ok = erlkaf:produce(?PRODUCER_CLIENT, ?TOPIC, Key, Value),
    ok.
%% @doc Produces one message with `Key', `Value' and `Headers' to ?TOPIC
%% through ?PRODUCER_CLIENT.
%% Returns `ok'; crashes with `badmatch' if erlkaf returns anything else.
produce(Key, Value, Headers) ->
    ok = erlkaf:produce(?PRODUCER_CLIENT, ?TOPIC, Key, Value, Headers),
    ok.
%% @doc Produces the same message (`Key', `Value', `Headers') `Count' times
%% to ?TOPIC through ?PRODUCER_CLIENT.
%%
%% `Count' must be a non-negative integer. A count of 0 produces nothing.
%% Any other value (negative number, float, non-number) fails with
%% `function_clause' before anything is produced.
%%
%% Returns `ok'; crashes with `badmatch' if a produce call returns anything
%% other than `ok'.
produce_multiple(0, _Key, _Value, _Headers) ->
    ok;
% The guard keeps the countdown from stepping past the integer 0: without
% it, a negative or float count never reaches the base clause and the loop
% keeps producing messages forever.
produce_multiple(Count, Key, Value, Headers) when is_integer(Count), Count > 0 ->
    ok = erlkaf:produce(?PRODUCER_CLIENT, ?TOPIC, Key, Value, Headers),
    produce_multiple(Count - 1, Key, Value, Headers).
% consumer callbacks
%% @doc Consumer callback: prints the topic, partition, offset and args it
%% was called with and returns an empty map as the initial state.
init(Topic, Partition, Offset, Args) ->
    Details = [Topic, Partition, Offset, Args],
    io:format("### consumer -> init topic: ~p partition: ~p offset: ~p args: ~p ~n", Details),
    {ok, maps:new()}.
%% @doc Consumer callback: prints the received message and returns the
%% state unchanged.
handle_message(Msg, State) ->
    Reply = {ok, State},
    io:format("### consumer -> handle_message : ~p ~n", [Msg]),
    Reply.
% producer callbacks
%% @doc Producer callback: prints the delivery status together with the
%% message it refers to, then returns `ok'.
delivery_report(DeliveryStatus, Message) ->
    Report = {DeliveryStatus, Message},
    io:format("### producer -> received delivery report: ~p ~n", [Report]),
    ok.
%% @doc Stats callback: prints the client id and the number of elements in
%% `Stats'. `Stats' must be a proper list, because its size is taken with
%% `length/1'. Returns the result of `io:format/2'.
stats_callback(ClientId, Stats) ->
    EntryCount = length(Stats),
    io:format("### producer -> stats_callback: ~p stats:~p ~n", [ClientId, EntryCount]).
% internals
%% Prepends `{bootstrap_servers, BootstrapServer}' to the option list,
%% unless the server is `undefined', in which case the list is returned
%% untouched.
append_bootstrap(undefined, Options) ->
    Options;
append_bootstrap(BootstrapServer, Options) ->
    Entry = {bootstrap_servers, BootstrapServer},
    [Entry | Options].
Running in console:
test_consume_produce:produce(<<"k1">>, <<"v1">>, [{<<"aa">>, <<"1">>}]).
### producer -> received delivery report: {ok,
{erlkaf_msg,
<<"test_consume_produce">>,1,0,
<<"k1">>,<<"v1">>,
[{<<"aa">>,<<"1">>}]}}
### consumer -> handle_message : {erlkaf_msg,<<"test_consume_produce">>,1,0,
<<"k1">>,<<"v1">>,
[{<<"aa">>,<<"1">>}]}
Cool, I'll test again later.
@hikui Thanks a lot! I'm waiting for your feedback.
Silviu
@silviucpp it works! thanks
I'll close it, as I have merged it into master.
I want to do some retry logic based on the message header. But I noticed that the delivery report doesn't include headers. Is it possible to add them?