smrchy / rsmq

Redis Simple Message Queue
MIT License
1.76k stars 125 forks source link

Queue not Found error cause ns:QUEUE is removed #95

Closed Budi-9 closed 4 years ago

Budi-9 commented 5 years ago
{ queueNotFound: Queue not found
    at RedisSMQ._handleError (/home/user/RedisMQ/node_modules/rsmq/index.js:422:24)
    at Multi.redis.multi.exec [as callback] (/home/user/RedisMQ/node_modules/rsmq/index.js:37:26)
    at multi_callback (/home/user/RedisMQ/node_modules/redis/lib/multi.js:89:14)
    at Command.callback (/home/user/RedisMQ/node_modules/redis/lib/multi.js:116:9)
    at normal_reply (/home/user/RedisMQ/node_modules/redis/index.js:726:21)
    at RedisClient.return_reply (/home/user/RedisMQ/node_modules/redis/index.js:824:9)
    at JavascriptRedisParser.returnReply (/home/user/RedisMQ/node_modules/redis/index.js:192:18)
    at JavascriptRedisParser.execute (/home/user/RedisMQ/node_modules/redis-parser/lib/parser.js:574:12)
    at Socket.<anonymous> (/home/user/RedisMQ/node_modules/redis/index.js:274:27)
    at emitOne (events.js:116:13) name: 'queueNotFound', message: 'Queue not found' }
Budi-9 commented 5 years ago

when i use deleteQueue function and all queue has been deleted

ns:QUEUE in redis is not found or deleted, that maybe the problem

erdii commented 5 years ago

I'm sorry but I don't really understand your problem.. :/ Can you provide a commented code example with steps to reproduce your problem please?

Budi-9 commented 5 years ago
type ConfigExt = {
  qname: string,
  statusRetry?: number,
  timeout?: number,
  interval?: number,
  stopAfter?: number,
  debug?: boolean,
}
export type MQOptions = RedisMQ.ConstructorOptions & ConfigExt

export default class A{
  public qname: string
  public intervalCheck
  public timeoutStop

  private queueExist = false
  private options: MQOptions
  private client: RedisMQ
  private baseLog: string

  constructor(options: MQOptions) {
    if (!options.statusRetry) {
      options.statusRetry = 300
    }

    if (options.timeout === undefined) {
      options.timeout = 15000
    }

    if (options.interval === undefined) {
      options.interval = 500
    }

    if (options.stopAfter === undefined) {
      options.stopAfter = 5000
    }

    if (process.env.MQSERVICE_SPACES) {
      options.ns = process.env.QUEUE_SPACES
    }

    if (process.env.MQSERVICE_HOST) {
      options.host = process.env.MQSERVICE_HOST
    }

    if (process.env.MQSERVICE_PORT && Number(process.env.MQSERVICE_PORT)) {
      options.port = Number(process.env.MQSERVICE_PORT)
    }

    if (process.env.MQSERVICE_PASSWORD) {
      options.options.password = process.env.MQSERVICE_PASSWORD
    }

    this.baseLog = `MQService: ${options.ns || 'rsmq'}:${options.qname}`

    this.options = options
    this.qname = options.qname
    this.client = new RedisMQ(this.options)
  }

  public async initQueue() {
    try {
      const req = await this.client.createQueueAsync({
        qname: this.options.qname
      })

      this.queueExist = true

      return req
    } catch (e) {
      if (e.message === 'Queue exists') {
        this.queueExist = true
      }
      console.log(this.baseLog, e.message || e)
    }
  }

  public async start() {

    if (!this.queueExist) {
      try {
        await this.initQueue()
      } catch (error) {
        console.log(this.baseLog, error.message)
      }
    }

    if (this.intervalCheck) {
      return
    }

    this.intervalCheck = setInterval(async () => {
      try {
        const receive = await this.client.receiveMessageAsync({ qname: this.options.qname }) as RedisMQ.QueueMessage

        if (receive.id) {

          // clear timeout
          if (this.timeoutStop) {
            clearTimeout(this.timeoutStop)
            this.timeoutStop = null
          }

          if (this.options.debug) {
            console.log(this.baseLog, 'Receive: %j', receive)
          } else {
            console.log(this.baseLog, 'Receive:', receive.id)
          }

          let msgObj: Message
          try {
            msgObj = JSON.parse(receive.message) as Message
          } catch (error) {
            if (this.options.debug) {
              console.error(this.baseLog, 'Parse obj error: %j', error)
            }
          }

          if (msgObj && msgObj.url) {

            try {
            //  const req = await this._sendMessage(receive.id, msgObj, this.options)
            } catch (error) {
              console.error(this.baseLog, error && error.message)
            }
          }

          try {
            const delQueue = await this.client.deleteMessageAsync({
              qname: this.options.qname,
              id: receive.id
            })

            if (this.options.debug) {
              console.log(this.baseLog, 'Delete message:', receive.id)
            }
            console.log(this.baseLog, 'Success:', receive.id)
          } catch (error) {
            console.error(error)
          }

        } else if (this.options.debug) {
          console.log(this.baseLog, 'no avaliable message at queue', this.options.qname)
        }

        if (!receive.id && !this.timeoutStop && this.options.stopAfter) {
          this.timeoutStop = setTimeout(async () => {
            await this.stop()
          }, this.options.stopAfter)
        }

      } catch (error) {
        console.error(this.baseLog, 'Error:', error.message)
      }

    }, this.options.interval)
  }

  public async stop() {
    try {
      await this.client.deleteQueueAsync({qname: this.qname})
      console.log(this.baseLog, `Queue ${this.options.qname} has been stoped`)
      this.queueExist = false

      if (this.intervalCheck) {
        clearInterval(this.intervalCheck)
        this.intervalCheck = null
      }

      clearTimeout(this.timeoutStop)
      this.timeoutStop = null
    } catch (error) {
      this.queueExist = true
      console.error(error.message || error)
    }
  }

  public async sendMessage(message: Message, delay?: number) {
    if (!this.queueExist) {
      const queue = await this.client.listQueuesAsync()
      if (queue.indexOf(this.qname) >= 0) {
        try {
          await this.initQueue()
        } catch (error) {
          console.log(this.baseLog, error.message)
        }
      }
    }

    if (!this.intervalCheck) {
      this.start()
    }

    const _messgage = message

    if (!_messgage.headers) {
      _messgage.headers = {}
    }

    if (_messgage.maxRetry === undefined || _messgage.maxRetry < 0) {
      _messgage.maxRetry = 5
    }

    if (!this.client) {
      throw new Error('Redis MQ must be initilaize')
    }

    try {
      const opt: RedisMQ.SendMessageOptions = {
        qname: this.options.qname,
        message: JSON.stringify(message)
      }

      if (delay) {
        opt.delay = delay
      }
      const sendMsg = await this.client.sendMessageAsync(opt)

      console.log(this.baseLog, 'Send:', sendMsg)

      return sendMsg
    } catch (error) {
      throw error
    }
  }

}
erdii commented 5 years ago

Im sorry, I still do not understand what the problem is? :(

joeyslack commented 5 years ago

@ArifBudiman19 I think this is an issue in your interval loop.

const receive = await this.client.receiveMessageAsync({ qname: this.options.qname }) as RedisMQ.QueueMessage

You are looping, exepecting qname. You won't have a qname if you delete it.

This is not a library issue, this is an implementation issue. Should be closed imo.

erdii commented 5 years ago

@joeyslack thank you! @ArifBudiman19 did joyslack's comment help you?