NewLifeX / NewLife.MQTT

最流行的物联网通信协议MQTT,包括客户端、服务端和Web管理平台
MIT License
135 stars 40 forks source link

MQTT服务器是否可以主动向客户端发送消息 #6

Open omgcwz opened 2 weeks ago

omgcwz commented 2 weeks ago

MQTT服务器是否可以主动向客户端发送消息,如果可以该如何做

nnhy commented 2 weeks ago

看IMqttHandler的定义,其中的PublishAsync就是用来向下发布消息的。

`

/// <summary>MQTT处理器</summary>
public interface IMqttHandler
{
/// <summary>处理消息</summary>
/// <param name="message">消息</param>
/// <returns></returns>
MqttMessage? Process(MqttMessage message);

/// <summary>Publishes a message to a topic.</summary>
/// <param name="topic">Topic to publish to</param>
/// <param name="data">Message payload</param>
/// <param name="qos">Quality of service; defaults to <c>AtMostOnce</c></param>
/// <returns>A task yielding an <c>MqttIdMessage</c> or null — NOTE(review): presumably the acknowledgement, confirm against the implementation</returns>
Task<MqttIdMessage?> PublishAsync(String topic, Object data, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>Publishes a message to a topic, with control over message exchange.</summary>
/// <param name="topic">Topic to publish to</param>
/// <param name="data">Message payload</param>
/// <param name="allowExchange">Whether message exchange is allowed</param>
/// <param name="qos">Quality of service; defaults to <c>AtMostOnce</c></param>
/// <returns>A task yielding an <c>MqttIdMessage</c> or null — NOTE(review): presumably the acknowledgement, confirm against the implementation</returns>
Task<MqttIdMessage?> PublishAsync(String topic, Object data, Boolean allowExchange, QualityOfService qos = QualityOfService.AtMostOnce);

/// <summary>Publishes an already-built publish message.</summary>
/// <param name="message">Message to publish</param>
/// <returns>A task yielding an <c>MqttIdMessage</c> or null — NOTE(review): presumably the acknowledgement, confirm against the implementation</returns>
Task<MqttIdMessage?> PublishAsync(PublishMessage message);

/// <summary>Closes the connection. Triggered when the network connection is closed.</summary>
/// <param name="reason">Reason for closing</param>
void Close(String reason);

} `

omgcwz commented 1 week ago

感谢作者的指点,但是我还是没有解决,我不知道如何获取到MqttHandler的实例服务,我用下面的方法写的会出现 One or more errors occurred. (Object reference not set to an instance of an object.)的报错信息 是否有相对完整的案例,麻烦你了

// Looks up a handler through mqttServer.ServiceProvider and publishes `message` to `topic`,
// blocking on the returned task with Wait() before printing a marker to the console.
// Wait() rethrows task failures wrapped in AggregateException, which is where the
// "One or more errors occurred. (...)" text in the report above comes from.
// NOTE(review): GetService is shown without a type argument (possibly lost when the page was
// rendered); if the lookup yields null, the PublishAsync call is the likely source of the
// null-reference error — confirm how the handler is registered.
public void PublishMessage(string topic,string message) { var mqttHandler = NewLife.Model.ModelExtension.GetService(mqttServer.ServiceProvider); mqttHandler.PublishAsync(topic, message).Wait(); Console.WriteLine("发"); }

nnhy commented 1 week ago

以下代码,来自商用版FIoT平台。

using System.Text.RegularExpressions; using IoT.Data; using IoTCore.Models; using IoTMqtt.Services; using IoTServer.Models; using NewLife; using NewLife.Data; using NewLife.IoT.Models; using NewLife.IoT.ThingModels; using NewLife.Log; using NewLife.MQTT.Handlers; using NewLife.MQTT.Messaging; using NewLife.Net; using NewLife.Serialization; using XCode; using LogLevel = NewLife.Log.LogLevel;

namespace IoTServer.Services;

