spreedly / kaffe

An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.
https://hex.pm/packages/kaffe
MIT License

kaffe cannot recover from unreachable Kafka #93

Closed: d53dave closed this issue 3 years ago

d53dave commented 5 years ago

Hello!

I was integrating kaffe into a consumer-only application I'm working on and I've encountered the following behaviour:

(Assuming I have everything set up pretty much default using the examples from the README)

(1) If I start my application and Kafka is not reachable (e.g. getting econnrefused), kaffe never recovers from that. If Kafka comes back online, the errors

[error] gen_server my_consumer_group terminated with reason: [{{"localhost",9092},{econnrefused....

stop, but my consumers are never initialized.

(2) If, during normal use of my app, Kafka becomes unreachable, kaffe will never recover from this. When Kafka is reachable again, the error messages stop, but consumers are not re-created.

Am I making a mistake here? Do I need a special ceremony to make this work? I tried wrapping the Kaffe.GroupMemberSupervisor with a supervisor3 so that it can be restarted after a delay in case it dies, but this didn't seem to solve the issue.

I'm using Elixir 1.9.0 and kaffe 1.14.1 on macOS 10.14.6.

objectuser commented 5 years ago

@d53dave That's correct; a higher-level service is required to manage that.

I would think that supervisor3 would work for you, but I have limited experience with it. I wonder if instead of wrapping it around Kaffe.GroupMemberSupervisor, you should wrap it around the application supervisor that starts Kaffe.

If you can come up with a recipe, I'd love to add it to the Kaffe README or otherwise incorporate it into Kaffe itself.

d53dave commented 5 years ago

@objectuser thanks for your response. I've found the issue and from what I see here, it cannot be solved by proper supervision alone.

I've successfully wrapped Kaffe.GroupMemberSupervisor with a supervisor3 and got the desired behavior: instead of dying, the supervisor restarted the GroupMemberSupervisor. However, this proved insufficient, because Kaffe.GroupManager doesn't support the case where it is created after brod has already created a client: GroupManager calls kafka().start_client/3 and matches only on :ok.

Since brod itself already handles connection problems (i.e. the client recovers or is re-created when the connection comes back) via :supervisor3, I think kaffe should not error out when start_client returns {:error, :already_present}, but continue like so:

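# In Kaffe.GroupManager; assumes `require Logger` in the enclosing module.
case kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) do
  :ok ->
    :ok

  # brod's supervisor already has a client registered under this name
  {:error, :already_present} ->
    Logger.info("The brod client is already created, continuing.")
    :ok
end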
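For illustration, the same idea can be factored into a small helper that treats both "client already exists" replies from a supervisor as success. This is a minimal sketch under assumptions: `ClientStart` and `ensure_started/1` are hypothetical names (not part of Kaffe or brod), the start call is passed in as a zero-arity function (in real code something like `fn -> :brod.start_client(endpoints, client_id, config) end`), and it assumes brod passes the supervisor's `{:error, :already_present}` and `{:error, {:already_started, pid}}` replies through unchanged. Any other error is returned to the caller rather than swallowed.

```elixir
defmodule ClientStart do
  # Hypothetical helper, not part of Kaffe or brod.
  require Logger

  @doc """
  Runs `start_fun` (e.g. a wrapped :brod.start_client/3 call) and treats
  "client already exists" replies as success.
  """
  def ensure_started(start_fun) when is_function(start_fun, 0) do
    case start_fun.() do
      :ok ->
        :ok

      # Child spec is registered with the supervisor but the process is not
      # running (for example, while it waits for a delayed restart).
      {:error, :already_present} ->
        Logger.info("The brod client is already present, continuing.")
        :ok

      # The client process is already running under the supervisor.
      {:error, {:already_started, _pid}} ->
        :ok

      # Anything else is a real failure; let the caller decide (or crash).
      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Returning `{:error, reason}` for unknown errors keeps genuine failures visible, so the surrounding supervisor can still react to them.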

One alternative would be to link a supervisor from the brod tree into the Kaffe supervision tree and restart them together, but this feels dirty since brod already has logic that handles connection problems.

Do you agree? If so, would you accept a merge request regarding this issue?

PS: The application supervisor that starts Kaffe doesn't seem to be linked to the consumer groups, and manually killing it in Observer doesn't trigger any action in my consumer supervision tree.

objectuser commented 5 years ago

@d53dave Did your PR work out? If so, could you share your usage of supervisor3?

d53dave commented 5 years ago

It did work out, thanks! I'm on holidays right now but I will update this thread with our supervisor3 once I'm back.

d53dave commented 5 years ago

My solution is quite simple as I didn't go through the work of making GroupMemberSupervisor work with supervisor3. Instead, I have another supervisor with @behaviour :supervisor3 that has a GroupMemberSupervisor child with the delayed restart policy.

In short it could look like this:

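defmodule MyFancySupervisor do
  @behaviour :supervisor3

  def start_link() do
    :supervisor3.start_link({:local, __MODULE__}, __MODULE__, [])
  end

  def init(_options) do
    # MaxR = 0 within MaxT = 1000 s: any crash exceeds the restart intensity,
    # so supervisor3 falls back to the child's delayed-restart policy.
    children = [
      # {id, start MFA, restart, shutdown (ms), type, modules}
      # {:permanent, 11}: restart this child after an 11-second delay
      {"GroupMemberSupervisor", {Kaffe.GroupMemberSupervisor, :start_link, []}, {:permanent, 11},
       4000, :worker, [Kaffe.GroupMemberSupervisor]}
    ]

    # The child specs must be given as a list.
    {:ok, {{:one_for_all, 0, 1000}, children}}
  end

  # Required by the supervisor3 behaviour; nothing to do after init.
  def post_init(_) do
    :ignore
  end
end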

The interesting bit is {:permanent, 11}: instead of escalating to the parent supervisor when the maximum restart intensity is reached (reached_max_restart_intensity), supervisor3 waits 11 seconds and then tries to restart this child. Unfortunately, the supervisor3 documentation is rather thin, but it was sufficient for me. Using {:one_for_all, 0, 1000} means the delayed restart is triggered by a single crash within a 1000-second window, i.e. always. This is important: otherwise, like any regular supervisor, it would first restart the child immediately until it hit the maximum restart intensity, and only then would supervisor3 apply its delayed restart.
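The effect of `{:one_for_all, 0, 1000}` can be made concrete with a toy model of the restart-intensity check. This sketch assumes supervisor3 keeps the standard OTP rule (the new restart is counted together with earlier restarts from the last MaxT seconds, and the limit is exceeded when that count is greater than MaxR) and, per the description above, schedules a delayed restart instead of escalating for children with a `{:permanent, delay}` policy. `RestartModel` and `on_crash/5` are hypothetical names; this is not supervisor3's code.

```elixir
defmodule RestartModel do
  # Toy model only; it does not call supervisor3.
  #   restarts     - earlier restart times in seconds, newest first
  #   now          - time of the new crash in seconds
  #   max_r, max_t - restart intensity and period from the supervisor flags
  #   delay        - seconds from a {:permanent, delay} policy, or nil for plain :permanent
  def on_crash(restarts, now, max_r, max_t, delay) do
    # Keep only restarts inside the period window, then count the new one.
    recent = [now | Enum.filter(restarts, fn t -> t >= now - max_t end)]

    cond do
      length(recent) <= max_r -> {:restart_now, recent}
      is_integer(delay) -> {{:restart_after, delay}, recent}
      true -> {:escalate, recent}
    end
  end
end
```

With `max_r = 0`, the very first crash already exceeds the limit, so `RestartModel.on_crash([], 0, 0, 1000, 11)` yields `{{:restart_after, 11}, [0]}`. With `max_r = 3` and no delay, three quick crashes are restarted immediately and the fourth escalates, which is the "restart until max intensity" phase that the 0 setting skips.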

It would probably be cleaner if Kaffe.GroupMemberSupervisor supported this directly but I haven't looked too much into that direction.

EDIT: As far as I'm concerned, this issue can be closed if there is nothing more to discuss on your side.

objectuser commented 5 years ago

@d53dave Thank you very much for explaining your approach! We're going to look into this further. We might do as you suggest and leverage supervisor3 or add your approach to the README for others to leverage.

jamesgaul commented 4 years ago

It looks like support for :already_present is in kaffe 1.18. This version of kaffe brings in brod 3.15, which does not seem to bring in the supervisor3 and kafka_protocol dependencies, even though I see them in brod's rebar.config. I am using OTP 22 and Elixir 1.10. What could I be doing wrong?

objectuser commented 4 years ago

@jamesgaul I think supervisor3 et al should be included. You don't see them in your mix.lock?

jamesgaul commented 4 years ago

"kaffe": {:hex, :kaffe, "1.18.0", "c6064797e5e0e0b625df8de1b9bff2d0b5e48104ea28e8677d9db73e641b263e", [:mix], [{:brod, "~> 3.0", [hex: :brod, repo: "hexpm", optional: false]}], "hexpm", "fccf831ee0d9c07374c6a60c1be1bb878051db43f2ec3d563a45bd9ae5271c55"},
"kafka_protocol": {:hex, :kafka_protocol, "1.1.2", "96d09c4287377795ae2c488aceae2dc6872b94edba8309a27ace933d9ae32333", [:rebar, :rebar3], [{:snappyer, "1.2.1", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "782ba96f060f30cb989806c89bb4114df070640e1193b3e04ad4378244e84a76"},
. . .
"supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], [], "hexpm", "e6f489d6b819df4d8f202eb00a77515a949bf87dae4d0a060f534722a63d8977"},

objectuser commented 4 years ago

OK that looks correct. What are you experiencing?

jamesgaul commented 4 years ago

When I run mix deps.get, the supervisor3 and kafka_protocol deps do not get downloaded. Could it be that I do not have rebar3 installed properly?

objectuser commented 4 years ago

@jamesgaul You shouldn't have to do that explicitly. Maybe just run mix deps.clean --all and then mix deps.get to see what happens.

jamesgaul commented 4 years ago

$ mix deps.clean --all ; mix deps.get

include/brod_int.hrl:21: can't find include lib "kafka_protocol/include/kpro.hrl"; Make sure kafka_protocol is in your app file's 'applications' list

** (Mix) Could not compile dependency :brod, "/Users/james/.asdf/installs/elixir/1.10.3/.mix/rebar3 bare compile --paths="/Users/james/repos/ESE/vfb/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile brod", update it with "mix deps.update brod" or clean it with "mix deps.clean brod"

objectuser commented 4 years ago

That's weird. I'd suggest reinstalling Erlang and Elixir, just getting a clean environment.

jamesgaul commented 4 years ago

I will try that. Thanks

jamesgaul commented 4 years ago

I still cannot build brod from an Elixir app with kaffe 1.18.0. It doesn't seem to want to download the supervisor3 and kafka_protocol dependencies.

jamesgaul commented 4 years ago

I tried installing Elixir 1.10.3 and Erlang 22 on a clean VM, created a new Elixir project with kaffe 1.18.0 in its deps, and still got the same results.

objectuser commented 4 years ago

I updated one of our apps to the same versions and think I'm getting the same issue.

I was able to pin brod to 3.12.0 and it compiled. Same with 3.13.0 and 3.14.0. But once I set it to 3.15.0, I get:

===> Compiling brod
===> Compiling src/brod_group_member.erl failed
include/brod.hrl:21: can't find include lib "kafka_protocol/include/kpro_public.hrl"

include/brod_int.hrl:21: can't find include lib "kafka_protocol/include/kpro.hrl"

** (Mix) Could not compile dependency :brod, ".../.asdf/installs/elixir/1.10.2-otp-22/.mix/rebar3 bare compile --paths=".../dev/kafka/gateway-routing/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile brod", update it with "mix deps.update brod" or clean it with "mix deps.clean brod"

Looking at the diff here I don't really see the problem. 🤔
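Until the cause is found, the pinning result above suggests a stopgap: keep kaffe 1.18.0 but hold brod at a version that compiled (3.12.0 through 3.14.0 in that test). This is a sketch of a `mix.exs` under assumptions: `MyApp.MixProject` and `:my_app` are placeholder names, and it has only been reported that this combination compiles, not that kaffe 1.18.0 behaves correctly at runtime against brod 3.14.0. Because kaffe's lock entry requires brod `"~> 3.0"`, an exact top-level requirement of `"3.14.0"` should satisfy it without `override: true`.

```elixir
defmodule MyApp.MixProject do
  use Mix.Project

  def project do
    [app: :my_app, version: "0.1.0", elixir: "~> 1.10", deps: deps()]
  end

  def application do
    [extra_applications: [:logger]]
  end

  defp deps do
    [
      {:kaffe, "~> 1.18"},
      # Stopgap: brod 3.15.0 failed to compile in this thread; 3.14.0 compiled.
      {:brod, "3.14.0"}
    ]
  end
end
```

After changing the requirement, running mix deps.update brod should move the lock file off 3.15.0.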