tx7do / kratos-transport

kratos transport layer extension
MIT License
348 stars 95 forks source link

RocketMQ 5.3 订阅消息提示：ERROR msg=[UserActions] subscriber not found #100

Closed: LittleMoreInteresting closed this issue 1 day ago

LittleMoreInteresting commented 2 months ago
package server

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "testing"

    "github.com/go-kratos/kratos/v2/log"
    "github.com/stretchr/testify/assert"
    "github.com/tx7do/kratos-transport/broker"
    rocketmqOption "github.com/tx7do/kratos-transport/broker/rocketmq/option"
    "github.com/tx7do/kratos-transport/transport/rocketmq"
)

// Connection settings shared by the subscriber test in this file.
const (
    // testBroker is the endpoint passed to rocketmq.WithNameServer.
    // NOTE(review): the original comment calls this the name server address, but
    // 8081 is the default RocketMQ 5.x proxy (gRPC) port, not the usual name
    // server port 9876 — confirm which endpoint the V5 driver expects here.
    testBroker = "192.168.2.61:8081"

    // testTopic is the topic the subscriber is registered on; testGroupName is
    // passed both to rocketmq.WithGroupName and to rocketmq.RegisterSubscriber.
    testTopic     = "UserActions"
    testGroupName = "CID_ONSAPI_OWNER"
)

// handleHygrothermograph is the subscriber callback registered by TestServer.
// It logs the topic, headers and payload of each message it is handed and
// always returns nil.
//
// msg is dereferenced before logging so the log line shows the message text
// rather than the pointer's address; a nil msg is logged as "<nil>".
func handleHygrothermograph(_ context.Context, topic string, headers broker.Headers, msg *string) error {
    payload := "<nil>"
    if msg != nil {
        payload = *msg
    }
    log.Infof("Topic %s, Headers: %+v, Payload: %+v\n", topic, headers, payload)
    return nil
}

// TestServer is a manual integration test. It builds a RocketMQ transport
// server using the V5 driver, registers handleHygrothermograph as the
// subscriber for testTopic, starts the server, and then blocks until the test
// process receives SIGHUP, SIGINT, SIGTERM or SIGQUIT, at which point the
// server is stopped.
//
// The test fails immediately (instead of panicking or hanging) if the
// subscriber cannot be registered or the server cannot be started.
func TestServer(t *testing.T) {
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    // Detach the channel on exit so signal delivery is restored for anything
    // else that runs in the same test binary.
    defer signal.Stop(interrupt)

    ctx := context.Background()

    srv := rocketmq.NewServer(
        rocketmqOption.DriverTypeV5,
        rocketmq.WithNameServer([]string{testBroker}),
        rocketmq.WithGroupName(testGroupName),
        rocketmq.WithCodec("json"),
    )

    err := rocketmq.RegisterSubscriber[string](srv, ctx, testTopic, testGroupName,
        handleHygrothermograph,
        broker.WithQueueName(testTopic))
    if !assert.NoError(t, err) {
        // Without a subscriber there is nothing to exercise; stop here rather
        // than starting the server and blocking on the signal channel.
        t.FailNow()
    }

    if err := srv.Start(ctx); err != nil {
        // Report through the testing framework instead of panicking so the
        // failure is attributed to this test and other tests still run.
        t.Fatalf("start rocketmq server: %v", err)
    }

    defer func() {
        if err := srv.Stop(ctx); err != nil {
            t.Errorf("expected nil got %v", err)
        }
    }()

    // Keep the server alive until the operator interrupts the test run.
    <-interrupt
}