helpshift / ekaf

A minimal, high-performance Kafka client in Erlang.
https://engineering.helpshift.com
Other
165 stars 50 forks source link

Async produce Message accumulation #54

Open turtleDeng opened 6 years ago

turtleDeng commented 6 years ago

When writing to Kafka with async produce at about 30k msg/s, messages accumulate in the worker's message queue:

(emqx@127.0.0.1)3> erlang:process_info(pid(0, 1767, 0)).
[{current_function,{erlang,spawn,3}},
 {initial_call,{proc_lib,init_p,5}},
 {status,running},
 {message_queue_len,13758967},
 {messages,[{tcp,#Port<0.3122>,
                 <<0,27,132,33,0,0,0,1,0,15,109,101,115,115,97,103,101,95,
                   ...>>},
            {tcp,#Port<0.3122>,
                 <<0,27,151,198,0,0,0,1,0,15,109,101,115,115,97,103,101,
                   ...>>},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_1a06b4cd44374f3a947\",\"username\":\"undefined"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_f562fb58af2c4ce1aef\",\"username\":\"undef"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_f3375a2c3fa74a419f0\",\"username\":\"u"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_8952b84cc06046ed9a0\",\"username"...>>}},
            {'$gen_event',{produce_async,<<"{\"client_id\":\"pub_0a41e3193db64fbea0a\",\"user"...>>}},
            {tcp,#Port<0.3122>,<<0,27,143,58,0,0,0,1,0,15,109,...>>},
            {tcp,#Port<0.3122>,<<0,27,194,227,0,0,0,1,0,15,...>>},
            {tcp,#Port<0.3122>,<<0,27,204,77,0,0,0,1,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,72,71,0,0,0,1,...>>},
            {tcp,#Port<0.3122>,<<0,27,204,83,0,0,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,193,96,0,0,...>>},
            {tcp,#Port<0.3122>,<<0,27,132,34,0,...>>},
            {tcp,#Port<0.3122>,<<0,26,247,60,...>>},
            {tcp,#Port<0.3122>,<<0,27,129,...>>},
            {tcp,#Port<0.3122>,<<0,27,...>>},
            {tcp,#Port<0.3122>,<<0,...>>},
            {tcp,#Port<0.3122>,<<...>>},
            {tcp,#Port<0.3122>,...},
            {tcp,...},
            {...}|...]},
 {links,[<0.1709.0>,#Port<0.3122>]},
 {dictionary,[{'$initial_call',{ekaf_fsm,init,1}},
              {'$ancestors',[ekaf_sup,<0.1708.0>]},
              {rand_seed,{#{bits => 58,jump => #Fun<rand.8.77346176>,
                            next => #Fun<rand.5.77346176>,type => exrop,
                            uniform => #Fun<rand.6.77346176>,
                            uniform_n => #Fun<rand.7.77346176>,weak_low_bits => 1},
                          [228663813211387744|65157769469894326]}}]},
 {trap_exit,false},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.1707.0>},
 {total_heap_size,431772712},
 {heap_size,137319567},
 {stack_size,16},
 {reductions,345736510},
 {garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
                      {min_bin_vheap_size,46422},
                      {min_heap_size,233},
                      {fullsweep_after,1000},
                      {minor_gcs,1}]},
 {suspending,[]}]
turtleDeng commented 6 years ago

When writing to Kafka with async batched produce at about 50k msg/s, the message queue also backs up:

erlang:process_info(pid(0, 1737, 0)).
[{current_function,{gen_fsm,handle_msg,8}},
 {initial_call,{proc_lib,init_p,5}},
 {status,runnable},
 {message_queue_len,1801072},
 {messages,[{'$gen_sync_all_state_event',{<0.13560.82>,
                                          #Ref<0.2870132398.2018246671.191229>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.15604.82>,
                                          #Ref<0.2870132398.2018246668.202260>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.15614.82>,
                                          #Ref<0.2870132398.2018246668.202261>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.14971.82>,
                                          #Ref<0.2870132398.2018246672.191284>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.15485.82>,
                                          #Ref<0.2870132398.2018246660.163638>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.95.82>,
                                          #Ref<0.2870132398.2018246662.182901>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.15498.82>,
                                          #Ref<0.2870132398.2018246660.163639>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.14417.82>,
                                          #Ref<0.2870132398.2018246665.192196>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.10199.82>,
                                          #Ref<0.2870132398.2018246658.180458>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.3188.82>,
                                          #Ref<0.2870132398.2018246657.173946>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.104.82>,
                                          #Ref<0.2870132398.2018246662.182902>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.10208.82>,
                                          #Ref<0.2870132398.2018246658.180459>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.3740.82>,
                                          #Ref<0.2870132398.2018246669.160600>},
                                         {pick,<<"message_publish">>}},
            {'$gen_sync_all_state_event',{<0.3816.82>,
                                          #Ref<0.2870132398.2018246669.160601>},
                                         {pick,<<"message_publ"...>>}},
            {'$gen_sync_all_state_event',{<0.14424.82>,
                                          #Ref<0.2870132398.2018246665.192197>},
                                         {pick,<<"message_"...>>}},
            {'$gen_sync_all_state_event',{<0.13570.82>,
                                          #Ref<0.2870132398.2018246671.191238>},
                                         {pick,<<"mess"...>>}},
            {'$gen_sync_all_state_event',{<0.14444.82>,
                                          #Ref<0.2870132398.2018246667.186354>},
                                         {pick,<<...>>}},
            {'$gen_sync_all_state_event',{<0.14458.82>,
                                          #Ref<0.2870132398.2018246667.186355>},
                                         {pick,...}},
            {'$gen_sync_all_state_event',{<0.14473.82>,...},{...}},
            {'$gen_sync_all_state_event',{...},...},
            {'$gen_sync_all_state_event',...},
            {...}|...]},
 {links,[<0.1733.0>,#Port<0.13198>]},
 {dictionary,[{'$initial_call',{ekaf_server,init,1}},
              {'$ancestors',[ekaf_sup,<0.1732.0>]},
              {rand_seed,{#{bits => 58,jump => #Fun<rand.8.77346176>,
                            next => #Fun<rand.5.77346176>,type => exrop,
                            uniform => #Fun<rand.6.77346176>,
                            uniform_n => #Fun<rand.7.77346176>,weak_low_bits => 1},
                          [68351820500341124|233036523000975277]}}]},
 {trap_exit,false},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.1731.0>},
 {total_heap_size,111724598},
 {heap_size,55185655},
 {stack_size,3},
 {reductions,73811275},
 {garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
                      {min_bin_vheap_size,46422},
                      {min_heap_size,233},
                      {fullsweep_after,1000},
                      {minor_gcs,2}]},
bosky101 commented 6 years ago

Can you share your ekaf config?

turtleDeng commented 6 years ago
{ekaf,[
        {ekaf_bootstrap_brokers, [{"localhost", 9092}] },
        {ekaf_bootstrap_topics, [ <<"topic">> ]},
        {ekaf_per_partition_workers,64},
        {ekaf_max_buffer_size, [{<<"topic">>,64},{ekaf_max_buffer_size,64}]},
        {ekaf_partition_strategy, random},
        {ekaf_callback_flush, {xxx,xxx}}
    ]}
turtleDeng commented 6 years ago

With async batched produce at more than 30k msg/s, it creates a lot of processes and memory is exhausted: https://github.com/helpshift/ekaf/blob/master/src/ekaf_picker.erl#L48

bosky101 commented 6 years ago

I recommend playing around with the buffer size and the number of per-partition workers to see what config works best.