silviucpp / erlkaf

Erlang kafka driver based on librdkafka
MIT License

Pause/Resume Consumer #7

Closed: krgn closed this issue 5 years ago

krgn commented 5 years ago

As far as I can tell, pausing/resuming consumers and producers is currently not supported by erlkaf. I would like to know whether this is possible at all. My main use case is controlling the fetching of messages when downstream data storage is, e.g., offline. For consumers, I imagine this could be signaled by returning a {pause, #state{}} tuple or something similar.

Would it be possible to implement this in erlkaf?

silviucpp commented 5 years ago

Hello,

I'm not sure I understand your request.

First, pausing/resuming a producer at the driver level makes no sense to me. If you don't want to produce messages, just don't call the produce API from the application, based on your own logic.
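A minimal sketch of that app-level gating, using a hypothetical gated_producer module. IsEnabledFun stands in for your own logic (e.g. a downstream health check), and ProduceFun wraps the real produce call, assumed here to be erlkaf:produce/4 as shown in the erlkaf README; both are injected so the module has no hard dependency on erlkaf.

```erlang
-module(gated_producer).
-export([maybe_produce/3]).

%% Hypothetical wrapper, not part of erlkaf.
%% IsEnabledFun: fun(() -> boolean()), your own "should I produce?" logic.
%% ProduceFun:   fun((Key, Value) -> Result), for example (assumption)
%%               fun(K, V) -> erlkaf:produce(my_client, <<"my_topic">>, K, V) end.
maybe_produce(IsEnabledFun, ProduceFun, {Key, Value}) ->
    case IsEnabledFun() of
        true ->
            %% Backend is fine: hand the message to the producer.
            ProduceFun(Key, Value);
        false ->
            %% "Paused": simply do not call the produce API.
            {skipped, producer_paused}
    end.
```

What to do with skipped messages (drop, retry later, buffer elsewhere) remains an application decision.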

The erlkaf producer collects your messages and flushes them to Kafka every x seconds. If Kafka is offline at that moment, the messages are kept in memory up to N records; what happens to them once that limit is reached is described here: https://github.com/silviucpp/erlkaf#message-queues

On the consumer side, offsets are committed once handle_message returns {ok, State}. If you return anything else, or an exception is thrown, you will receive the same message over and over again and the offset won't advance.
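To make that contract concrete, here is a toy model of it, assuming a hypothetical redelivery_model module. It is not erlkaf's consumer code; it only mimics the rule stated above: a message is redelivered until the handler returns {ok, NewState}, and only then does the offset move forward (to Offset + 1, following Kafka's convention that the committed offset is the next one to read).

```erlang
-module(redelivery_model).
-export([consume/3]).

%% Toy model only, not erlkaf internals.
%% Msgs is a list of {Offset, Payload}.
%% Returns {CommittedOffset, TotalAttempts, FinalState}.
consume(Handler, Msgs, State) ->
    consume(Handler, Msgs, State, 0, 0).

consume(_Handler, [], State, Committed, Attempts) ->
    {Committed, Attempts, State};
consume(Handler, [{Offset, Payload} | Rest] = Msgs, State, Committed, Attempts) ->
    Result = try Handler(Payload, State)
             catch _:_ -> exception
             end,
    case Result of
        {ok, NewState} ->
            %% Acknowledged: commit and move on to the next message.
            consume(Handler, Rest, NewState, Offset + 1, Attempts + 1);
        {error, _Reason, NewState} ->
            %% Not acknowledged: same message again, offset unchanged.
            consume(Handler, Msgs, NewState, Committed, Attempts + 1);
        _Other ->
            %% Any other return or an exception: redeliver with the old state.
            consume(Handler, Msgs, State, Committed, Attempts + 1)
    end.
```

Note that a handler which never returns {ok, _} keeps this loop on the same message forever, which is exactly the "offset won't advance" behavior described above.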

To implement this behavior at the application level, you can do something like:

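handle_message(Msg, State) ->
    % is_backend_offline/0 is your own check of the downstream storage
    case is_backend_offline() of
        true ->
            % back off, then return a non-ok result so the offset is not committed
            timer:sleep(1000),
            {error, backend_offline, State};
        false ->
            % process Msg here
            {ok, State}
    end.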

You can also cache the backend online/offline flag in the State. I don't see any reason why erlkaf needs changes for this scenario.
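A minimal sketch of caching that flag in the consumer state, assuming a hypothetical backend_gate module. check_fun stands in for is_backend_offline/0, the cached answer is trusted for interval_ms milliseconds, and it assumes (as the snippet above implies) that the State returned in {error, Reason, State} is the one passed to the next delivery.

```erlang
-module(backend_gate).
-export([new_state/1, new_state/2, handle_message/2]).

-record(state, {
    check_fun,              % fun(() -> boolean()): true when backend is offline
    interval_ms = 5000,     % how long a cached answer is trusted
    offline = false,        % cached result of the last check
    checked_at = undefined  % erlang:monotonic_time(millisecond) of last check
}).

new_state(CheckFun) ->
    new_state(CheckFun, 5000).

new_state(CheckFun, IntervalMs) ->
    #state{check_fun = CheckFun, interval_ms = IntervalMs}.

handle_message(_Msg, State0) ->
    State = refresh(State0),
    case State#state.offline of
        true ->
            %% Back off, then return a non-ok result so the offset is not committed.
            timer:sleep(1000),
            {error, backend_offline, State};
        false ->
            %% Process the message here.
            {ok, State}
    end.

%% Run the check only if there is no cached answer or it is older than interval_ms.
refresh(#state{checked_at = undefined} = State) ->
    recheck(State);
refresh(#state{checked_at = At, interval_ms = IntervalMs} = State) ->
    case erlang:monotonic_time(millisecond) - At >= IntervalMs of
        true -> recheck(State);
        false -> State
    end.

recheck(#state{check_fun = CheckFun} = State) ->
    State#state{offline = CheckFun(),
                checked_at = erlang:monotonic_time(millisecond)}.
```

The trade-off is latency versus cost: a longer interval_ms means fewer health checks, but it also means the consumer notices a recovered (or failed) backend later.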

But if I'm missing something, let me know.

Silviu