apollographql / graphql-subscriptions

:newspaper: A small module that implements GraphQL subscriptions for Node.js
MIT License
1.58k stars 133 forks source link

GraphQL MaxListenersExceededWarning when a subscription doesn't yield a value for multiple finished client processes #272

Closed zeuscronos closed 3 months ago

zeuscronos commented 3 months ago

I have a very simple dummy Node.js + GraphQL + Apollo project that you can download from here, or test on StackBlitz, or just inspect the code at the end of this post.

My problem is: When testing a subscription multiple times (2 different processes involved at a time, one for the server and another one for the tester) and that subscription doesn't yield a value, then the event listener for that subscription remains active even if the process that performed the test finishes. Then I get the error:

(node:20064) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 CHANNEL_MESSAGE listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit
(Use `node --trace-warnings ...` to show where the warning was created)

You can test it with:

Terminal 1: $ yarn; yarn dev
Terminal 2: $ clear; for i in {1..11}; do echo -e "\n\033[33m### Iteration: $i\033[0m"; node src/test.js; done

Then you will get the following:

enter image description here

However, if the subscription always yield a value (in the example above making: const IGNORE_MESSAGE_EVENTS = false), then we don't get that MaxListenersExceededWarning.

What I need: If the test process finishes gracefully/abruptly/unexpectedly the event listener should be removed automatically from the server so there are no unused resources around.

On the tester I even have the instructions below in the finally (try-catch-finally) but didn't work:

if (subscription) {
  subscription.unsubscribe();
}
if (client1) {
  client1.stop();
  client1 = null;
}
if (client2) {
  client2.stop();
  client2 = null;
}

however I think the server should not necessary expect that the tester-client finishes the connection properly but just remove listeners that were originated by a connection that doesn't exist anymore, on this case the one for the tester process.

file: /.env

PORT=4000

file: /package.json

{
  "name": "mirror",
  "private": true,
  "version": "0.0.0",
  "type": "module",
  "scripts": {
    "dev": "node src/index.js"
  },
  "devDependencies": {},
  "dependencies": {
    "@apollo/client": "^3.9.11",
    "@apollo/server": "^4.10.2",
    "@graphql-tools/schema": "^10.0.3",
    "ody-parser": "^1.20.2",
    "cross-fetch": "^4.0.0",
    "dotenv": "^16.4.5",
    "express": "^4.19.2",
    "graphql": "^16.8.1",
    "graphql-subscriptions": "^2.0.0",
    "graphql-tag": "^2.12.6",
    "graphql-ws": "^5.16.0",
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "ws": "^8.16.0"
  },
  "peerDependencies": {
    "@types/zen-observable": "^0.8.7"
  }
}

file: /src/index.js

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import bodyParser from 'body-parser';
import dotenv from 'dotenv';
import express from 'express';
import { PubSub } from 'graphql-subscriptions';
import { gql } from 'graphql-tag';
import { useServer } from 'graphql-ws/lib/use/ws';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';

dotenv.config();

const app = express();
const httpServer = createServer(app);

const pubsub = new PubSub();

const NOTIFICATION_CHANNEL_MESSAGE = 'CHANNEL_MESSAGE';

const typeDefs = gql`
  type MessageEvent {
    channel: String!
    message: String!
  }
  type Query {
    health: String
  }
  type Mutation {
    sendMessage(channel: String!, message: String!): Boolean
  }
  type Subscription {
    messageEvent(channel: String!): MessageEvent
  }
`;

const IGNORE_MESSAGE_EVENTS = true;
const shouldIgnoreMessageEvents = new Promise((resolve) => resolve(IGNORE_MESSAGE_EVENTS));

