dashbitco / broadway_kafka

A Broadway connector for Kafka

Only one processor receives messages #89

Closed slashmili closed 2 years ago

slashmili commented 2 years ago

Hi, I have a case where, even though I set concurrency in my processors config, only one of the processors gets all the messages.

My setup

  1. I'm running the Kafka broker locally but I've seen the same issue in production as well.
  2. I have a topic with 20 partitions
  3. The Broadway module is like:

    defmodule MyApp.Broadway do
      use Broadway

      alias Broadway.Message
      require Logger

      def start_link(_opts) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [
            module: producer_module(),
            concurrency: 1
          ],
          processors: [
            default: [
              concurrency: 10
            ]
          ]
        )
      end

      defp producer_module do
        {BroadwayKafka.Producer,
         [
           hosts: "localhost:9092",
           group_id: "foo_group_id2",
           topics: ["event.foo"]
         ]}
      end

      @impl true
      def handle_message(_, message, _) do
        Logger.info("#{inspect(self())}: producing data #{inspect(message)}")
        Process.sleep(2000)
        message
      end
    end

Issue

When I start the iex terminal, I see that the connection is established:

[info] Group member (foo_group_id2,coor=#PID<0.579.0>,cb=#PID<0.574.0>,generation=29):
assignments received:
  event.foo:
    partition=0 begin_offset=3183
    partition=1 begin_offset=undefined
    partition=2 begin_offset=2399
    partition=3 begin_offset=undefined
    partition=4 begin_offset=2506
    partition=5 begin_offset=2766
    partition=6 begin_offset=2215
    partition=7 begin_offset=2527
    partition=8 begin_offset=2018
    partition=9 begin_offset=12503
    partition=10 begin_offset=2085
    partition=11 begin_offset=2011
    partition=12 begin_offset=1959
    partition=13 begin_offset=1945
    partition=14 begin_offset=2055
    partition=15 begin_offset=1928
    partition=16 begin_offset=1986
    partition=17 begin_offset=2043
    partition=18 begin_offset=2028
    partition=19 begin_offset=undefined
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3183}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3839", metadata: %{headers: [], key: "1076B8A25E736A2A4E37BB25303876D6", offset: 3183, partition: 0, topic: "event.foo", ts: 1651870249942}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3184}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3846", metadata: %{headers: [], key: "F9F1DC1EC6CDE5846300B152CDC01033", offset: 3184, partition: 0, topic: "event.foo", ts: 1651870249948}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3185}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3847", metadata: %{headers: [], key: "C34C91322862A23470A4001FC4A7CB38", offset: 3185, partition: 0, topic: "event.foo", ts: 1651870249949}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3186}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3855", metadata: %{headers: [], key: "DBE92A4125651EE89BFA0CAA47ECDFAD", offset: 3186, partition: 0, topic: "event.foo", ts: 1651870249955}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3187}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3898", metadata: %{headers: [], key: "9783F5B0A4BA741051D5CA2915D97ABA", offset: 3187, partition: 0, topic: "event.foo", ts: 1651870249989}, status: :ok}
[info] #PID<0.582.0>: producing data %Broadway.Message{acknowledger: {BroadwayKafka.Acknowledger, {#PID<0.574.0>, {29, "event.foo", 0}}, %{offset: 3188}}, batch_key: {"event.foo", 0}, batch_mode: :bulk, batcher: :default, data: "body 3899", metadata: %{headers: [], key: "65F16D7346D86C4DE978E569D2FDF410", offset: 3188, partition: 0, topic: "event.foo", ts: 1651870249990}, status: :ok}
....

I left the terminal open for 30 minutes and the messages were always delivered to the same process, #PID<0.582.0>. In addition, it always pulls from partition 0.

I also double-checked with live dashboard: proc_0 is always busy and the other processors are not receiving any messages.

Screenshot 2022-05-06 at 23 26 52

In case it helps, these are my consumer group details for this topic:

Screenshot 2022-05-06 at 23 28 06

As you can see, our consumer lags behind the end offset, which sometimes happens in our production. To reproduce that locally, you can run:

key = fn -> Base.encode16(:crypto.strong_rand_bytes(16)) end

Enum.each(1..100_000, fn i ->
  :brod.produce_sync(:foo_producer, "event.foo", :hash, key.(), "body #{i}")
end)
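The reproduction snippet assumes a brod client named `:foo_producer` is already running. A minimal sketch of that setup, assuming brod is a dependency and the broker listens on localhost:9092 (the client options shown are one possible choice, not necessarily what was used in this report):

```elixir
# Assumption: :brod is listed in deps and a Kafka broker runs on localhost:9092.
{:ok, _apps} = Application.ensure_all_started(:brod)

# :foo_producer is the client id referenced by the reproduction snippet.
# auto_start_producers lets brod start a producer for "event.foo" on first use,
# so no separate :brod.start_producer/3 call is needed.
:ok =
  :brod.start_client([localhost: 9092], :foo_producer,
    auto_start_producers: true,
    default_producer_config: []
  )
```

Run this once in iex before producing the test messages.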

Expect to see

I'd expect to see two things:

  1. I'd expect that the producer distributes the messages to all processors.
  2. I'd expect that the producer pulls data from all partitions (I'm not sure how it would do that, randomly or round-robin).

Could you please help me understand which of these applies:

  a. This is expected behaviour.
  b. This is not expected behaviour and is a bug.
  c. There is a problem in my Broadway process config.

josevalim commented 2 years ago

The root cause is definitely that they are pulling from the same partition. I would investigate directly in Brod whether that's the expected behaviour and, if so, how to change it.

slashmili commented 2 years ago

While I was digging into brod, I started looking into BroadwayKafka and poking around in the producer process. When I run :sys.get_status on the producer, I get this huge state: https://gist.github.com/slashmili/93b1fd245e65b630bb875ebed8935f10

It seems that the producer's buffer is full of messages from different partitions, but for some reason it is not handing them to the processors.
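For anyone wanting to repeat this inspection, here is a minimal sketch. It assumes the `MyApp.Broadway` pipeline from the report is already running in the same node; the exact shape of the returned status is not documented here and may differ between versions.

```elixir
# Assumption: MyApp.Broadway has been started (for example by the
# application supervisor) in this iex session.
# Broadway.producer_names/1 returns the registered names of the producers.
[producer | _] = Broadway.producer_names(MyApp.Broadway)

# :sys.get_status/1 returns the OTP status of the process, including its state.
# The result is large, so the printout is truncated with :limit.
producer
|> :sys.get_status()
|> IO.inspect(limit: 20, label: "producer status")
```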

I'm using broadway 1.0.3 and broadway_kafka 0.3.4

josevalim commented 2 years ago

Ok, that explains it. GenStage PartitionDispatcher expects an even distribution of events but there are large amounts of events going to a single partition. Can you please try out #90?
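To illustrate why a skewed buffer starves the other processors, here is a simplified model. It is not GenStage's or BroadwayKafka's actual code: `DispatchSketch` is a hypothetical module, and hashing the `{topic, partition}` key with `:erlang.phash2/2` is an assumption made only for this sketch.

```elixir
defmodule DispatchSketch do
  # Hypothetical stand-in for a partitioning function: maps a
  # {topic, partition} key onto one of `processors` slots.
  def slot({_topic, _partition} = key, processors) do
    :erlang.phash2(key, processors)
  end

  # Counts how many events of a batch land on each processor slot.
  def distribution(events, processors) do
    Enum.frequencies_by(events, &slot(&1, processors))
  end
end

# A skewed batch: every buffered event comes from Kafka partition 0,
# like the messages in the log output of the report.
skewed = for _ <- 1..1_000, do: {"event.foo", 0}

# A batch spread evenly over the topic's 20 partitions.
even = for i <- 1..1_000, do: {"event.foo", rem(i, 20)}

IO.inspect(DispatchSketch.distribution(skewed, 10), label: "skewed")
IO.inspect(DispatchSketch.distribution(even, 10), label: "even")
```

In this model the skewed batch lands entirely on one slot, so the other nine processors have nothing to do until events from other partitions reach the front of the buffer.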

slashmili commented 2 years ago

Yup! It's putting the other processes to work.

I've never been so happy to see red signs!

Screenshot 2022-05-07 at 01 15 44

Thanks a lot @josevalim!

amacciola commented 2 years ago

@slashmili what versions of https://github.com/dashbitco/broadway_dashboard and https://hexdocs.pm/phoenix_live_dashboard/Phoenix.LiveDashboard.html are you using? I ask because I set mine up to just be

live_dashboard "/dashboard",
  metrics: TelemetrySupervisor,
  additional_pages: [
    broadway: BroadwayDashboard
  ]
end

but whenever I go to that tab I always get

Screen Shot 2022-05-10 at 7 14 20 PM

slashmili commented 2 years ago

@amacciola I ran these tests in this sample app: https://github.com/slashmili/tmp-phoenix-live-dashboard-with-broadway-pluging

In production we use PLDS and connect it to the nodes.

But if you can run the sample app and it's working, you might find what's wrong in your app.

amacciola commented 2 years ago

@slashmili have you ever had it working in any environment other than running it locally, without using PLDS?

slashmili commented 2 years ago

@amacciola No, I don't have an embedded live dashboard in non-dev environments.