apollographql / graphql-subscriptions

:newspaper: A small module that implements GraphQL subscriptions for Node.js
MIT License
1.58k stars 133 forks source link

Internal error occurred during message handling. Please check your implementation. Error: Subscription field must return Async Iterable. Received: undefined. #273

Open Phors-Stephen opened 1 month ago

Phors-Stephen commented 1 month ago

+app.js const express = require("express") const { createHandler } = require("graphql-http/lib/use/express") const { ruruHTML } = require("ruru/server") const dotenv = require("dotenv"); const schema=require('./schema/index') const cors = require('cors'); const { WebSocketServer } = require('ws'); const {createServer}=require('http') const { execute, subscribe } = require('graphql'); const { useServer } = require('graphql-ws/lib/use/ws');

const app = express(); // This app is the returned value from express(). const httpServer = createServer(app);

app.use(express.json()); app.use(cors());

dotenv.config({ path: "./config.env" });

// Serve the GraphiQL IDE. app.get("/", (_req, res) => { res.type("html") res.end(ruruHTML({ endpoint: "/graphql" })) }) // Create and use the GraphQL handler. app.all( "/graphql", createHandler({ schema: schema,

})

)

// Create WebSocket server for subscriptions const wsServer = new WebSocketServer({ server: httpServer, // Correctly use the 'server' option path: '/subscriptions', });

useServer({ schema, execute, subscribe }, wsServer);

module.exports = { app, httpServer

};

+resolver // dyanmicResolver.js const pool = require('../database/config') const { publishUserAdded } = require('../subscription/subscriptionHandler'); // Adjust path as per your file structure

const dyanmicResolver = { Query: { users: async () => { const client = await pool.connect(); try { const res = await client.query('SELECT FROM users'); return res.rows; }catch(err){ console.log("faile to fetch users",err) } finally { client.release(); } }, authors: async () => { const client = await pool.connect(); try { const res = await client.query('SELECT FROM author'); return res.rows; } catch(err){ console.log("failed to fetch authors",err) } finally { client.release(); } }, books: async () => { const client = await pool.connect(); try { const res = await client.query('SELECT * FROM book'); return res.rows; } catch(err){ console.log("failed to fetch book",err) } finally { client.release(); } },

}, Mutation: { insertUser: async (_, { username, email }) => { const client = await pool.connect(); try { const query = 'INSERT INTO users (username, email) VALUES ($1, $2) RETURNING *'; const values = [ username, email]; const res = await client.query(query, values); const newUser = res.rows[0];

   // Publish subscription event
   await publishUserAdded(newUser);
   console.log("user add",USER_ADDED)
    return newUser;
  }catch(err){
    console.log("failed to insert users",err)
  }
   finally {
    client.release();
  }
},

updateUser: async (_, { id, username, email }) => {
  const client = await pool.connect();
  try {
    const query = 'UPDATE users SET username = $2, email = $3 WHERE id = $1 RETURNING *';
    const values = [id, username, email];
    const res = await client.query(query, values);
    return res.rows[0];
  }catch(err){
    console.log("failed to update users",err)
  }
   finally {
    client.release();
  }
},

deleteUser: async (_, { id }) => {
  const client = await pool.connect();
  try {
    const query = 'DELETE FROM users WHERE id = $1 RETURNING *';
    const values = [id];
    const res = await client.query(query, values);
    return res.rows[0];
  } catch(err){
    console.log("failed to delete user",err)
  }
  finally {
    client.release();
  }
},

},

};

module.exports = dyanmicResolver;

dotenv.config({ path: "../config.env" });

const pubsub = new PostgresPubSub({ connectionString: process.env.DATABASE_URL, // Make sure this environment variable is set correctly });

const USER_ADDED_TOPIC = 'userAdded';

// Subscription resolver const Subscription = { userAdded: { subscribe: () => pubsub.asyncIterator(USER_ADDED_TOPIC),

}, };

// Helper function to publish events async function publishUserAdded(newUser) { try { await pubsub.publish(USER_ADDED_TOPIC, { userAdded: newUser }); } catch (err) { console.error('Error publishing userAdded event:', err); } }

module.exports = { Subscription, publishUserAdded, };

+client vue3 calling graphql

const USER_ADDED_SUBSCRIPTION = gql`
  subscription {
    userAdded {
      id
      username
      email
    }
  }
`;

const subscribeToNewUsers = () => {
  try {
    apolloClient.subscribe({
      query: USER_ADDED_SUBSCRIPTION,
    }).subscribe({
      next({ data }) {
        users.value = [...users.value, data.userAdded];
      },
      error(err) {
        console.error('Subscription error', err);
        error.value = err; // Update error message
      }
    });
  } catch (err) {
    console.log("failed to subscribe", err);
    error.value = err
  }

};

onMounted(async () => {
  try {
    const { data } = await apolloClient.query({
      query: GET_USERS,
    });
    users.value = data.users;
    loading.value = false;

    subscribeToNewUsers(); // Start subscription after initial data load
  } catch (e) {
    console.error("Query error", e);
    error.value = e.message; // Handle query error
    loading.value = false;
  }
});

when i run test in client vue3 cli it say Error: Socket closed with event 4500 Subscription field must return Async Iterable. Received: undefined.

### Tasks