eclipse-cyclonedds / cyclonedds-python


[Question] About the usage of shared memory in docker #213

Open morkovka1337 opened 10 months ago

morkovka1337 commented 10 months ago

Hi,

I'm trying to set up my environment so that I can transfer messages using shared memory. First of all, here is my Image class:

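Image class:

```python
from dataclasses import dataclass

import numpy as np
from cyclonedds.idl import IdlStruct
from cyclonedds.idl.types import array, uint8, uint64


@dataclass
class MainCameraImage(IdlStruct):
    img: array[uint8, 4147200]  # fixed size: 1280 * 1080 * 3 bytes
    shape: array[uint64, 3]

    @classmethod
    def from_numpy(cls, img):
        assert len(img.shape) == 3
        seq_img = img.tobytes()
        return MainCameraImage(seq_img, tuple(img.shape))

    def to_numpy(self):
        shape = self.shape
        return np.frombuffer(self.img, dtype=np.uint8).reshape(shape)
```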
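The fixed length in `array[uint8, 4147200]` is exactly the byte count of one 1280×1080×3 `uint8` frame. A small pure-Python check like the one below makes that relationship explicit and flags frames whose byte count would not match the fixed-size array before they reach `from_numpy`. The helper names `frame_nbytes` and `fits_main_camera_image` are hypothetical and not part of cyclonedds.

```python
import math

# Capacity of MainCameraImage.img, taken from array[uint8, 4147200].
IMG_CAPACITY = 4147200


def frame_nbytes(shape, itemsize=1):
    """Bytes in a C-contiguous frame of this shape (uint8 -> itemsize 1)."""
    return math.prod(shape) * itemsize


def fits_main_camera_image(shape, itemsize=1):
    """True if a frame of this shape fills the fixed-size img array exactly."""
    return len(shape) == 3 and frame_nbytes(shape, itemsize) == IMG_CAPACITY


print(frame_nbytes((1280, 1080, 3)))            # 4147200
print(fits_main_camera_image((1280, 1080, 3)))  # True
print(fits_main_camera_image((720, 1280, 3)))   # False
```

This is relevant to the `resize_to` option of the publisher below: any resize changes the byte count, so the resized frame no longer matches the fixed-size array.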

Note that the image array has a fixed size. Next, I implemented a publisher and a subscriber:

Publisher:

```python
import cv2
import time
import numpy as np
import fire
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.internal import dds_infinity
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from dds_data_structures import MainCameraImage, Image, CameraState, DeviceState


def stream(dev=0, resize_to=None,
           output_topic_names=['camera_images', 'camera_images_vis', 'camera_images_clr'],
           data_type='MainCameraImage', cap_gstreamer=True):
    data_type = MainCameraImage
    participant = DomainParticipant(0)
    writers = []
    # unknown, disabled, enabled, error
    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    for topic_name in output_topic_names:
        topic_out = Topic(participant, topic_name, data_type)
        writers.append(DataWriter(participant, topic_out, qos))

    # A random frame stands in for the camera; 1280 * 1080 * 3 bytes fills MainCameraImage.img
    frame = np.random.randint(0, 256, (1280, 1080, 3), dtype=np.uint8)
    print(frame.shape, frame.dtype)
    while True:
        time_start = time.time()
        ret, frame = 1, frame  # placeholder for a real camera read
        if not ret:
            print('Error: Unable to read frame')
            break
        if resize_to is not None:
            frame = cv2.resize(frame, resize_to[::-1])
        for writer in writers:
            writer.write(data_type.from_numpy(frame))
        loop_time = time.time() - time_start
        print(f'Streaming Camera with topic {output_topic_names[0]}, {loop_time=}', end='\r')


if __name__ == '__main__':
    fire.Fire(stream)
```

Subscriber:

```python
import cv2
import time
import numpy as np
import fire
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.internal import dds_infinity
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from dds_data_structures import MainCameraImage


def receive(topic_name='camera_images'):
    data_type = MainCameraImage
    participant = DomainParticipant(0)
    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    topic_in = Topic(participant, topic_name, data_type)
    reader = DataReader(participant, topic_in, qos)
    while True:
        time_start = time.time()
        img = reader.read()  # non-blocking; returns an empty list if nothing is cached
        if len(img) == 0:
            continue
        img = img[0].to_numpy()
        loop_time = time.time() - time_start
        print(f'Received img {img.shape=}, {loop_time=}', end='\r')


if __name__ == '__main__':
    fire.Fire(receive)
```
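One detail in the subscriber loop above: in DDS, `read()` leaves samples in the reader's history cache and only marks them as read, while `take()` removes them. With `KeepLast(1)`, and assuming the default read condition matches samples in any sample state (the usual DDS default), repeated `read()` calls can keep returning the same last sample, so `loop_time` there mostly measures `to_numpy()` on a cached sample rather than transport. The toy model below is not the cyclonedds API; `ToyHistory` is a hypothetical stand-in that only mimics KEEP_LAST(1) read/take semantics.

```python
from collections import deque


class ToyHistory:
    """Toy model of a KEEP_LAST reader cache; NOT the cyclonedds DataReader."""

    def __init__(self, depth=1):
        self._samples = deque(maxlen=depth)  # oldest sample is dropped when full

    def write(self, value):
        self._samples.append({"value": value, "read": False})

    def read(self):
        # Non-destructive: return everything and only mark it as read.
        for s in self._samples:
            s["read"] = True
        return [s["value"] for s in self._samples]

    def take(self):
        # Destructive: return everything and empty the cache.
        out = [s["value"] for s in self._samples]
        self._samples.clear()
        return out


h = ToyHistory()
h.write("frame-1")
print(h.read(), h.read())  # ['frame-1'] ['frame-1']: same sample twice
print(h.take(), h.take())  # ['frame-1'] []: take empties the cache
h.write("frame-2")
h.write("frame-3")
print(h.take())            # ['frame-3']: KEEP_LAST(1) kept only the newest
```

In the real subscriber, taking instead of reading (for example the blocking `take_one(timeout=...)` used in the later version of the reader) avoids re-processing the same frame. Separately, the `if len(img) == 0: continue` branch spins without blocking, which by itself can keep a core busy.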

I've checked the QoS requirements for both publisher and subscriber here (as far as I can see, they should be identical).

Next, I'm running iceoryx RouDi in one Docker container with --net=host --ipc=host -v /dev:/dev -v /tmp:/tmp. I also set the environment variable CYCLONEDDS_URI to point to the config file. I'm using the default config file:

CYCLONEDDS_URI config:

```
default true info
```

Finally, I run the above scripts in Docker containers with the same net and ipc options. They work fine, and the subscriber receives messages from the publisher. What I'm trying to understand:

  1. How can I make sure that messages are being sent via shared memory?
  2. ipcs -m shows only Firefox and VS Code processes (I've checked the PIDs via ps -p). Should my subscriber and publisher appear there if everything works as expected?
morkovka1337 commented 10 months ago

I've changed the Cyclone config a bit to enable tracing. When I start a script, it reports its iceoryx address. Publisher:

1692100063.009583 [0]    python3: Current process name for iceoryx is iceoryx_rt_71_1692100063009382776
1692100063.010857 [0]    python3: My iceoryx address: 16/[a8:a1:59:71:3e:95:00:00:00:00:00:00:00:00:00:00]:0

Subscriber:

1692100056.575285 [0]    python3: Current process name for iceoryx is iceoryx_rt_74_1692100056575087207
1692100056.576568 [0]    python3: My iceoryx address: 16/[a8:a1:59:71:3e:95:00:00:00:00:00:00:00:00:00:00]:0

When I interrupt them, iceoryx reports the following:

unified_pipeline_dev-roudi-1  | 2023-08-15 14:48:30.929 [Warning]: Application iceoryx_rt_74_1692100056575087207 not responding (last response 1534 milliseconds ago) --> removing it
unified_pipeline_dev-roudi-1  | 2023-08-15 14:48:31.630 [Warning]: Application iceoryx_rt_71_1692100063009382776 not responding (last response 1502 milliseconds ago) --> removing it

Does this mean that everything works as expected? If so, why don't I see my processes in ipcs?

eboasson commented 10 months ago

From a look at your code I think it should be happy to use Iceoryx. I should try it, but ...

In Unix/Linux there are multiple ways of creating shared memory, and System V shared memory (which ipcs -m reports on) is only one of them, and it is actually the less Unix-y way of doing it. The other tricks involve mmap:

Maybe some other trick exists, a lot has happened in recent years and I've not tried to follow it all, but these are the classic ones. I'm pretty sure Iceoryx uses the third.

You can see it if you look at the VM maps, either by running cat on the right file in /proc (/proc/<pid>/maps) or by using pmap. If you do that, you'll find the memory. That much is obvious from the trace and the roudi output.
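The /proc route can be scripted. The sketch below lists the `/dev/shm` files mapped into a process by parsing `/proc/<pid>/maps` (the data `pmap` shows). To have something to find, it creates a segment with Python's `multiprocessing.shared_memory`, which on Linux uses POSIX shared memory backed by a file under `/dev/shm`, and therefore does not appear in `ipcs -m` (that tool only lists System V segments). It assumes Linux and permission to read the target's maps file; reading another process's maps usually requires the same user or root. `shm_mappings` is a hypothetical helper name.

```python
import os
import sys
from multiprocessing import shared_memory


def shm_mappings(pid="self"):
    """Return the set of /dev/shm paths mapped into a process (Linux only)."""
    paths = set()
    with open(f"/proc/{pid}/maps") as f:
        for line in f:
            # Format: address perms offset dev inode [pathname]
            parts = line.split(maxsplit=5)
            if len(parts) == 6 and parts[5].startswith("/dev/shm/"):
                paths.add(parts[5].strip())
    return paths


if __name__ == "__main__" and sys.platform.startswith("linux"):
    seg = shared_memory.SharedMemory(create=True, size=4096)
    try:
        # The new POSIX segment shows up as a /dev/shm mapping of this process.
        print(f"/dev/shm/{seg.name}" in shm_mappings())
    finally:
        seg.close()
        seg.unlink()
```

For the processes in this thread, `shm_mappings(<pid>)` on the publisher or subscriber would be expected to list the RouDi segments if iceoryx is in use.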

To find out whether it really uses Iceoryx for this data, one option is to dig deeper into the Cyclone traces: the discovery output tells you which addresses it uses, and from those you can see whether it uses Iceoryx. Alternatively, you can use the Iceoryx introspection tool, which shows you what's going on there.

Finally, in a case like this where it is basically everything-via-iceoryx or everything-over-the-network, looking at network statistics will also work. netstat or something like top should do fine. Indeed, even top -H (which shows individual threads) is often already sufficient: just look for threads named recv/recvUC/recvMC. If none of those take substantial time while you're pushing a lot of large samples through, then it is not sending them over the network.
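The `top -H` check can also be scripted. On Linux each thread has `/proc/<pid>/task/<tid>/comm` (its name, such as the `recv`, `recvUC` and `recvMC` threads mentioned above) and `/proc/<pid>/task/<tid>/stat`, whose fields 14 and 15 are user and system CPU time in clock ticks. A minimal sketch, assuming Linux; the function names are hypothetical. Sampling it twice while large samples are flowing shows whether the receive threads are accumulating CPU time.

```python
import os


def thread_cpu_seconds(pid="self"):
    """Map thread name -> CPU seconds (utime + stime), summed over threads sharing a name."""
    ticks = os.sysconf("SC_CLK_TCK")
    result = {}
    task_dir = f"/proc/{pid}/task"
    for tid in os.listdir(task_dir):
        try:
            with open(f"{task_dir}/{tid}/comm") as f:
                name = f.read().strip()
            with open(f"{task_dir}/{tid}/stat") as f:
                stat = f.read()
        except OSError:
            continue  # thread exited while we were scanning
        # The comm field in stat may contain spaces, so parse after the last ')'.
        fields = stat[stat.rindex(")") + 2:].split()
        utime, stime = int(fields[11]), int(fields[12])  # stat fields 14 and 15
        result[name] = result.get(name, 0.0) + (utime + stime) / ticks
    return result


def recv_threads(cpu_by_name):
    """Keep only the Cyclone DDS receive threads named in this thread."""
    return {n: t for n, t in cpu_by_name.items() if n in ("recv", "recvUC", "recvMC")}
```

Usage: `recv_threads(thread_cpu_seconds(37288))` for a given process ID, compared a few seconds apart.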

morkovka1337 commented 10 months ago

Thank you for such a detailed reply!

Suppose the PID of one of my processes is 37288 (publisher or subscriber; I guess it does not matter). I've checked pmap -x 37288, and it outputs: 37288: python3 /root/vloginov/image_ping.py

I've also checked top -H: for recvUC it shows a TIME+ of about 1:49, whereas TIME+ for the python process is about 11:36. Is that the ratio you were referring to?

Finally, in the output of netstat -p I do not see any of my processes. I think this is the strongest signal that I'm not using the network; is that true?

However, what concerns me most is that when I stop iceoryx and rerun both scripts without the CYCLONEDDS_URI variable, the loop time is the same, and I still do not see my scripts in netstat -p. Am I doing something wrong?
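On the netstat observation: as far as I know, plain `netstat -p` lists only non-listening sockets, and unconnected UDP sockets, like the ones Cyclone DDS uses at least for discovery, are typically shown only with `-a` or `-l` (for example `netstat -anup`, or `ss -uap`). A check that does not depend on tool flags is to match the socket inodes a process holds in `/proc/<pid>/fd` against `/proc/net/udp` and `/proc/net/udp6`. A sketch assuming Linux and permission to read the target's `fd` directory (same user or root); the function names are hypothetical.

```python
import os
import re


def socket_inodes(pid="self"):
    """Inodes of all sockets held open by a process, read from /proc/<pid>/fd."""
    inodes = set()
    fd_dir = f"/proc/{pid}/fd"
    for fd in os.listdir(fd_dir):
        try:
            target = os.readlink(f"{fd_dir}/{fd}")
        except OSError:
            continue  # fd closed while scanning
        m = re.fullmatch(r"socket:\[(\d+)\]", target)
        if m:
            inodes.add(int(m.group(1)))
    return inodes


def udp_ports(pid="self"):
    """Local UDP ports (IPv4 and IPv6) bound by sockets belonging to the process."""
    mine = socket_inodes(pid)
    ports = set()
    for table in ("/proc/net/udp", "/proc/net/udp6"):
        if not os.path.exists(table):
            continue
        with open(table) as f:
            next(f)  # skip the header line
            for line in f:
                fields = line.split()
                local, inode = fields[1], int(fields[9])
                if inode in mine:
                    ports.add(int(local.rsplit(":", 1)[1], 16))  # port is printed in hex
    return ports
```

If `udp_ports(<pid>)` is non-empty for the publisher and subscriber, they do hold UDP sockets even though `netstat -p` did not list them; whether the image data itself goes over those sockets is what the other checks in this thread address.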

morkovka1337 commented 9 months ago

bump this

eboasson commented 8 months ago

Sorry that you had to bump it ...

1:49 for recvUC sounds like it might not be using Iceoryx; that's quite a bit of CPU time. Python is generally good at consuming CPU cycles, so the python process taking a fair bit of time is probably simply application code executing.

However, what concerns me most is that when I stop iceoryx and rerun both scripts without the CYCLONEDDS_URI variable, the loop time is the same, and I still do not see my scripts in netstat -p. Am I doing something wrong?

That's suspicious. A lot of the DDS work happens in the background, so if the Python application is simply taking a lot of time to process the data, it is possible that its execution time is almost independent of the work done in DDS regardless of whether it uses Iceoryx or the loopback interface.

I don't know exactly what Linux outputs with pmap -x (I can still remember the Solaris output, but that's of no use to you!), I don't know exactly what Linux does in netstat -p, and trying it out on macOS is quite meaningless. I'll try to get some help 🙂

Splinter1984 commented 8 months ago

Hi @morkovka1337! I have a few things to add. First, it's useful to look at the iox-roudi debug output, which tells us more than the info log. To enable it, use the following command:

iox-roudi -l debug

At startup, the output shows the shared memory segments RouDi reserves, like this:

Reserving 22740232 bytes in the shared memory [iceoryx_mgmt]
[ Reserving shared memory successful ] 
2023-10-16 14:47:03.299 [ Debug ]: Registered memory segment 0x7f6164ed9000 with size 22740232 to id 1
Reserving 149264720 bytes in the shared memory [root]
[ Reserving shared memory successful ] 
2023-10-16 14:47:03.350 [ Debug ]: Roudi registered payload data segment 0x7f615c07f000 with size 149264720 to id 2
RouDi is ready for clients

After you start your pub/sub example, it also reports the applications that register:

2023-10-16 14:48:38.859 [ Debug ]: Registered new application iceoryx_rt_140_1697467718858636401
2023-10-16 14:48:38.860 [ Debug ]: Created new ConditionVariable for application iceoryx_rt_140_1697467718858636401
2023-10-16 14:48:39.619 [ Debug ]: Registered new application iceoryx_rt_139_1697467719618867611
2023-10-16 14:48:39.620 [ Debug ]: Created new ConditionVariable for application iceoryx_rt_139_1697467719618867611
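When the RouDi log gets long, a few lines of Python can extract which applications registered and which were later removed, using the two message formats shown in this thread ("Registered new application ..." and "Application ... not responding ... --> removing it"). The function name `roudi_apps` is hypothetical, and the patterns cover only those two messages.

```python
import re

REGISTERED = re.compile(r"Registered new application (\S+)")
REMOVED = re.compile(r"Application (\S+) not responding")


def roudi_apps(log_lines):
    """Return (registered, removed) application names found in iox-roudi log lines."""
    registered, removed = [], []
    for line in log_lines:
        if m := REGISTERED.search(line):
            registered.append(m.group(1))
        elif m := REMOVED.search(line):
            removed.append(m.group(1))
    return registered, removed


log = [
    "2023-10-16 14:48:38.859 [ Debug ]: Registered new application iceoryx_rt_140_1697467718858636401",
    "2023-10-16 14:48:38.860 [ Debug ]: Created new ConditionVariable for application iceoryx_rt_140_1697467718858636401",
    "2023-10-16 14:48:39.619 [ Debug ]: Registered new application iceoryx_rt_139_1697467719618867611",
]
print(roudi_apps(log))
```

The registered names can be cross-checked against the "Current process name for iceoryx is ..." line that each Python process prints when tracing is enabled.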

So now we know that RouDi works and reserves segments in shared memory. Let's check shm^1 usage with the following command:

lsof -r1 /dev/shm/ | grep python3

As you can see, the segments reserved by RouDi are used by the python3 processes with PIDs 73782 and 73811:

python3   73782         root  mem    REG   0,25 149264720 3291 /dev/shm/root
python3   73782         root  mem    REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
python3   73782         root    6u   REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
python3   73782         root    7u   REG   0,25 149264720 3291 /dev/shm/root
python3   73811         root  mem    REG   0,25 149264720 3291 /dev/shm/root
python3   73811         root  mem    REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
python3   73811         root    6u   REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
python3   73811         root    7u   REG   0,25 149264720 3291 /dev/shm/root
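If lsof is not installed in the container, a rough equivalent of `lsof /dev/shm/ | grep python3` can be built from /proc: scan every process's `maps` file for `/dev/shm` paths and report the process name from `/proc/<pid>/comm`. A sketch assuming Linux; processes whose maps cannot be read (other users, without root) are silently skipped, and unlike lsof it sees only memory mappings, not open-but-unmapped descriptors. `dev_shm_users` is a hypothetical name.

```python
import os
from collections import defaultdict


def dev_shm_users():
    """Map each /dev/shm path to a sorted list of (pid, process name) that have it mapped."""
    users = defaultdict(set)
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue
        try:
            with open(f"/proc/{entry}/comm") as f:
                name = f.read().strip()
            with open(f"/proc/{entry}/maps") as f:
                for line in f:
                    parts = line.split(maxsplit=5)
                    if len(parts) == 6 and parts[5].startswith("/dev/shm/"):
                        users[parts[5].strip()].add((int(entry), name))
        except OSError:
            continue  # process exited or its maps are not readable
    return {path: sorted(procs) for path, procs in users.items()}


if __name__ == "__main__":
    for path, procs in sorted(dev_shm_users().items()):
        print(path, procs)
```

With iceoryx in use, the output would be expected to list /dev/shm/iceoryx_mgmt and /dev/shm/root against both the python3 processes and iox-roudi, mirroring the lsof output here.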

Let's also check the following command:

lsof -r1 /dev/shm/ | grep iox-roudi 

As you can see, the same segments are also used by the iox-roudi process:

iox-roudi 73617         root  mem    REG   0,25 149264720 3291 /dev/shm/root
iox-roudi 73617         root  mem    REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
iox-roudi 73617         root    4u   REG   0,25  22740232 3290 /dev/shm/iceoryx_mgmt
iox-roudi 73617         root    5u   REG   0,25 149264720 3291 /dev/shm/root

But wait, let's check which processes are associated with the PIDs that use shared memory:

pmap -x 73811
73811:   python3 213-docker/s_sub.py
pmap -x 73782
73782:   python3 213-docker/s_pub.py

From this we can already determine that we are using iceoryx for communication: if you repeat all these steps with shared memory disabled in the cyclonedds.xml config file, you will not see the same output.


About recvUC: I am more interested in a different column than TIME+ in the output of top -H (small messages at a not-so-high rate don't really take much CPU). Let's compare the SHM column of the python processes with and without iceoryx.

~$ top -H| grep python3 # with shm   -> | SHM |
  73782 root      20   0 1841604  96988  39840 R  99.9   0.3   2:40.20 python3
  73811 root      20   0 1832428 103272  39360 R  94.4   0.3   2:39.77 python3
  73811 root      20   0 1832428 103524  39360 R  99.9   0.3   2:42.80 python3
  73782 root      20   0 1837600  91696  39840 R  99.3   0.3   2:43.21 python3

~$ top -H| grep python3 # without shm ->| SHM |
  74350 root      20   0 1591340  96548  39680 R  99.9   0.3   0:13.32 python3
  74377 root      20   0 1582120 102960  39360 R  94.4   0.3   0:10.94 python3
  74377 root      20   0 1582120 101448  39360 R  99.9   0.3   0:13.97 python3
  74350 root      20   0 1587292  92356  39680 R  99.7   0.3   0:16.33 python3
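A caveat when comparing the shared-memory column of top (SHR in its default layout): it counts all resident shared pages, including file-backed ones such as the Python interpreter and shared libraries, which is likely why it sits around 39,000 KiB both with and without iceoryx. On Linux 4.5 and newer, `/proc/<pid>/status` splits this into `RssFile` (file-backed) and `RssShmem` (shared memory such as tmpfs and `/dev/shm` mappings), and `RssShmem` is the more direct signal. A sketch assuming such a kernel; `rss_breakdown` is a hypothetical name.

```python
def rss_breakdown(pid="self"):
    """Return RssAnon / RssFile / RssShmem from /proc/<pid>/status, in KiB."""
    wanted = {"RssAnon", "RssFile", "RssShmem"}
    out = {}
    with open(f"/proc/{pid}/status") as f:
        for line in f:
            key, _, value = line.partition(":")
            if key in wanted:
                out[key] = int(value.split()[0])  # value looks like "   1234 kB"
    return out
```

For example, `rss_breakdown(73782)` with iceoryx active would be expected to show a larger `RssShmem` once samples pass through /dev/shm/root, while `RssFile` stays roughly the same in both runs. Only pages that have actually been touched are counted.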
morkovka1337 commented 8 months ago

Thank you for such a detailed analysis of my situation! I had previously checked the RouDi output and saw the lines above about registering memory segments. I will also do the remaining steps and report my results here.

morkovka1337 commented 8 months ago

@Splinter1984 I've checked all the proposed steps and can fully reproduce your output.

BTW, regarding the send/receive loop time being the same with and without RouDi: I've tried running the containers without setting the CYCLONEDDS_URI variable. Surprisingly, the result became an order of magnitude faster (loop time ~0.003 s vs ~0.05 s when using the CYCLONEDDS_URI variable). Note that neither iox-roudi nor lsof registers any shared memory usage by the python3 processes. However, top -H | grep python3 shows roughly the same SHM occupation as before (~48k and ~17k for the writer and reader, respectively):

 662001 root      20   0 1833824 113572  47716 R  99.9   0.1   2:11.01 python3
 662064 root      20   0 1569168  76640  17316 S  18.8   0.1   0:25.96 python3
 662001 root      20   0 1833824 113592  47716 R  99.7   0.1   2:14.02 python3
 662064 root      20   0 1569168  75848  17316 R  21.2   0.1   0:26.60 python3
 662001 root      20   0 1833824 113524  47644 R  99.9   0.1   2:17.03 python3
 662064 root      20   0 1569168  79760  17268 R  20.6   0.1   0:27.22 python3
morkovka1337 commented 8 months ago

I've realised that previously I was measuring the wrong loop time: what I was actually measuring was serialization time, both over the network and over shared memory. So I've modified my reader; the function now looks like this:

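```python
import datetime

from cyclonedds.core import Qos, Policy
from cyclonedds.util import duration
from cyclonedds.internal import dds_infinity
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from dds_data_structures import MainCameraImage


def receive(topic_name='test_cyclonedds_shm'):
    data_type = MainCameraImage
    participant = DomainParticipant(0)

    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    # qos = Qos(Policy.EntityName("test reader"))

    topic_in = Topic(participant, topic_name, data_type)
    reader = DataReader(participant, topic_in, qos)
    n = 1_000
    avg_time = 0
    i = 0
    img = reader.take_one(timeout=duration(seconds=1))  # first sample is discarded
    while i < n:
        img = reader.take_one(timeout=duration(seconds=1))
        # source_timestamp is in nanoseconds; // 10**9 keeps only whole seconds
        diff_time = datetime.datetime.now() - datetime.datetime.fromtimestamp(int(img.sample_info.source_timestamp // 10**9))
        # timedelta.microseconds is only the sub-second component of diff_time
        avg_time += diff_time.microseconds
        i += 1
    print(f'avg transmit time {avg_time / n / 1000} ms')
```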

So I run it with CYCLONEDDS_URI=.... python3 reader.py and with just python3 reader.py. In both cases the transmission time is about 500 ms. AFAIK sample.sample_info.source_timestamp is the sender's timestamp, so subtracting the source timestamp from the current time should give me the transmission time. Am I measuring correctly now?
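Two details of that computation likely explain the ~500 ms. `int(source_timestamp // 10**9)` truncates the sender timestamp to whole seconds, so the difference picks up the fractional part of the current second, which for a continuous stream is spread roughly evenly over [0, 1) s and averages about 500 ms on its own. Also, `timedelta.microseconds` is only the sub-second component of a timedelta, not its total length. The sketch below keeps the measurement in integer nanoseconds, assuming (as the reader above does) that `source_timestamp` is nanoseconds since the Unix epoch and that sender and receiver share a clock because they run on the same host. The function names are hypothetical, and the data is synthetic.

```python
import time


def latency_ms(source_timestamp_ns, receive_ns=None):
    """Receive time minus send time in milliseconds, without truncating either timestamp."""
    if receive_ns is None:
        receive_ns = time.time_ns()  # same epoch as the source timestamp on one host
    return (receive_ns - source_timestamp_ns) / 1e6


def truncated_latency_ms(source_timestamp_ns, receive_ns):
    """What the loop above effectively computes: sender time rounded down to whole seconds."""
    sent_s = source_timestamp_ns // 10**9
    return (receive_ns - sent_s * 10**9) / 1e6


# Synthetic stream: sends at 0, 10, ..., 990 ms into a second, each received exactly 1 ms later.
sends = [1_692_100_000 * 10**9 + k * 10**7 for k in range(100)]
recvs = [s + 1_000_000 for s in sends]
true_avg = sum(latency_ms(s, r) for s, r in zip(sends, recvs)) / len(sends)
trunc_avg = sum(truncated_latency_ms(s, r) for s, r in zip(sends, recvs)) / len(sends)
print(true_avg, trunc_avg)  # 1.0 496.0
```

Even with a true latency of 1 ms, the truncated version reports about half a second, which matches the result being the same with and without shared memory.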

Splinter1984 commented 7 months ago

Hi @morkovka1337. If you want to measure something that resembles transfer time, you need to create a Listener with an on_data_available callback and record the time at which data_available fires. Then you can compare that time with source_timestamp; this number will be more representative.
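A sketch of this suggestion, assuming that cyclonedds-python's `Listener` accepts an `on_data_available` callback that receives the reader (passed as `Listener(on_data_available=...)` and then `listener=` when creating the `DataReader`), and that taken samples carry `sample_info.source_timestamp` in nanoseconds, as in the reader above. To keep it runnable without DDS, the callback is exercised with a hypothetical `FakeReader`; `LatencyRecorder` is likewise an illustrative name.

```python
import time
from types import SimpleNamespace


class LatencyRecorder:
    """Collects receive-minus-source latencies (ns) from an on_data_available callback."""

    def __init__(self, clock_ns=time.time_ns):
        self.clock_ns = clock_ns
        self.samples_ns = []

    def on_data_available(self, reader):
        now = self.clock_ns()  # record the receive time as soon as the callback fires
        for sample in reader.take():
            self.samples_ns.append(now - sample.sample_info.source_timestamp)

    def mean_ms(self):
        if not self.samples_ns:
            return None
        return sum(self.samples_ns) / len(self.samples_ns) / 1e6


# Assumed wiring with cyclonedds-python (not executed here):
#   recorder = LatencyRecorder()
#   listener = Listener(on_data_available=recorder.on_data_available)
#   reader = DataReader(participant, topic_in, qos, listener=listener)


class FakeReader:
    """Hypothetical stand-in for a DataReader so the callback can be tested offline."""

    def __init__(self, source_timestamps_ns):
        self._pending = [
            SimpleNamespace(sample_info=SimpleNamespace(source_timestamp=t))
            for t in source_timestamps_ns
        ]

    def take(self):
        out, self._pending = self._pending, []
        return out


rec = LatencyRecorder(clock_ns=lambda: 10_000_000)
rec.on_data_available(FakeReader([7_000_000, 9_000_000]))
print(rec.mean_ms())  # 2.0
```

Timestamping at the top of the callback excludes the application's own processing from the measurement. Because the callback runs on a DDS thread, reading `mean_ms()` from the main thread while samples arrive gives only an approximate snapshot.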

morkovka1337 commented 7 months ago

@Splinter1984 thank you, I will try it