frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
https://frictionlessdata.io/
MIT License
119 stars 32 forks source link

Boilerplate functionality for processor modules #15

Open cyberbikepunk opened 7 years ago

cyberbikepunk commented 7 years ago

Overview

ingest and spew are awesome little functions, but I found there was still a lot of boilerplate to do for each processor. I suggest writing some additional wrapper code to deal with with that. I base my suggestions on use-cases I've encountered so far.

Assumptions

I assume that most of the time, you either want to process data row by row and/or mutate the datapackage. Let's talk about row processing first.

Objectives

For row by row processing, the wrapper code would fulfill 2 purposes:

  1. Provide boilerplate functionality

    • Log the processor parameters
    • Force the iteration over a given sample size (useful for debugging)
    • Log a sample output of the processor
    • Handle a chosen set of exceptions
    • Collect stats and process stats
  2. Pass context to the row processor

    • Manage parameters defaults and overrides
    • Pass the parameters to the row processor
    • Pass row and resource indexes to the processor

API

My first attempt at writing code for that resulted in the utility.process function. There's no stats functionality at this stage. The API looks like:

parameters, _, resources = ingest()
new_resources = process(resources, processor_function, parameter_key=value)
spew(resources, datapackage)

My second attempt (see code in progress) is a Processor class with an API along the lines of:

parameters, datapackage, resources = ingest()
processor = Processor(function=processor_function,
                      exceptions=(ValueError),
                      enumerate_rows=True,
                      sample_size=100,
                      datapackage=datapackage,
                      parameter_key1=default_value1,
                      parameter_key2=default_value2)
new_resources = processor.process(resources)
spew(resources, datapackage)

What I would really like to achieve is:

@row_processor
def my_awesome_processor(row):
    # do stuff
    return row, stats

And similarly add datapackage mutation like:

@datapackage_mutator
def my_awesome_mutator(datapackage):
    # do stuff
    return datapackage

@akariv care to comment?

cyberbikepunk commented 7 years ago

Also very useful would be to check that the data is in sync with the schema. When I started writing processors, I thought that I could get away with it, but it causes problems later on. The order of the processors becomes problematic, for example.

cyberbikepunk commented 7 years ago

After a little thinking time, I think that we should go with the explicit solution (import this after all). The decorator solution is a little harder to implement and a little too magic. It smells like the pytest API, which is full of gotchas.