aws / aws-iot-device-sdk-java

Java SDK for connecting to AWS IoT from a device.
https://aws.amazon.com/iot/sdk/
Apache License 2.0

Client gets stuck when publishing a msg in an onMessage callback #35

Closed wingsofovnia closed 6 years ago

wingsofovnia commented 7 years ago

Problem

Publishing a message in response to an incoming message is a very typical case, but AWSIotMqttClient#publish gets stuck in the ScheduledThreadPoolExecutor when it is called inside the AWSIotTopic#onMessage callback registered through AWSIotMqttClient#subscribe:

"pool-2-thread-1@1516" prio=5 tid=0x13 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at com.amazonaws.services.iot.client.core.AwsIotCompletion.get(AwsIotCompletion.java:203)
      at com.amazonaws.services.iot.client.core.AbstractAwsIotClient.publish(AbstractAwsIotClient.java:154)
      at com.amazonaws.services.iot.client.AWSIotMqttClient.publish(AWSIotMqttClient.java:651)
      at com.amazonaws.services.iot.client.core.AbstractAwsIotClient.publish(AbstractAwsIotClient.java:144)
      at com.amazonaws.services.iot.client.AWSIotMqttClient.publish(AWSIotMqttClient.java:627)
      at com.amazonaws.services.iot.client.core.AbstractAwsIotClient.publish(AbstractAwsIotClient.java:135)
      at com.amazonaws.services.iot.client.AWSIotMqttClient.publish(AWSIotMqttClient.java:580)
      at com.amazon.aws.iot.SdkBugMain.sendHelloWordlSync(SdkBugMain.java:45)
      at com.amazon.aws.iot.SdkBugMain.access$000(SdkBugMain.java:15)
      at com.amazon.aws.iot.SdkBugMain$1.onMessage(SdkBugMain.java:37)
      at com.amazonaws.services.iot.client.core.AbstractAwsIotClient$1.run(AbstractAwsIotClient.java:293)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)

Scenario

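import com.amazonaws.services.iot.client.AWSIotException;
import com.amazonaws.services.iot.client.AWSIotMessage;
import com.amazonaws.services.iot.client.AWSIotMqttClient;
import com.amazonaws.services.iot.client.AWSIotQos;
import com.amazonaws.services.iot.client.AWSIotTopic;

import java.io.InputStream;
import java.security.KeyStore;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;

public class SdkBugMain {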

    private static final String KEYSTORE_RESOURCE_FILE = "keystore";
    private static final String KEYSTORE_PASSWORD = "password";
    private static final String ENDPOINT = "idididididididid.iot.eu-central-1.amazonaws.com";
    private static final AWSIotQos QOS = AWSIotQos.QOS1; // doesn't matter

    private static final String INBOUND_TOPIC = "in";
    private static final String OUTBOUND_TOPIC = "out";

    public static void main(String[] args) throws Exception {
        InputStream keystoreStream = resourceAsStream(KEYSTORE_RESOURCE_FILE);
        KeyStore keyStore = readKeyStore(keystoreStream, KEYSTORE_PASSWORD);

        AWSIotMqttClient mqttClient = new AWSIotMqttClient(ENDPOINT, UUID.randomUUID().toString(), keyStore, KEYSTORE_PASSWORD);

        mqttClient.connect();

        mqttClient.subscribe(new AWSIotTopic(INBOUND_TOPIC, QOS) {
            @Override
            public void onMessage(AWSIotMessage message) {
                System.out.println("Msg arrived = " + message.getStringPayload() + " from topic = " + INBOUND_TOPIC);
                sendHelloWordlSync(mqttClient);
                //sendHelloWordlAsync(mqttClient); //works fine
            }
        });
    }

    private static void sendHelloWordlSync(AWSIotMqttClient client) {
        try {
            client.publish(OUTBOUND_TOPIC, "HELLO WORLD");
            System.out.println("Hello sent to topic = " + OUTBOUND_TOPIC); // won't be printed if Sync
        } catch (AWSIotException e) {
            e.printStackTrace();
        }
    }

    private static void sendHelloWordlAsync(AWSIotMqttClient client) {
        ForkJoinPool.commonPool().execute(() -> sendHelloWordlSync(client));
    }

    private static InputStream resourceAsStream(String filename) {
        return Thread.currentThread().getContextClassLoader().getResourceAsStream(filename);
    }

    private static KeyStore readKeyStore(InputStream in, String pass) {
        try {
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(in, pass.toCharArray());
            return keyStore;
        } catch (Exception e) {
            throw new RuntimeException("Failed to read keystore", e);
        }
    }
}

Expected result

Actual result:

Workaround

Run the publish call on a separate thread (uncomment sendHelloWordlAsync and comment out sendHelloWordlSync).

fengsongAWS commented 7 years ago

Hi @wingsofovnia, thanks for your interest in the AWS IoT Device SDK for Java. I will create an issue in our backlog, look into this, and fix it in a future release.

rongsaws commented 7 years ago

Both the incoming message callback and publish completion processing are handled by the client thread pool. By default, the size of each client's thread pool is 1, so they may compete for the same thread: the callback blocks inside publish while the completion that would unblock it waits for that same thread. This can be fixed by adding the following line before connect.

    mqttClient.setNumOfClientThreads(2);
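
The effect described above can be reproduced without the SDK. The following is a minimal stand-alone sketch, not the SDK's actual implementation: the class `SharedPoolDemo` and the method `callbackFinishes` are hypothetical names, and a plain fixed-size pool stands in for the client thread pool. A "callback" queues its own "completion" on the same pool and then blocks waiting for it, the way a synchronous publish does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-alone model of the problem; it does not use the AWS IoT SDK.
final class SharedPoolDemo {

    // Returns true if a "callback" that blocks on its own "publish completion"
    // finishes within one second when both run on a pool of the given size.
    static boolean callbackFinishes(int poolSize) {
        ExecutorService clientPool = Executors.newFixedThreadPool(poolSize);
        Future<?> callback = clientPool.submit(() -> {
            CountDownLatch published = new CountDownLatch(1);
            // Completion processing is queued on the same pool ...
            clientPool.execute(published::countDown);
            // ... while the callback blocks waiting for it.
            try {
                published.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            callback.get(1, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // every worker is busy waiting for work it cannot run
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            clientPool.shutdownNow(); // interrupts a stuck callback so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size 1 finishes: " + callbackFinishes(1));
        System.out.println("pool size 2 finishes: " + callbackFinishes(2));
    }
}
```

In this model the callback stalls with a pool of one thread and finishes with two, which matches the behaviour reported in this thread for a single listener.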

wingsofovnia commented 7 years ago

@rongsaws Right. Still, this looks suspicious to me: they should use different pools, since they are operations with different purposes. Moreover, setNumOfClientThreads(2) introduces a possible race condition when there are multiple listeners with nested publishers. Not a fix, IMHO.
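
The concern about multiple listeners can be illustrated with a small stand-alone model. This is only a sketch under assumptions: `NestedPublishDemo` and `twoListenersFinish` are hypothetical names, plain executors stand in for the SDK's internals, and both callbacks are forced to start at the same time to make the unlucky scheduling deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-alone model; it does not use the AWS IoT SDK.
final class NestedPublishDemo {

    // Two listener callbacks run at once on a pool of two threads. Each blocks
    // until its "publish completion" has run, either on a separate pool or on
    // the same pool. Returns true if both callbacks finish within one second.
    static boolean twoListenersFinish(boolean separateCompletionPool) {
        ExecutorService callbackPool = Executors.newFixedThreadPool(2);
        ExecutorService completionPool =
                separateCompletionPool ? Executors.newSingleThreadExecutor() : callbackPool;
        CountDownLatch bothStarted = new CountDownLatch(2);
        CountDownLatch bothDone = new CountDownLatch(2);
        try {
            for (int i = 0; i < 2; i++) {
                callbackPool.execute(() -> {
                    try {
                        bothStarted.countDown();
                        bothStarted.await(); // both callbacks now hold a worker thread
                        CountDownLatch published = new CountDownLatch(1);
                        completionPool.execute(published::countDown);
                        published.await();   // synchronous publish waits for completion
                        bothDone.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return bothDone.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            callbackPool.shutdownNow();
            completionPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool of 2: " + twoListenersFinish(false));
        System.out.println("separate pools:   " + twoListenersFinish(true));
    }
}
```

In this model, a shared pool of two threads stalls as soon as both callbacks publish at the same time, while a separate completion pool lets both finish. Whether the SDK can hit the same interleaving in practice depends on its internals, which this sketch does not reproduce.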

forbode commented 6 years ago

Just an FYI for those using this within an SQS listener in Spring Boot: the same issue arises when an SQS message arrives and you publish an IoT message in response. Setting the number of client threads, as suggested by @rongsaws, solved the problem.

fengsongAWS commented 6 years ago

Thanks for your interest. Please open another thread if you still have problems.

StanislavSerdiuk commented 4 years ago

Has it been resolved yet? I use the implementation 'com.amazonaws:aws-java-sdk-iotjobsdataplane:1.11.653' and have the same problem. I agree with @wingsofovnia that setNumOfClientThreads(2) is not a fix, because I subscribe to a couple of topics and each of them publishes to another topic.

lucky-mandator commented 2 years ago

If someone still needs this: you can use ForkJoinPool if you don't want to define the number of threads for each client.

ForkJoinPool.commonPool().execute(() -> { /* call your publish method here */ });

This should also solve the problem.
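
A variation on the same workaround is to hand the publish to a dedicated executor instead of the common pool, which is shared with parallel streams and default asynchronous CompletableFuture tasks. The sketch below is a stand-alone model under assumptions: `OffloadPublishDemo` and `FakeClient` are hypothetical and only mimic a client whose blocking publish waits for a completion that runs on the client's single thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-alone model; FakeClient is hypothetical and is not the AWS IoT SDK client.
final class OffloadPublishDemo {

    static final class FakeClient {
        final ExecutorService clientPool = Executors.newSingleThreadExecutor();

        // Blocks until the completion task has run on the client's single thread.
        void publish() throws InterruptedException {
            CountDownLatch completed = new CountDownLatch(1);
            clientPool.execute(completed::countDown);
            completed.await();
        }
    }

    // Returns true if a publish triggered from a callback completes within one second.
    static boolean publishFromCallback() {
        FakeClient client = new FakeClient();
        // Dedicated to outbound publishes, so callbacks never block the client thread.
        ExecutorService publisher = Executors.newSingleThreadExecutor();
        CountDownLatch sent = new CountDownLatch(1);
        try {
            // The "onMessage" callback runs on the client thread and returns at once,
            // leaving the blocking publish to the publisher thread.
            client.clientPool.execute(() -> publisher.execute(() -> {
                try {
                    client.publish();
                    sent.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            return sent.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.shutdownNow();
            client.clientPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("publish completed: " + publishFromCallback());
    }
}
```

Because the callback returns immediately, the client thread is free to run the completion, so the publish finishes even though the client pool has a single thread.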