frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
https://frictionlessdata.io/
MIT License
119 stars, 32 forks

How to change field names? #164

Closed: SPTKL closed this issue 5 years ago

SPTKL commented 5 years ago

Hi there, how do I change field names in the pipeline? I know I can copy a column under a new name and then delete the old one, but is there a rename function somewhere? Thanks!
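
One side effect of the copy-then-delete workaround is that it can change the field order. The sketch below is library-independent: it models a resource schema as a plain list of field descriptors and assumes, hypothetically, that the copy step appends the new field at the end of the schema. The helpers `copy_then_delete` and `rename_in_place` are made-up names for illustration, not datapackage-pipelines processors.

```python
from typing import Dict, List

Field = Dict[str, str]


def copy_then_delete(fields: List[Field], old: str, new: str) -> List[Field]:
    """Hypothetical model of 'copy the column, then delete the original'.

    Assumption: the copied field is appended at the end of the schema.
    """
    original = next(f for f in fields if f["name"] == old)
    copied = fields + [{**original, "name": new}]    # copy step (appended at the end)
    return [f for f in copied if f["name"] != old]  # delete step


def rename_in_place(fields: List[Field], old: str, new: str) -> List[Field]:
    """Rename a field while keeping its position and other properties."""
    return [{**f, "name": new} if f["name"] == old else f for f in fields]


schema = [
    {"name": "id", "type": "integer"},
    {"name": "nme", "type": "string"},
    {"name": "year", "type": "integer"},
]

print([f["name"] for f in copy_then_delete(schema, "nme", "name")])
# ['id', 'year', 'name']
print([f["name"] for f in rename_in_place(schema, "nme", "name")])
# ['id', 'name', 'year']
```

Neither helper mutates its input, so the same `schema` can be passed to both.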


cschloer commented 5 years ago

Hey, the developers can correct me if I'm wrong, but the only way I've found to rename fields with the standard processor library is to combine add_computed_field and delete_fields. However, if you want to add a custom processor, here's one that I built recently, though unfortunately it uses the older ingest/spew wrapper API.

from datapackage_pipelines.wrapper import ingest, spew
from dataflows.helpers.resource_matcher import ResourceMatcher
import logging

logging.basicConfig(
    level=logging.WARNING,
)
logger = logging.getLogger(__name__)

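# Read this step's parameters, the incoming datapackage descriptor and the row iterator
parameters, datapackage, resource_iterator = ingest()

# 'resources' selects which resources to modify; 'fields' is a list of
# {'old_field': ..., 'new_field': ...} pairs
resources = ResourceMatcher(parameters.get('resources'), datapackage)
fields = parameters.get('fields', [])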

def modify_datapackage(datapackage_):
    # Rename the fields in the schema of every matching resource
    for resource_ in datapackage_.get('resources', []):
        if resources.match(resource_['name']):
            for field in fields:
                old_field_name = field['old_field']
                new_field_name = field['new_field']
                for datapackage_field in resource_['schema']['fields']:
                    if datapackage_field['name'] == old_field_name:
                        datapackage_field['name'] = new_field_name
    return datapackage_

def process_resource(rows):
    # Move each value from the old key to the new key, row by row
    for row in rows:
        for field in fields:
            old_field_name = field['old_field']
            new_field_name = field['new_field']
            if new_field_name in row:
                raise ValueError(f'New field name {new_field_name} already exists in row {list(row)}')
            row[new_field_name] = row.pop(old_field_name)
        yield row

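def process_resources(resource_iterator_):
    # Pass non-matching resources through untouched; rename fields in the rest
    for resource in resource_iterator_:
        if not resources.match(resource.spec['name']):
            yield resource
        else:
            yield process_resource(resource)

# Emit the modified descriptor and rows to the next step of the pipeline
spew(modify_datapackage(datapackage), process_resources(resource_iterator))
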
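
The processor above only runs inside a pipeline, because `ingest()` is called at module level. The sketch below restates its renaming logic as plain functions that take the same `old_field`/`new_field` list as an argument, so the logic can be checked on ordinary dicts. It is an assumed refactoring, not part of datapackage-pipelines; the names `rename_schema_fields` and `rename_row_fields` are hypothetical. Unlike the original, the row helper rebuilds each dict so that a renamed key keeps its original position instead of moving to the end.

```python
from typing import Any, Dict, Iterable, Iterator, List

# Same parameter shape the processor reads: {'old_field': ..., 'new_field': ...}
Rename = Dict[str, str]


def _build_mapping(renames: List[Rename]) -> Dict[str, str]:
    """Turn the list of rename pairs into an old-name -> new-name dict."""
    mapping = {r["old_field"]: r["new_field"] for r in renames}
    if len(set(mapping.values())) != len(mapping):
        raise ValueError("Two fields cannot be renamed to the same name")
    return mapping


def rename_schema_fields(resource: Dict[str, Any], renames: List[Rename]) -> None:
    """Rename fields in a resource descriptor's schema, in place."""
    mapping = _build_mapping(renames)
    for field in resource["schema"]["fields"]:
        if field["name"] in mapping:
            field["name"] = mapping[field["name"]]


def rename_row_fields(rows: Iterable[Dict[str, Any]],
                      renames: List[Rename]) -> Iterator[Dict[str, Any]]:
    """Yield copies of the rows with keys renamed, keeping the key order."""
    mapping = _build_mapping(renames)
    for row in rows:
        for old, new in mapping.items():
            if old not in row:
                raise KeyError(f"Field {old!r} not found in row {list(row)}")
            # A clash only matters if the existing key is not itself renamed away
            if new in row and new not in mapping:
                raise ValueError(f"New field name {new!r} already exists in row {list(row)}")
        # Rebuild the dict so each renamed key stays where the old key was
        yield {mapping.get(key, key): value for key, value in row.items()}


resource = {
    "name": "people",
    "schema": {"fields": [{"name": "nme", "type": "string"},
                          {"name": "yr", "type": "integer"}]},
}
renames = [{"old_field": "nme", "new_field": "name"}]

rename_schema_fields(resource, renames)
print([f["name"] for f in resource["schema"]["fields"]])  # ['name', 'yr']
print(list(rename_row_fields([{"nme": "Ada", "yr": 1815}], renames)))
# [{'name': 'Ada', 'yr': 1815}]
```

Inside the processor, `modify_datapackage` could call `rename_schema_fields` for each matching resource and `process_resources` could yield `rename_row_fields(resource, fields)`; that wiring is an untested assumption.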
SPTKL commented 5 years ago

Thank you!