Azure / azure-iot-sdk-java

A Java SDK for connecting devices to Microsoft Azure IoT services
https://azure.github.io/azure-iot-sdk-java/

[Bug Report] MultiplexingClient does not notify state properly #1188

Closed sanjay-90210 closed 2 years ago

sanjay-90210 commented 3 years ago

Context

Description of the issue

Devices:

1. GOOD     - Enabled and working device with a proper connection string
2. DISABLED - Device is present in the IoT hub but is disabled
3. MISSING  - Device does not exist in the IoT hub; an old connection
                string is in use

Test program options:

1. Setting the add.last.later system property causes the last device in
    the command-line list of connection strings to be added 15 seconds
    later, simulating the continued onboarding of devices while the system
    is running.

2. Setting the disable.start.empty system property creates a multiplexing
    client that is not opened until after the devices have been added.
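Both options are plain JVM system properties read with `Boolean.getBoolean`, which returns true only when the property is set to "true" (case-insensitive). A minimal sketch of how the test program interprets them; the `TestOptions` class is hypothetical, and `openBeforeAdd` corresponds to the `startEmpty` variable in the program below:

```java
// Hypothetical helper mirroring how the test program reads its two flags
final class TestOptions {
    final boolean addLastLater;  // hold back the last connection string and add it 15 s later
    final boolean openBeforeAdd; // open the multiplexing client before registering devices

    private TestOptions(boolean addLastLater, boolean openBeforeAdd) {
        this.addLastLater = addLastLater;
        this.openBeforeAdd = openBeforeAdd;
    }

    static TestOptions fromSystemProperties() {
        // Boolean.getBoolean is true only if the property exists and equals "true", ignoring case
        boolean addLastLater = Boolean.getBoolean("add.last.later");
        // Default is to open first; -Ddisable.start.empty=true delays open() until after registration
        boolean openBeforeAdd = !Boolean.getBoolean("disable.start.empty");
        return new TestOptions(addLastLater, openBeforeAdd);
    }

    /** Number of connection strings registered up front. */
    int initialDeviceCount(int connectionStringCount) {
        return addLastLater ? connectionStringCount - 1 : connectionStringCount;
    }
}
```

For example, `java -Dadd.last.later=true -Ddisable.start.empty=true Main <cs1> <cs2>` registers one device, opens the client, and adds the second device 15 seconds later.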

Scenarios with a pre-opened AMQP multiplexed connection, all devices added at the same time:

1. 2 GOOD devices
    - OK    - Adding devices works fine, both devices in CONNECTED state
    - BAD   - MultiplexingClient ConnectionStatusChangeCallback is not
                called and so muxStatus is null

2. 1 GOOD, 1 MISSING
    - OK    - GOOD device works fine and continues to be connected and
                working
    - OK    - MISSING device is rejected with
                MultiplexingClientDeviceRegistrationAuthenticationException
    - BAD   - MultiplexingClient ConnectionStatusChangeCallback is not
                called and so muxStatus is null
    - BAD   - Exception reported while closing the mux client:
  [azure-iot-sdk-ReactorRunner] WARN com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection - Amqp connection was closed, creating a thread to notify transport layer
com.microsoft.azure.sdk.iot.device.exceptions.MultiplexingDeviceUnauthorizedException: One or more multiplexed devices failed to authenticate
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection.onAuthenticationFailed(AmqpsIotHubConnection.java:693)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsCbsSessionHandler.onAuthenticationFailed(AmqpsCbsSessionHandler.java:148)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSasTokenRenewalHandler.handleAuthenticationResponseMessage(AmqpsSasTokenRenewalHandler.java:90)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsCbsReceiverLinkHandler.handleCBSResponseMessage(AmqpsCbsReceiverLinkHandler.java:96)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsCbsReceiverLinkHandler.onDelivery(AmqpsCbsReceiverLinkHandler.java:60)

3. 1 GOOD device, 1 DISABLED device
    - Cascading issues lead to a lot of connects and disconnects that
        leave the mux client in the CONNECTED state with both devices
        in the DISCONNECTED state. The mux client is put in the
        DISCONNECTED_RETRYING state even before it reaches CONNECTED.
    - Overall, the reported states are far from what is expected
    - No MultiplexingClientDeviceRegistrationAuthenticationException is
        thrown at any time

Scenarios with a pre-opened AMQP multiplexed connection, with the last device on the command line added 15 seconds later:

1. 2 GOOD devices (same as 1 above)

2. first GOOD, second MISSING (same as 2 above)

3. first MISSING, second GOOD (same as 2 above)

4. first GOOD device, second DISABLED device
    - BAD   - Once the disabled device is added, things go south just like
                in scenario 3 in the above section. In this case mux client
                is in DISCONNECTED state and both the devices too are in
                DISCONNECTED state (mux was in CONNECTED state in scenario
                3 in the section above)

5. first DISABLED device, second GOOD device
    - BAD   - Mux status is null
    - BAD   - The opened mux is closed, I guess. The GOOD device's call to
                startDeviceTwin fails with the exception 'Open the client
                connection before using it'

But wait, there's more (R.I.P. Billy Mays):

1. 2 GOOD devices, delete one while running
    - OK    - GOOD device is marked as CONNECTED and deleted device is
                marked as DISCONNECTED
    - OK    - GOOD device continues to work
    - BAD   - Mux state is still null
    - No idea what will happen if a device is added at this point

2. 2 GOOD devices, DISABLE one while running
    - BAD   - Both devices are marked as DISCONNECTED
    - BAD   - Mux is also marked as DISCONNECTED

I am trying to work out an implementation strategy that covers all these scenarios so that my program can keep the valid devices running. The best option would be for the mux client state to correctly reflect its connection. If it is not connected, I can create another mux instance and move all the devices from the disconnected one to the new one.

When a device is disabled, there is no way to know which one it is among the hundreds of devices connected through a single mux. Currently, with the multiplexed transport, I check each device to see whether it is enabled and, if not, remove it from the list. With the new multiplexing client, the deleted ones are at least listed in the exception.

Being able to add a device to an active mux is a real plus compared with the old multiplexed transport.
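The strategy above (replace a mux that has dropped and move its devices to a new one) needs some bookkeeping fed by the two kinds of status callbacks. A minimal sketch under stated assumptions: `TrackedStatus` is a local stand-in for the SDK's `IotHubConnectionStatus` values seen in this thread, `MuxHealthTracker` is hypothetical, and the class only decides *which* device IDs to move; creating the replacement `MultiplexingClient` and re-registering the device clients is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for the SDK's IotHubConnectionStatus values (assumption, not the SDK type)
enum TrackedStatus { CONNECTED, DISCONNECTED_RETRYING, DISCONNECTED }

// Hypothetical tracker, fed from the mux-level and device-level status callbacks
final class MuxHealthTracker {
    private volatile TrackedStatus muxStatus; // stays null until the mux callback fires
    private final Map<String, TrackedStatus> deviceStatus = new ConcurrentHashMap<>();

    void onMuxStatus(TrackedStatus status) {
        muxStatus = status;
    }

    void onDeviceStatus(String deviceId, TrackedStatus status) {
        deviceStatus.put(deviceId, status);
    }

    void forget(String deviceId) {
        deviceStatus.remove(deviceId);
    }

    /** True once the mux reports DISCONNECTED; DISCONNECTED_RETRYING is left to the SDK's own retries. */
    boolean muxNeedsReplacement() {
        return muxStatus == TrackedStatus.DISCONNECTED;
    }

    /** Every tracked device ID (sorted) when the mux needs replacing, otherwise an empty list. */
    List<String> devicesToMigrate() {
        List<String> ids = new ArrayList<>();
        if (muxNeedsReplacement()) {
            ids.addAll(deviceStatus.keySet());
            ids.sort(null); // natural order, so log output is stable
        }
        return ids;
    }
}
```

In the program below, the mux and device status callbacks could forward `TrackedStatus.valueOf(status.name())` to `onMuxStatus` and `onDeviceStatus`; note that the mux also reports DISCONNECTED after a deliberate `close()`, so a caller would need to ignore that case.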

Code sample exhibiting the issue

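import com.microsoft.azure.sdk.iot.device.*;
import com.microsoft.azure.sdk.iot.device.DeviceTwin.*;
import com.microsoft.azure.sdk.iot.device.exceptions.*;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;

public class Main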
{
    private static Logger logger = LoggerFactory.getLogger(Main.class);
    private static Map<String, IotHubConnectionStatus> connectionStatusMap = 
        new ConcurrentHashMap<>();
    private static IotHubConnectionStatus muxStatus = null;

    public static void main(String[] args) throws Exception
    {
        IotHubConnectionString ics = new IotHubConnectionString(args[0]);
        MultiplexingClient muxClient = new MultiplexingClient(
            ics.getHostName(), IotHubClientProtocol.AMQPS);
        muxClient.registerConnectionStatusChangeCallback(
            new MuxConnectionStatusChangeCallback(), muxClient);

        boolean addLastLater = Boolean.getBoolean("add.last.later");
        boolean startEmpty = !Boolean.getBoolean("disable.start.empty");

        if(startEmpty)
            muxClient.open();

        Map<String, DeviceClient> deviceMap = new HashMap<>(8);
        int maxCount = args.length;
        if(addLastLater)
            maxCount--;

        for(int i = 0; i < maxCount; i++)
        {
            logger.info("## Adding device: " + i);
            String cs = args[i];
            DeviceClient client = new DeviceClient(cs,
                IotHubClientProtocol.AMQPS);
            client.registerConnectionStatusChangeCallback(
                new DeviceConnectionStatusChangeCallback(), client.getConfig());
            deviceMap.put(client.getConfig().getDeviceId(), client);
        }

        try
        { 
            if(!deviceMap.isEmpty())
                muxClient.registerDeviceClients(deviceMap.values());
        }
        catch(MultiplexingClientDeviceRegistrationAuthenticationException mx)
        {
            logger.error("## Failed with MCDRAE", mx);
            Map<String, Exception> exMap = mx.getRegistrationExceptions();
            for (String deviceId : exMap.keySet())
            {
                logger.error("## Device '{}' failed to register", deviceId,
                    exMap.get(deviceId));
                muxClient.unregisterDeviceClient(deviceMap.remove(deviceId));
            } 
        }
        catch(Throwable tx)
        {
            logger.error("## Failed to register clients", tx);
        }

        try
        { 
            if(!startEmpty)
                muxClient.open();
        }
        catch(MultiplexingClientDeviceRegistrationAuthenticationException mx)
        {
            logger.error("## Failed with MCDRAE", mx);
            Map<String, Exception> exMap = mx.getRegistrationExceptions();
            for (String deviceId : exMap.keySet())
            {
                logger.error("## Device '{}' failed to register", deviceId,
                    exMap.get(deviceId));
                muxClient.unregisterDeviceClient(deviceMap.remove(deviceId));
            } 
        }
        catch(Throwable tx)
        {
            logger.error("## Failed NOT with MCDRAE", tx);
        }

        for(DeviceClient dc : deviceMap.values())
        {
            try
            {
                dc.startDeviceTwin(new DeviceTwinStatusCallBack(),
                    dc.getConfig(), new OnPropertyCallBack(), dc.getConfig());
            }
            catch(Throwable tx)
            {
                logger.error("## Failed to start twin for device '{}'",
                    dc.getConfig().getDeviceId(), tx);
            }
        }

        if(addLastLater)
        {
            Thread.sleep(15000L);
            logger.info("## Adding later ------------------------");
            int i = args.length - 1;
            logger.info("## Adding device: " + i);
            String cs = args[i];
            DeviceClient client = new DeviceClient(cs,
                IotHubClientProtocol.AMQPS);
            client.registerConnectionStatusChangeCallback(
                new DeviceConnectionStatusChangeCallback(), client.getConfig());

            boolean good = false;
            try
            { 
                muxClient.registerDeviceClient(client);
                logger.info("## Device registered : " + i);
                good = true;
            }
            catch(Throwable tx)
            {
                logger.error("## Failed to register device : " + i, tx);
                muxClient.unregisterDeviceClient(client);
            }

            try
            {
                if(good)
                {
                    client.startDeviceTwin(new DeviceTwinStatusCallBack(),
                        client.getConfig(), new OnPropertyCallBack(),
                        client.getConfig());
                }
            }
            catch(Throwable tx)
            {
                logger.error("## Failed to start twin for device '{}'",
                    client.getConfig().getDeviceId(), tx);
            }
        }

        System.out.println("Press Enter to close ...");
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();

        logger.info("## Connection status: " + connectionStatusMap);
        logger.info("## Mux status: " + muxStatus);

        muxClient.close();

        System.out.println("Press Enter to exit ...");
        scanner.nextLine();
    }

    static class DeviceConnectionStatusChangeCallback implements
    IotHubConnectionStatusChangeCallback
    {
        @Override
        public void execute(IotHubConnectionStatus status,
        IotHubConnectionStatusChangeReason reason, Throwable tx, Object ctx)
        {
            DeviceClientConfig c = (DeviceClientConfig)ctx;
            String id = c.getDeviceId();
            connectionStatusMap.put(id, status);
            logger.info("## " + id + " : Status : " + status + ", Reason: " +
                reason, tx);
        }
    }

    static class MuxConnectionStatusChangeCallback implements
    IotHubConnectionStatusChangeCallback
    {
        @Override
        public void execute(IotHubConnectionStatus status,
        IotHubConnectionStatusChangeReason reason, Throwable tx, Object ctx)
        {
            muxStatus = status;
            logger.info("## MUX : Status : " + status + ", Reason: " +
                reason, tx);
        }
    }

    static class DeviceTwinStatusCallBack implements IotHubEventCallback
    {
        @Override
        public void execute(IotHubStatusCode status, Object ctx)
        {
            DeviceClientConfig c = (DeviceClientConfig)ctx;
            String id = c.getDeviceId();
            logger.info("## " + id + " : IoT Hub TWIN status : " + status);
        }
    }

    static class OnPropertyCallBack implements TwinPropertyCallBack
    {
        @Override
        public void TwinPropertyCallBack(Property property, Object ctx)
        {
            DeviceClientConfig c = (DeviceClientConfig)ctx;
            String id = c.getDeviceId();
            logger.info("## " + id + " : OnProperty callback for " +
                (property.getIsReported() ? "reported" : "desired") +
                " property '" + property.getKey() + "' : Value '" +
                property.getValue() + "', Version " + property.getVersion());
        }
    }

}

Console log of the issue

Too many to list, so left out.

timtay-microsoft commented 3 years ago

Just to make sure we're both on the same page, the connection state callback for device-level events (i.e. device disabled) only reflects in the device client level connection status callback.

For instance, if you have a multiplexed connection with 5 devices on it, and you disable one of these devices from the service side, the expected behavior is that the connection status callback tied to that particular device client will execute with the connection lost event. The multiplexing client's connection status callback won't execute in that case, though, since the AMQP connection itself is still open.
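The scoping rule described here amounts to two dispatch paths: a device-level session loss notifies only that device's listener, while a connection-level loss notifies the multiplexing client's listener and every registered device. A toy model of that expectation, which a test harness could compare observed callbacks against; `ScopedNotifier`, its method names, and the status labels are hypothetical and not SDK code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the expected callback scoping; not SDK code
final class ScopedNotifier {
    private final List<String> muxEvents = new ArrayList<>();
    private final Map<String, List<String>> deviceEvents = new LinkedHashMap<>();

    void register(String deviceId) {
        deviceEvents.put(deviceId, new ArrayList<>());
    }

    /** The service closed one device's session (for example, the device was disabled). */
    void onDeviceSessionLost(String deviceId) {
        List<String> events = deviceEvents.get(deviceId);
        if (events != null) {
            events.add("DISCONNECTED");
        }
        // The shared AMQP connection is still open, so the mux listener is not notified.
    }

    /** The shared AMQP connection dropped: the mux and every device are affected. */
    void onConnectionLost() {
        muxEvents.add("DISCONNECTED_RETRYING");
        deviceEvents.values().forEach(events -> events.add("DISCONNECTED_RETRYING"));
    }

    List<String> muxEvents() {
        return muxEvents;
    }

    List<String> deviceEvents(String deviceId) {
        return deviceEvents.get(deviceId);
    }
}
```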

With that in mind, what issues are you seeing with the connection status callbacks for the multiplexing client and its device clients?

sanjay-90210 commented 3 years ago

"Just to make sure we're both on the same page, the connection state callback for device-level events (i.e. device disabled) only reflects in the device client level connection status callback."

Yes, I actually want it to work that way, but that is not how things work.

The primary issue is that once the mux is opened, it should be put into the connected state, and that does not seem to happen. A callback is registered and then the mux is opened, as shown below, but the callback is never called with a CONNECTED status.

        IotHubConnectionString ics = new IotHubConnectionString(args[0]);
        MultiplexingClient muxClient = new MultiplexingClient(
            ics.getHostName(), IotHubClientProtocol.AMQPS);
        muxClient.registerConnectionStatusChangeCallback(
            new MuxConnectionStatusChangeCallback(), muxClient);

        boolean addLastLater = Boolean.getBoolean("add.last.later");
        boolean startEmpty = !Boolean.getBoolean("disable.start.empty");

        if(startEmpty)
            muxClient.open();
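One way to turn "the callback is never called with CONNECTED" into a check that fails fast is to have the status callback count down a latch and wait on it with a timeout after `open()`. A sketch assuming the callback forwards the status name as a string; `ConnectedWaiter` is hypothetical, and in the program above its `onStatus` would be called from `MuxConnectionStatusChangeCallback.execute`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: records whether a CONNECTED status has been reported
final class ConnectedWaiter {
    private final CountDownLatch connected = new CountDownLatch(1);

    /** Call from the status-change callback with the status name, e.g. status.name(). */
    void onStatus(String status) {
        if ("CONNECTED".equals(status)) {
            connected.countDown(); // releases any thread blocked in awaitConnected
        }
    }

    /** Returns false if no CONNECTED callback arrived within the timeout. */
    boolean awaitConnected(long timeout, TimeUnit unit) throws InterruptedException {
        return connected.await(timeout, unit);
    }
}
```

After `muxClient.open()`, a call such as `waiter.awaitConnected(30, TimeUnit.SECONDS)` returning false would flag the missing callback instead of leaving `muxStatus` silently null.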

"For instance, if you have a multiplexed connection with 5 devices on it, and you disable one of these devices from the service side, the expected behavior is that the connection status callback tied to that particular device client will execute with the connection lost event. The multiplexing client's connection status callback won't execute then though, since the AMQP connection itself is still open."

I expect things to work that way, but that is not what actually happens. When you disable a device, the other devices as well as the mux itself are disconnected. Until the disabled device is removed from the mux, nothing works for that mux instance.

2. 2 GOOD devices, DISABLE one while running
    - BAD   - Both devices are marked as DISCONNECTED
    - BAD   - Mux is also marked as DISCONNECTED

In the scenario below, there are loads of reconnect-related issues, and I have no idea what the SDK is trying to do. I would expect the mux and the good device to be in the connected state and the disabled device to be in the disconnected state.

3. 1 GOOD device, 1 DISABLED device
    - Cascading issues that lead to a lot of connects and disconnects that
        leaves the mux client in the state CONNECTED with both the devices
        in DISCONNECTED state. Mux client is put in a state
        DISCONNECTED_RETRYING even before it is in a CONNECTED state.
    - Too many things are not in place to report what is expected
    - No MultiplexingClientDeviceRegistrationAuthenticationException is
        thrown at any time

timtay-microsoft commented 3 years ago

I spent some time digging into this issue today and have a couple of findings so far:

I have a fix merged in already for the first bug I found, which is that a multiplexing client with 1 or fewer devices registered at open time did not execute the connection status callback, like you reported. (#1198)

The service appears to throw an AmqpInternalErrorException rather than an AmqpNotFoundException when a device becomes unregistered, which seems a bit off to me, since there is no service error here, just a typical 404 client error. Regardless of which error is thrown, however, the SDK by default attempts to reconnect the disabled device, and eventually closes the whole multiplexing client. This is not the expected behavior: I'd expect the multiplexed connection and the other devices to carry on, and only the disabled device's client instance to be closed. I'm still digging into this, but I should have a fix checked in soon.
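The expected behavior boils down to classifying a session-level failure: a "not found" or "not registered" condition for one device should end that device's retries without touching the shared connection, while other errors are retried. A hypothetical sketch of that decision, assuming the AMQP 1.0 error condition symbol and the service's description text are available; `SessionErrorPolicy` and its `Outcome` values are illustrative names and this does not reproduce the SDK's own exception-to-reason mapping:

```java
import java.util.Locale;

// Hypothetical policy: what to do when the service closes one device's session
final class SessionErrorPolicy {
    enum Outcome { RETRY_DEVICE, CLOSE_DEVICE_ONLY }

    /**
     * @param condition   AMQP 1.0 error condition symbol, e.g. "amqp:not-found"
     * @param description description text sent by the service (may be null)
     */
    static Outcome classify(String condition, String description) {
        String text = description == null ? "" : description.toLowerCase(Locale.ROOT);
        if ("amqp:not-found".equals(condition) || "amqp:unauthorized-access".equals(condition)) {
            return Outcome.CLOSE_DEVICE_ONLY; // device deleted or disabled: retrying cannot help
        }
        // Assumption: an unregistered device may arrive as an internal error whose
        // description says "not registered"; treat that as terminal for the device too.
        if ("amqp:internal-error".equals(condition) && text.contains("not registered")) {
            return Outcome.CLOSE_DEVICE_ONLY;
        }
        return Outcome.RETRY_DEVICE; // everything else is assumed to be transient
    }
}
```

Either outcome is scoped to a single device session; neither one closes the multiplexed connection.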

timtay-microsoft commented 3 years ago

My team is putting out a release shortly that will contain both #1202 and #1198, so would you be willing to upgrade to the latest version and try out the multiplexing client after that release is done?

sanjay-90210 commented 3 years ago

Sure, I will

timtay-microsoft commented 3 years ago

@sanjay-90210 are you still running into any issues after upgrading to the latest version?

sanjay-90210 commented 3 years ago

Sorry for the delay, I did not realize that a release was made.

Looks better than before: the mux state listener is now called with the right state, unless a device is disabled. Issues with disabled devices still exist. Once a disabled device is added to a connection, the states of all devices bounce between connected and disconnected, as if the server itself were dropping the connection. Below is the log (only my program's log lines are shown) from a run of the sample program with 2 good devices where one device was disabled in the middle, while everything was connected and working fine.

[main] INFO com.kalkitech.test.azure.multimux.Main - ## MUX : Status : CONNECTED, Reason: CONNECTION_OK
[main] INFO com.kalkitech.test.azure.multimux.Main - ## Adding device: 0
[main] INFO com.kalkitech.test.azure.multimux.Main - ## Adding device: 1
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : CONNECTED, Reason: CONNECTION_OK
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : Status : CONNECTED, Reason: CONNECTION_OK
[azure-iot-sdk-IotHubReceiveTask] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : IoT Hub TWIN status : OK
[azure-iot-sdk-IotHubReceiveTask] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : IoT Hub TWIN status : OK
[azure-iot-sdk-IotHubReceiveTask] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : OnProperty callback for desired property 'abc' : Value '36.0', Version 22
[azure-iot-sdk-IotHubReceiveTask] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : OnProperty callback for desired property 'abc' : Value '37.0', Version 23
[azure-iot-sdk-IotHubReceiveTask] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : OnProperty callback for desired property 'mojo' : Value 'true', Version 2
[azure-iot-sdk-DeviceSessionReconnectionTask:33493911-812a-4248-854c-755e3f51e70c] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED_RETRYING, Reason: NO_NETWORK
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED, Reason: CLIENT_CLOSE
[azure-iot-sdk-DeviceSessionReconnectionTask:33493911-812a-4248-854c-755e3f51e70c] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED_RETRYING, Reason: NO_NETWORK
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : CONNECTED, Reason: CONNECTION_OK
[azure-iot-sdk-ConnectionReconnectionTask:33493911-812a-4248-854c-755e3f51e70c] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : Status : DISCONNECTED_RETRYING, Reason: NO_NETWORK
[azure-iot-sdk-ConnectionReconnectionTask:33493911-812a-4248-854c-755e3f51e70c] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED_RETRYING, Reason: NO_NETWORK
[azure-iot-sdk-ConnectionReconnectionTask:33493911-812a-4248-854c-755e3f51e70c] INFO com.kalkitech.test.azure.multimux.Main - ## MUX : Status : DISCONNECTED_RETRYING, Reason: NO_NETWORK
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : Status : CONNECTED, Reason: CONNECTION_OK
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : CONNECTED, Reason: CONNECTION_OK
[azure-iot-sdk-DeviceSessionReconnectionTask:cd56c84c-e222-45f5-b945-d5c7255020bb] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED, Reason: COMMUNICATION_ERROR
[azure-iot-sdk-ReactorRunner] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : CONNECTED, Reason: CONNECTION_OK
[main] INFO com.kalkitech.test.azure.multimux.Main - ## Connection status: {m7=CONNECTED, d3=CONNECTED}
[main] INFO com.kalkitech.test.azure.multimux.Main - ## Mux status: DISCONNECTED_RETRYING
[main] INFO com.kalkitech.test.azure.multimux.Main - ## m7 : Status : DISCONNECTED, Reason: CLIENT_CLOSE
[main] INFO com.kalkitech.test.azure.multimux.Main - ## d3 : Status : DISCONNECTED, Reason: CLIENT_CLOSE
[main] INFO com.kalkitech.test.azure.multimux.Main - ## MUX : Status : DISCONNECTED, Reason: CLIENT_CLOSE

If a disabled device is added to a connection, or a device is disabled after its connection has been set up, things go south. The mux state may also fail to return to CONNECTED even when the devices are shown as connected (in the log above, the final mux status is DISCONNECTED_RETRYING while both devices report CONNECTED). Please verify this scenario.
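To quantify the "bouncing between connected and disconnected" described above instead of eyeballing the logs, a status-callback wrapper can count status changes per device within a sliding time window. A sketch under assumptions: `FlapDetector`, the window length, and the threshold are hypothetical, and timestamps are passed in so the logic can be tested without waiting:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical detector for devices whose reported status keeps flipping
final class FlapDetector {
    private final long windowMillis;
    private final int maxTransitions;
    private final Map<String, String> lastStatus = new HashMap<>();
    private final Map<String, Deque<Long>> transitions = new HashMap<>();

    FlapDetector(long windowMillis, int maxTransitions) {
        this.windowMillis = windowMillis;
        this.maxTransitions = maxTransitions;
    }

    /** Records a status report; returns true if the device changed status too often within the window. */
    synchronized boolean record(String deviceId, String status, long nowMillis) {
        String previous = lastStatus.put(deviceId, status);
        Deque<Long> times = transitions.computeIfAbsent(deviceId, k -> new ArrayDeque<>());
        if (previous != null && !previous.equals(status)) {
            times.addLast(nowMillis); // only real changes count, repeated reports do not
        }
        while (!times.isEmpty() && nowMillis - times.peekFirst() > windowMillis) {
            times.removeFirst(); // forget changes older than the window
        }
        return times.size() > maxTransitions;
    }
}
```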

timtay-microsoft commented 3 years ago

From my initial testing, that behavior is fixed by https://github.com/Azure/azure-iot-sdk-java/commit/3d065c9073c94aae8d58ff57ac0b4d26c11cb93d

This fix wasn't a part of the latest release, but I do plan on releasing it soon.

I tried opening a multiplexing client with 2 devices, then registering another device that was already disabled, and the other devices were unaffected as expected.

Likewise, I tried opening a multiplexing client with 3 devices, disabling one of them, and the other 2 devices were unaffected as expected.

With that in mind, I'm going to mark this issue as "Fix Checked In"

timtay-microsoft commented 3 years ago

That said, there does appear to be a bug around opening a multiplexing client with a set of already registered devices where one or more of those devices is disabled. I'll dig into that a bit more.

timtay-microsoft commented 3 years ago

I've put out another release that contains some fixes related to your scenario, in case you want to take another look

https://github.com/Azure/azure-iot-sdk-java/releases/tag/2021-05-24

sanjay-90210 commented 3 years ago

I tried out the new build with just the basic disabled-device case, i.e., running the sample with 2 devices, one good and one disabled. The new build goes into an infinite loop of retries, filling up the logs. It also leaks threads on each retry. Two processed thread dumps are attached (duplicate stack traces are combined into a count for easier review). After retrying for a little while, there were 32 IotHubSendTask and 32 IotHubReceiveTask threads; a few minutes later there were 105 of each.

1.html.gz 2.html.gz
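A lightweight way to track a thread leak like this without taking full thread dumps is to count live threads by name prefix, for example once per reconnect attempt. A sketch using only `Thread.getAllStackTraces()`; the `ThreadCensus` class is hypothetical, and the prefixes a caller would pass (such as `azure-iot-sdk-IotHubReceiveTask`) are taken from thread names visible in the logs in this thread:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic: counts live threads whose names start with a given prefix
final class ThreadCensus {
    static int count(String namePrefix) {
        int n = 0;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().startsWith(namePrefix)) {
                n++;
            }
        }
        return n;
    }

    /** Counts for several prefixes, keyed and sorted by prefix for stable log output. */
    static Map<String, Integer> snapshot(String... prefixes) {
        Map<String, Integer> result = new TreeMap<>();
        for (String prefix : prefixes) {
            result.put(prefix, count(prefix));
        }
        return result;
    }
}
```

Logging `ThreadCensus.snapshot("azure-iot-sdk-IotHubSendTask", "azure-iot-sdk-IotHubReceiveTask")` periodically would show whether the counts keep climbing across retries or return to one pair after `close()`.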

sanjay-90210 commented 3 years ago

@timtay-microsoft Any updates on the disabled device issue? Do you need any more data from me?

timtay-microsoft commented 3 years ago

Sorry, I've been a bit sidetracked by some other work. I should have enough to repro, but I'll let you know if I need anything else

timtay-microsoft commented 3 years ago

I'd like to start by simplifying your repro code a bit since there are a lot of parts to it that should be irrelevant to this issue. From what I've been able to simplify so far, this is the code that I'd expect a repro of the infinite loop and leaked threads from:

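public static void main(String[] args)
        throws Exception {
    // Connection details passed on the command line (placeholders for this repro):
    // args[0] = IoT Hub host name, args[1] = enabled device, args[2] = disabled device
    String hostname = args[0];
    String connectionString1 = args[1];
    String connectionString2 = args[2];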
    IotHubClientProtocol protocol = IotHubClientProtocol.AMQPS;
    DeviceClient client1 = new DeviceClient(connectionString1, protocol);
    DeviceClient client2 = new DeviceClient(connectionString2, protocol); // device is disabled before running this sample

    Map<String, DeviceClient> deviceMap = new HashMap<>(2);
    deviceMap.put(client1.getConfig().getDeviceId(), client1);
    deviceMap.put(client2.getConfig().getDeviceId(), client2);

    MultiplexingClient multiplexingClient = new MultiplexingClient(hostname, protocol);

    // no need to wrap this in a try/catch block since this call never throws a 
    // MultiplexingClientDeviceRegistrationAuthenticationException if the multiplexing client isn't open yet
    multiplexingClient.registerDeviceClients(deviceMap.values());

    try
    {
        multiplexingClient.open();
    }
    catch (MultiplexingClientDeviceRegistrationAuthenticationException mx)
    {
        // interestingly, from my initial testing, this catch block isn't hit. The reason being that IoT Hub does allow 
        // you to successfully open the disabled device's session, but the service then kills the session shortly afterwards.
        Map<String, Exception> exMap = mx.getRegistrationExceptions();
        for (String deviceId : exMap.keySet())
        {
            multiplexingClient.unregisterDeviceClient(deviceMap.remove(deviceId));
        }
    }

    // give some time here just in case the leaked threads take a while to accumulate
    Thread.sleep(3000);

    multiplexingClient.close();

    System.out.println("Done waiting, all threads should be finished");
}

With this code, I'm not seeing any infinite loops during any of the register/unregister calls, and the close call on the multiplexing client successfully closes the only pair of send/receive threads. Where exactly in your repro code in the opening comment of this thread are you seeing the infinite loop?

sanjay-90210 commented 3 years ago

I copy-pasted your code, made minor modifications to pass the device connection strings in, and executed it multiple times with the same connection strings, the first device being normal and the second being disabled. In one execution, I got an exception in this code:

        try
        {
            multiplexingClient.open();
        }
        catch (MultiplexingClientDeviceRegistrationAuthenticationException mx)
        {
            // interestingly, from my initial testing, this catch block isn't hit. The reason being that IoT Hub does allow 
            // you to successfully open the disabled device's session, but the service then kills the session shortly afterwards.
            Map<String, Exception> exMap = mx.getRegistrationExceptions();
            for (String deviceId : exMap.keySet())
            {
                multiplexingClient.unregisterDeviceClient(deviceMap.remove(deviceId));
            }
        }

The output log for it is given below: out-exepection.log

In all other cases, it went into an infinite loop. A full log with multiple thread dumps is given below: out-loop.log

This execution had to be killed in the end. I hope this helps with debugging. The jars I have as dependencies are listed below:

asm-7.1.jar asm-analysis-7.1.jar asm-commons-7.1.jar asm-tree-7.1.jar asm-util-7.1.jar azure-core-1.7.0.jar azure-core-http-netty-1.5.4.jar azure-storage-blob-12.8.0.jar azure-storage-common-12.8.0.jar azure-storage-internal-avro-12.0.0.jar bcmail-jdk15on-1.64.jar bcpkix-jdk15on-1.64.jar bcprov-jdk15on-1.64.jar commons-codec-1.14.jar commons-lang3-3.11.jar gson-2.8.0.jar iot-deps-0.13.0.jar iot-device-client-1.30.2.jar jackson-annotations-2.10.1.jar jackson-core-2.10.1.jar jackson-databind-2.10.1.jar jackson-dataformat-xml-2.10.1.jar jackson-datatype-jsr310-2.10.1.jar jackson-module-jaxb-annotations-2.10.1.jar jakarta.activation-api-1.2.1.jar jakarta.xml.bind-api-2.3.2.jar jffi-1.2.19.jar jffi-1.2.19-native.jar jnr-a64asm-1.0.0.jar jnr-constants-0.9.12.jar jnr-enxio-0.21.jar jnr-ffi-2.1.10.jar jnr-posix-3.0.50.jar jnr-unixsocket-0.23.jar jnr-x86asm-1.0.2.jar junit-3.8.1.jar netty-buffer-4.1.51.Final.jar netty-codec-4.1.51.Final.jar netty-codec-http2-4.1.51.Final.jar netty-codec-http-4.1.51.Final.jar netty-codec-socks-4.1.51.Final.jar netty-common-4.1.51.Final.jar netty-handler-4.1.51.Final.jar netty-handler-proxy-4.1.51.Final.jar netty-resolver-4.1.51.Final.jar netty-tcnative-boringssl-static-2.0.31.Final.jar netty-transport-4.1.51.Final.jar netty-transport-native-epoll-4.1.51.Final-linux-x86_64.jar netty-transport-native-kqueue-4.1.51.Final-osx-x86_64.jar netty-transport-native-unix-common-4.1.51.Final.jar org.eclipse.paho.client.mqttv3-1.2.2.jar proton-j-0.30.0.jar qpid-proton-j-extensions-1.2.3.jar reactive-streams-1.0.3.jar reactor-core-3.3.8.RELEASE.jar reactor-netty-0.9.10.RELEASE.jar security-provider-1.4.0.jar slf4j-api-1.7.30.jar slf4j-simple-1.7.30.jar stax2-api-4.2.jar tim-multi-mux-receive-props-1.0.0-SNAPSHOT.jar woodstox-core-6.0.2.jar

timtay-microsoft commented 3 years ago

I'm not able to repro those same results so far. In particular, the ProtonIOException: bytes left unconsumed error is not showing up in my logs. That error is treated as a connection level event, so the multiplexed connection itself goes into reconnection. The loop here seems to be

1) connection briefly opened
2) device session dropped for the disabled device
3) "bytes left unconsumed" error
4) multiplexed connection closes itself and starts reconnection
...then back to step 1, repeating indefinitely.

The behavior of each individual step seems correct to me. The only issue here is why you are seeing the "bytes left unconsumed" error. Are you sure you aren't doing anything different from the repro code I left above, and are you introducing any multithreaded use of a particular client or something?

When I run the above code (even in Ubuntu 16.04) I only see this happen:

connection briefly opened -> device session dropped for the disabled device -> retry for the disabled device gives up, other device and the multiplexed connection are still active and useable.
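The healthy run ("retry for the disabled device gives up") implies a bounded retry budget per device session, as opposed to the unbounded loop in the failing run. A generic sketch of such a budget with capped exponential backoff; `BoundedRetry` is hypothetical and does not reproduce the SDK's own retry policy or operation timeout:

```java
import java.util.function.BooleanSupplier;

// Hypothetical bounded retry with capped exponential backoff
final class BoundedRetry {
    private final int maxAttempts;
    private final long initialDelayMillis;
    private final long maxDelayMillis;

    BoundedRetry(int maxAttempts, long initialDelayMillis, long maxDelayMillis) {
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Delay after the given 1-based failed attempt: initial * 2^(attempt-1), capped at maxDelayMillis. */
    long delayBefore(int attempt) {
        long delay = initialDelayMillis;
        for (int i = 1; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    /**
     * Calls reconnect until it reports success or the budget is spent.
     * Returns the number of attempts used, or -1 if the caller should give up on this device.
     */
    int run(BooleanSupplier reconnect) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (reconnect.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(delayBefore(attempt)); // back off before the next attempt
            }
        }
        return -1;
    }
}
```

Giving up here means closing only that device's session; the multiplexed connection and the other devices are left alone.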

sanjay-90210 commented 3 years ago

The full project that i am using is attached. test.tar.gz

timtay-microsoft commented 3 years ago

I still can't repro your scenario with that project. Do you see this same issue on other operating systems or on other JDK versions?

timtay-microsoft commented 3 years ago

@sanjay-90210 do you have any other guidance on repro'ing this issue?

sanjay-90210 commented 3 years ago

I created a fresh set of IoT devices and tested the disabled-device issue again. This time it looks a lot better, but I still run into the infinite loop issue. I am not able to pinpoint the exact state a device has to be in for this to happen. If a device is created in the "disabled" state to start with, it shows no infinite loop (10+ tries did not hit the issue). If a device is created enabled, connected to without issues at least once, and then disabled (while connected or otherwise), it MAY show the infinite loop issue. With enough retries and state changes from disabled to enabled and back, I hit the issue, and then, just like that, it can go away with no infinite loop. The one thing I noticed is that whenever it gets into an infinite loop, the following exception is thrown:

[azure-iot-sdk-DeviceSessionReconnectionTask:a746d96e-e314-4bfb-9533-72a0159bd7e3] WARN com.microsoft.azure.sdk.iot.device.transport.IotHubTransport - Updating device m15 status to new status DISCONNECTED_RETRYING with reason NO_NETWORK
com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpInternalErrorException: Device m15 at IotHub device-hub not registered.
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsExceptionTranslator.convertFromAmqpException(AmqpsExceptionTranslator.java:50)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection.onSessionClosedUnexpectedly(AmqpsIotHubConnection.java:729)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionHandler.onLinkClosedUnexpectedly(AmqpsSessionHandler.java:305)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsReceiverLinkHandler.onLinkRemoteClose(AmqpsReceiverLinkHandler.java:114)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.IotHubReactor.run(IotHubReactor.java:26)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection$

When things stop without an infinite loop, the following log lines are reported:

[azure-iot-sdk-DeviceSessionReconnectionTask:7970208f-c1aa-417f-b078-40f3602555d9] DEBUG com.microsoft.azure.sdk.iot.device.transport.IotHubTransport - Mapping exception throwable to COMMUNICATION_ERROR because the sdk was unable to classify the thrown exception to anything other category
com.microsoft.azure.sdk.iot.device.transport.amqps.exceptions.AmqpNotFoundException: {"errorCode":404001,"trackingId":"9e7fd049e1264db4b2c3571e9b0d8404-G:0-TimeStamp:06/22/2021 07:21:14-G:0-TimeStamp:06/22/2021 07:21:14","message":"m15","timestampUtc":"2021-06-22T07:21:14.0593455Z"}
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsExceptionTranslator.convertFromAmqpException(AmqpsExceptionTranslator.java:62)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection.onSessionClosedUnexpectedly(AmqpsIotHubConnection.java:729)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionHandler.onLinkClosedUnexpectedly(AmqpsSessionHandler.java:305)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSenderLinkHandler.onLinkRemoteClose(AmqpsSenderLinkHandler.java:115)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.IotHubReactor.run(IotHubReactor.java:26)
    at com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsIotHubConnection$ReactorRunner.call(AmqpsIotHubConnection.java:1162)

I will write a test program with a recovery strategy that can recover from deleted or disabled devices while continuing to work with the good devices, and see whether that can be sustained for extended periods of time.
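Such a recovery strategy could start from a small supervisor that watches device-level status callbacks and, when a device reaches a terminal DISCONNECTED state for a reason other than a deliberate close, queues it for removal while leaving healthy devices alone. A hedged sketch: `DeviceSupervisor` is hypothetical, status and reason names are passed as strings (e.g. `status.name()`, `reason.name()`), and the caller is assumed to drain the queue and perform the actual `unregisterDeviceClient` calls outside the SDK's callback thread:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical supervisor: decides which devices to drop from a multiplexed connection
final class DeviceSupervisor {
    private final Set<String> active = new LinkedHashSet<>();
    private final ConcurrentLinkedQueue<String> toEvict = new ConcurrentLinkedQueue<>();

    synchronized void add(String deviceId) {
        active.add(deviceId);
    }

    /** Called from a device status callback with the status and reason names. */
    synchronized void onStatus(String deviceId, String status, String reason) {
        // DISCONNECTED (unlike DISCONNECTED_RETRYING) means the SDK stopped retrying this device;
        // CLIENT_CLOSE is our own close() call, so it is not treated as a failure.
        if ("DISCONNECTED".equals(status) && !"CLIENT_CLOSE".equals(reason) && active.remove(deviceId)) {
            toEvict.add(deviceId);
        }
    }

    /** Returns, and clears, the devices the caller should now unregister. */
    List<String> drainEvictions() {
        List<String> ids = new ArrayList<>();
        String id;
        while ((id = toEvict.poll()) != null) {
            ids.add(id);
        }
        return ids;
    }

    synchronized List<String> activeDevices() {
        return new ArrayList<>(active);
    }
}
```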

timtay-microsoft commented 2 years ago

Closing this issue as we were never able to repro this issue. In the recent 2.0.0 release, the reconnection and open/close logic have been improved and it may have fixed this issue. If you are still seeing this issue after upgrading to 2.0.0, then please open a new issue.