elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage

** (FunctionClauseError) no function clause matching in GenStage.Streamer.handle_info/2 #238

Closed mgwidmann closed 5 years ago

mgwidmann commented 5 years ago

I'm using Broadway to process SQS messages that contain references to files on S3, and for each SQS message I start a Flow processing pipeline (the files are big, over 1 GB each).

After a few seconds of processing I get the following exception and I'm not quite sure what I've done wrong to cause it.

21:43:44.140 [error] GenServer #PID<0.376.0> terminating
** (FunctionClauseError) no function clause matching in GenStage.Streamer.handle_info/2
    (gen_stage) lib/gen_stage/streamer.ex:34: GenStage.Streamer.handle_info({{#Reference<0.339123414.4269801476.85202>, 58}, {60817408, <<143, 254, 219, 233, 209, 211, 61, 45, 95, 168, 251, 31, 78, 49, 245, 198, 190, 98, 146, 105, 183, 247, 76, 243, 201, 201, 251, 217, 68, 159, 118, 22, 145, 179, 46, 220, 131, 27, 43, 73, 226, 228, 6, 114, 120, 163, ...>>}}, #Function<21.117072283/1 in Stream.do_list_transform/7>)
    (gen_stage) lib/gen_stage.ex:2036: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {{#Reference<0.339123414.4269801476.85202>, 58}, {60817408, <<143, 254, 219, 233, 209, 211, 61, 45, 95, 168, 251, 31, 78, 49, 245, 198, 190, 98, 146, 105, 183, 247, 76, 243, 201, 201, 251, 217, 68, 159, 118, 22, 145, 179, 46, 220, 131, 27, 43, 73, 226, 228, 6, 114, 120, 163, ...>>}}
State: #Function<21.117072283/1 in Stream.do_list_transform/7>

If I switch from Flow back to Stream, everything processes fine (using the exact same input file).

Here's as much of my Flow setup as I can share. It's called from within my Message.update_data/1 call in Broadway.

    bucket
    |> ExAws.S3.download_file(path, :memory, chunk_size: 1024 * 1024 * 1, max_concurrency: 25)
    |> ExAws.stream!()
    |> StreamGzip.gunzip()
    |> Stream.concat([:end])
    |> Stream.transform("", fn
      :end, prev ->
        {[prev], ""}
      chunk, prev ->
        [last_line | lines] =
          String.split(prev <> chunk, "\n")
          |> Enum.reverse()
        {Enum.reverse(lines), last_line}
    end)
    |> Flow.from_enumerable()
    |> Flow.map(fn line ->
      String.split(line, " ")
    end)
    |> Flow.filter(fn
      [_, _, _, _, "SOME", "VALUE" | _] ->
        true
      [_, _, _, _, "ANOTHER", "VALUE" | _] ->
        true
      _ ->
        false
    end)
    # |> Flow.partition()
    |> Flow.map(fn
      [a, _, _, _, _, b, _, _, c, d | _] ->
        parse_record(a, b, c, d)
      [a, _, _, _, b, _, _, _, c, d | _] ->
        parse_record(a, b, c, d)
      other ->
        IO.puts "Found other unmatched line: #{inspect other}"
    end)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn {a, {{b, _}, c}}, map ->
      Map.get_and_update(map, a, fn map2 ->
        {map2, Map.update(map2 || %{}, b, MapSet.new([c]), fn m -> MapSet.put(m, c) end)}
      end)
      |> elem(1)
    end)
    |> Enum.into(%{})
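
To make the final Flow.reduce step easier to follow, here is a minimal sketch of the same reducer run with plain Enum.reduce/3 and no Flow dependency. The record values ("a1", "b1", "c1", :ignored and so on) are made-up placeholders, not data from the real pipeline; only the tuple shape `{a, {{b, _}, c}}` is taken from the code above.

```elixir
# Placeholder records in the shape the reducer expects: {a, {{b, _}, c}}.
records = [
  {"a1", {{"b1", :ignored}, "c1"}},
  {"a1", {{"b1", :ignored}, "c2"}},
  {"a1", {{"b2", :ignored}, "c1"}},
  {"a2", {{"b1", :ignored}, "c3"}}
]

# Same logic as the Flow.reduce/3 callback: group by `a`, then by `b`,
# collecting every `c` into a MapSet.
reducer = fn {a, {{b, _}, c}}, map ->
  map
  |> Map.get_and_update(a, fn inner ->
    # `inner` is nil the first time a given `a` is seen.
    {inner, Map.update(inner || %{}, b, MapSet.new([c]), &MapSet.put(&1, c))}
  end)
  |> elem(1)
end

grouped = Enum.reduce(records, %{}, reducer)
IO.inspect(grouped)
```

The result is a map of maps of sets, keyed first by `a` and then by `b`.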

Sorry for the obscure variable names in the pipeline; I had to remove the real ones. I looked at the spot where it's crashing, and it doesn't appear that this message was intended for this process. I'm not sending anything or using self/0 at all.

Any ideas?
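
For context, the same class of crash can be reproduced without Flow or GenStage. The sketch below uses a plain GenServer, under the assumption that the mechanism is simply "a message arrives that no handle_info/2 clause matches". The module name NoCatchAll and the :tick message are hypothetical; the stray message only mimics the shape seen in the log above.

```elixir
defmodule NoCatchAll do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Only :tick is handled. Defining handle_info/2 here replaces the
  # catch-all default that `use GenServer` would otherwise provide.
  @impl true
  def handle_info(:tick, state), do: {:noreply, state}
end

# Started without a link so the crash does not take down the caller.
{:ok, pid} = GenServer.start(NoCatchAll, nil)
monitor = Process.monitor(pid)

# A stray message shaped like the one in the error report.
send(pid, {{make_ref(), 58}, {60_817_408, <<143, 254, 219>>}})

reason =
  receive do
    {:DOWN, ^monitor, :process, ^pid, reason} -> reason
  after
    1_000 -> :still_alive
  end

IO.inspect(reason, label: "exit reason")
```

Running this also makes the server log a "no function clause matching" error, so any process that can receive unexpected messages usually wants a catch-all clause.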

josevalim commented 5 years ago

Hi @mgwidmann! I don't know why this error is happening simply from looking at the code, so I would need a way to reproduce it. Maybe it can be reproduced by downloading something from a public S3 bucket?

mgwidmann commented 5 years ago

OK, I'll try to reproduce it and push the result to a GitHub repo. I'll have to find a similarly large file to process, so if you know of one, that would help me out.

I tried searching all my dependencies for a similar nested two-tuple (like {{a, b}, {c, d}}, by searching for {{ and }}) to see what was sending the message or which process it was intended for, but couldn't find either. That leads me to believe it's possibly coming from the Erlang VM or the Elixir stdlib for some reason, but I'm unsure what the message means. Here are my mix deps for reference:

  defp deps do
    [
      {:broadway, "~> 0.3.0"},
      {:broadway_sqs, "~> 0.1.0"},
      {:hackney, "~> 1.9"},
      {:jason, "~> 1.1.2"},
      {:sweet_xml, "~> 0.6"},
      {:ex_aws, "> 0.0.0", github: "mgwidmann/ex_aws", branch: "normalizing-path", override: true},
      # {:ex_aws_s3, "> 0.0.0", github: "ex-aws/ex_aws_s3"},
      {:ex_aws_s3, "> 0.0.0", path: "../ex_aws_s3"},
      {:stream_gzip, "~> 0.4.0"},
      {:flow, "~> 0.14.3"},
      {:timex, "~> 3.5.0"},
      {:horde, "~> 0.5.0"},
      {:libcluster, "~> 3.0.3"},
      {:elixir_uuid, "~> 1.2.0"}
    ]
  end

Like I said, the only thing I've been able to find that processes successfully is switching back to using Stream, and I've never seen this error when doing that.

mgwidmann commented 5 years ago

OK, I have a reproduction case in a repository here: https://github.com/mgwidmann/gen_stage_issue_238

It has a file inside it that's 90 MB and gzipped. I've also attached instructions for reproduction. I will play around with it to see if I can minimize it further and isolate the faulty component.

mgwidmann commented 5 years ago

By commenting things out to narrow down which line of my code was causing this, I've found that removing the following lines (both of them) makes the issue disappear.

    |> Stream.concat([:end])
    # Transform it into lines since it is currently in @chunk_size
    |> Stream.transform("", fn
      :end, prev ->
        {[prev], ""}
      chunk, prev ->
        [last_line | lines] =
          String.split(prev <> chunk, "\n")
          |> Enum.reverse()
        {Enum.reverse(lines), last_line}
    end)

I looked through the Stream module briefly but couldn't find anything. Any ideas on what could be causing this?

I pushed a branch with this working here if you'd like to try it out for yourself.
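
For readers who want to see what those two steps do on their own, here is a small self-contained sketch that runs the same Stream.concat/Stream.transform logic over in-memory chunks instead of an S3 download. The module name LineSplitter and the sample chunks are hypothetical.

```elixir
defmodule LineSplitter do
  # Turns a stream of arbitrary binary chunks into a stream of lines.
  # The accumulator holds the trailing partial line of the previous chunk.
  def lines(chunks) do
    chunks
    |> Stream.concat([:end])
    |> Stream.transform("", fn
      # The sentinel flushes whatever partial line is left over.
      :end, prev ->
        {[prev], ""}

      chunk, prev ->
        [last_line | lines] =
          (prev <> chunk)
          |> String.split("\n")
          |> Enum.reverse()

        {Enum.reverse(lines), last_line}
    end)
  end
end

# Chunk boundaries deliberately fall in the middle of lines.
chunks = ["alpha\nbe", "ta\ngam", "ma\n"]
result = chunks |> LineSplitter.lines() |> Enum.to_list()
IO.inspect(result)
```

Note that when the input ends with a newline, the :end clause emits a final empty string, so downstream steps may want to filter out empty lines.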

mgwidmann commented 5 years ago

Actually, something that points more clearly to an issue elsewhere: turning the max_concurrency option down to 1, like this, also fixes the issue.

    |> ExAws.S3.download_file(path, :memory, chunk_size: @chunk_size, max_concurrency: 1)

Perhaps this message is coming from a hackney process and is just delivered later than expected? I've updated the above branch with this change.

josevalim commented 5 years ago

Good catch! Is the binary we see in the error message part of any of the downloaded files? Something like <<143, 254, 219, 233, 209, 211, 61, 45, 95, 168, 251, 31, 78, 49, 245, 198, 190, 98, 146, 105, 183, 247, 76, 243, 201, 201, 251, 217, 68, 159, 118, 22, 145, 179, 46, 220, 131, 27, 43, 73, 226, 228, 6, 114, 120, 163>>. In any case, we should discard any pending message!

mgwidmann commented 5 years ago

It may be, not sure. If so, it's gzipped and therefore unrecoverable without knowing the state of the gzip stream at that point in the file. I tried replacing hackney with ibrowse but was ultimately unsuccessful, as it operates differently from hackney and doesn't work as a drop-in replacement in ex_aws. I'll keep trying to confirm where this was sent from, but do you want to keep this issue open just to track the missing handle_info clause? Would you accept a PR for it?

josevalim commented 5 years ago

Yes, please do send a pull request that logs and ignores the message in those cases. You can copy the default warning from GenStage itself. Please let me know if you have any questions.

mgwidmann commented 5 years ago

It looks like other similar situations do something like:

    log = '** Undefined handle_info in ~tp~n** Unhandled message: ~tp~n'
    :error_logger.warning_msg(log, [mod, msg])

However, within GenStage.Streamer (which is an instance of a GenStage producer), the state received in handle_info/2 holds only the continuation function, not the module where it occurred. Since this seems to be a behind-the-scenes GenStage, it's not clear where to get the "current module" information from. Should I leave that out or try to find it somewhere else?

josevalim commented 5 years ago

I would say the module is GenStage.Streamer itself. Also, when you start the process, you can get the current stacktrace with Process.info/2 and keep it around to include in the error message. What do you think?
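
A rough sketch of that idea follows, written against a plain GenServer rather than GenStage.Streamer itself, so it is not the code from the eventual PR. The module name StrayMessageDemo, the :ping call, and the extra "Started from" line in the log format are assumptions made for illustration; the first two lines of the format string are the warning quoted earlier in the thread.

```elixir
defmodule StrayMessageDemo do
  use GenServer

  # Capture the caller's stacktrace before the process is started, so an
  # unexpected message can later be traced back to where this was set up.
  def start_link(arg) do
    {:current_stacktrace, stack} = Process.info(self(), :current_stacktrace)
    GenServer.start_link(__MODULE__, {arg, stack})
  end

  @impl true
  def init({arg, stack}), do: {:ok, %{arg: arg, started_from: stack}}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}

  # Catch-all clause: log the unexpected message and carry on.
  @impl true
  def handle_info(msg, state) do
    log =
      ~c"** Undefined handle_info in ~tp~n** Unhandled message: ~tp~n** Started from:~n~ts~n"

    trace = Exception.format_stacktrace(state.started_from)
    :error_logger.warning_msg(log, [__MODULE__, msg, trace])
    {:noreply, state}
  end
end

{:ok, pid} = StrayMessageDemo.start_link(:demo)

# A stray message shaped like the one from the original report.
send(pid, {{make_ref(), 58}, {60_817_408, <<143, 254, 219>>}})

# The server logs a warning for the stray message and keeps serving calls.
reply = GenServer.call(pid, :ping)
IO.inspect(reply)
```

Because messages are handled in order, the successful :ping reply shows that the stray message was logged and discarded instead of crashing the process.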

josevalim commented 5 years ago

Closing in favor of PR.