Open kzemek opened 7 years ago
Config:

`application:set_env(ekaf, ekaf_partition_strategy, [{ekaf_partition_strategy, custom}])`

All messages are produced with a key.

What happens:

1. `handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State)`
2. `send_messages(StateName, #ekaf_server{ topic = Topic } = State, Messages)`
3. `produce_async_batched(Topic, Data)`
4. `common_async(Event, Topic, [{Key,Data}|Rest])`

The code waits for a reply from the `TopicWorker`, but the current process is the topic worker. The code would deadlock if we used `gen_fsm:sync_send_all_state_event` as in https://github.com/helpshift/ekaf/pull/50. Here instead, we send `{pick, {Key,Data}, self()}` to `self()`, immediately `receive` it in clause `_E`, and the message is discarded.
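
The self-send behaviour can be reproduced outside ekaf with a small sketch. This is not ekaf's code: the module `self_pick_sketch`, the functions `pick/2` and `demo/0`, and the `{ok, Worker}` reply shape are hypothetical stand-ins, and the catch-all clause `Other` plays the role of the `_E` clause mentioned above.

```erlang
-module(self_pick_sketch).
-export([pick/2, demo/0]).

%% Hypothetical stand-in for the worker-picking step (not ekaf's actual code).
%% Sends a pick request to TopicWorker and waits for any message.
pick(TopicWorker, {Key, Data}) ->
    TopicWorker ! {pick, {Key, Data}, self()},
    receive
        %% Assumed reply shape from a separate topic worker process.
        {ok, Worker} ->
            {ok, Worker};
        %% Catch-all, like the `_E` clause: when TopicWorker =:= self(),
        %% this clause consumes our own request, so it is never handled.
        Other ->
            {discarded, Other}
    after 1000 ->
        timeout
    end.

%% Runs pick/2 in a fresh process whose "topic worker" is itself,
%% so the mailbox contains only the request it has just sent.
demo() ->
    Parent = self(),
    Ref = make_ref(),
    Pid = spawn(fun() ->
        Reply = pick(self(), {<<"key">>, <<"value">>}),
        Parent ! {Ref, self(), Reply}
    end),
    receive
        {Ref, Pid, Result} -> Result
    after 2000 ->
        timeout
    end.
```

Calling `self_pick_sketch:demo()` should return a `{discarded, {pick, {<<"key">>, <<"value">>}, Pid}}` tuple: the request is swallowed by the catch-all clause instead of reaching code that would pick a partition worker, which matches the dropped-message symptom described in this issue.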