commschamp / cc.mqttsn.libs

CommsChampion Ecosystem MQTT-SN client / gateway libraries and applications
Mozilla Public License 2.0

Gateway sends data only slow in some cases #12

Closed trivialkettle closed 1 year ago

trivialkettle commented 1 year ago

Hi, I tried to use the gateway on my host and the client lib on my satellite. I noticed that the data is sent very slowly to the client.

I reproduced this with the example programs:

What works:

  1. start the gateway and connect it to Mosquitto as broker
  2. start cc_mqttsn_sub_udp and subscribe with a # :
    ./client/src/app/sub/udp/cc_mqttsn_sub_udp -v -g localhost:1889 -t \#

What is very slow:

  1. start the gateway and connect it to Mosquitto as broker
  2. start cc_mqttsn_sub_udp and subscribe with a + :
    ./client/src/app/sub/udp/cc_mqttsn_sub_udp -v -g localhost:1889 -t foo/+/bar -k 5

I used -k 5 because I noticed that some packets are sent on PINGREQ/PINGRESP. I get only one message per 5 s (the keepalive interval).

Any hint where I could start debugging?

Thanks

trivialkettle commented 1 year ago

I debugged a bit and found my data in

void SessionWrapper::readFromBrokerSocket()

The broker delivers all the data for all topics matched by the + subscription, but only one packet is sent to my client in this step.

arobenko commented 1 year ago

Hi, sorry, I'm on holiday until the end of this month and don't have the capacity to help you much. Maybe I'll find a way to look deeper into the issue in the near future, but I'm making no promises.

You can try adding some debug prints to the SessionImpl::sendToClient() and SessionImpl::sendToBroker() functions, for example to make sure that "PUBLISH" messages are sent:

void SessionImpl::sendToClient(const MqttsnMessage& msg)
{
    std::cout << "Sending to client: " << msg.getId() << std::endl;
    if (msg.getId() == PublishMsg_SN::doGetId()) {
        auto* publishMsg = static_cast<const PublishMsg_SN*>(msg);
        auto& dataVector = publishMsg->field_data().value();
        ... // TODO: print publish data
    }
    sendMessage(msg, m_mqttsnStack, m_sendToClientCb, m_mqttsnMsgData);
}

The MQTT-SN definition is here. The tutorial on how to use COMMS library and the generated code is here.

Please also clarify whether you receive ALL the sent messages, only with a delay, or whether some sent messages are lost. It would also be helpful if you could reproduce the issue with all the applications (client, gateway, and mosquitto broker) on the PC, to rule out the case where the physical link between your client and the gateway doesn't work properly.

trivialkettle commented 1 year ago

Hi, thanks for the fast reply and enjoy your holiday.

To clarify: I reproduced this with the example client and gateway on the PC with mosquitto broker.

Some more findings: I published the following into the broker (retained):

foo/1/bar qwert
foo/2/bar yuiop
foo/3/bar asdf
foo/4/bar ghjk
foo/5/bar zxcv
foo/6/bar bnm

and ran ./client/src/app/sub/udp/cc_mqttsn_sub_udp -v -g localhost:1889 -t foo/# -k 5

I printed a hexdump in void SessionWrapper::readFromBrokerSocket() and void Mgr::sendToClient():

(readFromBrokerSocket) 
0x000000: 20 02 00 00               ...

(sendToClient) 
0x000000: 03 05 00                 ...

(readFromBrokerSocket) 
0x000000: 90 03 00 01 02           .....

(sendToClient) 
0x000000: 08 13 40 00 00 00 01 00  ..@.....

(readFromBrokerSocket) 
0x000000: 31 10 00 09 66 6f 6f 2f  1...foo/
0x000008: 31 2f 62 61 72 71 77 65  1/barqwe
0x000010: 72 74 31 10 00 09 66 6f  rt1...fo
0x000018: 6f 2f 32 2f 62 61 72 79  o/2/bary
0x000020: 75 69 6f 70 31 0f 00 09  uiop1...
0x000028: 66 6f 6f 2f 33 2f 62 61  foo/3/ba
0x000030: 72 61 73 64 66 31 0f 00  rasdf1..
0x000038: 09 66 6f 6f 2f 34 2f 62  .foo/4/b
0x000040: 61 72 67 68 6a 6b 31 0f  arghjk1.
0x000048: 00 09 66 6f 6f 2f 35 2f  ..foo/5/
0x000050: 62 61 72 7a 78 63 76 31  barzxcv1
0x000058: 0e 00 09 66 6f 6f 2f 36  ...foo/6
0x000060: 2f 62 61 72 62 6e 6d     /barbnm

