frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
https://frictionlessdata.io/
MIT License

DPP_PROCESSOR_PATH does not work with flow #183

Open cschloer opened 4 years ago

cschloer commented 4 years ago

Overview

Python 3.7.5, Ubuntu

If you try to use the flow key instead of run, custom flows saved under a path listed in DPP_PROCESSOR_PATH are not loaded properly. Simply running dpp correctly shows the pipeline as valid, but when you actually run it you get:

 ModuleNotFoundError: No module named 'bcodmo_pipeline_processors'
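A minimal sketch of why this error appears (hypothetical temp paths, not the framework's actual loading code): Python's import system only searches sys.path, so a package that lives under a directory listed in DPP_PROCESSOR_PATH, but not on sys.path, cannot be imported with a plain import statement.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Stand-in for the directory pointed to by DPP_PROCESSOR_PATH
processor_dir = Path(tempfile.mkdtemp())
pkg = processor_dir / "bcodmo_pipeline_processors"
pkg.mkdir()
(pkg / "load.py").write_text("def flow(*args): pass\n")

# The directory is not on sys.path, so the import fails
try:
    importlib.import_module("bcodmo_pipeline_processors.load")
except ModuleNotFoundError as e:
    print(e)  # No module named 'bcodmo_pipeline_processors'

# Once the directory is on sys.path, the same import succeeds
sys.path.insert(0, str(processor_dir))
importlib.invalidate_caches()
mod = importlib.import_module("bcodmo_pipeline_processors.load")
print(callable(mod.flow))
```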

To recreate:

DPP_PROCESSOR_PATH=~/path/to/processors/

pipeline-spec.yaml:

test:
  title: test
  description: this is a test 
  pipeline:
    - flow: bcodmo_pipeline_processors.load
      parameters:
        format: csv
        from: /path/to/load/from
        name: res
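For reference, the recreate steps as shell commands (the paths are the placeholders used in this report, and ./test assumes the spec above sits in the current directory):

```shell
export DPP_PROCESSOR_PATH=~/path/to/processors/
dpp             # pipeline is listed as valid
dpp run ./test  # fails with ModuleNotFoundError
```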

where a load flow is stored under:

/path/to/processors/bcodmo_pipeline_processors/load.py

and the load flow looks like:

from dataflows import Flow, load
from datapackage_pipelines.utilities.resources import PROP_STREAMING, PROP_STREAMED_FROM

def flow(parameters, datapackage, resources, stats):

    def count_resources():
        def func(package):
            global num_resources
            num_resources = len(package.pkg.resources)
            yield package.pkg
            yield from package
        return func

    def mark_streaming(_from):
        def func(package):
            for i in range(num_resources, len(package.pkg.resources)):
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMING, True)
                package.pkg.descriptor['resources'][i].setdefault(PROP_STREAMED_FROM, _from)
            yield package.pkg
            yield from package
        return func

    _from = parameters.pop('from')
    return Flow(
        count_resources(),
        load(_from, parameters),
        mark_streaming(_from),
    )
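A possible workaround (an assumption on my part, not something the framework documents): since the flow module ends up being imported as an ordinary Python module, exposing the processor directory on PYTHONPATH in addition to DPP_PROCESSOR_PATH may let the import succeed:

```shell
# Hypothetical workaround: put the processor directory on Python's own
# module search path, not just on dpp's processor resolver path.
export DPP_PROCESSOR_PATH=~/path/to/processors/
export PYTHONPATH=~/path/to/processors/:$PYTHONPATH
dpp run ./test
```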
