shamblett / mqtt_client

A server and browser based MQTT client for dart
Other
548 stars 176 forks source link

SubscriptionStatus keeps pending #186

Closed MrMicha closed 4 years ago

MrMicha commented 4 years ago

Subscribing to a topic sets my MqttServerClient to MqttSubscriptionStatus.pending (as expected).

Here is my function:

void subscribeToTopic(String topicName) {

    if (client.connectionStatus.state == MqttConnectionState.connected) {
      print("Subscribing to $topicName");
      client.subscribe(topicName, MqttQos.atLeastOnce);
    }

    client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {

      final MqttPublishMessage recMess = c[0].payload;
      final String message = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

      print("new message received: $message");
      onMessageReceived(message);
    });

    print(client.getSubscriptionsStatus(topicName));  
  }

The result of this print is always MqttSubscriptionStatus.pending even when i call the print later in the program.

The broker i use for testing is running locally on my computer (mosquitto) and it sends SUBACKs back. I also tested it with HIVEMQ (public MQTT Broker for testing and playing around with MQTT). Same results.

Here are some logs from the mosquitto.log file:

1590738182: New client connected from 127.0.0.1 as 14534234542343252 (c1, k60).
1590738182: Sending CONNACK to 14534234542343252 (0, 0)
1590738182: Received SUBSCRIBE from 14534234542343252
1590738182:     test (QoS 1)
1590738182: 14534234542343252 1 test
1590738182: Sending SUBACK to 14534234542343252
1590738182: Received PUBLISH from 14534234542343252 (d0, q2, r0, m2, 'abc', ... (25 bytes))
1590738182: Sending PUBREC to 14534234542343252 (Mid: 2)
1590738184: Received SUBSCRIBE from 14534234542343252
1590738184:     abc (QoS 1)
1590738184: 14534234542343252 1 abc
1590738184: Sending SUBACK to 14534234542343252
1590738188: Received PUBREL from 14534234542343252 (Mid: 2)
1590738188: Sending PUBCOMP to 14534234542343252 (Mid: 2)
1590738188: Sending PUBLISH to 14534234542343252 (d0, q1, r0, m1, 'abc', ... (25 bytes))
1590738188: Received PUBACK from 14534234542343252 (Mid: 1)
1590738188: Socket error on client 14534234542343252, disconnecting.

Publishing and connecting both work as expected. Other clients receive the messages but i do not receive any messages being published.

Update: By accident I was able to receive the messages. But in a different way than it is supposed to happen:

I created another MqttServerClient but i forgot to change the clientID. What the broker does is close the old connection and connect the "new" client:

1590739433: Received PUBACK from 14534234542343252 (Mid: 1)
1590739433: Client 14534234542343252 already connected, closing old connection.
1590739433: Client 14534234542343252 disconnected.
1590739433: New client connected from 127.0.0.1 as 14534234542343252 (c1, k60).

Right after the new client is created it automatically subscribes to the topics and for the first time ever client.onSubscribed function is run and the messages from the "old" client are shown.

But new messages are not received and MqttSubscriptionStatus is still pending.

I do not call this a solution cause this is definitely not the way to go but maybe it helps to figure out the problem.

shamblett commented 4 years ago

OK, I have a unit test to check pending subs go active when a suback is received which is passing however this functionality depends on the message identifier in the suback matching the one stored for the subscription, I can't see Mosquitto or Hive getting this wrong they are solid brokers, could you turn on logging on the client and post the output from your test.

MrMicha commented 4 years ago

Here is the output from the test:

2020-06-02 09:11:33.746367 -- SynchronousMqttServerConnectionHandler::_connectAckProcessor - state = connected
ConnectionCallback started
2020-06-02 09:11:33.747255 -- SynchronousMqttServerConnectionHandler:: cancelling connect timer
2020-06-02 09:11:33.747983 -- SynchronousMqttServerConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code of connectionAccepted and a disconnection origin of none
2020-06-02 09:11:33.748393 -- SynchronousMqttServerConnectionHandler::internalConnect exited with state Connection status is connected with return code of connectionAccepted and a disconnection origin of none
CONNECTED
Subscringing to test
2020-06-02 09:11:33.754503 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.subscribe
Header: MessageType = MqttMessageType.subscribe, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
Subscribe Variable Header: MessageIdentifier={1}
Payload: Subscription [{1}]
{{ Topic={test}, Qos={MqttQos.atLeastOnce} }}

