eclipse / mosquitto

Eclipse Mosquitto - An open source MQTT broker
https://mosquitto.org

add timeout arg to mosquitto_connect() function #3051

Open legale opened 4 months ago

legale commented 4 months ago

The function below lets me use an arbitrary timeout when establishing a connection. When a connection problem produces feedback to the Mosquitto client (e.g., rejected TCP packets), there is no issue: mosquitto_connect() returns quickly. However, when the client gets no feedback at all (e.g., packets are dropped or disappear for other reasons), mosquitto_connect() can keep attempting the connection for several minutes before failing.

To avoid modifying the libmosquitto library code, this external function works in conjunction with mosquitto_connect_async() and mosquitto_socket().

After calling mosquitto_connect_async(), my function waits up to the specified timeout for the connection to be established and returns 0 (MOSQ_ERR_SUCCESS) when it is. This gives me the functionality that could be implemented directly in the library through a modified mosquitto_connect() function.

If we add an additional timeout argument:

int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive, int timeout)

then logic similar to my function could be implemented inside the modified function.

What do you think about this?

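#include <errno.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <time.h>
#include <mosquitto.h>

/* Wait up to timeout_ms for a non-blocking connect on fd to complete. */
static int check_tcp_connection(int fd, int timeout_ms) {
  fd_set write_fds;
  struct timespec timeout;
  int ret;
  socklen_t optlen;
  int error;

  /* FD_SET is undefined for descriptors outside [0, FD_SETSIZE). */
  if (fd < 0 || fd >= FD_SETSIZE || timeout_ms < 0) {
    return MOSQ_ERR_INVAL;
  }

  FD_ZERO(&write_fds);
  FD_SET(fd, &write_fds);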

  timeout.tv_sec = timeout_ms / 1000;
  timeout.tv_nsec = (timeout_ms % 1000) * 1000000;

  ret = pselect(fd + 1, NULL, &write_fds, NULL, &timeout, NULL);

  if (ret == -1) {
    return MOSQ_ERR_ERRNO; // Error occurred
  } else if (ret == 0) {
    return MOSQ_ERR_TIMEOUT; // Timeout occurred
  } else {
    optlen = sizeof(error);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen) < 0) {
      return MOSQ_ERR_ERRNO; // An error occurred
    }
    if (error == 0) {
      return MOSQ_ERR_SUCCESS; // Connection is established
    } else if (error == EINPROGRESS) {
      return MOSQ_ERR_CONN_PENDING; // Connection is still in progress
    } else {
      return MOSQ_ERR_CONN_REFUSED; // Connection failed
    }
  }
}
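
For comparison, here is a minimal sketch of the same wait built on poll() instead of pselect(), which avoids the FD_SETSIZE limit of fd_set. It uses plain POSIX sockets only, not libmosquitto. The names wait_connected() and connect_with_timeout() are hypothetical, and the IPv4-only addressing is an assumption made to keep the example short.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not part of libmosquitto): wait until a non-blocking
 * connect() on fd finishes. Returns 0 on success, -1 with errno set
 * (ETIMEDOUT if timeout_ms elapsed first). */
static int wait_connected(int fd, int timeout_ms) {
  struct pollfd pfd = { .fd = fd, .events = POLLOUT };
  int ret;

  do {
    ret = poll(&pfd, 1, timeout_ms);
  } while (ret == -1 && errno == EINTR); /* note: a retry restarts the full timeout */

  if (ret == -1) return -1;
  if (ret == 0) { errno = ETIMEDOUT; return -1; }

  /* The socket is writable (or failed): SO_ERROR holds the connect result. */
  int error = 0;
  socklen_t optlen = sizeof(error);
  if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen) < 0) return -1;
  if (error != 0) { errno = error; return -1; }
  return 0;
}

/* Hypothetical helper: connect to an IPv4 address with a timeout.
 * Returns a connected, non-blocking fd, or -1 with errno set. */
static int connect_with_timeout(const char *ipv4, unsigned short port,
                                int timeout_ms) {
  struct sockaddr_in addr = { 0 };
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  if (inet_pton(AF_INET, ipv4, &addr.sin_addr) != 1) {
    errno = EINVAL;
    return -1;
  }

  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return -1;

  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) goto fail;

  if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
    if (errno != EINPROGRESS) goto fail;
    if (wait_connected(fd, timeout_ms) < 0) goto fail;
  }
  return fd;

fail: {
    int saved = errno; /* keep the original error across close() */
    close(fd);
    errno = saved;
  }
  return -1;
}
```

A call such as connect_with_timeout("192.0.2.10", 1883, 3000) would give up after roughly three seconds when packets are silently dropped. Name resolution is deliberately left out, since a DNS lookup can block on its own.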

veersatyag commented 3 months ago

A timeout should be added to all of the mosquitto connect functions in the library.

KevKzTW commented 2 months ago

I also had to fix a timeout issue in my application, because a foolish user of mine configured the host IP to point at a Windows PC with a firewall :P

@legale, so you are using mosquitto_connect_async() to create the socket and mosquitto_socket() to get the socket fd for your own pselect() control?

And do you have multiple MQTT connections in the same select or epoll? I have tried this combination too, but the MQTT connect flow may not finish when I have to maintain multiple connections with my third-party event loop library.

I tried calling setsockopt(mosquitto_socket(), SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) and then mosquitto_reconnect() again, hoping it would time out and return, but that did not work.

I also tried modifying net__try_connect_tcp() to call setsockopt() with a timeout before connecting, and that worked. But the on_disconnect callback then reported MOSQ_ERR_KEEPALIVE, which seems weird to me. (I have to handle connection errors in the on_disconnect callback.)

I think that if mosquitto_connect() had a timeout argument, it would face the same problem whenever the user's timeout is greater than the MQTT keepalive time.

For now I have decided to do the connect in a thread and hand control to my event loop once the connect returns success. If I have to clean up a connecting thread, I will use a garbage collection thread to clean up all of the connecting threads. That way I never have to care about the timeout :P

That's my experience.
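
The setsockopt() idea mentioned in this comment can be sketched on a plain socket as follows. On Linux, socket(7) documents that once SO_SNDTIMEO is set, a blocking connect() that exceeds the timeout fails with EINPROGRESS. Other platforms may behave differently, so treat this as a Linux-specific assumption. set_send_timeout() is a hypothetical helper name, not a libmosquitto function, and the option has to be set on the socket that connect() is actually called on.

```c
#include <sys/socket.h>
#include <sys/time.h>

/* Hypothetical helper: set the send timeout on fd, in milliseconds.
 * Assumption: Linux behaviour per socket(7), where this timeout also
 * bounds a blocking connect(). Returns 0 on success, -1 with errno set. */
static int set_send_timeout(int fd, int timeout_ms) {
  struct timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  return setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
}
```

The helper would be called between socket() and connect(). Setting the option on a descriptor that is later closed and replaced has no effect on the new descriptor.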

ginotsai123456 commented 1 month ago

A timeout function would be good. I recently found that a customer's MIS department drops all incoming packets from AWS MQTT, which makes my program hang there. I still don't know how to solve it, so I decided to run the connection in a std::thread and let it hang.