redis / node-redis

Redis Node.js client
https://redis.js.org/
MIT License

Not able to subscribe after publishing message either from cmd or from .publish() #2327

Open omkarrepal opened 1 year ago

omkarrepal commented 1 year ago

I'm trying to implement a pub/sub mechanism, but I get an error whenever I publish a message, either from the command line or through the Node package.

My client creation code looks like this:

  async setRedisClient(): Promise<void> {
    let redisOpts: RedisClientOptions = this.cloudConfig.getRedisCredentials();
    this.pubClient = createClient(redisOpts);
    await this.pubClient.connect();
    this.subClient = this.pubClient.duplicate();
    await this.subClient.connect();
  }

  getPubClient() {
    return this.pubClient;
  }

  getSubClient() {
    return this.subClient;
  }

My Redis config:

return {
      socket: {
        host: "localhost",
        port: 6379,
        tls: false
      },
      legacyMode: true,
      password: "1234"
    }

And my subscription code looks like:

async listenRemoteSockets() {
    logger.info("Subscriber to remote socket updates");
    try {
      // doing this to capture message after 10 seconds.
      setTimeout(async () => {
        await this.redisManager.getPubClient().publish('monitor', 'test from app 22');
      }, 10000);
      await this.redisManager.getSubClient().subscribe('monitor', (err, count) => {
        if (err) {
          logger.error(`>>>>>>>>>>>>> Error occurred ${err}`);
        }
        logger.info(`>>>>>>>>>>>>>> Subscribed to ${count}`)
      });

      this.redisManager.getSubClient().on('message', (channel, message) => {
        logger.info(`>>>>>>>>>>>>> Got message from ${channel} => ${message}`)
      })

    } catch (error) {
      logger.error(`Error occurred >>>>>>>>>>>>>>> ${error}`)
    }
  }

The following error is thrown once I publish, either from the command line or through .publish():

Error: Got an unexpected reply from Redis
    at Object.onReply (app.js:2:2830344)
    at exports.default.write (app.js:2:2823283)
    at RedisCommandsQueue.onReplyChunk (app.js:2:2835206)
    at RedisSocket.<anonymous> (app.js:2:2870390)
    at RedisSocket.emit (node:events:527:28)
    at Socket.<anonymous> (app.js:2:2889095)
    at Socket.emit (node:events:527:28)
    at addChunk (node:internal/streams/readable:324:12)
    at readableAddChunk (node:internal/streams/readable:297:9)
    at Readable.push (node:internal/streams/readable:234:10)


leibale commented 1 year ago

@omkarrepal

import { createClient } from '@redis/client';

const pub = createClient(),
  sub = pub.duplicate();

await Promise.all([
  pub.connect(),
  sub.connect()
]);

// In v4 (non-legacy) mode the listener receives (message, channel), in that order
await sub.subscribe('monitor', (message, channel) => {
  console.log(channel, message);
});

setInterval(async () => {
  await pub.publish('monitor', Math.random().toString());
}, 2000);

This code did not reproduce the error...

omkarrepal commented 1 year ago

Hi @leibale,

Thanks for the quick response. :)

I tried the code above and it threw the same error, so I modified my client creation code a bit.

Now my code looks like this:

import { createClient } from "@redis/client";
import { RedisClientOptions } from "redis";
import { CloudConfig } from "../cloud-config";

export class RedisManager {
  private pubClient: ReturnType<typeof createClient>;
  private subClient: ReturnType<typeof createClient>;
  constructor(private cloudConfig: CloudConfig) {
  }

  async setRedisClient(): Promise<void> {
    let redisOpts: RedisClientOptions = this.cloudConfig.getRedisCredentials();
    this.pubClient = createClient(redisOpts);
    this.subClient = this.pubClient.duplicate();
    await Promise.all([
      this.pubClient.connect(),
      this.subClient.connect()
    ]);
    await this.subClient.subscribe('monitor', (channel, message) => {
      console.log(channel, message);
    });
  }

  getPubClient() {
    return this.pubClient;
  }

  getSubClient() {
    return this.subClient;
  }
}

It prints the following from console.log:

null [ 'subscribe', 'monitor', 1 ]

Then I ran the following command from redis-cli:

127.0.0.1:6379> PUBLISH monitor test

and it again threw the following error:

Error: Got an unexpected reply from Redis
    at Object.onReply (/Users///terminal-ui/dist/app.js:2:2805176)
    at exports.default.write (/Users///terminal-ui/dist/app.js:2:2798115)
    at RedisCommandsQueue.onReplyChunk (/Users///terminal-ui/dist/app.js:2:2810038)
    at RedisSocket.<anonymous> (/Users///terminal-ui/dist/app.js:2:2845464)
    at RedisSocket.emit (node:events:527:28)
    at Socket.<anonymous> (/Users///terminal-ui/dist/app.js:2:2864893)
    at Socket.emit (node:events:527:28)
    at addChunk (node:internal/streams/readable:324:12)
    at readableAddChunk (node:internal/streams/readable:297:9)
    at Readable.push (node:internal/streams/readable:234:10)

I then created a fresh application with only this code:

const redis = require('redis');

const client = redis.createClient({
    socket: {
        host: 'localhost',
        port: 6379,
        tls: false
    },
    legacyMode: true,
    password: '1234'
});

const subClient = client.duplicate();

client.connect().then(() => {
    subClient.connect().then(() => {
        console.log("both clients connected");
        subClient.subscribe('monitor', (message, channel) => {
            console.log(message, channel);
        });
    });
});

I got the following output:

both client connected
null [ 'subscribe', 'monitor', 1 ]
/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/commands-queue.js:57
                    throw new Error('Got an unexpected reply from Redis');
                    ^

Error: Got an unexpected reply from Redis
    at Object.onReply (/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/commands-queue.js:57:27)
    at RESP2Decoder.write (/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/RESP2/decoder.js:119:26)
    at RedisCommandsQueue.onReplyChunk (/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/commands-queue.js:203:72)
    at RedisSocket.<anonymous> (/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/index.js:352:84)
    at RedisSocket.emit (node:events:527:28)
    at Socket.<anonymous> (/Users/infyuser/wabtec/node-redis-trial/node_modules/@redis/client/dist/lib/client/socket.js:192:42)
    at Socket.emit (node:events:527:28)
    at addChunk (node:internal/streams/readable:324:12)
    at readableAddChunk (node:internal/streams/readable:297:9)
    at Readable.push (node:internal/streams/readable:234:10)

Node.js v18.3.0

PS: When I subscribe to the monitor channel from redis-cli, I do receive updates there when they are published from node-redis. Am I missing something? I've upgraded to redis@latest (4.5.0).

leibale commented 1 year ago

Looks like PubSub is not working well with legacyMode on. I'll debug it later today.

omkarrepal commented 1 year ago

@leibale So the workaround would be to turn off legacy mode?

leibale commented 1 year ago

@omkarrepal yeah, turn off legacy mode in the subscriber client

edit: or use .v4.subscribe on the legacy client

ABHIJITBURMAN commented 1 year ago

I'm facing the same issue: with legacy mode set to true, I'm not able to subscribe to any channel in Redis. I'm using redis (npm) 4.6.6. Please let me know if there is any update on this.

leibale commented 1 year ago

@ABHIJITBURMAN just turn off legacy mode on the subscriber client
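
For anyone landing here, a minimal sketch of that workaround, assuming the publisher and subscriber share one config object like the one posted earlier in this thread. The `RedisOptsLike` type and the `subscriberOptions` helper are hypothetical names made up for illustration; they are not part of node-redis. The idea is only to keep the shared config as-is for the publisher and derive a copy with `legacyMode` forced off for the subscriber.

```typescript
// Hypothetical, trimmed-down shape of the options used in this thread.
// It is NOT the real RedisClientOptions type from node-redis.
interface RedisOptsLike {
  socket?: { host?: string; port?: number; tls?: boolean };
  legacyMode?: boolean;
  password?: string;
}

// Hypothetical helper: returns a shallow copy of the shared options with
// legacy mode turned off, leaving the original object untouched.
function subscriberOptions(shared: RedisOptsLike): RedisOptsLike {
  return { ...shared, legacyMode: false };
}

// The config from the original report (legacy mode on).
const shared: RedisOptsLike = {
  socket: { host: "localhost", port: 6379, tls: false },
  legacyMode: true,
  password: "1234",
};

const subOpts = subscriberOptions(shared);

console.log(shared.legacyMode);  // true  (publisher config is unchanged)
console.log(subOpts.legacyMode); // false (subscriber config, legacy mode off)
```

The subscriber would then be created from `subOpts` (for example by passing it to `createClient`) while the publisher keeps the original options. If the subscriber has to stay in legacy mode, the other route mentioned above is calling `.v4.subscribe` on the legacy client instead.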