qri-io / specs

qri core specifications

Qrimatic: Workflows, triggers, runs & hooks #3

Open b5 opened 3 years ago

b5 commented 3 years ago

Workflows

A Workflow orchestrates a dataset with a transform component, keeping the dataset up to date. Workflow triggers define when and how to run the dataset's transform. A successful run creates a new dataset version; a failed run writes a failure record to the dataset's oplog, including benign errors like "no changes". Hooks define actions to take when a dataset changes. Workflows are stored in the qrimatic scheduler, which delegates runs to a qri instance.

type Workflow struct {
  // identifiers
  ID             string     `json:"id"`                   // CID string
  OwnerID        string     `json:"ownerId"`              // user that created this workflow
  Created        *time.Time `json:"created"`              // date workflow was created

  // orchestration state
  Disabled       bool       `json:"disabled"`             // if true, workflow will not generate new run starts
  RunCount       int        `json:"runCount"`             // number of times this workflow has been triggered
  LatestRunStart *time.Time `json:"latestRunStart"`       // time workflow last started

  // configuration
  Triggers       []Trigger  `json:"triggers"`             // things that can initiate a run
  DatasetID      string     `json:"datasetId"`            // dataset target
  RunConfig      RunConfig  `json:"runConfig"`            // configuration flags for save, defines how to run a workflow
  OnComplete     []Hook     `json:"onComplete"`           // things to do after a run executes

  // ephemeral state
  CurrentRun     *RunState         `json:"currentRun,omitempty"` // optional currently executing run
  VersionInfo    dsref.VersionInfo `json:"versionInfo"`          // optional VersionInfo for the dataset named by DatasetID
}
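
Hook isn't specified further in this issue. As a placeholder, a minimal shape echoing Trigger might look like this (field names are illustrative, and HookConfig is assumed to be a free-form map):

// a sketch only; none of these fields are fixed by this issue
type Hook struct {
  ID         string // identifier
  WorkflowID string // owning workflow

  // hook configuration
  Type   string     // hook type, eg: "webhook"
  Config HookConfig // type-specific settings, by analogy with TriggerConfig
}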

A workflow is executed when a trigger condition is met. The scheduler loads the dataset, runs the transform, and saves the result, creating a new dataset version.

As a rule, any state that can be represented on the target dataset should be stored there. Transforms are not owned by the workflow, and must be loaded from dataset history for execution.
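
As a sketch of that rule, execution-time loading could look like the following, where loadDataset is a hypothetical helper standing in for qri's dataset resolution:

// transformFor loads the transform a workflow should execute. The workflow
// never carries its own copy; the transform always comes from the dataset's
// latest version. loadDataset is a hypothetical stand-in for qri's resolver.
func transformFor(w *Workflow) (*dataset.Transform, error) {
  ds, err := loadDataset(w.DatasetID)
  if err != nil {
    return nil, err
  }
  if ds.Transform == nil {
    return nil, fmt.Errorf("dataset %s has no transform component", w.DatasetID)
  }
  return ds.Transform, nil
}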

Triggers

Triggers are data structures registered with a scheduler that configure when to run a transform. Each trigger has a unique identifier that distinguishes it as the originator of a run. Triggers are owned by a workflow and cannot move between workflows. It is possible to have multiple triggers of the same type on a workflow (eg: multiple webhook-type triggers with different permissions).

type Trigger struct {
  ID         string // unique identifier
  WorkflowID string // owning workflow

  // trigger configuration
  Type   string        // trigger type identifier
  Config TriggerConfig // type-specific settings

  // orchestration state
  RunCount            int        // number of runs this trigger has initiated
  LastRunStart        *time.Time // time this trigger last fired
  LastExecutionStatus string     // outcome of the most recent run
}

All triggers fall into one of three conceptual categories (hypothetical examples of each are sketched below):

Temporal - fire when a time wall is exceeded
External - fire via an inbound remote procedure call
Dependency - fire as a result of shifts in the dataset version graph
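
Assuming TriggerConfig is a free-form map (type TriggerConfig map[string]interface{}), one hypothetical trigger per category could look like:

// config keys here are illustrative; this issue doesn't fix a schema per trigger type
temporal := Trigger{Type: "cron", Config: TriggerConfig{"periodicity": "R/PT1H"}}
external := Trigger{Type: "webhook", Config: TriggerConfig{"secret": "a shared secret"}}
dependency := Trigger{Type: "dataset", Config: TriggerConfig{"upstream": "b5/some_dataset"}}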

Triggers change state each time they are called. These state changes can be used to manage noisy triggers with backoffs & rate limits.
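
For example, a minimal rate-limit check built on trigger state (minInterval is an assumed per-trigger-type setting, not a field defined above):

// rateLimited reports whether a trigger should be suppressed because it
// fired too recently
func rateLimited(t *Trigger, minInterval time.Duration) bool {
  if t.LastRunStart == nil {
    return false // never fired, nothing to limit against
  }
  return time.Since(*t.LastRunStart) < minInterval
}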

Runs

A run represents a single execution of a workflow. The qri instance executing the run emits time-stamped events that are interpreted into the run state of an executing transformation. Runs are executed and owned by a qri instance. Run identifiers can be constructed by the scheduler & passed to the executing instance.
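
The RunState struct referenced by the Workflow above isn't defined in this issue. A minimal sketch, assuming state is folded up from the instance's time-stamped events (field names are illustrative):

type RunState struct {
  ID         string     // run identifier, minted by the scheduler
  WorkflowID string     // workflow this run belongs to
  Status     string     // eg: "running", "succeeded", "failed"
  Started    *time.Time // timestamp of the first event seen for this run
  Stopped    *time.Time // timestamp of the terminal event, if one has arrived
}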

Deploying a workflow

Deploying a workflow combines two steps into one action: saving a new version of the dataset's transform component, and registering the workflow with the scheduler.

The only dataset component that can be modified by a deploy is a transform, which is passed directly to qri core. The workflow, on the other hand, is completely owned by the scheduler. The payload of a deploy request is split to reflect this distinction:

{
  "transform": {},
  "workflow": {}
}
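
A hypothetical filled-in payload (transform field names follow qri's dataset model loosely; none of them are fixed by this issue):

{
  "transform": {
    "syntax": "starlark",
    "scriptBytes": "<base64-encoded transform script>"
  },
  "workflow": {
    "datasetId": "<dataset identifier>",
    "triggers": [
      { "type": "cron", "config": { "periodicity": "R/PT1H" } }
    ],
    "onComplete": []
  }
}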

Scheduler event loop

Once a workflow is deployed, all triggers are continually checked by the scheduler's event loop:

// loop forever
for {
  for _, t := range ActiveTriggers() {
    if t.Ready() {
      runID := transform.NewRunID()
      w := LoadWorkflow(t.WorkflowID)

      // "executor" goroutine. In the real world this will run on a qri instance:
      go func(runID string, w *Workflow) {
        ds := inst.LoadDataset(w.DatasetID)
        // pass values from the workflow into the commit
        ds.Commit = &dataset.Commit{
          // in the long run, workflows should have their own private key that
          // the originating user authors a ucan for to enable version creation
          // Author: ...
          Workflow: w.ID,  // new "Workflow" field connects any commit to a workflow
          Run:      runID, // new "Run" field connects a successful commit to a run identifier
        }

        // Everything in here is delegated to an instance
        res := &lib.RunResponse{}
        _ = inst.Save(&lib.SaveParams{
          Apply:   true,
          Wait:    false,
          Dataset: ds,
        }, res)
      }(runID, w)

      w.RunCount++
      t.Advance()
      SaveTrigger(t)
      SaveWorkflow(w)
    }
  }

  time.Sleep(time.Second) // pick an interval to check for runs at
}

In this pseudocode the scheduler doesn't know whether jobs finish, or whether a run succeeded or failed.

b5 commented 3 years ago

The above doesn't cover run state caching, which is a responsibility the scheduler could also handle by listening to the event bus and modifying a RunState struct based on messages that match a run identifier.
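
A sketch of that approach, assuming a channel-based bus subscription and the RunState shape sketched earlier (event type names & fields are illustrative):

// Event is an assumed shape for bus messages tagged with a run identifier.
type Event struct {
  Type      string    // eg: "transform:start", "transform:stop"
  RunID     string    // run this event belongs to
  Error     string    // non-empty when the run failed
  Timestamp time.Time
}

// cacheRunStates folds bus events into a scheduler-side RunState cache,
// matching each event to a run the scheduler minted an identifier for.
func cacheRunStates(events <-chan Event, cache map[string]*RunState) {
  for e := range events {
    rs, ok := cache[e.RunID]
    if !ok {
      continue // not a run this scheduler is tracking
    }
    switch e.Type {
    case "transform:start":
      ts := e.Timestamp
      rs.Status = "running"
      rs.Started = &ts
    case "transform:stop":
      ts := e.Timestamp
      rs.Stopped = &ts
      if e.Error != "" {
        rs.Status = "failed"
      } else {
        rs.Status = "succeeded"
      }
    }
  }
}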