eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds

Race Condition when reading from DDS_BUILTIN_TOPIC_DCPSPARTICIPANT #2039

Open Dominik-38 opened 2 weeks ago

Dominik-38 commented 2 weeks ago

Hello,

I’m currently working on implementing a “Domain Scan” functionality, which involves taking a snapshot of the DDS system at a specific point in time. In my example, I’ve written a loop that iterates over all possible domains and retrieves the active participants for each one (using the DCPSParticipant built-in topic). This is the code:

#include "dds/dds.h"
#include <stdio.h>
#include <stdlib.h>

#define MAX_SAMPLES 10

int main(int argc, char** argv)
{
    dds_return_t rc;
    const uint8_t maxDomainID = 233;
    (void)argc;
    (void)argv;

    // Iteration over all possible DomainIDs to find which Domain is active and how many participants are there
    for(unsigned int domainID=0; domainID < maxDomainID; domainID++)
    {
        dds_entity_t monitoringParticipant = dds_create_participant(domainID, NULL, NULL);
        if (monitoringParticipant < 0)
            DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-monitoringParticipant));

        dds_entity_t builtinReaderPart = dds_create_reader(monitoringParticipant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL);
        if (builtinReaderPart < 0)
            DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-builtinReaderPart));

        // PROCESSING PARTICIPANTS READER START
        dds_sample_info_t sampleInfo[MAX_SAMPLES];
        void* samples[MAX_SAMPLES] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL};

        dds_sleepfor(DDS_MSECS(2)); 

        // with NULL sample pointers, dds_take has Cyclone loan the sample
        // memory (the counterpart for handing it back is dds_return_loan)
        dds_return_t participantsRead = dds_take(builtinReaderPart, samples, sampleInfo, MAX_SAMPLES, MAX_SAMPLES);
        if (participantsRead < 0)
            DDS_FATAL("dds_take on participants: %s\n", dds_strretcode(-participantsRead));

        /* Check if we read some data and it is valid. */
        if ((participantsRead > 0) && (sampleInfo[0].valid_data))
        {
            printf("read data of participants is valid! \n");
        }

        dds_domainid_t currentDomainID;
        dds_get_domainid(monitoringParticipant, &currentDomainID);

        printf("participants (including monitoringParticipant) found in domain %d is: %d \n", currentDomainID, participantsRead);
        // PROCESSING PARTICIPANTS READER END

        // Clean up 
        rc = dds_delete(builtinReaderPart);
        if (rc != DDS_RETCODE_OK) {
            fprintf(stderr, "Error deleting reader part: %s\n", dds_strretcode(-rc));
            return EXIT_FAILURE;
        }

        rc = dds_delete(monitoringParticipant);
        if (rc != DDS_RETCODE_OK) {
            fprintf(stderr, "Error deleting participant: %s\n", dds_strretcode(-rc));
            return EXIT_FAILURE;
        }
    }
    return EXIT_SUCCESS;
}

For testing purposes, I’ve started various threads (e.g., subscribers from the HelloWorld example) so that participants are active on different domains.

However, I encountered the following error (output from ThreadSanitizer):

(screenshot: ThreadSanitizer data race report; only the top two stack frames are visible)

To me, this looks like a race condition within cyclonedds, which occurs intermittently.

Am I using the above code correctly, or is this error already known and preventable?

Best regards

eboasson commented 2 weeks ago

@Dominik-38, your code looks fine. You can leave out dds_delete(builtinReaderPart) (the reader is deleted automatically when you delete the participant), and you can reduce the initialization of the samples array to just void* samples[MAX_SAMPLES] = {NULL}, but otherwise I have no suggestions ...
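For illustration, this is roughly what those two simplifications look like in the loop body (a fragment of the code above, not a complete program):

void* samples[MAX_SAMPLES] = {NULL}; // C zero-initializes the remaining elements

/* ... create reader, take, print ... */

// deleting the participant recursively deletes its children,
// including builtinReaderPart, so this one call suffices
rc = dds_delete(monitoringParticipant);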

The race condition is not known to me. It may be a false positive: I know tsan flags a bunch of (what are, to the best of my knowledge) false positives due to the use of atomic operations in some situations, but at the same time I cannot believe it is all nothing but false positives. So much code, more than enough attempts at cleverness ... there are race conditions in the code.

Is there any chance you could get a deeper stack trace? Just the top two frames don't tell me all that much.

Dominik-38 commented 2 weeks ago

@eboasson Thank you for your answer!

To reduce false positives from the thread sanitizer I used the following ThreadSanitizerIgnoreList.txt:

src:*/bits/stl_vector.h
src:*/bits/stl_iterator.h
src:*/bits/shared_ptr_base.h

src:*/internal/gtest-port.h

src:*/impl/stream_impl.hpp
src:*/impl/static_buffer.ipp
src:*/impl/socket_ops.ipp

fun:prepare
fun:commit
fun:consume
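(The file is in clang's sanitizer special-case-list format, passed at compile time, e.g. via -fsanitize-ignorelist=ThreadSanitizerIgnoreList.txt.)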

Furthermore, here is the error that appears in VS (as mentioned, not regularly and always in different places):

(screenshots: intermittent error output in Visual Studio, including "failed to bind to ANY:49401")

I am developing under Windows 11 and used WSL to build under Linux so I can use the thread sanitizer. Maybe the information provided above helps to gain a better understanding.

Otherwise I will work on providing a more detailed stack trace and more information about the problem.

eboasson commented 5 days ago

I do have Windows 10 (not 11) and WSL2, but WSL2 is a VM so it shouldn't make much of a difference whether one uses Win11+WSL, macOS+qemu+Ubuntu or native Linux. Being lazy, I tried the macOS/qemu/Ubuntu one.

The suppressions file that you included in the comment wasn't acceptable to tsan, but it also doesn't really look like one that matches the Cyclone sources. Probably there was a mixup, but no worries, I just did:

race:ddsrt_chh_lookup_internal
race:gcreq_queue_thread

I don't like such general suppressions, but there is a reason behind them: there are some lock-free lookups in Cyclone, in particular for mapping key values to "instance handles" and for looking up entity GUIDs. The first is the lock-free hash table lookup, and the second is the "gc" (garbage collector) thread that takes care of the deferred freeing. By design it does things that look like race conditions to the thread sanitizer ... I don't think these two suppressions cover all the false positives, and they may well suppress a real one, but at least it makes it workable to run the thread sanitizer with my (modified) version of your scanner.
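These are runtime suppressions: they go in a file that tsan picks up via the TSAN_OPTIONS environment variable when running the instrumented binary, along the lines of (file and binary names here are just placeholders):

TSAN_OPTIONS="suppressions=tsan.supp" ./domain_scanner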

On consideration, then: as the race condition you caught involves the "gc" thread, we don't need to worry too much about it. I think it is reasonable to suppress it.

Then, on to the "failed to bind to ANY:49401": that I understand. Whenever you create a domain there's a bunch of sockets involved, amongst which one with a kernel-allocated random port number. In this case it just so happens that the kernel-allocated port number collides with the multicast port prescribed by the DDSI spec.
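To make that concrete, with the default DDSI port parameters (PB = 7400, DG = 250, d2 = 1, per the spec): the user-traffic multicast port for domain d is PB + DG * d + d2, and 7400 + 250 * 168 + 1 = 49401, so the port in your error is presumably the multicast port of domain 168, well inside the 0..232 range your scanner walks.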

It could be any process that has a socket bound to that port number, but chances are it is a Cyclone process, quite possibly the domain-scanning process itself.

To some extent the problem is caused by Cyclone: if it always used the "well-known"[^1] port numbers this particular collision would not occur. If you set <Discovery><ParticipantIndex>auto</ParticipantIndex></Discovery> it will create sockets with those "well-known" port numbers and not need one of those kernel-allocated ones.
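As a minimal sketch, one way of applying that setting is via the CYCLONEDDS_URI environment variable before the first participant is created (this assumes the inline-XML form of CYCLONEDDS_URI; pointing it at a configuration file works just as well):

#include <stdlib.h>
#include "dds/dds.h"

int main (void)
{
  // the configuration is read when the domain comes into existence,
  // so this has to happen before dds_create_participant
  setenv ("CYCLONEDDS_URI",
          "<Discovery><ParticipantIndex>auto</ParticipantIndex></Discovery>", 1);
  dds_entity_t pp = dds_create_participant (0, NULL, NULL);
  if (pp < 0)
    return EXIT_FAILURE;
  return (dds_delete (pp) == DDS_RETCODE_OK) ? EXIT_SUCCESS : EXIT_FAILURE;
}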

It is also in part caused by Cyclone binding the multicast sockets to INADDR_ANY. Someone asked about the reason behind that before in #1923. If instead it bound the multicast sockets to 239.255.0.1 it wouldn't be an issue.
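For illustration, the difference in plain POSIX socket terms (a sketch, not Cyclone code; port 7401 is just a stand-in):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main (void)
{
  int sock = socket (AF_INET, SOCK_DGRAM, 0);
  struct sockaddr_in addr;
  memset (&addr, 0, sizeof (addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons (7401);
  // binding to INADDR_ANY claims the port across all local addresses, so any
  // other socket already bound to that port makes this fail; binding to the
  // multicast group address would only claim (239.255.0.1, 7401)
  addr.sin_addr.s_addr = htonl (INADDR_ANY);
  // addr.sin_addr.s_addr = inet_addr ("239.255.0.1");
  int ok = (bind (sock, (struct sockaddr *) &addr, sizeof (addr)) == 0);
  close (sock);
  return ok ? 0 : 1;
}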

In my modified version of your scanner, the problem is bigger still. The reason I modified it is that you give it only 2 ms to discover the rest of the world; that is probably enough in many cases, but definitely not always. I figured I might try discovering on all domains in parallel instead. Now, that consistently runs into the port number problem. If I set the option to avoid that, it runs into some other limitation. (There will be very many file descriptors, and I wouldn't be surprised if things went pear-shaped because of that. It is a detail worth looking into.)
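If you want to check whether the file descriptor limit is what bites, something like this (plain POSIX again, nothing Cyclone-specific) shows the per-process limit and raises the soft limit to the hard one:

#include <stdio.h>
#include <sys/resource.h>

int main (void)
{
  struct rlimit rl;
  if (getrlimit (RLIMIT_NOFILE, &rl) != 0)
    return 1;
  printf ("fd limits: soft %llu, hard %llu\n",
          (unsigned long long) rl.rlim_cur, (unsigned long long) rl.rlim_max);
  rl.rlim_cur = rl.rlim_max; // raise the soft limit for this process
  return (setrlimit (RLIMIT_NOFILE, &rl) == 0) ? 0 : 1;
}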

There's a pretty decent chance that my parallel version contains some DDS/Cyclone detail that you didn't know yet, so I am sharing it. I won't add it as an example unless (1) you'd be happy with my including such a derived work, and (2) it doesn't run into those issues anyway 🙂

#include "dds/dds.h"
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#define MAX_DOMAINID 100

int main(int argc, char** argv)
{
  dds_return_t rc;
  (void)argc;
  (void)argv;

  dds_entity_t waitset = dds_create_waitset (DDS_CYCLONEDDS_HANDLE);

  // Create a participant in "all" domains
  for(unsigned int domainID=0; domainID < MAX_DOMAINID; domainID++)
  {
    dds_entity_t monitoringParticipant = dds_create_participant(domainID, NULL, NULL);
    assert (monitoringParticipant > 0);
    dds_entity_t builtinReaderPart = dds_create_reader(monitoringParticipant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL);
    assert (builtinReaderPart > 0);
    dds_entity_t builtinReaderReadCond = dds_create_readcondition (builtinReaderPart, 0);
    rc = dds_waitset_attach (waitset, builtinReaderReadCond, builtinReaderPart);
    assert (rc == 0);
  }

  // Wait two seconds to see who we discover
  int nparts[MAX_DOMAINID + 1] = { 0 };
  dds_time_t tstop = dds_time () + DDS_SECS (2);
  while (dds_time () < tstop)
  {
    dds_attach_t trig[10];
    int ntrig;
    ntrig = dds_waitset_wait_until (waitset, trig, sizeof (trig) / sizeof (trig[0]), tstop);
    assert (ntrig >= 0);
    if ((size_t) ntrig > sizeof (trig) / sizeof (trig[0]))
      ntrig = (int) (sizeof (trig) / sizeof (trig[0]));
    for (int i = 0; i < ntrig; i++)
    {
      dds_entity_t rd = (dds_entity_t) trig[i];
      dds_sample_info_t sampleInfo;
      void *sample = NULL;
      if (dds_take (rd, &sample, &sampleInfo, 1, 1) > 0)
      {
        dds_domainid_t currentDomainID;
        dds_get_domainid (rd, &currentDomainID);
        // 2s means a participant might leave as well, update its QoS ...
        // we only get samples not seen before because of dds_take
        // - NEW: not yet read this particular key value, OLD: read it before
        // - ALIVE: participant exists, DISPOSED: participant gone
        // Thus:
        // - NEW & ALIVE - it exists and it hadn't been counted yet
        // - NEW & DISPOSED - it used to exist but not anymore, but it hadn't been counted yet
        // - NOT_NEW & ALIVE - update, but already counted
        // - NOT_NEW & DISPOSED - gone and was included in the count
        if (sampleInfo.instance_state == DDS_ALIVE_INSTANCE_STATE && sampleInfo.view_state == DDS_NEW_VIEW_STATE)
          nparts[currentDomainID]++;
        else if (sampleInfo.instance_state == DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE && sampleInfo.view_state == DDS_NOT_NEW_VIEW_STATE)
          nparts[currentDomainID]--;
        dds_return_loan (rd, &sample, 1);
      }
    }
  }

  dds_delete (DDS_CYCLONEDDS_HANDLE);
  for (unsigned int domainID=0; domainID < MAX_DOMAINID; domainID++)
  {
    // there's always at least one, that's this participant
    if (nparts[domainID] != 1)
      printf ("dom %u npart %d\n", domainID, nparts[domainID]);
  }
  return EXIT_SUCCESS;
}
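(One detail in there worth pointing out: the waitset is created from DDS_CYCLONEDDS_HANDLE rather than from one of the participants, so that readers from all the different domains can be attached to the single waitset.)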

[^1]: Only "well-known" in the context of the DDSI spec.