/// <summary>MQTT控制器。处理业务逻辑</summary>
public class MqttController : MqttHandler
{
#region 属性

// Device service: used by the handlers below for logout, ping, online state, history and upgrade lookups.
private readonly MyDeviceService _deviceService;
// Thing-model service: used below to post data and events and to record service replies.
private readonly ThingService _thingService;
// MQTT service: used by OnConnect to perform the login.
private readonly MqttService _mqttService;
// Queue service: ConsumeMessage obtains the per-device queue from it.
private readonly QueueService _queue;
// Cancellation source for the background consumer; created in OnConnect, cancelled in OnDisconnect.
private CancellationTokenSource _source;
#endregion

#region 构造
/// <summary>Creates the controller and stores the collaborating services.</summary>
/// <param name="deviceService">Device service stored for the handlers</param>
/// <param name="thingService">Thing-model service stored for the handlers</param>
/// <param name="mqttService">MQTT service stored for the login step</param>
/// <param name="queue">Queue service stored for the background consumer</param>
/// <param name="tracer">Tracer assigned to <c>Tracer</c></param>
/// <param name="log">Log assigned to <c>Log</c></param>
public MqttController(MyDeviceService deviceService, ThingService thingService, MqttService mqttService, QueueService queue, ITracer tracer, ILog log)
{
    // Injected collaborators
    (_deviceService, _thingService) = (deviceService, thingService);
    (_mqttService, _queue) = (mqttService, queue);

    // Diagnostics exposed by the base handler
    Log = log;
    Tracer = tracer;
}
#endregion

#region 接收指令
/// <summary>Handles a client CONNECT: logs the attempt, performs the login and starts the downlink consumer.</summary>
/// <param name="message">Connect message carrying client id, user name and password</param>
/// <returns>
/// The result of the base handler when login succeeds; a <c>ConnAck</c> with
/// <c>RefusedBadUsernameOrPassword</c> when anything in the login path throws.
/// </returns>
protected override ConnAck OnConnect(ConnectMessage message)
{
    using var span = Tracer?.NewSpan("MqttLogin", message);

    var session = Session;
    var clientId = message.ClientId;

    // Never write the real password to the log; keep the message layout so existing log parsing still works.
    var maskedPassword = message.Password == null ? null : "******";
    WriteLog("客户端[{0}]连接 user={1} pass={2} clientId={3}", session.Remote.EndPoint, message.Username, maskedPassword, clientId);

    // Fall back to the user name when the client did not supply an id.
    if (clientId.IsNullOrEmpty()) clientId = message.Username;

    try
    {
        var dvSession = _mqttService.MqttLogin(new MqttAuthReq
        {
            ClientId = clientId,
            Username = message.Username,
            Password = message.Password,
            RemoteIp = session.Remote.Host
        });

        // Keep the login result on the session so later packets can be checked by CheckLogin.
        if (session is IExtend extend) extend["DeviceSession"] = dvSession;

        var device = dvSession.Device;

        // Stop a consumer left over from an earlier CONNECT on this handler before starting a new one.
        _source?.Cancel();

        // Capture the new source in a local so the background task uses exactly this instance,
        // even if the field is replaced before the task starts running.
        var source = new CancellationTokenSource();
        _source = source;

        // Without a device there is no queue to consume from.
        if (device != null)
        {
            var host = session.Remote.Host;
            _ = Task.Run(() => ConsumeMessage(device, host, source));
        }

        return base.OnConnect(message);
    }
    catch (Exception ex)
    {
        span?.SetError(ex, null);

        // Login failed: answer the MQTT client with an error response.
        return new ConnAck { ReturnCode = ConnectReturnCode.RefusedBadUsernameOrPassword };
    }
}

/// <summary>Returns the device session stored on the network session, disposing the session when there is none.</summary>
/// <param name="session">Network session to inspect</param>
/// <returns>The stored <c>MqttDeviceSession</c>, or null when the session carries no login</returns>
private MqttDeviceSession CheckLogin(INetSession session)
{
    // The login result lives under the "DeviceSession" key of the session's extension data.
    var login = (session as IExtend)?["DeviceSession"] as MqttDeviceSession;
    if (login == null)
    {
        // Not logged in: drop the session.
        session.TryDispose();
    }

    return login;
}

/// <summary>Handles a client DISCONNECT: stops the consumer, logs the event and logs the device out.</summary>
/// <param name="message">Disconnect message</param>
/// <returns>The base handler's result, or null when the session has no login</returns>
protected override MqttMessage OnDisconnect(DisconnectMessage message)
{
    // Signal the background consumer to stop.
    _source?.Cancel();

    var current = Session;
    WriteLog("客户端[{0}]断开", current.Remote);

    if (CheckLogin(current) is { } login)
    {
        _deviceService.Logout(login.Device, "OnDisconnect", "Mqtt", current.Remote.Host);

        return base.OnDisconnect(message);
    }

    return null;
}

/// <summary>Handles a PUBLISH from a client and routes it to the matching business handler by topic.</summary>
/// <remarks>
/// Topics with at least four non-empty segments are read as
/// <c>{seg0}/{productKey}/{deviceCode}/{rest...}</c>. The device named in the topic must be the
/// logged-in device or one of its children, otherwise the publish is recorded as an illegal
/// publish. Shorter topics and unrecognised <c>rest</c> values fall through to the base handler.
/// </remarks>
/// <param name="message">Published message</param>
/// <returns>
/// The base handler's result; null when the session has no login, the login has no device,
/// or processing the topic threw.
/// </returns>
protected override MqttIdMessage OnPublish(PublishMessage message)
{
    var session = Session;
    var dvSession = CheckLogin(session);
    if (dvSession == null) return null;

    using var span = Tracer?.NewSpan(nameof(OnPublish));

    var dv = dvSession.Device;

    // TODO: validate the reported topic against the product definition.

    var ip = session.Remote.Host;
    if (dv == null)
    {
        session.TryDispose();
        return null;
    }

    // Mark the device (gateway) as online.
    _deviceService.SetDeviceOnline(dv, ip, nameof(OnPublish));

    var topic = message.Topic;
    var msg = message.Payload.ToStr();
    span?.SetTag(topic + Environment.NewLine + msg);

    try
    {
        var segs = topic.Split('/', StringSplitOptions.RemoveEmptyEntries);
        if (segs.Length >= 4)
        {
            var productKey = segs[1];
            var deviceCode = segs[2];
            var child = Device.FindByCode(deviceCode);

            // The topic's device must be the logged-in device itself or a direct child of it.
            if (child == null || (child.Id != dv.Id && child.ParentId != dv.Id))
                throw new Exception($"非法设备编码,[{deviceCode}]并非当前登录设备[{dv}]的子设备");

            // From here on work with the device named in the topic rather than the login device.
            dv = child;

            var topic2 = segs.Skip(3).Join("/");

            // Property report
            if (topic2.EqualIgnoreCase("thing/property/post"))
            {
                OnPostProperty(msg, dv, productKey, deviceCode, ip);
            }
            // Service invocation result
            else if (topic2.EqualIgnoreCase("thing/service/post_reply"))
            {
                OnServiceReply(msg, dv, productKey, deviceCode, ip);
            }
            // Event report
            else if (topic2.EqualIgnoreCase("thing/event/info/post", "thing/event/alert/post", "thing/event/error/post"))
            {
                // The matched form is {seg0}/{productKey}/{deviceCode}/thing/event/{type}/post,
                // so the event type is segment 5 (segment 2 is the device code).
                var eType = segs[5];

                OnPostEvent(msg, dv, productKey, deviceCode, ip, eType);
            }
            // Ping report
            else if (topic2.EqualIgnoreCase("thing/event/ping/post"))
            {
                OnPing(msg, dv, productKey, deviceCode, ip);
            }
            // OTA upgrade query
            else if (topic2.EqualIgnoreCase("thing/ota/firmware/get"))
            {
                OnUpgrade(msg, dv, productKey, deviceCode, ip);
            }
        }
    }
    catch (Exception ex)
    {
        span?.SetError(ex, null);
        Log?.Error(ex.Message);

        // Record the failed publish as an error event on the device.
        var ev = new EventModel
        {
            Type = "error",
            Name = "非法发布",
            Remark = ex.Message,
            Data = topic,
            Time = DateTime.UtcNow.ToLong(),
        };

        _deviceService.WriteHistory(dv, ev.Name, false, ex.Message, ip);
        _thingService.PostEvent(dv, ev, ip);

        return null;
    }

    return base.OnPublish(message);
}

/// <summary>Handles an MQTT PINGREQ: refreshes the device's heartbeat and delegates to the base handler.</summary>
/// <param name="message">Ping request</param>
/// <returns>The base handler's response, or null when the session has no login</returns>
protected override NewLife.MQTT.Messaging.PingResponse OnPing(PingRequest message)
{
    var current = Session;

    if (CheckLogin(current) is not { } login) return null;

    // Protocol-level heartbeat carries no ping payload, hence the null arguments.
    _deviceService.Ping(login.Device, null, null, "Mqtt", current.Remote.Host);

    return base.OnPing(message);
}

/// <summary>Handles a SUBSCRIBE: logs the requested filters and records a subscription event for the device.</summary>
/// <param name="message">Subscribe message</param>
/// <returns>The base handler's ack, or null when the session has no login</returns>
protected override SubAck OnSubscribe(SubscribeMessage message)
{
    var current = Session;
    var remote = current.Remote;
    var filters = message.Requests.Select(p => p.TopicFilter);
    WriteLog("客户端[{0}]订阅主题[{1}]", remote, String.Join(", ", filters));

    if (CheckLogin(current) is not { } login) return null;

    var remoteHost = current.Remote.Host;

    // Record the subscription as an informational event.
    var subscribed = new EventModel();
    subscribed.Type = "info";
    subscribed.Name = "订阅";
    subscribed.Remark = message.Requests.ToJson();
    subscribed.Time = DateTime.UtcNow.ToLong();

    _thingService.PostEvent(login.Device, subscribed, remoteHost);

    return base.OnSubscribe(message);
}

/// <summary>Handles an UNSUBSCRIBE: logs the filters and records an unsubscription event for the device.</summary>
/// <param name="message">Unsubscribe message</param>
/// <returns>The base handler's ack, or null when the session has no login</returns>
protected override UnsubAck OnUnsubscribe(UnsubscribeMessage message)
{
    var current = Session;
    WriteLog("客户端[{0}]取消订阅主题[{1}]", current.Remote, String.Join(", ", message.TopicFilters));

    if (CheckLogin(current) is not { } login) return null;

    var remoteHost = current.Remote.Host;

    // Record the unsubscription as an informational event.
    var unsubscribed = new EventModel();
    unsubscribed.Type = "info";
    unsubscribed.Name = "取消订阅";
    unsubscribed.Remark = message.TopicFilters.ToJson();
    unsubscribed.Time = DateTime.UtcNow.ToLong();

    _thingService.PostEvent(login.Device, unsubscribed, remoteHost);

    return base.OnUnsubscribe(message);
}
#endregion

#region 业务处理
/// <summary>Stores a property report and acknowledges it on the matching reply topic.</summary>
/// <param name="msg">JSON payload, parsed as <c>DataModels</c></param>
/// <param name="dv">Device the data belongs to</param>
/// <param name="productKey">Product key taken from the topic</param>
/// <param name="deviceCode">Device code taken from the topic</param>
/// <param name="ip">Remote address of the client</param>
private void OnPostProperty(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    var payload = msg.ToJsonEntity<DataModels>();

    // Nothing to store (and nothing to acknowledge) without items.
    if (payload?.Items == null) return;

    _thingService.PostData(dv, payload, "MqttPost", ip);

    // Fire-and-forget acknowledgement.
    var replyTopic = $"sys/{productKey}/{deviceCode}/thing/property/post_reply";
    _ = PublishAsync(replyTopic, "ok");
}

/// <summary>Parses a service-call result and hands it to the thing service.</summary>
/// <param name="msg">JSON payload, parsed as <c>ServiceReplyModel</c></param>
/// <param name="dv">Device that replied</param>
/// <param name="productKey">Product key taken from the topic (unused here)</param>
/// <param name="deviceCode">Device code taken from the topic (unused here)</param>
/// <param name="ip">Remote address of the client (unused here)</param>
private void OnServiceReply(String msg, Device dv, String productKey, String deviceCode, String ip)
    => _thingService.ServiceReply(dv, msg.ToJsonEntity<ServiceReplyModel>());

/// <summary>Processes an application-level ping report and replies with the server time.</summary>
/// <param name="msg">JSON payload, parsed as <c>PingInfo</c></param>
/// <param name="dv">Device that sent the ping</param>
/// <param name="productKey">Product key taken from the topic</param>
/// <param name="deviceCode">Device code taken from the topic</param>
/// <param name="ip">Remote address of the client</param>
private void OnPing(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    var info = msg.ToJsonEntity<PingInfo>();
    _deviceService.Ping(dv, info, null, "Mqtt", ip);

    // Echo the client's time (0 when the payload could not be parsed) alongside the server's UTC time.
    var reply = new NewLife.IoT.Models.PingResponse();
    reply.Time = info?.Time ?? 0;
    reply.ServerTime = DateTime.UtcNow.ToLong();

    var replyTopic = $"sys/{productKey}/{deviceCode}/thing/event/ping/post_reply";
    _ = PublishAsync(replyTopic, reply);
}

/// <summary>Stores an event report and acknowledges it on the reply topic for that event type.</summary>
/// <param name="msg">JSON payload, parsed as <c>EventModel</c></param>
/// <param name="dv">Device that raised the event</param>
/// <param name="productKey">Product key taken from the topic</param>
/// <param name="deviceCode">Device code taken from the topic</param>
/// <param name="ip">Remote address of the client</param>
/// <param name="eventType">Event type used in the reply topic and the response body</param>
private void OnPostEvent(String msg, Device dv, String productKey, String deviceCode, String ip, String eventType)
{
    var reported = msg.ToJsonEntity<EventModel>();
    _thingService.PostEvent(dv, reported, ip);

    // Build the acknowledgement; time and name fall back to 0 / null when parsing produced nothing.
    var ack = new EventResponse();
    ack.Time = reported?.Time ?? 0;
    ack.ServerTime = DateTime.UtcNow.ToLong();
    ack.EventType = eventType;
    ack.Status = "ok";
    ack.Name = reported?.Name;

    var replyTopic = $"sys/{productKey}/{deviceCode}/thing/event/{eventType}/post_reply";
    _ = PublishAsync(replyTopic, ack);
}

/// <summary>Answers a firmware query with the upgrade information selected by the device service.</summary>
/// <param name="msg">Request payload (unused here)</param>
/// <param name="dv">Device asking for an upgrade</param>
/// <param name="productKey">Product key taken from the topic</param>
/// <param name="deviceCode">Device code taken from the topic</param>
/// <param name="ip">Remote address of the client</param>
private void OnUpgrade(String msg, Device dv, String productKey, String deviceCode, String ip)
{
    // The device service applies its filter rules and picks the version to offer.
    var version = _deviceService.Upgrade(dv, ip);
    if (version != null)
    {
        // TODO: the url still needs to be turned into a full http address.
        var info = new UpgradeInfo();
        info.Version = version.Version;
        info.Source = version.Source;
        info.Executor = version.Executor;
        info.Force = version.Force;
        info.FileSize = version.Size;
        info.FileHash = version.FileHash;
        info.Description = version.Remark;

        var replyTopic = $"sys/{productKey}/{deviceCode}/thing/ota/firmware/get_reply";
        _ = PublishAsync(replyTopic, info);
    }
}

/// <summary>
/// Background loop that takes queued service commands for a device and publishes them to the
/// client until cancellation is requested or the session is disposed.
/// </summary>
/// <param name="device">Device whose queue is consumed</param>
/// <param name="ip">Remote address written to the device history</param>
/// <param name="source">Cancellation source controlling the loop; always cancelled when the loop exits</param>
/// <returns>A task that completes when the loop has ended</returns>
private async Task ConsumeMessage(Device device, String ip, CancellationTokenSource source)
{
    // Start without an ambient span so each consumed message gets its own trace.
    DefaultSpan.Current = null;
    var cancellationToken = source.Token;
    try
    {
        // Resolved inside the try so a failure here is logged and still reaches the finally block.
        var queue = _queue.GetQueue(device.Code);

        while (!cancellationToken.IsCancellationRequested && !Session.Disposed)
        {
            var mqMsg = await queue.TakeOneAsync(30);
            if (mqMsg == null)
            {
                // Nothing arrived: pause briefly before polling again.
                await Task.Delay(100, cancellationToken);
                continue;
            }

            // Trace point; the using declaration disposes the span at the end of this iteration,
            // including when the code below throws.
            using var span = Tracer?.NewSpan("mqtt:ServiceQueue", mqMsg);
            if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("消费到下发指令消息:{0}", mqMsg);

            // Decode
            var dic = JsonParser.Decode(mqMsg);
            span?.Detach(dic);
            var msg = JsonHelper.Convert<ServiceModel>(dic);

            // Reject messages that failed to parse, have no id, or carry an expiry that has passed.
            if (msg == null || msg.Id == 0 || (msg.Expire.Year > 2000 && msg.Expire < DateTime.Now))
            {
                _deviceService.WriteHistory(device, "Mqtt发送", false, "消息无效。" + mqMsg, ip);
                continue;
            }

            _deviceService.WriteHistory(device, "Mqtt发送", true, mqMsg, ip);

            // Pass the trace information to the client to build a complete call chain.
            msg.TraceId = span + "";

            var log = DeviceServiceLog.FindById(msg.Id);
            if (log != null)
            {
                if (log.TraceId.IsNullOrEmpty()) log.TraceId = span?.TraceId;
                log.Status = ServiceStatus.处理中;
                log.Update();
            }

            var topic = $"sys/{device.Product.Code}/{device.Code}/thing/service/post";
            var data = msg.ToDictionary();
            data["id"] = msg.Id.ToString();

            await PublishAsync(topic, data);
        }
    }
    // Cancellation is the normal way for this loop to stop; TaskCanceledException derives from this type.
    catch (OperationCanceledException) { }
    catch (Exception ex)
    {
        XTrace.WriteException(ex);
    }
    finally
    {
        source.Cancel();
    }
}
#endregion

#region 辅助
///// <summary>写日志</summary>
///// <param name="format"></param>
///// <param name="args"></param>
//private void WriteLog(String format, params Object[] args) => Log?.Info($"[MqttServer]{format}", args);
#endregion

}

omgcwz commented 1 week ago

上面的案例是在实现MqttHandler接口的情况下去发送数据,我希望做到的是在业务中使用PublishAsync方法去主动发送数据,而不是在连接或断开事件后被动触发

我想实现的是一个线程,定时发布一条数据