dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0

Add `terminate/3` callback #319

Closed · type1fool closed this issue 7 months ago

type1fool commented 1 year ago

I have been on a team using Broadway this year to build an ETL application for migrating data from MSSQL tables into Postgres tables. With a custom Tds producer, this process has been fairly straightforward.

However, I understand that Broadway wasn't designed for short-lived ETL jobs. So, we've had to do some hacky stuff where the producer stops the job when it runs out of records. This has been acceptable, though it violates the single-responsibility principle (SRP).
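For readers unfamiliar with the pattern being described, here is a hypothetical sketch of that workaround (not the actual implementation): a custom producer that shuts down the whole pipeline once the source runs dry. `fetch_rows/2`, `MyApp.TdsProducer`, and the `pipeline_name` option are made-up names for illustration.

```elixir
defmodule MyApp.TdsProducer do
  use GenStage

  @impl true
  def init(opts), do: {:producer, Map.new(opts)}

  @impl true
  def handle_demand(demand, state) do
    case fetch_rows(state, demand) do
      [] ->
        # Out of records: stop the pipeline from a separate process so the
        # producer itself is free to shut down. This is the SRP violation
        # mentioned above -- the producer now also owns job lifecycle.
        pipeline = state.pipeline_name
        spawn(fn -> Broadway.stop(pipeline) end)
        {:noreply, [], state}

      rows ->
        {:noreply, rows, state}
    end
  end

  # Placeholder for the real Tds query.
  defp fetch_rows(_state, _demand), do: []
end
```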

As new requirements have come along, we've needed to write some data to CSV files. We built a CSV consumer that opens a file during initialization, handles events to insert rows, and closes the file upon termination. Again, we've had to pollute the producer with code to close the CSV consumer.

I experimented with Flow over the past week or two, trying to determine if it would be a better fit for this type of work. I'm finding myself solving graceful shutdown problems that are already solved by Broadway, and wishing Broadway had an optional terminate/3 callback or a way to interact with Terminator for post-job cleanup work. I don't have a clear idea of how to implement this.

Is this a good idea? I'd be happy to open a PR with a little guidance on where to start. Or, would it be better to build and compose GenStages without Broadway or Flow?

josevalim commented 1 year ago

Files belong to the process that created them and are automatically closed once said process finishes. So you may not need to do anything at all. :)

Other than that, we could add a terminate callback, but there are long-standing discussions of where it should live. For example, if it is part of the terminator, it will be invoked whenever any part of the topology restarts, and then it should rather be called handle_drain or similar. It could also be invoked by Broadway.Topology, but then it is only invoked when the whole tree terminates.

type1fool commented 1 year ago

It did occur to me that the file process may be closed automatically. One important detail I left out is that we're also uploading the file to the cloud. So, it's a little more than pure 'cleanup' fwiw.

josevalim commented 1 year ago

@type1fool you could also spawn a process that monitors the file or the consumer and makes sure it terminates accordingly. Especially because terminate is not invoked on broken links and stuff, so it would be the best guarantee for execution (Broadway or not).
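A minimal sketch of that suggestion: a standalone process that monitors the CSV consumer and runs cleanup when it goes down for any reason, including crashes, where a terminate callback would never fire. `CsvWatcher` and `upload_to_cloud/1` are illustrative names, not part of any real API.

```elixir
defmodule MyApp.CsvWatcher do
  use GenServer

  def start_link(consumer_pid) do
    GenServer.start_link(__MODULE__, consumer_pid)
  end

  @impl true
  def init(consumer_pid) do
    # A monitor (unlike a link) delivers a :DOWN message on any exit,
    # normal or abnormal, without taking this process down with it.
    ref = Process.monitor(consumer_pid)
    {:ok, %{ref: ref}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    # Runs whether the consumer exited normally or crashed.
    upload_to_cloud("/tmp/output.csv")
    {:stop, :normal, state}
  end

  # Placeholder for the cloud upload step mentioned above.
  defp upload_to_cloud(_path), do: :ok
end
```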

type1fool commented 1 year ago

...terminate is not invoked on broken links and stuff...

Good to know. I'll look into handling those scenarios.

These jobs run under a DynamicSupervisor, so maybe part of the answer is to monitor the producer and/or Broadway pipeline and terminate any remaining children upon shutdown. 🤔

type1fool commented 1 year ago

Recently I had some time to look more closely at the source code for Broadway.Topology & Broadway.Topology.Terminator.

I think there's a case to be made for adding consumers to the pipeline topology, which would allow the terminator to drain consumers in Terminator.terminate/2. It's not clear to me how a consumer GenStage would handle subscriptions in a Broadway pipeline, but casting to a GenStage from handle_batch/4 or handle_message/3 may be sufficient.

Example

To provide more context, this chart illustrates how a job would extract data from a source DB, transform it with processors, then load results into a destination DB and CSV file via consumers:

```mermaid
flowchart
  source[(SourceDb)] -- Extract --> PostgresProducer

  subgraph broadway[Broadway]

  PostgresProducer -. Transform .-> Processor1
  PostgresProducer -. Transform .-> Processor2

  Processor1 -.-> Batcher1
  Processor1 -.-> Batcher2
  Processor1 -.-> Batcher3
  Processor1 -.-> Batcher4
  Processor2 -.-> Batcher1
  Processor2 -.-> Batcher2
  Processor2 -.-> Batcher3
  Processor2 -.-> Batcher4

  Batcher1 == Upsert ==> PostgresConsumer
  Batcher2 == Upsert ==> PostgresConsumer
  Batcher3 == Upsert ==> PostgresConsumer
  Batcher4 == Upsert ==> PostgresConsumer

  Batcher1 -. Cast .-> CsvConsumer
  Batcher2 -. Cast .-> CsvConsumer
  Batcher3 -. Cast .-> CsvConsumer
  Batcher4 -. Cast .-> CsvConsumer

  end

  PostgresConsumer == Upsert ==> destination[(DestinationDb)]
  CsvConsumer == Write ==> csv((CSV File))
```
I did try out a few ideas for monitoring the job process, but I struggled with where to track consumer processes in order to handle termination upon job completion. It may be a few weeks before I'm able to get back into the weeds on this problem, but it's been a fun problem to hack on.

whatyouhide commented 1 year ago

@type1fool one thing I'm confused about in this approach: why would the consumers be part of the Broadway pipeline in the first place, rather than being external processes?

type1fool commented 1 year ago

If there were a terminate callback, I'd probably put the consumer pids in the pipeline context, then drain them when the callback is invoked.

Making them part of the topology is an alternate approach, I think.

whatyouhide commented 1 year ago

@josevalim do we want to go ahead with this terminate/3 callback?

@type1fool for example, can you monitor your Postgres producer from the consumers, so that you know when to shut down the consumers? Alternatively, can you monitor the Broadway pipeline itself from the consumers, and shut them down once the pipeline is down and drained?

type1fool commented 1 year ago

@whatyouhide Thank you for following up!

If the producer can broadcast a message to the job pipeline when it's out of messages, the pipeline could trigger its terminate callback. My memory is a bit fuzzy on this - it's been a few months since I was actively building jobs with Broadway - but it seems like it could work that way.

josevalim commented 7 months ago

Closing this one for now because, if you want a terminate or drain, you can also achieve this by starting your own supervision tree, with a terminator process alongside your broadway instance:

```elixir
Supervisor.start_link([{MyBroadway, MyTerminator}], strategy: :one_for_all)
```

Then your terminator process traps exits and cleans up on its terminate/2. Pretty much the same way our terminator is implemented.
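One possible shape for that terminator process, sketched under the assumption that it sits in the same `:one_for_all` tree as the Broadway instance; the module name comes from the snippet above, and the cleanup body is illustrative.

```elixir
defmodule MyTerminator do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # Trapping exits is what makes terminate/2 run when the supervisor
    # shuts this process down during tree termination.
    Process.flag(:trap_exit, true)
    {:ok, opts}
  end

  @impl true
  def terminate(_reason, _state) do
    # Post-job cleanup goes here: close files, upload the CSV, etc.
    :ok
  end
end
```

One design detail worth noting: supervisors stop children in the reverse of their start order, so starting the terminator before the Broadway instance means it is stopped last, after Broadway has drained.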

type1fool commented 7 months ago

Thank you for the follow up.