jeffpg144 closed this issue 4 months ago
Hi @jeffpg144. This C API function is currently not supported in the Python binding, but you can get the same behavior by using a listener with the on_subscription_matched callback.
from cyclonedds.core import Listener, Qos
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
from msg import Msg


class SubMatchedListener(Listener):
    def on_subscription_matched(self, reader, status):
        # Optionally, store the instance handles here so that information
        # about matching writers is available in the future.
        print(f"<< act ih: {status.last_publication_handle}")


listener = SubMatchedListener()
qos = Qos()
domain_participant = DomainParticipant()
topic = Topic(domain_participant, 'topic', Msg, qos=qos)
reader = DataReader(domain_participant, topic, listener=listener)

writers = []
msg = Msg(data=1)
for i in range(5):
    # Each new writer on the topic triggers on_subscription_matched on the reader.
    writers.append(DataWriter(domain_participant, topic, qos))
    writers[i].write(msg)
    print(f">> exp ih: {writers[i].get_instance_handle()}")
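The comment in the listener mentions storing instance handles. A minimal sketch of that bookkeeping might look like the following. `MatchedPublicationTracker` is a hypothetical helper, not part of cyclonedds, and it assumes the status object passed to `on_subscription_matched` exposes `current_count_change` alongside `last_publication_handle`, as the C `dds_subscription_matched_status_t` struct does. The `SimpleNamespace` objects only stand in for real status values so the sketch runs without a DDS domain.

```python
from types import SimpleNamespace


class MatchedPublicationTracker:
    """Hypothetical helper (not part of cyclonedds): remembers the
    instance handles of the writers currently matched with a reader."""

    def __init__(self):
        self._handles = set()

    def update(self, status):
        # Assumption: `status` has `current_count_change` and
        # `last_publication_handle`, mirroring the C status struct.
        handle = status.last_publication_handle
        if status.current_count_change > 0:
            self._handles.add(handle)      # a writer was matched
        elif status.current_count_change < 0:
            self._handles.discard(handle)  # a writer went away

    def matched_publications(self):
        # Sorted list of the handles that are matched right now.
        return sorted(self._handles)


# Stand-in status objects; real ones would arrive in on_subscription_matched.
tracker = MatchedPublicationTracker()
tracker.update(SimpleNamespace(current_count_change=1, last_publication_handle=101))
tracker.update(SimpleNamespace(current_count_change=1, last_publication_handle=102))
tracker.update(SimpleNamespace(current_count_change=-1, last_publication_handle=101))
print(tracker.matched_publications())  # [102]
```

In the listener, calling `tracker.update(status)` from `on_subscription_matched` would keep the set current. Listener callbacks may run on a different thread than the main program, so a lock around the set is probably worth adding if it is read elsewhere.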
Thanks, I'll give it a try.
I've searched the Python code and documentation but can't find how to access the following Cyclone DDS C API:
https://cyclonedds.io/docs/cyclonedds/latest/api/builtin.html?highlight=get_matched#c.dds_get_matched_publications
Is this API missing from the Python binding? If so, is there a workaround, or would you consider adding it?
Thanks.