ReactiveX / IxJS

The Interactive Extensions for JavaScript
https://reactivex.io/IxJS/
MIT License

pipe() should work recursively, and the free standing pipe() should work inside the pipe chain method #326

Closed buzzware closed 8 months ago

buzzware commented 3 years ago

pipe() can be used as a chain method with a series of operators, e.g. from([1,2,3]).pipe(map(x=>x*3),filter(x=>x%2==0)); but I would like to build my pipeline dynamically before using it. To do that, I would like to have operator() functions that return one or more operators. It would then make sense for a function that returns a pipe of operators to count as returning an operator.

function operator1() {
  // this doesn't work, because pipe() can't be used inside pipe below
  return pipe(map(x=>x*3),filter(x=>x%2==0));
}

function operator2() { return map(x=>x+100); }

from([1,2,3]).pipe(operator1(),operator2());

If this can't be supported, then it should throw an error. Currently it behaves in unexpected ways.

trxcllnt commented 3 years ago

Does this not work?

function operator1(source) {
  return source.pipe(map(x => x * 3), filter(x => x % 2 == 0));
}

function operator2(source) {
  return map(x => x + 100)(source);
}

from([1,2,3]).pipe(operator1, operator2);

buzzware commented 3 years ago

@trxcllnt yes, that does work. I suppose I could either 1) just use your operator1 style and still use pipe for a single operator, or 2) have the abstract operator function return either op() or [opA(),opB()], and flatten the results with lodash, flatten(castArray(operator())), before calling pipe(...operators).
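Option 2 can also be sketched without lodash. The block below is only an illustration: map and filter are hypothetical stand-ins written as plain generator functions (not the Ix operators), collectOps is a made-up helper name, and a reduce over a plain array stands in for the pipeline.

```javascript
// Hypothetical stand-ins for Ix operators: each factory returns a function
// that takes a source iterable and returns a new (lazy) iterable.
const map = (fn) => function* (source) {
  for (const x of source) yield fn(x);
};
const filter = (pred) => function* (source) {
  for (const x of source) if (pred(x)) yield x;
};

// An operator factory may return a single operator or an array of operators.
const operator1 = () => [map(x => x * 3), filter(x => x % 2 !== 0)];
const operator2 = () => map(x => x + 100);

// Hypothetical helper: call every factory and normalize the results to one
// flat list. [].concat(value) wraps a single operator in an array and copies
// an array as-is; flatMap then flattens one level.
const collectOps = (...factories) =>
  factories.flatMap(factory => [].concat(factory()));

const ops = collectOps(operator1, operator2);

// Apply the operators left to right to a plain array source.
const result = [...ops.reduce((source, op) => op(source), [1, 2, 3])];
console.log(result); // [ 103, 109 ]
```

With the real library, the flat list would instead be spread into the chained call, as in from([1,2,3]).pipe(...ops).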

I don't find your operator2 style intuitive. But if I were to adopt it, I would expect pipe(map(x => x + 100))(source) to work; instead it has to be pipe(source,map(x => x + 100)).

If you're not going to have chaining operators on the source object, then being able to use the standalone pipe() recursively within the chained pipe() would make for a smooth experience.

For now, this achieves what I need:

const { from, pipe } = require('ix/asynciterable');
const { map,filter } = require('ix/asynciterable/operators');
const _ = require('lodash');

function joinOps(...ops) {
  return _.flatten(ops);
}

(async function() {
  try {

    function operator1() {
      return [
        map(x => x * 3),
        filter(x => x % 2 != 0)
      ];
    }

    function operator2() {
      return map(x => x + 100);
    }

    function logger(data) {
      console.log(data);
      return data;
    }

    try{
      let operators = [operator1,operator2];
      operators = _(operators).map(op => op()).flatten().value();
      let result = await from([1,2,3]).pipe(...operators).forEach(logger);
      console.log('done');
    }
    catch(err) {
      console.error('pipeline failed with error:', err);
    }

    console.log('finished');
  } catch(e) {
    console.log(e);
  } finally {
    process.exit();
  }
})();

trxcllnt commented 3 years ago

The style I described is how all the Ix operators are implemented: functions which return functions that take a source iterable and return a result iterable. pipe() is a specialization of a left fold over a list of functions with the signature (source: TSource) => TResult.
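To make the left-fold description concrete, here is a minimal sketch. It assumes nothing about Ix's internals: map and filter are hypothetical stand-ins built from generator functions, and pipeSketch is an illustrative name for a source-first pipe written with Array.prototype.reduce.

```javascript
// Hypothetical stand-ins with the operator shape described above:
// a function that returns a function from a source iterable to a result iterable.
const map = (fn) => function* (source) {
  for (const x of source) yield fn(x);
};
const filter = (pred) => function* (source) {
  for (const x of source) if (pred(x)) yield x;
};

// Source-first pipe as a left fold: start from the source and feed the
// accumulated iterable into each operator in turn.
const pipeSketch = (source, ...ops) => ops.reduce((acc, op) => op(acc), source);

const piped = [...pipeSketch([1, 2, 3], map(x => x * 3), filter(x => x % 2 === 0))];

// The same pipeline with each operator applied by hand, innermost first.
const manual = [...filter(x => x % 2 === 0)(map(x => x * 3)([1, 2, 3]))];

console.log(piped, manual); // both are [ 6 ]
```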

It sounds like you want a curried version of pipe, which is a valid request, but it is straightforward to write yourself:

// could also be:
// const compose = (...ops) => (source) => source.pipe(...ops);
const compose = (...ops) => (source) => ops.reduce((source, op) => op(source), source);

function operator1() {
  return compose(map(x => x * 3), filter(x => x % 2 == 0));
}

function operator2() {
  return map(x => x + 100);
}

from([1,2,3]).pipe(operator1(), operator2());

buzzware commented 3 years ago

Thanks for that. I haven't had enough functional programming experience to write that quickly. Yes, your compose() is what I was expecting from the standalone pipe(). This can't be a special need, so could you add something like compose to the library?
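The recursive use asked for in the issue title can be checked against a compose() of that shape. The sketch below is an assumption-laden illustration: map and filter are hypothetical generator-based stand-ins rather than the Ix operators, and compose follows the reduce-based definition from the earlier comment.

```javascript
// Hypothetical stand-ins for Ix-style operators (not the library's own).
const map = (fn) => function* (source) {
  for (const x of source) yield fn(x);
};
const filter = (pred) => function* (source) {
  for (const x of source) if (pred(x)) yield x;
};

// Curried pipe: collect the operators first, supply the source later.
const compose = (...ops) => (source) => ops.reduce((acc, op) => op(acc), source);

const triple = map(x => x * 3);
const keepEven = filter(x => x % 2 === 0);
const addHundred = map(x => x + 100);

// A composed pipeline has the same shape as a single operator,
// so it can be nested inside another compose().
const nested = compose(compose(triple, keepEven), addHundred);
const flat = compose(triple, keepEven, addHundred);

const nestedResult = [...nested([1, 2, 3])];
const flatResult = [...flat([1, 2, 3])];
console.log(nestedResult, flatResult); // both are [ 106 ]

// With no operators, compose() returns the source unchanged.
const untouched = [...compose()([1, 2, 3])];
```

Because the result of compose() is itself a function from source to result, nesting needs no special handling: the outer fold simply treats the inner pipeline as one more operator.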