Azure / azure-iot-sdk-csharp

A C# SDK for connecting devices to Microsoft Azure IoT services
Other
464 stars 493 forks source link

[Bug Report] ModuleClient messages aren't routed when a DeviceClient is open at the same time #3343

Closed matthieucx closed 1 year ago

matthieucx commented 1 year ago

Context

Description of the issue

When routing messages, if the ModuleClient sending the message is alive while a DeviceClient is too, the message is set to the default endpoints (i.e it appears when using `az iot hub monitor-events --hub-name or the Azure IoT Explorer).

I do not know if this is something that is by design or not but I could not find any information or warning against using both a DeviceClient and a ModuleClient in the same application. I'm guessing that they're maybe sharing resources ? One strange thing is that in the IoTHub Explorer the given output is still present.

Code sample exhibiting the issue

Here is my worker class for sending a message:

using System.Text;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;

namespace dfp;

public class dfp : BackgroundService
{
    private readonly ILogger<dfp> _logger;
    private const string Output = "output1";

    public dfp(ILogger<dfp> logger)
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

        var moduleClient = ModuleClient.CreateFromConnectionString("");
        var deviceClient = DeviceClient.CreateFromConnectionString("", TransportType.Amqp);

        Console.WriteLine("Created module clients");

        for (int i = 0; i < 500; i++)
        {
            Thread.Sleep(5000);
            await TriggerUploader(moduleClient, deviceClient);
        }

    }

    private async Task TriggerUploader(ModuleClient moduleClient, DeviceClient deviceClient)
    {
        Log.Debug($"{DateTime.Now}> Triggering Uploader");
        var messageBody = new
        {
            filepath = "test_payload.txt",
        debug_field = "foobar"
        };
        var messageString = JsonConvert.SerializeObject(messageBody);
        var messageBytes = Encoding.UTF8.GetBytes(messageString);
        var message = new Message(messageBytes);

        message.ContentEncoding = "utf-8";
        message.ContentType = "application/json";

        await moduleClient.SendEventAsync(Output, message);
        Log.Debug($"Sending message: {messageString} to {Output}");
        Log.Information("Message sent");
    }
}

A "Setup" class:

public class Setup
{
    public static void GetSetup(string[] args, out IHostBuilder hostBuilder,
        bool forTesting = false)
    {
        BuildHost(out hostBuilder);
    }

    private static void BuildHost(out IHostBuilder hostBuilder)
    {
        hostBuilder = Host.CreateDefaultBuilder()
            .UseSerilog((context, logging) => logging
                .MinimumLevel.Verbose()
                .ReadFrom.Configuration(context.Configuration)
                .WriteTo.Console())
            .ConfigureServices((context, services) =>
            {
                var config = context.Configuration;
                services.AddHostedService<dfp>();
            })
    }
}

And my Program.cs:

using Serilog;

namespace dfp;

public class Program
{
    public static async Task<int> Main(string[] args)
    {
        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateBootstrapLogger();

        Log.Information("Starting up!");

        try
        {
            Setup.GetSetup(args, out var hostBuilder);
            var host = hostBuilder.Build();
            Log.Information("Host has been built");
            await host.RunAsync();

            Log.Information("Stopped cleanly");
            return 0;
        }
        catch (Exception ex)
        {
            Log.Fatal(ex, "An unhandled exception occured during bootstrapping");
            return 1;
        }
        finally
        {
            Log.CloseAndFlush();
        }
    }
}

Here is my class to pick up messages, which uses the same setup and program classes

using System.Diagnostics;
using System.Text;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Newtonsoft.Json;
using Serilog;
using uploader.Component;
using uploader.Utilities;

namespace uploader;

public class BlobUploader : BackgroundService
{
    private readonly ILogger<BlobUploader> _logger;

    public BlobUploader(
        ILogger<BlobUploader> logger
    )
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.

        var moduleClient = await ModuleClient.CreateFromEnvironmentAsync();

        Console.WriteLine("Created device and module clients");
        Console.WriteLine($"Module client: {moduleClient}");
        Console.WriteLine(moduleClient.ToString());
        var callbackContext = new ModuleClientCallbackContext { CancellationToken = stoppingToken, ModuleClient = moduleClient };
        await moduleClient.SetInputMessageHandlerAsync("input1", Test, callbackContext, stoppingToken);
        Log.Information("Subscribed to receive messages from other modules");
    }

    private async Task<MessageResponse> Test(Message receivedMessage, object context)
    {
        var messageBytes = receivedMessage.GetBytes();
        var messageString = Encoding.UTF8.GetString(messageBytes);

        Log.Information($"Received message: {messageString}");

        return MessageResponse.Completed;
    }

    private async Task<MessageResponse>OnMessageReceivedAsync(Message receivedMessage, object context)
    {
        try
        {
            var callbackContext = (ModuleClientCallbackContext)context;
            var moduleClient = callbackContext.ModuleClient;
            var ct = callbackContext.CancellationToken;

            var messageBytes = receivedMessage.GetBytes();
            var messageString = Encoding.UTF8.GetString(messageBytes);

            Log.Information($"Received message: {messageString}");

            var messageBody = JsonConvert.DeserializeObject<MessageBody>(messageString);
            if (messageBody != null)
            {
                // TODO: Error handling ?
                //await UploadFile(messageBody.FilePath, ct);
            }

            await moduleClient.CompleteAsync(receivedMessage, ct);

            return MessageResponse.Completed;
        }
        catch (Exception ex)
        {
            Log.Error($"Error when receiving message: {ex}");
            return MessageResponse.Abandoned;
        }
    }

And the route that I'm using:

"dfpToUploader": "FROM /messages/* INTO BrokeredEndpoint(\"/modules/uploader/inputs/input1\")"

Console log of the issue

There isn't any error or warning, only messages not being routed (and thus shown in the Azure IoT Explorer).

image

Thanks in advance for any help, I'll be more than happy to help track down this issue

tmahmood-microsoft commented 1 year ago

Hi @chefmtt, unfortunately I am not able to reproduce this issue on my end. I am using the latest version of the SDK and have both device and module client connected/open and have the same messaging route that you are using. For me the messages are received on the module client.

Could please provide us with a log file to help us look into it with more details. Also, does it work for you when the device client is not connected? Do you see similar behavior when using MQTT transport?

tmahmood-microsoft commented 1 year ago

Hi @matthieucx, I am closing this issue due to inactivity, feel free to re-open this if you need any more assistance.