Closed moxigua closed 3 years ago
Hello @moxigua
Thanks for your message. I am having difficulty with the archive you attached. Could you please try sending just the application source file?
The "timeout error" (-7) is expected. It is simply notification that the keep alive timer has expired due to inactivity and the ping message will be sent to the broker to keep the connection active.
The "network error (-8)" is not expected. It usually indicates that the broker has terminated the connection. Is it possible that another client with the same ID connected, which would cause the broker to disconnect the first client?
Thanks, Eric @ wolfSSL
Thanks for your reply.
Regarding "The 'timeout error' (-7) is expected": I'll test whether it affects the results and then reply to you.
Regarding "The 'network error (-8)'": in my experiment, only one client is used, so I don't think the error was caused by a duplicate client ID. By the way, have the latest library functions been verified on Arduino?
Here is the main code for my project (STM32 + W5500):
/******************************** mqttnet.c*********************************/
#include "wolfmqtt/mqtt_client.h"
#include "wolfmqtt/mqttnet.h"
#include "socket.h"
#include "W5500_MQTT_functions.h"
#include "stm32f10x.h"
#include "W5500_functions.h" // include DNS
/*
 * wolfMQTT network "connect" callback for the W5500.
 *
 * Resolves the broker host name with DNS() and then drives the W5500 socket
 * MQTT_Client through its status values until it reports SOCK_ESTABLISHED.
 *
 * context    - unused by this port.
 * host       - broker host name handed to DNS(); the resolved address is
 *              presumably written to MQTT_Server_IP, which connect() then
 *              uses (TODO confirm against the DNS helper).
 * port       - broker TCP port.
 * timeout_ms - number of osDelay(1) polls allowed while the socket is in a
 *              status this function does not act on; a value <= 0 keeps
 *              polling without limit.
 *
 * Returns 0 once the socket is established, or -1 when connect() has failed
 * repeatedly or the polling budget is used up.
 */
static int NetConnect(void *context, const char* host, word16 port,
    int timeout_ms)
{
    uint8_t dns_socket = Default_SOCK_DNS;
    uint8_t errorCount = 0;
    int waited_ms = 0;

    (void)context;

    socket(dns_socket, Sn_MR_TCP, MQTT_Server_Port, Sn_MR_ND);
    /* NOTE(review): the DNS() result is not checked here. */
    DNS(dns_socket, (char *)host, MQTT_Server_IP);

    while (1) {
        switch (getSn_SR(MQTT_Client)) { /* current W5500 socket status */
        case SOCK_INIT:
            if (connect(MQTT_Client, MQTT_Server_IP, port) != SOCK_OK) {
                /* Back off, then give up once the retry budget is spent. */
                osDelay(3000);
                if (errorCount > 3) {
                    return -1;
                }
                errorCount++;
            }
            break;

        case SOCK_ESTABLISHED:
            if (getSn_IR(MQTT_Client) & Sn_IR_CON) {
                setSn_IR(MQTT_Client, Sn_IR_CON); /* clear the connect flag */
            }
            return 0;

        case SOCK_CLOSE_WAIT:
            disconnect(MQTT_Client);
            break;

        case SOCK_CLOSED:
            /* Re-open the socket on a fresh local port before retrying. */
            close(MQTT_Client);
            socket(MQTT_Client, Sn_MR_TCP, MQTT_Local_Port++, Sn_MR_ND);
            break;

        default:
            /* Any other status: yield instead of busy-spinning, and stop
             * once the caller's timeout has elapsed. */
            if (timeout_ms > 0 && waited_ms >= timeout_ms) {
                return -1;
            }
            osDelay(1);
            waited_ms++;
            break;
        }
    }
}
/*
 * wolfMQTT network "write" callback: send buf_len bytes on MQTT_Client.
 *
 * context    - unused by this port.
 * buf        - data to transmit.
 * buf_len    - number of bytes in buf.
 * timeout_ms - unused; send() is called until the whole buffer is written.
 *
 * Returns the total number of bytes written (buf_len) on success, 0 when
 * buf_len is not positive, or the negative value returned by send() on
 * failure (the socket is closed before returning in that case).
 */
