
Planner


Create flows out of specs

Flow Specs

meta:
  owner: <owner username>
  ownerid: <owner unique id>
  dataset: <dataset name>
  version: 1
  findability: <published/unlisted/private>
inputs:
 -  # only one input is supported at the moment
    kind: datapackage
    parameters:
      descriptor: <datapackage-descriptor>
      resource-mapping:
        <resource-name-or-path>: <resource-url>

processing:
 - # Processing steps that need to be done on sources to get proper data streams
   input: <source-resource-name - e.g. `my-excel-resource`>
   output: <destination-resource-name - e.g. `my-excel-resource-sheet-1`>
   tabulator:
     # These are sample options for tabulator, see its docs for all available options
     headers: 2
     sheet: 1
 -
   input: <source-resource-name - e.g. `my-excel-resource`>
   output: <destination-resource-name - e.g. `my-excel-resource-sheet-1`>
   dpp:
     - run: sort
       parameters:
         resources: resource1
         sort-by: fullname

outputs:
  -
    kind: rdbms  # dump.to_sql
    parameters:
      engine: <tbd, should be the name of a user provided configuration - not the actual connection string>
  -
    kind: sqlite # dump.to_sql
  -
    kind: npm  
    parameters:
      credentials: <tbd, should be the name of a user provided configuration - not the actual credentials>
  -
    kind: zip  # dump.to_zip
    parameters:
      out-file: <name of the file>
  - ... # other output formats
schedule: 'every 1d' # s/m/h/d/w (second -> week)

Processing Flows

Processing flows are part of the 'assembly line' for a single assembled dataset. Each processing flow is a single pipeline that generates one or more processing artifacts. These artifacts are later collected to create the final datapackage.

By default, we add 3 processing flows:

Implementing a new processing flow

Usually, adding a new output format or transformation means adding a new flow for it.

To create one, implement a class that inherits from BaseProcessingNode and add that class to the ORDERED_NODE_CLASSES array (in the correct position).

Each of these classes is provided with some processing artifacts and must return the new processing artifact(s) it knows how to generate. The ProcessingArtifact class represents a single such artifact.
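As a rough sketch, a new flow could look like the following. The import path, hook name, and ProcessingArtifact arguments below are assumptions for illustration only; check the actual BaseProcessingNode and ProcessingArtifact interfaces in this repository before copying anything.

# Sketch only -- the module path, hook name and ProcessingArtifact arguments
# are assumptions, not planner's documented API.
from planner.nodes import BaseProcessingNode, ProcessingArtifact  # assumed path

class GeojsonDumperNode(BaseProcessingNode):
    """Hypothetical flow that adds a GeoJSON dump output."""

    def get_artifacts(self, available_artifacts):
        # Assumed hook: receive the artifacts produced so far and yield the
        # new artifact(s) this flow knows how to generate.
        for artifact in available_artifacts:
            yield ProcessingArtifact(...)  # describe the new artifact here

# Finally, add GeojsonDumperNode to ORDERED_NODE_CLASSES at the position where
# the artifacts it requires are already available.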

Under the hood

Each of these artifacts is converted into a pipeline, which always has the same fixed structure.

Execution order is maintained by setting dependencies between the pipelines, based on the list of required artifacts that each flow defines.
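As an illustration of that ordering (not planner's actual code), each pipeline can be mapped to the artifacts it requires and sorted topologically, so that producers always run before consumers; the artifact names below are made up.

# Illustration only: artifact produced by a pipeline -> artifacts it requires.
from graphlib import TopologicalSorter

requires = {
    "datapackage": [],
    "sqlite-dump": ["datapackage"],
    "zip-dump": ["datapackage", "sqlite-dump"],
}

# Producers come before consumers, so the pipelines can run in this order.
print(list(TopologicalSorter(requires).static_order()))
# -> ['datapackage', 'sqlite-dump', 'zip-dump']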

An aggregating pipeline is also created to assemble the final package from all the separate intermediate packages. This aggregating pipeline does not need to process any data, as it copies resources with absolute URLs (rather than relative paths).

Contributing

Please read the contribution guidelines:

How to Contribute

Thanks!