eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds
Other
891 stars 363 forks source link

How to subscribe to multiple topics? #2072

Closed yanzhang920817 closed 1 month ago

yanzhang920817 commented 3 months ago

I want to subscribe to 3 topics. I wrote the following code based on examples/listtopics/listtopics.c in cyclonedds-0.10.5. However, when I publish from another process, I can see with `cyclonedds ls` that messages are being sent, but the waitset does not seem to be triggered. How should I modify the following code? Thank you.

/*
 * Thread entry point: subscribes to MAX_SUB_TOPICS user topics and waits for
 * activity on them through a single waitset.
 *
 * arg is an array of two pointers: [0] a LowCmd_ *, [1] a pthread_mutex_t *.
 *
 * The function creates a participant, a waitset, a reader on the built-in
 * DCPSTopic topic and, for every entry in topic_names, a topic, a reader and
 * a read condition attached to the waitset.  It then loops on the waitset
 * until the 10 second deadline passes or the wait reports an error.
 *
 * The thread always terminates through pthread_exit(NULL); deleting the
 * participant on the way out also deletes every entity created under it.
 */
void *ListenOperation(void *arg)
{
    void **args = (void**)arg;
    LowCmd_ *lowCmd = (LowCmd_ *)args[0];
    pthread_mutex_t *lock = (pthread_mutex_t*)args[1];
    /* Not used in this function yet; presumably meant for handing received
       data to the caller. */
    (void) lowCmd;
    (void) lock;

    dds_entity_t participant;
    dds_entity_t waitset;
    dds_entity_t reader;
    dds_entity_t readcond;
    dds_entity_t topic;
    dds_entity_t topic_readers[MAX_SUB_TOPICS];
    dds_entity_t topic_readconds[MAX_SUB_TOPICS];
    dds_return_t rc;
    bool topics_seen = false;
    bool endpoints_exist = false;

    const dds_topic_descriptor_t *type_desc[MAX_SUB_TOPICS] = {
        &exchange_msg_desc, &settings_controlMode_msg_desc, &ClearError_desc};
    /* NOTE(review): "setttings" has three t's -- confirm it matches the name
       used by the publisher. */
    const char *topic_names[MAX_SUB_TOPICS] = {
        "protocol/exchange", "setttings/controlMode", "rt/clearError"};

    /* Create a Participant. */
    participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
    if (participant < 0)
        DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

    printf("%s: create participant successfully\n", __FUNCTION__);

    waitset = dds_create_waitset(participant);
    if (waitset < 0) {
        fprintf (stderr, "dds_create_waitset: %s\n", dds_strretcode (waitset));
        goto out;
    }

    reader = dds_create_reader(participant, DDS_BUILTIN_TOPIC_DCPSTOPIC, NULL, NULL);
    if (reader < 0)
    {
        if (reader == DDS_RETCODE_UNSUPPORTED)
            fprintf (stderr, "Topic discovery is not included in the build, rebuild with ENABLE_TOPIC_DISCOVERY=ON\n");
        else
            fprintf (stderr, "dds_create_reader(DCPSTopic): %s\n", dds_strretcode (reader));
        goto out;
    }

    readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
    if (readcond < 0) {
        fprintf (stderr, "dds_create_readcondition(DCPSTopic): %s\n", dds_strretcode (readcond));
        goto out;
    }
    rc = dds_waitset_attach (waitset, readcond, 0);
    if (rc < 0) {
        fprintf (stderr, "dds_waitset_attach(DCPSTopic): %s\n", dds_strretcode (rc));
        goto out;
    }

    for (size_t i = 0; i < MAX_SUB_TOPICS; i++) {
        /* dds_create_reader() takes a topic entity, not a topic name, so the
           topic has to be created from its type descriptor and name first. */
        topic = dds_create_topic(participant, type_desc[i], topic_names[i], NULL, NULL);
        if (topic < 0) {
            fprintf (stderr, "dds_create_topic(%s): %s\n", topic_names[i], dds_strretcode(topic));
            goto out;
        }

        topic_readers[i] = dds_create_reader(participant, topic, NULL, NULL);
        if (topic_readers[i] < 0) {
            fprintf (stderr, "dds_create_reader(%s): %s\n", topic_names[i], dds_strretcode(topic_readers[i]));
            goto out;
        }
        printf("%s [%zu]: create reader successfully\n", __FUNCTION__, i);

        topic_readconds[i] = dds_create_readcondition(topic_readers[i], DDS_ANY_STATE);
        if (topic_readconds[i] < 0) {
            fprintf (stderr, "dds_create_readcondition(%s): %s\n", topic_names[i], dds_strretcode(topic_readconds[i]));
            goto out;
        }
        rc = dds_waitset_attach(waitset, topic_readconds[i], 0);
        if (rc < 0) {
            fprintf (stderr, "dds_waitset_attach(%s): %s\n", topic_names[i], dds_strretcode(rc));
            goto out;
        }
    }

    printf("%s is running\n", __FUNCTION__);
    /* Wait for activity until the deadline 10 seconds from now; the loop ends
       as soon as the wait returns zero or a negative value. */
    const dds_time_t tstop = dds_time () + DDS_SECS (10);
    while (dds_waitset_wait_until (waitset, NULL, 0, tstop) > 0) {
        printf("-------------------------------------------------------------------\n");
        /* NOTE(review): process_topic() and process_pubsub() are defined
           outside this block; confirm they decode the user types subscribed
           above rather than built-in topic samples.  Also note that readcond
           (DCPSTopic) is attached to the waitset but never processed here. */
        for (size_t i = 0; i < MAX_SUB_TOPICS; i++) {
            if (process_topic (topic_readconds[i])) {
                topics_seen = true;
            }
        }
        /* NOTE(review): the readers are deleted here while their read
           conditions are still handed to process_topic() on later
           iterations -- confirm this is intended. */
        if (!endpoints_exist && process_pubsub (topic_readconds)) {
            endpoints_exist = true;
            for (size_t k = 0; k < MAX_SUB_TOPICS; k++)
               (void) dds_delete (topic_readers[k]);
        }
    }
    (void) topics_seen;

out:
    /* Common exit path for success and failure. */
    dds_delete (participant);
    pthread_exit(NULL);
}