static int NetWrite(void *context, const byte* buf, int buf_len,
    int timeout_ms)
{
    int rc;
    int sent_total = 0;

    (void)context;
    (void)timeout_ms;

    /* Nothing to send: report zero bytes rather than an indeterminate value. */
    if (buf_len <= 0) {
        return 0;
    }

    while (sent_total < buf_len) {
        rc = send(MQTT_Client, (unsigned char *)buf + sent_total,
                  buf_len - sent_total);
        if (rc < 0) {
            close(MQTT_Client);
            return rc;
        }
        sent_total += rc;
    }

    /* Report everything that was written, not just the final chunk, so the
     * caller does not treat a multi-chunk write as a partial one. */
    return sent_total;
}
/*
 * wolfMQTT network "read" callback: wait for received data on MQTT_Client.
 *
 * Polls the W5500 receive-size register once per osDelay(1) for up to
 * timeout_ms polls and reads at most buf_len bytes as soon as any data is
 * pending.
 *
 * context    - unused by this port.
 * buf        - destination buffer.
 * buf_len    - capacity of buf in bytes.
 * timeout_ms - maximum number of polls; a value <= 0 returns a timeout
 *              immediately.
 *
 * Returns the number of bytes read (> 0), MQTT_CODE_ERROR_NETWORK when
 * recv() fails, or MQTT_CODE_ERROR_TIMEOUT when recv() returns 0 or no data
 * arrived before the timeout expired.
 */
static int NetRead(void *context, byte* buf, int buf_len,
    int timeout_ms)
{
    int ret = 0;
    int size = 0;

    (void)context;

    while (timeout_ms-- > 0) {
        size = getSn_RX_RSR(MQTT_Client);
        if (size <= 0) {
            osDelay(1); /* nothing pending yet: wait one tick and poll again */
            continue;
        }

        if (size > buf_len) {
            size = buf_len; /* never read more than the caller can hold */
        }

        ret = recv(MQTT_Client, buf, size);
        if (ret < 0) {
            return MQTT_CODE_ERROR_NETWORK;
        }
        if (ret == 0) {
            return MQTT_CODE_ERROR_TIMEOUT;
        }
        return ret;
    }

    /* The poll budget ran out without data. The function previously fell off
     * the end here and handed the caller an indeterminate value. */
    return MQTT_CODE_ERROR_TIMEOUT;
}
/*
 * wolfMQTT network "disconnect" callback: disconnect and then close the
 * W5500 socket MQTT_Client.
 *
 * context - unused by this port.
 *
 * Always returns 0; the results of disconnect() and close() are not checked.
 */
static int NetDisconnect(void *context)
{
    (void)context;

    disconnect(MQTT_Client); /* disconnect first ... */
    close(MQTT_Client);      /* ... then close the socket */

    return 0;
}
/*
 * Fill in a MqttNet structure with this port's W5500 callbacks.
 *
 * net - structure to initialise; a NULL pointer is ignored.
 *
 * The structure is zeroed first, then the connect/read/write/disconnect
 * callbacks are installed and the context is left NULL.
 *
 * Always returns 0.
 */
int MqttClientNet_Init(MqttNet* net)
{
    if (net == NULL) {
        return 0;
    }

    XMEMSET(net, 0, sizeof(MqttNet));

    net->context    = NULL;
    net->connect    = NetConnect;
    net->disconnect = NetDisconnect;
    net->read       = NetRead;
    net->write      = NetWrite;

    return 0;
}
/*
 * Release a MqttNet structure previously set up by MqttClientNet_Init().
 *
 * net - structure to tear down; a NULL pointer is ignored.
 *
 * The context is passed to WOLFMQTT_FREE and the whole structure is then
 * zeroed, which also clears every installed callback.
 *
 * Always returns 0.
 */
