openspending / datapackage-pipelines-fiscal

Fiscal Data Package extensions to Datapackage Pipelines
MIT License

[#1222] Revert back to using a single pipeline #4

Closed vitorbaptista closed 6 years ago

vitorbaptista commented 6 years ago

We tried using three different pipelines as a way to guarantee that their processors would run only when the previous ones had finished. Unfortunately, this caused another issue.

Consider that we have pipelines A and B, where B depends on A. Some processor in pipeline A saves a datapackage to the local filesystem for B to process further. Now, os-data-importers is deployed and runs pipeline A successfully, which writes the datapackage to the filesystem. Then, before B is able to execute, os-data-importers is re-deployed. As its filesystem isn't persistent, all of its files are lost, including the datapackage written by A.

Later, the new deployment finishes. The datapackage-pipelines framework sees that A was successful and wasn't modified, so it doesn't run A again and goes on to run B. This is where we have problems: B depends on the zip file created by A, but that file doesn't exist anymore, so B raises an exception and stops.

This is where we get stuck. datapackage-pipelines isn't aware of B's dependency on this zip file, so it doesn't know it has to trigger A again.

The solution was to put them all in the same pipeline. However, we now have to find a way to run a processor only after all previous processors have finished. The usual way we've been doing this is by simply calling spew() and then running the processor's code. That doesn't work for us, because we have more than one processor that needs to run after everything before it has finished. Bear with me, because this is a bit complicated.

Let's consider a pipeline with 2 processors:

  1. modify_datapackage_and_write_to_zip
  2. fiscal.upload

The code for the first processor looks like:

from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
spew(datapackage, resource_iterator)  # Pass the dp and resources as is

run_and_save_to('datapackage.zip')  # Run this processor and save DP to path

Notice that, at the end, it saves a datapackage to the local filesystem. That datapackage is used by the next processor, fiscal.upload, whose code would look like:

from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
spew(datapackage, resource_iterator)  # Pass the dp and resources as is

upload_datapackage('datapackage.zip')  # This came from the previous processor

The problem is that this spew() technique only guarantees that no previous task is still ingesting data that passes through the pipeline. Here, however, we're using data outside of the pipeline. It's possible that upload_datapackage() starts running before run_and_save_to() has finished. We need to block the following processors until the previous processors have run in their entirety, including the code after spew().
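
To make the ordering concrete, here's a toy model of the race; it is not the datapackage-pipelines runtime, just two threads and a queue standing in for the two processors and the stream between them. B's trailing code can start as soon as it has seen the last row, while A is still busy with its own trailing code:

import queue
import threading
import time

rows = queue.Queue()
END = object()

def processor_a():
    for row in ({'id': 1}, {'id': 2}):
        rows.put(row)    # spew(): rows stream downstream as they're produced
    rows.put(END)        # downstream sees the end of the stream here
    time.sleep(0.5)      # stands in for a slow run_and_save_to('datapackage.zip')
    print('A: datapackage.zip written')

def processor_b():
    while rows.get() is not END:
        pass             # spew(): forward rows as-is
    print('B: uploading datapackage.zip')  # usually prints before A's message

a = threading.Thread(target=processor_a)
b = threading.Thread(target=processor_b)
a.start(); b.start()
a.join(); b.join()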

The solution for this was a hack. You can see the code in helpers.run_after_yielding_elements(). It receives an iterator, yields all of its elements, but before raising StopIteration, it runs a callback. Its code is very short:

def run_after_yielding_elements(resource_iterator, callback):
    while True:
        try:
            yield next(resource_iterator)
        except StopIteration:
            # The wrapped iterator is exhausted: run the callback, then end the
            # generator. Returning (instead of re-raising StopIteration) keeps
            # this valid under PEP 479 (Python 3.7+).
            callback()
            return

This allows us to block inside spew() after it has received all resources (so we guarantee that the datapackage's data has passed through our processor).
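
For illustration, this is roughly how the first processor above could be wired up with the helper. The import path and run_and_save_to() are stand-ins, not the actual module layout:

from datapackage_pipelines.wrapper import ingest, spew

from helpers import run_after_yielding_elements  # wherever the helper lives

def run_and_save_to(path):
    pass  # stand-in for the real work from the first snippet above

parameters, datapackage, resource_iterator = ingest()

# The callback fires only after every resource has been yielded through spew(),
# i.e. once the whole datapackage has passed through this processor.
spew(datapackage,
     run_after_yielding_elements(resource_iterator,
                                 lambda: run_and_save_to('datapackage.zip')))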

This is a hack, and ideally we wouldn't have to write it. However, this is the simplest solution I found without having to modify datapackage-pipelines itself.

Fixes openspending/openspending#1222