
Datapackage Pipelines


The Basics

What is it?

datapackage-pipelines is a framework for declarative stream-processing of tabular data. It is built upon the concepts and tooling of the Frictionless Data project.

Pipelines

The basic concept in this framework is the pipeline.

A pipeline has a list of processing steps, and it generates a single data package as its output. Each step is executed in a processor and may modify the data package's metadata, process the rows of its resources, and report stats back to the user.

Not every processor needs to do all of these. In fact, you will often find each processing step doing only one of them.

pipeline-spec.yaml file

Pipelines are defined in a declarative way, and not in code. One or more pipelines can be defined in a pipeline-spec.yaml file. This file specifies the list of processors (referenced by name) and the execution parameters for each of the processors.

Here's an example of a pipeline-spec.yaml file:

worldbank-co2-emissions:
  title: CO2 emission data from the World Bank
  description: Data per year, provided in metric tons per capita.
  environment:
    DEBUG: true
  pipeline:
    -
      run: update_package
      parameters:
        name: 'co2-emissions'
        title: 'CO2 emissions (metric tons per capita)'
        homepage: 'http://worldbank.org/'
    -
      run: load
      parameters:
        from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"
        name: 'global-data'
        format: xls
        headers: 4
    -
      run: set_types
      parameters:
         resources: global-data
         types:
           "[12][0-9]{3}":
              type: number
    -
      run: dump_to_zip
      parameters:
          out-file: co2-emissions-wb.zip

In this example we see one pipeline called worldbank-co2-emissions. Its pipeline consists of 4 steps:

- update_package: sets the data package's name, title and homepage.
- load: loads the World Bank Excel file into a resource named global-data, reading the headers from row 4.
- set_types: casts the year columns of global-data (those matching the "[12][0-9]{3}" pattern) to numbers.
- dump_to_zip: saves the resulting data package to co2-emissions-wb.zip.

Also, we have provided some metadata: a title, a description and an environment section that sets the DEBUG environment variable for the pipeline's processors.

The full JSON Schema of the pipeline-spec.yaml file can be found here.

Mechanics

An important aspect of how the pipelines are run is that data is passed in streams from one processor to the next. Technically, each processor runs in its own dedicated process, reading the data package from its stdin and writing its output to its stdout. The important thing to note here is that no processor holds the entire data set at any point.

This limitation is by design - it keeps the memory and disk requirements of each processor bounded and independent of the dataset size.

Quick Start

First off, create a pipeline-spec.yaml file in your current directory. You can take the above file if you just want to try it out.

Then, you can either install datapackage-pipelines locally - note that Python 3.6 or higher is required due to its use of type hinting and advanced asyncio features:

$ pip install datapackage-pipelines

You should now be able to use the dpp command:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}

Alternatively, you could use our Docker image:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>

The Command Line Interface - dpp

Running a pipeline from the command line is done using the dpp tool.

Running dpp without any argument will show the list of available pipelines. This is done by scanning the current directory and its subdirectories, searching for pipeline-spec.yaml files and extracting the list of pipeline specifications described within.

Each pipeline has an identifier, composed of the path to the pipeline-spec.yaml file and the name of the pipeline, as defined within that description file.

In order to run a pipeline, you use dpp run <pipeline-id>.

You can also use dpp run all to run all pipelines, and dpp run dirty to run just the dirty pipelines (more on that later on).

Deeper look into pipelines

Processor Resolution

As previously seen, processors are referenced by name.

This name is, in fact, the name of a Python script containing the processing code (minus the .py extension). To find the actual code that needs to be executed, the processor resolver searches these predefined locations:

Excluding directories from scanning for pipeline specs

By default, .* directories are excluded from scanning. You can add additional directory patterns for exclusion by creating a .dpp_spec_ignore file at the project root. This file has a syntax similar to .gitignore and excludes directories from scanning based on glob pattern matching.

For example, the following file will ignore test* directories (including those inside subdirectories), while the /docs directory will only be ignored at the project root:

test*
/docs

Caching

By setting the cached property on a specific pipeline step to True, this step's output will be stored on disk (in the .cache directory, in the same location as the pipeline-spec.yaml file).

Rerunning the pipeline will make use of that cache, thus avoiding the execution of the cached step and its precursors.

Internally, a hash is calculated for each step in the pipeline, based on the processor's code, its parameters and the hash of its predecessor. If a cache file exists with exactly the same hash as a specific step, then we can remove that step (and its predecessors) and use the cache file as an input to the pipeline.

This way, the cache becomes invalid in case the code or execution parameters changed (either for the cached processor or in any of the preceding processors).
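
For illustration, here is a trimmed version of the quick-start pipeline with the load step cached; the cached property is the only addition:

worldbank-co2-emissions:
  pipeline:
    -
      run: update_package
      parameters:
        name: 'co2-emissions'
    -
      run: load
      cached: true        # this step's output is stored in .cache and reused on the next run
      parameters:
        from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"
        name: 'global-data'
        format: xls
        headers: 4
    -
      run: dump_to_zip
      parameters:
          out-file: co2-emissions-wb.zip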

Dirty tasks and keeping state

The cache hash is also used to determine whether a pipeline is "dirty". When a pipeline completes executing successfully, dpp stores the cache hash along with the pipeline id. If the stored hash is different from the currently calculated hash, it means that either the code or the execution parameters were modified, and the pipeline needs to be re-run.

dpp works with two storage backends. For running locally, it uses a SQLite database to store the current state of each running task, including the last result and cache hash. The state DB is stored in a file named .dpp.db in the directory from which dpp is being run.

For other installations, especially ones using the task scheduler, it is recommended to work with the Redis backend. In order to enable the Redis connection, simply set the DPP_REDIS_HOST environment variable to point to a running Redis instance.

Pipeline Dependencies

You can declare that a pipeline is dependent on another pipeline or datapackage. This dependency is taken into account when calculating the cache hashes of a pipeline, which in turn affects the validity of cache files and the "dirty" state.

If a dependency is missing, the pipeline is marked as 'unable to be executed'.

Declaring dependencies is done by adding a dependencies property to a pipeline definition in the pipeline-spec.yaml file. This property should contain a list of dependencies, each one an object in one of the following formats: an object with a pipeline property holding the identifier of the pipeline being depended on, or an object with a datapackage property holding the identifier (or URL) of the datapackage being depended on.

Example:

cat-vs-dog-populations:
  dependencies:
    -
      pipeline: ./geo/region-areal
    -
      datapackage: http://pets.net/data/dogs-per-region/datapackage.json
    -
      datapackage: http://pets.net/data/dogs-per-region
  ...

Validating

Each processor's input is automatically validated for correctness.

Dataflows integration

Dataflows is the successor of datapackage-pipelines and provides a more Pythonic interface to running pipelines. You can integrate dataflows within pipeline specs using the flow attribute instead of run. For example, given the following flow file, saved under my-flow.py:

from dataflows import Flow, dump_to_path, load, update_package

def flow(parameters, datapackage, resources, stats):
    stats['multiplied_fields'] = 0

    def multiply(field, n):
        def step(row):
            row[field] = row[field] * n
            stats['multiplied_fields'] += 1
        return step

    return Flow(update_package(name='my-datapackage'),
                multiply('my-field', 2))

And a pipeline-spec.yaml in the same directory:

my-flow:
  pipeline:
  - run: load_resource
    parameters:
      url: http://example.com/my-datapackage/datapackage.json
      resource: my-resource
  - flow: my-flow
  - run: dump_to_path

You can run the pipeline using dpp run my-flow.

The Standard Processor Library

A few built-in processors are provided with the library.

update_package

Adds meta-data to the data-package.

Parameters:

Any allowed property (according to the spec) can be provided here.

Example:

- run: update_package
  parameters:
    name: routes-to-mordor
    license: CC-BY-SA-4
    author: Frodo Baggins <frodo@shire.me>
    contributors:
      - samwise gamgee <samwise1992@yahoo.com>

update_resource

Adds meta-data to the resource.

Parameters:

Example:

- run: update_resource
  parameters:
    resources: ['resource1']
    metadata:
      path: 'new-path.csv'

load

Loads data into the package, infers the schema and optionally casts values.

Parameters:
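
Example (a minimal sketch, using the same parameters as the quick-start pipeline above):

- run: load
  parameters:
    from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"
    name: 'global-data'
    format: xls
    headers: 4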

printer

Just prints whatever it sees. Good for debugging.

Parameters:
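
Example (a minimal sketch; here printer is used without parameters, simply dropped after a step to inspect the rows flowing through - the URL is illustrative):

- run: load
  parameters:
    from: "http://example.com/my-csv-file.csv"
    name: 'my-data'
- run: printer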

set_types

Sets data types and type options for fields in streamed resources, and makes sure that the data still validates with the new types.

This allows modifying the existing table schema, usually the default schema generated by stream_remote_resources.

Parameters:

Example:

- run: add_resource
  parameters:
    name: example-resource
    url: http://example.com/my-csv-file.csv
    encoding: "iso-8859-2"
- run: stream_remote_resources
- run: set_types
  parameters:
    resources: example-resource
    types:
      age:
        type: integer
      "yearly_score_[0-9]{4}":
        type: number
      "date of birth":
        type: date
        format: "%d/%m/%Y"
      "social security number": null

load_metadata

Loads metadata from an existing data-package.

Parameters:

Loads the metadata from the data package located at url.

All properties of the loaded datapackage will be copied (except the resources).

Example:

- run: load_metadata
  parameters:
    url: http://example.com/my-datapackage/datapackage.json

load_resource

Loads a tabular resource from an existing data-package.

Parameters:

Loads the resource specified in the resource parameter from the data package located at url. All properties of the loaded resource will be copied - path and schema included.

Example:

- run: load_resource
  parameters:
    url: http://example.com/my-datapackage/datapackage.json
    resource: my-resource
- run: load_resource
  parameters:
    url: http://example.com/my-other-datapackage/datapackage.json
    resource: 1
- run: load_resource
  parameters:
    url: http://example.com/my-datapackage/datapackage.json
    resources:
      my-resource:
        name: my-renamed-resource
        path: my-renamed-resource.csv

concatenate

Concatenates a number of streamed resources and converts them to a single resource.

Parameters:

Example:

- run: concatenate
  parameters:
    target:
      name: multi-year-report
      path: data/multi-year-report.csv
    sources: 'report-year-20[0-9]{2}'
    fields:
      activity: []
      amount: ['2009_amount', 'Amount', 'AMOUNT [USD]', '$$$']

In this example we concatenate all resources that look like report-year-<year>, and output them to the multi-year-report resource.

The output contains two fields: activity, which appears under the same name in all source resources, and amount, whose values are collected from whichever of the listed source fields ('2009_amount', 'Amount', 'AMOUNT [USD]' or '$$$') is present in each source.

join

Joins two streamed resources.

"Joining" in our case means taking the target resource, and adding fields to each of its rows by looking up data in the source resource.

A special case for the join operation is when there is no target stream, and all unique rows from the source are used to create it. This mode is called deduplication mode - the target resource will be created, and deduplicated rows from the source will be added to it.

Parameters:

Important: the "source" resource must appear before the "target" resource in the data-package.

Examples:

- run: join
  parameters:
    source:
      name: world_population
      key: ["country_code"]
      delete: yes
    target:
      name: country_gdp_2015
      key: ["CC"]
    fields:
      population:
        name: "census_2015"
    full: true

The above example aims to create a package containing the GDP and Population of each country in the world.

We have one resource (world_population) with data that looks like:

| country_code | country_name   | census_2000 | census_2015 |
|--------------|----------------|-------------|-------------|
| UK           | United Kingdom | 58857004    | 64715810    |
| ...          |                |             |             |

And another resource (country_gdp_2015) with data that looks like:

| CC  | GDP (£m) | Net Debt (£m) |
|-----|----------|---------------|
| UK  | 1832318  | 1606600       |
| ... |          |               |

The join command will match rows in both datasets based on the country_code / CC fields, and then copy the value of the census_2015 field into a new population field.

The resulting data package will have the world_population resource removed and the country_gdp_2015 resource looking like:

| CC  | GDP (£m) | Net Debt (£m) | population |
|-----|----------|---------------|------------|
| UK  | 1832318  | 1606600       | 64715810   |
| ... |          |               |            |

A more complex example:

- run: join
  parameters:
    source:
      name: screen_actor_salaries
      key: "{production} ({year})"
    target:
      name: mgm_movies
      key: "{title}"
    fields:
      num_actors:
        aggregate: 'count'
      average_salary:
        name: salary
        aggregate: 'avg'
      total_salaries:
        name: salary
        aggregate: 'sum'
    full: false

This example aims to analyse salaries for screen actors in the MGM studios.

Once more, we have one resource (screen_actor_salaries) with data that looks like:

| year | production                  | actor             | salary   |
|------|-----------------------------|-------------------|----------|
| 2016 | Vertigo 2                   | Mr. T             | 15000000 |
| 2016 | Vertigo 2                   | Robert Downey Jr. | 7000000  |
| 2015 | The Fall - Resurrection     | Jeniffer Lawrence | 18000000 |
| 2015 | Alf - The Return to Melmack | The Rock          | 12000000 |
| ...  |                             |                   |          |

And another resource (mgm_movies) with data that looks like:

| title                     | director      | producer     |
|---------------------------|---------------|--------------|
| Vertigo 2 (2016)          | Lindsay Lohan | Lee Ka Shing |
| iRobot - The Movie (2018) | Mr. T         | Mr. T        |
| ...                       |               |              |

The join command will match rows in both datasets based on the movie name and production year. Notice how we overcome incompatible fields by using different key patterns.

The resulting dataset could look like:

| title            | director      | producer     | num_actors | average_salary | total_salaries |
|------------------|---------------|--------------|------------|----------------|----------------|
| Vertigo 2 (2016) | Lindsay Lohan | Lee Ka Shing | 2          | 11000000       | 22000000       |
| ...              |               |              |            |                |                |

filter

Filter streamed resources.

filter accepts equality and inequality conditions and tests each row in the selected resources. If none of the conditions validate, the row will be discarded.

Parameters:

Both in and out should be a list of objects. However, out should only ever have one element.

Examples:

Filtering just American and European countries, leaving out countries whose main language is English:

- run: filter
  parameters:
    resources: world_population
    in:
      - continent: america
      - continent: europe
- run: filter
  parameters:
    resources: world_population
    out:
      - language: english

To filter out by multiple values, you need multiple filter processors, not multiple out elements. Otherwise some condition will always validate, and no rows will be discarded:

- run: filter
  parameters:
    resources: world_population
    out:
      - language: english
- run: filter
  parameters:
    resources: world_population
    out:
      - language: swedish

sort

Sort streamed resources by key.

sort accepts a list of resources and a key (as a Python format string on row fields). It will output the rows for each resource, sorted according to the key (in ascending order by default).

Parameters:

Examples:

Sorting the world_population resource by country name:

- run: sort
  parameters:
    resources: world_population
    sort-by: "{country_name}"

deduplicate

Deduplicates rows in resources based on the resources' primary key.

deduplicate accepts a resource specifier - for each resource, it will output only unique rows (based on the values in the primary key fields). Rows with duplicate primary keys will be ignored.

Parameters:

Examples:

Deduplicating rows in the world-population resource.

- run: deduplicate
  parameters:
    resources: world_population

duplicate

Duplicate a resource.

duplicate accepts the name of a single resource in the datapackage. It will then duplicate it in the output datapackage, with a different name and path. The duplicated resource will appear immediately after its original.

Parameters:

Examples:

Duplicating the original-resource resource into copy-of-resource:

- run: duplicate
  parameters:
    source: original-resource
    target-name: copy-of-resource
    target-path: data/duplicate.csv

delete_fields

Deletes fields (columns) from streamed resources.

delete_fields accepts a list of resources and a list of fields to remove.

Note: if multiple resources are provided, all of them should contain all of the fields to delete.

Parameters:

Examples:

Deleting country_name and census_2000 columns from world_population resource:

- run: delete_fields
  parameters:
    resources: world_population
    fields:
      - country_name
      - census_2000

add_computed_field

Adds field(s) to streamed resources.

add_computed_field accepts a list of resources and a list of fields to add to them. It outputs the rows of each resource with the new fields (columns) added. Various operations can be used to compute the value inserted into each target field.

Parameters:

Examples:

The following example adds 4 new fields to the salaries resource:

run: add_computed_field
parameters:
  resources: salaries
  fields:
    -
      operation: sum
      target: total
      source:
        - jan
        - feb
        - may
    -
      operation: avg
      target: average
      source:
        - jan
        - feb
        - may
    -
      operation: format
      target: full_name
      with: '{first_name} {last_name}'
    -
      operation: constant
      target: status
      with: single

We have one resource (salaries) with data that looks like:

| first_name | last_name | jan | feb | mar |
|------------|-----------|-----|-----|-----|
| John       | Doe       | 100 | 200 | 300 |
| ...        |           |     |     |     |

The resulting dataset could look like:

| first_name | last_name | full_name | jan | feb | mar | average | total | status |
|------------|-----------|-----------|-----|-----|-----|---------|-------|--------|
| John       | Doe       | John Doe  | 100 | 200 | 300 | 200     | 600   | single |
| ...        |           |           |     |     |     |         |       |        |

find_replace

Finds and replaces a string or pattern in field values.

Parameters:

Examples:

The following example replaces field values using regular expression and exact string patterns:

run: find_replace
parameters:
  resources: dates
  fields:
    -
      name: year
      patterns:
        -
          find: ([0-9]{4})( \(\w+\))
          replace: \1
    -
      name: quarter
      patterns:
        -
          find: Q1
          replace: '03-31'
        -
          find: Q2
          replace: '06-30'
        -
          find: Q3
          replace: '09-30'
        -
          find: Q4
          replace: '12-31'

We have one resource (dates) with data that looks like:

| year     | quarter |
|----------|---------|
| 2000 (1) | 2000-Q1 |
| ...      |         |

The resulting dataset could look like:

| year | quarter    |
|------|------------|
| 2000 | 2000-03-31 |
| ...  |            |

unpivot

Unpivots (transposes) tabular data so that there is only one record per row.

Parameters:

Examples:

The following example will unpivot data into 3 new fields: year, direction and amount.

parameters:
  resources: balance
  extraKeyFields:
    -
      name: year
      type: integer
    -
      name: direction
      type: string
      constraints:
        enum:
          - In
          - Out
  extraValueField:
      name: amount
      type: number
  unpivot:
    -
      name: 2015 incomes
      keys:
        year: 2015
        direction: In
    -
      name: 2015 expenses
      keys:
        year: 2015
        direction: Out
    -
      name: 2016 incomes
      keys:
        year: 2016
        direction: In
    -
      name: 2016 expenses
      keys:
        year: 2016
        direction: Out

We have one resource (balance) with data that looks like:

| company | 2015 incomes | 2015 expenses | 2016 incomes | 2016 expenses |
|---------|--------------|---------------|--------------|---------------|
| Inc     | 1000         | 900           | 2000         | 1700          |
| Org     | 2000         | 800           | 3000         | 2000          |
| ...     |              |               |              |               |

The resulting dataset could look like:

| company | year | direction | amount |
|---------|------|-----------|--------|
| Inc     | 2015 | In        | 1000   |
| Inc     | 2015 | Out       | 900    |
| Inc     | 2016 | In        | 2000   |
| Inc     | 2016 | Out       | 1700   |
| Org     | 2015 | In        | 2000   |
| Org     | 2015 | Out       | 800    |
| Org     | 2016 | In        | 3000   |
| Org     | 2016 | Out       | 2000   |
| ...     |      |           |        |

A similar result can be accomplished by defining regular expressions instead of constant values:

parameters:
  resources: balance
  extraKeyFields:
    -
      name: year
      type: integer
    -
      name: direction
      type: string
      constraints:
        enum:
          - In
          - Out
  extraValueField:
      name: amount
      type: number
  unpivot:
    -
      name: ([0-9]{4}) (\\w+)  # regex for original column
      keys:
        year: \\1  # First member of group from above
        direction: \\2  # Second member of group from above

dump_to_sql

Saves the datapackage to an SQL database.

Parameters:
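
Example (a minimal, hedged sketch - the engine and tables parameter names and their shape are assumptions based on common usage, not taken from this document):

- run: dump_to_sql
  parameters:
    engine: 'postgresql://user:password@localhost/mydb'   # assumed: SQLAlchemy-style connection string
    tables:
      my_table:                                           # assumed: target table name
        resource-name: my-resource                        # assumed: which resource to write into the table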

dump_to_path

Saves the datapackage to a filesystem path.

Parameters:
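
Example (a minimal sketch - the out-path parameter name is an assumption, mirroring dump_to_zip's out-file below):

- run: dump_to_path
  parameters:
      out-path: co2-emissions-wb    # assumed: directory into which datapackage.json and the data files are written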

dump_to_zip

Saves the datapackage to a zipped archive.

Parameters:
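
Example (taken from the quick-start pipeline above):

- run: dump_to_zip
  parameters:
      out-file: co2-emissions-wb.zip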

Deprecated Processors

These processors will be removed in the next major version.

add_metadata

An alias for update_package, kept for backward compatibility.

add_resource

Adds a new external tabular resource to the data-package.

Parameters:

You should provide name and url attributes, and may add other optional attributes as defined in the spec.

url indicates where the data for this resource resides. Later on, when stream_remote_resources runs, it will use the url (which is stored in the resource in the dpp:streamedFrom property) to read the data rows and push them into the pipeline.

Note that url also supports env://<environment-variable>, which indicates that the resource url should be fetched from the indicated environment variable. This is useful in case you are supplying a string with sensitive information (such as an SQL connection string for streaming from a database table).
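
For instance, a step reading the resource url from an environment variable might look like this (DB_CONNECTION_STRING is a hypothetical variable name, and any additional parameters would depend on the actual source):

- run: add_resource
  parameters:
    name: my-db-table
    url: env://DB_CONNECTION_STRING   # the actual connection string is read from this environment variable at runtime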

Parameters are basically arguments that are passed to a tabulator.Stream instance (see the API). Other than those, you can pass a constants parameter which should be a mapping of headers to string values. When used in conjunction with stream_remote_resources, these constant values will be added to each generated row (as well as to the default schema).

You may also provide a schema here, or use the default schema generated by the stream_remote_resources processor. In case path is specified, it will be used. If not, the stream_remote_resources processor will assign a path for you with a csv extension.

Example:

- run: add_resource
  parameters:
    url: http://example.com/my-excel-file.xlsx
    sheet: 1
    headers: 2
- run: add_resource
  parameters:
    url: http://example.com/my-csv-file.csv
    encoding: "iso-8859-2"

stream_remote_resources

Converts external resources to streamed resources.

External resources are ones that link to a remote data source (url or file path), but are not processed by the pipeline and are kept as-is.

Streamed resources are ones that can be processed by the pipeline, and their output is saved as part of the resulting datapackage.

In case a resource has no schema, a default one is generated automatically here by creating a string field from each column in the data source.

Parameters:

Example:

- run: stream_remote_resources
  parameters:
    resources: ['2014-data', '2015-data']
- run: stream_remote_resources
  parameters:
    resources: '201[67]-data'

This processor also supports loading plain-text resources (e.g. html pages) and handling them as tabular data - split into rows with a single "data" column. To enable this behavior, add the following attribute to the resource: "format": "txt".
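
A sketch of how that might look (the URL is illustrative):

- run: add_resource
  parameters:
    name: some-page
    url: http://example.com/some-page.html   # illustrative URL
    format: txt                              # stream the page as rows with a single "data" column
- run: stream_remote_resources
  parameters:
    resources: some-page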

dump.to_sql

An alias for dump_to_sql, kept for backward compatibility.

dump.to_path

Saves the datapackage to a filesystem path.

Parameters:

dump.to_zip

Saves the datapackage to a zipped archive.

Parameters:

Note

The dump.to_path and dump.to_zip processors will handle non-tabular resources as well. These resources must have both url and path properties, and must not contain a schema property. In such cases, the file will be downloaded from the url and placed in the provided path.

Custom Processors

It's quite reasonable that for any non-trivial processing task, you might encounter a problem that cannot be solved using the standard library processors.

For that you might need to write your own processor - here's how it's done.

There are two APIs for writing processors - the high level API and the low level API.

High Level Processor API

The high-level API is quite useful for most processor kinds:

from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    # Do something with datapackage
    return datapackage

def process_row(row, row_index,
                resource_descriptor, resource_index,
                parameters, stats):
    # Do something with row
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)

The high level API consists of one method, process, which takes two functions:

- modify_datapackage(datapackage, parameters, stats) - receives the data package descriptor, the step's parameters and a mutable stats dict, and returns the (possibly modified) descriptor.
- process_row(row, row_index, resource_descriptor, resource_index, parameters, stats) - receives each row in turn, along with its index, the resource's descriptor and index, the parameters and the stats dict, and returns the (possibly modified) row.

A few examples

# Add license information
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    datapackage['license'] = 'CC-BY-SA'
    return datapackage

process(modify_datapackage=modify_datapackage)
# Add new column with constant value to first resource
# Column name and value are taken from the processor's parameters
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    datapackage['resources'][0]['schema']['fields'].append({
      'name': parameters['column-name'],
      'type': 'string'
    })
    return datapackage

def process_row(row, row_index, resource_descriptor, resource_index, parameters, stats):
    if resource_index == 0:
        row[parameters['column-name']] = parameters['value']
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)
# Row counter
from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    stats['row-count'] = 0
    return datapackage

def process_row(row, row_index, resource_descriptor, resource_index, parameters, stats):
    stats['row-count'] += 1
    return row

process(modify_datapackage=modify_datapackage,
        process_row=process_row)

Low Level Processor API

In some cases, the high-level API might be too restrictive. In such cases you should consider using the low-level API.

from datapackage_pipelines.wrapper import ingest, spew

if __name__ == '__main__':
  with ingest() as ctx:

    # Initialisation code, if needed

    # Do stuff with datapackage
    # ...

    stats = {}

    # and resources:
    def new_resource_iterator(resource_iterator_):
        def resource_processor(resource_):
            # resource_.spec is the resource descriptor
            for row in resource_:
                # Do something with row
                # Perhaps collect some stats here as well
                yield row
        for resource in resource_iterator_:
            yield resource_processor(resource)

    spew(ctx.datapackage,
         new_resource_iterator(ctx.resource_iterator),
         ctx.stats)

The above code snippet shows the structure of most low-level processors.

We always start by calling ingest() - this method gives us the context, containing the execution parameters, the data package descriptor (as output by the previous step) and an iterator over all streamed resources' rows.

We finish the processing by calling spew(), which sends the processed data to the next processor in the pipeline. spew receives the (possibly modified) data package descriptor, a new iterator over the resources and their rows, and, optionally, a stats object that is reported when the pipeline finishes.

A more in-depth explanation

spew writes the data it receives in the following order: first the data package descriptor, then the rows of each resource (one resource at a time), and finally the stats.

A few examples

We'll start with the same processors from above, now implemented with the low level API.

# Add license information
from datapackage_pipelines.wrapper import ingest, spew

if __name__ == '__main__':
  with ingest() as ctx:
    ctx.datapackage['license'] = 'MIT'
    spew(ctx.datapackage, ctx.resource_iterator)
# Add new column with constant value to first resource
# Column name and value are taken from the processor's parameters
from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()

datapackage['resources'][0]['schema']['fields'].append({
   'name': parameters['column-name'],
   'type': 'string'
})

def new_resource_iterator(resource_iterator_):
    def resource_processor(resource_):
        for row in resource_:
            row[parameters['column-name']] = parameters['value']
            yield row

    first_resource = next(resource_iterator_)
    yield(resource_processor(first_resource))

    for resource in resource_iterator_:
        yield resource

spew(datapackage, new_resource_iterator(resource_iterator))
# Row counter
from datapackage_pipelines.wrapper import ingest, spew

_, datapackage, resource_iterator = ingest()

stats = {'row-count': 0}

def new_resource_iterator(resource_iterator_):
    def resource_processor(resource_):
        for row in resource_:
            stats['row-count'] += 1
            yield row

    for resource in resource_iterator_:
        yield resource_processor(resource)

spew(datapackage, new_resource_iterator(resource_iterator), stats)

This next example shows how to implement a simple web scraper. Although not strictly required, web scrapers are usually the first processor in a pipeline. Therefore, they can ignore the incoming data-package and resource iterator, as there's no previous processor generating data:

# Web Scraper
import requests
from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resources import PROP_STREAMING

parameters, _, _ = ingest()

host = parameters['ckan-instance']
package_list_api = 'https://{host}/api/3/action/package_list'
package_show_api = 'https://{host}/api/3/action/package_show'

def scrape_ckan(host_):
    all_packages = requests.get(package_list_api.format(host=host_))\
                           .json()\
                           .get('result', [])
    for package_id in all_packages:
      params = dict(id=package_id)
      package_info = requests.get(package_show_api.format(host=host_),
                                  params=params)\
                             .json()\
                             .get('result')
      if package_info is not None:
        yield dict(
            package_id=package_id,
            author=package_info.get('author'),
            title=package_info.get('title'),
        )

datapackage = {
  'resources': [
    {
      PROP_STREAMING: True,   # You must set this property for resources being streamed in the pipeline!
      'name': 'package-list',
      'schema': {
        'fields': [
          {'name': 'package_id', 'type': 'string'},
          {'name': 'author',     'type': 'string'},
          {'name': 'title',      'type': 'string'},
        ]
      }
    }
  ]
}

spew(datapackage, [scrape_ckan(host)])

In this example we can see that the initial datapackage is generated from scratch, and the resource iterator is in fact a scraper, yielding rows as they are received from the CKAN instance API.

Plugins and Source Descriptors

When writing pipelines in a specific problem domain, one might discover that the pipelines being developed follow a certain pattern: scraping or fetching source data tends to look similar from pipeline to pipeline, and the processing, data cleaning and validation steps are often the same.

In order to ease maintenance and avoid boilerplate, a datapackage-pipelines plugin can be written.

Plugins are Python modules named datapackage_pipelines_<plugin-name>. Plugins can provide two facilities: processors, which pipelines can reference using the <plugin-name>.<processor-name> notation (e.g. ckan.scraper below), and a Generator class, which converts source descriptors into full pipelines (see below).

Source Descriptors

A source descriptor is a yaml file containing information which is used to create a full pipeline.

dpp will look for files named <plugin-name>.source-spec.yaml and will treat them as input for the pipeline-generating code - which should be implemented in a class called Generator in the datapackage_pipelines_<plugin-name> module.

This class should inherit from GeneratorBase and should implement two methods:

- get_schema - returns the JSON Schema used to validate source descriptors (see "Validating Source Descriptors" below).
- generate_pipeline - receives a parsed source descriptor and yields (pipeline_id, pipeline_details) pairs describing the pipelines to generate.

Example

Let's assume we write a datapackage_pipelines_ckan plugin, used to pull data out of CKAN instances.

Here's what such a hypothetical generator would look like:

import os
import json

from datapackage_pipelines.generators import \
    GeneratorBase, slugify, steps, SCHEDULE_MONTHLY

SCHEMA_FILE = os.path.join(os.path.dirname(__file__), 'schema.json')

class Generator(GeneratorBase):

    @classmethod
    def get_schema(cls):
        return json.load(open(SCHEMA_FILE))

    @classmethod
    def generate_pipeline(cls, source):
        pipeline_id = dataset_name = slugify(source['name'])
        host = source['ckan-instance']
        action = source['data-kind']

        if action == 'package-list':
            schedule = SCHEDULE_MONTHLY
            pipeline_steps = steps(*[
                ('ckan.scraper', {
                   'ckan-instance': host
                }),
                ('metadata', {
                  'name': dataset_name
                }),
                ('dump_to_zip', {
                   'out-file': 'ckan-datapackage.zip'
                })])
            pipeline_details = {
                'pipeline': pipeline_steps,
                'schedule': {'crontab': schedule}
            }
            yield pipeline_id, pipeline_details

In this case, if we store a ckan.source-spec.yaml file looking like this:

ckan-instance: example.com
name: example-com-list-of-packages
data-kind: package-list

Then, when running dpp, we will see an available pipeline named ./example-com-list-of-packages.

This pipeline would internally be composed of 3 steps: ckan.scraper, metadata and dump_to_zip.

Validating Source Descriptors

Source descriptors can have any structure that best matches the parameter domain of the output pipelines. However, it must have a consistent structure, backed by a JSON Schema file. In our case, the Schema might look like this:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "name":          { "type": "string" },
    "ckan-instance": { "type": "string" },
    "data-kind":     { "type": "string" }
  },
  "required": [ "name", "ckan-instance", "data-kind" ]
}

dpp will ensure that source descriptor files conform to that schema before attempting to convert them into pipelines using the Generator class.

Providing Processor Code

In some cases, a generator might prefer to provide the processor code as well (alongside the pipeline definition). In order to do that, the generator can add a code attribute to any step, containing the processor's code. When executed, this step won't try to resolve the processor as usual, but will run the provided code instead.

Running on a schedule

datapackage-pipelines comes with a Celery integration, allowing pipelines to be run at specific times via a crontab-like syntax.

In order to enable that, you simply add a schedule section to your pipeline-spec.yaml file (or return a schedule from the generator class, see above), like so:

co2-information-cdiac:
  pipeline:
    -
        ...
  schedule:
    # minute hour day_of_week day_of_month month_of_year
    crontab: '0 * * * *'

In this example, this pipeline is set to run every hour, on the hour.

To run the celery daemon, use celery's command line interface to run datapackage_pipelines.app. Here's one way to do it:

$ python -m celery worker -B -A datapackage_pipelines.app

Running this server will start by executing all "dirty" tasks, and continue by executing tasks based on their schedules.

As a shortcut for starting the scheduler and the dashboard (see below), you can use a prebuilt Docker image:

$ docker run -v `pwd`:/pipelines:rw -p 5000:5000 \
        frictionlessdata/datapackage-pipelines server

And then browse to http://<docker machine's IP address>:5000/ to see the current execution status dashboard.

Pipeline Dashboard & Status Badges

When installed on a server or running using the task scheduler, it's often very hard to know exactly what's running and what the status is of each pipeline.

To make things easier, you can spin up the web dashboard, which provides an overview of each pipeline's status, its basic info and the result of its latest execution.

To start the web server, run dpp serve from the command line and browse to http://localhost:5000.

The environment variable DPP_BASE_PATH determines whether the dashboard is served from the root or from another base path (example value: /pipelines/).

The dashboard endpoints can be made to require authentication by adding a username and password with the environment variables DPP_BASIC_AUTH_USERNAME and DPP_BASIC_AUTH_PASSWORD.

An even simpler pipeline status is available via status badges, both for individual pipelines and for pipeline collections. For a single pipeline, add the full pipeline id to the badge endpoint:

http://localhost:5000/badge/path_to/pipelines/my-pipeline-id

Or for a collection of pipelines:

http://localhost:5000/badge/collection/path_to/pipelines/

Note that these badge endpoints will always be exposed regardless of DPP_BASIC_AUTH_PASSWORD and DPP_BASIC_AUTH_USERNAME settings.

Integrating with other services

Datapackage-pipelines can call a predefined webhook on any pipeline event. This might allow for potential integrations with other applications.

In order to add a webhook to a specific pipeline, add a hooks property to the pipeline definition, which should be a list of URLs. Whenever that pipeline is queued, starts running or finishes running, all the URLs will be POSTed to with this payload:

{
  "pipeline": "<pipeline-id>",
  "event": "queue/start/progress/finish",
  "success": true/false (when applicable),
  "errors": [list-of-errors, when applicable]
}
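
For example, a pipeline definition with a webhook might look like this (the URL is illustrative):

my-pipeline:
  hooks:
    - http://example.com/my-webhook-endpoint   # POSTed to on queue/start/progress/finish events
  pipeline:
    -
      run: update_package
      parameters:
        name: 'my-datapackage'
    ...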

Known Issues