qiwi / protopipe

Graph-driven data processor
MIT License

protopipe

Graph-driven data processor. qiwi.github.io/protopipe

Stability: experimental

Idea

We keep running into the same problem of atomic data processing (logwrap, uniconfig, cyclone, etc.), so it seems useful to build one pipeline to rule them all. It is not meant to be universal or high-performance, just dumb and clear.

TL;DR

Install

  yarn add protopipe

Usage

import {Graph, IAny, ISpace, NetProcessor} from 'protopipe'

const graph = new Graph({
  edges: ['AB', 'BC'],
  vertexes: ['A', 'B', 'C'],
  incidentor: {
    type: 'EDGE_LIST_INCDR',
    value: {
      'AB': ['A', 'B'],
      'BC': ['B', 'C'],
    },
  },
})
const handler = {
  // Default handler
  graph: (space: ISpace): IAny => (NetProcessor.getData(space) || {value: 0}).value * 2,
  // Vertex specific handlers
  vertexes: {
    'B': (space: ISpace): IAny => (NetProcessor.getData(space, 'A') || {value: 10}).value * 3,
  },
}
const protopipe = new NetProcessor({
  graph,
  handler,
})
const space = protopipe.impact(true, ['A', 1]) as ISpace

console.log(NetProcessor.getData(space, 'C').value) // 6
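
To make the expected `6` easier to follow, here is a hand-rolled model of the same A, B, C chain in plain TypeScript. It does not use protopipe and is not its implementation: `propagate`, `Handler`, and the assumption that each vertex handler simply receives the value of its single predecessor are illustrative only.

```typescript
// Illustrative model only: NOT protopipe's implementation.
// Assumption: each vertex has at most one predecessor and its handler
// receives that predecessor's value.
type Handler = (input: number) => number

// Edges as [tail, head] pairs, mirroring 'AB' and 'BC' from the usage example.
const edges: Array<[string, string]> = [['A', 'B'], ['B', 'C']]

// Default handler doubles the input; vertex B has its own handler.
const defaultHandler: Handler = (input) => input * 2
const vertexHandlers: Record<string, Handler> = {
  B: (input) => input * 3,
}

// Hypothetical helper: pushes a value from `start` along outgoing edges.
function propagate(start: string, value: number): Record<string, number> {
  const data: Record<string, number> = {[start]: value}
  const queue: string[] = [start]

  while (queue.length > 0) {
    const current = queue.shift() as string
    for (const [tail, head] of edges) {
      if (tail !== current) continue
      const handler = vertexHandlers[head] ?? defaultHandler
      data[head] = handler(data[current])
      queue.push(head)
    }
  }
  return data
}

const result = propagate('A', 1)
console.log(result.C) // 6
```

Under these assumptions A holds 1, B becomes 1 * 3 = 3 through its vertex-specific handler, and C becomes 3 * 2 = 6 through the default handler.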

Features

Limitations (of default executor, traverser, etc.)

Definitions and contracts

IGraph

export type IGraph = {
  vertexes: Array<IVertex>,
  edges: Array<IEdge>,
  incidentor: IGraphIncidentor
}

export type IGraphRepresentation = any

export type IGraphIncidentor = {
  type: IGraphIncidentorType,
  value: IGraphRepresentation
}
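
As a sketch of how an edge-list incidentor can be read, the snippet below resolves the direct successors of a vertex. The definitions above leave `IVertex`, `IEdge` and `IGraphIncidentorType` unspecified, so treating them as strings, assuming every `'EDGE_LIST_INCDR'` entry is a `[tail, head]` pair, and the `successors` helper itself are all assumptions made for illustration.

```typescript
// Assumed aliases: the definitions above leave these types unspecified.
type IVertex = string
type IEdge = string
type IGraphIncidentorType = string

type IGraphIncidentor = {
  type: IGraphIncidentorType,
  // Assumed shape for an edge list: edge name -> [tail, head]
  value: Record<IEdge, [IVertex, IVertex]>,
}

type IGraph = {
  vertexes: Array<IVertex>,
  edges: Array<IEdge>,
  incidentor: IGraphIncidentor,
}

// Hypothetical helper: lists the vertexes reachable from `vertex` over one edge.
function successors(graph: IGraph, vertex: IVertex): IVertex[] {
  const found: IVertex[] = []
  for (const edge of graph.edges) {
    const pair = graph.incidentor.value[edge]
    if (pair && pair[0] === vertex) found.push(pair[1])
  }
  return found
}

const sample: IGraph = {
  vertexes: ['A', 'B', 'C'],
  edges: ['AB', 'BC'],
  incidentor: {
    type: 'EDGE_LIST_INCDR',
    value: {AB: ['A', 'B'], BC: ['B', 'C']},
  },
}

console.log(successors(sample, 'A')) // [ 'B' ]
console.log(successors(sample, 'C')) // []
```

Keeping `value` typed as `any` in the real contract presumably lets other incidentor types use a different representation; the narrower type here only serves the example.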

Sync / async

Pass a mode flag as the first .impact() argument: true runs synchronously and returns the result, false returns a promise that resolves to it.

import {Graph, ISpace, NetProcessor} from 'protopipe'

const graph = new Graph({
  edges: ['AB', 'AC', 'BC', 'BD', 'CD', 'AD'],
  vertexes: ['A', 'B', 'C', 'D'],
  incidentor: {
    type: 'EDGE_LIST_INCDR',
    value: {
      'AB': ['A', 'B'],
      'AC': ['A', 'C'],
      'BC': ['B', 'C'],
      'BD': ['B', 'D'],
      'CD': ['C', 'D'],
      'AD': ['A', 'D'],
    },
  },
})
const handler = (space: ISpace) => NetProcessor.requireElt('ANCHOR', space).value.vertex
const netProcessor = new NetProcessor({graph, handler})

// SYNC
const res1 = netProcessor.impact(true, 'A') as ISpace
NetProcessor.getData(res1, 'D') // 'D'

// ASYNC
netProcessor.impact(false, 'A').then((res) => {
  NetProcessor.getData(res, 'D') // 'D'
})
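
The mode-flag pattern itself can be sketched in plain TypeScript with overloads, so that the return type follows the flag. This is a generic illustration rather than protopipe's implementation: `compute` and its doubling logic are hypothetical. The `as ISpace` cast in the example above suggests the library's own signature returns a union type instead.

```typescript
// Hypothetical stand-in for a processor call; not part of protopipe.
function compute(sync: true, input: number): number
function compute(sync: false, input: number): Promise<number>
function compute(sync: boolean, input: number): number | Promise<number> {
  const result = input * 2
  // Same work either way; only the delivery differs.
  return sync ? result : Promise.resolve(result)
}

// SYNC: the value is available immediately.
const syncResult = compute(true, 21)
console.log(syncResult) // 42

// ASYNC: the value arrives through a promise.
const asyncResult = compute(false, 21)
asyncResult.then((value) => {
  console.log(value) // 42
})
```

With overloads the caller needs no cast; with a union return type, as the cast above implies, the caller narrows the result manually.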

Bundles

The library ships its internals in ES5, ES6 and TypeScript formats.

Customization

You're able to override everything.