marvinroger / async-mqtt-client

📶 An Arduino for ESP8266 asynchronous MQTT client implementation
MIT License
834 stars 266 forks source link

Unable to publish messages #190

Closed uddhavdave closed 4 years ago

uddhavdave commented 4 years ago

Hi, I am using this library to connect to a broker which is indeed successful and returns the onConnect callback and even the subscribe acknowledge log when subscribing. However, when I publish a message on the topic there is no acknowledgment received i.e doesn't trigger onPublish callback.

I am also receiving a session present as False. Is that the issue causing this?

dk-teknologisk-mnc commented 4 years ago

@uddhavdave1 Could you share your publish code? Pay attention to the qos parameter. If it's zero (0) you shouldn't get any onPublish callback.

Regards

uddhavdave commented 4 years ago

Hi @dk-teknologisk-mnc , Thanks for the quick reply. Sorry, I cannot share the code.

I have my QoS set as 1 but still am not getting any trigger. Is there any limit to the message you can send? my message is 273 characters long.

bertmelis commented 4 years ago

Does the function return a packet id? It should be different from zero. If zero, then something went wrong.

273 bytes should not be a problem at all.

uddhavdave commented 4 years ago

Hi, @bertmelis , Yes indeed it comes out to be zero. Does the topic not being right cause this issue? I think this issue is on my side and not the library. But I tried sending in a message ( "Test" ) and it returned 1 as packetID. Something is wrong with the message I'm sending i believe.

uddhavdave commented 4 years ago

Hi all, so I dug little deeper into the issue and found that SEMAPHORE_TAKE(0); if (_client.space() < neededSpace) { SEMAPHORE_GIVE(); return 0; } this piece of code gives the error when ever i try to send my message of 273 bytes.

bertmelis commented 4 years ago

This would mean your tcp's sendbuffer is less then 273 (+ overhead). Are you on ESP32 or ESP8266?

uddhavdave commented 4 years ago

It's on ESP8266 Where can I increase it? also, I am now sending this on QoS 0.

The needed space is 305 bytes. How come such a small message is not going through?

kleini commented 4 years ago

https://github.com/kleini/async-mqtt-client/commit/f1b42054815ad82fed4519bce7febb7f1601560f <- this helps me a lot in Homie when larger chunks of MQTT messages should be sent and the TCP buffer is not large enough and packets are dropped. I have stolen this fix somewhere but I don't remember where.

uddhavdave commented 4 years ago

Thank you @kleini , It works now.

luebbe commented 4 years ago

kleini@f1b4205 <- this helps me a lot in Homie when larger chunks of MQTT messages should be sent and the TCP buffer is not large enough and packets are dropped. I have stolen this fix somewhere but I don't remember where.

Wouldn't it make sense to merge this fix back into the original project?

uddhavdave commented 4 years ago

Sorry, I had to reopen due to a misunderstanding. The program still fails at a larger length of messages. I tried increasing these TCP_SND_BUFF and TCP_MSS but had no luck.

@kleini your changes in the code essentially worked when I bypassed the space check, I didn't give me confirmation of sending still.

bertmelis commented 4 years ago

That workaround doesn't solve the low TCP buffer problem. It just creates an intermediate buffer. You seem to have a very low TCP buffer, meaning you don't seem to receive acks (which clear the buffer).

You can try to use the PR for large payloads (https://github.com/marvinroger/async-mqtt-client/pull/184) but I doubt it will work. Can you build with debug enabled in ESPAsyncTCP?

uddhavdave commented 4 years ago

So I did as you suggested, and found that the publish() doesn't send any packets. Basically I only see logs between connectingand connected to MQTT and similarly subscribe and subscribe acknowledged.

Here is the last log at subscribe: 000022.536 [ASYNC_TCP] _sent[1]: 186, unacked= 0, acked= 534, space= 944 000022.537 [ASYNC_TCP] _recv[1]: 69

bertmelis commented 4 years ago

If I understand you log correctly, you send a subscribe, it gets (tcp-)acked and then you receive the MQTT ack.

space is 944, so it should be ok.

Are you sending a c-string which is not null terminated? If so, you have to explicitly include the length.

uddhavdave commented 4 years ago

@bertmelis Thank you so much !! I realized i was sending multiple publish req consecutively which somehow caused the failure to acknowledge. It's working as intended.

Finally, about the PR you mentioned (#184) is for sending only right?

Going ahead with the closure of this issue. Thanks all!

bertmelis commented 4 years ago

A remark: when I use this lib I make a message queue and only send the next message when the previous is acked. For this to work, you also need to have the onPublish callback for QOS0. I also made an onPing callback because this could interfere. This queue is particularly useful when using SSL because axTLS uses shared buffers and can get easily corrupted.

(I can't share but the queue is pretty straightforward)

Finally, about the PR you mentioned (#184) is for sending only right?

Yes, receiving is already implemented in the lib: the message callback has the index and length as parameters so you can "rebuild" the complete message.

uddhavdave commented 4 years ago

Hey, @bertmelis thanks for the tip. I tried to use the queues.h library earlier but it gave me an error. Would you share what library are you using for making a message queue?

bertmelis commented 4 years ago

std::queue of structs.

uddhavdave commented 4 years ago

Hi @bertmelis , thanks for the help, I really appreciate it. When I am using your PR to send a message of size >1KB I am facing this error [ASYNC_TCP] _close[1]: AsyncClient 0x3FFEF634 and this closes the MQTT connection. Any idea why? The packet ID for the published packet is not 0.

I have done things similar to the example you gave in the PR.

bertmelis commented 4 years ago

Gotta look into that. I did a few things to improve stability so I might not see your behaviour in my fork. Could you raise keep alive to 1h (3600s) to try?

uddhavdave commented 4 years ago

Okay, it's set to 2 hrs already. Do I have to use index and len inside the readlongfile fn to supply substrings ? or just the return statement works fine?

bertmelis commented 4 years ago

len is not necessary but index is.

You have to use index to calculate the correct position the returned pointer points to.

Optionally, len can be used to fetch data. len is the maximum length that could be written.

I guess the lib can do the pointer maths itself... But I have to check how that is compatible with the current method.

uddhavdave commented 4 years ago

supposing I have a pointer, pointing to the large buffer, the readlongfile function can be defined like this? char* readLongFile(size_t index , size_t len) { // you can use 'index' and 'len' to fetch data. 'len' is the maximum // amount that *could* be written. Reading will stop once the total amount // reached the payload length (as given in mqttClient.publish(...)). return (char*)(msg+index); }

Is this the right way? msg being the pointer to buffer.

bertmelis commented 4 years ago

Yes (think so, I'm on phone. can't read very well)

uddhavdave commented 4 years ago

like return (char*)(msg+index) should work right?

I am able to get 1 as packetID but the message doesn't go through

bertmelis commented 4 years ago

correct.

dm2302git commented 3 years ago

kleini@f1b4205 <- this helps me a lot in Homie when larger chunks of MQTT messages should be sent and the TCP buffer is not large enough and packets are dropped. I have stolen this fix somewhere but I don't remember where.

I've testet with this solution. Cann you share more information how to fix it?

kleini commented 3 years ago

I've testet with this solution. Cann you share more information how to fix it?

@bertmelis is working on a solution to fix issues with messages to publish larger than TCP_MSS. My fork is just a temporary workaround but not a real fix for this library.