int MqttClientNet_DeInit(MqttNet* net)
{
    if (!net) {
        return 0;
    }

    WOLFMQTT_FREE(net->context);
    XMEMSET(net, 0, sizeof(MqttNet));

    return 0;
}
/************************************ MQTT Layer function ***********************************/
//*****************************************************************************
//
//! MQTT message callback
//
//! Prints the topic (on the first chunk of a message) and the payload chunk
//! currently held in msg->buffer. Both are truncated to PRINT_BUFFER_SIZE
//! bytes for printing.
//!
//! client   - unused.
//! msg      - message (chunk) being delivered.
//! msg_new  - non-zero on the first callback for a message.
//! msg_done - non-zero on the last callback for a message.
//!
//! Always returns MQTT_CODE_SUCCESS; a negative return would terminate
//! publish processing.
//
//*****************************************************************************
static int mqttclient_message_cb(MqttClient *client, MqttMessage *msg,
    byte msg_new, byte msg_done)
{
    byte text[PRINT_BUFFER_SIZE+1];
    word32 count;

    (void)client;

    if (msg_new) {
        /* Print the topic information */
        count = (msg->topic_name_len > PRINT_BUFFER_SIZE)
                    ? PRINT_BUFFER_SIZE : msg->topic_name_len;
        XMEMCPY(text, msg->topic_name, count);
        text[count] = '\0';
        printf("MQTT Message: Topic %s, Qos %d, Len %u\r\n",
            text, msg->qos, msg->total_len);
    }

    /* Print the message body held in this chunk */
    count = (msg->buffer_len > PRINT_BUFFER_SIZE)
                ? PRINT_BUFFER_SIZE : msg->buffer_len;
    XMEMCPY(text, msg->buffer, count);
    text[count] = '\0';
    printf("Payload (%d - %d): %s\r\n",
        msg->buffer_pos, msg->buffer_pos + count, text);

    if (msg_done) {
        printf("MQTT Message: Done\r\n");
    }

    return MQTT_CODE_SUCCESS;
}
//*****************************************************************************
//
//! MQTT interface
//
//*****************************************************************************
/**
 * @ingroup MQTT config function
 * @brief   MQTT client init
 * @details Initialises the WIZCHIP socket buffers, enables automatic TCP
 *          keep-alive on the MQTT socket, installs the network callbacks and
 *          initialises the wolfMQTT client structure.
 * @param   void
 * @return  0 on success, -1 on failure. On a wolfMQTT failure State is
 *          cleared and the network structure is de-initialised.
 */
int MQTTClient_Init(void)
{
    int rc;
    /* Per-socket buffer sizes handed to CW_INIT_WIZCHIP. */
    uint8_t memsize[2][8] = {{2,2,2,2,2,2,2,2},{2,2,2,2,2,2,2,2}};
    /* Keep-alive timer value. setsockopt() takes a pointer to the value, so
     * it needs real storage; passing the constant cast to a pointer made the
     * driver read the value from address 0x2. */
    uint8_t keepalive_auto = 2;

    if (ctlwizchip(CW_INIT_WIZCHIP, (void*)memsize) == -1) {
        printf("WIZCHIP Initialized fail.\r\n");
        return -1;
    }

    setsockopt(MQTT_Client, SO_KEEPALIVEAUTO, (void *)&keepalive_auto);

    /* Initialize Network */
    rc = MqttClientNet_Init(&net);
#ifdef Debug
    printf("MQTT Net Init: %s (%d)\r\n",
        MqttClient_ReturnCodeToString(rc), rc);
#endif
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }

    /* Initialize MqttClient structure */
    rc = MqttClient_Init(&client, &net, mqttclient_message_cb,
        tx_buf, MAX_BUFFER_SIZE, rx_buf, MAX_BUFFER_SIZE,
        cmd_timeout_ms);
