roadrunner-server / roadrunner

🤯 High-performance PHP application server, process manager written in Go and powered with plugins
https://docs.roadrunner.dev
MIT License
7.77k stars 405 forks source link

[💡 FEATURE REQUEST]: NSQ `Jobs` driver #1948

Open shellphy opened 1 week ago

shellphy commented 1 week ago

No duplicates 🥲.

What should be improved or cleaned up?

Nsq is a realtime distributed messaging platform created by golang, The protocol is very simple, very lightweight, and very popular, and hopefully supported

rustatian commented 1 week ago

Hey @shellphy 👋 Thanks for your feature request 👍

rustatian commented 1 week ago

@shellphy Here you may track progress: https://github.com/roadrunner-server/nsq But since I didn't use previously this driver, it would be nice if you update the description of this ticket and explain your use case, options you use, environment. As much info as you may provide to include your feedback in the initial release of this driver.

shellphy commented 1 week ago

@shellphy Here you may track progress: https://github.com/roadrunner-server/nsq But since I didn't use previously this driver, it would be nice if you update the description of this ticket and explain your use case, options you use, environment. As much info as you may provide to include your feedback in the initial release of this driver.

Thank you very much for accepting my request. I maintain a NSQ wrapper: https://github.com/zhimaAi/go_tools/blob/master/mq/nsq_pro.go

There are specific ways to use it in this project:

func StartConsumer() {
    common.RunTask(define.ConvertPdfTopic, define.ConvertPdfChannel, 1, business.ConvertPdf)
    common.RunTask(define.ConvertVectorTopic, define.ConvertVectorChannel, 2, business.ConvertVector)
    common.RunTask(lib_define.PushMessage, lib_define.PushChannel, 10, business.AppPush)
    common.RunTask(lib_define.PushEvent, lib_define.PushChannel, 5, business.AppPush)
}
if err := common.AddJobs(define.ConvertVectorTopic, message); err != nil {
    logs.Error(err.Error())
}
func AddJobs(topic, message string, delay ...time.Duration) error {
    topic = define.Env + `_` + topic
    return define.ProducerHandle.AddJobs(topic, message, delay...)
}

func RunTask(topic, channel string, workNum uint, callback func(msg string, args ...string) error) {
    topic = define.Env + `_` + topic
    err := define.ConsumerHandle.PushZero(define.Config.Nsqd[`host`]+`:`+define.Config.Nsqd[`port`], topic)
    if err != nil {
        logs.Error(`PushZero Error:%s`, err.Error())
    }
    err = define.ConsumerHandle.Run(topic, channel, workNum, callback)
    if err != nil {
        logs.Error(`Consumer Run Error:%s`, err.Error())
    }
}

It is important to note that if the producer has not set up the topic and the consumer starts listening, there will be an error, so I will usually send an empty message containing 0 when the consumer starts, as seen in the PushZero() function

If you want to quickly deploy nsq for easy testing, you can use the following docker compose configuration:

services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    restart: always
    environment:
      TZ: ${TIMEZONE}
    ports:
      - "${NSQLOOKUPD_PORT}:4161"

  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=121.40.109.241 --broadcast-tcp-port=${NSQD_TCP_PORT} --broadcast-http-port=${NSQD_HTTP_PORT}
    restart: always
    depends_on:
      - nsqlookupd
    environment:
      TZ: ${TIMEZONE}
    ports:
      - "${NSQD_TCP_PORT}:4150"
      - "${NSQD_HTTP_PORT}:4151"

  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    restart: always
    depends_on:
      - nsqlookupd
    environment:
      TZ: ${TIMEZONE}
    ports:
      - ${NSQADMIN_PORT}:4171

All those can be seen at my new project: https://github.com/zhimaAi/chatwiki

In my project, the task model is that one tcp connection corresponds to one go coroutine task, which is simpler and recommended for the initial release.

In nsq, only one tcp connection can be established, and multiple messages can be received per communication. This is achieved through rdy. It is recommended to implement this function in the second version

rustatian commented 1 week ago

Got u, thanks @shellphy 👍 I'll try to push this plugin to the v2024.2.0.