dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0
2.43k stars 161 forks source link

Proposal: Publishers with handled back pressure from business logic #50

Closed am-kantox closed 5 years ago

am-kantox commented 5 years ago

As it was discussed at λ-days, I am to describe our use case for Broadway.

We have a Rabbit queue of incoming messages at the rate ~10K/sec in peaks. We apply validation rules onto them since some of the messages are malformed. These validation rules might be time-consuming, the result of applying them would be another Rabbit queue (valid messages.)

Then we apply some processing again and deliver them to another queue(s). That said, we currently use Rabbit as a communication channel between different processes, microservices, and the main application. That simplifies the architecture because there are very few if any ties between code in different microservices.

It could be drawn this way:

——————————   ——————————   ——————————   ——————————   ——————————   —————————— 
| Rabbit | → |  Flow  | → | Rabbit | → |  ....  | → | Rabbit | → |  Flow  | 
——————————   ——————————   ——————————   ——————————   ——————————   —————————— 

Flows over GenStages are used for heavy computation. To avoid code repetitions, we built a library that takes one or more processors in a form of {Mod, :fun} tuples alongside with sources and destinations. It was created two years ago and basically works over connection pools and compiled configs, that’s why we were so excited to hear about Broadway. Unfortunately in it’s current documentation Broadway is meant as a “dead end,” meaning that it has a great infrastructure to handle back pressure and to process incoming data, but one cannot just plug-and-play the existing Flow implementation into the pipeline as shown above.

It would be great to have [optional] publishers or how do you name it as well (top half of the picture was taken from announce by Plataformatec):

                         [producer_1]
                             / \
                            /   \
                           /     \
                          /       \
                 [processor_1] [processor_2] ... [processor_50]  <- process each message
                          /\     /\
                         /  \   /  \
                        /    \ /    \
                       /      x      \ 
                      /      / \      \
                     /      /   \      \
                    /      /     \      \
              [batcher_s3_odd]  [batcher_s3_even]
                    /\                  \
                   /  \                  \
                  /    \                  \
                 /      \                  \
 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch

 ————————————————————————————————————————————————————————————————————————————————————————

 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch
                \      /             \      /                   
                 \    /               \    /                   
                  \  /                 \  /                   
                   \/                   \/                   
            [batcher_s3_odd]     [batcher_s3_even]
                         / \     /\ 
                        /   \   /  \
                       /     \ /    \
                      /       x      \
                     /       / \      \
                    /       /   \      \
                   /       /     \      \
                  /       /       \      \
                [publisher_1]   [publisher_2] ... [publisher_50]  <- publish each message

That way the consumer part (optionally including batchers) might be easily extracted into a separate codebase, making the business code fully isolated from any broker/connection handling.

We currently use kinda this architecture with supported source backends RabbitMQ / HTTP and destinations backends SQL / RabbitMQ / HTTP / Slack. We are able to simply add another publisher and change nothing in the business logic to export data to the database or to another say client who requires HTTP webhooks.

As I said, our implementation of connectors is just a pool hence we need to reimplement back pressure support in our business logic units. It is not as much of boilerplate, but I am pretty sure it might bring a huge added value to Broadway if it was supported through a configuration like:

def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producers: [...],
    processors: [...],
    batchers: [...],

    pipeline: {MyBusinessUnit, :process},

    batchers: [
      s3_positive: [stages: 2, batch_size: 10],
      s3_negative: [stages: 1, batch_size: 10]
    ],
    publishers: [
      rabbit: [
        module: {BroadwayRabbitMQ.Publisher, exchange_name: "my_exchange"}
      ]
    ]
  )
end
msaraiva commented 5 years ago

Hi @am-kantox. Thanks for your proposal!

We have already discussed some ideas to make Broadway more flexible/pluggable in the near future. However, we decided, for now, to get more feedback like this from the community before we make a decision in that direction.

We certainly going to get back to this issue in the next few days/weeks, so stay tuned and thanks again for your feedback!

josevalim commented 5 years ago

Hi @am-kantox! Thanks for feedback.

I have some questions about your current pipeline.

When you say you have RabbitMQ -> Flow -> RabbitMQ -> Flow, do you mean that:

  1. you get the data from RabbitMQ, then you process it with Flow, then you send it to RabbitMQ, and then there is another Flow that gets the data from RabbitMQ again?

  2. or you get the data from RabbityMQ, then you process it with Flow, then you send it to RabbitMQ, and continue processing it with Flow (without getting it from RabbitMQ again)?

That will help us steer the discussion in the correct direction.

am-kantox commented 5 years ago

The first one. We try to decouple all Flows as much as possible, hence no ties.

Also, No2 might be of interest as well, when the next pipeline is in the same microservice.

On Wed, Feb 27, 2019, 6:54 PM José Valim notifications@github.com wrote:

Hi @am-kantox https://github.com/am-kantox! Thanks for feedback.

I have some questions about your current pipeline.

When you say you have RabbitMQ -> Flow -> RabbitMQ -> Flow, do you mean that:

1.

you get the data from RabbitMQ, then you process it with Flow, then you send it to RabbitMQ, and then there is another Flow that gets the data from RabbitMQ again? 2.

or you get the data from RabbityMQ, then you process it with Flow, then you send it to RabbitMQ, and continue processing it with Flow (without getting it from RabbitMQ again)?

That will help us steer the discussion in the correct direction.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/plataformatec/broadway/issues/50#issuecomment-467963973, or mute the thread https://github.com/notifications/unsubscribe-auth/AK3INiVj3k-fOVlQ1JUdTYPfHjujs2Bbks5vRsZrgaJpZM4bT0sC .

josevalim commented 5 years ago

The first one. We try to decouple all Flows as much as possible, hence no ties.

@am-kantox so in this case, you should be able to create multiple broadway pipelines. Correct? Why would publishers be necessary? Or is just that you don't need batching?

am-kantox commented 5 years ago

Eh. Because we basically do not want to think about where we do publish. Slack and usual HTTP endpoint do not like 5K posts per a sec, and I am currently building a back pressure publisher against these endpoints.

On Wed, Feb 27, 2019, 7:44 PM José Valim notifications@github.com wrote:

The first one. We try to decouple all Flows as much as possible, hence no ties.

@am-kantox https://github.com/am-kantox so in this case, you should be able to create multiple broadway pipelines. Correct? Why would publishers be necessary? Or is just that you don't need batching?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/plataformatec/broadway/issues/50#issuecomment-467981715, or mute the thread https://github.com/notifications/unsubscribe-auth/AK3INhjpeMQiqsKVSrxeSuyVTbjpgaz0ks5vRtIFgaJpZM4bT0sC .

am-kantox commented 5 years ago

That does mean that Broadway might take care about number of publishers to Rabbit, or about number of requests per a sec to some other service, assuming we have peak times and quiet times.

josevalim commented 5 years ago

There are two ways we can implement this functionality today with GenStage:

  1. You have custom consumers that set the demand to manual to have granular control over the demand

  2. You simply block in the consumer until the thing you are calling can accept the request

Almost everyone is doing 2, because it is by far the simplest approach, and that's what Broadway supports. It is also not a major issue to block a consumer.

Have you implemented 1 by any chance? If so, I would love to take a look at it, otherwise, I would postpone this until we have use cases in place.

am-kantox commented 5 years ago

Well, we have not implemented 1 despite I have had a couple of attempts :)

Let us postpone this until we have it implemented.

Still, the ability to deliberately select a publish method according to the endpoind is valuable IMHO. Like, if there is a DB as a backend, collect batches and do bulk insert. Or report success if N messages were acked from external source.

On Wed, Feb 27, 2019, 8:05 PM José Valim notifications@github.com wrote:

There are two ways we can implement this functionality today with GenStage:

1.

You have custom consumers that set the demand to manual to have granular control over the demand 2.

You simply block in the consumer until the thing you are calling can accept the request

Almost everyone is doing 2, because it is by far the simplest approach, and that's what Broadway supports. It is also not a major issue to block a consumer.

Have you implemented 1 by any chance? If so, I would love to take a look at it, otherwise, I would postpone this until we have use cases in place.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/plataformatec/broadway/issues/50#issuecomment-467989318, or mute the thread https://github.com/notifications/unsubscribe-auth/AK3INta-u-GaDCJCeEW_feSAVRgE19bMks5vRtbtgaJpZM4bT0sC .

