datahq / dataflows

DataFlows is a simple, intuitive, lightweight framework for building data processing flows in Python.
https://dataflows.org
MIT License

Can't name/rename resources #19

Closed zelima closed 5 years ago

zelima commented 5 years ago

By default, resources are named res_1, res_2, and so on, and the resource paths follow the same pattern: res_1.csv, res_2.csv, ...

As a dataflows user, I want to name or rename resources as I choose, so that I can reuse them and find them by name, or simply give them nicer names.

Acceptance Criteria

Analysis

I tried to create a custom processor that changes the name of the resource, but it does not really work.

Option 1: modify the resource object's descriptor:

from dataflows import Flow, dump_to_path

# Custom processor: rename the first resource through its own descriptor
def name_resource(package):
    package.pkg.resources[0].descriptor['name'] = 'countries'
    package.pkg.resources[0].descriptor['path'] = 'countries.csv'
    package.pkg.resources[0].commit()
    yield package.pkg
    yield from package

f = Flow(
    [{'hello': 'world'}],
    name_resource,
    dump_to_path('data'),
)

This kind of works: the output file is named countries.csv, but nothing changes inside datapackage.json:

$ cat data/nato_countries_official/datapackage.json 
{
  "name": "m-package",
  "resources": [
    {
      "name": "res_1",
      "path": "res_1.csv",
      "profile": "tabular-data-resource",
      "schema": {
        "fields": [
          {
            "format": "default",
            "name": "country_name",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Option 2: modify the pkg object's descriptor:

from dataflows import Flow, dump_to_path

# Custom processor: rename the first resource through the package descriptor
def name_resource(package):
    package.pkg.descriptor['resources'][0]['name'] = 'countries'
    package.pkg.descriptor['resources'][0]['path'] = 'countries.csv'
    package.pkg.commit()
    yield package.pkg
    yield from package

f = Flow(
    [{'hello': 'world'}],
    name_resource,
    dump_to_path('data'),
)

This results in an error, as if the resource no longer existed:

Traceback (most recent call last):
  File "flows/run_all.py", line 4, in <module>
    nato_countries_official.process()
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/flow.py", line 15, in process
    return self._chain().process()
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 66, in process
    for res in ds.res_iter:
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 57, in <genexpr>
    res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it)
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/processors/dumpers/dumper_base.py", line 80, in process_resources
    for resource in resources:
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 54, in <genexpr>
    res_iter = (ResourceWrapper(get_res(rw.res.name), rw.it)
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 57, in <genexpr>
    res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it)
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 31, in process_resources
    for res in resources:
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 55, in <genexpr>
    for rw in res_iter)
  File "/home/.virtualenvs/fedex/lib/python3.6/site-packages/dataflows/base/datastream_processor.py", line 51, in get_res
    assert ret is not None
AssertionError
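
Reading the traceback, the assertion fires in get_res, which is called with rw.res.name. One plausible explanation, and it is only an assumption, is that the rows still arrive labelled with the old name res_1 while the modified descriptor lists only countries, so the lookup by name finds nothing. The sketch below models that situation with plain dicts; lookup_resource and the sample data are hypothetical stand-ins, not dataflows internals.

```python
# Simplified model of a name-based resource lookup (NOT dataflows code).

def lookup_resource(descriptor, name):
    """Return the resource entry with the given name, or None if absent."""
    for resource in descriptor["resources"]:
        if resource["name"] == name:
            return resource
    return None


# Descriptor after the custom processor renamed the resource.
descriptor = {"resources": [{"name": "countries", "path": "countries.csv"}]}

# Assumption: the row stream is still labelled with the original name.
stream_name = "res_1"

missing = lookup_resource(descriptor, stream_name)  # None: old name is gone
found = lookup_resource(descriptor, "countries")    # the renamed entry
```

If this reading is right, renaming only in the descriptor leaves the stream and the descriptor out of sync, which would match the failed `assert ret is not None`.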

OriHoch commented 5 years ago

This works on the latest version:

from dataflows import Flow, update_resource, dump_to_path

f = Flow(
      [{'hello': 'world'}],
      update_resource('res_1', name='countries', path='countries.csv'),
      dump_to_path('data'),
)
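
To confirm that a rename reached the descriptor and not only the file name, the datapackage.json written by dump_to_path can be inspected. This is a minimal sketch using only the standard library; the helper name list_resources is hypothetical, and the path data/datapackage.json assumes a dump_to_path('data') call like the one in this thread.

```python
import json
from pathlib import Path


def list_resources(descriptor_path):
    """Return (name, path) pairs for every resource in a datapackage.json."""
    descriptor = json.loads(Path(descriptor_path).read_text(encoding="utf-8"))
    return [(res.get("name"), res.get("path"))
            for res in descriptor.get("resources", [])]


if __name__ == "__main__":
    target = Path("data/datapackage.json")  # assumed output location
    if target.exists():
        for name, path in list_resources(target):
            print(name, path)
```

If the rename worked as described, the single pair should read countries and countries.csv rather than res_1 and res_1.csv.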