utilise / emitterify

simplifying events
75 stars 0 forks source link

Emitterify

Coverage Status Build

Callbacks

const foo = emitterify()
foo.on('event', result => result == 'foo')
foo.once('event', result => result == 'foo')
foo.emit('event', 'foo')

Promises

If you omit the callback in .on/.once, you can await the same result you normally get in the callback:

result = await foo.on('event')
result = await foo.once('event')

Observables

If you are interested in more than a single event, you can omit the callback and use .map/.filter/.reduce:

foo
  .on('event')
  .map(result => ...)
  .filter(result => ...)
  .reduce(result => ...)

It can be really useful to mix approaches too, for example to manipulate a stream of events until a certain condition is fulfilled:

await cluster.on('changes').filter(d => peers.every(isStable)) 
// cluster now stabilised

await node.on('click').reduce(acc => ++acc).filter(d => d == 10) 
// node clicked 10 times 

Signals

When you do emitter.on(event) it returns a channel through which events flow. The channel itself is emitterified so it can be used to communicate and respond to various events. There are few common ones such as start, stop for lazy subscriptions, but in contrast to other implementations, these are not hardcoded, which means you can also communicate other signals, such as the completion progress of a channel, whether the original request for a stream of server responses has been sent yet, etc. This affords much greater flexibility without any extra API, and the design becomes obvious when you realise that these are all qualities of the channel itself, and since they are events, they have the same reactive interface as other stream of events.

foo
  .on('value')
  .on('start', () => {})      // stream of meta-events, orthogonal to original channel ---->  
  .on('stop', () => {})       // stream of meta-events, orthogonal to original channel ----> 
  .on('progress', () => {})   // stream of meta-events, orthogonal to original channel ----> 
  // |
  // ↓
  // values flow down through the main channel

CSP

Observables (FRP) "push" values, whereas streams with backpressure or channels (CSP) will generally allow the consumer to process values according to it's own pace or "pull" values. This is important for certain types of operations where you don't want to overwhelm consumers. This is a natural fit here too, since channels implement the async iterator Symbol and so allow producers to respond to consumers that are already waiting and push new values only as they are requested. This is done via the stream of pull events on the channel. Using the canonical example, let's say we create chan = o.on('foo') and then pass this to the following threads:

async producer(chan, i = 0) => {
  for await (const d of chan.on('pull'))
    chan.next(++i)
}

async consumer (chan) => {
  for await (const d of chan)
    if (results.push(d) == 10) break
}

These two functions work co-operatively: the consumer is "pulling" values from the channel, whilst the producer is pushing them when requested.

Operators

The only observable operators included are .map/.filter/.reduce, everything else can be implemented as a separate operator outside the core - similar to how utilise complements the native Array operators in contrast to lodash which attempts to replace them. Below is a few example operators. They are so small, that's it probably not even worth publishing these to npm.

Related Libraries

The events in the following libraries all use this interface:

API