eclipse-cyclonedds / cyclonedds-python

Other
54 stars 44 forks source link

OOM when using a QoS with `max_instances` larger than a small value (or the default: -1) #232

Closed clunietp closed 4 months ago

clunietp commented 4 months ago

The following reproducer results in an OOM on x64-linux, Debian 11.8. This is running inside a Docker container. Host machine has 32GB RAM.

I'm using CycloneDDS commit: 2cfdc1b7cfa1c3d9b37b09109113b1f5d3b01802: 05 Feb 24 and CycloneDDS Python commit: 637cfe583a5078af41c481c98daf7474fdb2a786: 06 Sep 23 and installing the Python library via CYCLONEDDS_HOME=${CYCLONEDDS_HOME} python3 -m pip install /path/to/cyclonedds-python

from dataclasses import dataclass
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct
from cyclonedds.qos import Policy, Qos

@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    """Represents a key-value string pair.

    Declared as an ``IdlStruct`` with the type name ``DDS.KeyedString``.
    """
    # Field named by the `_key(...)` annotation below; the reproducer varies it
    # to create distinct instances and keeps it fixed to update one instance.
    key: str
    # Payload carried alongside the key.
    value: str
    # `_key` is `cyclonedds.idl.annotations.key`; applied here to the "key" field.
    # NOTE(review): presumably this makes `key` the DDS instance key - confirm
    # against the cyclonedds IDL annotation documentation.
    _key("key")

def test_qos(qos_to_test: Qos):
    """Drive a writer/reader pair on topic "foo" that both use ``qos_to_test``.

    The run has two phases of ``N_ITERATIONS`` write+dispose cycles each:

    1. "Sample" phase - one ``KeyedString`` object with the fixed key
       ``"Hello"`` is re-written with a new ``value`` every iteration.
    2. "Instance" phase - a fresh ``KeyedString`` with a new key is written
       every iteration.

    Progress is printed before every write so the last line of output shows
    how far the run got.

    Args:
        qos_to_test: QoS handed unchanged to both the DataWriter and the
            DataReader.
    """
    N_ITERATIONS = 50

    print(f"Testing QoS: {qos_to_test!s}")

    # DDS entities: a single participant owns the topic, writer and reader.
    dp = DomainParticipant()
    foo_topic = Topic(
        domain_participant=dp,
        topic_name="foo",
        data_type=KeyedString,
    )
    dw = DataWriter(
        publisher_or_participant=dp,
        topic=foo_topic,
        qos=qos_to_test,
    )
    # The reader is never used directly; it is bound to a name only so that it
    # stays referenced while the writer publishes.
    _ = DataReader(
        subscriber_or_participant=dp,
        topic=foo_topic,
        qos=qos_to_test,
    )

    # Phase 1: many samples of one instance (key stays "Hello").
    msg = KeyedString(key="Hello", value="World")
    for label in map(str, range(N_ITERATIONS)):
        print(f"Sample {label}")
        msg.value = label
        dw.write(sample=msg)
        dw.dispose(sample=msg)

    # Phase 2: one sample for each of many instances (key changes every time).
    for label in map(str, range(N_ITERATIONS)):
        print(f"Instance {label}")
        keyed = KeyedString(key=label, value=label)
        dw.write(sample=keyed)
        dw.dispose(sample=keyed)

if __name__ == "__main__":

    # Run the same scenario twice: first with the lower instance limit (10),
    # then with the higher instance limit (50).
    for instance_limit in (10, 50):
        test_qos(
            qos_to_test=Qos(Policy.ResourceLimits(max_instances=instance_limit))
        )

Output:

Testing QoS: Qos(Policy.ResourceLimits(max_samples=-1, max_instances=10, max_samples_per_instance=-1))
Sample 0
Sample 1
Sample 2
...
Sample 48
Sample 49
Instance 0
Instance 1
Instance 2
...
Instance 48
Instance 49
Testing QoS: Qos(Policy.ResourceLimits(max_samples=-1, max_instances=50, max_samples_per_instance=-1))
Sample 0
Sample 1
Sample 2
...
Sample 48
Sample 49
Instance 0
Instance 1
Instance 2
...
Instance 30
Instance 31
Killed

Running `apt-get install time; /usr/bin/time -v python3 path/to/my/test.py` results in `Command terminated by signal 9` ... `Maximum resident set size (kbytes): 14533312`.

Things I have tried:

- It doesn't make a difference if I call dispose or not on the publisher side.
- It doesn't make a difference if I add a sleep(1.0) between write iterations.
- It does not seem related to the message size, as I first encountered this error at a similar iteration count using much larger messages than KeyedString.
- Taking from the reader within each iteration has no effect.
- Adding QoS properties of ReaderDataLifecycle(autopurge...=duration(nanoseconds=1)), Policy.History.KeepLast(1), Policy.WriterDataLifecycle(autodispose=True) has no effect.
- Registering/unregistering each instance has no effect.