josevalim commented 5 years ago

Still, the ability to deliberately select a publish method according to the endpoind is valuable IMHO.

You can achieve this today by setting up multiple batchers. :)

am-kantox commented 5 years ago

I must be missing something.

Butchers are about getting external data, correct me if I am wrong.

I am talking about 10 different destinations, after 10 processing steps.

Am I missing something?

On Wed, Feb 27, 2019, 8:20 PM José Valim notifications@github.com wrote:

Still, the ability to deliberately select a publish method according to the endpoind is valuable IMHO.

You can achieve this today by setting up multiple batchers. :)

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/plataformatec/broadway/issues/50#issuecomment-467994682, or mute the thread https://github.com/notifications/unsubscribe-auth/AK3INqDgGdHgFmDoPelnECIshg2FaSj9ks5vRtqBgaJpZM4bT0sC .

josevalim commented 5 years ago

Batchers are about grouping data before "publishing" it to a separate step. So for example, if you want write to RabbitMQ one by one and write to the database ten by ten, you define two batchers.

So 10 different destinationis is totally possible today. 10 processing steps aren't right now but this is tracked in #39.

am-kantox commented 5 years ago

I am not a mind reader :)

Glad to hear this title means exactly this.

josevalim commented 5 years ago

@am-kantox the batchers part though (10 destinations) exists already today and is in the docs. Was that not made clear in the docs? Is there something we could improve?

am-kantox commented 5 years ago

@josevalim I have carefully re-read the docs; it was totally me misunderstanding the pipeline. I was fooled because I have a very biased view based on our existing implementation. I was under an impression all the heavy work is done in consumers, after batching, not in processors. I am positive docs are good enough and other people discovering Broadway would be fine understanding it as it is.

The thing is, now I am not convinced by the top of the pyramid on the pipeline. It sounds like all the messages are going through Broadway.handle_message/3 callback and we are to implement Flow there. Is that correct?

Even when multiple producers/processors will be allowed, there is no batching / partitioning before Broadway.handle_message/3 in the pipeline. Yes, we could rely on pattern matching / guarding in it, but that’s not exactly how batching works. So, the whole Flow implementation would usually go into the single handle_message/3 callback, because of key option to Flow.partition/2 is [biased: in a vast majority of cases] more complicated than a guard. Is that correct?

The above is the major reason I was misunderstanding the batcher’s role.

josevalim commented 5 years ago

If you are using Broadway, I don't see why you would use Flow. We already do all of the partitioning, parallelism, etc for you. Given you said you are doing RabbitMQ -> Flow -> RabbitMQ -> Flow this will be two broadway pipelines: one that gets from RabbitMQ, processes it and sends to another RabbitMQ channel. And another pipeline that gets from RabbitMQ as well and consumes it.

Why would you need both Broadway and Flow?

am-kantox commented 5 years ago

Flow is a more general and powerful abstraction than Broadway that focuses on data as a whole, providing features like aggregation, joins, windows, etc. — README

@josevalim I understand that there are two Broadway pipelines, but this is out of the scope of why Flow, so let’s assume we have one for simplicity.

We have a complicated Flow with several emits and we need to keep and pass an accumulator through the whole Flow. Basically, this is a processing of the huge CSV having several different partitions and a complicated logic to produce different new Ecto schemas out of it. Either I did not get how Broadway could be used instead, or we are stick to Flow for this particular processing. Partitioning inside these Flows is done by, say, properties.

OTOH, we want this Flows to act as plugins agnostic to the real data source. It might be several CSVs to be glued into one, or even many API calls with “a single row” data in each of them. For the latter, we want to use Broadway, processing different data sources and partitioning them, say, by client. This partitioning is relatively easy and does not require Flow.Window and other sophisticated stuff from Flow.

And we want these parts to be independent as much as possible. I hope the above answers the question.

josevalim commented 5 years ago

Right, you can't do accumulator in Broadway, so if that is a must, Flow is a better option! But we can allow hashing, which will at least guarantee the same processors handle the same data. This can be useful at least for ordering. I have opened #62 for this case. Thanks!