eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds
Other
889 stars 363 forks source link

The frequency of subscription topic triggering is inconsistent with the frequency of publishing #2075

Open yanzhang920817 opened 3 months ago

yanzhang920817 commented 3 months ago

I subscribed to two topics according to examples/listtopics/listtopics.c, but encountered two strange problems. The first topic settings/controlMode could not print char *, and the second topic rt/clearError was triggered just after it was created. Moreover, when the publisher published once per second, it was not triggered once every 1s, but seemed to be triggered dozens of times. I don't know why? process_topic function:

static bool process_topic(dds_entity_t readcond) {
#define MAXCOUNT 10
    void *samples[MAXCOUNT] = { NULL };
    dds_sample_info_t infos[MAXCOUNT];
    samples[0] = NULL;
    int32_t n = dds_take(readcond, samples, infos, MAXCOUNT, MAXCOUNT);
    //int32_t n = dds_read(readcond, samples, infos, MAXCOUNT, MAXCOUNT);
    bool topics_seen = false;
    for (int32_t i = 0; i < n; i++) {
        //dds_builtintopic_topic_t const * const sample = samples[i];
        dds_builtintopic_topic_t const * const sample = static_cast<dds_builtintopic_topic_t*>(samples[i]);
        struct keystr gs;
        printf ("%s: %s", instance_state_str(infos[i].instance_state), keystr(&gs, &sample->key));
        if (infos[i].valid_data) {
            printf (" %s %s", sample->topic_name, sample->type_name);
            if (strcmp(sample->topic_name, "settings/controlMode") == 0) {
                topics_seen = true;
                printf("\n#################################please enable the robot####################################\n");
                settings_controlMode_msg *msg = static_cast<settings_controlMode_msg*>(samples[i]);
                if (msg == NULL) {
                    printf("msg is null\n");
                } else {
                    if (msg->controlMode == NULL) {
                        printf("cm is null\n");
                    } else {
                        printf("all are not null, msg=%p,cm=%p\n", msg, msg->controlMode);
                        printf("Received message: controlMode = %s\n", msg->controlMode);
                    }
                }
            } else if (strcmp(sample->topic_name, "rt/clearError") == 0) {
                printf("\n#################################please clear error####################################\n");
                ClearError *msg = static_cast<ClearError*>(samples[i]);
                printf("msg id =%d\n", msg->msgid);
            }
        }
        printf ("\n");
    }
    (void) dds_return_loan (readcond, samples, n);
#undef MAXCOUNT
    return topics_seen;
}

`

ListenOperation function:

void *ListenOperation(void *arg)
{
    void **args = (void**)arg;
    LowCmd_ *lowCmd = (LowCmd_ *)args[0];
    pthread_mutex_t *lock = (pthread_mutex_t*)args[1];

    dds_entity_t participant;
    dds_entity_t topic;

    void *samples[MAX_SAMPLES];
    dds_sample_info_t infos[MAX_SAMPLES];
    dds_return_t rc;
    dds_qos_t *qos;
    bool topics_seen = false;
    bool endpoints_exist = false;

    /* Create a Participant. */
    participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
    if (participant < 0)
        DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

    printf("%s: create participant successfully\n", __FUNCTION__);
    const dds_entity_t waitset = dds_create_waitset(participant);
#if 1
    const dds_entity_t reader = dds_create_reader(participant, DDS_BUILTIN_TOPIC_DCPSTOPIC, NULL, NULL);
    if (reader < 0)
    {
        if (reader == DDS_RETCODE_UNSUPPORTED)
            fprintf (stderr, "Topic discovery is not included in the build, rebuild with ENABLE_TOPIC_DISCOVERY=ON\n");
        else
            fprintf (stderr, "dds_create_reader(DCPSTopic): %s\n", dds_strretcode (reader));
        dds_delete (participant);
        pthread_exit(NULL);
    }
#endif
    //const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
    const dds_entity_t readcond = dds_create_readcondition (reader, DDS_NOT_READ_SAMPLE_STATE);
    (void) dds_waitset_attach (waitset, readcond, 0);

    const dds_topic_descriptor_t *type_desc[MAX_SUB_TOPICS] = {
        &settings_controlMode_msg_desc, &ClearError_desc};
    //static const dds_entity_t topic_names[MAX_SUB_TOPICS] = {
    const char *topic_names[MAX_SUB_TOPICS] = {
        "setttings/controlMode", "rt/clearError"};
    dds_entity_t topic_entities[MAX_SUB_TOPICS];
    dds_entity_t topic_readers[MAX_SUB_TOPICS];
    dds_entity_t topic_readconds[MAX_SUB_TOPICS];

    for (size_t i = 0; i < MAX_SUB_TOPICS; i++) {
        topic_entities[i] = dds_create_topic(participant, type_desc[i], topic_names[i], NULL, NULL);
        if (topic_entities[i] < 0) {
            fprintf(stderr, "dds_create_topic(%s): %s\n", topic_names[i], dds_strretcode(topic_entities[i]));
            dds_delete(participant);
            pthread_exit(NULL);
        }
        printf("%s [%zu]: create topic successfully\n", __FUNCTION__, i);
        topic_readers[i] = dds_create_reader(participant, topic_entities[i], NULL, NULL);
        if (topic_readers[i] < 0) {
            fprintf (stderr, "dds_create_reader(%s): %s\n", topic_names[i], dds_strretcode(topic_readers[i]));
            dds_delete(participant);
            pthread_exit(NULL);
        }
        printf("%s [%zu]: create reader successfully\n", __FUNCTION__, i);
        topic_readconds[i] = dds_create_readcondition(topic_readers[i], DDS_ANY_STATE);
        (void) dds_waitset_attach(waitset, topic_readconds[i], 0);
    }

    printf("%s is running\n", __FUNCTION__);
    /* Monitor topic creation/deletion indefinitely */
#if 1
    while (1) {
        if (dds_waitset_wait_until(waitset, NULL, 0, DDS_INFINITY) > 0) {
            printf("-------------------------------------------------------------------\n");
            if (process_topic(readcond)) {
                topics_seen = true;
                printf("--------------------------topics_seen-----------------------------------------\n");
            }
        }
    }
#else
    const dds_time_t tstop = dds_time() + DDS_SECS (1000000);
    while (dds_waitset_wait_until (waitset, NULL, 0, tstop) > 0) {
        printf("-------------------------------------------------------------------\n");
        if (process_topic(readcond)) {
           topics_seen = true;
            printf("--------------------------topics_seen-----------------------------------------\n");
        }
    }
#endif
    dds_delete (participant);
    pthread_exit(NULL);
}
`