ros / ros_realtime

Other
16 stars 13 forks source link

ros_realtime: wrap more system calls #1

Open ablasdel opened 11 years ago

ablasdel commented 11 years ago

Migrated from code.ros.org, issue #2496 Reported by: jfaust

https://code.ros.org/trac/ros/ticket/2496

brk()/sbrk() pthread calls

ablasdel commented 11 years ago

Changed 2 years ago by mrinal I'm using rosrt successfully on Xenomai, thanks for the great work! The Subscriber works as expected. The Publisher works too, but causes xenomai to switch to secondary mode because it uses a boost (pthread) condition variable to signal the publishing thread. I'm going to replace this with a xenomai condition variable for my own use, which will prevent the mode switch. Before I do this, I wanted to know if you had some design in mind to wrap pthread calls. The way I plan to do this is to define a rosrt::mutex and rosrt::condition_variable which mirrors the boost api, but with #ifdefs for the xenomai architecture. If there's a better way I can do this, that would make my patch worthy of inclusion into rosrt, please let me know. Thanks! Changed 2 years ago by jfaust Do you know why xenomai switches mode when signalling the condition? My impression is it shouldn't have to do that. Since it's doing it though, that does seem reasonable. Changed 2 years ago by mrinal Xenomai will switch mode if any pthread function is called, this is expected. I've also confirmed that "pthread_cond_signal" is the reason for the mode switch I'm seeing in the publisher. The standard way to get around it is to use the "POSIX skin" of Xenomai, which wraps all pthread calls (  http://www.xenomai.org/index.php/Porting_POSIX_applications_to_Xenomai ). But this creates other strange problems, like rosconsole failing to initialize - I posted this on ros-users a month or two ago. I think the reason for this may be that rosconsole, and other ROS libraries that I use in my Xenomai app would also need to be linked with the Xenomai POSIX skin, which is painful. Changed 2 years ago by mrinal ▪ attachment rosrt_xenomai.diff  added First patch for rosrt Xenomai support Changed 2 years ago by mrinal Attached is my first version of the patch for Xenomai. It turned out to be a little more involved than described earlier. Xenomais mutexes and condition variables cannot be used from a non-realtime pthread, so I also added a bare-bones rosrt::thread wrapper, which thePublisherManager? uses. This also implies that rosrt::init() must be called from a real-time thread/task. Some of the tests had to be modified to work correctly on Xenomai because an loop without sleeps in a real-time thread can starve the system since there will be no preemption. Changed 2 years ago by jfaust Can you create a new ticket for this stuff? It's not really applicable to this one. Changed 2 years ago by jfaust ▪ owner changed from jfaust to gerkey

Changed 2 years ago by mrinal For the record, the patch I posted above is attached to ticket #3231 instead. Changed 20 months ago by gerkey ▪ cc straszheim, sglaser added We've been sitting on this for a long time, and we're not currently doing anything substantial with rosrt (right?). Mrinal, are you interested in picking it up? Changed 20 months ago by gerkey ▪ owner changed from gerkey to mrinal

Changed 20 months ago by mrinal ▪ status changed from new to assigned Sure, we are actively using rosrt here on Xenomai. Our version of rosrt has been diverging a bit from the willow version, so it would be nice to bring it back in sync again. It would be cool to have someone do code-reads of our patches though, before I integrate them. Changed 19 months ago by gerkey ▪ cc kwc added How about we give you commit access to the ros repo, so that you can push changes toros_realtime directly? Changed 19 months ago by mrinal Fine by me. Changed 19 months ago by gerkey Ok, Mrinal, you have commit permission to the ros repo. Use this power wisely :) It would be great to see a new release of ros_realtime as these new features flow in. Looks like it hasn't been released since last November. Note that it's still < 1.0, so there are no guarantees about stability, meaning that it's fine to change the API and/or add features (but, of course, try not to gratuitously break existing users).

ablasdel commented 11 years ago

patch

diff --git a/ros_extras/ros_realtime/rosrt/CMakeLists.txt b/ros_extras/ros_realtime/rosrt/CMakeLists.txt
index 4e64214..a717b1a 100644
--- a/ros_extras/ros_realtime/rosrt/CMakeLists.txt
+++ b/ros_extras/ros_realtime/rosrt/CMakeLists.txt
@@ -11,6 +11,19 @@ include($ENV{ROS_ROOT}/core/rosbuild/rosbuild.cmake)

 rosbuild_init()

+# Check for Xenomai:
+set(XENOMAI_SEARCH_PATH /usr/local/xenomai /usr/xenomai /usr)
+find_path(XENOMAI_DIR bin/xeno-config ${XENOMAI_SEARCH_PATH})
+if (XENOMAI_DIR)
+  set (XENOMAI_CONFIG ${XENOMAI_DIR}/bin/xeno-config)
+  message ("Xenomai found in ${XENOMAI_DIR}")
+  execute_process(COMMAND ${XENOMAI_CONFIG} --skin=native --cflags OUTPUT_VARIABLE XENOMAI_CFLAGS OUTPUT_STRIP_TRAILING_WHITESPACE)
+  execute_process(COMMAND ${XENOMAI_CONFIG} --skin=native --ldflags OUTPUT_VARIABLE XENOMAI_LDFLAGS OUTPUT_STRIP_TRAILING_WHITESPACE)
+  set (ROSRT_PLATFORM_CFLAGS ${XENOMAI_CFLAGS})
+  set (ROSRT_PLATFORM_LDFLAGS ${XENOMAI_LDFLAGS})
+  #add_definitions(${XENOMAI_CFLAGS})
+endif (XENOMAI_DIR)
+
 #set the default path for built executables to the "bin" directory
 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
 #set the default path for built libraries to the "lib" directory
@@ -23,11 +36,17 @@ set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)

 include_directories(include)

+#add platform-specific compile flags
+add_definitions(${ROSRT_PLATFORM_CFLAGS})
+
 #common commands for building c++ executables and libraries
 rosbuild_add_library(${PROJECT_NAME} src/malloc.cpp src/simple_gc.cpp src/publisher.cpp src/subscriber.cpp src/init.cpp)
 rosbuild_add_boost_directories()
 rosbuild_link_boost(${PROJECT_NAME} thread)

+#add platform-specific linker flags
+target_link_libraries(${PROJECT_NAME} ${ROSRT_PLATFORM_LDFLAGS})
+
 rosbuild_add_executable(test_publisher EXCLUDE_FROM_ALL test/test_publisher.cpp)
 rosbuild_add_gtest_build_flags(test_publisher)
 target_link_libraries(test_publisher ${PROJECT_NAME})
diff --git a/ros_extras/ros_realtime/rosrt/include/rosrt/detail/condition_variable.h b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/condition_variable.h
new file mode 100644
index 0000000..0c7b4b6
--- /dev/null
+++ b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/condition_variable.h
@@ -0,0 +1,145 @@
+/*****************************************************************************
+ * Boost Software License - Version 1.0 - August 17th, 2003
+ *
+ * Permission is hereby granted, free of charge, to any person or organization
+ * obtaining a copy of the software and accompanying documentation covered by
+ * this license (the "Software") to use, reproduce, display, distribute,
+ * execute, and transmit the Software, and to prepare derivative works of the
+ * Software, and to permit third-parties to whom the Software is furnished to
+ * do so, all subject to the following:
+ *
+ * The copyright notices in the Software and this entire statement, including
+ * the above license grant, this restriction and the following disclaimer,
+ * must be included in all copies of the Software, in whole or in part, and
+ * all derivative works of the Software, unless such copies or derivative
+ * works are solely in the form of machine-executable object code generated by
+ * a source language processor.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+ * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * \author Mrinal Kalakrishnan <mail@mrinal.net>
+ ******************************************************************************/
+
+#ifndef ROSRT_CONDITION_VARIABLE_H_
+#define ROSRT_CONDITION_VARIABLE_H_
+
+#include <boost/utility.hpp>
+#include <boost/assert.hpp>
+#include <boost/thread/exceptions.hpp>
+#include <boost/thread/locks.hpp>
+#include <rosrt/detail/mutex.h>
+
+#ifdef __XENO__
+#include <native/cond.h>
+#else
+#include <boost/thread/condition_variable.hpp>
+#endif
+
+namespace rosrt
+{
+
+/**
+ * Wrapper for a "real-time" condition variable, implementation differs based on platform.
+ * Falls back to boost::condition_variable on generic platforms.
+ *
+ * Attempts to mimic the boost::condition_variable api, but this is not a complete implementation,
+ * it's only intended for internal rosrt use.
+ */
+class condition_variable: boost::noncopyable
+{
+private:
+
+#ifdef __XENO__
+  RT_COND cond_;
+#else
+  boost::condition_variable cond_;
+#endif
+
+public:
+  condition_variable()
+  {
+#ifdef __XENO__
+    int const res = rt_cond_create(&cond_, NULL);
+    if (res)
+    {
+      throw boost::thread_resource_error();
+    }
+#endif
+  }
+
+  ~condition_variable()
+  {
+#ifdef __XENO__
+    BOOST_VERIFY(!rt_cond_delete(&cond_));
+#endif
+  }
+
+  void wait(boost::unique_lock<mutex>& m)
+  {
+#ifdef __XENO__
+    rosrt::mutex::native_handle_type native_mutex = m.mutex()->native_handle();
+    int const res = rt_cond_wait(&cond_, native_mutex, TM_INFINITE);
+    BOOST_VERIFY(!res);
+#else
+    pthread_cond_t* native_cond = cond_.native_handle();
+    pthread_mutex_t* native_mutex = m.mutex()->native_handle();
+    BOOST_VERIFY(!pthread_cond_wait(native_cond, native_mutex));
+#endif
+  }
+
+  template<typename predicate_type>
+  void wait(boost::unique_lock<mutex>& m, predicate_type pred)
+  {
+    while (!pred())
+      wait(m);
+  }
+
+#ifdef __XENO__
+  typedef RT_COND* native_handle_type;
+#else
+  typedef boost::condition_variable::native_handle_type native_handle_type;
+#endif
+
+  native_handle_type native_handle()
+  {
+#ifdef __XENO__
+    return &cond_;
+#else
+    return cond_.native_handle();
+#endif
+  }
+
+  void notify_one()
+  {
+#ifdef __XENO__
+    int const res = rt_cond_signal(&cond_);
+    if (res)
+    {
+      ROS_ERROR("rt_cond_signal returned %d", res);
+    }
+    BOOST_VERIFY(!res);
+#else
+    cond_.notify_one();
+#endif
+  }
+
+  void notify_all()
+  {
+#ifdef __XENO__
+    BOOST_VERIFY(!rt_cond_broadcast(&cond_));
+#else
+    cond_.notify_all();
+#endif
+  }
+
+};
+
+}
+
+#endif /* ROSRT_CONDITION_VARIABLE_H_ */
diff --git a/ros_extras/ros_realtime/rosrt/include/rosrt/detail/mutex.h b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/mutex.h
new file mode 100644
index 0000000..d68f87e
--- /dev/null
+++ b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/mutex.h
@@ -0,0 +1,135 @@
+/*****************************************************************************
+ * Boost Software License - Version 1.0 - August 17th, 2003
+ *
+ * Permission is hereby granted, free of charge, to any person or organization
+ * obtaining a copy of the software and accompanying documentation covered by
+ * this license (the "Software") to use, reproduce, display, distribute,
+ * execute, and transmit the Software, and to prepare derivative works of the
+ * Software, and to permit third-parties to whom the Software is furnished to
+ * do so, all subject to the following:
+ *
+ * The copyright notices in the Software and this entire statement, including
+ * the above license grant, this restriction and the following disclaimer,
+ * must be included in all copies of the Software, in whole or in part, and
+ * all derivative works of the Software, unless such copies or derivative
+ * works are solely in the form of machine-executable object code generated by
+ * a source language processor.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+ * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * \author Mrinal Kalakrishnan <mail@mrinal.net>
+ ******************************************************************************/
+
+#ifndef ROSRT_MUTEX_H_
+#define ROSRT_MUTEX_H_
+
+#include <boost/utility.hpp>
+#include <boost/assert.hpp>
+#include <boost/thread/exceptions.hpp>
+#include <boost/thread/locks.hpp>
+
+#ifdef __XENO__
+#include <native/mutex.h>
+#else
+#include <boost/thread/mutex.hpp>
+#endif
+
+namespace rosrt
+{
+
+/**
+ * Wrapper for a "real-time" mutex, implementation differs based on platform.
+ * Falls back to boost::mutex on generic platforms.
+ *
+ * Attempts to mimic the boost::mutex api, but this is not a complete implementation,
+ * it's only intended for internal rosrt use.
+ */
+class mutex: boost::noncopyable
+{
+private:
+
+#ifdef __XENO__
+  RT_MUTEX mutex_;
+#else
+  boost::mutex mutex_;
+#endif
+
+public:
+
+  mutex()
+  {
+#ifdef __XENO__
+    int const res = rt_mutex_create(&mutex_, NULL);
+    if (res)
+    {
+      throw boost::thread_resource_error();
+    }
+#endif
+  }
+
+  ~mutex()
+  {
+#ifdef __XENO__
+    BOOST_VERIFY(!rt_mutex_delete(&mutex_));
+#endif
+  }
+
+  void lock()
+  {
+#ifdef __XENO__
+    int const res = rt_mutex_acquire(&mutex_, TM_INFINITE);
+    BOOST_VERIFY(!res);
+#else
+    mutex_.lock();
+#endif
+  }
+
+  void unlock()
+  {
+#ifdef __XENO__
+    BOOST_VERIFY(!rt_mutex_release(&mutex_));
+#else
+    mutex_.unlock();
+#endif
+  }
+
+  bool try_lock()
+  {
+#ifdef __XENO__
+    int const res = rt_mutex_acquire(&mutex_, TM_NONBLOCK);
+    BOOST_VERIFY(!res || res==EWOULDBLOCK);
+    return !res;
+#else
+    return mutex_.try_lock();
+#endif
+  }
+
+#ifdef __XENO__
+  typedef RT_MUTEX* native_handle_type;
+#else
+  typedef boost::mutex::native_handle_type native_handle_type;
+#endif
+
+  native_handle_type native_handle()
+  {
+#ifdef __XENO__
+    return &mutex_;
+#else
+    return mutex_.native_handle();
+#endif
+  }
+
+  typedef boost::unique_lock<mutex> scoped_lock;
+  typedef boost::detail::try_lock_wrapper<mutex> scoped_try_lock;
+
+};
+
+}
+
+#endif /* ROSRT_MUTEX_H_ */
diff --git a/ros_extras/ros_realtime/rosrt/include/rosrt/detail/publisher_manager.h b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/publisher_manager.h
index 8c75518..b41e17b 100644
--- a/ros_extras/ros_realtime/rosrt/include/rosrt/detail/publisher_manager.h
+++ b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/publisher_manager.h
@@ -41,7 +41,9 @@
 #include <ros/publisher.h>
 #include <rosrt/publisher.h>
 #include <lockfree/object_pool.h>
-#include <boost/thread.hpp>
+#include <rosrt/detail/thread.h>
+#include <rosrt/detail/mutex.h>
+#include <rosrt/detail/condition_variable.h>

 namespace rosrt
 {
@@ -81,11 +83,11 @@ private:
   void publishThread();

   PublishQueue queue_;
-  boost::condition_variable cond_;
-  boost::mutex cond_mutex_;
-  boost::thread pub_thread_;
+  rosrt::condition_variable cond_;
+  rosrt::mutex cond_mutex_;
   ros::atomic<uint32_t> pub_count_;
   volatile bool running_;
+  rosrt::thread pub_thread_;
 };

 } // namespace detail
diff --git a/ros_extras/ros_realtime/rosrt/include/rosrt/detail/thread.h b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/thread.h
new file mode 100644
index 0000000..f192ef3
--- /dev/null
+++ b/ros_extras/ros_realtime/rosrt/include/rosrt/detail/thread.h
@@ -0,0 +1,104 @@
+/*********************************************************************
+* Software License Agreement (BSD License)
+*
+*  Copyright (c) 2010, CLMC Lab, University of Southern California
+*  All rights reserved.
+*
+*  Redistribution and use in source and binary forms, with or without
+*  modification, are permitted provided that the following conditions
+*  are met:
+*
+*   * Redistributions of source code must retain the above copyright
+*     notice, this list of conditions and the following disclaimer.
+*   * Redistributions in binary form must reproduce the above
+*     copyright notice, this list of conditions and the following
+*     disclaimer in the documentation and/or other materials provided
+*     with the distribution.
+*   * Neither the name of the Willow Garage nor the names of its
+*     contributors may be used to endorse or promote products derived
+*     from this software without specific prior written permission.
+*
+*  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+*  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+*  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+*  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+*  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+*  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+*  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+*  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+*  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+*  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+*  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+*  POSSIBILITY OF SUCH DAMAGE.
+*********************************************************************/
+
+#ifndef ROSRT_THREAD_H_
+#define ROSRT_THREAD_H_
+
+#include <boost/utility.hpp>
+
+#ifdef __XENO__
+#include <native/task.h>
+#else
+#include <boost/thread.hpp>
+#endif
+
+extern "C"
+{
+static void thread_proxy(void* arg);
+
+}
+
+namespace rosrt
+{
+
+/**
+ * Thin wrapper for a "real-time" thread, implementation differs based on platform.
+ * Falls back to boost::thread on generic platforms.
+ *
+ */
+class thread: boost::noncopyable
+{
+private:
+#ifdef __XENO__
+  RT_TASK thread_;
+  boost::function<void ()> thread_fn_;
+#else
+  boost::thread thread_;
+#endif
+
+public:
+  explicit thread(boost::function<void ()> thread_fn)
+  {
+#ifdef __XENO__
+    thread_fn_ = thread_fn;
+    rt_task_spawn(&thread_, NULL, 0, 0, T_FPU | T_JOINABLE, thread_proxy, &thread_fn_);
+    //thread_proxy(&thread_fn_);
+#else
+    thread_ = boost::thread(thread_fn);
+#endif
+  }
+
+  ~thread()
+  {
+  }
+
+  void join()
+  {
+#ifdef __XENO__
+    rt_task_join(&thread_);
+#else
+    thread_.join();
+#endif
+  }
+};
+
+}
+
+static void thread_proxy(void* arg)
+{
+  boost::function<void ()>* fn = static_cast<boost::function<void ()>*>(arg);
+  (*fn)();
+}
+
+#endif /* ROSRT_THREAD_H_ */
diff --git a/ros_extras/ros_realtime/rosrt/src/publisher.cpp b/ros_extras/ros_realtime/rosrt/src/publisher.cpp
index b9357d6..8bd960e 100644
--- a/ros_extras/ros_realtime/rosrt/src/publisher.cpp
+++ b/ros_extras/ros_realtime/rosrt/src/publisher.cpp
@@ -95,8 +95,8 @@ PublisherManager::PublisherManager(const InitOptions& ops)
 : queue_(ops.pubmanager_queue_size)
 , pub_count_(0)
 , running_(true)
+, pub_thread_(boost::bind(&PublisherManager::publishThread, this))
 {
-  pub_thread_ = boost::thread(&PublisherManager::publishThread, this);
 }

 PublisherManager::~PublisherManager()
@@ -111,7 +111,7 @@ void PublisherManager::publishThread()
   while (running_)
   {
     {
-      boost::mutex::scoped_lock lock(cond_mutex_);
+      rosrt::mutex::scoped_lock lock(cond_mutex_);
       while (running_ && pub_count_.load() == 0)
       {
         cond_.wait(lock);
@@ -122,7 +122,6 @@ void PublisherManager::publishThread()
         return;
       }
     }
-
     uint32_t count = queue_.publishAll();
     pub_count_.fetch_sub(count);
   }
diff --git a/ros_extras/ros_realtime/rosrt/test/test_publisher.cpp b/ros_extras/ros_realtime/rosrt/test/test_publisher.cpp
index f632031..8fd2d90 100644
--- a/ros_extras/ros_realtime/rosrt/test/test_publisher.cpp
+++ b/ros_extras/ros_realtime/rosrt/test/test_publisher.cpp
@@ -41,6 +41,12 @@
 #include <std_msgs/UInt32.h>

 #include <boost/thread.hpp>
+#include <rosrt/detail/thread.h>
+
+#ifdef __XENO__
+#include <native/task.h>
+#include <sys/mman.h>
+#endif

 using namespace rosrt;

@@ -152,10 +158,18 @@ void publishThread(Publisher<std_msgs::UInt32>& pub, bool& done)
     if (msg)
     {
       pub.publish(msg);
+#ifdef __XENO__
+      rt_task_yield();
+#endif
     }
     else
     {
+#ifdef __XENO__
+      rt_task_yield();
+      rt_task_sleep(1000000);
+#else
       ros::WallDuration(0.0001).sleep();
+#endif
     }
   }
 }
@@ -170,7 +184,8 @@ TEST(Publisher, multipleThreads)
   Helper helpers[count];
   ros::Subscriber subs[count];

-  boost::thread_group tg;
+  boost::shared_ptr<rosrt::thread> threads[count];
+
   bool done = false;
   for (uint32_t i = 0; i < count; ++i)
   {
@@ -178,8 +193,7 @@ TEST(Publisher, multipleThreads)
     topic << "test" << i;
     pubs[i].initialize(nh.advertise<std_msgs::UInt32>(topic.str(), 0), 100, std_msgs::UInt32());
     subs[i] = nh.subscribe(topic.str(), 0, &Helper::cb, &helpers[i]);
-
-    tg.create_thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done)));
+    threads[i].reset(new rosrt::thread(boost::bind(publishThread, boost::ref(pubs[i]), boost::ref(done))));
   }

   uint32_t recv_count = 0;
@@ -194,11 +208,18 @@ TEST(Publisher, multipleThreads)
       recv_count += helpers[i].count;
     }

+#ifdef __XENO__
+    rt_task_yield();
+    rt_task_sleep(1000000);
+#else
     ros::WallDuration(0.01).sleep();
+#endif
   }

   done = true;
-  tg.join_all();
+
+  for (uint32_t i=0; i<count; ++i)
+    threads[i]->join();

   ASSERT_GE(recv_count, count * 10000);

@@ -210,11 +231,17 @@ TEST(Publisher, multipleThreads)

 int main(int argc, char** argv)
 {
+#ifdef __XENO__
+  mlockall(MCL_CURRENT | MCL_FUTURE);
+  rt_task_shadow(NULL, "test_rt_publisher", 1, 0);
+#endif
+
   ros::init(argc, argv, "test_rt_publisher");
   testing::InitGoogleTest(&argc, argv);

   ros::NodeHandle nh;
   rosrt::init();

+
   return RUN_ALL_TESTS();
 }
diff --git a/ros_extras/ros_realtime/rosrt/test/test_subscriber.cpp b/ros_extras/ros_realtime/rosrt/test/test_subscriber.cpp
index fa1e320..58c6463 100644
--- a/ros_extras/ros_realtime/rosrt/test/test_subscriber.cpp
+++ b/ros_extras/ros_realtime/rosrt/test/test_subscriber.cpp
@@ -42,6 +42,11 @@

 #include <boost/thread.hpp>

+#ifdef __XENO__
+#include <native/task.h>
+#include <sys/mman.h>
+#endif
+
 using namespace rosrt;

 void publishThread(ros::Publisher& pub, bool& done)
@@ -78,7 +83,11 @@ TEST(Subscriber, singleSubscriber)
       ASSERT_GT((int32_t)msg->data, last);
       last = msg->data;
       ++count;
+      //printf("inc\n");
     }
+#ifdef  __XENO__
+    rt_task_sleep(1000000);
+#endif
   }

   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
@@ -128,6 +137,9 @@ TEST(Subscriber, multipleSubscribersSameTopic)
         all_done = false;
       }
     }
+#ifdef  __XENO__
+    rt_task_sleep(1000000);
+#endif
   }

   ASSERT_EQ(getThreadAllocInfo().total_ops, 0UL);
@@ -182,6 +194,9 @@ TEST(Subscriber, multipleSubscribersMultipleTopics)
         all_done = false;
       }
     }
+#ifdef  __XENO__
+    rt_task_sleep(1000000);
+#endif
   }

   done = true;
@@ -260,6 +275,11 @@ TEST(Subscriber, multipleThreadsSameTopic)

 int main(int argc, char** argv)
 {
+#ifdef __XENO__
+  mlockall(MCL_CURRENT | MCL_FUTURE);
+  rt_task_shadow(NULL, "test_rt_subscriber", 0, 0);
+#endif
+
   ros::init(argc, argv, "test_rt_subscriber");
   testing::InitGoogleTest(&argc, argv);