adjust / rmq

Message queue system written in Go and backed by Redis
MIT License
1.57k stars 206 forks source link

Consumer does not work #155

Closed sjtubinlong closed 10 months ago

sjtubinlong commented 1 year ago
package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/adjust/rmq/v5"
    "github.com/redis/go-redis/v9"
)

// Connection settings. The "xxx"/"yyy" values are placeholders.
const (
    // REDIS_SERV is used as the Addr of the Redis client in open_rmq_conn.
    REDIS_SERV   = "xxx"
    // REDIS_PASS is used as the Password of the Redis client in open_rmq_conn.
    REDIS_PASS   = "xxx"
    // SERVICE_NAME is passed as the first argument to
    // rmq.OpenConnectionWithRedisClient (presumably the connection tag —
    // confirm against the rmq documentation).
    SERVICE_NAME = "yyy"
)

// Tuning values for the queue workers and the in-process channels.
const (
    // prefetchLimit is the first argument given to StartConsuming.
    prefetchLimit   = 1000
    // pollDuration is the second argument given to StartConsuming.
    pollDuration    = 100 * time.Millisecond
    // numConsumers is not referenced by the code visible in this file;
    // ConsumeWorker registers a single consumer.
    numConsumers    = 5
    // consumeDuration is how long Consume sleeps before acknowledging.
    consumeDuration = time.Millisecond
    // produceChanSize is the buffer capacity of ProducerChan.
    produceChanSize = 1024 * 32
    // consumeChanSize is the buffer capacity of ConsumerChan.
    consumeChanSize = 1024 * 32
)

var (
    // ProducerChan carries payloads to publish; main fills it and
    // ProdueWorker drains it.
    ProducerChan = make(chan []byte, produceChanSize)
    // ConsumerChan receives a copy of every payload handled by Consume.
    // NOTE(review): no receiver is visible in this file — confirm that
    // something drains it, otherwise sends block once the buffer is full.
    ConsumerChan = make(chan []byte, consumeChanSize)
)

// Consumer is the handler registered on the queue; it keeps simple
// bookkeeping about the deliveries it has processed.
type Consumer struct {
    // name is the label built by NewConsumer ("consumer-<tag>").
    name string
    // count is incremented once per Consume call.
    count int
    // before is set at construction and refreshed at the end of each
    // Consume call.
    before time.Time
}

// NewConsumer returns a Consumer named "consumer-<tag>" with a zero
// delivery count and its timestamp initialised to the current time.
func NewConsumer(tag int) *Consumer {
    c := new(Consumer) // count starts at its zero value
    c.name = fmt.Sprintf("consumer-%d", tag)
    c.before = time.Now()
    return c
}

// Consume handles a single delivery: it prints the payload, forwards a
// copy to ConsumerChan, sleeps for consumeDuration, and then acknowledges
// the delivery. An Ack failure is reported on stdout and is not retried.
// The consumer's count and timestamp are updated whether or not the Ack
// succeeded.
func (c *Consumer) Consume(delivery rmq.Delivery) {
    payload := delivery.Payload()
    fmt.Println("consume:", payload)

    // NOTE(review): this send blocks once ConsumerChan's buffer is full.
    // No receiver is visible in this file — confirm something drains it,
    // otherwise the consumer stalls after consumeChanSize deliveries.
    ConsumerChan <- []byte(payload)

    time.Sleep(consumeDuration)
    if err := delivery.Ack(); err != nil {
        // Terminate the line so consecutive failures do not run together.
        fmt.Printf("failed to ack %s: %s\n", payload, err)
    }

    c.count++
    c.before = time.Now()
}

