mattkrick / cashay

:moneybag: Relay for the rest of us :moneybag:

Support subscriptions #27

Closed · mattkrick closed this issue 8 years ago

isaac-peka commented 8 years ago

I was just about to raise an issue about exactly this 👍 Do you have a rough idea of how hard this would be to implement, and whether anything else is blocking it? The lack of support for subscriptions in Relay is a big killer for me. IMO it undermines the benefit of declarative data fetching if I have to define subscriptions completely imperatively, and it doesn't look like the Relay devs have any plans to support it out of the box.

mattkrick commented 8 years ago

I wanna get this right. Here are a few thoughts on why that's difficult:

  1. Observables. Yes or no?
  2. With subs, the payload matters. For that reason, I like to subscribe to a channel name, and that channel name maps to a queryString that's stored on the server: https://github.com/ParabolInc/action/blob/master/src/server/graphql/wsGraphQLHandlers.js#L30-L45. But if we do that, then we don't know the shape of the data on the client. If we keep it on both server & client, we're not DRY. So I think the best compromise is to send the queryString once, and then use the channelName in the websocket frame (sketched below): https://github.com/SocketCluster/socketcluster/issues/167.

Currently, I'm thinking something like this: cashay.subscribe(channelName, subscriptionString, options)
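A minimal sketch of the server half of that compromise, assuming a hand-rolled channelName-to-queryString lookup (all names here are hypothetical):

const channelQueries = {
  // the server already knows the queryString, so frames only need to carry a channelName
  meeting: `subscription($meetingId: ID!) { subToPosts(meetingId: $meetingId) { id } }`,
};

const handleSubscribe = (channelName, variables) => {
  const queryString = channelQueries[channelName];
  if (!queryString) throw new Error(`Unknown channel: ${channelName}`);
  // parse & cache the AST once, then execute & publish results to the channel...
};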

I'll tackle CmRDT after this, since it's on a whole different level.

Post your ideas here!

jordanh commented 8 years ago

There are a few fascinating threads on the Falcor repo around subscriptions and Observables. Here's one: https://github.com/Netflix/falcor/issues/91#issuecomment-98263649

Just like Transformers, there's more than meets the eye here.

  1. Observables – yes, I think so. What would the alternative be? A hand-rolled asynchronous subscription interface?
  2. On the interface: do you need to expose the channel name on a per-subscription basis? Would a default make sense when the cashay singleton is created?

When using this with React, do you envision calling subscribe directly in mapToProps?

How would unsubscribing be handled? cashay.unsubscribe(channelName, subscriptionString, options)? Would that be called when a component is unmounting?

mattkrick commented 8 years ago

@jordanh As much as I don't like it, I think we'll have to let the user call a user-defined transport function. The reason being that I want it to work with socketcluster, socket.io, etc. On the meatier stack, I like using the socket.subscribe method because then I can keep a list of channels in my redux store. So maybe it'll be more like const unsubscribe = cashay.subscribe(queryString, subHandler)

Then the handler will be something like:

const sub = `myChannelName`;
socket.subscribe(sub, {waitForAuth: true});
socket.on(sub, data => {
  this.mutateFromSubscription(data);
});
socket.on('unsubscribe', channelName => {
  if (channelName === sub) {
    console.log(`unsubbed from ${channelName}`);
  }
});

Of course, that's not using Observables at all.

Thankfully, the hard part is already done. A subscription is nothing more than a query that you didn't ask for, so all the pieces are already there; it's just a matter of making a pretty API.

mattkrick commented 8 years ago

Follow-up from above: @jordanh I like the idea of something like a subscriptionTransport(channelName) that the user provides to the singleton. One issue I have with that is that it would require loading the socket module when the singleton is created. In a perfect world, we could lazily load it; then, for example, we could have a nice tiny landing page that still uses the redux store, and when we go to a socketized route, load the socket module async.

Probably the best solution is to allow both the singleton and the subscribe method to take a subscriptionTransport option. And maybe call it subscriptionHandler to keep naming easy. We'll also need a components option in case a subscription requests data that will mutate a query.
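For instance, a hedged sketch of the two injection points, where dynamic import() stands in for whatever async loading the bundler offers (module names are made up):

// eager: hand the handler to the singleton at creation
export const cashay = new Cashay({...paramsObject, subscriptionHandler});

// lazy: load the socket module only when a socketized route mounts
import('./mySubscriptionHandler').then(({default: subscriptionHandler}) => {
  cashay.subscribe(channelName, subscriptionString, {subscriptionHandler});
});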

jordanh commented 8 years ago

Ok, I like the direction you're going.

There was a pattern from Falcor I liked. They specified a DataSource interface class (see here) that was passed to the Falcor model constructor. Perhaps Cashay could offer its own DataSource interface class that an implementor could fill in to support different transports (e.g. http, http & websockets, or just websockets), or even transports that can be upgraded or downgraded based upon changing conditions.

Here's a snippet from Falcor-land:

var model = new falcor.Model({source: new falcor.HttpDataSource("http://netflix.com/user.json")});
var movieNames = model.get('genreLists[0...10][0...10].name').toPathValues();

You could provide a similar interface class, perhaps a CashayDataSource with its own set of methods an implementor fills in. Then a developer could:

import clientSchema from './clientSchema.json';
import {Cashay, SocketDataSource} from 'cashay';

const dataSource = new SocketDataSource(...);

const paramsObject = { ... };
export const cashay = new Cashay({
  ...paramsObject,
  dataSource,
});

What do you think?

jordanh commented 8 years ago

Noted on your lazy-loader desires. Perhaps rather than worrying about that case now, you could provide the ability to create a new singleton (in a future implementation) and have the Cashay constructor take in an existing redux store? Then you could just change horses whenever you like...
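Something like this, perhaps (cashayReducer is an assumed export name, purely illustrative):

import {createStore, combineReducers} from 'redux';
import {Cashay, cashayReducer} from 'cashay'; // cashayReducer is hypothetical

// hand an existing store to the constructor, so a future singleton
// could be created without losing state
const store = createStore(combineReducers({cashay: cashayReducer}));
export const cashay = new Cashay({store, /* ...other params */});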

mattkrick commented 8 years ago

So what if the dev creates a DataSource/Transport in a separate file & then imports that either into the singleton (non-lazy) or into each sub (lazy)?

Just winging it here, but something like...

import socketCluster from 'socketcluster-client';
// CashayTransport is the proposed base class from the discussion above
import {CashayTransport} from 'cashay';

export default class Transport extends CashayTransport {
  constructor(event, options = {}) {
    if (!event) {
      throw new Error('event is required!');
    }
    super();
    this.options = options;
    this.event = event;
  }

  send(query, variables, operationName) {
    const payload = {query, variables, operationName};
    const socket = socketCluster.connect(this.options);
    return new Promise((resolve, reject) => {
      socket.emit(this.event, payload, (err, res) => {
        if (err) {
          // a throw inside this callback wouldn't reject the promise, so reject explicitly
          const message = err[0].message;
          const error = new Error(`GraphQL Error: ${message}`);
          error.rawError = err;
          return reject(error);
        }
        resolve(res.data);
      });
    });
  }
}

jordanh commented 8 years ago

Yes! That's a good first stab.

mattkrick commented 8 years ago

@jondubois any thoughts on how you think this should look?

jondubois commented 8 years ago

@mattkrick I can't really add much to the discussion on the Redux store/modeling aspect; the APIs discussed all look good to me.

For the channel subscription aspect, I noticed in this discussion: https://github.com/SocketCluster/socketcluster/issues/167 you mentioned that it would be nice to be able to subscribe to resources using simple channel names like myCar instead of specifying the exact resource such as cars/123. I think that may not be very practical to do with channels (it can be made to work, but it's very difficult to scale).

As a rule of thumb, channel names should be globally unique throughout the entire system. You should not have two different resources share the same generic channel name.

For example, in the case of a generic myCar channel, we would want it to refer to a different resource depending on the logged-in user; this breaks the global uniqueness requirement of channel names. We could add a PUBLISH_OUT middleware to filter messages so that users only get updates related to their own car. That solves the problem from the user's point of view, but on the backend it means the myCar channel will be really busy (not scalable), because it will have to handle every update related to every car of every user (unless we do some really complex channel name transformations/sharding on the backend, which isn't worthwhile). By contrast, if we have globally unique channel names like car/123 (car/{carId}), it's highly scalable because each channel only handles updates for a single car. Ideally you want more unique channel names, arranged so that each channel handles a small number of updates.

Each unique channel in SC can handle about 10K messages per second (and probably at least 20K per second using the uWS engine), which is pretty high, but this is a HARD limit. So you don't want your channels' message volume to grow proportionally to the number of users in your system; ideally you shard your data/messages over more unique channels as your user base grows, so that each channel maintains roughly the same volume of messages as you add more users.

One possible alternative to channels in the form car/{carId} could be channel names like {userId}:myCar (or {userId}@myCar). In this case we don't need to specify an exact car id; the server can figure that out. But by having the userId as part of the channel name, we get automatic sharding and avoid entirely generic channel names (so it's scalable on the backend too).

Conceptually, I guess the big difference between the channel formats car/{carId} and {userId}:myCar is that in the first case the consumer (the frontend) has to be smart and know exactly which car it wants to subscribe to. In the second case, the consumer can be dumber and let the server figure it out. This could be particularly handy with collections, because then the server could decide which users' collections are affected when some data changes (and publish the updates to the appropriate channels).
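To make the contrast concrete, here's a rough server-side sketch (scServer.exchange.publish is SocketCluster's publish API; ownersOf is a made-up helper):

// resource-oriented: globally unique, the frontend must know the car id
const carChannel = carId => `car/${carId}`;
// user-sharded: the consumer only knows itself; the server resolves the car
const myCarChannel = userId => `${userId}:myCar`;

// when a car changes, publish only to its own channel:
const publishCarChange = (oldCar, newCar) => {
  scServer.exchange.publish(carChannel(newCar.id), {old_val: oldCar, new_val: newCar});
};

// or fan out to each owner's shard, letting the server be the smart one:
const publishToOwners = (oldCar, newCar) => {
  ownersOf(newCar).forEach(userId => { // ownersOf is hypothetical
    scServer.exchange.publish(myCarChannel(userId), {old_val: oldCar, new_val: newCar});
  });
};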

Last time I did work like this, I ended up with the car/{carId} approach (I let the frontend/consumer be the smart one and know all the resources and ids it needs). It worked out in the end (collections, pagination, custom sorting and all). But the downside of this design is that clients have to tell the server which collections will be affected whenever they add/remove a specific resource to/from a collection (logic that would be better suited to the server, e.g. as part of the schema).

I think the idea of putting most of the smartness into the client/consumer exists in large part because of the trend towards server-less (or should I say less-server) architecture: the ultimate goal seems to be a generic server, with as much of the custom business logic as possible written on the frontend. A big motivator for the entire serverless movement was that servers are (were?) difficult to manage; developers don't want to think about scalability or recovering from crashes, they want that logic to be generic/automatic.

With tools like Docker, Rancher, and Kubernetes, managing lots of servers could become really easy again, so I think it's an open question whether putting all the smartness into the consumer/frontend still makes the most sense, or whether a more balanced hybrid approach is better.

If you're willing to tightly couple to a database with changefeeds like RethinkDB, it's probably easier to use changefeeds and just socket.emit('userCollectionName', data) (for example) directly to the client (instead of using channels; in this case you offload all the smartness of realtime subscription management to the database itself). But then the hard part with this approach is creating and then tearing down/cleaning up the stateful changefeeds from the workers when clients connect/disconnect.
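A hedged sketch of that changefeed approach, including the teardown step (RethinkDB's official JS driver; assume conn is an open connection and scServer is the SocketCluster server):

const r = require('rethinkdb');

scServer.on('connection', socket => {
  let feed;
  r.table('userCollection').changes().run(conn).then(cursor => {
    feed = cursor;
    cursor.each((err, change) => {
      if (err) return;
      socket.emit('userCollectionName', change); // bypasses the channel broker entirely
    });
  });
  socket.on('disconnect', () => {
    // the hard part: each client's stateful changefeed must be cleaned up
    if (feed) feed.close();
  });
});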

... Haha OK, that's a lot of rambling but hopefully that gives some idea of the complexity of various approaches (from my perspective).

mattkrick commented 8 years ago

@jondubois wow, that's awesome! I was so fixated on RethinkDB's includeInitial, which essentially creates 1 channel per socketId, that I forgot entirely about using socketCluster's subscribe and publish the way they should be used (I currently just emit instead of publish). So I'm thinking (at least in the socketCluster example) that if a sub is generic, publish it; if it's individualized, use emit to avoid going through the socket broker.

At what point does it make sense to individualize a sub? For example, if I look for statsByUserId, there might be 1 person on it, or maybe 2. Would you say that the penalty of having to emit 2 individual subs would outweigh the penalty of going through publish & then the broker? In other words, how much cheaper is emit than publish?

I'm all for serverless architecture, but I haven't seen a serverless socket architecture, so although I know it's possible, I imagine it's not common. That said, our servers gain state the second we do the socket handshake & hold the socketId, so I don't mind putting more state on the server if it means a smaller payload.

So I'm thinking that if we called something like cashay.subscribe(queryName, channelName, {variables, transport}), that'd be all we need, and then the user-defined transport function would send off all the information via http, socket.io, socketCluster, etc. Can you think of any socket packages that don't allow sending both a channelName and a payload? socketCluster's emit has always had it & subscribe now has it. So, on the server they'll either append the socketId to the channelName for individual subs, or they'll use something like RethinkDB changefeeds & just emit from there.

I still don't like sending the queryString to the server; I'd rather send a channelName so the server can cache the queryString AST instead of parsing every queryString that comes in on a subscribe. Having it on both server & client isn't DRY, but it has to be on the client for cashay, and it has to be on the server so folks can use it to declaratively decide which fields to send. Maybe we could copy it over to the clientSchema.json at compile time? Require a file on the server that has a LUT between channelName and queryString, then write the results to the clientSchema (sketched below).
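The compile-time copy could be as small as this (file paths and the subscriptionMap shape are guesses):

// hypothetical build script: merge the server's channelName -> queryString LUT
// into clientSchema.json so both sides stay DRY without hand-copying
const fs = require('fs');
const subscriptionMap = require('./server/graphql/subscriptionMap'); // assumed path
const clientSchema = require('./clientSchema.json');

clientSchema.subscriptionMap = subscriptionMap;
fs.writeFileSync('./clientSchema.json', JSON.stringify(clientSchema, null, 2));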

mattkrick commented 8 years ago

@jordanh @jondubois @mglukhovsky (I'm bringing you in here since I'm thinking Cashay could also play nicely with Horizon in the future.)

I think I've got the API solved for handling subscriptions in Cashay; would love your feedback:

({data, setVariables, unsubscribe} = cashay.subscribe(subscriptionString, subscriber, options));

Inputs:

- subscriptionString: a GraphQL subscription string
- subscriber: a user-defined function that subscribes to a channel, handles incoming data, and returns an unsubscribe function. Details below.
- options: an object that includes variables, component, and key (just like the query method). It also takes a fastMode Boolean that is on by default; turn it off if your subscription data doesn't include everything your subscriptionString asked for (eg DDP only sends the fields that changed, not the whole new document). It's basically a trade-off between payload size & CPU cycles.

An example subscribe callback. The user is given subscriptionString, handlers, and variables. Here is how they could use them (the example assumes the RethinkDB changefeed data format, https://rethinkdb.com/docs/changefeeds/javascript/ @mglukhovsky, and socketCluster as the transport):

const subscriber = (subscriptionString, handlers, variables) => {
  // reverse-lookup the channel that maps to this queryString
  let baseChannel;
  for (const [key, value] of channelLookupMap.entries()) {
    if (value === subscriptionString) {
      baseChannel = key;
      break;
    }
  }
  const channelName = `${baseChannel}/${variables.meetingId}`; // eg `meeting/123`
  const socket = socketCluster.connect({authTokenName});
  const {add, update, remove, error} = handlers;
  socket.subscribe(channelName, {waitForAuth: true});
  socket.on(channelName, data => {
    if (!data.old_val) {
      add(data.new_val);
    } else if (!data.new_val) {
      remove(data.old_val.id);
    } else {
      update(data.new_val);
    }
  });
  socket.on('unsubscribe', unsubChannel => {
    if (unsubChannel === channelName) {
      console.log(`unsubbed from ${unsubChannel}`);
    }
  });
  return () => socket.unsubscribe(channelName);
};

const channelLookupMap = new Map([
  ['meeting', `
    subscription($meetingId: ID!) {
      subToPosts(meetingId: $meetingId) {
        id,
      }
    }`],
]);

Outputs:

- data: an array (or a single object, for point subscriptions) of all denormalized documents received since the subscription began (not cleared after unsubscribing, which allows pausing the sub).
- setVariables: shorthand for unsubbing from the current sub and rerunning the subscriber function with new vars (eg changing meetingId might change the point subscription from meeting/123 to meeting/124).
- unsubscribe: call this function with no params to unsubscribe from the subscription.
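Putting it together, container usage might look like this (the option values are assumed from the description above):

const {data, setVariables, unsubscribe} = cashay.subscribe(subscriptionString, subscriber, {
  variables: {meetingId: '123'},
  component: 'MeetingContainer', // assumed, as with the query method
});

// hop the point subscription from meeting/123 to meeting/124:
setVariables({meetingId: '124'});

// on unmount:
unsubscribe();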

jordanh commented 8 years ago

@mattkrick: I really, really like the overall direction. Also @mglukhovsky I feel like I've been living under a rock; this is the first I've read of Horizon. It looks wonderful. I can definitely see how Cashay could grow to become a useful front-end cache and subscription manager.

@mattkrick I only have 👍 to say about the semantics. All the pieces look to be in place.

The only critique I can offer is that if you're growing an interface (like subscriber, above), make it an actual class interface an implementor can extend. Along the lines of my previous comment, if you find transport neutrality important, make it a lower-level interface (or set of interfaces) that's given to Cashay when the singleton is created.

If you feel strongly that, no sir, per-channel subscription transports are an important use-case, then allow a default transport to be overridden for a given query(), mutate(), or subscribe(), and just pass it in as a key on the options object.

In summary:


import clientSchema from './clientSchema.json';
import {Cashay, CashayTransport} from 'cashay';

export class WSTransport extends CashayTransport {
  ...
}

export class HTTPTransport extends CashayTransport {
  ...
}

const paramsObject = { 
  transport: new HTTPTransport(...),
  ...
};

export const cashay = new Cashay(paramsObject);

If in a spot you only want to use websockets:

import {WSTransport} from 'myCashay';

const options = {
  transport: new WSTransport(...),
  ...
};
({data, setVariables, unsubscribe} = cashay.subscribe(subscriptionString, options));

mattkrick commented 8 years ago

@jordanh I think a Transport class for queries/mutations makes sense, since you need to pass in everything you'd pass in anyway for the fetch API, but I don't understand why it'd be helpful for sockets, since subscriber is a callback, not a class.

For queries & mutations, it'd look like new HTTPTransport('example.com', {fetchTimeout, credentials, headers, etc})

For subscriptions, that stuff is all stored in the socket already, so there's no args that we need to pass in & no transport to instantiate, right?

jordanh commented 8 years ago

Thinking it through a bit more, you're right. This is a separate contract, a separate interface.

The only thing I can think of is that if Cashay does provide a Transport interface and that interface holds a reference to an active socket connection, it'd be nice to have a method I could call on Cashay to get that socket so it can be handed to my subscriber callback...

mattkrick commented 8 years ago

a reference to an active socket connection

It's possible the socket connection isn't active yet, since they probably don't want to activate it until they use cashay.subscribe for the first time. By having a socket transport, we'd essentially do the import {socket} from 'socket-module' for them in an unlazy way, at the expense of an extra param for our API (yuck).

I think instead the suggested practice would be to create your subscriber in a separate file and import it when you need to use cashay.subscribe. In doing so, you only import subscriber from './foo' in the containers that need it, which means it'll be lazily imported: if you've only got 1 route that needs it, it won't load & handshake until that webpack chunk loads.

1 more thing we should think about is the possibility of using server-sent events instead of websockets, but I've never used those before. I think if Cashay offers an HTTPTransport we could have a handleSubs method in there that assumes SSE, and then the API would remain unchanged (passing in a transport when you instantiate the singleton). Regardless of the method, we have no idea what form those data packets are going to take (RethinkDB, DDP, etc); we just assume each frame will be a Create, Update, or Remove & give them the tools to determine which it is & send it on its way.
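A hedged sketch of what an SSE-flavored handleSubs might look like (EventSource is the standard browser API for server-sent events; the endpoint shape is invented):

class HTTPTransport {
  // ...query/mutation plumbing elided...
  handleSubs(channelName, handlers) {
    const source = new EventSource(`/sse/${channelName}`); // assumed endpoint
    source.onmessage = e => {
      const data = JSON.parse(e.data);
      // assume each frame is a create, update, or remove
      if (!data.old_val) handlers.add(data.new_val);
      else if (!data.new_val) handlers.remove(data.old_val.id);
      else handlers.update(data.new_val);
    };
    return () => source.close(); // the unsubscribe function
  }
}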

jordanh commented 8 years ago

👍

mattkrick commented 8 years ago

At Facebook, 1 pubsub channel can have many queries: https://youtu.be/ViXL0YQnioU?t=23m30s

jondubois commented 8 years ago

@mattkrick SSEs aren't the best abstraction to use when you have too many channels, because they'd require a separate connection for each channel, which is not good for performance.

You MAY be able to multiplex multiple subscriptions over a single SSE connection, but that can get really complex, and you'd have to make subscribe/unsubscribe calls as separate HTTP requests (which, at scale, could end up hitting a completely different server from the one holding your SSE connection, and then things get messy...). With WebSockets, even at scale, you get a guarantee that the subscription request hits the same server from which the channel stream originates.

Regarding your second comment about Facebook pub/sub, it would be interesting to get more details on exactly how they did this. I guess with the data option in SC (when calling socket.subscribe(...)) we're probably approaching what they have (maybe we can change something to take it one step further).

From what I understood, it sounds like they subscribe to the exact same data stream on the backend, and then before copies of the payload are sent out to clients, each copy gets transformed to "fit into" the GraphQL query provided. I guess in SC we may be able to do this using MIDDLEWARE_PUBLISH_OUT middleware (it might already be possible, actually). It's interesting to note that they use ids as part of the channel name, so the channel name contains explicit information about the resource being subscribed to.
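A very rough sketch of that per-subscriber filtering (the middleware signature here is from older SC versions and may differ in yours; fitsQuery is a made-up helper):

scServer.addMiddleware(scServer.MIDDLEWARE_PUBLISH_OUT, (socket, channel, data, next) => {
  // decide per recipient whether this copy goes out
  if (fitsQuery(data, socket)) {
    next(); // deliver this subscriber's copy
  } else {
    next(new Error(`Filtered out of ${channel}`)); // block just this copy
  }
});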

Another interesting thing is that they seem to think of pub/sub channels in terms of 'actions' while I mostly think of channels in terms of resources being changed (my thinking is more in line with the changefeeds concept from RethinkDB).

In any case, it's good to know that they've also identified pub/sub as the ideal approach for this problem.

mattkrick commented 8 years ago

@jondubois

each copy gets transformed to "fit into" the GraphQL query provided.

That's how I understood it, too. My current strategy is that the DB issues a changefeed to GraphQL, which then modifies it for the whole channel. They take it 1 step deeper and perform a filter inside their pub/sub middleware. I see why they do it (support for legacy versions), but running a GraphQL execution for every member of a given channel seems ridiculous.

Instead, I'd just open up a new channel. For example, Comments:v1 gives you id, comment, foo; Comments:v2 gives you id, comment, bar. The client_v2 asks for Comments:v2. Easy & performant. Maybe that's what the hash in their channel name was & they just explained it poorly? I just can't imagine Facebook saving a unique AST & executing it for each subscriber for each socket message... at least not at their scale!
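In sketch form (assuming SocketCluster's exchange.publish on the server; runCachedQuery and the field names are invented for illustration):

// one cached queryString (and AST) per channel version,
// executed once per publish instead of once per subscriber
const channelQueries = {
  'Comments:v1': `subscription { comment { id, comment, foo } }`,
  'Comments:v2': `subscription { comment { id, comment, bar } }`,
};

// on a change, run GraphQL once per live version & fan the result out:
const publishChange = change => {
  Object.keys(channelQueries).forEach(channel => {
    const result = runCachedQuery(channelQueries[channel], change); // made-up helper
    scServer.exchange.publish(channel, result);
  });
};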