helpshift / ekaf

A minimal, high-performance Kafka client in Erlang.
https://engineering.helpshift.com
Other
165 stars 50 forks source link

ekaf is not honoring broker's partition change #15

Open mubarak opened 9 years ago

mubarak commented 9 years ago

It appears from our production environment that broker's partition change is not being honored in ekaf and we are seeing

Produce request with correlation id 2628 from client ekaf on partition [layer-insights,0] failed due to Leader not local for partition [layer-insights,0] on broker 2 (kafka.server.KafkaApis)

From kafka protocol:

The client does not need to keep polling to see if the cluster has changed; it can fetch metadata once when it is instantiated cache that metadata until it receives an error indicating that the metadata is out of date. This error can come in two forms: (1) a socket error indicating the client cannot communicate with a particular broker, (2) an error code in the response to a request indicating that this broker no longer hosts the partition for which data was requested.

seems like (2) (invalid partition error code) is not handled in ekaf.

bosky101 commented 9 years ago

1) Is this error a response to a sync or async call 2) can you check if the socket was closed around the time of a partition change ? This will be visible as lot of worker up/down metrics

HTH ~B

mubarak commented 9 years ago

1) we use async call to publish data using ekaf:produce_async Here is our producer code:

-module(kafka_client).

-include_lib("ekaf/include/ekaf_definitions.hrl").
%% analytics topic in Kafka
-define(ANALYTICS_TOPIC, <<"analytics">>).

%% insights topic in Kafka
-define(INSIGHTS_TOPIC, <<"insights">>).

%% API
-export([start_node/1,
  stop_node/0,
  publish_metric/1,
  publish_metric/2,
  kafka_callback/5]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Standard methods for managing kafka clients backed by service discovery.
%%% When downstream nodes die, the clients will get callback from ekaf. ekaf
%%% maintains the connection pool (workers) and using gen_fsm it changes the connection
%%% state of workers
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec start_node({string(), integer()}) -> ok.
start_node({Host, Port}) ->

  IsStarted = fun(MyApp) ->
    lists:any(fun({AppName, _Desc, _Version}) ->
      MyApp == AppName end, application:which_applications())
  end,

  case IsStarted(ekaf) of
    false ->
      %%% Callbacks for server state and worker (connection) state
      application:set_env(ekaf, ?EKAF_CALLBACK_DOWNTIME_REPLAYED, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_DOWNTIME_SAVED, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_FLUSH, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_FLUSHED_REPLIED, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_TIME_TO_CONNECT, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_WORKER_DOWN, {kafka_client, kafka_callback}),
      application:set_env(ekaf, ?EKAF_CALLBACK_WORKER_UP, {kafka_client, kafka_callback}),

      PartitionWorkers = ekaf_per_partition_workers,
      PartitionWorkersMax = ekaf_per_partition_workers_max,
      BufferTtl = ekaf_buffer_ttl,

      application:set_env(ekaf, PartitionWorkers, application:get_env(ekaf, PartitionWorkers, 20)),
      application:set_env(ekaf, PartitionWorkersMax, application:get_env(ekaf, PartitionWorkersMax, 100)),
      application:set_env(ekaf, ekaf_bootstrap_broker, {Host,Port}),
      application:set_env(ekaf, BufferTtl, application:get_env(ekaf, BufferTtl, 1000)),

      {ok, _} = application:ensure_all_started(ekaf);
    _ -> ok
  end,
  ok.

stop_node() ->
  application:stop(ekaf),
  ok.

-spec publish_metric(list(binary()) | binary()) -> ok.
publish_metric(MetricName) ->
    publish_metric(analytics, MetricName).

-spec publish_metric(atom() | binary(), list(binary()) | binary()) -> ok.
publish_metric(insights, MetricMessage) ->
    ekaf:produce_async(?INSIGHTS_TOPIC, MetricMessage);
publish_metric(analytics, MetricName) ->
    ekaf:produce_async(?ANALYTICS_TOPIC, MetricName);
publish_metric(TopicName, MetricMessage) when is_binary(TopicName) ->
    ekaf:produce_async(TopicName, MetricMessage).

%%%%% Callback handlers for kafka-broker (node) and worker (connection) state %%%
%%%   Recycled from ekaf demo code
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
kafka_callback(Event, _From, _StateName,
                #ekaf_server{ topic = Topic },
                Extra) ->
    Stat = <<Topic/binary,".",  Event/binary>>,
    case Event of
        ?EKAF_CALLBACK_DOWNTIME_SAVED ->
            %io:format("~n ~s => 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_DOWNTIME_REPLAYED ->
            %io:format("~n ~s => 1 during ~p",[Stat, StateName]),
            ok;
        ?EKAF_CALLBACK_TIME_DOWN ->
            case Extra of
                {ok, Micros}->
                    io:format("~n ~s => ~p",[Stat, ekaf_utils:ceiling(Micros/1000)]);
                _ ->
                    ok
            end;
        ?EKAF_CALLBACK_WORKER_DOWN ->
            FinalStat = <<Topic/binary,".mainbroker_unreachable">>,
            io:format("~n ~s => 1",[FinalStat]),
            ok;
        _ ->
            ?INFO_MSG("ekaf_server callback got ~p ~p",[Event, Extra])
    end;

kafka_callback(Event, _From, _StateName,
                #ekaf_fsm{topic = Topic, broker = _Broker, partition = PartitionId, last_known_size = _BufferLength, cor_id = CorId, leader = Leader},
                Extra)->
    Stat = <<Topic/binary,".",  Event/binary, ".broker", (ekaf_utils:itob(Leader))/binary, ".", (ekaf_utils:itob(PartitionId))/binary>>,
    case Event of
        ?EKAF_CALLBACK_FLUSH ->
            ok;
        ?EKAF_CALLBACK_FLUSHED_REPLIED ->
            case Extra of
                {ok, {{replied, _, _}, #produce_response{ cor_id = ReplyCorId }} }->
                    Diff = case (CorId - ReplyCorId  ) of Neg when Neg < 0 -> 0; SomeDiff -> SomeDiff end,
                    FinalStat = <<Stat/binary,".diff">>,
                    io:format("~n~s ~w",[FinalStat, Diff]);
                _ ->
                    ?INFO_MSG("ekaf_fsm callback got ~p some:~p ~nextra:~p",[Event, Extra])
            end;
        ?EKAF_CALLBACK_WORKER_UP ->
            io:format("~n ~s 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_WORKER_DOWN ->
            io:format("~n ~s 1",[Stat]),
            ok;
        ?EKAF_CALLBACK_TIME_TO_CONNECT ->
            case Extra of
                {ok, Micros}->
                    io:format("~n ~s => ~p",[Stat, ekaf_utils:ceiling(Micros/1000)]);
                _ ->
                    ok
            end;
        _ ->
            ?INFO_MSG("ekaf_fsm callback got ~p ~p",[Event, Extra])
    end.

2) will check it out

janin commented 9 years ago

@bosky101 For 2.: There are no worker down messages, but there are worker up message. I'm assuming the workers were still up because the nodes didn't actually go down. And then ekaf_server would start new workers because the metadata has changed but it never stopped the old ones. Could that be the case?

As far as I can tell, there is no checking of server response on produce calls, so errors like "no longer leader" (error code 6) are silently ignored.