dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

How to Use With WebSocket Server #65

Closed qcjxberin closed 6 years ago

qcjxberin commented 6 years ago

How can I use MQTTnet with a WebSocket server? Could you write a demo for this?

chkr1011 commented 6 years ago

Hi, this is not fully implemented yet. I started a new library called HTTPnet which will be used for this in the future. Implementing this is quite complex because the HTTP library in use can vary; there are several available for .NET. If you already use one, you can have a look at https://github.com/chkr1011/MQTTnet/blob/master/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs. There you can see how to implement the required interfaces. Then you can write a binding for your HTTP library. Otherwise you have to wait until the HTTPnet library is included and has a binding.

Best regards Christian

ChristianRiedl commented 6 years ago

I tried to integrate MQTTnet into an ASP.NET Core 2.0 web server. It was possible with a few small additional classes, and I changed the WebSocketStream class to wrap WebSocket instead of ClientWebSocket. I still get some exceptions when a client disconnects, but it works.

@chkr1011: I think ASP.NET Core 2.0 targets netstandard 2.0, so it should run on the latest Windows 10 IoT Core. I know it is heavy compared to HTTPnet. Maybe you have some ideas on how to improve my solution.

Here is the code (Startup class)

public class Startup
{
        public void ConfigureServices(IServiceCollection services)
        {
        }

        public async void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            loggerFactory.AddConsole(LogLevel.Debug);

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            var adapter = new MqttWebSocketServerAdapter();
            var options = new MqttServerOptions
            {
                ConnectionValidator = p =>
                {
                    if (p.ClientId == "SpecialClient")
                    {
                        if (p.Username != "USER" || p.Password != "PASS")
                        {
                            return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                        }
                    }
                    return MqttConnectReturnCode.ConnectionAccepted;
                }
            };
            var mqttServer = new MqttServer(options, new List<IMqttServerAdapter> { adapter });
            mqttServer.ClientDisconnected += (s, e) =>
            {
                Debug.WriteLine($">> Disconnected {e.Client.ClientId}");
            };

            await mqttServer.StartAsync();
            MqttNetTrace.TraceMessagePublished += (s, e) =>
            {
                Debug.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
                if (e.Exception != null)
                {
                    Debug.WriteLine(e.Exception.Message);
                }
            };
            app.UseWebSockets();
            app.Use(async (context, next) =>
            {
                if (context.WebSockets.IsWebSocketRequest)
                {
                    using (var webSocket = await context.WebSockets.AcceptWebSocketAsync())
                    {
                        adapter.UseWebSocket(webSocket);
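                        // Keep the request open until the socket is closed or aborted
                        // (for example when the client drops the connection without a close handshake).
                        while (webSocket.State != WebSocketState.Closed && webSocket.State != WebSocketState.Aborted)
                        {
                            await Task.Delay(200);
                        }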
                    }
                }
                else
                {
                    await next();
                }
            });
            app.UseStaticFiles();
        }

        private void LogFrame(ILogger logger, WebSocketReceiveResult frame, byte[] buffer)
        {
            var close = frame.CloseStatus != null;
            string message;
            if (close)
            {
                message = $"Close: {frame.CloseStatus.Value} {frame.CloseStatusDescription}";
            }
            else
            {
                string content = "<<binary>>";
                if (frame.MessageType == WebSocketMessageType.Text)
                {
                    content = Encoding.UTF8.GetString(buffer, 0, frame.Count);
                }
                message = $"{frame.MessageType}: Len={frame.Count}, Fin={frame.EndOfMessage}: {content}";
            }
            logger.LogDebug("Received Frame " + message);
        }
}

And here are the two helper classes:

public class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable
{
        bool                    _isRunning;
        public event Action<IMqttCommunicationAdapter> ClientAccepted;
        public Task StartAsync(MqttServerOptions options)
        {
            if (_isRunning) throw new InvalidOperationException("Server is already started.");
            _isRunning = true;
            return Task.CompletedTask;
        }
        public void UseWebSocket (WebSocket webSocket)
        {
            var wsChannel = new MqttWebSocketServerChannel(webSocket);
            var clientAdapter = new MqttChannelCommunicationAdapter(wsChannel, new MqttPacketSerializer());
            ClientAccepted?.Invoke(clientAdapter);
        }
        public Task StopAsync()
        {
            _isRunning = false;
            return Task.CompletedTask;
        }
        public void Dispose()
        {
            StopAsync();
        }
}

public class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable
{
        WebSocket _webSocket;
        public MqttWebSocketServerChannel (WebSocket webSocket)
        {
            _webSocket = webSocket;
            RawReceiveStream = new WebSocketStream(_webSocket);
        }
        public Stream SendStream => RawReceiveStream;
        public Stream ReceiveStream => RawReceiveStream;
        public Stream RawReceiveStream { get; set; }

        public Task ConnectAsync()
        {
            throw new NotImplementedException();
        }

        public async Task DisconnectAsync()
        {
            RawReceiveStream = null;
            if (_webSocket == null) return;
            await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
        }
        public void Dispose()
        {
            _webSocket?.Dispose();
        }
}

chkr1011 commented 6 years ago

@ChristianRiedl I added your code to the develop branch and also fixed some issues. For me it seems to work now. Please test it and let me know if it works or if it still has issues.

I added a new ASP.NET Core 2.0 project with the demo code.

Best regards Christian

chkr1011 commented 6 years ago

@qcjxberin Support for WebSockets is coming via ASP.NET Core 2.0. Please have a look at the develop branch.

ChristianRiedl commented 6 years ago

Christian, thank you for your improvements and demo code, it works fine.

I am working on similar .NET projects just for fun and learning by doing: smart home and media integration around ASP.NET Core servers and Xamarin-based clients.

Smart home with integration of HomeMatic, Osram Spotify, Fritzbox, Tinkerforge, ...

Media with remotes for Samsung TV, Undok, Musiccast, Roon DLNA server based on AspNet Core

If you have a need for such things tell me.

chkr1011 commented 6 years ago

@ChristianRiedl I need all 😄

Sounds very interesting to me. I am also planning a Xamarin app for my smart home system. The system is documented here:

https://www.hackster.io/cyborg-titanium-14/home-automation-system-using-raspberry-pi-784235

and here:

https://github.com/chkr1011/Wirehome

Let me know if you are interested in the project; I would be glad to add you to the Wirehome community.

Best regards Christian

ChristianRiedl commented 6 years ago

Yes, you can add me to the Wirehome community. Currently I am still busy with my company's software product (SCADA, running in 12 nuclear power plants), but I plan to cut back on this boring work, which I have been doing for 40 years.
I enjoy creating private projects where I do not have to spend 50% of my time on test code.

I took a look at the Wirehome project. Alexa integration is also part of my toys. But in general I have had bad experiences with the stability of the Raspberry Pi as a server; I prefer a Windows server and also have good experience running ASP.NET Core on a QNAP NAS.

For the SmartHome skill it was quite useful that all my servers support OpenID authentication.

I tried to use OAuth Bearer authentication to establish the WebSocket connection, but I have no idea how to make it work. I have only seen samples with Basic authentication. ClientWebSocket.Options supports a cookie container, but I was not able to deliver an Authorization header to the web server.

JanEggers commented 6 years ago

@chkr1011 i had a look at your latest changes and i think it would be great to create a real package based on mqtt.netstandard, not just a test app, e.g. mqttnet.aspnetcore. that package could contain all the code needed to adapt mqttnet to aspnet core.

that library could:

- create a wrapper for your tracing to forward it to ms.extensions.logging
- create a wrapper of the options for ms.extensions.options and ms.extensions.configuration
- create a wrapper for the MqttServer to be a hosted service (automatic start and stop with the app lifecycle)
- finally do that glue code so a package consumer can just call app.UseMqttServer(options => { options.configureStuff() });

that is what i do in my project and i guess it would be great for anyone else that wants to add mqtt to an existing aspnetcore app.
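
A minimal sketch of the hosted-service idea from the list above. Everything here is an assumption for illustration: `IHostedServiceSketch` is a local stand-in with the same two methods as `Microsoft.Extensions.Hosting.IHostedService`, and `IStartStopServerSketch` is a hypothetical view of the server, not an MQTTnet interface. It only shows how start and stop could be tied to the application lifecycle.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Stand-in with the same two methods as Microsoft.Extensions.Hosting.IHostedService,
// declared locally so the sketch compiles without ASP.NET Core packages.
public interface IHostedServiceSketch
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}

// Hypothetical minimal view of an MQTT server; not MQTTnet's actual interface.
public interface IStartStopServerSketch
{
    Task StartAsync();
    Task StopAsync();
}

// Forwards the host's start and stop calls to the wrapped server.
public sealed class MqttHostedServiceSketch : IHostedServiceSketch
{
    private readonly IStartStopServerSketch _server;

    public MqttHostedServiceSketch(IStartStopServerSketch server)
    {
        _server = server;
    }

    public Task StartAsync(CancellationToken cancellationToken) => _server.StartAsync();

    public Task StopAsync(CancellationToken cancellationToken) => _server.StopAsync();
}

// Demo server that only records whether it is running.
public sealed class RecordingServerSketch : IStartStopServerSketch
{
    public bool IsRunning { get; private set; }

    public Task StartAsync() { IsRunning = true; return Task.CompletedTask; }

    public Task StopAsync() { IsRunning = false; return Task.CompletedTask; }
}
```

In a real ASP.NET Core 2.0 app the wrapper would presumably implement the framework's `IHostedService` directly and be registered in `ConfigureServices`, so the host starts and stops the broker with the application.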

chkr1011 commented 6 years ago

@JanEggers I agree that is a good idea. But with package you mean a separate nuget package? Since we are not moving to netstandard 2.0 at the moment it must be a separate one.

I already had a look regarding the logging stuff but this requires also netstandard 2.0. So probably a new nuget package would be the best solution or what do you think?

Do you maybe have some additions for the managed client?

ChristianRiedl commented 6 years ago

I propose to introduce ms.extensions.logging.abstraction 1.1.2 in the 1.x standard libraries. It targets netstandard 1.1 and is upward compatible with ms.extension.logging 2.0. In the libraries you need only the ILogger interface and an extension class.

MqttWebSocketServerAdapter and MqttWebSocketServerChannel can be added to the framework's netstandard libs, as they are netstandard 1.3.

App lifecycle integration, ms.extensions.options and config could be implemented in the ASP.NET Core 2.0 demo in the meantime.

JanEggers commented 6 years ago

> But with package you mean a separate nuget package?

yup.

i agree with @ChristianRiedl we can use 1.x abstractions for logging. only the IHostedService needs 2.0.

So i can provide a PR with logging and di for the existing package and a separate package/project for HostedService and AppHostBuilder.

is that ok?

ManagedClient looks good, less stuff to handle manually.

chkr1011 commented 6 years ago

@JanEggers OK

ChristianRiedl commented 6 years ago

I would like to ask for one more feature. As I am using Bearer authentication, I need a way to set header values during the WebSocket connect. I know that not many people will need this feature, but it will not hurt anybody.

    public class MqttClientWebSocketOptions : BaseMqttClientOptions
    {
        public string Uri { get; set; }
        public IEnumerable<KeyValuePair<string, string>> HeaderValues { get; set; }
    }
...

        public async Task ConnectAsync()
        {
            var uri = _options.Uri;
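            // Prepend ws:// only when no WebSocket scheme is present, so wss:// URIs stay intact.
            if (!uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) &&
                !uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
            {
                uri = "ws://" + uri;
            }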

            _webSocket = new ClientWebSocket();
            _webSocket.Options.KeepAliveInterval = _options.KeepAlivePeriod;
            if (_options.HeaderValues != null)
            {
                foreach (var headerValues in _options.HeaderValues)
                {
                    _webSocket.Options.SetRequestHeader(headerValues.Key, headerValues.Value);
                }
            }
            await _webSocket.ConnectAsync(new Uri(uri), CancellationToken.None);
            RawReceiveStream = new WebSocketStream(_webSocket);
        }

chkr1011 commented 6 years ago

Looks OK to me. I will add this feature to the supported frameworks. Even if not everyone needs it, it makes sense to me. But I will use a dictionary etc.
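
As a rough illustration of the dictionary variant mentioned here, the following sketch keeps request headers in a dictionary and applies them with `ClientWebSocketOptions.SetRequestHeader`. The type and member names (`WebSocketClientOptionsSketch`, `RequestHeaders`, `BuildUri`, `Create`) are hypothetical and are not MQTTnet's actual API; the sketch stops before connecting, so it needs no server.

```csharp
using System;
using System.Collections.Generic;
using System.Net.WebSockets;

// Hypothetical options type for illustration; not MQTTnet's actual class.
public class WebSocketClientOptionsSketch
{
    public string Uri { get; set; }

    // A dictionary keeps exactly one value per header name.
    public IDictionary<string, string> RequestHeaders { get; set; } =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
}

public static class WebSocketClientSketch
{
    // Prepends ws:// only when the address has no WebSocket scheme yet.
    public static Uri BuildUri(string uri)
    {
        var hasScheme = uri.StartsWith("ws://", StringComparison.OrdinalIgnoreCase)
                     || uri.StartsWith("wss://", StringComparison.OrdinalIgnoreCase);
        return new Uri(hasScheme ? uri : "ws://" + uri);
    }

    // Creates a ClientWebSocket with all configured headers applied.
    // The caller is expected to call ConnectAsync(BuildUri(options.Uri), ...) afterwards.
    public static ClientWebSocket Create(WebSocketClientOptionsSketch options)
    {
        var webSocket = new ClientWebSocket();
        foreach (var header in options.RequestHeaders)
        {
            webSocket.Options.SetRequestHeader(header.Key, header.Value);
        }
        return webSocket;
    }
}
```

A Bearer token could then be supplied with `options.RequestHeaders["Authorization"] = "Bearer " + token;` before calling `Create`, where `token` is whatever the identity provider issued.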

ChristianRiedl commented 6 years ago

Thanks a lot

chkr1011 commented 6 years ago

A new project is added to the repository. Please have a look at this. If you still need some information or assistance please let me know.

marcelopastorino commented 5 years ago

@chkr1011, we can't find information regarding auth with a bearer token. Instead of using Basic auth, we would like to pass a JWT to authorize the connection. Is this possible using this library?

JanEggers commented 5 years ago

this project supports neither basic auth nor bearer tokens. the mqtt protocol specifies a connect packet that you have to validate in the broker. how you do that is up to you
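
One possible way to do that validation, sketched under loud assumptions: the client sends an HS256-signed JWT in the password field of the CONNECT packet, and the broker shares the signing key. The helper below is hypothetical, uses only the .NET base class library, and checks the signature only; it does not inspect the `alg` header or claims such as `exp`, which a real deployment would also need (typically through an established JWT library).

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration; not part of MQTTnet.
public static class ConnectTokenSketch
{
    // Base64url encoding without padding, as used for JWT segments.
    private static string ToBase64Url(byte[] data) =>
        Convert.ToBase64String(data).TrimEnd('=').Replace('+', '-').Replace('/', '_');

    // Computes the HMAC-SHA256 signature segment for "header.payload".
    private static string Sign(string signingInput, byte[] key)
    {
        using (var hmac = new HMACSHA256(key))
        {
            return ToBase64Url(hmac.ComputeHash(Encoding.ASCII.GetBytes(signingInput)));
        }
    }

    // Builds a signed token from raw JSON strings (handy for trying the sketch out).
    public static string Issue(string headerJson, string payloadJson, byte[] key)
    {
        var signingInput = ToBase64Url(Encoding.UTF8.GetBytes(headerJson)) + "." +
                           ToBase64Url(Encoding.UTF8.GetBytes(payloadJson));
        return signingInput + "." + Sign(signingInput, key);
    }

    // Returns true when the token has three segments and the signature matches the key.
    public static bool HasValidSignature(string token, byte[] key)
    {
        if (string.IsNullOrEmpty(token)) return false;
        var parts = token.Split('.');
        if (parts.Length != 3) return false;
        var expected = Sign(parts[0] + "." + parts[1], key);
        return FixedTimeEquals(expected, parts[2]);
    }

    // Compares two strings without exiting early on the first differing character.
    private static bool FixedTimeEquals(string a, string b)
    {
        if (a.Length != b.Length) return false;
        var diff = 0;
        for (var i = 0; i < a.Length; i++) diff |= a[i] ^ b[i];
        return diff == 0;
    }
}
```

Inside a `ConnectionValidator` like the one in the Startup class earlier in this thread, `p.Password` could be passed to `HasValidSignature`, returning `ConnectionRefusedBadUsernameOrPassword` when it fails.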

marcelopastorino commented 5 years ago

@JanEggers thanks! We are already solving it that way.