#ifdef Debug
    printf("MQTT Init: %s (%d)\r\n",
        MqttClient_ReturnCodeToString(rc), rc);
#endif
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }

    return 0;

exit:
    State = 0;
    /* Cleanup network */
    MqttClientNet_DeInit(&net);
    return -1;
}
/**
 * @ingroup MQTT config function
 * @brief   Open the network connection and perform the MQTT CONNECT.
 * @details Connects the socket to the broker, then sends CONNECT with the
 *          configured keep-alive, clean-session flag, client id, optional
 *          last-will message and the given credentials.
 * @param   host  broker host name
 * @param   user  user name sent in CONNECT
 * @param   pwd   password sent in CONNECT
 * @return  0 on success (State is set to 1); -1 on failure (State is
 *          cleared and the network structure is de-initialised).
 */
int MQTTClient_Connect_With_Name(char *host, char * user, char * pwd)
{
    int rc;
    MqttConnect conn_info;
    MqttMessage lwt_msg;

    /* Connect to broker */
    rc = MqttClient_NetConnect(&client, host, MQTT_Server_Port,
        DEFAULT_CON_TIMEOUT_MS, use_tls, mqttclient_tls_cb);
#ifdef Debug
    printf("MQTT Socket Connect: %s (%d)\r\n", MqttClient_ReturnCodeToString(rc), rc);
#endif

    if (rc == MQTT_CODE_SUCCESS) {
        /* Define connect parameters */
        XMEMSET(&conn_info, 0, sizeof(MqttConnect));
        conn_info.keep_alive_sec = DEFAULT_KEEP_ALIVE_SEC;
        conn_info.clean_session = clean_session;
        conn_info.client_id = client_id;

        /* Last will and testament sent by broker to subscribers
           of topic when broker connection is lost */
        XMEMSET(&lwt_msg, 0, sizeof(lwt_msg));
        conn_info.lwt_msg = &lwt_msg;
        conn_info.enable_lwt = enable_lwt;
        if (enable_lwt) {
            /* Send client id in LWT payload */
            lwt_msg.qos = DEFAULT_MQTT_QOS;
            lwt_msg.retain = 0;
            lwt_msg.topic_name = DEFAULT_SUBSCRIBE_TOPIC;
            lwt_msg.buffer = (byte*)client_id;
            lwt_msg.total_len = (word16)XSTRLEN(client_id);
        }

        /* Optional authentication */
        conn_info.username = user;
        conn_info.password = pwd;

        /* Send Connect and wait for Connect Ack */
        rc = MqttClient_Connect(&client, &conn_info);
#ifdef Debug
        printf("MQTT Connect: %s (%d)\r\n",
            MqttClient_ReturnCodeToString(rc), rc);
#endif
        if (rc == MQTT_CODE_SUCCESS) {
            State = 1;
            /* Validate Connect Ack info */
#ifdef Debug
            printf("MQTT Connect Ack: Return Code %u, Session Present %d\r\n",
                conn_info.ack.return_code,
                (conn_info.ack.flags & MQTT_CONNECT_ACK_FLAG_SESSION_PRESENT) ?
                    1 : 0
            );
#endif
            return 0;
        }
    }

    /* Either the socket connect or the MQTT connect failed. */
    State = 0;
    /* Cleanup network */
    MqttClientNet_DeInit(&net);
    return -1;
}
/**
 * @ingroup MQTT usage function
 * @brief   Publish a message to a topic.
 * @details Builds a non-retained, non-duplicate PUBLISH with a fresh packet
 *          id and sends it, holding the MQTT lock when Lock is defined.
 * @param   topicName  topic to publish to
 * @param   qos        quality of service:
 *                       MQTT_QOS_0 = 0, At most once delivery
 *                       MQTT_QOS_1 = 1, At least once delivery
 *                       MQTT_QOS_2 = 2, Exactly once delivery
 *                       MQTT_QOS_3 = 3, Reserved - must not be used
 * @param   message    message content
 * @param   length     message length in bytes
 * @return  0 on success; -1 on failure (State is cleared and the network
 *          structure is de-initialised).
 */
int MQTTClient_Publish(char *topicName,MqttQoS qos,char *message,uint32_t length)
{
    MqttPublish pub;
    int status;

    /* Publish Topic */
    XMEMSET(&pub, 0, sizeof(MqttPublish));
    pub.topic_name = topicName;
    pub.qos = qos;
    pub.retain = 0;
    pub.duplicate = 0;
    pub.packet_id = mqttclient_get_packetid();
    pub.buffer = (byte*)message;
    pub.total_len = length;

#ifdef Lock
    MQTT_Lock(osWaitForever);
#endif
    status = MqttClient_Publish(&client, &pub);
#ifdef Lock
    MQTT_Unlock();
#endif

#ifdef Debug
    printf("MQTT Publish: Topic %s, %s (%d)\r\n",
        pub.topic_name, MqttClient_ReturnCodeToString(status), status);
#endif

    if (status == MQTT_CODE_SUCCESS) {
        return 0;
    }

    State = 0;
    /* Cleanup network */
    MqttClientNet_DeInit(&net);
    return -1;
}
/**
 * @ingroup MQTT usage function
 * @brief   Subscribe the MQTT client to a topic.
 * @details Sends a SUBSCRIBE for a single topic filter with a fresh packet
 *          id, holding the MQTT lock when Lock is defined, and prints the
 *          result for each topic.
 * @param   topicName  topic filter to subscribe to
 * @param   qos        requested quality of service:
 *                       MQTT_QOS_0 = 0, At most once delivery
 *                       MQTT_QOS_1 = 1, At least once delivery
 *                       MQTT_QOS_2 = 2, Exactly once delivery
 *                       MQTT_QOS_3 = 3, Reserved - must not be used
 * @return  0 on success; -1 on failure (State is cleared and the network
 *          structure is de-initialised).
 */
int MQTTclient_Subscribe(char *topicName,MqttQoS qos)
{
    MqttTopic topic_list[1];
    MqttSubscribe request;
    MqttTopic *entry;
    int status;
    int idx;

    /* Build list of topics */
    topic_list[0].topic_filter = topicName;
    topic_list[0].qos = qos;

    /* Subscribe Topic */
    XMEMSET(&request, 0, sizeof(MqttSubscribe));
    request.packet_id = mqttclient_get_packetid();
    request.topic_count = sizeof(topic_list) / sizeof(topic_list[0]);
    request.topics = topic_list;

#ifdef Lock
    MQTT_Lock(osWaitForever);
#endif
    status = MqttClient_Subscribe(&client, &request);
#ifdef Lock
    MQTT_Unlock();
#endif

    printf("MQTT Subscribe: %s (%d)\r\n",
        MqttClient_ReturnCodeToString(status), status);

    if (status != MQTT_CODE_SUCCESS) {
        State = 0;
        /* Cleanup network */
        MqttClientNet_DeInit(&net);
        return -1;
    }

    /* Report the outcome for every requested topic. */
    idx = 0;
    while (idx < request.topic_count) {
        entry = &request.topics[idx];
        printf(" Topic %s, Qos %u, Return Code %u\r\n",
            entry->topic_filter, entry->qos, entry->return_code);
        idx++;
    }

    return 0;
}
/**
 * @ingroup MQTT usage function
 * @brief   Unsubscribe the MQTT client from a topic.
 * @details Sends an UNSUBSCRIBE for a single topic filter with a fresh
 *          packet id, holding the MQTT lock when Lock is defined.
 * @param   topicName  topic filter to unsubscribe from
 * @param   qos        quality of service stored in the topic entry:
 *                       MQTT_QOS_0 = 0, At most once delivery
 *                       MQTT_QOS_1 = 1, At least once delivery
 *                       MQTT_QOS_2 = 2, Exactly once delivery
 *                       MQTT_QOS_3 = 3, Reserved - must not be used
 * @return  0 on success; -1 on failure (State is cleared and the network
 *          structure is de-initialised).
 */
int MQTTclient_Unsubscribe_Topics(char *topicName,MqttQoS qos)
{
    MqttTopic topic_list[1];
    MqttUnsubscribe request;
    int status;

    /* Build list of topics */
    topic_list[0].topic_filter = topicName;
    topic_list[0].qos = qos;

    /* Unsubscribe Topics */
    XMEMSET(&request, 0, sizeof(MqttUnsubscribe));
    request.packet_id = mqttclient_get_packetid();
    request.topic_count = sizeof(topic_list) / sizeof(topic_list[0]);
    request.topics = topic_list;

#ifdef Lock
    MQTT_Lock(osWaitForever);
#endif
    status = MqttClient_Unsubscribe(&client, &request);
#ifdef Lock
    MQTT_Unlock();
#endif

    printf("MQTT Unsubscribe: %s (%d)\r\n",
        MqttClient_ReturnCodeToString(status), status);

    if (status == MQTT_CODE_SUCCESS) {
        return 0;
    }

    State = 0;
    /* Cleanup network */
    MqttClientNet_DeInit(&net);
    return -1;
}
/**
 * @ingroup MQTT usage function
 * @brief   Message read loop.
 * @details While IsConnnected is set, waits for incoming messages with
 *          MqttClient_WaitMessage(). Each timeout is counted; once the count
 *          exceeds DEFAULT_KEEP_ALIVE_SEC / 2 / (cmd_timeout_ms / 1000, or 1
 *          when that is 0) a ping is sent and the count is reset. Any other
 *          error ends the loop.
 * @param   void
 * @return  0 when the loop ends with the last result being success; -1
 *          otherwise (IsConnnected is cleared and the network structure is
 *          de-initialised).
 */
int MQTTclient_ReadLoop(void)
{
    /* Start from success so rc is never read uninitialised when the loop
     * body does not assign it (e.g. IsConnnected already cleared on entry). */
    int rc = MQTT_CODE_SUCCESS;
    uint8_t readCount = 0 ,temp =1;

    /* Read Loop */
#ifdef Debug
    printf("MQTT Waiting for message...\r\n");
#endif
    while (IsConnnected) {
        /* Try and read packet */
        IsReadingMessage = 1;
        if(IsConnnected)
            rc = MqttClient_WaitMessage(&client, cmd_timeout_ms);
        IsReadingMessage = 0;

        if (rc == MQTT_CODE_ERROR_TIMEOUT) {
            /* No traffic within the command timeout: count it and ping once
             * enough timeouts have accumulated. */
            readCount ++;
            temp = 1;
            if(cmd_timeout_ms/1000)
            {
                temp = cmd_timeout_ms/1000;
            }
            if(readCount > DEFAULT_KEEP_ALIVE_SEC/2/temp)
            {
                readCount = 0;
#ifdef Lock
                MQTT_Lock(osWaitForever);
#endif
                rc = MqttClient_Ping(&client);
#ifdef Lock
                MQTT_Unlock();
#endif
                if (rc != MQTT_CODE_SUCCESS) {
#ifdef Debug
                    printf("MQTT Ping Keep Alive Error: %s (%d)\r\n",
                        MqttClient_ReturnCodeToString(rc), rc);
#endif
                }
                else
                {
#ifdef Debug
                    printf("Ping....\r\n");
#endif
                }
            }
        }
        else if (rc != MQTT_CODE_SUCCESS) {
            /* There was an error */
            printf("MQTT Message Wait: %s (%d)\r\n",
                MqttClient_ReturnCodeToString(rc), rc);
            break;
        }
    }

    /* Check for error */
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    return 0;

exit:
    /* Cleanup network */
    IsConnnected = false;
    MqttClientNet_DeInit(&net);
    return -1;
}
/**
 * @ingroup MQTT usage function
 * @brief   Disconnect the MQTT client.
 * @details Clears State, sends the MQTT disconnect (holding the MQTT lock
 *          when Lock is defined), prints the result and de-initialises the
 *          network structure.
 * @param   void
 * @return  Always 0; the disconnect result is only printed.
 */
