weo-edu / store

derived data stores
0 stars 0 forks source link

Wiring Interface #3

Open joshrtay opened 9 years ago

joshrtay commented 9 years ago

Pipe proposal:

userStore = UserStore();
userStore.source(nsg.reader('dispatch', 'user'));
userStore.out.pipe(nsq.write('log'));
userStore.error.pipe(nsq.write('errors'));
userStore.event.pipe(nsg.write('events'));
userStore.message.pipe(nsq.write('messages'));
joshrtay commented 9 years ago

Emitter proposal:

var reader = nsg.reader('dispatch', 'user');
var errors = nsg.writer('errors');
reader.on('message', function(msg) {
 try {
   userStore.emit(msg.type, msg.body);
 } catch(e) {
   errors.publish(e); 
 } 
});
joshrtay commented 9 years ago

Multi Store Pipe:

var userStore = UserStore();
userStore.source(nsg.reader('dispatch', 'user'));

var activityStore = ActivityStore();
activityStore.source(nsg.reader('dispatch', 'user'));

es.merge(userStore.out, activityStore.out).pipe(nsg.write('log'));
es.merge(userStore.error, activityStore.error).peipe(nsg.write('errors'));
joshrtay commented 9 years ago

Multi Store Emitter:

Can't quite picture how this works.

ashaffer commented 9 years ago

I was thinking something even as simple as:

stores/index.js

var stores = [
  require('./user'),
  require('./share')
];

module.exports = function(name, event) {
  // Assuming dispatch is autocurried
  stores.forEach(dispatch(name, event));
};

function dispatch(name, event, store) {
  if(store.events[name])
    store.events[name](event);
}  

Which is then consumed by the nsq reader:

var stores = require('lib/stores');

reader.on('message', function(msg){
  var name = msg.name.toString();
  var body = msg.body.toString();
  try {
    stores(name, body);
  } catch(e) {
    // requeue, etc..
  }
});
ashaffer commented 9 years ago

Alternative

stores/index.js

var source = require('lib/reader');

source.channel('user', require('./user'))
source.channel('share', require('./share'))
// ...

Or

