apollographql / graphql-subscriptions

:newspaper: A small module that implements GraphQL subscriptions for Node.js
MIT License
1.59k stars 133 forks source link

AsyncIterator with backlog + pubsub decorator #114

Closed ilijaNL closed 2 years ago

ilijaNL commented 7 years ago

Support for a backlog mechanism in a asynciterator. Can be used to create backlog of publish events. This is especially useful for chat applications such as whatsapp and any other application that are dealing with bad connection.

Usage: First implement the PersistenceClient interface for your data storage, my implementation of mongodb:

{
    fetchNext: (params) => new Promise((resolve) => {

      const {
        collection,
        fromSeq,
        batchSize,
        queryFilter,
      } = params;

      const resolveResult = (err, result) => {
        resolve(result.map(item => ({
          seq: item.timestamp,
          payload: item.payload,
        })));
      };

      // console.log('fetchNext', collection, fromSeq, batchSize);

      const query = {};

      if (fromSeq) {
        query.timestamp = { $gt: fromSeq };
      }

      const finalQuery = Object.assign(query, queryFilter);

      DB.collection(collection).find(finalQuery)
      .sort({ timestamp: 1 })
      .limit(batchSize)
      .toArray(resolveResult);
    }),
    save: (collection, item) => {
      DB.collection(collection).insert({
        timestamp: (new Date()).toISOString(),
        payload: item,
      });
    },
  }

Secondly, decorate your pubsub system

const pubsubOnline = new MQTTPubSub({
  client,
});

const pubsub = createPersistentPubSub(
  persistenceClient,
  pubsubOnline,
);

Publish events with the decorator:

pubsub.publishWithPersistence(MESSAGE_MUTATION,
            {
              [MESSAGE_MUTATION]: {
                type: CREATED,
                message: newMessage,
              },
              userIds: channel.participants,
            }, 'persistence');

Return the asyncIteratorWithPersistence from the decorator in subscribe:

subscribe: withFilter(
        () => pubsub.asyncIteratorWithPersistence(MESSAGE_MUTATION, {
          batchSize: 100,
          collection: 'persistence',
          lastSequence: undefined, // this is the cursor, undefined starts from first element
        }),
          (payload, args, { permission }) =>
            Boolean(payload.userIds && payload.userIds.indexOf(permission.userId) >= 0),
      ),

You can pass the lastSequence (cursor) parameter in the socketConnection or as args in the subscription and don't forget to update on every new call.

apollo-cla commented 7 years ago

@Rusfighter: Thank you for submitting a pull request! Before we can merge it, you'll need to sign the Meteor Contributor Agreement here: https://contribute.meteor.com/

hwillson commented 2 years ago

Thanks for this @ilijaNL and sorry we haven't responded sooner (I wish I could say 4 years is a new record 😳). This is an interesting idea but it's a pretty significant change to this library, and doesn't really add functionality the needs to be explicitly handled by this library (this can be done outside of it). For features of this nature it's a great idea to open an issue first to discuss the idea and approach with the community, and gain support for the feature you would like to add, before diving into the implementation. I'll close this for now, but thanks again!