(sendToClient) 
0x000000: 0f 0a 00 01 00 02 66 6f  ......fo
0x000008: 6f 2f 31 2f 62 61 72     o/1/bar

(sendToClient) 
0x000000: 0c 0c 10 00 01 00 00 71  .......q
0x000008: 77 65 72 74              wert

(readFromBrokerSocket) 
0x000000: d0 00                    ..

(sendToClient) 
0x000000: 0f 0a 00 02 00 05 66 6f  ......fo
0x000008: 6f 2f 32 2f 62 61 72     o/2/bar

(sendToClient) 
0x000000: 02 17                    ..

(sendToClient) 
0x000000: 0c 0c 10 00 02 00 00 79  .......y
0x000008: 75 69 6f 70              uiop

As you can see, all the data is received by the gateway at once, but it is not forwarded to the client at once. The second message is sent only after a new message is received.

I also noticed that if I start a client and subscribe to this retained data, and then publish something (which is received without the retained flag), this data is sent to the client instantly. The remaining retained topics are still only sent after a ping. So maybe there is an issue with the retained flag handling.

Example: I publish as above as retained:

foo/2/bar yuiop
foo/3/bar asdf
foo/4/bar ghjk
foo/5/bar zxcv
foo/6/bar bnm

Then I start the client ./client/src/app/sub/udp/cc_mqttsn_sub_udp -v -g localhost:1889 -t foo/# -k 5 and receive

foo/1/bar (qos=0 retained) : qwert

Then I publish

foo/2/bar yuiop
foo/3/bar asdf

and the client receives this instantly. After each ping the client receives the remaining topics:

foo/4/bar ghjk
foo/5/bar zxcv
foo/6/bar bnm

Unfortunately I could not add the debugging you proposed because I got some compiler errors:

cc.mqttsn.libs/gateway/src/lib/SessionImpl.cpp:254:28: error: invalid ‘static_cast’ from type ‘const MqttsnMessage’ {aka ‘const comms::Message<comms::option::app::IdInfoInterface, comms::option::app::ReadIterator<const unsigned char*>, comms::option::app::WriteIterator<unsigned char*>, comms::option::app::Handler<cc_mqttsn_gateway::MsgHandler>, comms::option::app::LengthInfoInterface, comms::option::def::Endian<comms::util::traits::endian::Big>, comms::option::def::MsgIdType<cc_mqttsn::MsgId> >’} to type ‘const PublishMsg_SN*’ {aka ‘const cc_mqttsn::message::Publish<comms::Message<comms::option::app::IdInfoInterface, comms::option::app::ReadIterator<const unsigned char*>, comms::option::app::WriteIterator<unsigned char*>, comms::option::app::Handler<cc_mqttsn_gateway::MsgHandler>, comms::option::app::LengthInfoInterface, comms::option::def::Endian<comms::util::traits::endian::Big>, comms::option::def::MsgIdType<cc_mqttsn::MsgId> >, cc_mqttsn::options::DataViewDefaultOptionsT<cc_mqttsn::options::ServerDefaultOptionsT<> > >*’}
  254 |         auto* publishMsg = static_cast<const PublishMsg_SN*>(msg);

trivialkettle commented 1 year ago

The difference between the retained messages and the later published messages is that the retained ones arrive in one block and the later ones arrive individually. So I guess a loop is missing somewhere to handle all the data received at once.

a1lu commented 1 year ago

Hi, I can reproduce this on Ubuntu 22.04 with the eclipse-mosquitto:2 Docker image (sha786ca925d7394a38dc7be5c02fc5e4da7e686c915a2673c6b3d1652c4e8c050b).

I could fix this on my side by adding a checkSend(); after m_currPub.reset(); at https://github.com/commschamp/cc.mqttsn.libs/blob/master/gateway/src/lib/session_op/PubSend.cpp#L251

arobenko commented 1 year ago

Thanks @a1lu, looks like you're right. All other calls to m_currPub.reset() are followed by an invocation of checkSend(), except that one. @trivialkettle, please try it. If it fixes your problem, you can either submit a pull request against the "develop" branch for me to merge, or fix it in your forked repository and wait until I include it in an official release.

trivialkettle commented 1 year ago

Yes, that fixed my issue. Thanks.

arobenko commented 1 year ago

As was commented in #13, the pull request is insufficient: unexpected problems were detected in the unit tests. A deeper analysis and fix are required.

arobenko commented 1 year ago

The "develop" branch should contain a complete fix now. The unit tests are fixed.

trivialkettle commented 1 year ago

The fix looks good to me and it still works.

arobenko commented 1 year ago

Merged to v1.0.4 release.