
Integreat

An integration layer for node.js.


The basic idea of Integreat is to make it easy to define how to send data to and receive data from a set of services, and expose them through a well defined interface, abstracting away the specifics of each service.

There are a few concepts that make this possible:

All configuration is done through basic JSON-friendly structures, and you define your services with different endpoints, mutation pipelines, authentication schemes, etc.

Your configuration is spun up as an Integreat instance. To send and retrieve data, you dispatch actions to your instance and get response objects back. You may define jobs to run simple actions or longer "flows" consisting of several actions with conditions and logic. You may also configure queues to have actions run in sequence or at a later time.

                   ____________________________________________________
                  |                                                    |
                  |                 Integreat instance                 |
Action ----|      |                                                    |
           |-> Dispatch <-> Schema <-> Mutation <-> Adapter <-> Transporter <-> Service
Response <-|      |                                                    |
                  |____________________________________________________|

To deal with security and permissions, Integreat has a concept of an ident. Other authentication schemes may be mapped to Integreat's ident scheme, to provide data security from one service to another or to the dispatched action. A core principle is that nothing that enters Integreat from an authenticated service will leave Integreat unauthenticated. What this means, though, depends on how you define your services.

Table of contents

  1. Usage
    1. Install
    2. Basic example
  2. Integreat concepts
    1. Services
    2. Transporters
    3. Adapters
    4. Authenticators
    5. Mutations
    6. Schemas
    7. Actions
    8. Jobs
    9. Queues
    10. Middleware
  3. Debugging

Usage

Install

Requires node v18.

Install from npm:

npm install integreat

You will probably also need some transporters and adapters, and the basic transformers in integreat-transformers.

Basic example

The following is the "hello world" example of Integreat. As with most hello world examples, this is a bit too trivial a use case to demonstrate the real usefulness of Integreat, but it shows you the simplest setup possible.

Here, we fetch cat facts from the API endpoint 'https://cat-fact.herokuapp.com/facts', which returns data in JSON and requires no authentication. The returned list of facts is mutated and cast to the fact schema. We only fetch data from the service, and no data is sent to it.

import Integreat from 'integreat'
import httpTransporter from 'integreat-transporter-http'
import jsonAdapter from 'integreat-adapter-json'

const schemas = [
  {
    id: 'fact', // The id of the schema
    shape: {
      // The fields of the type
      id: 'string', // An id field will always be included, but we define it here for readability
      text: 'string', // The text of the cat fact
      createdAt: 'date', // The created date (`createdAt` and `updatedAt` will always be dates)
    },
    access: { allow: 'all' }, // No access restrictions
  },
]

const services = [
  {
    id: 'catfact', // The id of the service
    transporter: 'http', // Use the http transporter
    adapters: ['json'], // Run the request and the response through the json adapter
    options: {
      transporter: {
        // Options for the transporter
        uri: 'https://cat-fact.herokuapp.com/facts', // Only the uri is needed here
      },
    },
    endpoints: [
      {
        match: { action: 'GET', type: 'fact' }, // Match to a GET action for type 'fact'
        mutation: {
          $direction: 'from', // We're mutating data _from_ the service
          // Here we're mutating `response.data` and "setting it back" where we found it ...
          'response.data': [
            'response.data[]',
            {
              $iterate: true, // Mutate each item in an array
              id: '_id', // The id is called `_id` in the data from the service
              text: 'text', // text is called `text`
              createdAt: 'createdAt', // Creation date is called `createdAt`
            },
          ],
        },
      },
    ],
  },
]

// Create the Integreat instance from our definitions and provide the
// transporters and adapters we require.
const great = Integreat.create(
  { schemas, services },
  { transporters: { http: httpTransporter }, adapters: { json: jsonAdapter } },
)

// Prepare an action to fetch all cat facts from the service `catfact`
const action = { type: 'GET', payload: { type: 'fact', service: 'catfact' } }

// Dispatch the action and get the response
const response = await great.dispatch(action)

The response object will look like this:

{
  status: 'ok',
  data: [
    {
      id: '58e008780aac31001185ed05',
      $type: 'fact',
      text: 'Owning a cat can reduce the risk of stroke and heart attack by a third.',
      createdAt: new Date('2018-03-29T20:20:03.844Z')
    },
    // ...
  ]
}

Integreat concepts

As mentioned in the introduction, the building blocks of Integreat are services, transporters and adapters, mutation pipelines, and schemas.

Services

A service is the API, database, FTP server, queue, etc. that you want to get data from and/or set data to. We pass a set of service definitions to Integreat, specifying what transporter, adapters, and authentication schemes each service requires, in addition to defining the different endpoints available on the service, how they should be called, and how data should be mutated in each case.

We'll get back to the details of all of this in turn, but first we want to highlight how central the concept of a service is to Integreat. Basically, in Integreat "everything is a service". A simple REST/JSON API is a service, a database is a service, and everything external you want to communicate with is a service. Want to set up a queue to handle actions one by one? That's a service. Want to cache data in a memory store? That's a service. Want to schedule actions to run on intervals? That's a service too. By simply defining services and their specifics, you may set up a variety of different types of configurations with the same few building blocks. This is very powerful as soon as you get into the right mindset.

Services are configured by service definitions, which tell Integreat how to fetch data from a service, how to mutate this data to schemas, and how to send data back to the service.

The service definition object includes the transporter id, adapter ids, any authentication method, the endpoints for fetching from and sending to the service, mutations that data to all endpoints will pass through, and options for transporters, adapters, etc.

{
  id: <service id>,
  transporter: <transporter id>,
  adapters: [<adapter id>, <adapter id>, ...],
  auth: <auth config>,
  meta: <type id>,
  options: {...},
  mutation: <mutation pipeline>,
  endpoints: [
    <endpoint definition>,
    ...
  ]
}

Service definitions are passed to Integreat on creation through the Integreat.create() function. There is no way to change service definitions after creation.

See mutations for a description of how to define the mutation pipeline for a service.

The auth property should normally be set to the id of an auth definition, if the service requires authentication. In cases where the service is authenticated by other means, e.g. by including username and password in the uri, set the auth property to true to signal that this is an authenticated service. For services accepting incoming actions, auth should be set to an object with { outgoing: <auth id | true>, incoming: <auth id | true> }. To accept several incoming authentication methods, provide an array of <auth id | true>, and they will be run from first to last until one of them returns an ident or an error other than noaccess.
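As a sketch of these variants (the service and auth ids here are hypothetical):

// Three hypothetical services illustrating the `auth` variants
const services = [
  // Outgoing requests authenticated with the 'myOAuth' definition
  { id: 'api1', transporter: 'http', auth: 'myOAuth' },
  // Authenticated by other means, e.g. credentials in the uri
  { id: 'api2', transporter: 'http', auth: true },
  {
    id: 'api3',
    transporter: 'http',
    // Outgoing requests use 'myOAuth'; incoming actions are validated by
    // 'jwtAuth' first, then 'apiKeyAuth', until one returns an ident
    auth: { outgoing: 'myOAuth', incoming: ['jwtAuth', 'apiKeyAuth'] },
  },
]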

[!NOTE] When connecting to a service for listening, the outgoing auth is used. incoming is only used for validating the actions being dispatched "back" from the service.

In options, you may provide options for transporters and adapters. This object is merged with the options object on the endpoint. See the section on the options object for more on this.

Endpoints

A service will have at least one endpoint, but often there will be several. Endpoints are the definitions of the different ways Integreat may interact with a service. You decide how you want to set up the endpoints and what is the right "endpoint design" for a service, but there might be one endpoint for each operation that can be done with a type of data.

For example, let's say you have a simple REST API with blog articles and authors. There will most likely be an endpoint to fetch all (or some) articles, one endpoint for fetching one article by id, one endpoint for creating an article, one for updating an article, and so on. And you'll have similar endpoints for authors, one endpoint for fetching all, one for fetching one by id, one endpoint for creating an author, etc. As this is REST, each endpoint will address a different combination of urls and http verbs (through the transporter).

As another example, you may be accessing a database of articles and authors directly. The configuration details will be very different than for a REST API, but you'll probably have the same endpoints, fetching all articles, fetching one, creating, updating, and the same all over for users. Instead of urls and http verbs, as for REST, these endpoints will address different databases and different database operations (through the transporter).

[!NOTE] This is not to say that Integreat requires you to set up endpoints exactly as described in these examples, it might be that you would like to set up an endpoint that handles many of these cases. The intention here is just to give you an understanding of what an endpoint is in Integreat.

When you dispatch an action, Integreat will figure out what service and what endpoint to send the action to. The target service is often specified in the action payload with the targetService (or shorthand service) property, but if not, the default service of the schema specified with the payload type property will be used.

The matching to an endpoint is done by finding the endpoint whose match object matches the action with the most accuracy. The rules of endpoint matching are described in more detail below.

Here's the format of an endpoint definition:

{
  id: <endpoint id>,
  match: {
    type: <schema id>,
    scope: <'collection'|'member'|'members'|'all'>,
    action: <action type>,
    params: {...},
    incoming: <boolean>,
    conditions: [...]
  },
  validate: [
    {
      condition: <mutation pipeline>,
      failResponse: <response object>
    }
  ],
  mutate: <mutation pipeline>,
  adapters: [<adapter id>, <adapter id>, ...],
  auth: <auth config>,
  allowRawRequest: <boolean>,
  allowRawResponse: <boolean>,
  castWithoutDefaults: <boolean>,
  options: {...},
}

All of these properties are optional. An empty endpoint definition object will match anything, pass on the action to the transporter untouched, and relay any response coming back. This might be what you need, but often you'll want to specify a few things:

Match properties

An endpoint may specify zero or more of the following match properties:

[!NOTE] There used to be a filters property on the endpoint match object. It is still supported, but it's deprecated and will be removed in v1.1. Please use conditions instead.

[!NOTE] Editor's note: Describe what incoming actions are, and give more details on filters.

There might be cases where several endpoints match an action, and in these cases the endpoint with the highest level of specificity will be used. E.g., for a GET action asking for resources of type entry, an endpoint with both action: 'GET' and type: 'entry' is picked over an endpoint matching all GET actions regardless of type. For params and filters this is decided by the highest number of properties on these objects.

The order of the endpoints in the endpoints list only matters when two endpoints specify exactly the same match properties. Then the first one is used.

When no match properties are set, the endpoint will match any action, as long as no other endpoint matches.

Finally, if an action specifies the endpoint id with the endpoint payload property, this overrides everything else, and the endpoint with that id is used regardless of how the match object would apply.
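For instance, the following action will skip matching and use the endpoint with the id 'getExpired' (a hypothetical endpoint id), if it exists:

{
  type: 'GET',
  payload: { type: 'entry', endpoint: 'getExpired' }
}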

Example service definition with endpoint match object:

{
  id: 'entries',
  transporter: 'http',
  endpoints: [
    {
      match: {
        type: 'entry',
        action: 'GET',
        scope: 'collection',
        params: {
          author: true,
          archive: false
        }
      },
      options: {
        transporter: {
          uri: 'https://example.api.com/1.0/{author}/{type}_log?archive={archive}'
        }
      }
    }
  ],
  // ...
}

Options object

A service definition may have an options object in two places: directly on the service definition and on any of the endpoints. When an action is sent to an endpoint, the combination of the two options objects is used. Also, there may be different options for the transporter and for the adapters.

Example of an options object set on the service definition:

{
  id: 'entries',
  options: {
    uri: 'https://ourapi.com/v1',
    transporter: {
      method: 'POST',
      incoming: { port: 3000 }
    },
    adapters: {
      xml: { namespaces: { ... } },
      // ...
    }
  }
}

Any properties set directly on the options object or on a transporter property are treated as options for the transporter. If there are properties on both the options and a transporter object, they will be merged, with the transporter object having priority in case of conflicts. This is a shallow merge, so objects within the options will not be merged.

In the example above, the options passed to the transporter will include uri, method, and incoming.

The incoming object on the transporter options is a bit special, as it holds separate options for transporters that support incoming requests through the listen() method. If there are incoming objects on both the options and transporter objects, they will be merged, again with priority to the one on the transporter object.

Note that we recommend setting transporter options on the transporter object for clarity, but both will work.

Adapter options may be given in an adapters object, where each adapter may have its own options, set with the id of the adapter as the key. In the example above, the xml adapter will be given the namespaces object. A requirement for this is that the adapter actually has an id. Adapters provided directly on the service definition may not have an id, but all adapters referenced by an id will be given the options set on that id, which is the common behavior.

Finally, when all this sorting has been done on options from both the service definition and an endpoint, the two options structures are merged before being used. Here, the endpoint options take priority, so you may set a general option on the service and override it on the endpoint.

Example of endpoint options overriding service options:

{
  id: 'entries',
  options: {
    transporter: {
      uri: 'https://ourapi.com/v1',
      method: 'GET',
    }
  },
  endpoints: [
    {
      match: { ... }
    },
    {
      match: { ... },
      options: {
        transporter: {
          method: 'POST'
        }
      }
    }
  ]
}

Here, the first endpoint will be given method: 'GET', while the second will get method: 'POST'.

Before actions are passed through mutations and finally handed to the transporter, the merged transporter options are set on an options property in the meta object of the action. This way, you may also mutate these options before they reach the transporter.

Service authentication

This definition format is used to authenticate with a service:

{
  id: <id>,
  authenticator: <authenticator id>,
  options: {
    // ...
  },
  overrideAuthAsMethod: <auth-as method>,
}

The authenticator is responsible for doing all the heavy-lifting, based on the options provided in the service authentication definition.
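As a hedged sketch, a definition for a service requiring a bearer token might look like this (the 'token' authenticator id and its options are assumptions for illustration):

{
  id: 'entriesToken',
  authenticator: 'token',
  options: {
    token: 's3cr3t', // The token to apply to outgoing actions
    type: 'Bearer', // The auth scheme to use
  },
}

A service would then reference it with auth: 'entriesToken'.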

Configuring service metadata

Integreat supports getting and setting metadata for a service. The most common use of this is to keep track of when data of a certain type was last synced.

Some services may have support for storing their own metadata, but usually you set up a dedicated service for storing other services' metadata. A few different pieces go into setting up a meta store:

When all of this is set up, you activate metadata on the service the metadata will be stored for, by setting the meta property to the id of the schema defining the metadata fields. The service set on the schema tells Integreat what service to get and set the metadata from/to.
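A sketch of a service activating metadata through a schema with the id 'meta' (the ids are hypothetical):

{
  id: 'entries',
  transporter: 'http',
  meta: 'meta', // Metadata for this service is defined by the 'meta' schema
  // ...
}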

The schema will look something like this:

{
  id: 'meta', // You may give it any id you'd like and reference it on the `meta` prop on the service
  service: <id of service handling the metadata>,
  shape: {
    <metadataKey>: <type string>,
    // ...
  }
}

To get or set metadata, use GET_META and SET_META with the service you are getting metadata from as the service. Integreat will figure out the rest.

Transporters

A transporter handles all the details of sending and receiving data to and from a service. When dispatching an action to a service, the action will be handled in a manner relevant to the type of service the transporter supports, e.g. sending an http request for the HTTP transporter, or querying a database for the MongoDb transporter. Some transporters may also support listening to a service, e.g. the HTTP transporter listening for incoming requests or the MQTT transporter subscribing to events on a topic.

Integreat has transporters for some common cases, and more may come:

You may write your own transporters if your case is not covered by any of these. Documentation on developing transporters is coming.

Integreat will handle the transporters based on your configuration, but there are some specifics to each transporter, like HTTP needing a uri option or MongoDb needing a collection option. See the documentation of each transporter for more.

Adapters

Adapters work together with transporters to prepare incoming and outgoing data in accordance with the type of service they support.

As an example, the HTTP transporter will return data from a response as a string, since there is no common way to treat the response body. So for a JSON API, you will configure the JSON adapter to make sure the data from the mutations is sent as a JSON string, and that the JSON coming back from the service is parsed before mutation starts. For a service using XML, you would instead set up the XML adapter, and perhaps also the SOAP adapter, to again stringify and parse the data going back and forth.

The MongoDb transporter, on the other hand, does not require any adapters, as documents from the database will always come as arrays and objects, and may be fed directly into the mutation pipelines.

Integreat currently has the following adapters:

You may write your own adapters as well, and documentation on this is coming.

Authenticators

At its simplest, an authenticator will provide the necessary credentials to an outgoing action, or an ident to an incoming action. Some authenticators do this based only on the options provided, while others will do a more complex dance with the service or a third-party service, like with OAuth2.

When setting up a service, you may provide it with an auth id that refers to a service authentication definition, that again refers to an authenticator by id. The service auth definition also holds options for the authenticator, so when assigning an auth id to a service, you're assigning it an authenticator with those specific options. Another service may use the same authenticator, but with different options, and you would set this up with a different service authentication definition.

Authentication for outgoing actions is done when sending the action. When authenticated, an auth object is retrieved with the auth-as method specified on the transporter (e.g. asHttpHeaders for the http transporter), or with the overrideAuthAsMethod in the auth options if set. The auth object is passed to the transporter on the action meta.auth prop. It is applied just before sending, though, so it will be available to service middleware, but not to the mutation pipeline. This is done to expose credentials in as few places as possible. If you do want the auth object in mutations, however, set authInData to true on the service or endpoint options, and authentication will be done in the preflightAction step instead, making it available on meta.auth throughout the entire mutation pipeline.

For incoming actions, authentication is done when a listening action calls the authenticate() callback. The validate() method on the authenticator is used here, which will provide the transporter with an authorized ident.

Available authenticators:

Mutations

Both on the service and on endpoints, you define mutation pipelines. The service mutation is run before the endpoint mutation for data coming from a service, and in the opposite order when going to a service.

A nice - but sometimes complicated - thing about mutations is that they are run in both directions. They are by default defined for mutating data coming from a service, and will be run in reverse for data going to a service. In some cases this reversing of the pipeline will work as expected without modifications -- you define the mutation pipeline for data coming from the service, and the reversed pipeline just works as well. But many times you need to make adjustments, and sometimes you'll have separate steps based on the direction. We'll get into more details in the following.

A mutation pipeline consists of one or more steps that the data will go through, before coming out on the other end in the desired shape. It helps to picture this as an actual pipeline. After each step, data will be in a different shape, and this is the input to the next step.

You define a pipeline in Integreat with an array, although for a pipeline with only one step, you may skip the array for simplicity.
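As a minimal sketch (the paths and field names are made up), a pipeline that first digs into the data with a dot notation path and then maps each item might look like this:

[
  'content.articles[]', // Step 1: a dot notation path into the data
  {
    $iterate: true, // Step 2: mutate each item in the array
    id: 'key', // Map the service's `key` field to `id`
    title: 'headline', // Map `headline` to `title`
  },
]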

Each step may be one of the following:

Dot notation paths

At its most basic, a dot notation path is just a property key, like content. You may dive into a data structure by adding a key from the next level, separated by a dot, like content.articles. With an object like this:

{
  content: {
    articles: [{ id: '1' }, { id: '2' }],
    authors: [{ id: 'john' }]
  }
}

... the path content.articles will give you the array [{ id: '1' }, { id: '2' }].

You may add brackets to the path to traverse into arrays, e.g. content.articles[0] will give you the object { id: '1' }, and content.articles[0].id will give you '1'.

Empty brackets, like content.articles[] will ensure that you get an array back. If the data at the path is an array, this will return the same as content.articles, but if the path returns an object or a plain value, it will be returned in an array.

When mapping data to a service, the paths are used to reconstruct the data format the service expects. Only properties included in the paths will be created.

Arrays are reconstructed with any object or value at the first index, unless a single, non-negative index is specified in the path.

You may use a caret ^ to go one level up -- to the parent -- in the data (after going down), so after content.articles, the path ^.authors will return [{ id: 'john' }]. Arrays count as one level, so after content.articles[0] you will need to go up twice, like so: ^.^.authors.

A double caret ^^ takes you to the top -- the root -- so after content.articles[0].id, ^^.content.authors returns [{ id: 'john' }].

Caret notations -- parents and roots -- do not currently work in reverse, but they might in a future version.

Non-values

The behavior of some transformers is based on certain values being non-values. E.g. { $alt: [<pipeline 1>, <pipeline 2>] } will use the value from the first pipeline if it returns a value, otherwise the value from the second pipeline, meaning it will check for non-values. By default null, undefined, and '' (empty string) are non-values. By setting the nonvalues param to an array of values in the definitions object you pass to Integreat.create(), you may specify your own non-values.

If you don't want empty string to be a non-value, for instance, you do this:

const great = Integreat.create({
  nonvalues: [null, undefined],
  // ... other definitions
})

Schemas

A central idea of Integreat is that any integration has two sides: getting data from one service and sending data to another. Instead of setting up an integration directly from A to B, you have a schema in the middle, and configure how data from A will be mutated to the schema, and then how data in that schema will be mutated and sent to B.

This is a useful abstraction, and if you ever need to change one side, you can do so without involving the other side. If you need to switch out service B with service C, you can do so without involving the configuration of service A, or you can send data to both B and C, using the same setup for service A.

To be clear, you can set up flows without schemas in Integreat, but then you may lose this flexibility and maintainability.

A schema describes the data you expect to get out of Integreat, or send through it. You basically define the fields and their types, and may then cast data to that shape. Note that data on an action with a specified type will be automatically cast to that type.

{
  id: <schema id>,
  plural: <the id in plural>,
  service: <the default service for this schema>,
  shape: {
    <fieldId>: <field type>,
    <fieldId>: {
      $type: <field type>,
      default: <default value>,
      const: <value that will override any other value>
    },
  },
  access: <access def>
}

The shape of a schema

The shape is defined by an object where each key is the id of a field, which may contain only alphanumeric characters, and may not start with a digit. A schema cannot have the same id as a primitive type (see list below).

The values on this object define the types of the fields and a few other optional features:

{
  $type: <field type>,
  default: <default value>,
  const: <value that will override any other value>
}

The $type prop sets the type of the field. The available primitive types, are string, integer, float (or number), boolean, and date. A field may also have another schema as its type, in which case the id of the schema is set in $type. An example can be an article schema with an author field of type user, referring to a schema with id user. When casting the article, data on the author prop will be cast with the user schema.

The default value will be used when the field is undefined, null, or not present in the data object being cast to this schema. If default is set to a function, the function will be run with no arguments, and the returned value is used as the default value. When no default is given, undefined is used.

The const value overrides any value you provide for the field. It may be useful if you want a field to always have a fixed value. Just as for default, you may set it to a function, in which case the function will be run without arguments and the returned value will be used.

If both const and default are set, const will be used.

When only setting the field type, you don't need to provide the entire object, you can just provide the type string.

Example schema:

{
  id: 'article',
  shape: {
    id: 'string', // Not needed, as it is always provided, but it's good to include for clarity
    title: { $type: 'string', default: 'Unnamed article' },
    text: 'string',
    readCount: 'integer',
    archived: { $type: 'boolean', default: false },
    rating: 'float',
    createdAt: 'date',
    updatedAt: 'date'
  },
  access: 'all'
}

Note that if you provide the id field, it should be set to type 'string' or Integreat will throw. The same happens if you set createdAt or updatedAt to anything other than the type 'date'. If you don't include these fields, Integreat will include the id for you, but not createdAt or updatedAt.

Typed data

When data is cast to a schema, the data will be in the following format:

{
  id: <string>,
  $type: <schema>,
  createdAt: <date>,
  updatedAt: <date>,
  <key>: <value>,
  <key>: { id: <string>, $ref: <schema> },
  <key>: [{ id: <string>, $type: <schema>, ... }],
  ...
}

Access rules

Set the access property on a schema to enforce permission checking. This applies to any service that provides data in this schema.

The simplest access rule is auth, which means that anyone can do anything with the data of this schema, as long as they are authenticated. Being authenticated, in this context, means that the dispatched action has an ident in the meta object. See the section on idents for more on this.

Example of a schema with an access rule:

{
  id: 'article',
  shape: {
    // ...
  },
  access: 'auth'
}

To signal that the schema really has no need for authorization, use all. This is not the same as not setting the access prop, as all will override Integreat's principle of not letting authorized data out of Integreat without an access rule. all allows anybody to access the data, even the unauthenticated.

On the other end of the spectrum, none will allow no one to access data cast to this schema, no matter who they are.

For more fine-grained rules, set access to an access definition object with rules telling Integreat which rights to require when performing different actions with a given schema. These rules apply to the idents set on the action meta object.

The following access rule props are available:

In addition, you may override the general access rules of a schema with specific rules for a type of action, by setting an actions object with access rules for action types. Here's an example of an access definition allowing all authorized idents to GET data in a certain schema, requiring the role admin for SETs, and disallowing all other actions with the general rule allow: 'none':

{
  id: 'article',
  shape: {
    // ...
  },
  access: {
    allow: 'none',
    actions: {
      GET: { allow: 'auth' },
      SET: { role: 'admin' }
    }
  }
}

Note that these action-specific rules only apply to actions being sent to a service. Some actions will never reach a service, but will instead trigger other actions, and access will be granted or rejected only for the actions that are about to be sent to a service. E.g. when you dispatch a SYNC action, it starts off by dispatching one or more GET actions. The SYNC action is not subjected to any access rules, but the GET actions are, and so the SYNC will fail if one of the GETs is rejected.

Another example, intended for authorizing only the ident matching a user:

{
  id: 'user',
  shape: {
    // ...
  },
  access: { identFromField: 'id' }
}

Here, only actions where the ident id is the same as the id of the user data will be allowed. This means that authenticated users (idents) may only access their own user data.

Actions

Actions are serializable objects that are dispatched to Integreat. It is important that they are serializable, as this allows them to, for instance, be put in a database-persisted queue and be picked up by another Integreat instance in another process. Note that Date objects are considered serializable, as they are converted to ISO date strings when needed.

An action looks like this:

{
  type: <action type>,
  payload: <payload object>,
  meta: <meta object>
}

When an action is dispatched, it returns a response object with status, data, error message, etc.

Note that in a mutation pipeline, action handler, or middleware, the response object is provided as a fourth property on the action. You will most likely meet this at least when setting up mutations.

Payload properties

The payload is, together with the action type, a description to Integreat and the service of what to do. A design principle of Integreat has been to keep specifics out of these payloads, so actions may be dispatched to a service without knowing how the service works. This is not always possible, at least not yet, but it's a good principle to follow, also when you configure services and plan what props need to be sent in the action payload.

You may set any properties on the payload, and they will be available to you in the service endpoint match and in the service mutations. Some properties have special meanings, though, and you should avoid using them for anything else:

For services that support pagination, i.e. fetching data in several rounds, one page at a time, the following properties may be supported:

[!IMPORTANT] Pagination has to be supported by the service and your service configuration, and sometimes also the transporter. Integreat prepares and passes on these pagination properties, but if the service disregards them, there is little Integreat can do – except limiting the number of items returned. It's up to you to figure out how to configure pagination for a service, but you should use these pagination properties to support it, to make this predictable. It also lets you use actions such as GET_ALL, that support pagination.

Finally, there are some properties that have no special meaning to Integreat itself, but that may be set on incoming actions from transporters. These should ideally be used in the same way or avoided:

Meta properties

The action meta object is for information about an action that does not directly define the action itself. The difference may be subtle in some cases, but the general rule is that if a piece of information affects how the action is run, it should be in the payload. E.g. the type of items to fetch goes in the payload, while the time the action was dispatched would go in the meta.

This rule does not always hold, e.g. for information on the user dispatching the action, found in ident on the meta object. Different idents may result in different data being returned from the service, but the action to perform is still the same, so it makes sense to have the ident in the meta object.

You may set your own meta properties, but in most cases you'll probably rather set payload properties.

Current meta properties reserved by Integreat:

Action response

When you dispatch an action, you will get a response object back in this format:

{
  status: <status code>,
  data: <data from the service, usually mutated>,
  error: <error message>,
  warning: <warning message>,
  origin: <code telling where an error originated>,
  access: <holds the ident actually being used>,
  paging: <pagination objects>,
  params: <key/value pairs>,
  headers: <key/value pairs>,
  responses: <array of sub-responses when relevant>,
}

[!NOTE] Editor's note: Is it correct that queues return the id in the data?

When the status is queued, the id of the queued action may be found in response.data.id. This is the id assigned by the queue, and not necessarily the same as action.meta.id.
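Assuming the note above holds, a queued response might look like this (the id value is made up):

{
  status: 'queued',
  data: { id: '42' }
}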

Status codes

The status property on the action response will be one of the following status codes:

Origin codes

The possible values of the origin property are not exhaustively defined, but these are some of the more common codes:

Idents

An ident in Integreat is basically an id unique to one participant in the security scheme. It is represented by an object that may also have other properties to describe the ident's access, like roles, or to make it possible to match to identities in other services.

Example ident:

{
  id: 'ident1',
  tokens: ['auth0|12345', 'github|23456'],
  roles: ['admin']
}

Actions are authenticated by setting an ident on the meta.ident property. It's up to the code dispatching an action to get hold of the properties of an ident in a secure way. Once Integreat receives an ident through a dispatch, it will assume this is accurate information, uphold its part of the security agreement, and only return data and execute actions that the ident has permissions for.
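For example, dispatching an action on behalf of the ident shown above:

const response = await great.dispatch({
  type: 'GET',
  payload: { type: 'article' },
  meta: { ident: { id: 'ident1', roles: ['admin'] } },
})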

Note that it's possible to set up the completeIdent middleware to combine information from the authenticator with user information held e.g. in a database.

Available action handlers

GET

Get data from a service. You receive the data on the data property, after it has been run through your service and endpoint mutations.

Example GET action to a collection of data items:

{
  type: 'GET',
  payload: { type: 'article' }
}

By providing an id property on payload, the item with the given id and type is fetched, if it exists:

{
  type: 'GET',
  payload: { type: 'article', id: '12345' }
}

See the section on payload properties for more properties that may be used with the GET action.

GET_ALL

Will run as many GET actions as needed to get all available pages of data.

The action ...

{
  type: 'GET_ALL',
  payload: { type: 'article', pageSize: 500 }
}

... will dispatch the following actions in sequence:

{
  type: 'GET',
  payload: { type: 'article', pageSize: 500 }
}
{
  type: 'GET',
  payload: { type: 'article', pageSize: 500, pageOffset: 500 }
}

... and so on, until we get no more data.

See the section on pagination for more on the paging properties.

SET

Send data to a service. The data to send is provided in the payload data property. Recommended practice is to provide the data as typed data, i.e. data objects cast to a schema, and let mutations on the service endpoint modify it to the format the service expects.

Any data coming back from the service, will be provided on response.data and may be mutated through service endpoint mutations, just as for GET actions.

Example SET action:

{
  type: 'SET',
  payload: {
    type: 'article',
    data: [
      { id: '12345', $type: 'article', title: 'First article' },
      { id: '12346', $type: 'article', title: 'Second article' }
    ]
  }
}

UPDATE

Update data on a service. The idea is that while SET is used for setting data to a service – with no regard to what is already set in the service – UPDATE is used for updating data, possibly not overwriting all properties. If UPDATE provides data with only a few properties, the expectation is that only these properties will be updated in the service. The UPDATE action is also expected to fail when the item being updated does not exist, unlike SET, which will usually create it.

Note that the actual behavior is up to how you set up the service and what the service itself supports, but the UPDATE action will provide you with a way of doing this.

An UPDATE action may be handled in one of two ways, where the first is just to run it against a service endpoint, much like a SET action (except it will match different endpoints). Data provided in the payload data is mutated and sent to the service according to the endpoint configuration, and any data coming back, will be provided on response.data and mutated.

What makes UPDATE different from SET, though, is the second way we may handle UPDATE actions. Whenever there is no matching UPDATE endpoint, Integreat will run the action as a GET and then a SET, to mimic an update. The GET action will have the same payload and meta as the original action. The same goes for the SET action, but the payload.data will be the data returned from GET merged with the data on the original UPDATE action. This will be a deep merge, prioritizing properties from the UPDATE action, but any createdAt date in the data from GET will be kept. If there's an updatedAt in the merged data, it will be overridden by the current time.

A requirement for this approach to work as expected is that the data is cast to the same schema, but that should normally be the case when you use payload.type and don't set allowRawRequest or allowRawResponse on the endpoint.

When the GET fails, the UPDATE will fail with the same status and error.

Example UPDATE action:

{
  type: 'UPDATE',
  payload: {
    type: 'article',
    data: [
      { id: '12345', $type: 'article', title: 'First article' },
      { id: '12346', $type: 'article', title: 'Second article' }
    ]
  }
}

DELETE / DEL

Delete one or more items from a service. Provide the data for the items to delete in the payload data property, as an array of typed data. In most cases, you only need to provide the id and the $type, but the way you set up the service may require more properties.

Any data coming back from the service, will be provided on response.data and may be mutated through service endpoint mutations, just as for GET actions.

Example DELETE action:

{
  type: 'DELETE',
  payload: {
    type: 'article',
    data: [
      { id: '12345', $type: 'article' },
      { id: '12346', $type: 'article' }
    ]
  }
}

You may also DELETE one item like this:

{
  type: 'DELETE',
  payload: {
    id: 'ent1',
    type: 'entry'
  }
}

DEL is a shorthand for DELETE.

GET_META

Get metadata for a service. See the section on metadata for how to set this up.

The data of the response from this action contains the service (the service id) and a meta object with the metadata set as properties.

Example GET_META action:

{
  type: 'GET_META',
  payload: {
    service: 'entries',
    keys: ['lastSyncedAt', 'status']
  }
}

This will return data in the following form:

{
  status: 'ok',
  data: {
    service: 'entries',
    meta: {
      lastSyncedAt: '2017-08-19T17:40:31.861Z',
      status: 'ready'
    }
  }
}

If the action has no keys, all metadata set on the service will be retrieved. The keys property may be an array of keys to retrieve several in one request, or a single key.

SET_META

Set metadata on a service. The payload should contain the service to set metadata for (the service id), and a meta object with all metadata to set as properties.

Any data coming back from the service, will be provided on response.data and may be mutated through service endpoint mutations, just as for GET actions.

Example SET_META action:

{
  type: 'SET_META',
  payload: {
    service: 'entries',
    meta: {
      lastSyncedAt: Date.now()
    }
  }
}

Note that the service must be set up to handle metadata. See Configuring metadata for more.

RUN

The RUN action will run jobs provided to Integreat.create() in the jobs definitions. These jobs will then run other actions or series of actions, also called "flows".

Only one payload property is required – the jobId, which refers to a job in the jobs definitions. Any other properties on the payload will be passed on as input to the job.

An action for running the archiveOutdated job:

{
  type: 'RUN',
  payload: { jobId: 'archiveOutdated' }
}

See the section on jobs for more on how to configure jobs.

SYNC

The SYNC action will GET items from one service and SET them on another. There are different options for how to retrieve items, ranging from a crude retrieval of all items on every sync, to a more fine grained approach where only items that have been updated or created since last sync, will be synced.

The simplest action definition would look like this, where all items would be retrieved from the service and set on the target:

{
  type: 'SYNC',
  payload: {
    type: <item type>,
    retrieve: 'all',
    from: <service id | payload>,
    to: <service id | payload>
  }
}

The action will dispatch a GET action right away, and then immediately dispatch a SET_META action to update the lastSyncedAt date on the service. The SET actions to update the target service are added to the queue if one is configured.

To retrieve only new items, change the retrieve property to updated. In this case, the action will dispatch GET_META to get the lastSyncedAt from the from service, and get only newer items, by passing it the updatedAfter param. The action will also filter out older items, in case the service does not support updatedAfter.

By setting retrieve to created, you accomplish the same, but with createdAfter.

If you need to include more params in the actions to get from the from service or set to the to service, you may provide a params object for the from or to props, with the service id set as a service param. You may also provide different action types than GET and SET, by setting the action prop on the from or to objects respectively.
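A sketch of a SYNC action using objects for from and to (the section param is made up for illustration):

{
  type: 'SYNC',
  payload: {
    type: 'entry',
    retrieve: 'updated',
    from: { service: 'entries', action: 'GET', section: 'news' },
    to: { service: 'store', action: 'SET' }
  }
}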

By default, SYNC will send every item gotten from the from service to the to service. You can split an array of items into several sets by setting maxPerSet on the payload object to a max number of items per set. If you need one set per individual item, you may set setMember to true on the payload object. (This is almost the same as setting maxPerSet to 1, except the item won't be wrapped in an array.)

[!NOTE] There are more options than these, and the documentation will be updated to include them later.

EXPIRE

The EXPIRE action has two alternative ways of operating. If the deleteWithParams param is false or not set, we will first dispatch a GET action to fetch expired data items from a service, and then dispatch a DELETE action with the retrieved data items. If deleteWithParams is true, we will instead dispatch a DELETE action right away with the same params we would have provided to the GET action.

Here's an example of an EXPIRE action that will dispatch a GET and a DELETE:

{
  type: 'EXPIRE',
  payload: {
    service: 'store',
    type: 'entry',
    endpoint: 'getExpired',
    msFromNow: -24 * 60 * 60 * 1000 // Delete entries older than 24 hours
  }
}

Here's an example of an EXPIRE action that will dispatch a DELETE directly:

{
  type: 'EXPIRE',
  payload: {
    service: 'store',
    type: 'entry',
    deleteWithParams: true
  }
}

The GET action (or the DELETE action when deleteWithParams is true) will have a timestamp property with the current time as milliseconds since epoch (January 1, 1970 UTC), and isodate as the current time in the extended ISO 8601 format (YYYY-MM-DDThh:mm:ss.sssZ).

To have timestamp and isodate be a time in the future instead, set msFromNow to a positive number of milliseconds. This will be added to the current time. To have a time in the past, use a negative number for msFromNow.

SERVICE

A SERVICE action will be sent directly to the specified service without any intervention by Integreat. This allows for running specialized actions on the service that go beyond what Integreat supports. It's up to each transporter to support such actions, describe what they'll do, and define their payload properties.

An example of an action that will tell a Bull queue to clean out all completed jobs more than a week old:

{
  type: 'SERVICE',
  payload: {
    type: 'cleanCompleted',
    targetService: 'bullService',
    olderThanMs: 604800000
  }
}

Write your own action handlers

You may write your own action handlers to handle dispatched actions just like the built-in types.

Action handler signature:

async function (action, { dispatch, getService, setProgress, options }) { ... }

Your action handler must return a response object. If your handler just relays to another action handler, it may pass on the response returned from that handler, but in many cases it will be more correct to generate your own response.
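A minimal sketch of a handler (the action type and logic are made up for illustration) that relays to a GET and generates its own response:

async function countArticles(action, { dispatch }) {
  const response = await dispatch({
    type: 'GET',
    payload: { type: 'article', targetService: action.payload.targetService },
    meta: { ident: action.meta?.ident },
  })
  if (response.status !== 'ok') {
    return response // Relay the error response as-is
  }
  // Generate our own response with the number of articles
  return { status: 'ok', data: { count: response.data.length } }
}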

You provide your custom actions to Integreat on setup, by providing an object with the key set to the action type your handler will be responsible for, and the handler function as the value:

const actions = {
  MY_ACTION: async function myAction(action, { dispatch }) { ... },
}
const great = Integreat.create(defs, { transporters, adapters, actions })

Note that if you set up your custom action handler with an action type that is already used by one of Integreat's built-in action handlers, the custom handler will have precedence. So be careful when you choose an action type, if your intention is not to replace an existing action handler.

Jobs

You define jobs to run one or more actions on a schedule or to add additional logic that is not provided by one specific service endpoint. When you dispatch several actions, in sequence or in parallel, we call it a "flow".

A simple job running on a schedule, may look like this:

const syncJob = {
  id: 'syncEntries',
  cron: '0 */1 * * *', // Every hour
  action: {
    type: 'SYNC',
    payload: {
      type: 'entry',
      retrieve: 'updated',
      from: 'entries',
      to: 'store',
    },
  },
}

This will dispatch the given SYNC action every hour. (The SYNC action and cron syntax are out of scope for this section. For cron expressions, [Crontab](https://crontab.cronhub.io) is a good and practical resource.)

An alternative to running a job on a schedule with cron is to run it by dispatching a RUN action with the job id in the payload jobId param.
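For instance, to run the job above on demand:

await great.dispatch({
  type: 'RUN',
  payload: { jobId: 'syncEntries' },
})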

A flow may look like this:

const flowJob = {
  id: 'getEntryFromOtherService',
  flow: [
    {
      id: 'getFromStore',
      action: {
        type: 'GET',
        payload: {
          type: 'entry',
          targetService: 'store',
        },
      },
      premutation: {
        payload: {
          $modify: 'payload',
          id: '^^.action.payload.id',
        },
      },
    },
    {
      id: 'getFromOtherService',
      action: {
        type: 'GET',
        payload: {
          type: 'entry',
          targetService: 'otherService',
        },
      },
      premutation: {
        payload: {
          $modify: 'payload',
          id: '^^.getFromStore.response.data.otherId',
        },
      },
    },
  ],
}

Several things are going on here: First, we have a flow with two actions. We imagine here that we are going to fetch an entry with an id that is found in the store service, and use the otherId retrieved from that service to get the entry from otherService. The two steps in the flow look a lot like a job, and in one way they are the same, with some differences that we will get back to. They are run sequentially in the order they appear in the flow array.

Secondly, we have a premutation on each step. This is given the action and may mutate it before it is dispatched. As for endpoint mutations, the top level has $modify: true as default, but we need to modify the sub-objects we include, when that is what we want. In the first step, we set the payload id to the id provided in the action that called this job. This action is passed to mutations on jobs and steps under the name 'action', and we prepend it with the root path (^^) as it is found on the top level of the data structure we're mutating.

The second step is similar, but here we set the payload id to the otherId found in the response data of the first step. The action and the response from a step are available to all following steps under the id of the step, in this case 'getFromStore'. We have to prepend with the root path (^^) here as well. When we say the action and response are available, we mean that we find the action object with any response set on its response property.

With the action { type: 'RUN', payload: { jobId: 'getEntryFromOtherService', id: '12345' } }, the first step will dispatch a GET for the id 12345. If that action succeeds with the data { id: '12345', otherId: '67890' }, the second step will dispatch a GET for the id 67890. The response from the last action is returned by default.

If any job step fails, the entire job will fail and the error will be returned, unless you set up any preconditions, postconditions, or postmutations to alter this default behavior. More on that in the following sections.

Job and step mutations

Jobs and job steps provide two mechanisms for mutations: premutation and postmutation. They work in a similar way, but premutation is used to mutate the action of a job or a job step before it is dispatched, and postmutation is used to mutate the response from a job or a step.

Note that premutation will not have an effect on a job with a flow, but postmutation may be used with the response from a flow.

Both mutation pipelines are passed an object that holds the action and response of every step that has been run so far, set on a property with the same key as the step id. The response is given as a response property on the action. The action that was dispatched to run the job is also included on the 'action' property. (Thus a step may not have the id 'action'.) The object also holds the action of the step on an internal property, and this is the starting point for the mutations. You do not reference the step action directly; just mutate what is given in the pipeline data and use the root path ^^ to get to the actions and responses of other steps.

For both pipelines, an action object is expected, and in the case of postmutation the action should have a response object which is what will be used as the response from the job or step.

Step conditions

By default, if a step in a flow fails, no more steps are run, and the entire job will fail with the response from the failed step. You may however provide your own conditions for when a step should be run and when a step should be regarded as having failed.

preconditions on a step must be an array of condition objects that must all pass in order for the step to be run. A condition object must have a condition property with an array of mutation pipelines that must all return truthy for the condition to be regarded as passed. Each pipeline is given the same object as premutation, with the action and responses from previous steps, but without the action to be dispatched. See the section on mutating jobs for more on this.

The condition object may also have a failResponse property with a response object that will be used as the response from the step if the condition fails.

Finally, the condition object may have a break property set to true, to signal that the entire job should fail if the condition fails. If break is not set, the step will just be skipped and the job flow continues.

[!NOTE] By setting the feature flag breakByDefault to true (on the flags object in the definitions given to Integreat.create()), break will be true by default, and you must set it to false to make the flow continue. This will be the default behavior in the next major version of Integreat, so it's a good idea to set the flag to true already now.

Note that a step has a default pre-condition that will make it fail and stop the flow if the previous step failed. By specifying your own preconditions, you override this, and only your conditions will be used. But when you set breakByDefault to true (see the note above), this default condition will be set in the postconditions instead, so that you may override it there. This way, you may set pre-conditions on a step without overriding the fail-on-error behavior of the step before.
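A sketch of preconditions on the second step of the flow example above (the failResponse wording is made up):

{
  id: 'getFromOtherService',
  preconditions: [
    {
      condition: ['^^.getFromStore.response.data.otherId'], // Truthy only when an otherId was found
      failResponse: { status: 'notfound', error: 'No otherId to look up' },
      break: true, // Fail the entire job when this condition fails
    },
  ],
  action: {
    // ...
  },
}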

postconditions is also an array of condition objects, but it is used to decide whether the step should be regarded as having failed after its action or flow has run. The condition pipelines are passed the same object as postmutation, but after postmutation has been applied. Just as for preconditions, the break property is false by default, so to stop the entire job, set it to true (but see the note on breakByDefault above). An error will usually cause the job to fail even with break: false, but the breaking may be handled by the preconditions on the next step, as described above.

Post-conditions specify what is required for a step to be successful, and sometimes you may require a certain error as success, e.g. when you're checking a cache and will only continue if a value is not cached, requiring a notfound response status. The condition pipeline for this should be straightforward, but as you cannot specify the response that will be used when the condition passes, you may wonder what happens with the error response. Integreat will set the status of a passing response to ok if it was an error, and otherwise leave it as is. Also, when changing an error to an ok, any error property will be changed to a warning.

Dispatching several actions by iterating over an array

Sometimes you will want to dispatch several actions based on a data array, e.g. when you have an array of data items, but the relevant endpoint only accepts one data item. This may be done with iterate, which is a special mutation that must return an array; the job action will then be dispatched once for every item in this array, with the item set as payload data. premutation may be used to modify the action before it is dispatched as usual, but note that the mutation is applied to every single action, after the iterate, so to speak.

This applies to both a job with an action and a step with an action in a flow.

The responses of each action are combined and set as a response object on the step action (before the iteration), making an iterated step behave just like any other. When all actions are successful, the response will have status ok, and the response data will be an array of the data from each response in the order they were run. When there are errors, Integreat will use any common status, if possible; otherwise the status will be 'error'. The error string will include all the individual errors, separated by a pipe (|). The individual responses will also be available on a responses property on the response object.

Every single iterated action and response will also be available on the step id with an index prefix, e.g. getEntries will have getEntries_0, getEntries_1, etc.

By default, the iterations are run in sequence, but you may run several in parallel by specifying the number of concurrent iterations on the iterateConcurrency property on the job step. The default is 1.
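As a hedged sketch of an iterating job, under the assumption that the iterate pipeline is applied to the job action (the service id and path are made up):

const setEntriesJob = {
  id: 'setEntriesOneByOne',
  action: {
    type: 'SET',
    payload: { type: 'entry', targetService: 'store' },
  },
  iterate: 'payload.data[]', // Dispatch one SET per item in the payload data
  iterateConcurrency: 3, // Run up to three SETs in parallel
}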

Queues

As everything else in Integreat, a queue is also a service. You configure a queue service, e.g. integreat-transporter-bull, and set its service id on the queueService property of the definition object you give to Integreat.create():

import bullQueue from 'integreat-transporter-bull'

const services = [
  {
    id: 'queue',
    transporter: 'bull',
    // ...
  }
]
const transporters = {
  bull: bullQueue
}

const great = Integreat.create(
  { services, queueService: 'queue' },
  { transporters }
)

To queue an action instead of dispatching it right away, you set queue: true on the meta object. If everything is set up correctly, Integreat will push the action to the queue. When the action is later pulled from the queue, it will be dispatched again, but without the queue property.

You may also set the meta queue property to a Unix timestamp, and if the queue transporter supports it, the action will be run at this time instead of being processed as soon as it is next in line in the queue.
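A sketch of both variants, assuming data holds typed data items:

// Push the action to the queue and process it as soon as possible
await great.dispatch({
  type: 'SET',
  payload: { type: 'article', data },
  meta: { queue: true },
})

// Ask the queue to run the action about a minute from now
// (when the queue transporter supports timestamps)
await great.dispatch({
  type: 'SET',
  payload: { type: 'article', data },
  meta: { queue: Date.now() + 60000 },
})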

When a queue is not set up, a dispatched action with queue: true will just be run right away as a normal action.

You may also use queues directly, by dispatching to them as a service and getting incoming actions from their listen() method. In that case, a queue is just like any other service, with no need for any special handling.

[!NOTE] Queueing actions is actually done through an action handler, but this handler is not available from outside Integreat.

Middleware

Integreat supports middleware, and there are two different middleware "pipelines":

To set up a logger of what we receive from and send to a service, you'll use the second middleware "pipeline", while a logger of dispatched actions would be placed in the first.

When actions pass through middleware, the middleware may modify the actions as appropriate; you will have middleware that modifies (e.g. the completeIdent middleware), and middleware that just monitors what's coming through (e.g. a logger).

Middleware is passed to Integreat like this:

const great = Integreat.create(
  defs,
  resources,
  [
    // Dispatch middleware
  ],
  [
    // Service middleware
  ],
)

completeIdent middleware

If your access rules are based only on the information received from an authenticator, you don't need the following. You will always get an id and potentially some other fields, like roles.

But when you need to match the ident id from the authenticator with user information held somewhere else, e.g. in a database, you need to configure a user schema and set up a service to fetch this information.

Integreat uses schemas and services to store idents. In the definition object passed to Integreat.create(), you may provide an identConfig property with a definition object looking something like this:

const great = Integreat.create(
  {
    // ...,
    identConfig: {
      type: 'user',
      props: {
        id: 'id',
        roles: 'groups',
        tokens: 'tokens',
      },
    },
  },
  {
    // ...
  },
)

Note that in the example above, the id of the data will be used as the ident id. When the id is not suited for this, you will need another field on the schema that may act as the ident id. In cases where you need to transform the id from the data in some way, this must be set up as a separate field and the mutation will dictate how to transform it. In most cases, the id will do, though.

For some setups, this requires certain endpoints to be defined on the service. To match a token with an ident, the service must have an endpoint that matches actions like this:

{
  type: 'GET',
  payload: {
    type: 'user',
    tokens: 'github|23456'
  }
}

In this case, user is the schema mapped to idents, and the tokens property on the ident is mapped to the tokens field on the schema.

To make Integreat complete idents on actions with the persisted data, set it up with the completeIdent middleware:

const great = Integreat.create(defs, resources, [
  Integreat.middleware.completeIdent,
])

This middleware will intercept any action with meta.ident and replace the ident with the ident item loaded from the designated schema. If the ident has an id, the ident with this id is loaded; otherwise withToken is used to load the ident with the specified token. If no ident is found, the original ident is kept.

Writing middleware

You may write middleware to intercept dispatched actions. This may be useful for logging, debugging, and situations where you need to make adjustments to certain actions.

A middleware is a function that accepts a next() function as its only argument, and returns an async function that will be called with the action on dispatch. The returned function is expected to call next() with the action and return the result from the next() function, but is not required to do so. The only requirement is that the function returns a valid response object.

Example implementation of a very simple logger middleware:

const logger = (next) => async (action) => {
  console.log('Dispatch was called with action', action)
  const response = await next(action)
  console.log('Dispatch completed with response', response)
  return response
}

Debugging

Run Integreat with the env variable DEBUG=great to receive debug messages.

Some sub modules send debug messages with the integreat: prefix, so use DEBUG=great,integreat:* to catch these as well.