kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

groupBy function #301

Closed niecore closed 4 years ago

niecore commented 4 years ago

Does Kefir have an equivalent of Bacon.js's groupBy function? If not, would a PR adding it be accepted?

mAAdhaTTah commented 4 years ago

Could you explain groupBy's behavior?

niecore commented 4 years ago

groupBy groups the events emitted by an observable using a key function and returns a stream of streams, in which each inner stream contains only the items that produce the same key.

for reference: https://baconjs.github.io/api3/classes/observable.html#groupby https://rxjs.dev/api/operators/groupBy
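
To make the grouping rule concrete, here is a minimal sketch that uses plain arrays in place of streams. The helper name `groupByKey` and the sample items are made up for illustration; this is not the Bacon.js or RxJS implementation, only the "same key, same group" idea.

```javascript
// Illustration only: arrays stand in for streams, and groupByKey is a hypothetical helper.
function groupByKey(items, keyF) {
  // A Map keeps groups in the order in which each key first appears.
  const groups = new Map();
  for (const item of items) {
    const key = keyF(item);
    if (!groups.has(key)) {
      groups.set(key, []); // first item with this key opens a new group
    }
    groups.get(key).push(item);
  }
  return [...groups.values()];
}

const grouped = groupByKey(
  [{ id: 1, v: 'a' }, { id: 2, v: 'b' }, { id: 1, v: 'c' }],
  item => item.id
);
console.log(grouped);
```

In the streaming version each inner array would instead be a stream that keeps receiving items with its key over time.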

mAAdhaTTah commented 4 years ago

Interesting. I don't know that we need this in Kefir core (@kefirjs/core, happy to hear disagreement if you think this is useful), but one nice thing we added recently is the thru method, which lets you write a groupBy function that implements this behavior and use it like a method:

import groupBy from 'kefir-groupBy' // would be a package you create
import Kefir from 'kefir'

var events = [
  { id: 1, type: 'add', val: 3 },
  { id: 2, type: 'add', val: -1 },
  { id: 1, type: 'add', val: 2 },
  { id: 2, type: 'cancel' },
  { id: 3, type: 'add', val: 2 },
  { id: 3, type: 'cancel' },
  { id: 1, type: 'add', val: 1 },
  { id: 1, type: 'add', val: 2 },
  { id: 1, type: 'cancel' }
]

function keyF(event) {
  return event.id
}

function limitF(groupedStream) {
  const cancel = groupedStream.filter(x => x.type === 'cancel').take(1)
  var adds = groupedStream.filter(x => x.type === 'add')

  return adds.takeUntil(cancel).map(e => e.val)
}

Kefir.sequentially(2, events)
  .thru(groupBy(keyF, limitF)) // use it like this
  .flatMap(groupedStream => groupedStream.fold(0, (acc, x) => acc + x))
  .onValue(sum => {
    console.log(sum)
    // logs -1, 2 and 8, in some order
  })
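
As a rough sketch of why the curried shape `groupBy(keyF, limitF)` fits `thru`: `thru` hands the observable to the function it is given and returns whatever that function returns. The `TinyObservable` class and `scaleBy` operator below are hypothetical stand-ins, not Kefir code; only the `thru` body reflects the behaviour described above.

```javascript
// Hypothetical stand-in for an observable; only `thru` mirrors the behaviour described above.
class TinyObservable {
  constructor(values) {
    this.values = values;
  }

  // Hand this observable to the transformer and return whatever it returns.
  thru(transformer) {
    return transformer(this);
  }
}

// A curried operator in the same shape as groupBy(keyF, limitF):
// configure it first, receive the source second.
const scaleBy = factor => source =>
  new TinyObservable(source.values.map(v => v * factor));

const scaled = new TinyObservable([1, 2, 3]).thru(scaleBy(2));
console.log(scaled.values);
```

So `source.thru(scaleBy(2))` is the same call as `scaleBy(2)(source)`, just written so that it chains like a method.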

niecore commented 4 years ago

Great, I'll try it with the thru method.

Thanks! :)

niecore commented 4 years ago

If someone is looking for the solution:

const T = require("transducers-js");
const Kefir = require("kefir");

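// groupBy(keyF, limitF) returns a function of the source stream, so it can be passed to .thru().
// limitF defaults to the identity, so groupBy(keyF) alone also works.
const groupBy = (keyF, limitF = x => x) => src => {
    // Currently open groups, indexed by key.
    const streams = {};

    return src.transduce(T.comp(
        // Only the first value of a group that is not open yet passes through...
        T.filter(x => !streams[keyF(x)]),
        // ...and is turned into the stream for that group.
        T.map(firstValue => {
            const key = keyF(firstValue);
            const similarValues = src.changes().filter(x => keyF(x) === key);
            const data = Kefir.later(0, firstValue).concat(similarValues);

            const limited = limitF(data, firstValue).withHandler((emitter, event) => {
                if (event.type === 'end') {
                    // Forget the group once its stream ends, so the key can open a new one.
                    delete streams[key];
                    emitter.end();
                } else {
                    // Forward values and errors unchanged.
                    emitter.emitEvent(event);
                }
            });

            streams[key] = limited;
            return limited;
        })
    ));
};

mAAdhaTTah commented 4 years ago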

The only thing I would test there is making sure errors just flow through as expected, but otherwise, looks good!

niecore commented 4 years ago

I am coming back to ask for your help. The implementation seems to work fine in a normal environment, but it does not work when used with jest-kefir, and I have no idea why.

Here is my test usage in a normal environment where everything works fine:

const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};

const a = stream();

const b = a
    .thru(groupBy(a => a.id))
    .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)))
    .log();

send(a, [value(data), value(data2)]);

Output:

[stream.transduce.flatMap] <value> { id: 1, data: 42 }
[stream.transduce.flatMap] <value> { id: 2, data: 43 }

This is the test which fails because there is no output at all:

const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};

const a = stream();

const b = a
    .thru(groupBy(a => a.id))
    .flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)));

expect(b).toEmit([value(data), value(data2)], () => {
    send(a, [value(data), value(data2)]);
});

Maybe you have a hint as to what the problem could be.

mAAdhaTTah commented 4 years ago

The use of Kefir.later means that the value is being emitted after the test ends. Either use Kefir.constant, which will emit the value immediately, or use toEmitInTime and tick to advance the clock & emit that value.
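
The timing issue can be sketched without Kefir at all. In the snippet below, `emitNow` and `emitLater` are hypothetical stand-ins for an immediate source (in the spirit of Kefir.constant) and a timer-based one (in the spirit of Kefir.later), and `valuesSeenSynchronously` plays the role of a check that finishes before any timer fires. It is not how jest-kefir is implemented, only an illustration of the ordering.

```javascript
// Hypothetical sources: each takes a listener and delivers one value to it.
const emitNow = value => listener => listener(value);
const emitLater = value => listener => setTimeout(() => listener(value), 0);

// Subscribes, then immediately takes a snapshot of what has arrived so far.
function valuesSeenSynchronously(source) {
  const seen = [];
  source(v => seen.push(v));
  return [...seen]; // copy taken before any timer callback can run
}

const immediate = valuesSeenSynchronously(emitNow(42));  // value is already there
const deferred = valuesSeenSynchronously(emitLater(42)); // timer has not fired yet
console.log(immediate, deferred);
```

The deferred value does arrive, but only after the snapshot was taken, which matches a test that ends before the clock is advanced.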