eclipse-paho / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho

Too many publishes is in progress with QOS 1 #551

Open sunny1974 opened 6 years ago

sunny1974 commented 6 years ago

Hi all, I set the maxInflight property to 1000 and am trying to publish messages at 300 TPS, but after some point (after submitting about 55K messages) I'm facing the issue below.

MqttException (0) - Too many publishes in progress (32202)
    at com.imi.rtm.manager.PahoAsyncConnectionManager.publishMessage(PahoAsyncConnectionManager.java:205)
    at com.imi.rtm.manager.RTMDispatcher.processInternal(RTMDispatcher.java:343)
    at com.imi.rtm.manager.RTMDispatcher.process(RTMDispatcher.java:399)
    at com.imi.rtm.manager.RTMProcessManager.processMessage(RTMProcessManager.java:619)
    at com.imi.rtm.manager.RTMProcessManager.process(RTMProcessManager.java:241)
    at com.imi.rtm.main.RTMProcessor$1.process(RTMProcessor.java:152)
    at com.imimobile.ump.queue.processor.AbstractProcessor$1.run(AbstractProcessor.java:339)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: Too many publishes in progress (32202)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at com.imi.rtm.manager.PahoAsyncConnectionManager.publishMessage(PahoAsyncConnectionManager.java:203)
    ... 11 more

Can anyone suggest what to do? I'm using a Mosca server as the backend and org.eclipse.paho.client.mqttv3 version 1.2.0 as the Java client.

jpwsutton commented 6 years ago

Hi @sunny1974,

This sounds like you could be exceeding your max in-flight window. If you enable trace, you should get a log message along the lines of "sending {0} msgs at max inflight window", which tells you how many messages are actually in flight at the point when you try to publish a new message.

Keep in mind that at QoS 1 the number of in-flight messages can only decrease once the PubAck from the server has been processed for each outgoing message, so the bottleneck may be the broker, the network, or the CPU / memory available to the client JVM.
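To make the window mechanics concrete, here is a minimal, self-contained sketch. It is a toy model, not Paho's ClientState: the class InflightWindowDemo and its methods are hypothetical names, and only the error text and the idea that a PUBACK frees a slot are taken from this thread.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a QoS 1 in-flight window (hypothetical, not Paho code). */
class InflightWindowDemo {
    private final int maxInflight;
    private final Set<Integer> unacked = new HashSet<>();
    private int nextId = 1;

    InflightWindowDemo(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    /** Registers a publish and returns its message id; fails when the window is full. */
    synchronized int publish() {
        if (unacked.size() >= maxInflight) {
            throw new IllegalStateException("Too many publishes in progress");
        }
        int id = nextId++;
        unacked.add(id);
        return id;
    }

    /** A PUBACK frees the slot held by that message id. */
    synchronized void pubAck(int id) {
        unacked.remove(id);
    }

    synchronized int inflight() {
        return unacked.size();
    }

    public static void main(String[] args) {
        InflightWindowDemo window = new InflightWindowDemo(2);
        int first = window.publish();
        window.publish();
        try {
            window.publish(); // window is full, no PUBACK processed yet
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        window.pubAck(first); // one slot is freed
        window.publish();     // accepted again
        System.out.println("in flight: " + window.inflight());
    }
}
```

If acknowledgements are processed more slowly than new publishes arrive, the set of unacknowledged ids stays full and every further publish is rejected, whatever the configured window size.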

pstanton commented 6 years ago

@jpwsutton I'm seeing this too. When using QoS 1, if the PubAck is never received, how long before the in-flight message times out?

What is the default max inflight, and how can it be set?

hylkevds commented 6 years ago

@pstanton: I don't think they time out (I've not found a timeout mechanism in the code).

client = new MqttClient(SERVER_URI, CLIENT_ID);
MqttConnectOptions options = new MqttConnectOptions();
options.setMaxInflight(1000); // Default = 10
client.connect(options);

Wanchen7 commented 6 years ago

What should I do when this exception occurs? I don't think increasing maxInflight is a good solution. If the client runs long enough, it will still reach the max value. I can't find a way to reset the whole state other than disconnecting and reconnecting.

dobermai commented 6 years ago

The solution we often use is to block the caller thread explicitly with a Semaphore whose permit count equals the in-flight window, and to acquire a permit before each call to publish(). This ensures that no more publish() calls are outstanding than the in-flight window allows.

Increasing the in-flight window won't solve the problem, as you still need to make sure that you don't exceed your in-flight window.

Wanchen7 commented 6 years ago

@dobermai you saved me a lot of time, thanks a lot!

jurek7 commented 6 years ago

@dobermai could you please share some sample code?

jurek7 commented 6 years ago

I'm sharing my code to work around this issue. The solution is based on semaphore locking, as @dobermai suggested:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.Semaphore;

public class App {

    public static void main(String[] args) throws InterruptedException {
        String topic = "/adb/notification";
        String content = "{\"name\": \"test-event\"}";
        int qos = 1;
        String broker = "tcp://localhost:1883";
        String clientId = "mqtt-publisher";

        MemoryPersistence memoryPersistence = new MemoryPersistence();

        final int MAX_INFLIGHT = 10;

        final int MESSAGES_TO_SEND = 400_000;

        Semaphore semaphore = new Semaphore(MAX_INFLIGHT);

        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, memoryPersistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setMaxInflight(MAX_INFLIGHT);
            mqttClient.connect(connOpts);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    semaphore.release();
                }
            });

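            for (int i = 0; i < MESSAGES_TO_SEND; i++) {
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qos);

                // Block until a slot in the in-flight window is free;
                // the permit is returned in deliveryComplete().
                semaphore.acquire();
                mqttClient.publish(topic, message);
            }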

            mqttClient.disconnect();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

The above solution works well, but in my opinion there must be something wrong with the library's implementation of max inflight messages:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class App {

    public static void main(String[] args) throws InterruptedException {
        String topic = "/adb/notification";
        String content = "{\"name\": \"test-event\"}";
        int qos = 1;
        String broker = "tcp://localhost:1883";
        String clientId = "mqtt-publisher";

        MemoryPersistence memoryPersistence = new MemoryPersistence();

        final int MAX_INFLIGHT = 10;

        final int MESSAGES_TO_SEND = 400_000;

        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, memoryPersistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setMaxInflight(MAX_INFLIGHT);
            mqttClient.connect(connOpts);

            for (int i = 0; i < MESSAGES_TO_SEND; i++) {
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qos);
                mqttClient.publish(topic, message);
            }

            mqttClient.disconnect();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }

}

I'd expect the blocking API to send up to MAX_INFLIGHT messages and then block further sends until it receives a PUBACK, but unfortunately the program hangs and never exits:

/usr/lib/jvm/java-8-openjdk-amd64/bin/java -javaagent:/home/juri/.local/share/JetBrains/Toolbox/apps/IDEA-C/ch-0/182.3684.101/lib/idea_rt.jar=42287:/home/juri/.local/share/JetBrains/Toolbox/apps/IDEA-C/ch-0/182.3684.101/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/java-atk-wrapper.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/rt.jar:/home/juri/projects/pmp_poc/mqttpublisher/target/classes:/home/juri/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.2.0/org.eclipse.paho.client.mqttv3-1.2.0.jar App
reason 32202
msg Too many publishes in progress
loc Too many publishes in progress
cause null
excep Too many publishes in progress (32202)
Too many publishes in progress (32202)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:513)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:158)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:187)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
    at App.main(App.java:31)

ChristianLutz commented 5 years ago

I agree with @jurek7, so this should be labeled as a bug and not as a question. We have exactly the same problem.

nivram07 commented 5 years ago

Hello, has this been acknowledged as a bug, and will it be fixed or left as is? I'm having the same issue. Thanks.

derolf commented 5 years ago

I am encountering the same problem and I think there is a race condition: Sometimes publish returns before inFlight is decremented. I saw this by capturing the following two stack traces in that order:

This one came FIRST. notifyComplete causes the publish function to return.

notifyComplete:195, Token (org.eclipse.paho.client.mqttv3.internal)
notifyResult:1175, ClientState (org.eclipse.paho.client.mqttv3.internal)
notifyReceivedAck:1011, ClientState (org.eclipse.paho.client.mqttv3.internal)
run:150, CommsReceiver (org.eclipse.paho.client.mqttv3.internal)
run:748, Thread (java.lang)

This one came AFTERWARDS:

decrementInFlight:943, ClientState (org.eclipse.paho.client.mqttv3.internal)
notifyComplete:1143, ClientState (org.eclipse.paho.client.mqttv3.internal)
handleActionComplete:256, CommsCallback (org.eclipse.paho.client.mqttv3.internal)
run:197, CommsCallback (org.eclipse.paho.client.mqttv3.internal)
run:748, Thread (java.lang)

The order should be the other way around: first inFlight should be decremented, and only then should notifyComplete be called.
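The ordering problem described above can be illustrated with a small single-threaded sketch. It is a simplified model, not the library's code: AckOrderingDemo and its methods are hypothetical names, and the two threads from the traces (CommsReceiver and CommsCallback) are collapsed into one so that the result is deterministic. The waiter callback stands in for the thread blocked in publish().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

/** Simplified model of the two possible orderings (hypothetical, not Paho code). */
class AckOrderingDemo {
    private final AtomicInteger inFlight = new AtomicInteger();

    /** A publish occupies one slot. */
    void send() {
        inFlight.incrementAndGet();
    }

    /** Order seen in the traces: the waiter is released first, the counter drops afterwards. */
    void ackNotifyFirst(IntConsumer waiter) {
        waiter.accept(inFlight.get()); // publish() would return here
        inFlight.decrementAndGet();
    }

    /** Proposed order: free the slot first, then release the waiter. */
    void ackDecrementFirst(IntConsumer waiter) {
        inFlight.decrementAndGet();
        waiter.accept(inFlight.get());
    }

    int inFlight() {
        return inFlight.get();
    }

    public static void main(String[] args) {
        AckOrderingDemo demo = new AckOrderingDemo();
        demo.send();
        demo.ackNotifyFirst(n -> System.out.println("notify first, waiter sees inFlight=" + n));
        demo.send();
        demo.ackDecrementFirst(n -> System.out.println("decrement first, waiter sees inFlight=" + n));
    }
}
```

With the first ordering, the waiter still sees the slot as occupied at the moment it is released, so an immediate follow-up publish with a window of 1 would be rejected. With the second ordering, it sees a free slot.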

In case you are using Kotlin, here is an extension function to work around the problem:

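fun MqttClient.safePublish(topic: String, payload: ByteArray, qos: Int, retained: Boolean) {
  // Retry until the publish is accepted; note that this spins without any delay.
  while (true) {
    try {
      return this.publish(topic, payload, qos, retained)
    } catch (e: MqttException) {
      // Rethrow anything other than "Too many publishes in progress" (32202).
      if (e.reasonCode != MqttException.REASON_CODE_MAX_INFLIGHT.toInt()) {
        throw e
      }
    }
  }
}

muchengyang commented 5 years ago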

Dear all: May I ask if the problem has been solved?

ghost commented 5 years ago

@All, I am also looking for a solution to this problem. Please help.

sunny1974 commented 5 years ago

I solved this issue by sleeping the message execution thread when it reaches the maxInflight window. Something like below:

while (m_objPahomgr.getInflightMessages() >= maxInflight - 2) {
    try {
        System.out.println("##########waiting 100ms############" + m_objPahomgr.getInflightMessages());
        Thread.sleep(100);
    } catch (Exception e) {
    }
}

davidgraeff commented 5 years ago

We at openHAB are facing the same issue. An explicit timeout mechanism should be established as a short-term solution, and the race condition that allows the inflight value to increase but never decrease should be fixed in the long run.
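Until the library offers such a timeout, an application-side approximation is to bound how long the caller waits for a free slot. The sketch below is a hypothetical helper (TimedInflightGate is not part of Paho) built only on java.util.concurrent.Semaphore. It does not repair the library's internal in-flight counter; it only keeps the caller from blocking forever.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical application-side gate with a bounded wait (not part of Paho). */
class TimedInflightGate {
    private final Semaphore permits;
    private final long timeoutMillis;

    TimedInflightGate(int maxInflight, long timeoutMillis) {
        this.permits = new Semaphore(maxInflight);
        this.timeoutMillis = timeoutMillis;
    }

    /** Waits up to the timeout for a free slot; returns false instead of blocking forever. */
    boolean tryEnter() {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Returns a slot; call exactly once per successful tryEnter(). */
    void leave() {
        permits.release();
    }

    int freeSlots() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        TimedInflightGate gate = new TimedInflightGate(1, 100);
        System.out.println(gate.tryEnter()); // takes the only slot
        System.out.println(gate.tryEnter()); // gives up after roughly 100 ms
        gate.leave();
        System.out.println(gate.tryEnter()); // slot is available again
    }
}
```

In a Paho client, one would presumably call tryEnter() before publish(), call leave() from deliveryComplete() or when publish() itself throws, and treat a false result as a signal to back off or reconnect.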

Aaron-He commented 5 years ago

I solved this issue by changing the code in the Paho library. See: https://github.com/eclipse/paho.mqtt.java/pull/690

cosinelabs commented 5 years ago

Is this resolved? We are facing the same issue.

e-grigorov commented 4 years ago

I got the same issue. @derolf's description is pretty clear about the problematic scenario. A simple app to reproduce it:

public static void main(String[] args) throws MqttException {
  try (MqttAsyncClient client = new MqttAsyncClient("tcp://mqtt.eclipse.org:1883", MqttClient.generateClientId(),
      new MemoryPersistence())) {
    client.setCallback(new MqttCallback() {
      @Override
      public void messageArrived(String topic, MqttMessage message) throws Exception {
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken token) {
        // slow down MQTT Call thread
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
      }

      @Override
      public void connectionLost(Throwable cause) {
      }
    });
    IMqttToken token = client.connect();
    token.waitForCompletion();

    System.out.println("Message publishing...");
    MqttMessage message = new MqttMessage("a b".getBytes());
    message.setQos(1);

    token = client.publish("a/b", message);
    token.waitForCompletion();
    System.out.printf("Message published, in-flight messages, expected: 0, actual: %s%n",
        client.getInFlightMessageCount());

    token = client.disconnect();
    token.waitForCompletion();
  }
}

Sleepingbug commented 3 years ago

@pstanton: I don't think they time out (I've not found a timeout mechanism in the code).

client = new MqttClient(SERVER_URI, CLIENT_ID);
MqttConnectOptions options = new MqttConnectOptions();
options.setMaxInflight(1000); // Default = 10
client.connect(options);

I had the same question, and this works. Thank you, @hylkevds!

MetSystem commented 1 day ago

I have the same question.