MaterializeInc / demos

Demos of Materialize, the operational data warehouse.
https://materialize.com
Apache License 2.0
50 stars 8 forks source link

Connection examples -- NodeJS subscribes #91

Open chuck-alt-delete opened 11 months ago

chuck-alt-delete commented 11 months ago

Hey team,

I have two suggestions for the NodeJS subscribe example:

  1. Our current example could be made much simpler with the use of the new SUBSCRIBE ... ENVELOPE UPSERT ...
  2. Our current example doesn't show how to gracefully shut down subscribes

I included a minimal Express.js app in which multiple users can subscribe to data by hitting http://localhost:3000/data. Each subscription is closed when its client disconnects, and the whole app shuts down gracefully when the server is closed.

Some ideas for further refinement of this express app:

require('dotenv').config();
const express = require('express');
const app = express();
const { Pool } = require('pg');

// Connection pool for the database; every setting except `ssl` is read from
// the MZ_* environment variables.
const { MZ_USER, MZ_HOST, MZ_DB, MZ_PASSWORD, MZ_PORT } = process.env;
const pool = new Pool({
  user: MZ_USER,
  host: MZ_HOST,
  database: MZ_DB,
  password: MZ_PASSWORD,
  port: MZ_PORT,
  ssl: true,
});

// Global set of active subscription loops. Each entry is a `{ active }`
// control object; setting `active` to false asks that loop to stop.
// Declared `const` because the set is mutated in place, never rebound.
const activeLoops = new Set();

// GET /data: streams the rows of a SUBSCRIBE on `t` to the caller as
// server-sent events. Each request checks a client out of `pool`, opens a
// cursor inside a transaction and keeps fetching until its loop is switched
// off (client disconnect, error, or gracefulShutdown flipping the flag).
app.get('/data', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // On request, activate a subscription loop and add it to the set of active loops.
    const loopControl = { active: true };
    activeLoops.add(loopControl);

    // Switch this request's loop off and forget it. Safe to call repeatedly.
    const stopLoop = () => {
        loopControl.active = false;
        activeLoops.delete(loopControl);
    };

    // Declared before the subscription starts so the catch block below can
    // never reach it while it is still uninitialised.
    const handleError = (err) => {
        console.error(err);
        res.end();
        stopLoop();
    };

    req.on('close', () => {
        res.end();
        stopLoop();
        console.log('client closed');
    });

    (async () => {
        let client;
        try {
            client = await pool.connect();
            await client.query('BEGIN');
            await client.query('DECLARE c CURSOR FOR SUBSCRIBE t WITH (PROGRESS) ENVELOPE UPSERT (KEY(id))');

            // Fetch rows while the loop is switched on.
            // We need FETCH ALL for minimal latency, but that means this loop will not exit until the next datum comes through.
            // Hence why we needed WITH (PROGRESS). This ensures data will be received and the loop will be able to exit.
            while (loopControl.active) {
                const data = await client.query('FETCH ALL c');

                // The response may have been ended while the fetch was
                // pending (client closed); do not write to it in that case.
                if (res.writableEnded) {
                    break;
                }

                for (const row of data.rows) {
                    // filter out progress messages
                    if (row.mz_progressed) {
                        continue;
                    }
                    // map row fields
                    const event = {
                        mz_progressed: row.mz_progressed,
                        mz_timestamp: Number(row.mz_timestamp),
                        mz_state: row.mz_state,
                        id: row.id,
                        content: row.content,
                    };
                    // publish server-sent events
                    res.write(`data: ${JSON.stringify(event)}\n\n`);
                }
            }
        } catch (err) {
            handleError(err);
        } finally {
            // Also covers the path where the flag was flipped from outside
            // this handler and nothing has removed the loop from the set yet.
            stopLoop();
            res.end();

            if (client) {
                console.log('closing pg client');
                let commitError;
                try {
                    await client.query('COMMIT');
                    console.log('committed transaction');
                } catch (err) {
                    // A failed COMMIT must not prevent the client from being
                    // released, nor escape as an unhandled rejection.
                    console.error(err);
                    commitError = err;
                } finally {
                    // Passing the error makes the pool discard this client
                    // instead of reusing a connection in an unknown state.
                    client.release(commitError);
                }
            }
        }
    })();
});

// Start the HTTP server on port 3000. The handle is kept in a declared
// binding (an undeclared assignment would create an implicit global and throw
// in strict mode / ES modules) so that gracefulShutdown can close it.
const server = app.listen(3000, function () {
  console.log('Example app listening on port 3000!');
});

// Set once shutdown has begun so that repeated signals (e.g. a second Ctrl-C)
// do not close the server or end the pool a second time.
let isShuttingDown = false;

// Stops every active subscription loop, closes the HTTP server, ends the
// database pool and exits the process: code 0 on success, 1 if any step fails.
// Calls made while a shutdown is already running return immediately.
async function gracefulShutdown() {
    if (isShuttingDown) {
        return;
    }
    isShuttingDown = true;

    console.log('Initiating graceful shutdown');

    // Stop all active subscriptions
    activeLoops.forEach((loop) => {
        loop.active = false;
    });

    try {
        // Using a promise to handle server.close since it doesn't natively return one.
        // An error handed to the callback is surfaced instead of being swallowed.
        await new Promise((resolve, reject) => {
            server.close((err) => (err ? reject(err) : resolve()));
        });
        console.log('Express server closed.');

        // End the database pool
        await pool.end();
        console.log('Database pool closed.');

        process.exit(0);
    } catch (err) {
        console.error('Error during graceful shutdown:', err);
        process.exit(1);
    }
}

// Run the graceful shutdown routine when the process receives SIGINT or SIGTERM.
for (const signal of ['SIGINT', 'SIGTERM']) {
    process.on(signal, gracefulShutdown);
}