frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
https://frictionlessdata.io/
MIT License
117 stars, 32 forks

Custom processors with new low level API do not work #163

Closed (cschloer closed this issue 4 years ago)

cschloer commented 5 years ago

Hi again :) Let me know if there is another way you would prefer to receive these issues.

Running Ubuntu 18.04, Python 3.6.5.

I am unable to run custom processors with the new low-level API (with ingest() as ctx). I've narrowed it down to a pretty basic case:

pipeline-spec.yaml:

test:
  description: test
  title: test
  pipeline:
  - run: load
    parameters:
      from: SOME_KIND_OF_DATASET_URL
      name: default
  - run: bugtest.test

test.py (located in $CUSTOM_PROCESSOR_PATH/bugtest/; this is copied directly from the docs):

from datapackage_pipelines.wrapper import ingest, spew

if __name__ == '__main__':
  with ingest() as ctx:

    # Initialisation code, if needed

    # Do stuff with datapackage
    # ...

    stats = {}

    # and resources:
    def new_resource_iterator(resource_iterator_):
        def resource_processor(resource_):
            # resource_.spec is the resource descriptor
            for row in resource_:
                # Do something with row
                # Perhaps collect some stats here as well
                yield row
        for resource in resource_iterator_:
            yield resource_processor(resource)

    spew(ctx.datapackage,
         new_resource_iterator(ctx.resource_iterator),
         ctx.stats)

I've also set DPP_PROCESSOR_PATH to the aforementioned $CUSTOM_PROCESSOR_PATH.

When I run the pipeline, which really shouldn't do anything special (it just loads the data and then runs an empty processor), it fails with this error:

ERROR log from processor bugtest.test:
+--------
| ERROR   :Expected to see 1 resource(s) but spewed 0
| ERROR   :Traceback (most recent call last):
| ERROR   :File "/home/conrad/Projects/whoi/pipeline-generator/bcodmo_pipeline/processors/bugtest/test.py", line 26, in <module>
| ERROR   :ctx.stats)
| ERROR   :File "/home/conrad/.envs/pipeline-server/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 181, in __exit__
| ERROR   :spew(self.datapackage, self.resource_iterator, stats=self.stats)
| ERROR   :File "/home/conrad/.envs/pipeline-server/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 95, in spew
| ERROR   :assert num_resources == expected_resources
| ERROR   :AssertionError

To be clear, this also happens with a more complicated custom processor that actually manipulates the data.
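
One possible reading of the traceback, offered as an assumption and not confirmed by the maintainers in this thread: the `__exit__` frame shows that the `with ingest() as ctx` block itself calls spew when it ends, so the explicit spew() call on the last lines of test.py would already have drained ctx.resource_iterator, leaving the second call with zero resources. The sketch below reproduces that pattern with stand-ins only. `fake_ingest`, `fake_spew`, and `passthrough` are hypothetical names, not the library's API, and the reassignment variant assumes the real context object allows replacing `resource_iterator`.

```python
from contextlib import contextmanager
from types import SimpleNamespace


def fake_spew(datapackage, resource_iterator, stats=None):
    """Hypothetical stand-in for spew(): drain every resource, then count."""
    expected = len(datapackage["resources"])
    seen = 0
    for resource in resource_iterator:
        for _row in resource:
            pass  # a real implementation would emit the row here
        seen += 1
    # Mirrors the assertion visible in the traceback (wrapper.py, in spew).
    assert seen == expected, (
        f"Expected to see {expected} resource(s) but spewed {seen}"
    )


@contextmanager
def fake_ingest():
    """Hypothetical stand-in for ingest(): spews when the block exits."""
    ctx = SimpleNamespace(
        datapackage={"resources": [{"name": "default"}]},
        resource_iterator=iter([iter([{"a": 1}, {"a": 2}])]),
        stats={},
    )
    yield ctx
    # Mirrors the __exit__ frame in the traceback.
    fake_spew(ctx.datapackage, ctx.resource_iterator, stats=ctx.stats)


def passthrough(resource_iterator):
    """An 'empty' processor: yields every row of every resource unchanged."""
    for resource in resource_iterator:
        yield (row for row in resource)


def run_with_explicit_spew():
    # Same shape as test.py: spew inside the block, then spew again on exit.
    with fake_ingest() as ctx:
        fake_spew(ctx.datapackage, passthrough(ctx.resource_iterator), ctx.stats)


def run_with_reassignment():
    # Assumed alternative: hand the new iterator to ctx and let exit spew once.
    with fake_ingest() as ctx:
        ctx.resource_iterator = passthrough(ctx.resource_iterator)


if __name__ == "__main__":
    try:
        run_with_explicit_spew()
    except AssertionError as exc:
        print("explicit spew failed:", exc)
    run_with_reassignment()
    print("reassignment succeeded")
```

If this reading is right, the iterator is consumed once inside the block and is empty by the time the block exits, which would match the "Expected to see 1 resource(s) but spewed 0" message. Whether the real wrapper behaves like these stand-ins still needs confirmation from the maintainers.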

roll commented 5 years ago

@akariv @OriHoch Could you please take a look?

roll commented 4 years ago

Hi @cschloer, is this one still relevant?