hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

.sessionExpiryInterval(int) prevents graceful disconnect and shutdown #498

Closed cware-architect closed 1 year ago

cware-architect commented 3 years ago

Expected behavior

Calling client.disconnect() should allow the program to terminate gracefully. The provided example creates an async client, connects, and subscribes to a topic. In normal operation we would expect the program to exit once main() returns.

Assuming the client requires an explicit disconnect, a tester thread was created to call client.disconnect() after a delay. The tester thread waits briefly and then calls client.disconnect(), which should end the subscription and let the program exit gracefully.

Actual behavior

The test does not exit gracefully; it keeps running for the length of time passed to sessionExpiryInterval(int). If this is set to 20 seconds, the subscribe thread remains alive for 20 seconds. Our use case requires a session to live for a few hours, which makes this an issue. We could call System.exit() as a brute-force fix, but this is bad practice.

To Reproduce

Run the test code: it does not exit after the tester thread calls client.disconnect(), but instead waits out the time set with sessionExpiryInterval(int). We expect the client to terminate cleanly when disconnect() is called. Our understanding is that sessionExpiryInterval tells the broker to keep the session alive for the given number of seconds, but it should not affect the lifetime or lifecycle of the client itself.

We would even expect the client to disconnect as soon as it goes out of scope, with no threads keeping the program alive.

As this is our first exposure to the HiveMQ client, please let us know if there is another operation we must perform that both sets the broker's session lifetime and still allows a timely exit when disconnect() is called.

Workarounds

The last one is clearly a hack, as calling cancel() on a CompletableFuture (CF) that has already signaled completion is shameful. This leaves calling System.exit(), but we like graceful shutdowns in our production software.

Comments and Observations

The active thread appears to be waiting on an NIO operation with a timeout. Should disconnect() interrupt this thread?
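To check which threads are keeping the JVM alive after disconnect(), one option is to list the live non-daemon threads, because the JVM exits normally only once every non-daemon thread has finished. This is a minimal sketch using only the JDK (Thread.getAllStackTraces()); the class and method names are hypothetical, and it reports thread names only, so mapping a name back to the library that created it is left to the reader. Calling it from the tester thread right after client.disconnect() should show which threads are still running.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper, not part of the HiveMQ client.
final class NonDaemonThreads {

    private NonDaemonThreads() {}

    /** Names of live non-daemon threads other than the caller; any of them blocks normal JVM exit. */
    static List<String> liveNonDaemonThreadNames() {
        final Thread self = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t != self && t.isAlive() && !t.isDaemon())
                .map(Thread::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "demo-worker" is a stand-in for a client I/O thread that outlives disconnect().
        final Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-worker");
        worker.start();
        System.out.println("still running: " + liveNonDaemonThreadNames());
    }
}
```

Daemon threads are deliberately excluded, since they never hold up JVM exit on their own.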

But then why would the session expiry interval affect the lifetime of the local client? If it is meant to create a grace period during shutdown (for example, a pause to let a subscription complete), then it should be a separate interval setting.

This is the async variant of the client, so connect and subscribe statements are not expected to keep the client alive, or impact the client lifecycle. Should a call to disconnect() even be required?

Reproducer code

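import java.util.concurrent.CompletableFuture;

import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;

public class HiveSubAsyncFail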
  {

  public static void main( String[] args )
      throws Exception
    {
    final Mqtt5AsyncClient client = Mqtt5Client.builder()
        .identifier( "asyncsub" )
        .serverHost( "localhost" )

        // so we can observe the internal connect/disconnect events
        .addConnectedListener( x -> System.out.println( ":connected" ) )
        .addDisconnectedListener( x -> System.out.println( ":disconnected" ) )

        .buildAsync();

    client.connectWith()

        // signal broker we want to accumulate messages in our absence
        .cleanStart( false )

        // set session expiry to 20 seconds
        /// it does signal broker to keep our session for 20 seconds,
        /// but PREVENTS DISCONNECT. It holds up graceful disconnect for 20 seconds.
        /// Because we want the server to keep the session for an hour, it would hold up the disconnect for an hour
        .sessionExpiryInterval( 20 )

        .send();

    final CompletableFuture<Mqtt5SubAck> subFuture = client.subscribeWith()
        .topicFilter( "Examples" )
        .callback( p -> System.out.println( new String( p.getPayloadAsBytes() ) ) )
        .send();

    subFuture.whenComplete( ( ack, th ) ->
      {
      // parentheses are required: + binds tighter than ==, so without them
      // the string is compared to null and th.getMessage() throws when th is null
      System.out.println( "subscribe complete" + ( th == null ? "." : " with error " + th.getMessage() ) );
      } );

    // starting separate TESTER THREAD to let main thread lapse
    new Thread( () ->
      {
      try
        {
        // pause 5 seconds to test successful subscription
        Thread.sleep( 5000 );

        // then signal disconnect
        System.out.println( "signal disconnect" );
        client.disconnect();

        // wait a second for disconnect to take effect
        Thread.sleep( 1000 );
        System.out.println( "conn state " + client.getState() );

        // A WORKAROUND HACK...
        // Calling cancel on the original subscribe future kills
        // the thread that prevents a graceful shutdown
        // without this call, client won't shut down until sessionExpiryInterval() expires

        // subFuture.cancel( true );

        // we could also call system.exit, but that's just bad practice
        // System.exit( 99 );
        }
      catch( final InterruptedException e )
        {
        e.printStackTrace();
        }
      } ).start();

    // main thread should quietly finish and this test terminate after tester thread fires disconnect()
    // the only thing keeping the test alive is poorly behaved threads that didn't get the disconnect signal

    }

  }


pglombardo commented 1 year ago

Hi @cware-architect - this is a very well written diagnosis. Our apologies for the late reply.

A couple thoughts:

Maintaining client state across threads is an architectural issue that needs care to ensure proper behaviour, IMO. For multithreaded use cases, you could for example create a separate client for each thread. A shared client has obvious complications, as you've pointed out.

// set session expiry to 20 seconds
/// it does signal broker to keep our session for 20 seconds,
/// but PREVENTS DISCONNECT. It holds up graceful disconnect for 20 seconds.
/// Because we want the server to keep the session for an hour, it would hold up the disconnect for an hour
.sessionExpiryInterval( 20 )

The broker uses sessionExpiryInterval to decide how long to keep a session after the network connection closes, for example when the socket drops without an MQTT disconnect. This is useful for remote IoT devices with intermittent connectivity.

Whatever sessionExpiryInterval is set to, you can call disconnect at any time. In fact, with MQTT 5 you can call disconnect with an updated sessionExpiryInterval value, for example the value 0 to indicate that:

the Session ends when the Network Connection is closed.

See the client documentation here for an example of calling disconnect with a sessionExpiryInterval.

All of this is explained in the MQTT 5 specification under Session State.
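The rules referred to here can be summarised in a small model. This is a sketch of the MQTT 5 Session Expiry Interval rules as I understand them from the specification: a value absent from CONNECT means 0, a value absent from DISCONNECT means the CONNECT value still applies, going from 0 at CONNECT to non-zero at DISCONNECT is a Protocol Error, and 0xFFFFFFFF means the session does not expire. The specification also requires the client, not only the server, to store session state while the interval is greater than 0, which may explain why the client in the reproducer holds on to resources until the interval elapses. The class and method names below are hypothetical and not part of the HiveMQ client; on the client side, as far as I know, the DISCONNECT value is sent with something like client.disconnectWith().sessionExpiryInterval(0).send(), which should be checked against the client version in use.

```java
import java.util.OptionalLong;

// Hypothetical model of MQTT 5 session expiry resolution; not a HiveMQ API.
final class SessionExpiry {

    /** Largest 4-byte unsigned value; the spec treats it as "the session does not expire". */
    static final long NEVER_EXPIRES = 0xFFFFFFFFL;

    private SessionExpiry() {}

    /**
     * @param connectInterval    seconds sent in CONNECT (0 if the property was absent)
     * @param disconnectInterval seconds sent in the client's DISCONNECT, or empty if absent
     * @return seconds the session state is kept after the network connection closes
     */
    static long effectiveInterval(long connectInterval, OptionalLong disconnectInterval) {
        checkRange(connectInterval);
        if (!disconnectInterval.isPresent()) {
            // Property absent in DISCONNECT: the CONNECT value still applies.
            return connectInterval;
        }
        final long requested = disconnectInterval.getAsLong();
        checkRange(requested);
        if (connectInterval == 0 && requested != 0) {
            // A session that started with expiry 0 cannot be extended at DISCONNECT.
            throw new IllegalArgumentException(
                    "protocol error: CONNECT used 0, DISCONNECT asked for " + requested);
        }
        return requested;
    }

    private static void checkRange(long seconds) {
        if (seconds < 0 || seconds > NEVER_EXPIRES) {
            throw new IllegalArgumentException("not a 4-byte unsigned interval: " + seconds);
        }
    }

    public static void main(String[] args) {
        // Reproducer scenario: 20 s at CONNECT, overridden with 0 at DISCONNECT.
        System.out.println(effectiveInterval(20, OptionalLong.of(0)));        // prints 0
        // Plain disconnect without the property: the CONNECT value is kept.
        System.out.println(effectiveInterval(3600, OptionalLong.empty()));    // prints 3600
    }
}
```

Under this model, a long-lived session for reconnects and a prompt local shutdown are not in conflict: the interval can be long at CONNECT and lowered to 0 only when the application really means to end the session.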

I believe I've addressed the core issues. Let me know if this clarifies the situation (and makes sense). I'd be happy to help out more.

pglombardo commented 1 year ago

Hi @cware-architect I'll close out this issue but keep it as a reference when we revisit this code. If anything else remains, please feel free to re-open or file another issue. Thanks for the great write up!