Closed ChrisHul closed 2 years ago
Thank you for the PR.
This code causes a memory leak. That's why I'm disabling it.
#if 0
// Client connects. Add to the client-id list
struct client *client = calloc(1, sizeof(*client));
client->c = c;
client->cid = mg_strdup(cid);
LIST_ADD_HEAD(struct client, &s_clients, client);
ESP_LOGD(pcTaskGetName(NULL), "CLIENT ADD %p [%.*s]", c->fd, (int) client->cid.len, client->cid.ptr);
ESP_LOGI(pcTaskGetName(NULL), "CLIENT ADD %p", client);
#endif
This fix can cause a memory leak.
Use this function to check for memory leaks.
ESP_LOGD(pcTaskGetName(NULL),"total_size(MALLOC_CAP_8BIT):%u", (unsigned) heap_caps_get_total_size(MALLOC_CAP_8BIT));
ESP_LOGD(pcTaskGetName(NULL),"total_size(MALLOC_CAP_32BIT):%u", (unsigned) heap_caps_get_total_size(MALLOC_CAP_32BIT));
ESP_LOGI(pcTaskGetName(NULL),"free_size(MALLOC_CAP_8BIT):%u", (unsigned) heap_caps_get_free_size(MALLOC_CAP_8BIT));
ESP_LOGI(pcTaskGetName(NULL),"free_size(MALLOC_CAP_32BIT):%u", (unsigned) heap_caps_get_free_size(MALLOC_CAP_32BIT));
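Raw free-size readings are hard to compare by eye, so it may help to record a baseline and log only the difference. The following is a minimal sketch, not part of the broker: `heap_tracker_t` and `heap_tracker_update` are hypothetical names, and the reading is passed in as a plain number so the logic can be tried off-target.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: remembers the first free-heap reading and reports
 * how far later readings fall below it. */
typedef struct {
    size_t baseline; /* free bytes at the first sample */
    bool   primed;   /* false until the first sample is stored */
} heap_tracker_t;

/* Feed one reading. Returns bytes lost since the baseline
 * (a negative value means memory was regained). */
static long heap_tracker_update(heap_tracker_t *t, size_t free_now)
{
    if (!t->primed) {
        t->baseline = free_now;
        t->primed = true;
        return 0;
    }
    return (long) t->baseline - (long) free_now;
}
```

On the ESP32 the reading would come from `heap_caps_get_free_size(MALLOC_CAP_8BIT)`, sampled at a quiet point such as the main loop, with the returned difference passed to `ESP_LOGI`.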
Your problem may be a problem with the processing of MG_EV_CLOSE.
https://github.com/nopnop2002/esp-idf-mqtt-broker/blob/master/main/mqtt_server.c#L283
OK, thanks for the prompt reply. I looked at the heap data as displayed in that code section to try to detect a leak, but the numbers were jumping around too much. I now get better results by showing the data in the main loop. And there is a leak, although a small one.
I'm actually running ok connecting and disconnecting with the android dashboard. No losses even with your client name registry. Connection drops out in the socket reading routine which signals error and "read_conn" sets c->is_closing leading to mg_call(c, MG_EV_CLOSE, NULL). Client name is added and removed without free_size being altered.
The problem comes when terminating abruptly (without closing properly). I have two client (SUB+PUB) connections from the other ESP32 board and have set it up to reset every 20 seconds. The socket then transforms into a living dead and disables both write and read and I am pretty convinced that the leak resides inside the API. The losses amount to about 20 sometimes 50 bytes after closing the sockets.
I thought I should share these findings and offer my assistance in case You have any idea how to get around this. Cheers.
The problem comes when terminating abruptly (without closing properly).
The server issues pings on a regular basis.
If the client exits abruptly without proper closing, there is no ping response.
There may be a problem with the processing when the ping response times out.
I thought I should share these findings and offer my assistance in case You have any idea how to get around this.
Thank you.
The problem comes when terminating abruptly (without closing properly).
Tested with Python + paho.
If the client quits abruptly (without closing properly), the server will display this:
I (1611945) BROKER: MG_EV_CLOSE 0x0
I (1611945) BROKER: free_size(MALLOC_CAP_8BIT):171012
I (1611945) BROKER: free_size(MALLOC_CAP_32BIT):216728
I (1611975) SUBSCRIBE: RECEIVED Subscriber Down <- system/message
I (1611975) SUBSCRIBE: RECEIVED goodbye <- WILL
I use this.
https://github.com/nopnop2002/esp-idf-mqtt-broker/blob/master/sub_example.py
Number of allowed sockets is limited which creates problem in #1.
You can change the number of sockets allowed here.
Thanks for all the feedback. It looks as though you're connecting to the server from a Unix-based computer. I am not sure how you abort the connection, but in most cases the socket will be closed (properly) by the software anyway.
In my case I reset the client ESP32, so there is no chance for the socket to close as it should. The connection just stalls and no MG_EV_CLOSE is invoked. The socket remains open but is neither readable nor writable any more, and closing it apparently leaves dead allocated memory.
Leaving the socket alone is not a good solution either, because it will drag the system down. When I connect again, the new connection stacks on top of the old one. New copies of publications and subscriptions are also allocated. I know I can increase the number of sockets, but I will always hit the roof. Besides, menuconfig only allows 16 sockets. If I enter more, it resets to the old value. That would be interesting to change. 16 sockets is not an awful lot.
When it comes to pinging and timeouts, neither is really implemented in Mongoose. It will respond to a ping request, that's all. There is probably a whole lot going on in the TCP stack with handshaking and others, but it is out of reach (not open source from what I gather).
As I mentioned earlier, I think there is a problem in the socket API which is incapable of handling this type of event.
When it comes to pinging and timeouts, neither is really implemented in Mongoose. It will respond to a ping request, that's all.
This seems to be the root cause.
The TCP layer does have a timeout. I found that the aborted connection terminated automatically after 2 (!) hours, which is the default keepalive idle time. These values can be changed with calls to setsockopt(), and this is implemented in my latest patch with application-managed values. However, there is still a memory leak, and there are some reports of similar problems in the lwIP component.
Thank you for the information.
I have enabled KeepAlive based on your information.
This is the server-side log at that time.
I (5396636) BROKER: CONNECT ----> Client Start
I (5396636) BROKER: cid=[SUB001] willFlag=0
I (5396826) BROKER: MQTT_CMD_SUBSCRIBE
I (5396826) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5399796) BROKER: MQTT_CMD_SUBSCRIBE
I (5399796) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5400816) BROKER: MQTT_CMD_SUBSCRIBE
I (5400816) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5401846) BROKER: MQTT_CMD_SUBSCRIBE
I (5401846) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5402866) BROKER: MQTT_CMD_SUBSCRIBE
I (5402866) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5403896) BROKER: MQTT_CMD_SUBSCRIBE
I (5403896) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5404916) BROKER: MQTT_CMD_SUBSCRIBE
I (5404916) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5405936) BROKER: MQTT_CMD_SUBSCRIBE
I (5405936) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5406966) BROKER: MQTT_CMD_SUBSCRIBE
I (5406966) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5407996) BROKER: MQTT_CMD_SUBSCRIBE
I (5407996) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5409016) BROKER: MQTT_CMD_SUBSCRIBE
I (5409016) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5409936) BROKER: MQTT_CMD_SUBSCRIBE
I (5409936) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5410956) BROKER: MQTT_CMD_SUBSCRIBE
I (5410956) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5411986) BROKER: MQTT_CMD_SUBSCRIBE
I (5411986) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5413006) BROKER: MQTT_CMD_SUBSCRIBE
I (5413006) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5414026) BROKER: MQTT_CMD_SUBSCRIBE
I (5414026) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5415156) BROKER: MQTT_CMD_SUBSCRIBE
I (5415156) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5416086) BROKER: MQTT_CMD_SUBSCRIBE
I (5416086) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5417106) BROKER: MQTT_CMD_SUBSCRIBE
I (5417106) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5418126) BROKER: MQTT_CMD_SUBSCRIBE
I (5418126) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5419156) BROKER: MQTT_CMD_SUBSCRIBE
I (5419156) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5420176) BROKER: MQTT_CMD_SUBSCRIBE
I (5420176) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5421196) BROKER: MQTT_CMD_SUBSCRIBE
I (5421196) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5422226) BROKER: MQTT_CMD_SUBSCRIBE
I (5422226) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5423246) BROKER: MQTT_CMD_SUBSCRIBE
I (5423246) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5424266) BROKER: MQTT_CMD_SUBSCRIBE
I (5424266) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5425296) BROKER: MQTT_CMD_SUBSCRIBE
I (5425296) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5426316) BROKER: MQTT_CMD_SUBSCRIBE
I (5426316) BROKER: SUB ADD 0x37 [/topic/test/#]
I (5486326) BROKER: PINGREQ 0x37 ----> Server sends PINGREQ
I (5546536) BROKER: PINGREQ 0x37 ----> Server sends PINGREQ. Interval = 60 Sec.
Client power off
I (5617016) BROKER: MG_EV_CLOSE 0x0 ----> Server close Socket because client does not respond to ping. Timeout = 70 Sec.
I (5617016) BROKER: free_size(MALLOC_CAP_8BIT):198768
I (5617016) BROKER: free_size(MALLOC_CAP_32BIT):245088
Please try with the latest version.
ーーー
esp_mqtt_client_subscribe sends the same SUBSCRIBE packet 16 times, so the server receives SUBSCRIBE packets for the same topic 16 times.
This seems to be a bug in ESP-IDF. https://github.com/espressif/esp-idf/issues/8697
Thanks for your responsiveness. It's basically the same implementation so I already tested it.
I haven't tried the native mqtt client software, so I was unaware of the repetition issue.
My understanding is that the TCP connection keepalive is independent of any socket data exchange. That way any ping activity is irrelevant to the timeout being triggered or not. This may be relevant to the native client shutting down though.
I have also uploaded my current mqtt_subsciber.c and mqtt_publisher.c that I'm working with. The code is not all that pretty, but would like you to consider (since we seem to be on good conversation terms) the following implementations:
I did this before digging into the server software, so for better consistency it would probably be better to adopt the LIST_ADD strategy instead of queuing the data. However some sort of limit would have to be added.
Thanks for your support.
I just saw that the server does not test if topic gets subscribed a second time in the same connection, meaning that you would have 16 entries with the bug you found in native MQTT.
Added the ability to determine duplicate subscriptions.
Even if the ESP-IDF bug is fixed, the same topic may still be subscribed more than once by mistake.
The server tests if the same subscribe topic already exists.
I (13066) BROKER: CONNECT
I (13066) BROKER: cid=[SUB001] willFlag=0
I (13266) BROKER: MQTT_CMD_SUBSCRIBE
I (13266) BROKER: SUB TEST 0x37 [/topic/test/#]
I (13266) BROKER: SUB ADD 0x37 [/topic/test/#]
I (16236) BROKER: MQTT_CMD_SUBSCRIBE
I (16236) BROKER: SUB TEST 0x37 [/topic/test/#]
W (16236) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (17246) BROKER: MQTT_CMD_SUBSCRIBE
I (17246) BROKER: SUB TEST 0x37 [/topic/test/#]
W (17246) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (18266) BROKER: MQTT_CMD_SUBSCRIBE
I (18266) BROKER: SUB TEST 0x37 [/topic/test/#]
W (18266) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (19296) BROKER: MQTT_CMD_SUBSCRIBE
I (19296) BROKER: SUB TEST 0x37 [/topic/test/#]
W (19296) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (20316) BROKER: MQTT_CMD_SUBSCRIBE
I (20316) BROKER: SUB TEST 0x37 [/topic/test/#]
W (20316) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (21336) BROKER: MQTT_CMD_SUBSCRIBE
I (21336) BROKER: SUB TEST 0x37 [/topic/test/#]
W (21336) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (22256) BROKER: MQTT_CMD_SUBSCRIBE
I (22256) BROKER: SUB TEST 0x37 [/topic/test/#]
W (22256) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (23286) BROKER: MQTT_CMD_SUBSCRIBE
I (23286) BROKER: SUB TEST 0x37 [/topic/test/#]
W (23286) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (24306) BROKER: MQTT_CMD_SUBSCRIBE
I (24306) BROKER: SUB TEST 0x37 [/topic/test/#]
W (24306) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (25336) BROKER: MQTT_CMD_SUBSCRIBE
I (25336) BROKER: SUB TEST 0x37 [/topic/test/#]
W (25336) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (26366) BROKER: MQTT_CMD_SUBSCRIBE
I (26366) BROKER: SUB TEST 0x37 [/topic/test/#]
W (26366) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (27386) BROKER: MQTT_CMD_SUBSCRIBE
I (27386) BROKER: SUB TEST 0x37 [/topic/test/#]
W (27386) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (28416) BROKER: MQTT_CMD_SUBSCRIBE
I (28416) BROKER: SUB TEST 0x37 [/topic/test/#]
W (28416) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (29426) BROKER: MQTT_CMD_SUBSCRIBE
I (29426) BROKER: SUB TEST 0x37 [/topic/test/#]
W (29426) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (30456) BROKER: MQTT_CMD_SUBSCRIBE
I (30456) BROKER: SUB TEST 0x37 [/topic/test/#]
W (30456) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (31476) BROKER: MQTT_CMD_SUBSCRIBE
I (31476) BROKER: SUB TEST 0x37 [/topic/test/#]
W (31476) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (32506) BROKER: MQTT_CMD_SUBSCRIBE
I (32506) BROKER: SUB TEST 0x37 [/topic/test/#]
W (32506) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (33536) BROKER: MQTT_CMD_SUBSCRIBE
I (33536) BROKER: SUB TEST 0x37 [/topic/test/#]
W (33536) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (34556) BROKER: MQTT_CMD_SUBSCRIBE
I (34556) BROKER: SUB TEST 0x37 [/topic/test/#]
W (34556) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (35486) BROKER: MQTT_CMD_SUBSCRIBE
I (35486) BROKER: SUB TEST 0x37 [/topic/test/#]
W (35486) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (36496) BROKER: MQTT_CMD_SUBSCRIBE
I (36496) BROKER: SUB TEST 0x37 [/topic/test/#]
W (36496) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (37526) BROKER: MQTT_CMD_SUBSCRIBE
I (37526) BROKER: SUB TEST 0x37 [/topic/test/#]
W (37526) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (38546) BROKER: MQTT_CMD_SUBSCRIBE
I (38546) BROKER: SUB TEST 0x37 [/topic/test/#]
W (38546) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (39566) BROKER: MQTT_CMD_SUBSCRIBE
I (39566) BROKER: SUB TEST 0x37 [/topic/test/#]
W (39566) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (40596) BROKER: MQTT_CMD_SUBSCRIBE
I (40596) BROKER: SUB TEST 0x37 [/topic/test/#]
W (40596) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (41616) BROKER: MQTT_CMD_SUBSCRIBE
I (41616) BROKER: SUB TEST 0x37 [/topic/test/#]
W (41616) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
I (42636) BROKER: MQTT_CMD_SUBSCRIBE
I (42636) BROKER: SUB TEST 0x37 [/topic/test/#]
W (42636) BROKER: Same topic already exist in s_sub. Subscribe is invalid.
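The "SUB TEST" step in the log above amounts to a lookup before the insert. A minimal sketch of such a check follows, assuming a simple singly linked list. The `struct sub` layout and the names `sub_exists` and `sub_add_unique` are hypothetical and are not taken from mqtt_server.c.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical subscription record; the broker's real struct differs. */
struct sub {
    struct sub *next;
    const void *conn;  /* owning connection */
    char       *topic; /* heap copy of the topic filter */
};

/* True if `conn` already holds a subscription to exactly this filter. */
static bool sub_exists(const struct sub *head, const void *conn,
                       const char *topic)
{
    for (const struct sub *s = head; s != NULL; s = s->next) {
        if (s->conn == conn && strcmp(s->topic, topic) == 0) return true;
    }
    return false;
}

/* Adds a subscription unless an identical one exists.
 * Returns true only if a new entry was added. */
static bool sub_add_unique(struct sub **head, const void *conn,
                           const char *topic)
{
    if (sub_exists(*head, conn, topic)) return false;
    struct sub *s = calloc(1, sizeof(*s));
    if (s == NULL) return false;
    size_t n = strlen(topic) + 1;
    s->topic = malloc(n);
    if (s->topic == NULL) {
        free(s);
        return false;
    }
    memcpy(s->topic, topic, n);
    s->conn = conn;
    s->next = *head;
    *head = s;
    return true;
}
```

The comparison is per connection, so two different clients can still subscribe to the same filter. Only a repeat from the same connection is rejected.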
I've added Keep Alive features.
When the client exits without closing the socket, the server closes the socket due to a timeout.
// Set tcp socket keepalive options
// timeout = keepidle+(keepcnt*keepintvl)
// timeout = 60+(1*10)=70
int keepAlive = 1;
setsockopt( (int) c->fd, SOL_SOCKET, SO_KEEPALIVE, &keepAlive, sizeof(int));
int keepIdle = 60; // default is 7200 Sec
setsockopt( (int) c->fd, IPPROTO_TCP, TCP_KEEPIDLE, &keepIdle, sizeof(int));
int keepInterval = 10; // default is 75 Sec
setsockopt( (int) c->fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepInterval, sizeof(int));
int keepCount = 1; // default is 9 count
setsockopt( (int) c->fd, IPPROTO_TCP, TCP_KEEPCNT, &keepCount, sizeof(int));
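The calls above ignore the return value of setsockopt(), so a rejected option would go unnoticed. A hedged variant that reports failures is sketched below. The helper names `enable_keepalive` and `keepalive_timeout_s` are hypothetical, and the includes assume a Linux-style host. On ESP-IDF the same options should be available through `lwip/sockets.h`.

```c
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Hypothetical helper: enables TCP keepalive on fd.
 * Returns 0 on success, -1 if any setsockopt() call fails. */
static int enable_keepalive(int fd, int idle_s, int intvl_s, int count)
{
    int on = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) != 0)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_s, sizeof(idle_s)) != 0)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl_s, sizeof(intvl_s)) != 0)
        return -1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(count)) != 0)
        return -1;
    return 0;
}

/* Seconds of silence before the peer is declared dead:
 * keepidle + (keepcnt * keepintvl), as in the comment above. */
static int keepalive_timeout_s(int idle_s, int intvl_s, int count)
{
    return idle_s + count * intvl_s;
}
```

With the values used here, `keepalive_timeout_s(60, 10, 1)` gives 70 seconds. With the defaults noted in the comments (7200, 75, 9) it gives 7875 seconds, a little over two hours.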
In addition, I added the SUBACK feature.
mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, num_topics + 2);
uint16_t id = mg_htons(mm->id);
mg_send(c, &id, 2);
mg_send(c, resp, num_topics);
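For reference, the bytes those three calls put on the wire follow the MQTT 3.1.1 SUBACK layout: a fixed header, a remaining length of 2 + num_topics, the packet identifier in network byte order, then one return code per topic. The sketch below builds the same packet into a buffer. `build_suback` is a hypothetical helper and only handles remaining lengths below 128, where the length fits in one byte.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Writes a SUBACK into out. Returns the number of bytes written,
 * or 0 if the packet does not fit or needs a multi-byte length. */
static size_t build_suback(uint8_t *out, size_t cap, uint16_t packet_id,
                           const uint8_t *return_codes, size_t num_topics)
{
    size_t remaining = 2 + num_topics; /* packet id + return codes */
    if (remaining > 127 || cap < 2 + remaining) return 0;
    out[0] = 0x90;                          /* packet type 9 (SUBACK), flags 0 */
    out[1] = (uint8_t) remaining;           /* single-byte remaining length */
    out[2] = (uint8_t) (packet_id >> 8);    /* packet id, high byte first */
    out[3] = (uint8_t) (packet_id & 0xFF);
    memcpy(&out[4], return_codes, num_topics); /* 0x00-0x02 granted QoS, 0x80 failure */
    return 2 + remaining;
}
```

A single-topic SUBACK for packet id 0x1234 with QoS 0 granted is the five bytes 90 03 12 34 00.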
Socket connections seem to be sticky and peer abortion is not detected by the server.
Ran into problems with the following configuration: #1. ESP32 with server, publisher and subscriber. #2. ESP32 with publisher and subscriber. #3. Generic MQTT client on an Android device, subscribing and publishing.
Number of allowed sockets is limited which creates problem in #1. Socket connections do not close on sudden abortion of #2 (reset/out of range). When #2 tries to connect again it obtains a new connection without previous connection being deleted. Result: socket handler gets saturated and returns error. Socket number is limited to 16 and I find no way around that.
Proposed fix: revival of clientid registration code (muted) in mqtt_server.c. On new connection: check if there is a working connection with the same clientid and force that one to close if so.
This fix has been tested and is working.
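The proposed behaviour can be sketched roughly as follows. This is not the code from the PR: `struct conn` is a hypothetical stand-in for the Mongoose connection (only an `is_closing` flag is modelled), and `client_register` and `client_remove` are illustrative names. The removal path frees both the entry and its client-id copy, which is the step that has to run on MG_EV_CLOSE to avoid leaking registry entries.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the connection object. */
struct conn { bool is_closing; };

struct client {
    struct client *next;
    struct conn   *c;
    char          *cid; /* heap copy of the client id */
};

/* Unlinks and frees the entry for c, including its client-id copy. */
static void client_remove(struct client **head, const struct conn *c)
{
    for (struct client **pp = head; *pp != NULL; pp = &(*pp)->next) {
        if ((*pp)->c == c) {
            struct client *dead = *pp;
            *pp = dead->next;
            free(dead->cid);
            free(dead);
            return;
        }
    }
}

/* Registers c under cid. Any older connection using the same id is
 * flagged for closing first. Returns false if allocation fails. */
static bool client_register(struct client **head, struct conn *c,
                            const char *cid)
{
    for (struct client *p = *head; p != NULL; p = p->next) {
        if (strcmp(p->cid, cid) == 0) p->c->is_closing = true;
    }
    struct client *n = calloc(1, sizeof(*n));
    if (n == NULL) return false;
    size_t len = strlen(cid) + 1;
    n->cid = malloc(len);
    if (n->cid == NULL) {
        free(n);
        return false;
    }
    memcpy(n->cid, cid, len);
    n->c = c;
    n->next = *head;
    *head = n;
    return true;
}
```

In this sketch the stale entry stays in the list until its connection's close event calls `client_remove`, so a reconnecting client never waits for the old socket to time out.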