thi-ng / umbrella

⛱ Broadly scoped ecosystem & mono-repository of 199 TypeScript projects (and ~180 examples) for general purpose, functional, data driven development
https://thi.ng
Apache License 2.0

[transducers, resolve-map, rstream-graph] - xform steps in a graph? #37

Open den1k opened 6 years ago

den1k commented 6 years ago

imports to replicate:

import { Atom } from "@thi.ng/atom/atom";
import * as tx from "@thi.ng/transducers";
import * as txs from "@thi.ng/transducers-stats";
import * as rsg from "@thi.ng/rstream-graph";
import * as rs from "@thi.ng/rstream";

const data = [...tx.range(1, 21)].map(x => ({ date: x, close: x * 10 }));

As wonderful as transducers are, the one thing they give me a headache with is nesting. For example, this transducer computes a moving average from a stream of OHLC data and merges it back into the original items:

const anaTx = tx.comp(
  tx.multiplexObj({
    identity: tx.identity,
    hma: tx.comp(
      tx.pluck("close"),
      txs.hma(5)
    )
  }),
  tx.map(({ identity, ...all }) => ({ ...identity, ...all }))
);
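
To make the merge step above concrete, here is a minimal dependency-free sketch. It assumes each value leaving tx.multiplexObj is an object with the untouched input under identity plus one key per branch; the hma number is an illustrative value, not a computed one.

```javascript
// Hypothetical stand-in for one value emitted by tx.multiplexObj:
// the original item under `identity`, plus one key per branch.
const multiplexed = { identity: { date: 6, close: 60 }, hma: 63.3 };

// Same merge function as used in the tx.map() call above.
const merge = ({ identity, ...all }) => ({ ...identity, ...all });

const merged = merge(multiplexed);
console.log(merged); // { date: 6, close: 60, hma: 63.3 }
```

The identity key disappears and its fields end up side by side with the branch results.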

Now imagine we also wanted to compute the deltas. No problem:

const deltaTx = tx.comp(tx.partition(2, 1), tx.map(([a, b]) => b - a));

const anaTx = tx.comp(
  tx.multiplexObj({
    identity: tx.identity,
    delta: tx.comp(tx.pluck("close"), deltaTx),
    hma: tx.comp(
      tx.pluck("close"),
      txs.hma(5)
    )
  }),
  tx.map(({ identity, ...all }) => ({ ...identity, ...all }))
);
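
For readers unfamiliar with deltaTx, the following dependency-free sketch models what tx.partition(2, 1) followed by the subtraction produces. The helper name slidingDeltas is hypothetical and not part of @thi.ng/transducers.

```javascript
// Plain-JS model of partition(2, 1) + map(([a, b]) => b - a):
// every value is paired with its predecessor and the difference is kept.
// `slidingDeltas` is a hypothetical helper name.
function slidingDeltas(values) {
  const out = [];
  for (let i = 1; i < values.length; i++) {
    out.push(values[i] - values[i - 1]);
  }
  return out;
}

const closes = [10, 20, 30, 45];
const deltas = slidingDeltas(closes);
console.log(deltas); // [ 10, 10, 15 ]
```

Note that n inputs yield n - 1 deltas, so the first item of a stream has no delta yet. This is why later code in this thread checks delta != null.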

Now say we wanted to compute the change of the delta relative to the current price. For that we need to access both the close price and the delta. To make this work with transducers alone and without recomputing the delta in another transducer, we need to chain another transducer and merge again:

const anaTx = tx.comp(
  tx.multiplexObj({
    identity: tx.identity,
    delta: tx.comp(tx.pluck("close"), deltaTx),
    hma: tx.comp(
      tx.pluck("close"),
      txs.hma(5)
    )
  }),
  tx.map(({ identity, ...all }) => ({ ...identity, ...all })),
  tx.multiplexObj({
    identity: tx.identity,
    deltaChange: tx.map(({close, delta}) => delta / close)
  }),
  tx.map(({ identity, ...all }) => ({ ...identity, ...all })),
);

We're duplicating more and more code. This gets even more complicated when we want to compute more complex values, like averages of averages. Besides the duplication, these pipelines are hard to compose. For example, the keys close and delta are hardcoded, so computing the same ratio from, say, low and delta would require a higher-order function or another transducer to rename them. This is where rstream-graph comes in:

const state = new Atom({});

const deltaTx = tx.comp(tx.partition(2, 1), tx.map(([a, b]) => b - a));

const graph = rsg.initGraph(state, {
  ohlc: () => rs.fromIterable(data),
  hma: {
    fn: rsg.node1(txs.hma(5)),
    ins: { src: { stream: "close/node" } }
  },
  delta: {
    fn: rsg.node1(deltaTx),
    ins: { src: { stream: "close/node" } }
  },
  close: {
    fn: rsg.node1(tx.pluck("close")),
    ins: { src: { stream: "ohlc" } }
  },
  deltaChange: {
    fn: rsg.node(tx.map(({ delta, close }) => delta / close)),
    ins: {
      delta: { stream: "delta/node" },
      close: { stream: "close/node" }
    }
  },
  ana: {
    fn: rsg.node(tx.map(({ohlc, ...all}) => ({...ohlc, ...all}))),
    ins: {
      ohlc: { stream: "ohlc" },
      hma: { stream: "/hma/node" },
      delta: { stream: "delta/node" },
      deltaChange: { stream: "/deltaChange/node" }
    }
  }
});

graph.ana.node.subscribe({
  next: x => console.log("result:", x)
});

But the problem is that graph.ana.node.subscribe triggers 4 times for each item, rightfully, once for every subscription. However, we're only interested in the value of the last update per item, since that one will contain the data for all transformations for the item.

Since I care about one result per item, I think what I'm really looking for is something like tx.multiplexObj combined with resolve-map's resolve: a multiplexGraph transducer, within which transducers can reference each other and run in topological dependency order. Does that make sense?

postspectacular commented 6 years ago

I see where you're going here :) You're also right that transducers are not v. suitable (or even meant) for branching dataflows, but they're great for linear pipelines. That very limitation was the initial impetus for both the resolve-map and rstream-graph packages. They're somewhat similar in purpose, but the former is intended for synchronous/one-shot computations, the latter for async/continuous cases...

What you're asking for is already possible, but your example showed me that we need more flexibility and should add another optional arg to the rsg.node() function (which really just is a simple factory wrapper for a rstream StreamSync). That StreamSync stream combinator, by default, only waits for / synchronizes the very first set of all its input values, but then re-triggers each time any input has changed. However, this behavior can be changed by passing an additional reset: true option to synchronize inputs each time...
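
The difference between the two modes can be illustrated with a simplified model. This is a sketch only, not rstream's implementation: makeSync, its reset option and the push function are hypothetical names chosen for illustration.

```javascript
// Simplified model of a synchronizing combinator (NOT rstream's code).
// It waits until every input has delivered a value, then emits a tuple.
// Without reset it re-emits on every later change; with reset it starts
// collecting a fresh tuple after each emission.
function makeSync(inputIds, { reset = false } = {}, onEmit) {
  let current = {};
  const seen = new Set();
  return (id, value) => {
    current[id] = value;
    seen.add(id);
    if (seen.size < inputIds.length) return; // still waiting for an input
    onEmit({ ...current });
    if (reset) {
      seen.clear();
      current = {};
    }
  };
}

function run(reset) {
  const emitted = [];
  const push = makeSync(["a", "b", "c"], { reset }, (x) => emitted.push(x));
  // two "items", each updating all three inputs in turn
  for (const n of [1, 2]) {
    push("a", n);
    push("b", n * 10);
    push("c", n * 100);
  }
  return emitted;
}

const withoutReset = run(false);
const withReset = run(true);
console.log(withoutReset.length); // 4
console.log(withReset.length); // 2
```

In the model, the second item causes three emissions without reset (two of them mixing old and new values) but exactly one consistent tuple with reset.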

The problem is that currently the rsg.node() function doesn't allow for this, but you could just roll this yourself and replace your ana node definition with this one here:

ana: {
    fn: (inputs, id) => rs.sync({
        id,
        src: inputs,
        xform: tx.map(({ ohlc, ...all }) => ({ ...ohlc, ...all })),
        reset: true // <-- that's the missing part
    }),
    ins: {
        ohlc: { stream: "ohlc" },
        hma: { stream: "/hma/node" },
        delta: { stream: "delta/node" },
        deltaChange: { stream: "/deltaChange/node" }
    }
}

Even though this is a solution for this particular case, there are others where expecting all inputs to change each time won't work. E.g. in one of my projects I have a rendering node subscribed to different geometry generators, but only some of them change each time/frame and yet I want to render only once per frame. In this case I use 2 nodes instead of one:

rsg.initGraph(null, {
    .....
    // this node only merges inputs for rendering
    // but executes multiple times (no problem)
    // also creates a transformed / sidechained output
    // which bundles results until next RAF event
    renderInput: {
        fn: rsg.node(tx.map(tx.identity)),
        ins: {
            foo: { stream: "/foo/node" },
            bar: { stream: "/bar/node" },
            baz: { stream: "/baz/node" },
        },
        outs: {
            all: (node) =>
                node.subscribe(
                    rs.sidechainPartition(rs.fromRAF())
                )
        }
    },
    // actual work done by this node
    // note that this will receive an array of results of which we're
    // only interested in the last one...
    renderMain: {
        fn: rsg.node1(tx.map((all) => {
            const { foo, bar, baz } = tx.peek(all);
            // do something
        })),
        ins: {
            all: { stream: "/renderInput/outs/all" }
        }
    }
});

More info about sidechainPartition()... also there're a few more approaches one could take, but alas not enough time in the day to document them all (right now)
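
The two-node pattern above can be modeled without any library. The sketch below is not rstream's implementation: makeSidechainPartition is a hypothetical name, and skipping empty chunks is an assumption of this model.

```javascript
// Simplified model of sidechain partitioning (NOT rstream's code).
// Values from the main stream are buffered until the side chain fires,
// then the buffered chunk is handed over and the buffer is cleared.
function makeSidechainPartition(onChunk) {
  let buffer = [];
  return {
    // value arriving on the main stream
    next(value) {
      buffer.push(value);
    },
    // event arriving on the side chain (e.g. an animation frame)
    tick() {
      if (buffer.length === 0) return; // assumption: empty chunks are skipped
      const chunk = buffer;
      buffer = [];
      onChunk(chunk);
    }
  };
}

const rendered = [];
const part = makeSidechainPartition((chunk) => {
  // only the most recent value of each chunk is of interest
  rendered.push(chunk[chunk.length - 1]);
});

part.next({ foo: 1 });
part.next({ foo: 1, bar: 2 });
part.tick(); // frame 1: renders { foo: 1, bar: 2 }
part.tick(); // nothing buffered, no render
part.next({ foo: 3, bar: 2 });
part.tick(); // frame 2: renders { foo: 3, bar: 2 }

console.log(rendered.length); // 2
```

Three upstream updates result in two renders here, one per frame that actually had new data.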

Hope that helps for now!

den1k commented 6 years ago

Just tried it and it works! Feel free to close the issue unless you want to keep it around for the rsg.node() change. Thanks @postspectacular.

postspectacular commented 6 years ago

Great! There's definitely some food for thought here, but since this example conflates quite a few different concepts (time, composability, topology order) I can't think of a more straightforward solution to this at the moment. However, last night I started hacking on a new transducer around resolve-map's resolve(). Let's keep it open & will keep you posted!

postspectacular commented 6 years ago

Hey @den1k - here's a preliminary version of the preliminary-named mapTopo transducer, which I think kinda does what you're after. Similar to the graph setup it takes an object of transducers and their dependencies (other object keys). It then uses dynamic programming to wrap these transducers in a way that a) executes the given transducer in a step-wise manner and b) that resolve can figure out the topological ordering. It then returns a new transducer which combines this all... Here's an updated example:

const tx = require("@thi.ng/transducers");
const txs = require("@thi.ng/transducers-stats");
const rm = require("@thi.ng/resolve-map");

const data = [...tx.range(1, 21)].map(x => ({ date: x, close: x * 10 }));

const deltaTx = tx.comp(tx.partition(2, 1), tx.map(([a, b]) => b - a));

// the new transducer
const mapTopo = (xforms) => {
    const compileXform = ({ins, xform}) => {
        ins = `{${ins.join(",")}}`;
        return new Function("f",`return (${ins})=>f(${ins});`)(tx.step(xform));
    };
    // pre-compile & wrap all xforms
    xforms = tx.transduce(
        tx.map(([id, spec]) => [id, compileXform(spec)]),
        tx.assocObj(),
        tx.pairs(xforms)
    );
    return tx.map((x) => rm.resolve({...x, ...xforms }));
};

const pipe = tx.comp(
    mapTopo({
        delta: {
            ins: ["close"],
            xform: tx.comp(tx.pluck("close"), deltaTx)
        },
        hma: {
            ins: ["close"],
            xform: tx.comp(tx.pluck("close"), txs.hma(5))
        },
        deltaChange: {
            ins: ["delta", "close"],
            xform: tx.map(
                ({ delta, close }) => delta != null ? delta / close : 0
            )
        }
    }),
    // skip first values due to hma() delay
    tx.drop(5)
);

console.log([...tx.iterator(pipe, data)]);

Output:

[ { date: 6,
    close: 60,
    delta: 10,
    hma: 63.33333333333332,
    deltaChange: 0.16666666666666666 },
  { date: 7,
    close: 70,
    delta: 10,
    hma: 73.33333333333334,
    deltaChange: 0.14285714285714285 },
  { date: 8,
    close: 80,
    delta: 10,
    hma: 83.33333333333334,
    deltaChange: 0.125 },
  { date: 9,
    close: 90,
    delta: 10,
    hma: 93.33333333333333,
    deltaChange: 0.1111111111111111 },
 ...

den1k commented 6 years ago

Spotted a bug on second glance:

const pipe = mapTopo({
  delta: {
    ins: ["close"], // <- uses close as ins but it's not defined
    xform: tx.comp(tx.pluck("close"), deltaTx) // <- works because it gets the whole object and plucks close
  },
  hma: {
    ins: ["close"],
    xform: tx.comp(tx.pluck("close"), txs.hma(5))
  },
  deltaChange: {
    ins: ["delta", "close"],
    xform: tx.map(({ delta, close }) => (delta != null ? delta / close : 0))
  }
});

If ins is not defined, the xform should probably receive the whole object (i.e. behave like tx.identity on the input), like:

mapTopo({
  date: tx.pluck("date")
});

Here the top-level xform (when ins is undefined) gets the whole object.

And one other idea: a single input, as in:

mapTopo({
  close: tx.pluck("close"),
  hma: {
    in: "close",
    xform: txs.hma(5),
  }
});

also pondering nesting. for example, I'd like to compute the deltas between the moving averages and then compute the delta change relative to the average closing price. output would look like this:

{
  date: 5,
  close: 50,
  hma: {
    value: 49,
    delta: 1,
    deltaChange: 0.7
  }
}

so then we'd start the whole spiel of topo sort again inside hma

postspectacular commented 6 years ago

Am not entirely sure I follow these latest points. Can you please clarify why close would not be defined here?

delta: {
    ins: ["close"], // <- uses close as ins but it's not defined
    xform: tx.comp(tx.pluck("close"), deltaTx) // <- works because it gets the whole object and plucks close
  },

Also why would you not specify inputs? And why use identity? (your second point). If you want to transform the whole object as input, why not just add another transducer before or after mapTopo()? I don't think it'd be right to turn the latter into this can-do-all swiss army knife...

Related, for nesting we'd have to resort to the more flexible key resolution mechanism supported by resolve-map, i.e. where you use the passed resolve fn arg to look up other keys (which can be anywhere in the object). With the current approach using object destructuring this only works for flat objects. I was planning to rewrite mapTopo using this approach anyhow, since it also works with transpiling to ES5 targets.
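
As a rough illustration of lookup-based resolution on nested objects, consider the sketch below. It is a simplified model and not resolve-map's implementation or API: resolveTree, the $ argument and the "/"-separated absolute paths are hypothetical, and cycle detection is omitted. The input values are made up.

```javascript
// Simplified model of lookup-based resolution (NOT resolve-map's code).
// Function values receive a lookup function and may reference any path
// in the object, so dependencies resolve on demand, nested or not.
function resolveTree(root) {
  const cache = new Map();
  const lookup = (path) => {
    if (cache.has(path)) return cache.get(path);
    let val = root;
    for (const key of path.split("/")) val = val[key];
    if (typeof val === "function") val = val(lookup);
    cache.set(path, val);
    return val;
  };
  const walk = (obj, prefix) => {
    const out = {};
    for (const key of Object.keys(obj)) {
      const path = prefix ? `${prefix}/${key}` : key;
      const val = obj[key];
      out[key] =
        val !== null && typeof val === "object" ? walk(val, path) : lookup(path);
    }
    return out;
  };
  return walk(root, "");
}

const result = resolveTree({
  close: 50,
  hma: {
    value: 49,
    prev: 48,
    delta: ($) => $("hma/value") - $("hma/prev"),
    deltaChange: ($) => $("hma/delta") / $("close")
  }
});
console.log(result.hma.delta); // 1
console.log(result.hma.deltaChange); // 0.02
```

Because each function asks for its inputs by path, no destructuring of a flat object is needed and the evaluation order falls out of the lookups themselves.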

As for singular vs. plural input declarations - i know it's a convenience thing popular in the JS world, but I generally tend to shy away from such things for the sake of fewer special cases, lower maintenance & complexity and generally stick to a more uniform approach... IMHO it's not hard to wrap a string into an array literal

den1k commented 6 years ago

In the object passed to mapTopo close is not defined as a transducer. This delta xform declares that it takes close as an input but instead gets the whole object (e.g. {date: 10, close: 50}) and plucks close off that before passing it to deltaTx.

That’s exactly the case one can’t specify inputs for, when the input is supposed to be the whole object instead of the result of another transducer.

Re: nesting, sounds good. Curious what you come up with.

Re: singular vs plural, it’s not hard, true, but since we’re dealing with transducers we also don’t have the convenience of object destructuring (unless we compose a map transducer or pluck), so I imagined there’d be a lot of those if singular inputs aren’t a thing.

postspectacular commented 6 years ago

It's actually not quite like that at the moment. The compileXform function converts a spec like this:

{ ins: ["foo", "bar"], xform: tx.map(({foo,bar}) => foo + bar) }

into a closure like this:

((fn) => ({foo, bar}) => fn({foo, bar}))(tx.step(spec.xform))

So you're right, there's always an object being passed to that transducer, but:

1) that object only contains the keys you specified as ins in the spec, and
2) that fn above is not a transducer anymore, but a single step executor of a transducer

So now that I understand what you meant :) it does make more sense to have a different behavior for single input transducers, since we could then compile these into a function like:

((fn) => ({foo}) => fn(foo))(tx.step(spec.xform))

The ({foo}) arg list is still needed for resolve-map to understand that this function depends on foo. Alternatively, if we switch to the legacy resolution mechanism it would become:

((fn) => ($) => fn($("foo")))(tx.step(spec.xform))

That makes more sense now... will post update later!
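
The notion of a single step executor and the lookup-style wrapper can be sketched without the library. Everything below is hypothetical illustration: makeDeltaStep stands in for what tx.step(deltaTx) is assumed to provide (one input in, one result or undefined out), and wrap mirrors the ($) => fn($("foo")) shape shown above.

```javascript
// Hypothetical step executor for the delta computation: a stateful
// function taking one input per call and returning a result, or
// undefined while no result is available yet.
function makeDeltaStep() {
  let prev;
  return (x) => {
    const res = prev !== undefined ? x - prev : undefined;
    prev = x;
    return res;
  };
}

// Wrap a step executor so that it fetches its single input by key
// through a lookup function and passes the bare value on.
const wrap = (key, step) => ($) => step($(key));

const deltaOfClose = wrap("close", makeDeltaStep());

const results = [10, 20, 35].map((close) => {
  const item = { close };
  const $ = (k) => item[k]; // per-item lookup function
  return deltaOfClose($);
});
console.log(results); // [ undefined, 10, 15 ]
```

The executor keeps its state across items, which is what allows a windowed computation such as the delta to run one item at a time.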

den1k commented 5 years ago

Updated mapTopo to pass the value of a single-key ins as the only argument to the xform:

const resolveTx = xforms => {
  const compileXform = ({ ins, xform }) => {
    const resolveObj = `{${ins.join(",")}}`;
    const args = ins.length == 1 ? ins[0] : resolveObj;
    return new Function("f", `return (${resolveObj}) => f(${args})`)(
      tx.step(xform)
    );
  };
  // pre-compile & wrap all xforms
  xforms = tx.transduce(
    tx.map(([id, spec]) => [id, compileXform(spec)]),
    tx.assocObj(),
    tx.pairs(xforms)
  );
  return tx.map(x => rm.resolve({ ...x, ...xforms }));
}

https://beta.observablehq.com/@den1k/resolvetx

Also tried figuring out how to avoid executing a transduction step when some of its consumed values are undefined. For example, the deltaChangeRatio has to check whether the delta is defined to avoid erroring.

deltaChangeRatio: {
  ins: ["delta", "close"],
  xform: tx.map(({ delta, close }) => (delta != null ? delta / close : 0)) // <------
}

This is another point where behavior akin to rs.sync with reset: true would be nice.
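
Until such behavior exists, one possible workaround is to factor the null check into a small guard. The sketch below is plain JavaScript; whenDefined is a hypothetical helper and not part of any thi.ng package.

```javascript
// Hypothetical helper: only run `fn` once every listed input is
// defined (neither null nor undefined), otherwise return `fallback`.
const whenDefined = (keys, fn, fallback) => (obj) =>
  keys.every((k) => obj[k] != null) ? fn(obj) : fallback;

const deltaChangeRatio = whenDefined(
  ["delta", "close"],
  ({ delta, close }) => delta / close,
  0
);

console.log(deltaChangeRatio({ close: 10 })); // 0
console.log(deltaChangeRatio({ delta: 10, close: 40 })); // 0.25
```

The resulting function is an ordinary mapping function, so it could be handed to tx.map() in place of the inline check.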