MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.49k
stars
1.07k
forks
source link
Help polishing server-side code on ASP.NET Core 2.1 after upgrading to the new MQTTnet version. #438
After upgrading the MQTTnet library and moving to ASP.NET Core 2.1, I would like to clean up my MQTTnet configuration code and remove anything that is no longer needed to open the socket and WebSocket endpoints.
Everything works now!
Thanks.
My code is:
/// <summary>
/// Registers the application's services: the hosted MQTT server (with the options built
/// below), the MQTT connection handler and WebSocket adapter, RethinkDB options and store,
/// MVC, CORS, SignalR and Swagger.
/// </summary>
/// <param name="services">The service collection the registrations are added to.</param>
public void ConfigureServices(IServiceCollection services)
{
    // For example only! Don't store certificate paths or passwords as strings in code.
    // Use environment variables or the .NET Secret Manager instead.
    // The certificate is only needed long enough to export its bytes, so dispose it right away.
    byte[] certificateBytes;
    using (var certificate = new X509Certificate(@"C:\cert\server333.pfx", "passsss", X509KeyStorageFlags.Exportable))
    {
        certificateBytes = certificate.Export(X509ContentType.Pfx);
    }

    var mqttServerOptions = new MqttServerOptionsBuilder()
        .WithConnectionBacklog(100)
        .WithEncryptedEndpoint()
        .WithDefaultEndpointPort(1883)
        .WithEncryptionCertificate(certificateBytes)
        .WithEncryptedEndpointPort(8883)
        .WithStorage(new RetainedMessageHandler())
        .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(2))
        .WithSubscriptionInterceptor(context =>
        {
            // Topic names are compared ordinally so the result does not depend on the server culture.
            var requestedTopic = context.TopicFilter.Topic;

            if (requestedTopic.StartsWith("admin/foo/bar", StringComparison.Ordinal) && context.ClientId != "theAdmin")
            {
                context.AcceptSubscription = false;
            }

            if (requestedTopic.StartsWith("the/secret/stuff", StringComparison.Ordinal) && context.ClientId != "Imperator")
            {
                context.AcceptSubscription = false;
                context.CloseConnection = true;
            }
        })
        // NOTE(review): a second application-message interceptor is registered further down this
        // chain. If the builder keeps only the last one registered (TODO confirm against the
        // MQTTnet version in use), this interceptor never runs; merge the two or delete one.
        .WithApplicationMessageInterceptor(context =>
        {
            if (MqttTopicFilterComparer.IsMatch(context.ApplicationMessage.Topic, topic + "/#"))
            {
                // Replace the payload with the timestamp. But also extending a JSON
                // based payload with the timestamp is a suitable use case.
                context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
            }
        })
        .WithConnectionValidator(c =>
        {
            if (c.ClientId == "SpecialClient" && (c.Username != "USER" || c.Password != "PASS"))
            {
                // Return immediately: previously this result was always overwritten by the
                // token check below, so the bad-credentials rejection could never take effect.
                c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                return;
            }

            // tokenBosoneHIGS is read when a client connects, not when the options are built.
            if (c.Username != tokenBosoneHIGS)
            {
                c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
            }
            else
            {
                c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
            }
        })
        .WithApplicationMessageInterceptor(c =>
        {
            if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0)
            {
                return;
            }

            try
            {
                // Currently disabled: fills in a null "timestamp" property of a JSON payload.
                //var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload));
                //var timestampProperty = content.Property("timestamp");
                //if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null)
                //{
                // timestampProperty.Value = DateTime.Now.ToString("O");
                // c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString());
                //}
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        })
        .Build();

    services.AddHostedMqttServer(mqttServerOptions);
    //this adds tcp server support based on System.Net.Socket
    //services.AddMqttTcpServerAdapter();
    services.AddMqttConnectionHandler();
    //this adds websocket support
    services.AddMqttWebSocketServerAdapter();
    services.AddConnections();

    services.AddOptions();
    services.AddMemoryCache();

    // Pick the RethinkDB configuration section that matches the hosting environment.
    if (_env.IsDevelopment())
    {
        services.Configure<RethinkDbOptions>(Configuration.GetSection("RethinkDbDev"));
    }
    else
    {
        services.Configure<RethinkDbOptions>(Configuration.GetSection("RethinkDbStaging"));
    }

    services.AddSingleton<IRethinkDbConnectionFactory, RethinkDbConnectionFactory>();
    // add RethinkDB store service
    services.AddSingleton<IRethinkDbStore, RethinkDbStore>();

    services.AddMvc();

    services.AddCors(options =>
    {
        // NOTE(review): "http://*:*" / "https://*:*" may not be treated as wildcards by the
        // CORS middleware; verify that the dashboard origin is actually allowed.
        options.AddPolicy("AllowDashboardOrigin",
            builder => builder.WithOrigins("http://*:*", "https://*:*").AllowAnyMethod().AllowAnyHeader());
    });

    services.AddSignalR();

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new Info
        {
            Version = "v1",
            Title = "API",
            Description = "API Description",
            TermsOfService = "None",
            Contact = new Contact() { Name = "X", Email = "info@xyzc.bgh", Url = "www.redconsulting.it" }
        });
        c.ResolveConflictingActions(apiDescriptions => apiDescriptions.First());
        c.DocInclusionPredicate((docName, description) => true);
        c.DescribeAllEnumsAsStrings();

        // Set the comments path for the Swagger JSON and UI.
        var xmlFile = $"{Assembly.GetEntryAssembly().GetName().Name}.xml";
        var xmlPath = Path.Combine(AppContext.BaseDirectory, xmlFile);
        c.IncludeXmlComments(xmlPath);
    });
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
/// <summary>
/// Wires up logging, the RethinkDB store, the MQTT endpoints and server event handlers,
/// static files, CORS, authentication, MVC, SignalR and Swagger.
/// </summary>
/// <param name="app">The application builder the middleware is added to.</param>
/// <param name="env">Hosting environment; enables the developer exception page in development.</param>
/// <param name="loggerFactory">Logger factory that receives the console, RethinkDB and debug providers.</param>
/// <param name="store">RethinkDB store used to persist device data and device status.</param>
/// <param name="hubContext">SignalR hub context; not used inside this method.</param>
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IRethinkDbStore store, IHubContext<XomeHub> hubContext)
{
    // Token the MQTT connection validator compares client usernames against.
    tokenBosoneHIGS = Configuration.GetSection("ApplicationSettings").GetValue<string>("bosoneHighs");

    loggerFactory.AddConsole(Configuration.GetSection("Logging"));
    loggerFactory.EnableRethinkDbLogging();
    loggerFactory.AddDebug();
    // NOTE(review): AddConsole is called a second time here; this presumably registers a
    // second console provider and duplicates console output — confirm and drop one call.
    loggerFactory.AddConsole();

    store.InitializeDatabase();

    app.UseConnections(c => c.MapConnectionHandler<MqttConnectionHandler>("/mqtt", options => {
        options.WebSockets.SubProtocolSelector = MQTTnet.AspNetCore.ApplicationBuilderExtensions.SelectSubProtocol;
    }));
    app.UseMqttEndpoint();

    app.UseMqttServer(server =>
    {
        MqttNetConsoleLogger.ForwardToConsole();

        server.ApplicationMessageReceived += (s, e) =>
        {
            // A message may carry no payload at all. Decode once, treating null as an empty
            // string, so the logging and persistence below cannot throw ArgumentNullException.
            var rawPayload = e.ApplicationMessage.Payload;
            var payloadText = rawPayload == null ? string.Empty : Encoding.UTF8.GetString(rawPayload);

            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
            Console.WriteLine($"+ Payload = {payloadText}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();

            MqttNetConsoleLogger.PrintToConsole(
                $"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{payloadText}'",
                ConsoleColor.Magenta);

            DeviceDataModel data = new DeviceDataModel();
            data.deviceId = e.ClientId;
            data.payload = payloadText;
            data.data = DateTime.UtcNow;
            data.topic = e.ApplicationMessage.Topic;

            // Messages on the system topic (the server's own heartbeat) are not persisted.
            if (data.topic != topicSystem)
            {
                store.InsertDeviceData(data);
            }
        };

        server.ClientDisconnected += (s, e) =>
        {
            // Record the client with status 0 and append to the status history.
            DeviceModel device = new DeviceModel();
            device.customerid = "";
            device.deviceId = e.ClientId;
            device.devicenome = e.ClientId;
            device.data = DateTime.UtcNow;
            device.status = 0;
            store.InsertOrUpdateDevice(device);
            store.InserDeviceStatusHistory(device);
            Console.Write("Client " + e.ClientId + " disconnected event fired.");
        };

        server.ClientConnected += (s, e) =>
        {
            // Record the client with status 1 and append to the status history.
            DeviceModel device = new DeviceModel();
            device.customerid = "";
            device.deviceId = e.ClientId;
            device.devicenome = e.ClientId;
            device.status = 1;
            device.data = DateTime.UtcNow;
            store.InsertOrUpdateDevice(device);
            store.InserDeviceStatusHistory(device);
            Console.Write("Client " + e.ClientId + " connected event fired.");
        };

        server.Started += async (sender, args) =>
        {
            var msg = new MqttApplicationMessageBuilder()
                .WithPayload("Mqtt is awesome from Biagio")
                .WithTopic(topicSystem)
                .WithAtLeastOnceQoS();

            // Publishes to the system topic about once per second; the loop has no exit
            // condition. Publish failures are logged and the loop continues.
            while (true)
            {
                try
                {
                    await server.PublishAsync(msg.Build());
                    msg.WithPayload("Mqtt is still awesome at " + DateTime.Now);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                }
                finally
                {
                    await Task.Delay(TimeSpan.FromSeconds(1));
                }
            }
        };
    });

    //Redirect to default page
    app.UseDefaultFiles();
    app.UseStaticFiles();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseCors("AllowDashboardOrigin");
    app.UseAuthentication();
    app.UseMvc();

    // NOTE(review): this maps the type named Hub while the injected context is
    // IHubContext<XomeHub>; confirm which hub type is meant to be exposed at "/hub".
    app.UseSignalR(routes =>
    {
        routes.MapHub<Hub>("/hub");
    });

    app.UseSwagger(c =>
        c.PreSerializeFilters.Add((swagger, httpReq) => swagger.Host = httpReq.Host.Value)
    );
    app.UseSwaggerUI(c =>
    {
        c.SwaggerEndpoint("/swagger/v1/swagger.json", "Api API V1");
    });
}
}
)
After upgrading the MQTTnet library and moving to ASP.NET Core 2.1, I would like to clean up my MQTTnet configuration code and remove anything that is no longer needed to open the socket and WebSocket endpoints. Everything works now! Thanks.
My code is: