eclipse-cyclonedds / cyclonedds-python

DDS initialisation is not thread-safe (Topic) #214

Closed simon-at-spire closed 1 year ago

simon-at-spire commented 1 year ago

As discussed on Discord, if I instantiate two DDS services in the same process at the same time (in my case, during my early attempts at testing), a race condition makes initialisation fail some of the time (about 50%).

Minimal code that reproduces the issue: spin up two threads that start at the same time, each initialising its own DomainParticipant/Topic/Data{Reader/Writer}:

from dataclasses import dataclass
import threading
from time import sleep

from cyclonedds.domain import DomainParticipant
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.topic import Topic
import cyclonedds.idl as idl

@dataclass
class Msg(idl.IdlStruct, typename="Msg"):
    telemetry_type: str
    data: str

def create_writer(evt: threading.Event):
    dp = DomainParticipant()
    tp = Topic(dp, "mytopic", Msg)
    dw = DataWriter(dp, tp)

    while not evt.wait(1):
        dw.write(Msg("Hello", "world"))

def create_reader(evt: threading.Event):
    dp = DomainParticipant()
    tp = Topic(dp, "mytopic", Msg)
    dr = DataReader(dp, tp)

    while not evt.wait(1):
        for msg in dr.take(10):
            print(msg)

stop_evt = threading.Event()
r = threading.Thread(target=create_reader, args=(stop_evt,))
w = threading.Thread(target=create_writer, args=(stop_evt,))

r.start()
w.start()

sleep(10)
stop_evt.set()

r.join()
w.join()

It might work:

root@simon:/code# python3 scripts/recreate_error.py 
Msg(telemetry_type='Hello', data='world')
Msg(telemetry_type='Hello', data='world')
Msg(telemetry_type='Hello', data='world')
Msg(telemetry_type='Hello', data='world')

Or it fails with this error:

root@simon:/code# python3 scripts/recreate_error.py 
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "scripts/recreate_error.py", line 19, in create_writer
    tp = Topic(dp, "mytopic", Msg)
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/topic.py", line 55, in __init__
    data_type.__idl__.fill_type_data()
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_main.py", line 318, in fill_type_data
    self._xt_data = XTBuilder.process_type(self.datatype)
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_xt_builder.py", line 219, in process_type
    no_dependency_hashes = [cls._resolve_typehash('', _type) for _type in no_dependency_types]
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_xt_builder.py", line 219, in <listcomp>
    no_dependency_hashes = [cls._resolve_typehash('', _type) for _type in no_dependency_types]
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_xt_builder.py", line 473, in _resolve_typehash
    hash = TypeHash(
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_xt_builder.py", line 120, in __init__
    minimal_type_object_serialized or self.minimal_type_object.serialize(endianness=Endianness.Little, use_version_2=True)[4:]
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/__init__.py", line 161, in serialize
    return self.__idl__.serialize(self, buffer=buffer, endianness=endianness, use_version_2=use_version_2)
  File "/usr/local/lib/python3.8/dist-packages/cyclonedds/idl/_main.py", line 126, in serialize
    if self.version_support.SupportsBasic & self.version_support:
AttributeError: 'NoneType' object has no attribute 'SupportsBasic'
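
The traceback is consistent with one thread reading a lazily initialised attribute (`version_support`) while another thread is still filling it in. A minimal sketch of that failure mode follows, using only the standard library. `LazyConfig`, its `halfway`/`resume` events, and `demonstrate_race` are hypothetical names made up for illustration; this is not cyclonedds code, and the events exist only to force the unlucky interleaving deterministically.

```python
import threading


class LazyConfig:
    """Hypothetical stand-in for a lazily populated object (not cyclonedds code)."""

    def __init__(self):
        self._populated = False
        self.version_support = None  # filled in by populate()
        # Demo-only hooks that let us pause populate() halfway through.
        self.halfway = threading.Event()
        self.resume = threading.Event()

    def populate(self):
        if not self._populated:
            self._populated = True        # flag flipped BEFORE the data is ready
            self.halfway.set()            # signal: population is in progress
            self.resume.wait(timeout=5)   # stand-in for slow work / a thread switch
            self.version_support = "SupportsBasic"

    def serialize(self):
        if not self._populated:
            self.populate()
        # A caller arriving mid-population skips populate() and finds None here.
        return self.version_support.upper()


def demonstrate_race():
    cfg = LazyConfig()
    errors = []

    first = threading.Thread(target=cfg.serialize)
    first.start()
    cfg.halfway.wait(timeout=5)  # the first thread is now inside populate()

    try:
        cfg.serialize()          # second caller: flag is set, data is not
    except AttributeError as exc:
        errors.append(exc)

    cfg.resume.set()             # let the first thread finish
    first.join()
    return errors
```

Calling `demonstrate_race()` returns a list holding the `AttributeError` raised by the second caller, the same class of error as in the traceback above.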

simon-at-spire commented 1 year ago

Platform: x86_64

$ pip list | grep cyclonedds
cyclonedds             0.10.2
$ python3 --version
Python 3.8.10
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.6 LTS
Release:        20.04
Codename:       focal

eboasson commented 1 year ago

You might want to give this a try. It doesn't win any beauty contest and it is done without sufficient knowledge about memory ordering guarantees in Python to feel confident about using double-checked locking ...

It works for me though 🙂

diff --git a/cyclonedds/idl/_main.py b/cyclonedds/idl/_main.py
index ffc004a..decd25d 100644
--- a/cyclonedds/idl/_main.py
+++ b/cyclonedds/idl/_main.py
@@ -16,6 +16,7 @@ from enum import EnumMeta, Enum
 from inspect import isclass
 from struct import unpack
 from hashlib import md5
+import threading

 from ._support import Buffer, Endianness, CdrKeyVmNamedJumpOp, KeyScanner, KeyScanResult, SerializeKind, DeserializeKind
 from ._type_helper import get_origin, get_args, Annotated
@@ -52,6 +53,8 @@ class IDLNamespaceScope:
 class IDL:
     def __init__(self, datatype):
         self._populated: bool = False
+        self._lock = threading.RLock()
+        self._populating: bool = False
         self.buffer: Buffer = Buffer()
         self.datatype: type = datatype
         self.keyless: bool = None
@@ -67,9 +70,9 @@ class IDL:
         self._xt_bytedata: Tuple[Optional[bytes], Optional[bytes]] = (None, None)
         self.member_ids: Dict[str, int] = None

-    def populate(self):
-        if not self._populated:
-            self._populated = True
+    def populate_locked(self):
+        if not self._populating:
+            self._populating = True
             annotations = get_idl_annotations(self.datatype)
             field_annotations = get_idl_field_annotations(self.datatype)

@@ -119,6 +122,13 @@ class IDL:
                 else:
                     self.v2_key_max_size = 17  # or bigger ;)

+    def populate(self):
+        with self._lock:
+            self.populate_locked()
+        # hopefully the memory order guarantees of Python are strong enough to make it
+        # impossible for another thread to observe a partially populated self
+        self._populated = True
+
     def serialize(self, object, use_version_2: bool = None, buffer=None, endianness=None) -> bytes:
         if not self._populated:
             self.populate()
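
The locking pattern in the diff can be tried out in isolation with a small standard-library sketch. `LazyPopulated`, `populate_calls`, and `hammer` are hypothetical names for illustration only, and the "population" is reduced to a single assignment. One small variation, which is an assumption rather than part of the patch: the `_populated` flag is set while the lock is still held, so the question about memory ordering outside the lock does not come up in this toy version.

```python
import threading


class LazyPopulated:
    """Hypothetical example; attribute names mirror the patch, but this is not cyclonedds code."""

    def __init__(self):
        self._populated = False
        self._populating = False
        self._lock = threading.RLock()  # re-entrant, so the owning thread may call populate() again
        self.value = None
        self.populate_calls = 0         # demo-only counter

    def _populate_locked(self):
        # Skip if population already started (another thread finished it,
        # or this thread re-entered while holding the lock).
        if not self._populating:
            self._populating = True
            self.populate_calls += 1
            self.value = 17

    def populate(self):
        with self._lock:
            self._populate_locked()
            # Publish the flag only after every field has been filled in.
            self._populated = True

    def get(self):
        if not self._populated:  # fast path: no lock once populated
            self.populate()
        return self.value


def hammer(n_threads=8):
    obj = LazyPopulated()
    start = threading.Barrier(n_threads)
    results = []

    def worker():
        start.wait()             # release all threads at the same moment
        results.append(obj.get())

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return obj, results
```

With `obj, results = hammer()`, every entry in `results` is the populated value and `obj.populate_calls` stays at 1, because late arrivals block on the lock until the first thread has finished.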

simon-at-spire commented 1 year ago

Thank you, it looks like it would solve the issue, but we have been starting the services one after the other so far and it is working, so this was just a bug report. Do you want me to test the diff on our platform to validate it?

eboasson commented 1 year ago

If it is not too much trouble to check it on your platform, then that would be great. I have only had a chance to try it on macOS and there it simply failed 100% of the time, so clearly it is less "interesting" than your platform!

simon-at-spire commented 1 year ago

Yep, I can confirm that it worked on my platform (10 successes out of 10 runs)! Thank you for taking the time to investigate!