MaterializeInc / connection-examples

Materialize connection examples
https://materialize.com/docs
Apache License 2.0

Feature request: Add NodeJS server example that uses Websockets/SSE for SUBSCRIBE #10

Open chuck-alt-delete opened 1 year ago

chuck-alt-delete commented 1 year ago

We have a nice internal example here:

It would be nice if we could simplify this down to a minimal but useful websocket app.

We may want to wait until the websocket API endpoint is considered stable.

canadaduane commented 1 year ago

I'd love to know if there is more info available on this. However, just thinking about it: if the websocket API endpoint is used as an example, I don't see how that would be a practical starting point for a React app. The websocket API seems to require "full access" authorization, i.e., access to the database that a web app should never have. I'd like to see a more practical usage, perhaps with an intermediating auth layer?

chuck-alt-delete commented 1 year ago

Updating this issue since another customer came across this and wants to read updates into a frontend over websockets. A good place for a websocket API example would probably be the NodeJS backend example.

This would allow the backend to handle auth, which is a good point by @canadaduane.

To disambiguate, there are two ways to receive updates from Materialize:

  1. The websocket API (wss://<host>/api/experimental/sql)
  2. SUBSCRIBE over the pgwire protocol (e.g., with any Postgres client library), read as an infinite cursor

Whichever way the NodeJS web server gets the data, it can expose these updates to the frontend over websockets or SSE.
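To make the "backend handles auth" idea concrete, here is a minimal sketch of an Express-style middleware that gates a streaming route behind a shared token. Everything in it is an assumption for illustration: the `requireBearerToken` and `safeEqual` names, the `APP_TOKEN` environment variable, and the choice of a bearer token at all. It only shows where an auth check could sit so that Materialize credentials never leave the server.

```javascript
const crypto = require('crypto');

// Hypothetical helper: constant-time comparison of two strings.
// Hashing first gives both buffers the same length, which timingSafeEqual requires.
function safeEqual(a, b) {
  const ha = crypto.createHash('sha256').update(a).digest();
  const hb = crypto.createHash('sha256').update(b).digest();
  return crypto.timingSafeEqual(ha, hb);
}

// Hypothetical middleware factory: rejects requests without the expected token.
function requireBearerToken(expectedToken) {
  return function (req, res, next) {
    const header = req.headers['authorization'] || '';
    const match = /^Bearer (.+)$/.exec(header);
    if (!match || !safeEqual(match[1], expectedToken)) {
      res.statusCode = 401;
      res.end('Unauthorized');
      return;
    }
    next();
  };
}

// Possible wiring in an Express app (APP_TOKEN is an assumed variable name):
// app.get('/data', requireBearerToken(process.env.APP_TOKEN), streamHandler);
```

One caveat: the browser's built-in `EventSource` cannot set custom request headers, so a browser-facing app would more likely check a session cookie in the same spot. The header check is used here only to keep the sketch short.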

chuck-alt-delete commented 1 year ago

An alternative to websockets would be Server-Sent Events (SSE). SSE is a one-way push protocol where the server pushes events to the client over a long-lived HTTP connection. It is a little more straightforward and lightweight than websockets. Since SUBSCRIBE doesn't require two-way communication, SSE is a perfectly good way to make updates available over HTTP.

Here is a minimal express.js app that achieves this:

require('dotenv').config();
const express = require('express');
const app = express();
const { Pool } = require('pg');

// Configure the Postgres client
const pool = new Pool({
  user: process.env.MZ_USER,
  host: process.env.MZ_HOST,
  database: process.env.MZ_DB,
  password: process.env.MZ_PASSWORD,
  port: process.env.MZ_PORT,
  ssl: true
});

app.get('/data', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    let client;
    let loopActive = true;

    const handleError = (err) => {
        console.error(err);
        res.end();
        loopActive = false;
    };

    req.on('close', () => {
        res.end();
        loopActive = false;
        console.log('client closed');
    });

    (async () => {
        try {
            client = await pool.connect();
            await client.query('BEGIN');
            await client.query('DECLARE c CURSOR FOR SUBSCRIBE t ENVELOPE UPSERT (KEY(id))');

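            while (loopActive) {
                // The timeout lets the loop re-check loopActive even when no new rows arrive
                const data = await client.query("FETCH ALL c WITH (timeout = '1s')");
                // Stop before writing if the browser disconnected while we were waiting
                if (!loopActive) break;
                for (const row of data.rows) {
                    // map row fields
                    const event = {
                        mz_timestamp: Number(row.mz_timestamp),
                        mz_state: row.mz_state,
                        id: row.id,
                        content: row.content
                    };
                    // publish server-sent events
                    res.write(`data: ${JSON.stringify(event)}\n\n`);
                }
            }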
        } catch (err) {
            handleError(err);
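        } finally {
            if (client) {
                console.log('closing pg client');
                // The transaction and cursor are still open, so destroy this
                // connection instead of returning it to the pool for reuse
                client.release(true);
            }
        }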
    })();
});

app.listen(3000, function () {
  console.log('Example app listening on port 3000!');
});
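The `res.write` call in the handler above emits the simplest possible SSE frame: a single `data:` line followed by a blank line. As a hedged sketch of the framing rules more generally, the hypothetical `formatSseEvent` helper below also supports the optional `id:` and `event:` fields and splits multi-line payloads into one `data:` line each, as the SSE wire format requires. The helper name and its options are assumptions, not part of the app above.

```javascript
// Hypothetical helper: build one SSE frame from a payload.
// Objects are JSON-encoded; strings are sent as-is.
function formatSseEvent(payload, { event, id } = {}) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  const text = typeof payload === 'string' ? payload : JSON.stringify(payload);
  // Each line of the payload needs its own "data:" prefix
  for (const line of text.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the frame
  return lines.join('\n') + '\n\n';
}

// Same frame shape the handler writes for each row:
process.stdout.write(formatSseEvent({ id: 2, content: 'hi' }));
```

If a frame carries an `event:` name, the browser delivers it to listeners registered with `addEventListener` for that name rather than to `onmessage`, so leaving the name off keeps the client simpler.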

After pointing a browser at http://localhost:3000/data, updates from the view t arrive immediately:

data: {"mz_timestamp":1687974778434,"mz_state":"upsert","id":2,"content":"hi"}

data: {"mz_timestamp":1687974778434,"mz_state":"upsert","id":3,"content":"ho"}

data: {"mz_timestamp":1687974780411,"mz_state":"upsert","id":1,"content":"bloop"}

data: {"mz_timestamp":1687974805446,"mz_state":"delete","id":1,"content":null}
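On the receiving side, each event is either an upsert or a delete keyed by `id`, so a client can keep a current copy of `t` by applying events in order. The sketch below assumes the event shape shown above; the `applyUpdate` name and the use of a `Map` are illustrative choices, and any other `mz_state` value is simply ignored.

```javascript
// Hypothetical reducer: fold one event into a Map of id -> content.
function applyUpdate(state, update) {
  if (update.mz_state === 'delete') {
    state.delete(update.id);
  } else if (update.mz_state === 'upsert') {
    state.set(update.id, update.content);
  }
  return state;
}

// Replaying the four events from the output above:
const rows = new Map();
const events = [
  { mz_timestamp: 1687974778434, mz_state: 'upsert', id: 2, content: 'hi' },
  { mz_timestamp: 1687974778434, mz_state: 'upsert', id: 3, content: 'ho' },
  { mz_timestamp: 1687974780411, mz_state: 'upsert', id: 1, content: 'bloop' },
  { mz_timestamp: 1687974805446, mz_state: 'delete', id: 1, content: null }
];
for (const e of events) applyUpdate(rows, e);
console.log([...rows.entries()]); // [ [ 2, 'hi' ], [ 3, 'ho' ] ]

// In a browser, the same reducer could be fed from the endpoint (not run here):
// const source = new EventSource('/data');
// source.onmessage = (msg) => applyUpdate(rows, JSON.parse(msg.data));
```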