const resolvers = {
  Query: {
    health: () => 'OK',
  },
  Mutation: {
    sendMessage: async (_, { channel, message }) => {
      await pubsub.publish(NOTIFICATION_CHANNEL_MESSAGE, {
        messageEvent: { channel, message },
      });

      return true;
    },
  },
  Subscription: {
    messageEvent: {
      subscribe: async function* (_, { channel }, context) {
        for await (const { messageEvent } of context.pubsub.asyncIterator([
          NOTIFICATION_CHANNEL_MESSAGE,
        ])) {
          if (messageEvent.channel === channel && !(await shouldIgnoreMessageEvents)) {
            yield messageEvent;
          }
        }
      },
      resolve: (messageEvent) => messageEvent,
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

useServer({ schema, context: { pubsub } }, wsServer);

const server = new ApolloServer({
  schema,
  introspection: true,
  plugins: [ApolloServerPluginDrainHttpServer({ httpServer })],
});

await server.start();

app.use('/graphql', bodyParser.json(), expressMiddleware(server));

const PORT = process.env.PORT;
httpServer.listen(PORT, () => {
  console.log(`Server running on port: ${PORT}`);
});

file: /src/test.js

import pkg from '@apollo/client';
import fetch from 'cross-fetch';
import dotenv from 'dotenv';
import gql from 'graphql-tag';
import { createClient } from 'graphql-ws';
import ws from 'ws';

const { ApolloClient, ApolloLink, HttpLink, InMemoryCache, Observable } = pkg;

const expect = (value) => {
  return {
    toBe: (expected) => {
      if (value !== expected) {
        throw new Error(`Received: ${value} | Expected: ${expected}`);
      }
    },
  };
};

const createPromiseSignal = () => {
  let resolveFunction;
  const promise = new Promise((resolve) => {
    resolveFunction = resolve;
  });

  return [resolveFunction, promise];
};

// prettier-ignore
class GraphQLWsLink extends ApolloLink {
  constructor(url) {
    super();
    this.client = createClient({ url, webSocketImpl: ws });
  }
  request(operation) {
    return new Observable((observer) => {
      const { query, variables } = operation;
      const dispose = this.client.subscribe(
        {
          query: query.loc?.source.body || '',
          variables,
        },
        {
          next: (data) => observer.next(data),
          error: (err) => observer.error(err.reason ? new Error(err.reason) : err),
          complete: () => observer.complete(),
        }
      );

      return () => {
        dispose();
      };
    });
  }
  onConnected(callback) {
    this.client.on('connected', callback);
  }
  async close() {
    await this.client.dispose();
  }
}

dotenv.config();

const httpUri = `http://localhost:${process.env.PORT}/graphql`;

let client1 = null;
let client2 = null;
let subscription = null;

const channelToListen = 'sports';
const channelToSend = 'sports'; // { sports -> ok, tech -> error }
const message = 'Hello World';

try {
  const wsLink = new GraphQLWsLink(httpUri.replace(/^http:\/\//, 'ws://'));

  client1 = new ApolloClient({
    link: wsLink,
    cache: new InMemoryCache(),
  });

  // prettier-ignore
  const [wsLinkConnectionResolve, wsLinkConnectionPromise] = createPromiseSignal();

  wsLink.onConnected(wsLinkConnectionResolve);

  const observer = client1.subscribe({
    query: gql`
      subscription messageSubscription($channel: String!) {
        messageEvent(channel: $channel) {
          channel
          message
        }
      }
    `,
    variables: {
      channel: channelToListen,
    },
  });

  subscription = observer.subscribe({
    next(response) {
      console.log(response.data.messageEvent);
    },
  });

  await wsLinkConnectionPromise;

  client2 = new ApolloClient({
    link: new HttpLink({
      uri: httpUri,
      fetch,
    }),
    cache: new InMemoryCache(),
  });

  // prettier-ignore
  const { data: { sendMessage } } = await client2.mutate({
    mutation: gql`
      mutation sendMessage($channel: String!, $message: String!) {
        sendMessage(channel: $channel, message: $message)
      }
    `,
    variables: {
      channel: channelToSend,
      message,
    },
  });

  expect(sendMessage).toBe(true);

  console.log('All tests passed!');

} finally {
  if (subscription) {
    subscription.unsubscribe();
  }
  if (client1) {
    client1.stop();
    client1 = null;
  }
  if (client2) {
    client2.stop();
    client2 = null;
  }
}
zeuscronos commented 3 months ago

Using: withFilter(...) did the trick.