tx7do / kratos-transport

kratos transport layer extension
MIT License
348 stars 95 forks source link

mqtt不支持qos配置 #43

Closed aide-cloud closed 1 year ago

aide-cloud commented 1 year ago
// publish sends buf to topic through the underlying MQTT client.
//
// It fails fast with "not connected" when the client reports no active
// connection. The supplied opts are applied to a broker.PublishOptions value
// seeded with a background context, but nothing in that value is consulted
// afterwards: every message goes out with QoS 1 and the retained flag off.
//
// The returned error is whatever the publish token reports when Error is
// called right after Publish returns; this method does not wait on the token.
func (m *mqttBroker) publish(topic string, buf []byte, opts ...broker.PublishOption) error {
	// Fixed delivery settings; not yet driven by the publish options.
	const defaultQoS byte = 1
	const retain = false

	if connected := m.client.IsConnected(); !connected {
		return errors.New("not connected")
	}

	// Run every option so any side effects they carry still happen.
	pubOpts := broker.PublishOptions{Context: context.Background()}
	for i := range opts {
		opts[i](&pubOpts)
	}

	return m.client.Publish(topic, defaultQoS, retain, buf).Error()
}

// Subscribe registers handler for messages arriving on topic and returns a
// subscriber handle bound to that topic and client.
//
// For each incoming message the payload is passed to broker.Unmarshal with
// m.opts.Codec, decoding into msg.Body. When binder is non-nil it supplies the
// initial body value to decode into; otherwise the body starts out as the raw
// payload. Decode failures and handler failures are logged and recorded on the
// publication; a decode failure skips the handler for that message.
//
// The subscription is always requested with QoS 1. opts are applied to the
// options stored on the returned subscriber but do not affect the QoS.
//
// An error is returned when the client is not connected, or when the
// subscribe token reports an error or does not complete successfully.
func (m *mqttBroker) Subscribe(topic string, handler broker.Handler, binder broker.Binder, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	if !m.client.IsConnected() {
		return nil, errors.New("not connected")
	}

	var options broker.SubscribeOptions
	for _, o := range opts {
		o(&options)
	}

	// Fixed QoS; not yet driven by the subscribe options.
	const qos byte = 1

	t := m.client.Subscribe(topic, qos, func(_ MQTT.Client, mq MQTT.Message) {
		var msg broker.Message

		p := &publication{topic: mq.Topic(), msg: &msg}

		if binder != nil {
			msg.Body = binder()
		} else {
			msg.Body = mq.Payload()
		}

		if err := broker.Unmarshal(m.opts.Codec, mq.Payload(), &msg.Body); err != nil {
			p.err = err
			log.Error(err)
			return
		}

		if err := handler(m.opts.Context, p); err != nil {
			p.err = err
			log.Error(err)
		}
	})

	// checkClientToken yields both a success flag and an error. Surface the
	// error even when the flag is true, so a failed subscription is never
	// reported to the caller as a success, and never return a nil subscriber
	// together with a nil error.
	ok, err := checkClientToken(t)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, errors.New("subscribe failed")
	}

	return &subscriber{
		opts:   options,
		client: m.client,
		topic:  topic,
	}, nil
}
tx7do commented 1 year ago

QoS configuration is now supported.