int MQTTclient_Disconnect(void)
{
    int status;

    State = 0;

    /* Disconnect */
#ifdef Lock
    MQTT_Lock(osWaitForever);
#endif
    status = MqttClient_Disconnect(&client);
    printf("MQTT Disconnect: %s (%d)\r\n",
        MqttClient_ReturnCodeToString(status), status);
#ifdef Lock
    MQTT_Unlock();
#endif

    /* Cleanup network */
    MqttClientNet_DeInit(&net);

    return 0;
}
`
In the Main.c file, I only perform the network initialization and then call MQTTclient_ReadLoop:
`
/*
 * Application entry point (excerpt): connect to the broker once, then, while
 * connected, subscribe to the default topic and run the read loop; while not
 * connected, idle in 300-tick delays.
 */
int main(void)
{
    /* I omitted the initialization section. I think this part is OK. */

    /* MQTTClient_Connect_With_Name() returns 0 on success. */
    if (MQTTClient_Connect_With_Name(getBrokerAddress(), getUserName(), pwd) == 0) {
        IsConnnected = 1;
    } else {
        IsConnnected = 0;
    }

    for (;;) {
        if (!IsConnnected) {
            osDelay(300);
            continue;
        }

        MQTTclient_Subscribe(DEFAULT_SUBSCRIBE_TOPIC, MQTT_QOS_0);
        //MQTTClient_Publish(DEFAULT_SUBSCRIBE_TOPIC,MQTT_QOS_0, "STM32 TEST MESSAGE.", strlen("STM32 TEST MESSAGE."));
        MQTTclient_ReadLoop();
    }
}
As for the "timeout error" (-7), I think it is also not expected. I have run the experiment, and the debug message is:
Hi @moxigua
Since this is unencrypted, a packet capture will be able to show you if a malformed packet is causing the broker to disconnect. Could you please attach a packet trace using wireshark?
If you would prefer to continue this discussion in a more private environment, please feel free to open a support ticket by sending an email to support@wolfssl.com
Thanks, @embhorn
Hi @moxigua
I am closing this issue since it has been inactive for quite some time. Please feel free to reopen with further comments.
Recently, I have been porting the wolfMQTT library to my board, which is composed of an STM32F103ZET6 + W5500. My development environment is MDK V5.31.0.0. Here is what I observe when debugging. The MQTT server is Baidu IoT Core.
When I used wolfMQTT v0.8, everything was OK. The message from the USART is: ` ip is :192.168.1.129 gw is :192.168.1.1 mac is :0.8.220.17.18.22 DHCP LEASED TIME : 7200 Sec. MQTT Net Init: Success (0) MQTT Init: Success (0)
When I used wolfMQTT v0.9 to v0.11 with the same code, a timeout error occurred when sending the keep-alive: `=== W5500 NET CONF === ip is :192.168.1.129 gw is :192.168.1.1 mac is :0.8.220.17.18.22 DHCP LEASED TIME : 7200 Sec. MQTT Net Init: Success (0) MQTT Init: Success (0)
When I used wolfMQTT v0.12 up to the latest version with the same code, I could no longer communicate with MQTT.fx. From the debug message, we can see that the STM32 connects to the MQTT server successfully, but it is then disconnected by some unknown error. The debug message is: `=== W5500 NET CONF === ip is :192.168.1.129 gw is :192.168.1.1 mac is :0.8.220.17.18.22 DHCP LEASED TIME : 7200 Sec. MQTT Net Init: Success (0) MQTT Init: Success (0)
my test file is :
MQTT_Temp.zip