This appears to be a bug in the implementation. Is there a workaround other than setting max_instances to some arbitrary limit?

clunietp commented 4 months ago

A more refined reproducer:

from dataclasses import dataclass
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from cyclonedds.topic import Topic
from cyclonedds.domain import DomainParticipant
from cyclonedds.idl.annotations import key as _key
from cyclonedds.idl import IdlStruct
from cyclonedds.qos import Policy, Qos
from cyclonedds.util import duration
from time import sleep

@dataclass
class KeyedString(IdlStruct, typename="DDS.KeyedString"):
    """Represents a key-value string pair.

    Declared as an ``IdlStruct`` with the type name ``DDS.KeyedString``.
    """
    # Field named by the `_key(...)` annotation below; the reproducer gives
    # every written message a different value for it.
    key: str
    # Payload carried alongside the key.
    value: str
    # `_key` is `cyclonedds.idl.annotations.key`; applied here to the "key" field.
    # NOTE(review): presumably this makes `key` the DDS instance key - confirm
    # against the cyclonedds IDL annotation documentation.
    _key("key")

def test_qos(reader_qos: Qos, writer_qos):
    """Write, take and dispose ``N_ITERATIONS`` distinct ``KeyedString`` messages.

    A writer and a reader are created on topic "foo" under one participant,
    each with its own QoS. Every iteration writes a message with a new key,
    pauses 0.1 s, takes one sample from the reader (waiting up to 5 s) and
    finally disposes the message through the writer. Progress lines are
    printed around each step.

    Args:
        reader_qos: QoS applied to the DataReader.
        writer_qos: QoS applied to the DataWriter.
    """
    N_ITERATIONS = 50

    # DDS entities: a single participant owns the topic, writer and reader.
    dp = DomainParticipant()
    foo_topic = Topic(
        domain_participant=dp,
        topic_name="foo",
        data_type=KeyedString,
    )
    dw = DataWriter(
        publisher_or_participant=dp,
        topic=foo_topic,
        qos=writer_qos,
    )
    dr = DataReader(
        subscriber_or_participant=dp,
        topic=foo_topic,
        qos=reader_qos,
    )

    # One new key per iteration; the label doubles as key and value.
    for label in map(str, range(N_ITERATIONS)):
        print(f"Instance {label}")
        msg = KeyedString(key=label, value=label)

        dw.write(sample=msg)
        print(f"\tWrote instance {label}")

        # Short pause between the write and the take.
        sleep(0.1)
        dr.take_one(timeout=duration(seconds=5.0))
        print(f"\tRead instance {label}")

        dw.dispose(sample=msg)

if __name__ == "__main__":

    # Policies common to both endpoints: reliable delivery with a 1 s
    # duration argument, and transient-local durability.
    shared_policies = (
        Policy.Reliability.Reliable(duration(seconds=1)),
        Policy.Durability.TransientLocal,
    )

    # Reader: at most 5 instances, keep-last history of depth 1.
    reader_qos = Qos(
        Policy.ResourceLimits(max_instances=5),
        Policy.History.KeepLast(1),
        *shared_policies,
    )

    # Writer: max_instances=-1.
    writer_qos = Qos(
        Policy.ResourceLimits(max_instances=-1),
        *shared_policies,
    )

    # test QoS
    test_qos(reader_qos=reader_qos, writer_qos=writer_qos)

Results in the output:

Instance 0
        Wrote instance 0
        Read instance 0
Instance 1
        Wrote instance 1
        Read instance 1
Instance 2
        Wrote instance 2
        Read instance 2
Instance 3
        Wrote instance 3
        Read instance 3
Instance 4
        Wrote instance 4
        Read instance 4
Instance 5
1708025928.342746 [0]    python3: The writer could not deliver data on time, probably due to a local reader resources being full
Traceback (most recent call last):
File "/path/to/file.py", line 63, in <module>
    test_qos(
File "/path/to/file.py", line 39, in test_qos
    writer.write(sample=instance)
File "/usr/local/lib/python3.9/dist-packages/cyclonedds/pub.py", line 192, in write
    raise DDSException(ret, f"Occurred while writing sample in {repr(self)}")
cyclonedds.core.DDSException: [DDS_RETCODE_TIMEOUT] A timeout has occurred. Occurred while writing sample in <Entity, type=cyclonedds.pub.DataWriter, addr=0x7fc78e2778b0, id=1880116342>

If I increase the DataReader's max_instances to a higher value, I reliably get an OOM at "Instance 32".

I see two issues here:

My requirement is that I must send and receive an arbitrary number of instances for a given message type, and that number is >> 31.