dashbitco / broadway_kafka

A Broadway connector for Kafka
222 stars 52 forks source link

Undesirable resource usage related to producer concurrency #132

Closed oliveigah closed 8 months ago

oliveigah commented 9 months ago

Context

I've noticed that the current implementation of BroadwayKafka.BrodClient.setup/4 always starts a new :brod client as follow:

def setup(stage_pid, client_id, callback_module, config) do
    with :ok <- :brod.start_client(config.hosts, client_id, config.client_config),
         {:ok, group_coordinator} <-
           start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
      Process.monitor(client_id)
      ref = Process.monitor(group_coordinator)
      Process.unlink(group_coordinator)
      {:ok, group_coordinator, ref}
    end
  end

The problem is that this function is called for every new BroadwayKafka.Producer which may be initialized multiple times if producer concurrency is set to a number greater than one.

At my current understanding, in order to achieve maximum parallelism the number of broadway producers we need is the lowest between schedulers online and the sum of all topic's partitions.

But with the current implementation this would lead to a new TCP connection with each broker of the cluster for each one of the producers which is undesirable.

Proposal

Since 1 brod client is enough to handle most workloads, we could offer a new client_option called max_concurrency defaults to :infinity that would control how many brod clients will be started.

At my first look at the code I think the best approach would be start all brod clients before any producer and select a random client for each one on intialization.

The general approach consist in the following changes:

This proposal is very broad and I'll probably need to refine it during development considering possible side effects.

Closing thoughts

Let me know if all this makes sense to you or if I misunderstood something about the problem or it's another way to solve this with the current features we have.

If all make sense I'll start working on the PR for this. Just let me know! Thanks! :smiley:

josevalim commented 9 months ago

I am ok with sharing the client but I think your proposed solution would be non-trivial. Perhaps the best is to have a shared_client: boolean() configuration. If true, you create the client in the supervision tree and shares it, otherwise it is one per producer. WDYT? If you are happy with it, I'd love to review a PR. :)

oliveigah commented 9 months ago

Yeah, that sounds simpler indeed. The only problem I can see is that it's a one or all solution.

Maybe receive an integer to create multiple clients and select a random one on producer initialization?

What is the part you consider non trivial? Multiple (but not all) clients or starting them inside producer init?

Gonna start to work on a PR receiving a boolean flag at first and we decide about the integer later.

Thanks!

v0idpwn commented 9 months ago

I'm not sure if I follow the use-case: why would you have a producer concurrency higher than the number of brod clients required for maximum parallelism?

i.e.: assume you have 4 partitions, why'd you start more than 4 producers? And if it has some benefit, wouldn't those benefits be lost by sharing the same brod client/group coordinator?

oliveigah commented 9 months ago

assume you have 4 partitions, why'd you start more than 4 producers?

Yes, AFAICT we do not need more producer concurrency than 4, but that is not the problem here. The problem is that each broadway producer initialize a new brod client.

With the current behaviour if you setup this pipeline you will end up with 4 brod clients and 4 TCP conections (assuming just 1 broker).

The point is that you could reuse the brod client and not start a new one per producer because the paralelism needed for the producer scales differently from the parallelism needed for brod clients.

And if it has some benefit, wouldn't those benefits be lost by sharing the same brod client/group coordinator?

The only scenario I can see starting more brod clients would help is if the bottleneck of your pipeline is the TCP connection between the application and the broker, which is almost never the case if you are batching properly and the connection uses non blocking IO.

As the kafka protocol guide says https://kafka.apache.org/protocol.html#protocol_network:

The client will likely need to maintain a connection to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data. However it should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.

Considering that, I think most cases would not be negativelly impacted by sharing clients but since it may be a problem for some specific scenarios we should keep the current behaviour as the default.

That's makes sense to you @v0idpwn?

v0idpwn commented 9 months ago

Absolutely, thanks for the thoughtful explanation!

oliveigah commented 9 months ago

@josevalim after my first pass at the code I have some considerations.

Since we just have access to the client config when Broadway.start_link is called at runtime I can't see a way to start the clients directly on the broadway_kafka supervision tree without some changes on the configuration interface.

The only way I could start the client before Broadway.start_link is called is by setting a new configuration section just for shared clients. Like this:

client_1 = %{
  id: :my_shared_client_1,
  hosts: ["host1", "host2"]
  group_config_options: foo,
  client_config_options: bar,
  fetch_config_options: baz
}

config :broadway_kafka, :shared_clients, [ client_1, client_2,  client_3 ]

And them accepts the shared_client_id as a option for the BroadwayKafka.Producer like this:

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwayKafka.Producer, [
      shared_client_id: :my_shared_client_1
      topics: ["test"],
    ]},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 10
    ]
  ]
)

Was that what you had in mind? Seems like a bigger change than what you had proposed, am I missing something?

josevalim commented 9 months ago

The prepare_for_start callback should allow you to specify more children that are added to the supervision tree: https://github.com/dashbitco/broadway/blob/ebee2a94ffa6f16bc14ffa6dbc20d3c2f7b5bb73/lib/broadway/producer.ex#L114

oliveigah commented 8 months ago

I've tested the changes on the sandbox environment of a real world system we have here and the results are great so far.

The memory usage decreased 1.6 GB and port usage decreased in 500 ports. Some results from the :observer_cli

shared_client = true image

shared_client = false image

Given that, I think we can close this issue! Thanks for help and feedback. :smile: