rosjava / rosjava_core

An implementation of ROS in pure Java with Android support.

ServiceResponseHandler uses unsafe queue for returning service responses to caller #261

Open msmcconnell opened 6 years ago

msmcconnell commented 6 years ago

The ServiceResponseHandler uses a queue to match ROS service responses to their callers. Since handling a response is simply a poll from that queue, responses can be delivered to the wrong listener if a node has multi-threaded components making the same service call. If the first request does not complete before the second request, the responses will be flipped. A mapping from request to listener is needed to ensure that each caller receives only its own response.

Relevant variable: Line 41 private final Queue<ServiceResponseListener<ResponseType>> responseListeners;

Relevant method: Line 52

  @Override
  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final ServiceResponseListener<ResponseType> listener = responseListeners.poll();
    Preconditions.checkNotNull(listener, "No listener for incoming service response.");
    final ServiceServerResponse response = (ServiceServerResponse) e.getMessage();
    final ChannelBuffer buffer = response.getMessage();
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        if (response.getErrorCode() == 1) {
          listener.onSuccess(deserializer.deserialize(buffer));
        } else {
          String message = Charset.forName("US-ASCII").decode(buffer.toByteBuffer()).toString();
          listener.onFailure(new RemoteException(StatusCode.ERROR, message));
        }
      }
    });
  }
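
To make the failure mode concrete, here is a minimal, self-contained sketch. It is not rosjava code: the class `ResponseMatchingSketch`, the string payloads, and the integer request ids are all hypothetical. It assumes that responses can arrive in a different order than the listeners were registered, and, for the keyed variant, that each response carries an id that can be matched to its request, which the handler above does not show.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names exist in rosjava.
class ResponseMatchingSketch {

  // FIFO dispatch: each response goes to whichever listener was registered first.
  static String fifoDispatch() {
    Queue<Consumer<String>> listeners = new ArrayDeque<>();
    StringBuilder log = new StringBuilder();
    listeners.add(r -> log.append("A got ").append(r).append(";"));
    listeners.add(r -> log.append("B got ").append(r).append(";"));
    // Assumed ordering: B's response arrives before A's.
    listeners.poll().accept("reply-to-B");
    listeners.poll().accept("reply-to-A");
    return log.toString();
  }

  // Keyed dispatch: each response is routed by an (assumed) request id.
  static String keyedDispatch() {
    Map<Integer, Consumer<String>> listeners = new ConcurrentHashMap<>();
    StringBuilder log = new StringBuilder();
    listeners.put(1, r -> log.append("A got ").append(r).append(";"));
    listeners.put(2, r -> log.append("B got ").append(r).append(";"));
    // Same arrival order as above, but the id selects the right listener.
    listeners.remove(2).accept("reply-to-B");
    listeners.remove(1).accept("reply-to-A");
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(fifoDispatch());  // A got reply-to-B;B got reply-to-A;
    System.out.println(keyedDispatch()); // B got reply-to-B;A got reply-to-A;
  }
}
```

If the wire format cannot carry such an id, another option might be to make registering the listener and writing the request a single atomic step, so that queue order always matches send order.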
jubeira commented 6 years ago

Hi @msmcconnell, thanks for reporting this. Do you have a code sample that shows this actually failing? That would let us verify whether a fix works once we apply one. Do you have a particular solution in mind?

msmcconnell commented 6 years ago

I have an integration test between two nodes where one of them is multi-threaded. I would need to remove a small dependency on my own code but it wouldn't be too hard. I can provide the code sample here. I'll try to think of a solution.

msmcconnell commented 6 years ago

Here is a code sample for an integration test I used to verify that this was happening. When it is run as a unit test with Gradle, the log should show some mismatched thread numbers and a failure from the service call.

Test Class

import cav_srvs.GetDriversWithCapabilities;
import cav_srvs.GetDriversWithCapabilitiesRequest;
import cav_srvs.GetDriversWithCapabilitiesResponse;
import gov.dot.fhwa.saxton.carma.rosutils.RosTest;
import org.ros.message.MessageFactory;
import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain;
import org.ros.node.ConnectedNode;
import org.ros.node.service.*;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.ros.node.Node;
import org.ros.node.NodeConfiguration;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Class for running a multi-threaded service request
 */
public class ServiceTest extends RosTest {

  /**
   * Test if a multi-threaded node can safely call services from another node
   */
  @Test public void testServiceAvailability() throws Exception {
    final NodeConfiguration secondConfig = NodeConfiguration.newPrivate(rosCore.getUri());
    final MessageFactory messageFactory = secondConfig.getTopicMessageFactory();
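    final String SERVICE_NAME = "test_service";
    final int NUM_THREADS = 10;
    // Each client thread makes exactly one call, so the latch counts NUM_THREADS successful responses
    final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS);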

    // Create the anonymous node that hosts the service server
    AbstractNodeMain serverNode = new AbstractNodeMain() {
      @Override public GraphName getDefaultNodeName() {
        return GraphName.of("server_node");
      }

      @Override public void onStart(final ConnectedNode connectedNode) {

        // Setup Server
        ServiceServer<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> testServer =
         connectedNode.newServiceServer(SERVICE_NAME, GetDriversWithCapabilities._TYPE,
          new ServiceResponseBuilder<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse>() {

            @Override
            public void build(GetDriversWithCapabilitiesRequest req, GetDriversWithCapabilitiesResponse res) {
              res.setDriverData(req.getCapabilities());
            }
          });
      }
    };

    // Create the anonymous node to test the server
    AbstractNodeMain clientNode = new AbstractNodeMain() {
      List<ClientThread> threads = new LinkedList<>();
      @Override public GraphName getDefaultNodeName() {
        return GraphName.of("client_node");
      }

      @Override public void onStart(final ConnectedNode connectedNode) {

        // Create the service client and start the caller threads; fail if the service is not found

        ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient;
        try {
          serviceClient = connectedNode.newServiceClient(SERVICE_NAME, GetDriversWithCapabilities._TYPE);
          for(int i = 0; i < NUM_THREADS; i++) {
            ClientThread newThread = new ClientThread("Thread " + i, countDownLatch, messageFactory,
            SERVICE_NAME, serviceClient, connectedNode.getLog());
            newThread.start();
            threads.add(newThread);
          }
        } catch (org.ros.exception.ServiceNotFoundException e) {
          fail("Couldn't find service " + SERVICE_NAME);
        }
      }

      @Override
      public void onShutdown(Node node) {
        for (ClientThread thread: threads) {
          thread.interrupt();
        }
      }
    };

    // Start the server node
    nodeMainExecutor.execute(serverNode, nodeConfiguration);
    // Give time for service to be available
    Thread.sleep(1000);
    // Start the anonymous node to test the server
    nodeMainExecutor.execute(clientNode, secondConfig);
    assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); // Check if service calls were successful
    // // Shutdown nodes
    // nodeMainExecutor.shutdownNodeMain(clientNode);
    // // Shutting down the transform server from this test results in a exception on printing the service address
    // nodeMainExecutor.shutdownNodeMain(serverNode);
    // Stack trace is automatically logged
    // ROS is shutdown automatically in cleanup from ROS Test
  }

}

Thread Class

import cav_srvs.GetDriversWithCapabilities;
import cav_srvs.GetDriversWithCapabilitiesRequest;
import cav_srvs.GetDriversWithCapabilitiesResponse;
import org.ros.exception.RemoteException;
import org.ros.message.MessageFactory;
import org.ros.node.service.*;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

/**
 * Class for making service requests
 */
public class ClientThread extends Thread {
  final String requestStr;
  final CountDownLatch countDownLatch;
  final MessageFactory messageFactory;
  final ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient;
  final String service;
  final boolean[] done = new boolean[1];
  final Log log;

  ClientThread(String requestStr, CountDownLatch countDownLatch, MessageFactory messageFactory,
   String service, ServiceClient<GetDriversWithCapabilitiesRequest, GetDriversWithCapabilitiesResponse> serviceClient, Log log) {
    this.requestStr = requestStr;
    this.countDownLatch = countDownLatch;
    this.messageFactory = messageFactory;
    this.service = service;
    this.serviceClient = serviceClient;
    this.done[0] = true;
    this.log = log;
  }

  @Override
  public void run(){  
    // Build ros messages
    if (done[0]) {
      final List<String> caps = new LinkedList<>();
      caps.add(requestStr);
      done[0] = false;
      GetDriversWithCapabilitiesRequest req =
      messageFactory.newFromType(GetDriversWithCapabilitiesRequest._TYPE);

      req.setCapabilities(caps);

      serviceClient.call(req, new ServiceResponseListener<GetDriversWithCapabilitiesResponse>() {
        @Override public void onSuccess(GetDriversWithCapabilitiesResponse response) {
          log.info("\n\n Request: " + req.getCapabilities().get(0) + " Result: " + response.getDriverData().get(0) + " \n\n");
          assertTrue(response.getDriverData().get(0).equals(req.getCapabilities().get(0)));
          countDownLatch.countDown();
          done[0] = true;
        }

        @Override
        public void onFailure(RemoteException e) {
          fail("Service request failed for request " + requestStr);
        }
      });
    }
  }  
}

The service used in this case looks like this.

string[] capabilities
---
string[] driver_data
jubeira commented 6 years ago

Thanks @msmcconnell, this looks useful! If you find a fix for it please submit a PR. I will try to test this when I have some time.

msmcconnell commented 6 years ago

I'll look into making a fix. Which branch is the preferred one to develop on? The README says master but it is listed as obsolete/master and the last PR was onto kinetic. Does that mean I should branch directly off kinetic?

jubeira commented 6 years ago

Good catch; the README should be updated as well. For a while now we have been targeting everything at kinetic, so please branch off it and target your PR there.