cujojs / most

Ultra-high performance reactive programming
MIT License

Simultaneity and merging/combining/sampling #145

Open briancavalier opened 9 years ago

briancavalier commented 9 years ago

Currently, simultaneous events are problematic. Here's a silly, but illustrative example:

let x = most.periodic(10, 'hi');
let y = most.merge(x, x);

What does it mean to merge x with itself? Currently, y will contain twice as many events as x. In effect, y synthesizes new events out of thin air. Another:

let x = most.periodic(10, 1);
let y = most.combine(add, x, x);

What does it mean to combine x with itself? Again, y will end up containing twice as many events, and imho, they will be unintuitive. For example, if you do y.observe(a => console.log(a)), it will log:

1
2
1
2
... and so on

Every 10 ms there is actually only one event, yet combine will log two events.

I have no idea how common simultaneous events are in practice, but the above cases are certainly confusing, imho. I'd like to brainstorm potential solutions.

Transactional time with commit phase

See #144 for a particular line of thinking on how to resolve the combine problem, specifically. I'm not convinced it's the right thing, but it does help. It may be something we could do across the board, i.e. combinators that deal with multiple inputs could always wait until "end of current timeslot", some sort of timeslot commit phase, to make decisions on how to propagate events.
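As a rough sketch of the idea (illustrative only, not most.js internals): a combine-like node could buffer updates that arrive within the same synchronous timeslot and emit once when the slot commits, here approximated with a microtask:

```javascript
// Sketch: buffer same-timeslot updates and emit once per slot commit.
// combineCommitted returns an update function; callers push (index, value)
// pairs, and `emit` fires once per timeslot with the combined latest values.
const combineCommitted = (f, emit) => {
  const latest = [];
  let scheduled = false;
  return (index, value) => {
    latest[index] = value;
    if (!scheduled) {
      scheduled = true;
      queueMicrotask(() => {   // "end of current timeslot" commit phase
        scheduled = false;
        emit(f(...latest));
      });
    }
  };
};

const seen = [];
const update = combineCommitted((a, b) => a + b, v => seen.push(v));
update(0, 1);
update(1, 1); // same timeslot: only one combined event is emitted
queueMicrotask(() => console.log(seen)); // [ 2 ]
```

The names and the microtask-based "timeslot" are assumptions for the sake of the sketch; a real implementation would need to define the slot boundary in terms of the scheduler.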

Merge combinator and monoids

For merging, some other (non-JS) reactive implementations use a merge combinator that requires either:

  1. events to be monoids, which can be combined via mconcat, or
  2. a function to be passed in for merging simultaneous events. The function is only called when there are simultaneous events. All other events are merged as is.

It seems interesting that option 2 is quite similar to combine.
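A toy sketch of option 2, modeling a stream as an array of timestamped events (the name mergeWith and the array model are illustrative, not most.js API):

```javascript
// Toy model: a stream is an array of { time, value } events in time order.
// mergeWith passes events through unchanged, except when two events share a
// timestamp, in which case `f` resolves them into a single event.
const mergeWith = (f, s1, s2) => {
  const byTime = new Map();
  for (const e of [...s1, ...s2]) {
    byTime.set(e.time, byTime.has(e.time) ? f(byTime.get(e.time), e.value) : e.value);
  }
  return [...byTime.entries()]
    .sort(([t1], [t2]) => t1 - t2)
    .map(([time, value]) => ({ time, value }));
};

const x = [{ time: 0, value: 1 }, { time: 10, value: 1 }];
// Merging x with itself yields one event per timestamp, not two:
console.log(mergeWith((a, b) => a + b, x, x));
// [ { time: 0, value: 2 }, { time: 10, value: 2 } ]
```

Under this semantics, `merge(x, x)` no longer synthesizes extra events; simultaneous events are resolved explicitly by the caller-supplied function.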

Other affected combinators

Unfortunately, the problem doesn't end with merge and combine. It exists for any combinator that deals with multiple input streams, where more than one of those input streams may be active simultaneously. For example, simultaneous events can occur with flatMap, but not with concatMap.

benawhite commented 9 years ago

I saw this same issue when implementing a "most" version of the kefir tree demo. I was using sampleWith...

gmarceau commented 8 years ago

Hi @benawhite,

What an interesting question. When we finished publishing the sequence of FrTime papers, I became convinced that a notion of simultaneous events was needed to make FRP work well in practice.

When we implemented exploratory demos, a pattern that occurred frequently was decomposition-recomposition. It is a pattern that is natural when using plain functions but becomes oddly difficult in FRP.

var _ = require('underscore')
var most = require('most')

// Setup
const square = x => x * x
const dbl = x => x + x

const points = [{x: 1, y: 2}, {x: 3, y: 4}, {x:5, y:6}]
const pointStream = most.from(points)

This is natural:

const squaredPoints = _.map(points, ({x, y}) => ({ x: dbl(x), y: square(y) }))

This is also fine:

const pointSquaredStream = pointStream.map(({x, y}) => ({x: dbl(x), y: square(y) }))
pointSquaredStream.forEach(console.log)

But once we start reconstructing, we run into trouble. So let's deconstruct:

const xDoubleStream = pointStream.map(p => p.x).map(dbl)
const ySquareStream = pointStream.map(p => p.y).map(square)

// And then reconstruct:

const makePoint = (x, y) => ({x, y})

Reconstructing using zip

most.zip(makePoint, xDoubleStream, ySquareStream).forEach(console.log)

zip behaves well in the normal case. However, it fails non-gracefully in the presence of bugs.

In the case of mismatched numbers of points, zip silently buffers the longer stream, consuming memory until a delayed out-of-memory error. This would be very hard to debug.

most.zip(makePoint,
         xDoubleStream.concat(xDoubleStream),
         ySquareStream)
    .forEach(console.log)
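In the toy arrays-as-streams model, a fail-fast alternative is easy to state (zipStrict is a hypothetical name, not a most.js combinator): refuse to zip streams whose event counts diverge, instead of buffering silently.

```javascript
// Fail-fast zip over the toy model (streams as arrays of values):
// mismatched lengths raise immediately instead of buffering silently.
const zipStrict = (f, s1, s2) => {
  if (s1.length !== s2.length) {
    throw new Error(`zipStrict: mismatched event counts ${s1.length} vs ${s2.length}`);
  }
  return s1.map((v, i) => f(v, s2[i]));
};

const makePoint = (x, y) => ({ x, y });
console.log(zipStrict(makePoint, [2, 6, 10], [4, 16, 36]));
// [ { x: 2, y: 4 }, { x: 6, y: 16 }, { x: 10, y: 36 } ]

try {
  zipStrict(makePoint, [2, 6, 10, 2, 6, 10], [4, 16, 36]); // the concat case
} catch (e) {
  console.log(e.message); // loud failure instead of silent buffering
}
```

For real push-based streams, where the total event count is unknowable up front, the check would have to be a bound on buffered events rather than a length comparison.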

Reconstructing with zip can also lead to incorrect results for simple mistakes where a fail-fast error message would have been preferable.

most.zip(makePoint,
         xDoubleStream.filter(x => x > 3),
         ySquareStream.filter(y => y > 3))
    .forEach(console.log)

Here, the points { x: 6, y: 4 } and { x: 10, y: 16 } are produced, which isn't what was intended, since they pair x and y values taken from different input points. The error is subtle and would be hard to track down, especially if the processing were intricate.

This case is particularly interesting since it teases apart two different notions of simultaneity. Since we defined points by invoking most.from on an array, the events are occurring at the same moment in time. That's the first notion -- time-based simultaneity. However, when the bug occurs, the values being zipped are not derived from the same event via deconstruction. That's the second notion -- value-based simultaneity.

I'll claim that in an FRP context, this second notion of simultaneousness, value-based simultaneity, is the important one.

Now let's look at the second manner of reconstructing.

Reconstructing using combine

most.combine(makePoint,
             xDoubleStream,
             ySquareStream).forEach(console.log)

This code produces three events: { x: 10, y: 4 }, { x: 10, y: 16 }, and { x: 10, y: 36 }. It is only because of a notion of simultaneity that three events are produced; without simultaneity, we should expect six events. Willingly or not, most's implementation of combine detects that the events on xDoubleStream and ySquareStream are occurring at the same time and absorbs the glitches.

This behavior violates what is expected from combine when reading the documentation:

combine: create a new stream that emits the set of latest event values from all input streams whenever a new event arrives on any input stream.

To fix combine, either the documentation needs to be modified to describe this behavior, which would require defining simultaneity (in particular, choosing and articulating the difference between time-based and value-based simultaneity), or the implementation of combine has to change to produce the six-event output even in the presence of simultaneous events.

How to fix this

The big open question is: should the semantics of an FRP framework incorporate an explicit notion of simultaneity, along with combinators to manipulate it?

When we wrapped up the FrTime effort, had we worked on it more, I would have implemented apply and lift and then tried building some things with them (to be fair, Greg Cooper was skeptical; I think he had a different approach in mind).

The idea was that zip and combine are great combinators as they are; they should not be repurposed. Instead, new operators should be introduced specifically to make the deconstruct-reconstruct pattern easy and reliable, in the fail-fast, fail-loud sense.

The combinators could be defined as follows:

  most.apply(makePoint, xDoubleStream, ySquareStream)
  const makePointStream = most.lift(makePoint)
  makePointStream(xDoubleStream, ySquareStream)

But the tricky part would be in the implementation, in particular, the handling and maintaining of the same-time versus same-value distinction.
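One possible shape for that implementation, in a toy model where each event carries the id of the source event it was derived from (fromValues, mapTagged, and the array model are all hypothetical names for illustration, not most.js API):

```javascript
// Toy model of value simultaneity: events remember which source event they
// derive from, and `apply` fails fast when its inputs' lineages disagree.
const fromValues = values => values.map((value, id) => ({ id, value }));
const mapTagged = (f, s) => s.map(e => ({ id: e.id, value: f(e.value) }));
const apply = (f, s1, s2) =>
  s1.map((e, i) => {
    const other = s2[i];
    if (!other || other.id !== e.id) {
      throw new Error(`apply: events at position ${i} derive from different source events`);
    }
    return { id: e.id, value: f(e.value, other.value) };
  });
const lift = f => (...streams) => apply(f, ...streams);

const makePoint = (x, y) => ({ x, y });
const points = fromValues([{ x: 1, y: 2 }, { x: 3, y: 4 }, { x: 5, y: 6 }]);
const xDouble = mapTagged(p => p.x + p.x, points);
const ySquare = mapTagged(p => p.y * p.y, points);
const makePointStream = lift(makePoint);
console.log(makePointStream(xDouble, ySquare).map(e => e.value));
// [ { x: 2, y: 4 }, { x: 6, y: 16 }, { x: 10, y: 36 } ]
```

In this model a filtered stream would retain its original ids, so the mismatched-filter example earlier in the thread would fail loudly at the first misaligned pair instead of silently pairing values across input points.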

briancavalier commented 8 years ago

@gmarceau Thank you for such an informative reply! I read it once, and I think I need to read it a few more times to fully absorb it. Planning to read again today :)

Can you explain a bit more about value simultaneity? Is this a similar notion to how arrowized & signal vector FRPs use tuples or other tree structures to represent simultaneous events?

gmarceau commented 8 years ago

I am glad you found it inspiring. Let me know if you have additional questions on the second read.

Value simultaneity is, first of all, the title of a research project I would embark on given the time. It could be defined in multiple ways, and discovering the most practical definition would require experimentation.

I am not intimately familiar with the tree representation you mention. In FrTime, we used mutable linked lists to represent event streams. Under every expression requiring evaluation, new events would be pushed as they occurred, and events would be popped to start their processing at that expression. Most of these lists would be zero or one event long, but events produced faster than they could be processed would accumulate in the streams, by design. The same representation worked for time-simultaneous events: two pushes back-to-back on the same stream. In other words, we intentionally defined the semantics of FrTime so that

let y = most.merge(x, x);

would result in y containing two events, one after the next, just like most.js does at the moment. There are two conceptualizations of this behavior. One is that the events are time-simultaneous but ordered. The other is that they are separated by an epsilon time difference.

This is quite different from the notion of value simultaneity involved in the call to combine. Value simultaneity pertains to two different event streams, namely whether they should produce one event or two when the streams are combine-d.

irisjae commented 7 years ago

+1

irisjae commented 7 years ago

How about the atomic updates approach from https://github.com/paldepind/flyd#atomic-updates ?
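Roughly, flyd's atomic updates propagate a source event through the dependency graph in topological order, so a diamond-shaped dependency updates its join point once per source event. A toy sketch of that idea (illustrative only; not flyd's or most.js's actual implementation):

```javascript
// Toy dependency graph with rank-ordered ("atomic") propagation:
// dependents recompute in topological rank order, so a node depending on
// two siblings sees both of their new values in a single update.
class Cell {
  constructor(deps, fn) {
    this.deps = deps;
    this.fn = fn;
    this.subs = [];
    this.rank = deps.length ? Math.max(...deps.map(d => d.rank)) + 1 : 0;
    deps.forEach(d => d.subs.push(this));
  }
  set(value) {
    this.value = value;
    const dirty = new Set();
    const visit = c => c.subs.forEach(s => { dirty.add(s); visit(s); });
    visit(this);
    [...dirty].sort((a, b) => a.rank - b.rank)
      .forEach(c => { c.value = c.fn(...c.deps.map(d => d.value)); });
  }
}

const updates = [];
const a = new Cell([], null);
const b = new Cell([a], x => x + x);
const c = new Cell([a], x => x * x);
const d = new Cell([b, c], (x, y) => { updates.push([x, y]); return { x, y }; });
a.set(3);
console.log(updates); // [ [ 6, 9 ] ] -- one glitch-free update, not two
```

Without the rank ordering, d could recompute once after b updates (seeing c's stale value) and again after c, which is exactly the glitch behavior discussed above for combine.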