zjykzk / rocketmq-client-go

rocketmq golang client
MIT License
9 stars 5 forks source link

How to reconnect to RocketMQ when the network connection is closed #6

Open wolftankk opened 5 years ago

wolftankk commented 5 years ago

After creating a push consumer and running it for a period of time, I receive the error message `use of closed network connection` (heartbeat only). How can I reconnect?


// SimplePushCallback is the signature of a user-supplied message handler. It
// receives a slice of messages together with the concurrent-consume context
// and signals failure by returning a non-nil error.
type SimplePushCallback func(msgs []*message.Ext, ctx *consumer.ConcurrentlyContext) error

// simplePushConsumer wraps a SimplePushCallback so that it can be handed to
// consumer.NewConcurrentlyConsumer as the message handler.
type simplePushConsumer struct {
    // callback is the user function invoked by Consume for each delivery.
    callback SimplePushCallback
}

// Consume hands msgs and ctx to the wrapped callback and maps its result onto
// a consume status: a non-nil error yields consumer.ReconsumeLater, otherwise
// consumer.ConcurrentlySuccess is returned.
func (c *simplePushConsumer) Consume(msgs []*message.Ext, ctx *consumer.ConcurrentlyContext) consumer.ConsumeConcurrentlyStatus {
    if failure := c.callback(msgs, ctx); failure != nil {
        return consumer.ReconsumeLater
    }
    return consumer.ConcurrentlySuccess
}

// NewPushConsumer builds a PushConsumer for the given group and topic.
//
// It loads the configuration via LoadConfig(consulAddr, name), creates a
// concurrently-consuming consumer against config.Nameservers whose handler
// calls f, sets FromWhere to consumer.ConsumeFromFirstOffset, MessageModel to
// consumer.Clustering and UnitName to "push", and subscribes to topic with the
// expression "*".
//
// If loading the configuration or creating the consumer fails, the error is
// returned unchanged and the returned *PushConsumer is nil.
func NewPushConsumer(consulAddr, name, groupName, topic string, f SimplePushCallback) (pc *PushConsumer, err error) {
    cfg, err := LoadConfig(consulAddr, name)
    if err != nil {
        return nil, err
    }

    handler := &simplePushConsumer{callback: f}
    inner, err := consumer.NewConcurrentlyConsumer(groupName, cfg.Nameservers, handler, logger)
    if err != nil {
        return nil, err
    }

    inner.FromWhere = consumer.ConsumeFromFirstOffset
    inner.MessageModel = consumer.Clustering
    inner.UnitName = "push"
    inner.Subscribe(topic, "*")

    pc = &PushConsumer{pc: inner}
    return pc, nil
}

// Start delegates to the Start method of the wrapped consumer and returns
// whatever error it reports.
func (pc *PushConsumer) Start() error {
    return pc.pc.Start()
}

// Shutdown delegates to the Shutdown method of the wrapped consumer.
func (pc *PushConsumer) Shutdown() {
    pc.pc.Shutdown()
}

// SendBack forwards m, delayLevel and broker unchanged to the SendBack method
// of the wrapped consumer and returns its error.
func (pc *PushConsumer) SendBack(m *message.Ext, delayLevel int, broker string) error {
    return pc.pc.SendBack(m, delayLevel, broker)
}
zjykzk commented 5 years ago

Under normal circumstances, the client reconnects to the server (namesrv/broker) automatically.

Can you post the log?