Issues with _ParallelRead / Redesign adding optional Worker nodes #75

Open leo-schick opened 2 years ago

User story

I came across an interesting problem while reading multiple files via a subclass of _ParallelRead: with the current design, _ParallelRead first collects all files to be processed into RAM, decides how to split them across tasks, and only then starts the parallel reading.

I have a folder on a cloud storage with over 1.3 million files that need to be processed. It takes ages before mara even starts reading (and when there is an invalid file, everything starts over again...).

In addition, the upfront work distribution looks inefficient to me when one loads many files of different sizes (= different processing times).

Here is what I came up with

I redesigned the base class ParallelTask to support generic Worker nodes instead of tasks. This is an optional mode which must be activated with self.use_workers = True. The Worker nodes receive their commands during the runtime of the pipeline (in contrast to the Task node, whose commands must be defined upfront).

In addition, the function feed_workers in class ParallelTask can be overridden. This method runs in a separate process during pipeline execution and yields the commands which are then passed over to the workers. You can either yield a single command or a command list; when you yield a command list, the whole list is passed to a single worker. (This is necessary because in some cases several commands must be executed in order for a single file.)
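
To make the proposed API concrete, here is a hedged sketch of a subclass (not the actual PR code: `ParallelTask`, `use_workers` and `feed_workers` come from the description above, `RunBash` is an existing mara command, and the folder-listing helper plus the shell commands are made up for illustration):

```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import ParallelTask


def list_remote_files(folder: str):
    """Made-up helper: would yield the file names found in a cloud storage folder."""
    ...


class ParallelReadFolder(ParallelTask):
    """Hypothetical parallel task that reads all files from a cloud folder."""

    def __init__(self, id: str, description: str, folder: str):
        super().__init__(id=id, description=description)
        self.folder = folder
        self.use_workers = True  # activate the optional worker mode

    def feed_workers(self):
        # runs in a separate process during pipeline execution and yields
        # commands to the workers at runtime
        for file_name in list_remote_files(self.folder):
            if file_name.endswith('.gz'):
                # a yielded list is passed to a single worker and executed in order
                yield [RunBash(f'gunzip -k "{file_name}"'),
                       RunBash(f'process "{file_name[:-3]}"')]
            else:
                yield RunBash(f'process "{file_name}"')  # a single command
```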

Since the workers now get their files / commands passed at runtime from a message queue, I expect this logic to work better when many files need to be processed.
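
For illustration only (generic Python, not the mara implementation; `run` is a stand-in for executing a command), the runtime feeding boils down to the pattern below: an idle worker pulls the next command as soon as it finishes the previous one, so files of different sizes balance out automatically:

```python
import multiprocessing

STOP = None  # sentinel that tells a worker to shut down


def run(command):
    print('executing', command)  # stand-in for real command execution


def worker(queue: multiprocessing.Queue):
    while True:
        command = queue.get()  # blocks until the feeder provides work
        if command is STOP:
            break
        run(command)


def feeder(queue: multiprocessing.Queue, commands, n_workers: int):
    for command in commands:
        queue.put(command)  # hand work to whichever worker is free next
    for _ in range(n_workers):
        queue.put(STOP)  # one sentinel per worker


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(queue,))
                 for _ in range(4)]
    for p in processes:
        p.start()
    feeder(queue, (f'file-{i}.csv' for i in range(10)), n_workers=4)
    for p in processes:
        p.join()
```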

This new design does not work for all _ParallelRead execution options, so it is only used when possible.

PR: #74

Some points to note

  1. When the feed_workers function throws an exception, all commands already in the queue will still be processed by the worker tasks. There is no mechanism to inform the worker nodes that they should stop their work; they stop once all open commands in the queue have been picked up (see the sketch after this list).
  2. When a worker node fails, the other workers and the feed_workers process will continue their work until the queue is empty.
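
As a hedged sketch of these drain semantics (an assumption about how it could work, not the actual PR code), the feeder from the sketch above would emit its stop sentinels in a `finally` block, so a failure on either side never cancels work that is already queued:

```python
def feeder(queue, commands, n_workers: int):
    try:
        for command in commands:
            queue.put(command)  # may raise, e.g. while listing files
    finally:
        # runs even after an exception: each worker stops only after the
        # queue has been drained; there is no early-abort broadcast
        for _ in range(n_workers):
            queue.put(None)
```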

This implementation only works with ReadMode.ALL.