machty / ember-concurrency

ember-concurrency is an Ember Addon that enables you to write concise, worry-free, cancelable, restartable, asynchronous tasks.
http://ember-concurrency.com
MIT License

RFC: Cancellable task pipelines #142

Open poteto opened 7 years ago

poteto commented 7 years ago

Summary

A cancellable pipeline is a higher order Task with new derived state and the ability to cancel intermediate tasks while the pipeline is executing. I have implemented a proof of concept that handles regular functions in ember-pipeline.

Motivation

When using tasks in an application, a common use case is a "happy path" series of functions where each requires some prerequisite before the next can be called. For example:

export default Component.extend({
  requestGeolocation: task(function* () { /* ... */ }),
  fetchStoresInProximity: task(function* () { /* ... */ }),
  sortStoresByDistance: task(function* () { /* ... */ }),
  showSuccessMessage: task(function* () { /* ... */ }),

  getStores: task(function* (...args) {
    let coords = yield get(this, 'requestGeolocation').perform(...args);
    if (coords) {
      let stores = yield get(this, 'fetchStoresInProximity').perform(coords);
      if (stores.length > 0) {
        let sortedStores = yield get(this, 'sortStoresByDistance').perform(stores, coords);
        return yield get(this, 'showSuccessMessage').perform();
      }
      return yield get(this, 'showErrorMessage').perform('No stores around you');
    }
    return yield get(this, 'showErrorMessage').perform('Could not find your location');
  }),

  actions: {
    getStores(...args) {
      return get(this, 'getStores').perform(...args);
    }
  }
});

In the above, it doesn't make sense for the other tasks in getStores to fire if the user has not granted permissions to share their location – hence the need for checking if coordinates were received by the task requestGeolocation.

However, this approach is error prone and leads to an overabundance of conditional logic and other defensive programming techniques that make tasks and functions harder to debug.

I propose a new kind of ember-concurrency primitive that represents a serial collection of tasks – the pipeline. A pipeline is similar to a task group in the sense that it is also a kind of (higher order) task that has task state. If at any point a task yields a cancellable token, the entire pipeline cancels and the pipeline's derived state reflects that.

This approach means that users will no longer need to program so defensively in order to perform a sequence of tasks that depend on the result of the previous task.

Pipelines are inspired by railway oriented programming and can make complex task chains easier to reason about, and they also provide an opportunity to handle cancellations in a clean way.
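The intended semantics can be sketched without Ember. This is a simplified, framework-free model (names like `cancel` and `runPipeline` are illustrative, not the proposed API): a pipeline runs its steps in order, threading each result into the next, and short-circuits as soon as a step returns a cancel token.

```javascript
// Simplified model of the proposed pipeline semantics; illustrative only.
const CANCEL = Symbol('cancel');
const cancel = (reason) => ({ [CANCEL]: true, reason });

async function runPipeline(steps, initialValue) {
  let value = initialValue;
  for (const { name, fn } of steps) {
    value = await fn(value);
    if (value && value[CANCEL]) {
      // A cancel token aborts the rest of the chain and records which
      // step cancelled, and why; this feeds the pipeline's derived state.
      return { cancelled: true, stepName: name, reason: value.reason };
    }
  }
  return { cancelled: false, value };
}
```

This is the "railway" shape: every step either stays on the success track by returning a plain value, or switches the whole pipeline to the cancellation track.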

Detailed design

Here's the API I'm proposing for a ComputedProperty-based pipeline, along with the helper functions step and cancel:

export default Component.extend({
  requestGeolocation: task(function* () {
    let coords = yield getCurrentPosition(); // simple example
    return coords || cancel('Could not find your location');
  }),

  getStores: pipeline(function* () {
    return [
      // yielding a step may not be necessary, unsure
      yield step('requestGeolocation', { foo: '123' }), // accepts options
      yield step(task(function* () { return yield resolve(1 + 1) }), { stepName: '1and1' }), // accepts anonymous functions
      yield step('fetchStoresInProximity'),
      yield step('sortStoresByDistance'),
      yield step('showSuccessMessage')
    ];
  }).drop().onCancel('handleCancel'),

  handleCancel(cancellation) {
    switch (cancellation.stepName) {
      default:
        get(this, 'showErrorMessage').perform(cancellation.reason);
        break;
    }
  }
});

Step

A step is a wrapper around a task that can hold additional metadata and runtime configuration for that task. For example, this could allow for passing extra arguments:

step('myTask', { args: [this.get('user.fullName'), 123] })

When a task is included in the pipeline as a step its concurrency policy is ignored in favor of the pipeline's.

Cancellation

In the above, the first step in the pipeline requestGeolocation can yield/return a special cancel token with an optional reason, if for example the user has declined to share their location.

Once the pipeline receives that token, it aborts the rest of the chain and passes the cancellation object off to an optional cancel handler. Here, the user will be able to match against the task's name (or another property, for example the cancellation reason) and handle it explicitly.

Other yieldables

It would also be possible to yield a pause token that returns a function/task that can be used to resume the pipeline at a later point.

Derived state

A pipeline will also have new derived state. For example:

{{pipeline.isRunning}}
{{pipeline.isCancelled}}
{{pipeline.isPaused}}
{{pipeline.currentStep}} <!-- could be used to show current progress in pipeline -->
{{pipeline.successfulSteps}} <!-- could be used to create a progress bar -->
{{pipeline.cancelledSteps}}
{{pipeline.steps}}
{{pipeline.cancellation.reason}}
{{pipeline.cancellation.stepName}}
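One way that derived state could fall out of the runner (a hypothetical sketch, not the proposed implementation; the class and method names are made up):

```javascript
// Hypothetical: tracking a pipeline's derived state as it executes.
class PipelineState {
  constructor(stepNames) {
    this.steps = stepNames;        // declared up front by the pipeline
    this.successfulSteps = [];     // completed so far (e.g. for a progress bar)
    this.currentStep = null;       // current progress in the pipeline
    this.isRunning = false;
    this.isCancelled = false;
    this.cancellation = null;      // { stepName, reason } once cancelled
  }
  start(stepName) { this.isRunning = true; this.currentStep = stepName; }
  complete(stepName) { this.successfulSteps.push(stepName); }
  cancel(stepName, reason) {
    this.isRunning = false;
    this.isCancelled = true;
    this.cancellation = { stepName, reason };
  }
  finish() { this.isRunning = false; this.currentStep = null; }
}
```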

Composition

Finally, a non-ComputedProperty pipeline could also be composed at runtime. For example, consider the case in a highly dynamic form wizard:

actions: {
  newFormWizard(surveyAnswers) {
    let steps = surveyAnswers
      .filterBy('giveMoreDetail')
      .map((answer) => step(answer.stepName));
    return set(this, 'wizard', new Pipeline(this, steps));
  }
}

And of course, it would not be real composition if you could not also compose pipelines (though this may or may not make the implementation very complex):

pipeline1: pipeline(function* () { /* ... */ }),
pipeline2: pipeline(function* () { /* ... */ }),
pipeline3: pipeline(function* () { /* ... */ }),

megaPipeline: pipeline(function* () {
  return [
    step('pipeline1'),
    step('pipeline2'),
    step('pipeline3')
  ]
}).enqueue()

More complete/real-world example

import Ember from 'ember';
import { cancel, task, step, pipeline } from 'ember-concurrency';

const { Component, get } = Ember;

export default Component.extend({
  purchase: pipeline(function* () {
    return [
      step('validateForm'),
      step('processPayment', { 
        args: [this.get('paymentForm'), this.get('currentUser')] 
      }),
      step('clearCart', {
        args: [this.get('cart')]
      }),
      step('redirectToThankYouPage')
    ];
  }).drop().onCancel('handleCancel'),

  /**
   * Validate that the form is valid, for example it could check if the 
   * credit card number and CVV are valid. Because this returns a boolean,
   * and the next step in the pipeline requires the `currentUser` object, we 
   * pass that in via the `step` function above.
   */
  validateForm: task(function* (paymentForm) {
    return true;
  }),

  /**
   * Receives additional arguments from `step`.
   */
  processPayment: task(function* (isValid, paymentForm, currentUser) {
    if (!isValid) {
      return cancel('Your form is invalid');
    }
    return yield paymentForm
      .save({ paidBy: currentUser })
      .catch((reason) => cancel(reason));
  }),

  clearCart: task(function* (payment, cart) {
    let items = payment.get('items');
    return cart.removeItems(items);
  }),

  redirectToThankYouPage: task(function* () {
    return this.get('redirect')();
  }),

  handleCancel(cancellation) {
    switch (cancellation.stepName) {
      default:
        get(this, 'showErrorMessage').perform(cancellation.reason);
        break;
    }
  }
});

How we teach this

Documentation and guides will have to be updated accordingly. I believe that pipelines will be straightforward to teach given enough examples on when to use one. The ComputedProperty form of the pipeline is also very similar in API to a TaskGroup.

Drawbacks

It may not be immediately obvious why one would use a pipeline over a regular task. It will be up to documentation and good examples to showcase their benefits.

Alternatives

As discussed in Slack, a state machine primitive could be more useful and also satisfy the same motivations.

Unresolved questions

Should this be a part of ember-concurrency, or could it live standalone as a companion addon?

I may have gotten some of the API wrong, but I think the "spirit" of the implementation should be clear. Please feel free to correct me if I've made a mistake!

nucleartide commented 7 years ago

This looks really good. It confused me a lot initially, but it makes sense now (I think). Would I be correct in saying that the main benefits (over a single hard-coded task) are:

  1. additional derived/UI state, and
  2. representing code as data?

So another example would be something like Hipmunk's search page, where upon kicking off a search, there's a dynamic list of airlines that get searched. And that dynamic list of search tasks could be represented as a pipeline?

poteto commented 7 years ago

Yup, those 2 points are exactly it! We could think of a pipeline as a higher order task. In addition, I think regular application and UI logic is also a great fit for a pipeline:

For example:

  1. Validate some input
  2. Save the model
  3. Close the modal dialog
  4. Redirect to a thank you page
  5. Show flash message

They're not necessarily data pipelines by any means, but a series of discrete events that must happen in a specific order.

offirgolan commented 7 years ago

@poteto this RFC is super well written. Well done 👏 . This looks super promising.

My single concern is how you would properly handle failures in a pipeline step. It looks as though you have to call cancel with an error message for the onCancel handler to get called, but what if a step gets interrupted, canceled, or even just fails before you get a chance to call cancel?

  1. Would each step have to be wrapped in a try/catch/finally?
  2. Would you need to define the same error message twice (once in the step, once in the cancel handler)?

I think we can maybe extend the step API to define an error message. Just brainstorming here 😅

requestGeolocation: step(task(function* () {
    let coords = yield getCurrentPosition(); // simple example
    return coords || cancel();
  })).onCancel('Could not find your location')

This sort of API can guarantee an error message across multiple failure states.
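That guarantee could be modeled like this (a hypothetical sketch; `withCancelMessage` is a made-up helper, and an explicit no-argument `cancel()` is modeled here as a null-ish return):

```javascript
// Hypothetical: a step-level cancel message. Whether the task cancels
// itself, throws, or otherwise fails, the pipeline sees the same
// predefined reason, so the message only needs to be written once.
function withCancelMessage(name, fn, message) {
  return async (input) => {
    try {
      const result = await fn(input);
      // Model an explicit `cancel()` with no reason as a null-ish return.
      if (result == null) {
        return { cancelled: true, stepName: name, reason: message };
      }
      return result;
    } catch (err) {
      // Unexpected throws get the same single message.
      return { cancelled: true, stepName: name, reason: message };
    }
  };
}
```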

Cryrivers commented 7 years ago

@offirgolan I would imagine that onCancel takes a function as a parameter instead of an error message. as I think returning an error message when cancelling a task is not enough for real world problems. 😂

poteto commented 7 years ago

Yes I forgot to add to the RFC, in ember-pipeline it also accepts a function. The cancel token can receive a value of any type as well, and isn't limited to a string.

machty commented 7 years ago

When a task is included in the pipeline as a step its concurrency policy is ignored in favor of the pipeline's.

I'd prefer a fast error, maybe a warning

the first step in the pipeline requestGeolocation can yield/return a special cancel token

Technically, it's possible for EC's task runner to treat a returned or yielded cancel the same way; I'm not sure it's worth forcing the user to choose one way or the other (I'm sensitive to the fact that I see a lot of return yields in the wild because people aren't really sure what the return/yield semantics are, and I think similar things might crop up for this feature).

yield a pause token

Seems like this is something that would/should exist for all TaskInstances in general?

Can you provide an example of how pause is used?

pipeline.isCancelled

For better or for worse the EC codebase has standardized on single L cancelation spelling, so we should stick with that (that said I've been intentionally avoiding having to choose by choosing names like didCancel rather than isCancellllation).

getStores confusion

  getStores: pipeline(function* () {
    return [
      // yielding a step may not be necessary, unsure
      yield step('requestGeolocation', { foo: '123' }), // accepts options
      yield step(task(function* () { return yield resolve(1 + 1) }), { stepName: '1and1' }), // accepts anonymous functions
      yield step('fetchStoresInProximity'),
      yield step('sortStoresByDistance'),
      yield step('showSuccessMessage')
    ];
  }).drop().onCancel('handleCancel'),

Why does the function you pass to pipeline need to be a generator fn (and each step be yielded)? Seems like it should just be a regular fn that defines all the steps in one go just like new Pipeline does for dynamic pipelines?

Also, I wonder if step(task(function* () { return yield resolve(1 + 1) }), { stepName: '1and1' }), // accepts anonymous functions needs more thought; task() returns a ComputedProperty subclass, and we definitely could make step smart enough to look out for those kinds of objects, I just wonder if this is one feature too many to implement for this RFC.

Using pipelines to prevent potential collisions

One nice thing about pipelines that might solve an issue I've encountered:

Say you have two buttons that fire two separate pipelines, each of which performs a task on a component before hitting the same task on a Service, which changes some app state. Likely, you'll want to prevent one task from being startable if the other is already "on its way" to changing app state. Since pipelines make you declare up front all the "steps" you wish to take, we could potentially use this information to expose an API that makes pipeline B unperformable while pipeline A is still running.

The underlying mechanism here, really, is essentially grabbing a lock on each of the tasks you might end up calling. It's more derived state that can be used to coordinate locks on state changes.
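That lock idea could be pictured like this (a hedged sketch with made-up names; because a pipeline declares its steps up front, their task names can be claimed as a set, and a second pipeline sharing any name is blocked while the first holds them):

```javascript
// Hypothetical: a registry of task names currently claimed by a
// running pipeline, used to block overlapping pipelines.
const claimedTasks = new Set();

function tryAcquire(stepNames) {
  if (stepNames.some((name) => claimedTasks.has(name))) {
    return false; // another pipeline is "on its way" through a shared task
  }
  stepNames.forEach((name) => claimedTasks.add(name));
  return true;
}

function release(stepNames) {
  stepNames.forEach((name) => claimedTasks.delete(name));
}
```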

General Thoughts

Love the derived state. Definitely agree that regardless of whether we do pipelines or some kind of async FSM, there should be conventions for following a happy path / progression of steps.

I still kinda feel like this should be implemented on top of an FSM primitive? I'm thinking of how the Rails e-commerce platform Spree does it: they use the state_machine gem for stashing the current Order state, but then they let you define a checkout_flow that overlays the states, optionally omitting certain steps in the "pipeline" depending on store configuration.

poteto commented 7 years ago

Thanks for the feedback!

I'd prefer a fast error, maybe a warning

👍

Technically, it's possible for us EC's task runner to treat a returned or yielded cancel the same way; not sure if it's worth forcing the user to choose one way or the other (I'm sensitive to the fact that I see a lot of return yields in the wild because people aren't really sure what the return/yield semantics are and I think similar things might crop up for this feature).

Returning makes the most sense to me, yielding seems semantically incorrect for cancelling a generator.

For better or for worse the EC codebase has standardized on single L cancelation spelling, so we should stick with that (that said I've been intentionally avoiding having to choose by choosing names like didCancel rather than isCancellllation).

I'm not American, but I can live with that I guess...

Why does the function you pass to pipeline need to be a generator fn (and each step be yielded)? Seems like it should just be a regular fn that defines all the steps in one go just like new Pipeline does for dynamic pipelines?

My assumption was that because a pipeline is also a task, it would have a similar API (i.e. it expects a generator fn). I suppose it doesn't necessarily need to be a generator, but I wanted to highlight that it would have concurrency policies as well (which currently live on the TaskProperty).

Also, I wonder if step(task(function* () { return yield resolve(1 + 1) }), { stepName: '1and1' }), // accepts anonymous functions needs more thought; task() returns a ComputedProperty subclass, and we definitely could make step smart enough to look out for those kinds of objects, I just wonder if this is one too many feature to implement for this RFC.

It's probably a topic for another RFC tbqh.

Also re: other derived state, definitely! I think there is much more state we can derive from a pipeline, I'm sure more will emerge after we have some kind of prototype in people's hands.

I think an FSM primitive could handle creating a pipeline, but would it be overkill? I have more thoughts on this; I'll add them in a separate comment.

machty commented 7 years ago

@poteto have you had any more thoughts on this?

It might help to add a full example of a pipeline to help visualize how arguments and return values flow through it.

What does it look like to convert a task that used to perform(a,b) another task with multiple args into a pipeline? Does the task return an array? Or must the tasks always return an object?

poteto commented 7 years ago

@machty I just updated the RFC with a more "real-world" example (see More complete/real-world example)

The idea is that tasks shouldn't know about (or call) other tasks when they are part of a pipeline. So each task should only return the value (the "subject") that makes sense for that task; for example, a validation task would return true or perhaps an error message, but it might not return the object that it just validated.

These additional arguments should be passed when constructing the pipeline via step, which receives a hash as second arg. That hash has an args key which accepts an array of arguments to be passed to the task when invoked in the pipeline.
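In plain JavaScript terms, that convention might look like the following (a hedged sketch; `makeStep` is a hypothetical helper, not the proposed API): the previous step's return value is passed first, followed by the configured `args`, matching the `processPayment(isValid, paymentForm, currentUser)` signature in the example above.

```javascript
// Hypothetical: how `step`'s `args` option could thread values.
// The previous step's result arrives as the first argument; any
// configured extras from `{ args: [...] }` follow it.
function makeStep(name, taskFn, options = {}) {
  const extraArgs = options.args || [];
  return {
    name,
    fn: (previousResult) => taskFn(previousResult, ...extraArgs),
  };
}
```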