apollographql / graphql-subscriptions

:newspaper: A small module that implements GraphQL subscriptions for Node.js
MIT License

Help: Easy subscriptions with rethinkdb changefeeds #93

Closed rnenjoy closed 6 years ago

rnenjoy commented 7 years ago

Hello guys!

Sorry for the level of this question, but I haven't found a better place to ask it, and I'm new to Node.js and GraphQL. I have been looking for an easy way to use RethinkDB changefeeds in my subscriptions, and I just came across this solution. I was wondering what you think of it:

Subscription: {
    changefeedTest: {
      subscribe: () => {
        const feedUuid = uuid()
        r.table('people').filter({ maybeSomethingUniqueHereForEachUser }).changes().run().then((feed) => {
          feed.each((error, change) => {
            console.log('publishing data to user')
            pubsub.publish(feedUuid, { changefeedTest: change.new_val })
          })
        })

        return pubsub.asyncIterator(feedUuid)
      }
    }
  }

Any thoughts on this approach? With this code I skip the generic pubsub topics and create a unique one for each subscription query against the DB. This way I think I can be certain that each user gets only the data they should have, since they only receive their own unique topic.

The only problem I have right now is that the changefeed keeps running after the user disconnects. So after testing for a while, I get X "publishing data to user" messages (one more each time) as soon as I open a new connection. I am thinking of saving a reference to each feed in an array and closing them in the onDisconnect hook on the SubscriptionServer. I would either save them in a session-specific variable (I don't know how to do this in Node) or in one big array grouped by session ID.

feedsToDisconnect[uniqueSessionId].push(feed)

and then inside the onDisconnect:

feedsToDisconnect[uniqueSessionId].forEach((feed) => { feed.close() })

and then remove the group.

rnenjoy commented 7 years ago

Ok

Here is a working design.

Server.js

export const subscriptionServer = SubscriptionServer.create(
  {
    schema,
    execute,
    subscribe,
    onConnect: (connectionParams, webSocket) => {
      const userid = uuid.v4()
      webSocket.uuid = userid
      return { uuid: userid }
    },
    onDisconnect: (websocket) => {
      ActiveFeeds.close(websocket.uuid)
    }
  },
  {
    server: server,
    path: '/graphql',
  }
)

resolvers.js

Subscription: {
    changefeedTest: {
      subscribe: (root, {}, context, self) => {
        // Create an unique feed id
        const feedUuid = context.uuid + self.fieldName
        // Setup the changefeed query
        r.table('people').changes().run().then((feed) => {
          feed.each((error, change) => {
            // Publish to the unique feed id
            pubsub.publish(feedUuid, { changefeedTest: change.new_val })
          })

          // Add the feed cursor to the activeFeeds grouped on user uuid
          ActiveFeeds.add(context.uuid, feed)
        })

        // Return the async iterator with the unique feed id
        return pubsub.asyncIterator(feedUuid)
      }
    }
  }
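
ActiveFeeds itself is not shown in this design. The following is a minimal sketch of what such a registry could look like, assuming it only needs the add and close calls used in Server.js and resolvers.js, and that each changefeed cursor exposes close(). The feedsByUser map and the return value of close are hypothetical choices made for illustration.

```javascript
// Hypothetical registry: groups open changefeed cursors by connection uuid.
const feedsByUser = new Map()

const ActiveFeeds = {
  // Remember a cursor so it can be closed when the connection goes away.
  add(userUuid, feed) {
    if (!feedsByUser.has(userUuid)) {
      feedsByUser.set(userUuid, [])
    }
    feedsByUser.get(userUuid).push(feed)
  },

  // Close every cursor opened for this connection and drop the group.
  // Returns the number of cursors that were closed.
  close(userUuid) {
    const feeds = feedsByUser.get(userUuid) || []
    feedsByUser.delete(userUuid)
    feeds.forEach((feed) => feed.close())
    return feeds.length
  },
}
```

Because this state lives in the memory of a single Node process, it would only clean up feeds opened by that same process.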

smeijer commented 7 years ago

Any thoughts on this approach? With this code I skip the generic pubsub topics and create a unique one for each subscription query against the DB. This way I think I can be certain that each user gets only the data they should have, since they only receive their own unique topic.

I think the most important question is: does it scale?

rnenjoy commented 7 years ago

Yeah, exactly. I'm not qualified to answer that myself, I'm afraid :)

corysimmons commented 7 years ago

https://www.youtube.com/watch?v=b2F-DItXtZs :o

smeijer commented 7 years ago

I'm not seeing how that video is relevant.

rnenjoy commented 7 years ago

RethinkDB is very good at scaling, so the changefeed part should be OK.

corysimmons commented 7 years ago

@smeijer Sorry, any time I see a modern DB + the words "web scale" my mind goes there. I thought you might've been joking with @rnenjoy.

smeijer commented 7 years ago

I understand. But I was actually serious, because the current quick and easy ways to implement GraphQL subscriptions do not scale. When we want a scalable solution, we need stacks that include MQTT or Redis.

If switching from Mongo (or SQL) to RethinkDB means that subscriptions are automatically included in a scalable way, then I can imagine a lot of people being willing to drop Mongo + Redis in favor of RethinkDB.

It will definitely improve the developer (/devops) experience.

rnenjoy commented 7 years ago

How do we know if it scales? What needs scaling? Is the bottleneck the database? RethinkDB says this about changefeeds:

"Changefeeds perform well as they scale, although they create extra intracluster messages in proportion to the number of servers with open feed connections on each write."

rnenjoy commented 7 years ago

I made some changes so that one subscription can handle inserts, updates, and deletes, which a subscription normally couldn't handle.

resolvers.js

subscribe: (root, {}, context, self) => {
        // Create an unique feed id
        const feedUuid = context.uuid + self.fieldName

        // Setup the changefeed query
        r.table('people').changes().run().then((feed) => {
          feed.each((error, change) => {
            // Publish to the unique feed id
            pubsub.publish(feedUuid, { peopleSubscription: { people: change.new_val, operationType: getOperationType(change) } })
          })

          // Add the feed cursor to the activeFeeds grouped on user uuid
          ActiveFeeds.add(context.uuid, feed)
        })

        // Return the async iterator with the unique feed id
        return pubsub.asyncIterator(feedUuid)
      }

schema.js

type knxSwitchSubscription {
  people: people!
  operationType: String!
}

operationType is a string: insert, update, or delete. In a RethinkDB change object, old_val is null for an insert, new_val is null for a delete, and both are present for an update. That's how it knows.
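
The body of getOperationType is not shown in the thread. Based on the description above, a minimal sketch might look like the following. The 'unknown' fallback is an assumption added here for notifications that carry neither value.

```javascript
// Hypothetical helper: derive the operation from a changefeed notification
// by checking which of old_val / new_val is present.
function getOperationType(change) {
  // `!= null` treats both null and undefined as "not present".
  const hasOld = change.old_val != null
  const hasNew = change.new_val != null

  if (!hasOld && hasNew) return 'insert'
  if (hasOld && !hasNew) return 'delete'
  if (hasOld && hasNew) return 'update'
  return 'unknown' // neither value present, e.g. a non-document notification
}
```

Note that on a delete, change.new_val carries no document, so the payload would need to send change.old_val (or the field would need to be nullable) for the deleted row to reach the client.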

agborkowski commented 6 years ago

@rnenjoy where do you store ActiveFeeds?

 // Add the feed cursor to the activeFeeds grouped on user uuid
 ActiveFeeds.add(context.uuid, feed)

grantwwu commented 6 years ago

I'm not quite sure that this is really a GraphQL subscriptions question; the one thing I did see that might be relevant has to do with cleaning up resources after a disconnect. That's being discussed in https://github.com/apollographql/graphql-subscriptions/issues/143
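
For the cleanup problem mentioned here, one possible pattern is to wrap the async iterator so that a callback runs when the consumer stops iterating. This is only a sketch: withCancel is a hypothetical helper name, not part of graphql-subscriptions, and it assumes the subscription server calls return() on the iterator when the client unsubscribes or disconnects.

```javascript
// Hypothetical helper: run onCancel when the consumer calls return()
// on the iterator, then delegate to the original return() if there is one.
function withCancel(asyncIterator, onCancel) {
  const originalReturn = asyncIterator.return

  asyncIterator.return = () => {
    onCancel()
    return originalReturn
      ? originalReturn.call(asyncIterator)
      : Promise.resolve({ value: undefined, done: true })
  }

  return asyncIterator
}
```

In a resolver like the ones in this thread, the subscribe function could return withCancel(pubsub.asyncIterator(feedUuid), ...) with a callback that closes the changefeed cursor once it has been opened.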