canjs / can-stream-kefir

Stream values into and out of computes
https://canjs.com/doc/can-stream-kefir.html
MIT License

The stream API I want for DefineMaps (and related) #2

Closed BigAB closed 7 years ago

BigAB commented 8 years ago

Not sure if this is the right place to post this, as it may or may not involve can-compute-stream, but this is the API I would want for an FRP stream off of DefineMap properties.

Please look for any logical flaws or potential problems, and let's discuss whether this is a route we would like to take. Thanks.

(originally from this gist https://gist.github.com/BigAB/785651896bd5b0a46c53a3a9612f7825)

EDIT: TLDR


Stream values from a DefineMap

import DefineMap from 'can-define/map/';
import 'can-stream/observable-plugin'; // <- debatable

const MyMap = DefineMap.extend({
  foo: 'number'
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))

instance.foo = 1;
instance.foo = 2;
instance.foo = 3;
// logs:
// 1
// 2
// 3

Setting a property from an FRP stream

(or an ES Observable, as it's called in the ECMA-262 proposal)

const $el = jQuery('.someElement');
const setterStream = new Observable(observer => {
  const handler = (event, ...args) => observer.next(...args);
  $el.on("specialevent", handler);
  return () => {
    $el.off("specialevent", handler);
  };
});

const MyMap = DefineMap.extend({
  foo: 'string'
});
const instance = new MyMap();

instance.stream('foo', setterStream);
instance.stream('foo').subscribe(val => console.log("Out:", val))

$el.trigger('specialevent', 'Hello');
instance.foo // 'Hello'
$el.trigger('specialevent', 'World');
// logs:
// Hello
// World

Working with setters and getters

I was thinking that, in order to keep all the setter, getter, and conversion behaviour on each DefineMap property, the stream would operate almost independently of them, at the peripheries: the input stream is really just "another way of calling the setter", and the output stream is really just a stream of lastSet values (after the getter and conversions have run), which can then be further modified (or even ignored) using the "stream" property definition, without affecting the normal set/get/convert behaviours.

So a value would ideally go from input to output like this: inputStream -> map property setter / type conversion ¹ -> value updated! (trigger events) -> map property getter -> internal outputStream -> runs through stream property definition -> outputStream

1: This is an either/or because of how set overrides type now; it would be better IMHO if it did set and then type conversion, but c'est la vie.

const MyMap = DefineMap.extend({
  foo: {
    type: 'string',
    get(lastSetValue) {
      return lastSetValue.toUpperCase();
    },
    value: 'aaa'
  }
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))
// logs 'AAA' <- (maybe? Hot/cold, maybe depends on stream library used)

instance.foo = 'bbb';
instance.foo = 'ccc';
// logs:
// BBB
// CCC

const MyMap = DefineMap.extend({
  foo: {
    type: 'string',
    set(newVal) {
      return newVal.toUpperCase();
    }
  }
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))

instance.foo = 'aaa';
instance.foo = 'bbb';
instance.foo = 'ccc';
// logs:
// AAA
// BBB
// CCC

const MyMap = DefineMap.extend({
  foo: {
    type: 'string',
    get(lastSetValue) {
      return lastSetValue + this.bar;
    },
    set(newVal) {
      return newVal.repeat(3).toUpperCase();
    }
  },
  bar: {
    type: 'string',
    value: 'bar'
  }
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))

instance.foo = 'a';
instance.foo = 'b';
instance.foo = 'c';
instance.bar = 'baz'
// logs:
// AAAbar
// BBBbar
// CCCbar
// CCCbaz

Mixing setterStreams and getter/setters

const MyMap = DefineMap.extend({
  bar: { type: 'string', value: 'bar' },
  foo: {
    type: 'string',
    set(newVal) {
      return newVal.toUpperCase();
    },
    get(lastSetValue) {
      return lastSetValue + this.bar;
    }
  }
});
const instance = new MyMap();

instance.stream('foo', setterStream)
instance.stream('foo').subscribe(val => console.log(val))

instance.foo = 'foo';
setterStream.push('newFoo') // or however the setterStream receives its values
instance.bar = 'boom';
setterStream.push('big badda')
// logs:
// undefinedbar
// FOObar
// NEWFOObar
// NEWFOOboom
// BIG BADDAboom

Working with type constructors

const MyMap = DefineMap.extend({
  address: {
    type: Address
  }
});
const instance = new MyMap();

instance.stream('address').subscribe(val => console.log(val, val instanceof Address))

instance.address = {
  street: '123 Ave Q',
  city: 'Chicago',
  state: 'IL'
};
// logs:
// Address {street: "123 Ave Q", city: "Chicago", state: "IL"} true

Defining a composition

const MyMap = DefineMap.extend({
  address: {
    type: Address
  },
  chicagoLocations: {
    stream( stream ) {
      return this.stream('address')
                .filter( address => address.city === 'Chicago' )
    }
  }
});
const instance = new MyMap();

instance.stream('chicagoLocations').subscribe(val => console.log(val))

instance.address = {
  street: '123 Ave Q',
  city: 'Chicago',
  state: 'IL'
};
instance.address = {
  street: '16 Young St',
  city: 'Toronto',
  state: 'ON'
};
instance.address = {
  street: '5400 N. Lakewood Ave',
  city: 'Chicago',
  state: 'IL'
};
// logs:
// Address {street: "123 Ave Q", city: "Chicago", state: "IL"}
// Address {street: "5400 N. Lakewood Ave", city: "Chicago", state: "IL"}

alternative implementation:

const MyMap = DefineMap.extend({
  address: {
    type: Address
  },
  chicagoLocations: {
    stream( stream ) {
      return stream.filter( address => address.city === 'Chicago' )
    },
    get() {
      return this.address
    }
  }
});
const instance = new MyMap();

instance.stream('chicagoLocations').subscribe(val => console.log(val))

instance.address = {
  street: '123 Ave Q',
  city: 'Chicago',
  state: 'IL'
};
instance.address = {
  street: '16 Young St',
  city: 'Toronto',
  state: 'ON'
};
instance.address = {
  street: '5400 N. Lakewood Ave',
  city: 'Chicago',
  state: 'IL'
};
// logs:
// Address {street: "123 Ave Q", city: "Chicago", state: "IL"}
// Address {street: "5400 N. Lakewood Ave", city: "Chicago", state: "IL"}

const MyMap = DefineMap.extend({
  foo: 'string',
  bar: { type: 'string', value: 'bar' },
  baz: {
    type: 'string',
    stream( stream ) {
      const fooStream = this.stream('foo')
                          .map( foo => foo.toUpperCase() );
      return stream
               .merge(fooStream)
               .combineLatest(this.stream('bar'));
    },
    get(lastSetValue) {
      return `**${lastSetValue}**`;
    }
  }
});
const instance = new MyMap();

instance.stream('baz').subscribe(val => console.log(val))

instance.foo = 'foo-1';
instance.bar = 'new bar';
instance.baz = 'baz-1';
instance.baz = 'baz-2';
// logs:
// ['FOO-1', 'bar']
// ['FOO-1', 'new bar']
// ['**baz-1**', 'new bar']
// ['**baz-2**', 'new bar']

Mixing setterStreams and value stream composition

const MyMap = DefineMap.extend({
  bar: 'string',
  foo: {
    stream( valueStream ) {
      return valueStream.combineLatest(this.stream('bar'));
    },
    value: 'foo'
  }
});
const instance = new MyMap();

instance.stream('foo', setterStream)
instance.stream('foo').subscribe(val => console.log(val))

instance.bar = 'bar';
setterStream.push('STREAMFOO') // however the setterStream receives its events
instance.foo // 'STREAMFOO', it's still just set by the setter stream
instance.foo = 'foo2'
instance.bar = 'bar2'
setterStream.push('Hello Wisconsin!')
// logs:
// ['foo', undefined]
// ['foo', 'bar']
// ['STREAMFOO', 'bar']
// ['foo2', 'bar']
// ['foo2', 'bar2']
// ['Hello Wisconsin!', 'bar2']

Error handling and stream completion

const MyMap = DefineMap.extend({
  url: 'string',
  activeTasks: {
    get(lastSetValue, resolve){
      if (this.url && resolve) {
        fetch(this.url)
          .then(response => response.json())
          .then(resolve);
      }
      return lastSetValue;
    },
    stream( stream ) {
      return stream.map( tasks => tasks.filter(t => t.active) );
    },
    value: []
  }
});
const instance = new MyMap();

instance.stream('activeTasks').subscribe({
  next(val) { console.log(val) },
  error(err) { console.log('ERROR', err) }
});

instance.url = 'https://taskmaster.io/api/';
instance.url = 'https://phoneyurl.com';
// logs:
// []
// [{ desc: 'do dishes', active: true }, { desc: 'clean car', active: true } ]
// ERROR TypeError: Failed to fetch

These event streams should generally be never-ending, but it is possible to complete a stream; I'm just not sure why you would want to do that.

const MyMap = DefineMap.extend({
  number: 'number',
  count3: {
    stream() {
      return this.stream('number')
                .take(3);
    }
  }
});
const instance = new MyMap();

instance.stream('count3').subscribe({
  next(val) { console.log(val) },
  error(err) { console.error('ERROR') },
  complete() { console.log("Stream complete") }
});

instance.number = 1;
instance.number = 2;
instance.number = 3;
instance.number = 4; // nothing happens?
// logs:
// 1
// 2
// 3
// Stream complete

Defining which Event-Stream library to use

Defaults to an ES.next observable polyfill

Set to RxJS

import DefineMap from 'can-define/map/';
import CanStream, { streamSymbol as stream } from 'can-stream';
import Rx from 'rx';
import 'can-stream/observable-plugin'; // <- debatable

CanStream.setEventStreamDelegate(Rx.Observable, { subscribe: 'forEach' }) // maybe done in config?

const MyMap = DefineMap.extend({
  foo: {
    type: 'string',
    stream( stream ) {
      const everySecond = Rx.Observable.interval(1000);
      return stream
               .combineLatest(everySecond);
    },
    value: 'foo'
  }
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))
// logs:
// ['foo', 1]
// ['foo', 2]
// ['foo', 3]

...or with Bacon / whatever

import DefineMap from 'can-define/map/';
import CanStream, { streamSymbol as stream } from 'can-stream';
import Bacon from 'baconjs';
import 'can-stream/observable-plugin'; // <- debatable

CanStream.setEventStreamDelegate(Bacon) // maybe done in config?

const MyMap = DefineMap.extend({
  foo: {
    type: 'string',
    stream( stream ) {
      const property = Bacon.constant(1);
      return Bacon.combineAsArray(property, stream, this.stream('bar'))
    },
    value: 'foo'
  },
  bar: {
    value: 'bar'
  }
});
const instance = new MyMap();

instance.stream('foo').subscribe(val => console.log(val))

instance.foo = 'bears';
instance.foo = 'cats';
instance.bar = 'baz';
instance.foo = 'dogs';
// logs:
// [1, 'bears', 'bar']
// [1, 'cats', 'bar']
// [1, 'cats', 'baz']
// [1, 'dogs', 'baz']

When you don't want to overwrite the stream property

Perhaps your instance already has a property called stream; in that case you'll need another way to access the API.

import DefineMap from 'can-define/map/';
import CanStream, { streamSymbol as stream } from 'can-stream';
import 'can-stream/observable-plugin'; // <- debatable

const MyMap = DefineMap.extend({
  foo: 'number'
});
const instance = new MyMap();

// use a Symbol to access the value stream
instance[stream]('foo').subscribe(val => console.log(val))
instance.foo = 1;
// logs:
// 1

// or with a setterStream
instance[stream]('foo', setterStream)

// Alternatively Use .from()
CanStream.from(instance, 'foo').subscribe(val => console.log(val))

// Alternative setter stream
CanStream.setPropWithStream(instance, 'foo', setterStream);

instance.foo = 2;
// logs:
// 2

So yeah, essentially DefineMap.prototype gets a stream method that will set up event streams based on property values. Also, there is a new PropDefinition called stream, which receives a stream of values based on change events, and MUST return a stream as well.


Does anyone see any potential problems or oversights with an API designed like this?
Looking for feedback -- Adam L Barrett (BigAB)

BigAB commented 8 years ago

To summarize a bit more succinctly:

I want an API that treats each Map property as an event-stream sink with side effects, and an event-stream source.

Any pre-sink processing should be done before the sink (which is the property on the instance). The sink just sets the property value, and the source just emits lastSetValues.

The stream PropDefinition is just a convenience, to co-locate some "post-source" processing with the property definition.

This concept should be extended to computes too.
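
To make that sink/source split concrete, here is a minimal sketch assuming Kefir and the instance.stream API proposed above (the map, values, and helper usage are illustrative, not an existing can-stream API):

import DefineMap from 'can-define/map/';
import Kefir from 'kefir';

const MyMap = DefineMap.extend({ foo: 'string' });
const instance = new MyMap();

// sink: any pre-sink processing lives on the stream; the sink itself just sets the property
Kefir.sequentially(100, [' a ', ' b ', ' c '])
  .map(s => s.trim())
  .onValue(val => { instance.foo = val; });

// source: the property emits its lastSet values back out as a stream
instance.stream('foo').onValue(val => console.log('out:', val));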

justinbmeyer commented 8 years ago

@BigAB did you look at the current proposed APIs?

A few thoughts:

I like PropDefinition.stream better than what's currently planned ... probably

The current plan makes something like: const everySecond = Rx.Observable.interval(1000); harder. I just need to make sure, which I think is true, that it's going to be memory safe.

Standalone API First, PUNCH API 2nd

Other than the stream in the define object, there's really not a plan for adding methods to DefineMap, or computes. Instead, I think we should start with external helpers that do this:

computeStream.from( instance, 'baz')

Later, someone can punch these methods into DefineMap.
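
For reference, a standalone helper along these lines could be sketched directly on top of Kefir without touching DefineMap at all. streamFrom below is a hypothetical name, not an existing can-stream export, and it assumes can-event style map.on/map.off change events:

import Kefir from 'kefir';

function streamFrom(map, propName) {
  return Kefir.stream(emitter => {
    const handler = (ev, newVal) => emitter.emit(newVal);
    map.on(propName, handler);
    // Kefir runs this cleanup when the last subscriber leaves,
    // which keeps the binding memory safe
    return () => map.off(propName, handler);
  });
}

// usage
// streamFrom(instance, 'baz').onValue(val => console.log(val));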

PropDefinition.get and PropDefinition.stream can't be used together (at least at the start)

get should not be a source for stream.
Instead the source should be the lastSet value.
There's no reason to have a chain like set -> get -> stream because stream can do everything get can do and more.

Also, IMO, I think get should always provide the final value ... not stream.

And in reality it would make implementation a lot harder b/c we'd have to have 3 computes stored "somewhere" ... where we already have a system that handles 2 fine. Essentially, stream would get compiled to a compute just like get gets compiled to a compute and set as this._computed[prop]. We'd have to have get compiled within stream.

APIS

Also, I think this is where an API signature would be worth a million example lines.

DefineMap.prototype.stream

.stream( prop )

.stream(prop, setterStream)

Honestly, I don't know how this can work with memory safety. Maybe you can elaborate. This would make the map subscribe to setterStream. How does that get undone?

I would make the following change instead:

.stream(prop, eventName)

For example:

.stream("items","add")

PropDefinition.stream(lastSetStream)

Returns {Stream} - a stream whose last value is the value of the map property.

BigAB commented 8 years ago

did you look at the current proposed APIs?

I did, and I saw the presentation in Chicago, which is why I wrote this.

get should not be a source for stream.

I disagree, but this is sort of the crux of the difference here.

I don't want to restrict the user from using both; I want to define what happens when they use both get and stream. If you can do what you need to in the stream and you are only using the streamed values, then do that. But if you need to use both for some reason, this is how it would work. Same with set and type.

This way the Map can be used as it is now, with no behaviour changes, and then the moment someone needs a stream, they have that option available too.

Instead the source should be the lastSet value.

Is there a benefit to this that I am overlooking?

I see a benefit in being able to use the Map for synchronous values, and bound values, and have an optional stream on top of that.

I think maybe expressing some derived values as a getter will be much more straightforward than as a merged stream.

const Person = DefineMap.extend({
  firstName: 'string',
  lastName: 'string',
  fullName: {
    get() { return `${ this.firstName } ${ this.lastName }` }
  }
});
const instance = new Person();

instance.stream('fullName').subscribe(name => console.log(name));

compared to

const Person = DefineMap.extend({
  firstName: 'string',
  lastName: 'string',
  fullName: {
    stream() {
      return this.stream('firstName')
        .combineLatest(this.stream('lastName'))
        .map( ([first, last]) => `${ first } ${ last }` );
    }
  }
});
const instance = new Person();

instance.stream('fullName').subscribe(name => console.log(name));

Also, IMO, I think get should always provide the final value ... not stream.

I get that. I disagree, and I believe this is the conceptual change that would most benefit this API.

I believe that currently we are trying to put streams inside our maps and computes. I think they should be on the outside, and in that way not interfere at all with how maps currently work; they are only additive, not restrictive.

inputStream -> someProp -> outputStream

Trying to internalize the streams is what is causing the interop problems (e.g. "you don't use get with stream", "you can't set this from both a stream and directly setting the property", etc.).

I think conceptually the "stream on the outside" is easier to understand.
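
As a rough illustration of "streams on the outside", assuming Kefir and the proposed instance.stream(prop[, setterStream]) API (the selector and property names are made up for the example; nothing here changes how get/set/type behave on the map itself):

import DefineMap from 'can-define/map/';
import Kefir from 'kefir';

const SearchVM = DefineMap.extend({ query: 'string' });
const instance = new SearchVM();

// inputStream -> someProp: the stream is just another caller of the setter
const keyups = Kefir.fromEvents(document.querySelector('#search'), 'keyup')
  .map(ev => ev.target.value);
instance.stream('query', keyups);

// someProp -> outputStream: a stream of lastSet (or derived) values
instance.stream('query')
  .filter(q => q.length > 2)
  .onValue(q => console.log('search for:', q));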

Honestly, I don't know how this can work with memory safety. Maybe you can elaborate. This would make the map subscribe to setterStream. How does that get undone?

I imagine it would be something like:

Until CanStream.from(map, 'propName') or CanStream.setPropWithStream(map, 'propName', setterStream) is called, nothing happens at all to the map.

Side note: those method names are totally negotiable; I'm just being explicit.

If .setPropWithStream(map, 'propName', setterStream) is called and passed an event stream, a binding is set up. Not being familiar with Kefir, I'll explain from an ES Observable / Rx point of view. A binding is created by calling subscribe on the setterStream with something like:

function setPropWithStream(map, propName, setterStream) {
  // Just pseudo code, do something more sophisticated, not this
  map._subscriptions = map._subscriptions || {};
  const subscription = setterStream.subscribe({
    next(val) { map[propName] = val },
    error(err){ /*...???... some sort of error handling */ },
    complete() {
      delete map._subscriptions[propName];
      // if I am the last prop delete _subscriptions too
    }
  });
  // some sort of reference to the subscriptions, this isn't a real code suggestion
  map._subscriptions[propName] = subscription;
}

...then in whatever teardown method maps normally have, you could just call subscription.unsubscribe on any items in the subscriptions collection.

Not sure why you'd need it, but I suppose there could be some API for canceling the setterStream subscription manually.

If .from(map, 'propName') is called, it basically does something like:

function from(map, propName) {
  // Just pseudo code, do something more sophisticated, not this
  map._streams = map._streams || {};
  // if a stream for this instance:propName has already been created, return it
  if (map._streams[propName]) {
    return map._streams[propName]
  }
  // create a new stream of getter values
  const propStream = new Observable(observer => {
      // Create an event handler which sends data to the stream
      let handler = (ev, newVal, oldVal) => observer.next(newVal);

      // Attach the change handler
      map.on(propName, handler);

      // Return a subscription object
      return {
        unsubscribe() {
          // Detach the change handler from the map
          map.off(propName, handler);
        }
      };
  });

  // THIS NEXT PART IS AN OVERSIMPLIFICATION OF THE HARDEST PART
  const propsDefinition = map._propDefinitions[propName].stream;
  // run the map's `stream` propDefinition, which defaults to S => S
  const definedStream = propsDefinition ? propsDefinition(propStream) : propStream;

  // store this stream in the instance cache
  map._streams[propName] = definedStream;
  // return it
  return definedStream;
}

...and that should do it. With the aforementioned oversimplification, that is the gist of what I think will work: it handles all use cases, it's simple to grok, and apparently, now that I've written it out a bit, it doesn't require touching DefineMap at all, unless we want to add the instance methods (and with these helpers available that should be trivial), or unless _propDefinitions isn't really available in any accessible form.

Also, I think this is where an API signature would be worth a million example lines.

You are so right. To be honest, the whole gist that started this was just a stream-of-consciousness thing, me trying to work out what I wanted, going from a vague idea to something on paper. I probably should not have just sent it, but I was also trying to make some noise before we went too far down a path I didn't like.

I wanted to show with the tons of examples the interop between get/set/type and the stream.

Even in my pseudo code above I am just sort of figuring it out as I go.

It started at the presentation, when it was stated, "Just don't use get with stream in the propDefinitions". That made me think we were working around the wrong concept, and I thought: we already have an observable value, we should just be able to stream values in and out of it, and then just use whatever event-stream API the 3rd-party lib provides. That led to the gist and ultimately to this issue.

I would make the following change instead:

.stream(prop, eventName)

prop {String} - A property name on this map. Things like "foo.bar" or "." work too.
eventName {String} - A string event name.
Returns {Stream} - A stream of event objects created by .dispatch(eventName).
For example: .stream("items", "add")

This change would only stream other properties from this map. How could you take an external stream like Rx.Observable.interval(1000) and change a property's value with each new value streamed?

justinbmeyer commented 8 years ago

I'll try to organize my thoughts as much as possible. The stream of consciousness is a bit hard to follow but I really appreciate you working through this with me.

1. Conventions

If we want to make set, stream, and get work together, the flow should be in that order: set -> stream -> get. It's very natural that set is incoming and get is the final read value determined from any other value. This is how getter/setters work everywhere. We then support set, get, stream, set -> get, set -> stream, stream -> get, and set -> stream -> get. There is no "interop" problem that I see (interop between get/set/type and the stream). Maybe you can explain what the interop problem is?
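
For example, under that ordering a single property definition might read like the hedged sketch below; it assumes get receives the stream's latest value the same way it receives lastSet today, and the property name and transforms are illustrative only:

const Weather = DefineMap.extend({
  temperature: {
    set(newVal) {           // 1. incoming: normalize whatever was set
      return Math.round(newVal);
    },
    stream(setValues) {     // 2. a stream of those set values, free to transform or compose
      return setValues.filter(t => t > -100);
    },
    get(latest) {           // 3. outgoing: the final value reads and bindings see
      return latest + ' degrees';
    }
  }
});
const instance = new Weather();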

2. stream -> get convenience probably not worth it

I think you're thinking that get/compute makes composite values like fullName easier to assemble, and then with that value assembled, you'll be able to use it in a following stream definition in fun and exciting ways. Essentially that get -> stream will be nice because one might want to assemble fullName and then count how many times that changes like:

fullNameChangeCount: {
  get(){
    return this.first + " " + this.last;
  },
  stream(lastGetValue) {
     return lastGetValue.count();
  }
}   

This might happen, but I think the person in this case should define fullName because it's not worth having get return anything other than the value returned to the user. #1 is just too strong of a convention, and the benefits so small, that it doesn't seem worth it.

get fullName(){
   return this.first + " " + this.last;
},
fullNameChangeCount: {
  stream: function(){
   return  this.stream("fullName").count();
  }
}

Next step: I guess I'd like to see some real-world use cases where it's demonstrably better to violate #1.

3. Interference

This order doesn't interfere with how maps currently work. In fact, it maintains it from my perspective (get being what's returned) and makes it easier to implement. The this._computed[prop] is still created from get, not from stream.

4. Memory safety with setProp

Can you explain how .setPropWithStream(map, 'propName', setterStream) would be setup?

I'm generally not a fan of this stuff because it very easily creates memory leaks (we used to have can/tie which did the same thing).

The memory problem is because of:

map._subscriptions[propName] = subscription;

There's no "teardown method maps normally have". Maps don't have any teardown methods. Instead, things like can-control, can-stache, which are tied to the DOM, know how to detach everything.

What you're wanting is something like Backbone's listenTo. can-event actually has this (but we don't tell people; it's used internally).

I'm very wary of shifting that responsibility to users to call some map.teardown() method.

It might be possible to hook into .addEventListener(prop) to then subscribe to the activated setter. That way only a bound property would actually be updating itself with any attached setter.
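
A hedged sketch of what that could look like; onBound and onUnbound below stand in for whatever internal "first listener added" / "last listener removed" hooks might be exposed, and none of these names are real APIs:

function setPropWithStream(map, propName, setterStream) {
  let subscription = null;

  // only subscribe to the setter stream while something is listening to the property
  onBound(map, propName, () => {            // hypothetical "first listener added" hook
    subscription = setterStream.subscribe(val => {
      map[propName] = val;
    });
  });

  // drop the subscription when the last listener goes away, so nothing leaks
  onUnbound(map, propName, () => {          // hypothetical "last listener removed" hook
    if (subscription) {
      subscription.unsubscribe();
      subscription = null;
    }
  });
}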

Next step: show how this would be set up in an application lifecycle. Let's make sure it's memory safe without an undue burden on devs (CanJS devs never have to call off themselves).

5. Multiple setters with setProp

What happens if someone calls setPropWithStream on the same map/prop twice?

6. Over extending?

It started at the presentation, when it was stated, "Just don't use get with stream in the propDefinitions". That made me think we were working around the wrong concept, and I thought: we already have an observable value, we should just be able to stream values in and out of it, and then just use whatever event-stream API the 3rd-party lib provides. That led to the gist and ultimately to this issue.

Reading this, I think you might be overthinking what I meant by that and extending that statement beyond what was meant or implied.

I think we are working on a very similar concept. I 100% agree that we should be able to stream values out.

I might agree about streaming values in if a safe way can be provided.

The only daylight between us I think is the order of get and stream. I don't think this is some big conceptual change. It boils down to me:

  1. Originally, not caring about get and stream interop and wanting to punt for later (still maybe a good idea considering this discussion).
  2. Now, believing the convention of get always returning what you expect from foo.bar (#1) is more important than any "computes are nicer first" (#2) benefit.

7. .stream(prop, eventName) confusion

I would make the following change instead:

I should have said "support this" or "think about this" as opposed to "instead". It doesn't take external streams. However, it does solve the critical, imo, use case of simply creating a stream for any time a list gets items added to it:

map.stream("items", "add") //-> stream of add events
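
For instance, a hedged usage sketch, assuming items is a DefineList and that the returned stream supports Kefir-style scan/onValue:

// count how many "add" events the items list has dispatched
const addedCount = map.stream("items", "add")
  .scan(count => count + 1, 0);

addedCount.onValue(n => console.log(n, "add events so far"));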

8. Thanks again!

justinbmeyer commented 7 years ago

I'm closing this. I think most of this is covered for now. I think the getter / stream order should be opened in can-define-stream.