eclipse-cyclonedds / cyclonedds-python

DataReader Listener Thread Safety #208

Open clunietp opened 11 months ago

clunietp commented 11 months ago

The following code appears to trigger a deadlock when attempting to use a Listener on a DataReader.

I'm using CycloneDDS commit d38e63ff8ed5123650beab9bef2b294b56628696 (28 Jun 23) and CycloneDDS Python commit 7486f5504a988efbd4838b8ecb881d20e34a7644 (25 May 23), and installing the Python library via CYCLONEDDS_HOME=${CYCLONEDDS_HOME} python3 -m pip install /path/to/cyclonedds-python

from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from time import sleep
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.core import Listener
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct

@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    "Represents a key-value string pair"
    key: str
    value: str
    _key("key")

class MyListener(Listener):
    def on_data_available(self, _):
        print("on_data_available")
        for i in range(10):
            print(i)
            sleep(0.1)

def test_listener():

    participant = DomainParticipant()
    topic = Topic(domain_participant=participant,
                topic_name="foo", data_type=KeyedString)
    writer = DataWriter(publisher_or_participant=participant, topic=topic)
    _ = DataReader(subscriber_or_participant=participant,
                topic=topic, listener=MyListener())

    def do_write():
        writer.write(
            sample=KeyedString("hello", "world"))

    with ThreadPoolExecutor(max_workers=2) as thread_pool:
        wait([thread_pool.submit(do_write) for _ in range(10)])

if __name__ == '__main__':
    test_listener()

Output:

on_data_available
0
(then it just hangs here until I kill the python process)

Setting the DataReader listener to None resolves the deadlock. Is this an issue with the listener (or elsewhere), or am I doing something wrong here?

eboasson commented 11 months ago

There is a lot here that I don't know enough about (I don't know Python all that well) to give a definitive answer, but:

  1. Listeners in the Cyclone core are (still) somewhat finicky and can easily deadlock if you try to do "too much" in them. The reason is that I aimed to minimise the overhead by invoking them from fairly deep inside the stack. It is the only way to really make listeners have some expressive power that waitsets don't offer, but I have become convinced that it was the wrong choice for the default behaviour. (Several issues on cyclonedds to be found because of this.)
  2. Listeners in Python build upon those in the core (surprise!) and so have the same limitations, but I'm sure the Python GIL makes them somewhat worse.

I would have expected your listener to have at least printed 10 lines, even if it'd then hang because of something in Cyclone: inside the listener you're not doing anything with Cyclone, so there's no reason you'd deadlock in Cyclone. That suggests to me that it is instead deadlocking on the GIL. I'd guess it has something to do with unlocking the GIL on I/O, but here I'm really in what (for me) are uncharted waters.

If I am guessing right, then the only proper solution is to not call directly from the Cyclone core into a Python listener, but instead do it on a separate thread. Supporting calling application listeners on a separate thread also happens to be the solution to 1. There's even code floating around that kinda does that, e.g., https://github.com/eclipse-cyclonedds/cyclonedds/blob/master/src/tools/ddsperf/async_listener.c, but that's just a hack to solve a problem without having to solve the general case. I suppose it could be ported to the Python binding by someone who knows Python a lot better than I do.

I would personally go for addressing 1, but I don't have the time for that right now, unfortunately.
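
A minimal sketch of the "call the listener on a separate thread" idea described above, using only the Python standard library. The names AsyncDispatcher, submit and close are hypothetical and are not part of cyclonedds-python. The commented wiring at the end is an untested assumption about how such a dispatcher might be hooked into the listener from the report. Whether this avoids the hang reported here has not been verified; it only illustrates the pattern of keeping the callback that runs inside the Cyclone core as short as possible.

```python
import queue
import threading


class AsyncDispatcher:
    """Runs submitted callbacks on one worker thread (hypothetical helper)."""

    _STOP = object()  # sentinel that tells the worker thread to exit

    def __init__(self):
        self._events = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, callback, *args):
        # Only enqueues, so the calling thread returns almost immediately.
        self._events.put((callback, args))

    def _run(self):
        while True:
            item = self._events.get()
            if item is self._STOP:
                break
            callback, args = item
            try:
                callback(*args)
            except Exception as exc:
                # Keep the worker alive if one callback fails.
                print(f"listener callback failed: {exc!r}")

    def close(self):
        # The queue is FIFO, so work submitted before close() still runs.
        self._events.put(self._STOP)
        self._worker.join()


# Hypothetical, untested wiring into the listener from the report:
#
# class MyListener(Listener):
#     def __init__(self, dispatcher):
#         super().__init__()
#         self._dispatcher = dispatcher
#
#     def on_data_available(self, reader):
#         # Hand the slow work to the worker thread and return at once.
#         self._dispatcher.submit(handle_data, reader)

if __name__ == "__main__":
    results = []
    dispatcher = AsyncDispatcher()
    for i in range(3):
        dispatcher.submit(results.append, i)
    dispatcher.close()
    print(results)
```

A single worker thread keeps callbacks in submission order, which matches the ordering a listener would normally see. The trade-off is that one slow callback delays all later ones.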

clunietp commented 11 months ago

@eboasson Thanks for taking a look. It is unfortunate that Python+Listeners, in their current state, are unsuitable for usage in a concurrent writer scenario. I'll see what I can do with a WaitSet instead.