dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0
2.43k stars 161 forks source link

Multiple Pipelines #265

Closed benkeil closed 3 years ago

benkeil commented 3 years ago

It looks like it is not possible to use multiple pipelines.

children = [
  ...
  {ProductReviewFeed.Pipeline.Prepare, []},
  {ProductReviewFeed.Pipeline.Batch, []},
  {ProductReviewFeed.Pipeline.Process, []},
  ...
]
2021-09-09T16:14:30.052Z module=Broadway.Topology.ProcessorStage [error] ** (RuntimeError) message was set to unknown batcher :sqs. The known batchers are [:reduce]
    (broadway 1.0.0) lib/broadway/topology/processor_stage.ex:194: Broadway.Topology.ProcessorStage.validate_message/2
    (broadway 1.0.0) lib/broadway/topology/processor_stage.ex:156: anonymous fn/6 in Broadway.Topology.ProcessorStage.handle_messages/4
    (telemetry 1.0.0) /Users/trubke/development/aaa/etrusted_product_review_feed/deps/telemetry/src/telemetry.erl:293: :telemetry.span/3
    (broadway 1.0.0) lib/broadway/topology/processor_stage.ex:143: Broadway.Topology.ProcessorStage.handle_messages/4
    (broadway 1.0.0) lib/broadway/topology/processor_stage.ex:63: anonymous fn/2 in Broadway.Topology.ProcessorStage.handle_events/3
    (telemetry 1.0.0) /Users/trubke/development/aaa/etrusted_product_review_feed/deps/telemetry/src/telemetry.erl:293: :telemetry.span/3
    (gen_stage 1.1.1) lib/gen_stage.ex:2462: GenStage.consumer_dispatch/6
    (gen_stage 1.1.1) lib/gen_stage.ex:2650: GenStage.take_pc_events/3
    (stdlib 3.15.2) gen_server.erl:695: :gen_server.try_dispatch/4
    (stdlib 3.15.2) gen_server.erl:771: :gen_server.handle_msg/6
    (stdlib 3.15.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

If I switch the order of the children for the supervisor the error is that :reduce is not found and only :sqs is known. So the last Pipeline in the children override the batchers of other pipelines.

josevalim commented 3 years ago

Can you please provide the complete code? The issue is not in the supervisor but in the pipeline definition. Thanks!

benkeil commented 3 years ago
defmodule ProductReviewFeed.Pipeline.Batch do
  use Broadway

  require Logger
  require Ecto.Query

  alias Broadway.Message

  @type json :: String.t()

  def start_link(_opts) do
    Broadway.start_link(
      __MODULE__,
      name: __MODULE__,
      context: [
        queue_url_process: Application.get_env(:product_review_feed, :queue_url_process),
      ],
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Application.get_env(:product_review_feed, :queue_url_batch),
          config: [
            access_key_id: Application.get_env(:ex_aws, :access_key_id),
            secret_access_key: Application.get_env(:ex_aws, :secret_access_key),
            security_token: Application.get_env(:ex_aws, :security_token),
            region: Application.get_env(:ex_aws, :region),
          ],
        },
        concurrency: 20
      ],
      processors: [
        default: [
          concurrency: 20,
        ]
      ],
      batchers: [
        reduce: [
          concurrency: 100,
          batch_size: 1000,
          batch_timeout: 5000,
        ]
      ]
    )
  end

  def handle_message(_processor_name, %Message{data: product_id} = message, _context) do
    Logger.debug("Received message: #{inspect message}")
    message
    |> Message.put_batch_key(product_id)
    |> Message.put_batcher(:reduce)
  end

  def handle_batch(:reduce, messages, _batch_info, [queue_url_process: queue_url_process]) do
    %Message{data: product_id} = List.first(messages)
    Logger.info "Reducing #{length(messages)} messages"
    ExAws.SQS.send_message(
      queue_url_process,
      product_id,
      message_group_id: product_id
    )
    |> ExAws.request!()
    Logger.debug "Successfully send message"
    messages
  end
end
defmodule ProductReviewFeed.Pipeline.Prepare do
  use Broadway

  require Logger
  require Ecto.Query

  alias Broadway.Message

  @type json :: String.t()

  def start_link(_opts) do
    Broadway.start_link(
      __MODULE__,
      name: __MODULE__,
      context: [
        queue_url_batch: Application.get_env(:product_review_feed, :queue_url_batch),
      ],
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Application.get_env(:product_review_feed, :queue_url_prepare),
          config: [
            access_key_id: Application.get_env(:ex_aws, :access_key_id),
            secret_access_key: Application.get_env(:ex_aws, :secret_access_key),
            security_token: Application.get_env(:ex_aws, :security_token),
            region: Application.get_env(:ex_aws, :region),
          ],
        },
        concurrency: 20
      ],
      processors: [
        default: [
          concurrency: 20,
        ]
      ],
      batchers: [
        reduce: [
          concurrency: 50,
          batch_size: 10,
          batch_timeout: 1000,
        ]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    Logger.debug("Received message: #{inspect message}")
    product_id = get_product_id(message)
    message
    |> Message.put_data(product_id)
    |> Message.put_batch_key(product_id)
    |> Message.put_batcher(:sqs)
  end

  def handle_batch(:sqs, messages, _batch_info, [queue_url_batch: queue_url_batch]) do
    Logger.info "Send #{length(messages)} messages"
    %{data: product_id} = List.first(messages)
    ExAws.SQS.send_message(
      queue_url_batch,
      product_id,
      message_group_id: product_id
    )
    |> ExAws.request!()
    Logger.debug "Successfully send message"
    messages
  end

  @spec get_product_id(message :: Message) :: String.t()
  defp get_product_id(message) do
    Jason.decode!(message.data)["productId"]
  end
end
josevalim commented 3 years ago

The error message is saying that one of your handle_message is setting the batcher to sqs but that pipeline only has a reduce batcher and vice-versa. You can share the handle_message, but it doesn’t seem to be a Broadway issue. Thanks!

josevalim commented 3 years ago

Now with the added code, you can see that the Prepare pipeline defines a :reduce batcher but calls put_batcher(:sqs). Pls double check for entries like this. :)

benkeil commented 3 years ago

How could I not see this... Thanks.