MqttSubscriptionStatus.pending
MqttSubscriptionStatus.pending
Subscringing to abc
2020-06-02 09:11:35.758565 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.subscribe
Header: MessageType = MqttMessageType.subscribe, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
Subscribe Variable Header: MessageIdentifier={2}
Payload: Subscription [{1}]
{{ Topic={abc}, Qos={MqttQos.atLeastOnce} }}

MqttSubscriptionStatus.pending
2020-06-02 09:11:37.769821 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.exactlyOnce, Size = 0
Publish Variable Header: TopicName={abc}, MessageIdentifier={3}, VH Length={0}
Payload: {28 bytes={<100><117><109><98><32><109><101><115><115><97><103><101><32><102><114><111><109><32><119><114><97><112><112><101><114><49><49><49>

abc:
MqttSubscriptionStatus.pending
test:
MqttSubscriptionStatus.pending
Subscringing to smth
2020-06-02 09:11:48.778106 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.subscribe
Header: MessageType = MqttMessageType.subscribe, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
Subscribe Variable Header: MessageIdentifier={4}
Payload: Subscription [{1}]
{{ Topic={smth}, Qos={MqttQos.atLeastOnce} }}

MqttSubscriptionStatus.pending
smth:
MqttSubscriptionStatus.pending
✓ MqttWrapper MqttWrapper works
2020-06-02 09:11:52.791681 -- MqttConnection::_onData
shamblett commented 4 years ago

Ok thanks, this log sows that the pending status is correct, you haven,t received any subacks, so the subscription status will stay as pending.

MrMicha commented 4 years ago

Yes, that's right. But the broker sends SUBACKs as you can see in the log-file of the broker:

1590738184: Received SUBSCRIBE from 14534234542343252
1590738184:     abc (QoS 1)
1590738184: 14534234542343252 1 abc
1590738184: Sending SUBACK to 14534234542343252

You said that the SUBACK is only recognized when its MessageID fits the MessageID of the subscribe message from the client.

Could that be the problem in this case?

shamblett commented 4 years ago

The suback would still be received and decoded to extract its message identifier, you should be seeing lines like ''MqttConnection::_onData'' when a message is received, other than the conack nothing seems to be being received by the client.

What platform are you running on BTW?

MrMicha commented 4 years ago

I am using Ubuntu 18.04

There is a "MqttConnection::_onData" line but it appears after the test finished as you can see in the output of the test with logging turned on:

MqttSubscriptionStatus.pending
✓ MqttWrapper MqttWrapper works
2020-06-02 10:52:25.864511 -- MqttConnection::_onData
Exited

Is this supposed to happen in a test?

shamblett commented 4 years ago

I'm not sure what you mean by 'in a test', there could be a delay between the broker sending the subacks and the client receiving them, you need to only stop the client after your subacks have been received and processed if you want to check the pending state of a subscription. What is it you are trying to test?

MrMicha commented 4 years ago

By test i mean a testing library for flutter (flutter_test). I implemented a Mqtt Wrapper so I can use it for an App later. I have not implemented the frontend yet so I decided to use this test library to test the wrapper.

But it looks like receiving messages (including SUBACKs) does not work within the test.

That is why I created a very simple App which just starts the setup procedure for the wrapper. The client is then run on the device which is my actual plan.

And in this case it works. I am receiving data which is then decoded and recognized as SUBACK.

So it looks like receiving data just does not work with the testing library I was using.

shamblett commented 4 years ago

Ok I see, thanks for the update.

MrMicha commented 4 years ago

It is for me to thank you for your help.