apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.28k stars 409 forks source link

[ISSUE #1112] feat: optimize producer send async #1111

Closed twz915 closed 7 months ago

twz915 commented 8 months ago

What is the purpose of the change

https://github.com/apache/rocketmq-client-go/issues/1112 — Optimize the producer's asynchronous send so that it does not block the caller during the connect and sendRequest steps.

Comparison of the time spent sending messages

old version image

new version image

Example to Test

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// defaultProducer is the package-level producer used by sendMsg. It is
// initialized through getProducer, which panics if the producer cannot be
// created or started.
var defaultProducer = getProducer()

// getProducer constructs a rocketmq.Producer with a 3-second send-message
// timeout option and a retry option of 2, starts it, and returns it.
//
// It panics if either construction or Start reports an error.
func getProducer() rocketmq.Producer {
    prod, err := rocketmq.NewProducer(
        producer.WithSendMsgTimeout(3*time.Second),
        producer.WithRetry(2),
    )
    if err != nil {
        panic(err)
    }

    // The producer is only handed back once Start has succeeded.
    if startErr := prod.Start(); startErr != nil {
        panic(startErr)
    }
    return prod
}

// sendMsg wraps body in a message for topic and hands it to
// defaultProducer.SendAsync with a background context.
//
// The callback prints the error it receives, or the MsgID of the send result
// when there is no error. After SendAsync returns, the time spent in that call
// is printed with the "t: " prefix. The error returned by SendAsync is passed
// back to the caller unchanged.
func sendMsg(topic string, body []byte) error {
    message := primitive.NewMessage(topic, body)
    bg := context.Background()
    begin := time.Now()

    // Invoked by the producer with the outcome of the asynchronous send.
    onResult := func(_ context.Context, res *primitive.SendResult, cbErr error) {
        if cbErr == nil {
            fmt.Println(res.MsgID)
            return
        }
        fmt.Println(cbErr)
    }

    sendErr := defaultProducer.SendAsync(bg, onResult, message)
    elapsed := time.Since(begin)
    fmt.Println("t: ", elapsed)
    return sendErr
}

// main calls sendMsg 20 times with a fixed topic and body, then sleeps for
// ten seconds before returning.
//
// An error returned by sendMsg is printed rather than silently dropped, so a
// send that is rejected up front is visible in the output.
func main() {
    for i := 0; i < 20; i++ {
        if err := sendMsg("your-topic-here", []byte("test-body1")); err != nil {
            fmt.Println(err)
        }
    }
    // NOTE(review): presumably this pause lets the asynchronous send callbacks
    // run before the process exits — confirm.
    time.Sleep(time.Second * 10)
}