vapor / redis

Vapor provider for RediStack
MIT License

add in PSUBSCRIBE #136

Closed (khoogheem closed this issue 3 years ago)

khoogheem commented 5 years ago

Currently, the subscribe function only supports the "SUBSCRIBE" command. It would be good to add a method that allows "PSUBSCRIBE" (pattern subscriptions) to be used as well.

Maybe something like taking the current subscribe and extending it to:

    public func subscribe(
        _ channels: Set<String>,
        pchannels: Set<String>?,
        subscriptionHandler: @escaping (RedisChannelData) -> Void
    ) throws -> Future<Void> {
        // one SUBSCRIBE command for the exact channel names
        var messages: [RedisData] = [.array([.bulkString("SUBSCRIBE")] + channels.map { .bulkString($0) })]
        if let pchannels = pchannels {
            // plus one PSUBSCRIBE command for the glob-style patterns, if any
            messages += [.array([.bulkString("PSUBSCRIBE")] + pchannels.map { .bulkString($0) })]
        }

        return queue.enqueue(messages) { [weak self] channelMessage in
            // "message" frames come from SUBSCRIBE
            if let redisChannelData = try self?.convert(channelMessage: channelMessage) {
                subscriptionHandler(redisChannelData)
            }

            // "pmessage" frames come from PSUBSCRIBE
            if let redisChannelData = try self?.pconvert(channelMessage: channelMessage) {
                subscriptionHandler(redisChannelData)
            }

            return false
        }
    }

    private func pconvert(channelMessage: RedisData) throws -> RedisChannelData? {
        // must contain ["pmessage", <pattern>, <channel>, <redisData>]
        guard let array = channelMessage.array, array.first?.string == "pmessage" else {
            return nil
        }
        // check the length first so the subscripts below cannot go out of bounds
        guard array.count >= 4, let channel = array[2].string else {
            throw RedisError(
                identifier: "channel.data",
                reason: "channel data did not contain identifier."
            )
        }
        return RedisChannelData(channel: channel, data: array[3])
    }
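
The difference between the two reply shapes may be easier to see in isolation. Below is a minimal, self-contained sketch of decoding both: a "message" frame has three elements (kind, channel, payload), while a "pmessage" frame has four (kind, pattern, channel, payload). `Frame`, `ChannelMessage`, and `decode` are hypothetical names invented for this sketch; they stand in for the library's `RedisData` and `RedisChannelData` and are not part of any real API.

```swift
// Hypothetical stand-in for the library's RedisData type (illustration only).
enum Frame: Equatable {
    case bulkString(String)
    case array([Frame])

    var string: String? {
        if case .bulkString(let value) = self { return value }
        return nil
    }

    var array: [Frame]? {
        if case .array(let items) = self { return items }
        return nil
    }
}

// Hypothetical result type; `pattern` is nil for plain SUBSCRIBE deliveries.
struct ChannelMessage: Equatable {
    let pattern: String?
    let channel: String
    let data: Frame
}

/// Decodes a pushed Pub/Sub frame.
/// Returns nil for anything that is not a well-formed "message" or "pmessage" delivery.
func decode(_ frame: Frame) -> ChannelMessage? {
    guard let parts = frame.array, let kind = parts.first?.string else {
        return nil
    }
    switch (kind, parts.count) {
    case ("message", 3):
        // ["message", <channel>, <payload>]
        guard let channel = parts[1].string else { return nil }
        return ChannelMessage(pattern: nil, channel: channel, data: parts[2])
    case ("pmessage", 4):
        // ["pmessage", <pattern>, <channel>, <payload>]
        guard let pattern = parts[1].string, let channel = parts[2].string else {
            return nil
        }
        return ChannelMessage(pattern: pattern, channel: channel, data: parts[3])
    default:
        return nil
    }
}
```

Checking the element count before subscripting avoids an out-of-bounds crash on a truncated frame, and keeping the matched pattern lets a handler tell which subscription produced the delivery.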

khoogheem commented 5 years ago

It probably makes more sense to have separate subscribe and psubscribe functions, but I think a little more work is needed, as this quick hack still forces you to go through subscribe.
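
As a rough sketch of what that split could look like, the two entry points can share one helper that builds the command frame, so neither depends on the other. Everything here is hypothetical: `Frame` stands in for `RedisData`, and `commandFrame`, `subscribeFrame`, and `psubscribeFrame` are names made up for illustration. Returning nil for an empty set is an assumption of this sketch, chosen because SUBSCRIBE and PSUBSCRIBE both require at least one argument.

```swift
// Hypothetical stand-in for the library's RedisData type (illustration only).
enum Frame: Equatable {
    case bulkString(String)
    case array([Frame])
}

/// Builds one command frame: the command name followed by its arguments.
/// Arguments are sorted only to make the output deterministic, since Set is unordered.
/// Returns nil when there is nothing to subscribe to.
func commandFrame(_ name: String, _ arguments: Set<String>) -> Frame? {
    guard !arguments.isEmpty else { return nil }
    let head: [Frame] = [Frame.bulkString(name)]
    let tail: [Frame] = arguments.sorted().map { Frame.bulkString($0) }
    return Frame.array(head + tail)
}

/// Frame for exact channel names.
func subscribeFrame(channels: Set<String>) -> Frame? {
    return commandFrame("SUBSCRIBE", channels)
}

/// Frame for glob-style channel patterns.
func psubscribeFrame(patterns: Set<String>) -> Frame? {
    return commandFrame("PSUBSCRIBE", patterns)
}
```

With this shape a caller who only wants pattern subscriptions never has to pass a channel set at all.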

Mordil commented 3 years ago

This is now available in Redis 4, since RediStack supports it.