ros2 / rosbag2

Apache License 2.0
283 stars 251 forks source link

Deserialize the cdr message #473

Closed anacsousa1 closed 4 years ago

anacsousa1 commented 4 years ago

Hello, thank you for the repository and for the instructions. I have been using the rosbag2 play, record, and info successfully.

However, I need to save data in another format (along with the bag files). And I have not found a python-API for data deserialization.

I innocently thought I could create a python API to open the CDR file and read it as messages. What I have done so far was Opening and closing the bag file; Finding if a specific topic is in the database; Getting all messages in the topic; Getting all topics names; Getting all message types; Getting message types for particular topics.

So, with that, I can get each message sent. However, I am at the moment of actual deserialization, and I couldn't fully understand how the cdr file is constructed.

When I print a message, I get, for example: b'\x00\x01\x00\x00\x0f\x00\x00\x00Fake ergometer\x00\x00\x00\x00\x00\x00\xcd\xcc\xcc\xcc\xcc\xcc<@\xfd\xff\xff\xff\xff\xffA@'

I know that the first field is a string with the value "Fake ergometer," so I know I am on the right path. But how do I go from there? What does the header mean, for example? \x00\x01\x00\x00\x0f\x00\x00\x00

I have been trying to read the CDR documentation (from OMG). Still, it is not direct, and sometimes it mentions different versions of GIOP, so I get confused.

I appreciate any help.

Bellow are the functions for the python-API that I created:

import sqlite3

def connect(sqlite_file):
    """ Make connection to an SQLite database file. """
    conn = sqlite3.connect(sqlite_file)
    c = conn.cursor()
    return conn, c

def close(conn):
    """ Close connection to the database. """
    conn.close()

def countRows(cursor, table_name, print_out=False):
    """ Returns the total number of rows in the database. """
    cursor.execute('SELECT COUNT(*) FROM {}'.format(table_name))
    count = cursor.fetchall()
    if print_out:
        print('\nTotal rows: {}'.format(count[0][0]))
    return count[0][0]

def getHeaders(cursor, table_name, print_out=False):
    """ Returns a list of tuples with column informations:
    (id, name, type, notnull, default_value, primary_key)
    """
    # Get headers from table "table_name"
    cursor.execute('PRAGMA TABLE_INFO({})'.format(table_name))
    info = cursor.fetchall()
    if print_out:
        print("\nColumn Info:\nID, Name, Type, NotNull, DefaultVal, PrimaryKey")
        for col in info:
            print(col)
    return info

def getAllElements(cursor, table_name, print_out=False):
    """ Returns a dictionary with all elements of the table database.
    """
    # Get elements from table "table_name"
    cursor.execute('SELECT * from({})'.format(table_name))
    records = cursor.fetchall()
    if print_out:
        print("\nAll elements:")
        for row in records:
            print(row)
    return records

def isTopic(cursor, topic_name, print_out=False):
    """ Returns topic_name header if it exists. If it doesn't, returns empty.
        It returns the last topic found with this name.
    """
    boolIsTopic = False
    topicFound = []

    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Look for specific 'topic_name' in 'records'
    for row in records:
        if(row[1] == topic_name): # 1 is 'name' TODO
            boolIsTopic = True
            topicFound = row
    if print_out:
        if boolIsTopic:
             # 1 is 'name', 0 is 'id' TODO
            print('\nTopic named', topicFound[1], ' exists at id ', topicFound[0] ,'\n')
        else:
            print('\nTopic', topic_name ,'could not be found. \n')

    return topicFound

def getAllMessagesInTopic(cursor, topic_name, print_out=False):
    """ Returns all timestamps and messages at that topic.
    There is no deserialization for the BLOB data.
    """
    count = 0
    timestamps = []
    messages = []

    # Find if topic exists and its id
    topicFound = isTopic(cursor, topic_name, print_out=False)

    # If not find return empty
    if not topicFound:
        print('Topic', topic_name ,'could not be found. \n')
    else:
        records = getAllElements(cursor, 'messages', print_out=False)

        # Look for message with the same id from the topic
        for row in records:
            if row[1] == topicFound[0]:     # 1 and 0 is 'topic_id' TODO
                count = count + 1           # count messages for this topic
                timestamps.append(row[2])   # 2 is for timestamp TODO
                messages.append(row[3])     # 3 is for all messages

        # Print
        if print_out:
            print('\nThere are ', count, 'messages in ', topicFound[1])

    return timestamps, messages

def getAllTopicsNames(cursor, print_out=False):
    """ Returns all topics names.
    """
    topicNames = []
    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Save all topics names
    for row in records:
        topicNames.append(row[1])  # 1 is for topic name TODO
    if print_out:
        print('\nTopics names are:')
        print(topicNames)

    return topicNames

def getAllMsgsTypes(cursor, print_out=False):
    """ Returns all messages types.
    """
    msgsTypes = []
    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Save all message types
    for row in records:
        msgsTypes.append(row[2])  # 2 is for message type TODO
    if print_out:
        print('\nMessages types are:')
        print(msgsTypes)

    return msgsTypes

def getMsgType(cursor, topic_name, print_out=False):
    """ Returns the message type of that specific topic.
    """
    msg_type = []
    # Get all topics names and all message types
    topic_names = getAllTopicsNames(cursor, print_out=False)
    msgs_types = getAllMsgsTypes(cursor, print_out=False)

    # look for topic at the topic_names list, and find its index
    for index, element in enumerate(topic_names):
        if element == topic_name:
            msg_type = msgs_types[index]
    if print_out:
        print('\nMessage type in', topic_name, 'is', msg_type)

    return msg_type
emersonknapp commented 4 years ago

@mabelzhang is this issue related to https://github.com/ros2/rosbag2/issues/476 ? Either way, do you have insight on how this functionality can be accomplished via the python API?

mabelzhang commented 4 years ago

I have very limited understanding of CDR (zero understanding) and message deserialization (a little more than zero), so I will try to answer what I can and hope something hits.

Looks like this implementation deals with sqlite3 directly. If getAllMessagesInTopic() returns the same type as data in #308 here https://github.com/ros2/rosbag2/pull/308/files#diff-4f92c776f8e6c0d824f1b53120785d3dR45 (basically the std::shared_ptr<rcutils_uint8_array_t> serialized_data field of rosbag2_storage::SerializedBagMessage returned from Reader.read_next()), then you can use rosidl_runtime_py.utilities.get_message() and rclpy.serialization.deserialize_message() as in the linked file to deserialize. In that case, yes this is what #476 wants to add to make deserialization friendlier.

I think that's what you're asking - deserializing the ROS messages, as opposed to CDR? If that's not what you're asking, and you're asking about CDR, then I have no clue.

emersonknapp commented 4 years ago

From my understanding the goal here is "read data out of rosbag into an understandable format so that it can be saved out in a different format" - I think currently this is talking about CDR because that's the data that was discovered to be available, not because it's a desired format to work with. I'm assuming that if there were a Python API available to read out the bag as deserialized Python message objects then that would solve the problem. @anacsousa1 can you confirm if this is what you're trying to accomplish?

emersonknapp commented 4 years ago

We might have rmw_deserialize methods in rclpy, need to double check

mabelzhang commented 4 years ago

rmw_deserialize is exactly what's done in rclpy.serialization.deserialize_message() https://github.com/ros2/rclpy/pull/495/files#diff-80dda03110b5be606b823d6b0558b5c2R4775

Karsten1987 commented 4 years ago

@anacsousa1 it's great that you've come already thus far and the link Mabel posted should do exactly what you're missing, namely taking a buffer with serialized CDR data and convert it into a python ROS2 message. Please don't go down that rabbit hole of implementing a CDR parser in python. ROS2 has code in place to do that for you ;-)

anacsousa1 commented 4 years ago

Hey! Thank you all for the responses! I won't be able to do any further tests until the beginning of the next week. But I wanted to make sure I understood the idea: Using the rmw_deserialize (in rclpy), I may create a node that opens a bag file, and then deserialize that? Is it possible to give me a basic example of that?

I really did not intend to create the API, if there is an easier path, I definitely prefer that!

mabelzhang commented 4 years ago

Use rosidl_runtime_py.utilities.get_message() and rclpy.serialization.deserialize_message(). They wrap around rmw_deserialize and will save you a lot of time. See example usage here https://github.com/ros2/rosbag2/pull/308/files#diff-4f92c776f8e6c0d824f1b53120785d3dR46

If you want to try another API, you can try the branch in this PR https://github.com/ros2/rosbag2/pull/308 . It is almost ready to merge, I just need to add one more feature and look into a compiler warning on Windows. Everything is working.

anacsousa1 commented 4 years ago

Hey @mabelzhang , thanks a lot for the answer. I believe I am too noob in this. If you could help me with more "baby steps" I would really appreciate it. I have foxy distribution installed in Ubuntu 20.04, and then I installed rosbag2 as Debian package as described at the readme file. I am able to record and play data successfully.

But I am having trouble using the functions you suggested. A few things I tried:

  1. Cloning the branch https://github.com/mabelzhang/rosbag2/tree/rosbag2_py: git clone https://github.com/mabelzhang/rosbag2/tree/rosbag2_py. It fails

    fatal: repository '[https://github.com/mabelzhang/rosbag2/tree/rosbag2_py/](https://github.com/mabelzhang/rosbag2/tree/rosbag2_py/)' not found
  2. Cloning the master, and building it from source (so I could use the cpp examples):

    mkdir -p ~/rosbag_ws/src
    cd ~/rosbag_ws/src
    git clone https://github.com/ros2/rosbag2.git
    colcon build

    If fails:

    CMake Error at CMakeLists.txt:109 (find_package):
      By not providing "Findtest_msgs.cmake" in CMAKE_MODULE_PATH this project
      has asked CMake to find a package configuration file provided by
      "test_msgs", but CMake did not find one.
    
      Could not find a package configuration file provided by "test_msgs" with
      any of the following names:
    
        test_msgsConfig.cmake
        test_msgs-config.cmake
    
      Add the installation prefix of "test_msgs" to CMAKE_PREFIX_PATH or set
      "test_msgs_DIR" to a directory containing one of the above files.  If
      "test_msgs" provides a separate development package or SDK, be sure it has
      been installed.
    
    ---
    Failed   <<< rosbag2_cpp [1.40s, exited with code 1]

    It also fails for colcon build --packages-select rosbag2_cpp

  3. I also tried to add the test_msgs and rcl_interfaces as suggested here. But it also fails:

    CMake Error at CMakeLists.txt:21 (find_package):
      By not providing "Findtest_interface_files.cmake" in CMAKE_MODULE_PATH this
      project has asked CMake to find a package configuration file provided by
      "test_interface_files", but CMake did not find one.
    
      Could not find a package configuration file provided by
      "test_interface_files" with any of the following names:
    
        test_interface_filesConfig.cmake
        test_interface_files-config.cmake
    
      Add the installation prefix of "test_interface_files" to CMAKE_PREFIX_PATH
      or set "test_interface_files_DIR" to a directory containing one of the
      above files.  If "test_interface_files" provides a separate development
      package or SDK, be sure it has been installed.

As you can see, I am totally lost...

Karsten1987 commented 4 years ago

all you'd need to do is to install the test msgs package - we have binary packages for this, so you don't need to compile it yourself. You could either use rosdep install see an example here or simply install the test_msgs package needed for rosbag2 (sudo apt-get install ros-foxy-test-msgs).

anacsousa1 commented 4 years ago

Thanks a lot for the answer, I installed the test msgs and build it successfully!

sudo apt-get install ros-foxy-test-msgs
colcon build --packages-select rosbag2_cpp rosbag2_test_common shared_queues_vendor sqlite3_vendor rosbag2_storage rosbag2_storage_default_plugins

I expected to find an executable at the CMake, as it does not have one (so it is not just running ros2 run rosbag2_tests test), how do I run the code?

I thank you for the patience with this type of questions.

mabelzhang commented 4 years ago

Cloning the branch https://github.com/mabelzhang/rosbag2/tree/rosbag2_py

Clone from the root (the URL from the green button on the repo page):

git clone https://github.com/mabelzhang/rosbag2.git
git checkout rosbag2_py

It also fails for colcon build --packages-select rosbag2_cpp

To simplify the packages you pass to --packages-select, use --packages-up-to to build all the dependencies of a package, like --packages-up-to rosbag2_py.

I expected to find an executable at the CMake

Which file are you trying to run? If you're looking to run the test Python scripts in rosbag2_py, you can do colcon test --packages-select rosbag2_py, and the log files will be in log/test_*/rosbag2_py/stdout.log. Or you can pass in --event-handlers console_direct+ to output verbosely to the terminal.

For your use case, another way to try out the deserialization is to copy those two lines get_message and deserialize_message from test_sequential_reader.py, with the relevant import lines, to your code and give it a try.

anacsousa1 commented 4 years ago

@mabelzhang, thanks a lot!

I used the get_message and deserialize_message as the example you showed me, and it is working perfectly with the functions I have already created:

import rosbag_api as bag
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

bag_file = 'rosbag2_2020_07_13-18_13_16/rosbag2_2020_07_13-18_13_16_0.db3'
topic_name = '/control/stim'

### connect to the database
conn, c = bag.connect(bag_file)

### get all topics names and types
topic_names = bag.getAllTopicsNames(c, print_out=False)
topic_types = bag.getAllMsgsTypes(c, print_out=False)

# Create a map for quicker lookup
type_map = {topic_names[i]:topic_types[i] for i in range(len(topic_types))}

### get all timestamps and all messages
t, msgs = bag.getAllMessagesInTopic(c, topic_name, print_out=False)

### >>> deserialization
msg_type = get_message(type_map[topic_name])
x = deserialize_message(msgs[1], msg_type)
print(x)

### close connection to the database
bag.close(conn)

Side note: errors related to pybind11:

I cloned the repository, checkout to your branch, and build the code (thank you a lot for the --packages-up-to tip!). Then, I successfully tested the rosbag2_py and opened the log.

CMake Error at CMakeLists.txt:20 (find_package):
  By not providing "Findpybind11.cmake" in CMAKE_MODULE_PATH this project has
  asked CMake to find a package configuration file provided by "pybind11",
  but CMake did not find one.

I tried to fix with sudo apt install python3-pip and pip3 install pybind11 as mentioned here, but I was unable to install it through apt (E: Unable to locate package python-pybind11) and pip3 already installed it (Requirement already satisfied: pybind11 in /home/ana/.local/lib/python3.8/site-packages (2.5.0)). So, I had to install it from source as suggested here:

# Some prerequisites (but not all of them)
apt-get install cmake
pip3 install pytest

# Clone, build and install 
git clone https://github.com/pybind/pybind11.git 
cd pybind11 
mkdir build 
cd build 
cmake .. 
sudo make install

Issue closed

Anyway, now, I am able to get information from messages perfectly. =]

mabelzhang commented 4 years ago

Thank you for the feedback! Glad it worked for you!

We added pybind11_vendor to ros2.repos a few days ago, as it is required for rosbag2_py. People should be able to check out pybind11_vendor, and building it will install pybind.

justinberi commented 3 years ago

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

import matplotlib.pyplot as plt

class BagFileParser():
    def __init__(self, bag_file):
        self.conn = sqlite3.connect(bag_file)
        self.cursor = self.conn.cursor()

        ## create a message type map
        topics_data = self.cursor.execute("SELECT id, name, type FROM topics").fetchall()
        self.topic_type = {name_of:type_of for id_of,name_of,type_of in topics_data}
        self.topic_id = {name_of:id_of for id_of,name_of,type_of in topics_data}
        self.topic_msg_message = {name_of:get_message(type_of) for id_of,name_of,type_of in topics_data}

    def __del__(self):
        self.conn.close()

    # Return [(timestamp0, message0), (timestamp1, message1), ...]
    def get_messages(self, topic_name):

        topic_id = self.topic_id[topic_name]
        # Get from the db
        rows = self.cursor.execute("SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall()
        # Deserialise all and timestamp them
        return [ (timestamp,deserialize_message(data, self.topic_msg_message[topic_name])) for timestamp,data in rows]

if __name__ == "__main__":

        bag_file = '/workspaces/foxy_leg_ws/rosbag2_2021_04_28-04_42_07/rosbag2_2021_04_28-04_42_07_0.db3'

        parser = BagFileParser(bag_file)

        trajectory = parser.get_messages("/joint_trajectory")[0][1] 
        p_des_1 = [trajectory.points[i].positions[0] for i in range(len(trajectory.points))]
        t_des = [trajectory.points[i].time_from_start.sec + trajectory.points[i].time_from_start.nanosec*1e-9 for i in range(len(trajectory.points))]

        actual = parser.get_messages("/joint_states")

        plt.plot(t_des, p_des_1)

        plt.show()
lowbees commented 3 years ago

How to deserialize cdr message without ros? (C++)

cmd23333 commented 2 years ago

Such a method above were available in foxy as eloquent don't have rclpy.serialization so can I do it with ros2 eloquent?

reshmadhotre commented 2 years ago

In Ros2, Is there an generic way of deserialising the messages as in ROS1, like ros topic -b bagfile -p /topic I am looking for generic way to convert ROS2 bag to csv

ksatyaki commented 2 years ago

I get a "string data is not null-terminated" on galactic when using the SequentialReader when deserializing messages.

scch1092 commented 2 years ago

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message
...

is there also the possibility to use own defined messages?

weidezhang commented 11 months ago

For customized module, I get around it with using importlib directly. please see the following code:

while reader.has_next():
    topic, msg, t = reader.read_next()
    msg_type_str = "moduleprefix1.moduleprefix2." + type_map[topic]
    module_str, class_str = msg_type_str.rsplit(".", 1)
    module = importlib.import_module(module_str)
    msg_type = getattr(module, class_str)
    msg = deserialize_message(msg, msg_type)
    print(f'msg is {msg}')

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message
...

is there also the possibility to use own defined messages?

ksatyaki commented 11 months ago

Is this obsolete with humble or rolling?

nayak2358 commented 5 months ago

To add to the code of @anacsousa1 , here is the python script for anyone interested to get the values of each attribute of a topic in a CSV file:

import csv
import rosbag_api as bag
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

bag_file = 'path/to/bag/file.db3'
topic_name = '/name_of_the_topic'
csv_file = '/output/deserialized/file.csv'

### connect to the database
conn, c = bag.connect(bag_file)

### get all topics names and types
topic_names = bag.getAllTopicsNames(c, print_out=False)
topic_types = bag.getAllMsgsTypes(c, print_out=False)

### Create a map for quicker lookup
type_map = {topic_names[i]:topic_types[i] for i in range(len(topic_types))}

### get all timestamps and all messages
t, msgs = bag.getAllMessagesInTopic(c, topic_name, print_out=False)

### To extract the attributes
def extract_fields(message):
    extracted_fields = []

    def recursive_extract(msg, parent_field=''):
        slots_without_underscores = [field[1:] if field.startswith('_') else field for field in msg.__slots__]
        for attr_name in slots_without_underscores:
            attr_value = getattr(msg, attr_name)
            if hasattr(attr_value, '__slots__'):
                recursive_extract(attr_value, f"{parent_field}.{attr_name}" if parent_field else attr_name)
            else:
                extracted_fields.append(f"{parent_field}.{attr_name}" if parent_field else attr_name)

    recursive_extract(message)

    return extracted_fields

### Open CSV file for writing
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)

    ### Write headers
    ### Extract message structure to dynamically determine headers
    msg_type = get_message(type_map[topic_name])
    sample_msg = deserialize_message(msgs[0], msg_type)
    timestamp = sample_msg.header.stamp.sec + sample_msg.header.stamp.nanosec / 1e9

    headers = extract_fields(sample_msg)
    writer.writerow(['Time'] + headers)

    ### Debugging
    print(sample_msg)
    print(sample_msg.header.stamp.sec)
    print(eval(f"sample_msg.{headers[0]}"))

    ### Deserialize messages and write to CSV
    for msg in msgs:
        deserialized_msg = deserialize_message(msg, msg_type)
        timestamp = deserialized_msg.header.stamp.sec + deserialized_msg.header.stamp.nanosec / 1e9
        row = [timestamp] + [eval(f"sample_msg.{field}") for field in headers]
        writer.writerow(row)

### Close connection to the database
bag.close(conn)

print("Deserialized messages saved to", csv_file)