source.channel('user', dispatch(require('./user'))
source.channel('share', dispatch(require('./share'))

function dispatch(store, event) {
  if(store.on[event.name])
    store.on[event.name](event)
}
joshrtay commented 9 years ago

how about this? it makes it a little clearer which way the data is flowing. i think.

source.channel('user').pipe(dispatch(require('./user'))
ashaffer commented 9 years ago

Ya that seems good. My only objection to it is that I would prefer not to actually use node's streams. They seem so nice but they never really work the way we expect.

joshrtay commented 9 years ago

that's fine.

ashaffer commented 9 years ago

Generic .pipe implementation?

thing.pipe = function(sink) {
  this._sinks.push(sink);
};

thing.send = function(data) {
  this._sinks.forEach(function(sink) {
    sink(data);
  });
};

Could be implemented as a mixin.

EDIT: Implementation that can be standalone or mixin:

function Pipe(obj) {
  if(arguments.length) return extend(obj, this.prototype);
  if(! (this instanceof Pipe)) return (new Pipe);
}

Pipe.prototype.pipe = function(sink) {
  this._sinks = this._sinks || [];
  this._sinks.push(sink);
  return this;
};

Pipe.prototype.send = function(data) {
  if(! this._sinks) return;
  this._sinks.forEach(function(sink) {
    sink(data);
  });
};
ashaffer commented 9 years ago

Actually probably the ideal is for pipe to be able to be either a mixin or it's own thing, similar to the component emitter that we use. For implementing channels we could just do:

var channels = {};

source.channel = function(name) {
  if(channels[name]) return channels[name];

  var pipe = new Pipe();
  var reader = nsq.reader({
    //...
    channel: name
  });

  reader.on('message', function(msg) {
    pipe.send(msg);
  });

  return (channels[name] = pipe);
};

Or source.channel could actually just be the only export. So you'd do:

source('user').pipe(dispatch(require('./user'))
source('share').pipe(dispatch(require('./share'))
joshrtay commented 9 years ago

ya. that looks really solid.

joshrtay commented 9 years ago

so pipe is it's own lib. where does dispatch live? also dispatch needs to be slightly more complicated.

function dispatch(store, evt) {
  if(store.on[evt.name]) {
   try {
    yield store.on[evt.name](evt.body)
   } catch(e) {
    error('store')(e);
   }
  }
}
joshrtay commented 9 years ago

oh and dispatch needs to do msg.success and msg.requeue.

ashaffer commented 9 years ago

I was thinking that dispatch lives in stores/index.js or whatever file is doing:

source(...).pipe(...)

And ya, that's right. dispatch should also probably encapsulate the transliteration of event names (e.g. from snake -> camel, if we want to do that).

I was thinking success/requeue would be handled at a higher level? Not sure. dispatch could return true/false to source which could be in charge of that, since right now source is the only thing interfacing directly with nsq.

ashaffer commented 9 years ago

The above being said, I actually kind of feel like boolean parameters / return values are kind of bad for readability/clarity generally (except isAThing or hasSomeProperty type functions). This guy talked about it recently:

http://jlebar.com/2011/12/16/Boolean_parameters_to_API_functions_considered_harmful..html

So maybe we should put a handled property on the event or something?

EDIT: Should be noted that both of these notions are also somewhat contrary to the pipe/send abstraction, and in particular contrary to events being immutable. Though maybe the outer shell could be mutable. Or handled could be a function, like preventDefault().

ashaffer commented 9 years ago

Third option:

function dispatch(store, evt) {
  if(store.on[evt.name]) {
   try {
    yield store.on[evt.name](evt.body)
   } catch(e) {
    error('store')(e);
    throw e;
   }
  }
}

Then the error propagates back up to source where it can catch it again and handle it appropriately. This way our failure modes are uniform.

joshrtay commented 9 years ago

ya. but what would source ever need to do with it? its like the default error handler in a koa request. it just console.errors the stack and drops the error so it doesnt destroy your process.

ashaffer commented 9 years ago

source would be in charge of requeueing or not deleting on error (not sure which paradigm nsq uses there). So dispatch would log, and source would handle the queueing related stuff.

joshrtay commented 9 years ago

ok. so source is in charge of parsing message, requeuing and setting msg.timeout.

ashaffer commented 9 years ago

Ya sounds good. What does msg.timeout do?

joshrtay commented 9 years ago

i believe it auto requeues after a certain time period.

ashaffer commented 9 years ago

Should the wiring interface have its control inverted? E.g.

module.exports = function(source) {
  source('user').pipe(dispatch(require('./user'))
  // ...
}

This might make it easier to stream events in from sources other than nsq, for migration or testing purposes.

weo-edu-admin commented 9 years ago

I think that piping probably provides sufficient decoupling. We can just change the read stream if we want a different source. On Apr 18, 2015 2:26 PM, "Andrew Shaffer" notifications@github.com wrote:

Should the wiring interface have its control inverted? E.g.

module.exports = function(source) { source('user').pipe(dispatch(require('./user')) // ... }

This might make it easier to stream events in from sources other than nsq, for migration or testing purposes.

— Reply to this email directly or view it on GitHub https://github.com/weo-edu/store/issues/3#issuecomment-94202310.

joshrtay commented 9 years ago

No I think we can just use a different read stream if we want. The piping provides sufficient decoupling.

It should probably read:

nsq('events', 'user').pipe(dispatch(user))
ashaffer commented 9 years ago

But in order to do that we'd have to actually edit the stores file, as opposed to simply requiring it differently.

ashaffer commented 9 years ago
require('./stores')(nsq)
require('./stores')(streamFromFile)
module.exports = function(source) {
  source('user').pipe()
  ...
}

Seems much more general.

joshrtay commented 9 years ago

Actually we should probably decouple more.

joshrtay commented 9 years ago

I just don't know what source('user') means in any context besides nsq.

ashaffer commented 9 years ago

Ya, it is unfortunate that the channel abstraction is hard to decouple, but sources that don't care about it don't need to implement it (e.g. file streams). If there is a way to decouple the channel concept i'm in favor of it, just can't think of one. File stream could just be:

require('./stores')(function() { return stream; })
ashaffer commented 9 years ago

Idea:

stores/index.js

exports.user = dispatch(require('./user'))
exports.share = dispatch(require('./share'))
// ...

nsq binding:

each(require('./stores'), function(store, channel) {
  source(channel).pipe(store);
})

file streaming:

each(require('./stores'), function(store) {
  file.pipe(store);
});
joshrtay commented 9 years ago

that seems pretty right. it just seems to me that maybe the dispatch wrap should be happening in the user store. we could keep the api essentially the same, we just export a function instead of an object.

var store = module.exports = function dispatch(evt) {...}

store.on = {...};
store.get = function(evt){...}
joshrtay commented 9 years ago

New api: wiring is perform by nsq_to_lambda and json_to_lambda utilities

ashaffer commented 9 years ago

So, for wiring purposes, a store is a single function which accepts an event, and either throws or returns generator-synchronously.

xxx_to_lambda utilities take some source of input, and translate it into a stream of events which are passed to the function.