eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.1k stars 880 forks source link

ConcurrentModificationException when calling unsubscribe #986

Open dereulenspiegel opened 1 year ago

dereulenspiegel commented 1 year ago

When subscribing to topics with a topic specific message listener on the AsyncClient (i.e calling MqttAsyncClient.subscribe([]MqttSubscription, Object, MqttActionListener, IMqttMessageListener, MqttProperties an unsubscribe later runs into the ConcurrentModifcationException. This is caused here because the the Map is modified (by calling remove) without using the iterator. So when using topic specific subscriptions it doesn't seem possible to unsubscribe right now.

ptma commented 11 months ago

Confirmed

nl-software commented 2 weeks ago

I ran into this issue as well, but could create a workaround by overriding the MqttAsyncClient.unsubscribe() operation and use the AsyncClient class instead:

public class AsyncClient extends org.eclipse.paho.mqttv5.client.MqttAsyncClient {

    public AsyncClient(String serverURI, String clientId) throws MqttException {
        super(serverURI, clientId);
    }

    public AsyncClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
        super(serverURI, clientId, persistence, null, null);
    }

    public AsyncClient(String serverURI, String clientId, MqttClientPersistence persistence,
            MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
        super(serverURI, clientId, persistence, pingSender, executorService);
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#unsubscribe(java.lang.String[
     * ], java.lang.Object, org.eclipse.paho.mqttv5.client.MqttActionListener,
     * org.eclipse.paho.mqttv5.common.packet.MqttProperties)
     */
    /// Bugfix workaround override of org.eclipse.paho.mqttv5.client.MqttAsyncClient to
    /// suppress a java.util.ConcurrentModificationException in
    /// org.eclipse.paho.mqttv5.client.internal.CommsCallback.removeMessageListener().
    /// @see https://github.com/eclipse/paho.mqtt.java/issues/986
    @Override
    public IMqttToken unsubscribe(String[] topicFilters, Object userContext, MqttActionListener callback,
            MqttProperties unsubscribeProperties) throws MqttException {

// skip logging, super.log is private
//      final String methodName = "unsubscribe";

//      // Only Generate Log string if we are logging at FINE level
//      if (log.isLoggable(Logger.FINE)) {
//          String subs = "";
//          for (int i = 0; i < topicFilters.length; i++) {
//              if (i > 0) {
//                  subs += ", ";
//              }
//              subs += topicFilters[i];
//          }
//
//          // @TRACE 107=Unsubscribe topic={0} userContext={1} callback={2}
//          log.fine(CLASS_NAME, methodName, "107", new Object[] { subs, userContext, callback });
//      }

        for (String topicFilter : topicFilters) {
            // Check if the topic filter is valid before unsubscribing
            // Although we already checked when subscribing, but invalid
            // topic filter is meanless for unsubscribing, just prohibit it
            // to reduce unnecessary control packet send to broker.
// super.mqttConnection is private unfortunately; it can be retrieved from the IMqttToken CONNACK but too mouch work here.
//          MqttTopicValidator.validate(topicFilter, true/* allow wildcards */, this.mqttConnection.isSharedSubscriptionsAvailable());
        }

        // remove message handlers from the list for this client
        for (String topicFilter : topicFilters) {
            try {
                this.comms.removeMessageListener(topicFilter);
            }
            catch (java.util.ConcurrentModificationException ex) {
                // Workaround for exception thrown at org.eclipse.paho.mqttv5.client.internal.CommsCallback.removeMessageListener(CommsCallback.java:568)
                // due to a remove() while iterating over the map.
                // At that location an explicit iterator must be used to remove the listener from the collection.
            }
        }

        MqttToken token = new MqttToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.internalTok.setTopics(topicFilters);

        MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters, unsubscribeProperties);
                token.setRequestMessage(unregister);

        comms.sendNoWait(unregister, token);
// skip logging, super.log is private
//      // @TRACE 110=<
//      log.fine(CLASS_NAME, methodName, "110");

        return token;
    }
}