dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0

Pass topology options to the producer #100

Closed: josevalim closed this 5 years ago

josevalim commented 5 years ago

Today, all Broadway producer projects need more information about the topology. SQS and GCPubSub would benefit from knowing the topology to configure a connection pool. Kafka needs it for reasons I can't remember. And RabbitMQ would need it for the index: https://github.com/plataformatec/broadway_rabbitmq/issues/36

It is also important to mention that Broadway does not respect the child_spec contract. We always call the init function (and never child_spec or start_link), since the child specification is configured externally by the topology. Also remember that Broadway already has an optional callback, called prepare_for_draining.

In any case, we need to introduce a new callback. My suggestion is to call it init_for_broadway, with the signature init_for_broadway(options, {name, index}, topology). This callback is optional: if it is not defined, we will call the regular init callback. The name is the name given in the topology, and the index is an integer >= 0 giving the position of the process in the supervision tree.
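The fallback rule in the proposal (prefer init_for_broadway/3 when the producer defines it, otherwise call the regular init/1) can be sketched in plain Elixir. This is a minimal illustration under stated assumptions, not Broadway's code: ProducerInit, init_producer/4, PlainProducer, and TopologyAwareProducer are hypothetical names, and the topology is passed through as an opaque term because its shape is not specified in the thread.

```elixir
defmodule ProducerInit do
  # Hypothetical helper: call the optional init_for_broadway/3 if the
  # producer exports it, otherwise fall back to the regular init/1.
  def init_producer(module, options, {name, index}, topology)
      when is_integer(index) and index >= 0 do
    # function_exported?/3 only sees loaded modules, so load it first.
    Code.ensure_loaded(module)

    if function_exported?(module, :init_for_broadway, 3) do
      module.init_for_broadway(options, {name, index}, topology)
    else
      module.init(options)
    end
  end
end

defmodule PlainProducer do
  # An existing producer that only defines the GenStage-style init/1.
  def init(options), do: {:producer, %{options: options}}
end

defmodule TopologyAwareProducer do
  # A producer that opts into the proposed callback to learn its name and index.
  def init_for_broadway(options, {name, index}, _topology) do
    {:producer, %{options: options, name: name, index: index}}
  end
end
```

Because the new callback is optional, existing producers such as PlainProducer keep working unchanged, which matches the goal of still supporting plain GenStage producers.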

/cc @msaraiva @mcrumm @whatyouhide

whatyouhide commented 5 years ago

@josevalim is there a reason for going with a different name instead of init/3?

josevalim commented 5 years ago

init/3 could look like a typo or be defined by accident, so I thought it would be better to be explicit.

whatyouhide commented 5 years ago

@josevalim okay, fair enough. I would suggest the name init_with_broadway_info/3 then, but good to go on my side :)

mcrumm commented 5 years ago

init_for_broadway/3 would still return {:producer, state}, correct?

Is there any value in an abstraction that requires less knowledge of GenStage?

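defmodule MyProducer do
  # Proposed abstraction: Broadway would call init/3 with the producer
  # options, its {name, index} in the topology, and the topology itself.
  use Broadway.Producer

  def init(options, {name, index}, topology) do
    state = %{options: options, name: name, index: index, topology: topology}
    {:ok, state}
  end
end
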
josevalim commented 5 years ago

> init_for_broadway/3 would still return {:producer, state}, correct?

Yes.

> Is there any value in an abstraction that requires less knowledge of GenStage?

It may have and we are slightly walking towards it, it seems, but we are not there yet. The important thing to note is that if we introduce an abstraction that hides GenStage, we may no longer be able to support existing GenStage producers (which is a feature of the current design).

mcrumm commented 5 years ago

> It may have and we are slightly walking towards it, it seems, but we are not there yet.

In that case, init_for_broadway/3 works for me :+1:

msaraiva commented 5 years ago

> Kafka needs it for reasons I can't remember

Yup, we need this for Kafka to make sure the producer always sends messages from the same partition to the same processor, so we can guarantee message ordering and safe acknowledgements. However, we need that guarantee throughout the pipeline, so as far as I can remember, we'll also need something similar for batcher consumers.
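One way to get the per-partition guarantee described above is to derive the processor index deterministically from the partition number. The sketch below assumes hash-based routing with :erlang.phash2/2; PartitionRouting, processor_for/2, route/2, and the message map shape are hypothetical and not taken from any Broadway producer.

```elixir
defmodule PartitionRouting do
  # Hypothetical sketch: map a partition to a fixed processor index in
  # 0..processor_count - 1. The same inputs always give the same index.
  def processor_for(partition, processor_count)
      when is_integer(processor_count) and processor_count > 0 do
    :erlang.phash2(partition, processor_count)
  end

  # Group messages by target processor. Enum.group_by/2 keeps the original
  # order of elements inside each group, so per-partition order survives.
  def route(messages, processor_count) do
    Enum.group_by(messages, fn %{partition: p} -> processor_for(p, processor_count) end)
  end
end
```

Since the index depends only on the partition and the processor count, all messages from one partition reach the same processor in their original order, while several partitions may share a processor. Keeping the guarantee across the whole pipeline would require applying the same kind of routing again before the batchers.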

josevalim commented 5 years ago

Ah, in the Kafka case we need a way to modify all options before we even start the producers. :thinking:

josevalim commented 5 years ago

So I talked to @msaraiva and we will call this prepare_options. It will receive only the producer options and return updated producer options. Whether to partition or not will be a producer configuration.
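A minimal sketch of the prepare_options idea, assuming the callback is optional like the init_for_broadway proposal: the pipeline hands over only the producer options before starting any producer and uses whatever comes back. PrepareOptions.run/2, HypotheticalKafkaProducer, and the :partitioned option are illustrative names only, not real Broadway or Kafka APIs.

```elixir
defmodule HypotheticalKafkaProducer do
  # Hypothetical prepare_options/1: receives only the producer options and
  # returns updated ones. :partitioned is an illustrative option name; an
  # explicit value from the user is left untouched.
  def prepare_options(opts) do
    Keyword.put_new(opts, :partitioned, true)
  end
end

defmodule PrepareOptions do
  # Run the optional callback if the producer defines it; otherwise pass
  # the options through unchanged.
  def run(module, producer_opts) do
    Code.ensure_loaded(module)

    if function_exported?(module, :prepare_options, 1) do
      module.prepare_options(producer_opts)
    else
      producer_opts
    end
  end
end
```

This keeps the partitioning decision inside the producer's own configuration, as proposed, rather than exposing it as a topology-level setting.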

josevalim commented 5 years ago

For GCPubSub and Amazon SQS, I have thought more about it, and I don't think it makes sense to couple the pool size to the topology, because we never know whether acking happens in processors or batchers, especially in the face of failures and the upcoming ack_immediately. So I think the best option is to make the connection pool size twice the number of producers by default, but keep it configurable.
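The suggested default amounts to a single lookup with a fallback. In this sketch, PoolSize.pool_size/2 and the :pool_size option are hypothetical names, and producer_count stands for however many producer processes the pipeline starts.

```elixir
defmodule PoolSize do
  # Hypothetical sketch: default the connection pool size to twice the
  # number of producer processes, unless :pool_size is given explicitly.
  def pool_size(opts, producer_count)
      when is_integer(producer_count) and producer_count > 0 do
    Keyword.get(opts, :pool_size, 2 * producer_count)
  end
end
```

The default is derived only from the producer count, so it stays independent of whether acking happens in processors or batchers, while an explicit :pool_size always wins.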

D4no0 commented 3 years ago

Has support for multiple producers been added? The documentation still states:

:producer - Required. A keyword list of options. See "Producers options" section below. Only a single producer is allowed.

josevalim commented 3 years ago

Not allowed and likely won't ever be.