// open_rmq_conn creates a Redis client from REDIS_SERV / REDIS_PASS (DB 0)
// and opens an rmq connection on top of it, identified by SERVICE_NAME.
// It panics if the connection cannot be opened.
//
// Errors that rmq hits in its background work are received on a buffered
// channel and printed to stdout, instead of being dropped by passing a nil
// error channel. The logging goroutine lives for the rest of the process.
func open_rmq_conn() rmq.Connection {
    rdb := redis.NewClient(&redis.Options{
        Addr:     REDIS_SERV,
        Password: REDIS_PASS,
        DB:       0,
    })

    // Buffered so a short burst of background errors can be queued while
    // the logger goroutine is busy printing.
    errChan := make(chan error, 10)
    go func() {
        for err := range errChan {
            fmt.Println("rmq background error:", err)
        }
    }()

    conn, err := rmq.OpenConnectionWithRedisClient(SERVICE_NAME, rdb, errChan)
    if err != nil {
        panic(err)
    }
    return conn
}

// ProdueWorker opens its own rmq connection, opens the queue named
// queue_name, and publishes every payload received from ProducerChan to
// it. Each successful publish is echoed to stdout; a failed publish is
// reported and the payload is dropped (no retry).
//
// The function returns when ProducerChan is closed and drained, and
// panics if the connection or the queue cannot be opened.
func ProdueWorker(queue_name string) {
    conn := open_rmq_conn()
    tasks, err := conn.OpenQueue(queue_name)
    if err != nil {
        panic(err)
    }

    // Ranging (instead of a bare receive in an endless loop) stops the
    // worker on channel close rather than publishing empty payloads forever.
    for task := range ProducerChan {
        if err := tasks.PublishBytes(task); err != nil {
            fmt.Printf("failed to publish task: %s\n", err)
            continue
        }
        fmt.Println("produce:", string(task))
    }
}

// ConsumeWorker opens its own rmq connection, starts consuming from the
// queue named queue_name with a single consumer ("consumer-0"), and then
// blocks until SIGINT arrives. The first SIGINT stops all consuming and
// waits for that to finish; a second SIGINT received while waiting exits
// the process immediately with status 1. Any setup failure panics.
func ConsumeWorker(queue_name string) {
    conn := open_rmq_conn()

    queue, err := conn.OpenQueue(queue_name)
    if err != nil {
        panic(err)
    }

    err = queue.StartConsuming(prefetchLimit, pollDuration)
    if err != nil {
        panic(err)
    }

    _, err = queue.AddConsumer("consumer-0", NewConsumer(0))
    if err != nil {
        panic(err)
    }

    interrupts := make(chan os.Signal, 1)
    signal.Notify(interrupts, syscall.SIGINT)
    defer signal.Stop(interrupts)

    // First interrupt: begin shutdown.
    <-interrupts

    // Second interrupt during shutdown: give up and exit hard.
    forceExit := func() {
        <-interrupts
        fmt.Print("out!!!\n")
        os.Exit(1)
    }
    go forceExit()

    stopped := conn.StopAllConsuming()
    <-stopped
}

// main queues ten payloads ("0" through "9") on ProducerChan, starts the
// producer and consumer workers on "task-queue", and exits once the
// consumer worker has returned.
//
// Only the consumer worker is tracked by the WaitGroup. The producer
// blocks on ProducerChan for as long as the channel stays open, so waiting
// on it as well (or adding to the counter without a matching Done) would
// keep the process from ever exiting after a shutdown.
func main() {
    // ProducerChan is buffered, so these sends complete before any worker
    // is running.
    for i := 0; i < 10; i++ {
        ProducerChan <- []byte(fmt.Sprintf("%d", i))
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go ProdueWorker("task-queue")
    go func() {
        defer wg.Done()
        ConsumeWorker("task-queue")
    }()
    wg.Wait()
}
wellle commented 1 year ago

Hey, can you explain what the issue is?

sjtubinlong commented 1 year ago

@wellle I create a producer in the function ProdueWorker, and it keeps publishing new items to the queue. A ConsumeWorker function keeps consuming items from the queue. But I found that my consumer's Consume function is never called.

wellle commented 1 year ago

I just tried it, seems to work for me:

$ go run main.go
produce: 0
produce: 1
produce: 2
produce: 3
produce: 4
produce: 5
produce: 6
consume: 0
produce: 7
produce: 8
produce: 9
consume: 1
consume: 2
consume: 3
consume: 4
consume: 5
consume: 6
consume: 7
consume: 8
consume: 9
wellle commented 10 months ago

Closing as resolved.