hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
847 stars 158 forks source link

can not publish when consuming? #522

Closed HiwayChe closed 6 months ago

HiwayChe commented 2 years ago

F2955B09-8F39-4a5d-B21D-6EBFCFCCB403

I use mqtt5 async client to consume message using callback, in callback (not completed yet) I publish a message, thread blocked, is it normal?

code snippet: consume callback: image

sync send: image

As thread stack shows: thread blocked at send().get().

HiwayChe commented 2 years ago

reproduce code ` public class MqttTest {

interface MqConsumeCallback {
    void onConsume(String topic, byte[] payloadBytes, long delay);
}

private static final String topic = "a.b.c";

private static AtomicLong count = new AtomicLong();

public static void main(String[] args) throws ExecutionException, InterruptedException {
    MqttTest test = new MqttTest();
    test.connect();

    test.sub(topic, (topic, payloadBytes, delay) -> {
        String message = new String(payloadBytes);
        System.out.println("message received: " + message + ", thread=" + Thread.currentThread().getName());
        test.pub("d.e.f", "send message while consuming".getBytes());
    });

    new Thread(() -> {
        while (true) {
            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
            test.pub(topic, ("message " + count.incrementAndGet()).getBytes());
        }
    }, "sender-thread").start();
}

private MqttConnectConfig mqttConfig = new MqttConnectConfig();

{
    mqttConfig.setHost("your ip");
    mqttConfig.setPort(1883);
    mqttConfig.setUsername("fake");
    mqttConfig.setPassword("fake");
}

private Mqtt5AsyncClient client;

private MqttQos DEFAULT_QOS = MqttQos.AT_LEAST_ONCE;

private int messageExpiry = 120;

private void connect() throws ExecutionException, InterruptedException {
    client = Mqtt5Client.builder()
            .identifier("mqtt_test").automaticReconnectWithDefaultConfig()
            .serverHost(mqttConfig.getHost()).serverPort(mqttConfig.getPort())
            .addConnectedListener(context -> {
                System.out.println("connected");
            })
            .addDisconnectedListener(context -> {
                System.err.println("disconnected");
            })
            .buildAsync();
    MqttConnect mqttConnect = new MqttConnect(30, false, 120,
            MqttConnectRestrictions.DEFAULT, new MqttSimpleAuth(MqttUtf8StringImpl.of(mqttConfig.getUsername()), ByteBuffer.wrap(mqttConfig.getPassword().getBytes())),
            null, null, MqttUserPropertiesImpl.NO_USER_PROPERTIES);

    Mqtt5ConnAck ack = client.connect(mqttConnect).get();
}

private boolean pub(String topic, byte[] bytes) {
    if (client == null) {
        throw new IllegalStateException("client is null");
    }
    boolean connected = this.client.getState().isConnected();
    if (!connected) {
        throw new IllegalStateException("client not connected");
    }
    try {
        Mqtt5PublishResult result = this.client.publishWith().topic(topic).qos(DEFAULT_QOS).payload(bytes).messageExpiryInterval(this.messageExpiry)
                .send().get();
        result.getError().ifPresent(Throwable::printStackTrace);
        System.out.println("pub success, message=" + new String(bytes) + ", thread=" + Thread.currentThread().getName());
        return true;
    } catch (Exception ee) {
        ee.printStackTrace();
        return false;
    }
}

public void sub(String topic, MqConsumeCallback mqConsumeCallback) throws ExecutionException, InterruptedException {
    if (client == null) {
        throw new IllegalStateException("client is null");
    }
    this.client.subscribeWith().topicFilter(topic).qos(DEFAULT_QOS).callback((mqtt5Publish) -> {
        mqConsumeCallback.onConsume(topic, mqtt5Publish.getPayloadAsBytes(), -1);
    }).send().get();
    System.out.println("sub success, thread=" + Thread.currentThread().getName());
}

`

image

after running several seconds, both sub and pub thread are blocked. comment out test.pub("d.e.f", "send message while consuming".getBytes()); in consume callback, it works very well.

pglombardo commented 6 months ago

Hi again @HiwayChe - same question here. Is this still an issue for you?

pglombardo commented 6 months ago

I'll close this issue out but if anything remains, let us know.