tpetry / laravel-postgresql-enhanced

Support for many missing PostgreSQL specific features
MIT License

Interest in a postgres broadcast driver? #65

unansweredocd closed 1 year ago

unansweredocd commented 1 year ago

Hey,

For a while now I've been thinking about using NOTIFY / LISTEN to broadcast events in Laravel. Redis is awesome and all, but I'd much rather rely on Postgres for as long as I can.

Laravel comes with database support for cache, queue and sessions, yet the broadcasting part of it is understandably not supported when it comes to databases.

Just now I've gotten a basic version of broadcasting to work, using NOTIFY and LISTEN. It doesn't yet respect the 8000-byte payload limit, but that could be added.
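
To make the limit concrete: in its default configuration PostgreSQL requires a NOTIFY payload to be shorter than 8000 bytes, so a driver has to measure the encoded payload in bytes rather than characters. A minimal sketch of such a guard, written for Node; `MAX_NOTIFY_BYTES` and `fitsInNotify` are names made up for illustration and are not part of the code in this thread:

```javascript
// Hypothetical helper, not part of any library.
// In the default configuration a NOTIFY payload must be shorter than 8000 bytes.
const MAX_NOTIFY_BYTES = 8000;

const fitsInNotify = (event, data) => {
    const payload = JSON.stringify({ event, data });

    // Buffer.byteLength counts UTF-8 bytes; payload.length would count
    // UTF-16 code units and under-report multi-byte characters.
    return Buffer.byteLength(payload, 'utf8') < MAX_NOTIFY_BYTES;
};
```

A payload that fails the check would need some other route, for example storing the data in a table and notifying only its id. That is one possible design, not something the code in this thread does.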

Would having a broadcast postgres driver be something that's in the scope of this project? If so, I would love to share the code.

tpetry commented 1 year ago

Do you have a working implementation? I always believed this won't work with PHP because of race conditions when the code is currently not executing LISTEN. And the PDO driver can't watch for results in the background? Or does it do this?

unansweredocd commented 1 year ago

Yeah it works though I wouldn't know of any race conditions.

I just have a broadcast driver that calls pg_notify and after that I use socket.io on the node side of things to listen to events on those channels. Is that something that matches your ideas or no?

unansweredocd commented 1 year ago

Anyway, here's my code. My understanding of LISTEN and NOTIFY is very basic: I know of the size limit I mentioned earlier, but that's about it. The PHP side of this is based on what I found here https://github.com/adaojunior/laravel-postgres-broadcast-driver, with more recent code added; some of the methods are based on the Redis broadcaster implementation, for example.

<?php

namespace App\Providers;

use Illuminate\Broadcasting\Broadcasters\Broadcaster as BroadcastersBroadcaster;
use Illuminate\Broadcasting\Broadcasters\UsePusherChannelConventions;
use Illuminate\Database\PostgresConnection;
use Illuminate\Support\Arr;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

class PostgresBroadcaster extends BroadcastersBroadcaster
{
    use UsePusherChannelConventions;

    /**
     * @var \Illuminate\Database\PostgresConnection
     */
    private $connection;

    public function __construct(PostgresConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Authenticate the incoming request for a given channel.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return mixed
     *
     * @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
     */
    public function auth($request)
    {
        $channelName = $this->normalizeChannelName($request->channel_name);

        if (
            empty($request->channel_name) ||
            ($this->isGuardedChannel($request->channel_name) &&
                ! $this->retrieveUser($request, $channelName))
        ) {
            throw new AccessDeniedHttpException;
        }

        return parent::verifyUserCanAccessChannel(
            $request,
            $channelName
        );
    }

    /**
     * Return the valid authentication response.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  mixed  $result
     * @return mixed
     */
    public function validAuthenticationResponse($request, $result)
    {
        if (is_bool($result)) {
            return json_encode($result);
        }

        $channelName = $this->normalizeChannelName($request->channel_name);

        $user = $this->retrieveUser($request, $channelName);

        $broadcastIdentifier = method_exists($user, 'getAuthIdentifierForBroadcasting')
                        ? $user->getAuthIdentifierForBroadcasting()
                        : $user->getAuthIdentifier();

        return json_encode(['channel_data' => [
            'user_id' => $broadcastIdentifier,
            'user_info' => $result,
        ]]);
    }

    /**
     * {@inheritdoc}
     */
    public function broadcast(array $channels, $event, array $payload = [])
    {
        if (empty($channels)) {
            return;
        }

        $connection = $this->connection;

        // Building payload like this matches the payload that's
        // used for the 'redis' broadcast driver.
        $payload = json_encode([
            'event' => $event,
            'data' => $payload,
            'socket' => Arr::pull($payload, 'socket'),
        ]);

        foreach ($channels as $channel) {
            $connection->select(
                'SELECT pg_notify(:channel, :payload)',
                [
                    'channel' => $channel,
                    'payload' => $payload
                ]
            );
        }
    }
}
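
For reference, the JSON that broadcast() passes to pg_notify has three keys: event, data and socket. A listener can decode it roughly as sketched below. `decodeNotification` and `originSocketId` are hypothetical names, and the sketch assumes the `{ channel, payload }` message shape that the pg client passes to its notification handlers. As far as I understand, the socket key carries the id of the connection that triggered the event, so a listener could skip that client if it wanted to.

```javascript
// Hypothetical helper: turns a pg "notification" message into the pieces
// a socket.io server needs. Returns null when the payload is not valid
// JSON, so one malformed NOTIFY cannot crash the listener.
const decodeNotification = (message) => {
    try {
        const { event, data, socket = null } = JSON.parse(message.payload);

        return {
            channel: message.channel,
            event,
            data,
            originSocketId: socket,
        };
    } catch (error) {
        return null;
    }
};
```

A notification handler could call this first and return early on null instead of calling JSON.parse directly.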

The socket.io side of things:

require("dotenv").config();
const fetch = require("node-fetch");

const io = require('socket.io')();

/**
 * Ask the Laravel app whether the user may join the given channel.
 *
 * @param {String} token a base64_encoded token for the user
 * @param {String} socketId the id of the connecting socket
 * @param {String} channel the name of the channel which the user tries to join
 * @returns {Promise<[boolean, Object]>} whether access is granted and, if not, the error details
 */
const authenticate = async (token, socketId, channel) => {
    try {
        const response = await fetch(
            `${process.env.APP_URL}/broadcasting/auth`,
            {
                method: 'POST',
                headers: {
                    Accept: 'application/json',
                    'Content-Type': 'application/json',
                    Authorization: `Bearer ${token}`,
                },
                body: JSON.stringify({ channel_name: channel, socket_id: socketId }),
            }
        );

        return response.ok ? [true, {}] : [false, await response.json()];
    } catch (error) {
        // error is a plain Error here, so there is nothing to await.
        return [false, error];
    }
};

const authenticateChannels = async (token, socketId, channels) => {
    // Check the channels one at a time and stop at the first rejection.
    // Array.prototype.forEach does not wait for async callbacks, so a
    // forEach here would report success before any request had finished.
    for (const channel of channels) {
        const [authorized, response] = await authenticate(token, socketId, channel);

        if (!authorized) {
            return [false, response];
        }
    }

    return [true, {}];
};

const resolveChannels = room => {
    return room.split(',');
}

io.use(async (socket, next) => {
    const {
        id,
        handshake: {
            query: { room, token },
        },
    } = socket;

    const [authorized, response] = await authenticateChannels(token, id, resolveChannels(room));

    return authorized ? next() : next(response);
});

const pg = require("pg");

const pgsql = new pg.Client({
    user: process.env.DB_USERNAME,
    host: process.env.DB_HOST,
    database: process.env.DB_DATABASE,
    password: process.env.DB_PASSWORD,
    port: process.env.DB_PORT || 5432,
});

pgsql.connect();

// Register a single notification handler for the whole server. A handler
// per socket would emit every notification once per connected client, and
// socket.in() skips the emitting socket itself.
pgsql.on('notification', (message) => {
    const { event, data } = JSON.parse(message.payload);

    if (event === 'terminate') {
        io.close();
        process.exit(0);
    }

    io.in(message.channel).emit(event, data);
});

io.on('connection', (socket) => {
    const {
        handshake: {
            query: { room, token },
        },
    } = socket;

    if (!token) {
        return;
    }

    // room is a comma-separated string of channels, so splitting it
    // by comma gives the list of channels this socket should join.
    resolveChannels(room).forEach((channel) => {
        socket.join(channel);
        // The channel name comes from the client and is used as an
        // identifier in the statement, so escape it first.
        pgsql.query(`LISTEN ${pgsql.escapeIdentifier(channel)}`);
    });
    socket.join('all');
    pgsql.query('LISTEN "all"');
});

io.listen(3000);

tpetry commented 1 year ago

Ah, sure. When you only broadcast from PHP, it is no problem. But the PHP part couldn't listen.

For now, I've got no intention to integrate it as I would also need to maintain a listen server. Feel free to make packages on your own for it 😉

With more interest in this concept I may think about integrating it.