ros / ros_comm

ROS communications-related packages, including core client libraries (roscpp, rospy, roslisp) and graph introspection tools (rostopic, rosnode, rosservice, rosparam).
http://wiki.ros.org/ros_comm
765 stars 912 forks source link

Function recv_buff(sock, b, buff_size) when the remote end is closed, resulting in abnormal reconnection. #2123

Closed jinmenglei closed 3 years ago

jinmenglei commented 3 years ago

When I use rosbridge, after I send a message, I close the client connected through rosbridge, which will result in d = sock.recv(buff_size) After receiving an empty string.

 def recv(self, buffersize, flags=None): # real signature unknown; restored from __doc__
        """
        recv(buffersize[, flags]) -> data

        Receive up to buffersize bytes from the socket.  For the optional flags
        argument, see the Unix manual.  When no data is available, block until
        at least one byte is available or until the remote end is closed.  When
        the remote end is closed and all data is read, return the empty string.
        """
        pass

An exception is thrown, and then the socket is reconnected. Finally, the waiting time exceeds 32S. It is difficult to connect, and the port will be wasted when reconnecting. I refer to the C + + code and find that the processing logic is different.

int32_t TransportTCP::read(uint8_t* buffer, uint32_t size)
{
  {
    boost::recursive_mutex::scoped_lock lock(close_mutex_);

    if (closed_)
    {
      ROSCPP_LOG_DEBUG("Tried to read on a closed socket [%d]", sock_);
      return -1;
    }
  }

  ROS_ASSERT(size > 0);

  // never read more than INT_MAX since this is the maximum we can report back with the current return type
  uint32_t read_size = std::min(size, static_cast<uint32_t>(INT_MAX));
  int num_bytes = ::recv(sock_, reinterpret_cast<char*>(buffer), read_size, 0);
  if (num_bytes < 0)
  {
    if ( !last_socket_error_is_would_block() ) // !WSAWOULDBLOCK / !EAGAIN && !EWOULDBLOCK
    {
      ROSCPP_LOG_DEBUG("recv() on socket [%d] failed with error [%s]", sock_, last_socket_error_string());
      close();
    }
    else
    {
      num_bytes = 0;
    }
  }
  else if (num_bytes == 0)
  {
    ROSCPP_LOG_DEBUG("Socket [%d] received 0/%u bytes, closing", sock_, size);
    close();
    return -1;
  }

  return num_bytes;
}

C + + receives zero and closes the connection directly instead of reconnecting. Can we consider closing it, like this:

# add break here
                    if str(e) == "unable to receive data from sender, check sender's logs for details":
                        break
    def receive_loop(self, msgs_callback):
        """
        Receive messages until shutdown
        @param msgs_callback: callback to invoke for new messages received    
        @type  msgs_callback: fn([msg])
        """
        # - use assert here as this would be an internal error, aka bug
        logger.debug("receive_loop for [%s]", self.name)
        try:
            while not self.done and not is_shutdown():
                try:
                    if self.socket is not None:
                        msgs = self.receive_once()
                        if not self.done and not is_shutdown():
                            msgs_callback(msgs, self)
                    else:
                        self._reconnect()
                except TransportException as e:
                    # set socket to None so we reconnect
                    try:
                        if self.socket is not None:
                            try:
                                self.socket.shutdown()
                            except:
                                pass
                            finally:
                                self.socket.close()
                    except:
                        pass
                    self.socket = None
                    # add break here
                    if str(e) == "unable to receive data from sender, check sender's logs for details":
                        break
                        # pass

                except DeserializationError as e:
                    #TODO: how should we handle reconnect in this case?

                    logerr("[%s] error deserializing incoming request: %s"%self.name, str(e))
                    rospyerr("[%s] error deserializing incoming request: %s"%self.name, traceback.format_exc())
                except:
                    # in many cases this will be a normal hangup, but log internally
                    try:
                        #1467 sometimes we get exceptions due to
                        #interpreter shutdown, so blanket ignore those if
                        #the reporting fails
                        rospydebug("exception in receive loop for [%s], may be normal. Exception is %s",self.name, traceback.format_exc())                    
                    except: pass

            rospydebug("receive_loop[%s]: done condition met, exited loop"%self.name)                    
        finally:
            if not self.done:
                self.close()

The test is OK after I modify it

fujitatomoya commented 3 years ago

this sounds reasonable to me. since transport is terminated with 'TransportTerminated' we would not want to reconnect. (in roscpp, if that is neither EAGAIN nor EWOULDBLOCK, it is closed and done.)

if str(e) == "unable to receive data from sender, check sender's logs for details":

instead of that, i would add except catch statement TransportTerminated before TransportException.

jinmenglei commented 3 years ago

@fujitatomoya Thank you. After you fix it, I will pull the code again. I like ROS and hope that the UDP part of ros1 can be updated later.