helpshift / ekaf

A minimal, high-performance Kafka client in Erlang.
https://engineering.helpshift.com

In a distributed environment, ekaf_picker:pick(Topic) will fail if the node started first goes down #52

Open satish-olx opened 7 years ago

satish-olx commented 7 years ago

Hi. In a distributed environment, does ekaf not automatically restart its workers if the node hosting the processes goes down?

Steps to reproduce:

  1. Start ekaf in node A.
  2. Start node B in the cluster with A
  3. Shut down node A.
  4. ekaf_picker:pick(Topic) returns {error, bootstrapping}.

satish-olx commented 7 years ago

I think this is because

pick_sync(Topic, Callback, _Strategy, _Attempt)->
    case pg2:get_closest_pid(?PREFIX_EKAF(Topic)) of
        PoolPid when is_pid(PoolPid) ->
            handle_callback(Callback,PoolPid);
        {error, {no_process,_}}->
            handle_callback(Callback,{error,bootstrapping});
        {error,{no_such_group,_}}->
            ekaf:prepare(Topic, Callback);
        _E ->
            error_logger:info_msg("~p pick_sync ERROR: ~p",[?MODULE,_E]),
            handle_callback(Callback,_E)
    end.

where get_closest_pid fails to recover after the other node goes down.

bosky101 commented 7 years ago

@satish-olx Hi. Every node creates its own workers, and the number of workers is based on the configuration. Can you share the number you have? You may be setting it too high. Can you also share your ekaf config (masking the host if needed)?

satish-olx commented 7 years ago

@bosky101 I have set ekaf_per_partition_workers to 10:

    application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_per_partition_workers, 10),
    application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaIP, KafkaPort}) 

I am currently publishing to 4 topics synchronously. I also looked into the code, and for a synchronous pick it uses

 pg2:get_closest_pid(?PREFIX_EKAF(Topic))

and checking the console, I find that in a cluster setup, remote processes are being used to send the messages:

(node@localhost) io:format("~p", [pg2:get_members(<<"ekaf.newmessage">>)]). 
[<6539.12452.6>,<6539.17083.6>,<6539.4139.7>,<6539.25751.9>,<6539.13577.10>,
 <6539.25647.10>,<6539.25658.10>,<6539.31451.10>,<6539.31460.10>,
 <6539.996.14>,<6539.28927.14>,<6539.3606.103>,<6539.3679.103>,
 <6539.5143.103>,<6539.5669.103>,<6539.6030.103>,<6539.6856.103>,
 <6539.8940.103>,<6539.24373.103>,<6539.9430.104>,<6539.16693.104>,
 <6539.17546.104>,<6539.11013.109>,<6539.12082.109>,<6539.12356.109>]
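
To confirm which nodes these pids belong to, a small helper like the following can group them by node. This is only a sketch: the module name member_nodes and the function count_by_node/1 are hypothetical and not part of ekaf, and the shell call shown afterwards assumes an OTP release that still ships pg2 (it was removed in OTP 24).

```erlang
%% Hypothetical helper, not part of ekaf: counts pids per hosting node.
-module(member_nodes).
-export([count_by_node/1]).

%% Takes a list of pids (local or remote) and returns a map of
%% NodeName => NumberOfPids. node/1 works on remote pids as well.
count_by_node(Pids) when is_list(Pids) ->
    lists:foldl(
      fun(Pid, Acc) ->
              %% Increment the counter for this pid's node, starting at 1.
              maps:update_with(node(Pid), fun(N) -> N + 1 end, 1, Acc)
      end,
      #{},
      Pids).
```

In the shell, member_nodes:count_by_node(pg2:get_members(<<"ekaf.newmessage">>)). returns a map from node name to worker count. If the local node() is not a key in that map, every worker in the group lives on a remote node.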

Am I missing something important?