thi-ng / umbrella

⛱ Broadly scoped ecosystem & mono-repository of 199 TypeScript projects (and ~180 examples) for general purpose, functional, data driven development
https://thi.ng
Apache License 2.0

[transducers] Multiplex while retaining most recent values #490

Closed nonphoto closed 3 weeks ago

nonphoto commented 4 weeks ago

Consider this example:

console.log([...iterator(
  multiplex(
    comp(
      take(2),
      map((t) => "a" + t),
    ),
    comp(
      take(3),
      map((t) => "b" + t),
    )
  ),
  range(0, 4)
)])

This will log:

[["a0","b0"],["a1","b1"],[undefined,"b2"],[undefined,undefined]]

How would I use the existing transducer functions to get the following output instead? The most recently yielded value should be retained instead of undefined, and the iterator should end after the last multiplexed transducer has ended.

[["a0","b0"],["a1","b1"],["a1","b2"]]

postspectacular commented 4 weeks ago

Hi @nonphoto 👋 One way to achieve this behavior is via partitionSync(), which groups input values by a user-defined key function and then emits object-tuples. It only starts yielding results once all defined inputs/groups/keys have delivered at least one value...

[Image: sync stream diagram]

import {
    comp,
    multiplex,
    take,
    map,
    flatten1,
    keep,
    partitionSync,
    iterator,
    range,
    trace,
} from "@thi.ng/transducers";

const xform = comp(
    // your existing multiplex
    multiplex(
        comp(
            take(2),
            map((x) => "a" + x)
        ),
        comp(
            take(3),
            map((x) => "b" + x)
        )
    ),
    // now flatten each tuple
    // (i.e. for each [`a1`, `b1`], yield `a1`, `b1` individually, in series...)
    flatten1(),
    // remove any nullish/undefined values
    keep(),
    // now form tuples of, e.g. `{ a: "a1", b: "b2" }`
    // the `reset=false` option ensures to keep the previous values of each lane around
    partitionSync(["a", "b"], { key: (x) => x[0], reset: false }),
    // transform object back into `["a1", "b2"]` (optional)
    map(({ a, b }) => [a, b])
);

const result = [...iterator(xform, range(5))];

console.log(result);
// [
//     [ "a0", "b0" ], [ "a1", "b0" ], [ "a1", "b1" ], [ "a1", "b2" ]
// ]
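
The extra `[ "a1", "b0" ]` tuple in that result follows from how the sync stage works: with `reset: false` it emits a new tuple for every incoming value once all lanes have been seen, and `a1` arrives before `b1`. Below is a minimal, library-free sketch of that behavior. The `syncLatest` generator is a hypothetical stand-in written for illustration only, not the implementation of `partitionSync()`.

```typescript
// Hypothetical stand-in (NOT the library implementation): a plain generator
// mimicking the "sync with reset=false" behavior described above.
function* syncLatest<T>(
    keys: string[],
    keyFn: (x: T) => string,
    src: Iterable<T>
): Generator<Record<string, T>> {
    const latest: Record<string, T> = {};
    for (const x of src) {
        const k = keyFn(x);
        // ignore values which don't belong to any known lane
        if (!keys.includes(k)) continue;
        // remember the most recent value of this lane
        latest[k] = x;
        // only emit once every lane has delivered at least one value
        if (keys.every((id) => id in latest)) yield { ...latest };
    }
}

// the flattened, non-nullish values of the pipeline above, in arrival order
const flat = ["a0", "b0", "a1", "b1", "b2"];

const tuples = [...syncLatest(["a", "b"], (x) => x[0], flat)].map(
    ({ a, b }) => [a, b]
);

console.log(tuples);
// [ [ "a0", "b0" ], [ "a1", "b0" ], [ "a1", "b1" ], [ "a1", "b2" ] ]
```

Because the values are synced one at a time rather than tuple by tuple, any lane update produces an output, which is the source of the intermediate tuple.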

If you want to inspect the intermediate stages of that transformation pipeline, use the trace() transducer to log interim results, e.g. like so:

const xform = comp(
    // your existing multiplex
    multiplex(
        comp(
            take(2),
            map((x) => "a" + x)
        ),
        comp(
            take(3),
            map((x) => "b" + x)
        )
    ),
    trace("mplex"),
    flatten1(),
    trace("flattened"),
    keep(),
    trace("keep"),
    partitionSync(["a", "b"], { key: (x) => x[0], reset: false }),
    trace("sync"),
    map(({ a, b }) => [a, b])
);

[...iterator(xform, range(5))];
// mplex [ "a0", "b0" ]
// flattened a0
// keep a0
// flattened b0
// keep b0
// sync {
//   a: "a0",
//   b: "b0",
// }
// mplex [ "a1", "b1" ]
// flattened a1
// keep a1
// sync {
//   a: "a1",
//   b: "b0",
// }
// flattened b1
// keep b1
// sync {
//   a: "a1",
//   b: "b1",
// }
// mplex [ undefined, "b2" ]
// flattened undefined
// flattened b2
// keep b2
// sync {
//   a: "a1",
//   b: "b2",
// }
// mplex [ undefined, undefined ]
// flattened undefined
// flattened undefined
// mplex [ undefined, undefined ]
// flattened undefined
// flattened undefined
// [
//   [ "a0", "b0" ], [ "a1", "b0" ], [ "a1", "b1" ], [ "a1", "b2" ]
// ]

Finally, the sync() operator is one of the cornerstones of the thi.ng/rstream package and relies heavily on this partitionSync() transducer. If you're interested, there are dozens of examples linked from the rstream readme, where you can see its use in the wild... It's super useful for building entire data transformation graph topologies, not only for UI purposes...

postspectacular commented 4 weeks ago

@nonphoto I just realized the above still behaves slightly differently from what you're after, so here's an alternative (shorter) approach using a custom transducer, which maybe could/should be added to the library as well:

import type { Nullable } from "@thi.ng/api";
import {
    comp,
    iterator,
    map,
    multiplex,
    range,
    take,
    type Transducer,
} from "@thi.ng/transducers";

/**
 * Transducer which receives fixed-`size` tuples of possibly nullish values and
 * only emits tuples which are fully populated (with non-nullish values),
 * keeping track of each component's last valid value and using those to fill
 * empty components if needed.
 *
 * @remarks
 * The following behavior is used:
 * - 1st input: `[null,null]` => no output
 * - 2nd input: `[0, null]` => no output
 * - 3rd input: `[null, 1]` => `[0, 1]`
 * - 4th input: `[1, 2]` => `[1, 2]`
 * - 5th input: `[null, 3]` => `[1, 3]`
 * - 6th input: `[]` => no output
 *
 * @param size
 */
const syncTuples =
    <T>(size: number): Transducer<Nullable<T>[], T[]> =>
    ([init, complete, reduce]) => {
        const prev: T[] = new Array(size);
        return [
            init,
            complete,
            (acc, x) => {
                let partial = false;
                let filled = true;
                for (let i = 0; i < size; i++) {
                    if (x[i] != null) {
                        prev[i] = x[i]!;
                        partial = true;
                    } else if (prev[i] == null) filled = false;
                }
                return partial && filled ? reduce(acc, prev.slice()) : acc;
            },
        ];
    };

const xform = comp(
    // your existing multiplex
    multiplex(
        comp(
            take(2),
            map((x) => "a" + x)
        ),
        comp(
            take(3),
            map((x) => "b" + x)
        )
    ),
    syncTuples(2)
);

console.log([...iterator(xform, range(5))]);
// [ [ "a0", "b0" ], [ "a1", "b1" ], [ "a1", "b2" ] ]

console.log([...iterator(syncTuples(2), [[], [0], [0, 0], [1, 1], [, 2], []])]);
// [ [ 0, 0 ], [ 1, 1 ], [ 1, 2 ] ]

postspectacular commented 4 weeks ago

@nonphoto That new syncTuples() transducer is now part of the library, just released as v9.2.0... 👍

nonphoto commented 4 weeks ago

Wow, thanks so much for the quick turnaround! I'll try it out later today. I also appreciate the code examples: I attempted something similar to syncTuples yesterday but couldn't quite get it working.

nonphoto commented 3 weeks ago

@postspectacular This is so close to what I need. The only issue is that it doesn't work with an infinite source: range(). Ideally the iterator would end when there are no more tuples to produce. This seems like it isn't possible with multiplex.

For some context, the use case here is generating vectors from individual Transducer<number, number> for each component. The component transducers are basically a map from timestamp to position value for that axis.

postspectacular commented 3 weeks ago

@nonphoto As long as you can identify what constitutes the sign/state for a "no more tuples to produce" condition, you can just plug in a takeWhile() before/after the multiplex(). E.g. if your sign for termination is that all values in the multiplexed result tuple are now undefined, then you could just do...

// additionally requires: import { takeWhile } from "@thi.ng/transducers";
const xform = comp(
    // your existing multiplex
    multiplex(
        comp(
            take(2),
            map((x) => "a" + x)
        ),
        comp(
            take(3),
            map((x) => "b" + x)
        )
    ),
    // early termination when tuple has no more valid values
    takeWhile((tuple) => tuple.some((x) => x != null)),
    syncTuples(2)
);

// now using infinite range...
console.log([...iterator(xform, range())]);
// [ [ "a0", "b0" ], [ "a1", "b1" ], [ "a1", "b2"] ]
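
To illustrate why this early termination makes an infinite source safe to consume, here is a minimal sketch using plain generators instead of transducers. All names (`naturals`, `takeWhileLazy`, `lanes`) are hypothetical helpers for this example only. `lanes` merely imitates the tuples produced by the multiplex above, and the sketch stops before the syncTuples() stage.

```typescript
// Hypothetical, library-free helpers; plain generators instead of transducers.

// infinite counter, standing in for range() without arguments
function* naturals(): Generator<number> {
    for (let i = 0; ; i++) yield i;
}

// yields values only while `pred` holds, then stops pulling from `src`
function* takeWhileLazy<T>(
    pred: (x: T) => boolean,
    src: Iterable<T>
): Generator<T> {
    for (const x of src) {
        if (!pred(x)) return;
        yield x;
    }
}

// imitates the multiplexed lanes: lane "a" ends after 2 values, lane "b" after 3
function* lanes(src: Iterable<number>): Generator<(string | undefined)[]> {
    for (const i of src) {
        yield [i < 2 ? "a" + i : undefined, i < 3 ? "b" + i : undefined];
    }
}

// terminates at the first tuple without any valid value,
// even though the underlying source never ends
const live = [
    ...takeWhileLazy((t) => t.some((x) => x != null), lanes(naturals())),
];

console.log(live);
// [ [ "a0", "b0" ], [ "a1", "b1" ], [ undefined, "b2" ] ]
```

The predicate fails on the first all-undefined tuple, so nothing further is requested from the infinite source.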

postspectacular commented 3 weeks ago

@nonphoto By the way, instead of using iterator() or early termination, it sometimes also makes sense to use step() to apply a transducer (including composed ones) to a single input at a time and then have more traditional flow control over execution, e.g. you could do something like:

// build a function to evaluate transducer for a single input
// fn will return undefined if the transducer did not produce a result
// in general this _doesn't mean_ it's done completely, but in our case it does...
// see the step() docs for more info/examples...
const fn = step(xform);

for (const x of range()) {
    const res = fn(x);
    if (!res) break;
    console.log(x, res);
}
console.log("done");

// 0 [ "a0", "b0" ]
// 1 [ "a1", "b1" ]
// 2 [ "a1", "b2" ]
// done
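
The caveat in the comments above (an undefined result does not generally mean "done") matters as soon as one lane starts later than another. The sketch below is a library-free illustration under assumed conditions: `makeStepper` and `StepResult` are hypothetical names, lane "b" is assumed to start only at input 1, and termination is reported via an explicit `done` flag rather than being inferred from an empty result.

```typescript
// Hypothetical, library-free stand-in for a stepper function.
// Assumption: lane "b" only starts at input 1, so the very first call
// produces no tuple although more results will follow later.
type StepResult = { done: boolean; value?: string[] };

const makeStepper = () => {
    // most recent valid value of each lane
    const prev: (string | undefined)[] = [undefined, undefined];
    return (i: number): StepResult => {
        const a = i < 2 ? "a" + i : undefined;
        const b = i >= 1 && i < 4 ? "b" + i : undefined;
        // both lanes exhausted: signal termination explicitly
        if (a == null && b == null) return { done: true };
        if (a != null) prev[0] = a;
        if (b != null) prev[1] = b;
        // emit only once every lane has delivered at least one value
        return prev.every((x) => x != null)
            ? { done: false, value: prev.slice() as string[] }
            : { done: false };
    };
};

const stepper = makeStepper();
const seen: string[][] = [];
for (let i = 0; ; i++) {
    const res = stepper(i);
    if (res.done) break;
    // an empty (but not done) result is skipped instead of ending the loop
    if (res.value) seen.push(res.value);
}

console.log(seen);
// [ [ "a1", "b1" ], [ "a1", "b2" ], [ "a1", "b3" ] ]
```

With a plain `if (!res) break` check, this loop would have stopped at input 0 without producing anything.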

nonphoto commented 3 weeks ago

Ah I see, makes sense! Thanks again for the help.