eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds

Reduce dds_waitset_wait_impl() time complexity by avoiding iteration over all attached entities #2011

Closed ivanpauno closed 3 weeks ago

ivanpauno commented 1 month ago

First, I'm not really familiar with the waitset implementation, so apologies if the suggestion is incorrect.

When moving entities that triggered previously but no longer do back to the observed list, the implementation currently iterates over all the waitset's entities. It seems to me that the iteration could cover only the previous ws->ntriggered entities. This improves time complexity, as I would expect ntriggered to typically be a low number, while the total number of conditions can be much larger.
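To make the proposal concrete, here is a minimal sketch of re-checking only the previously triggered prefix. The `mock_*` types and `mock_is_triggered()` are hypothetical stand-ins for the real Cyclone DDS structures, and the layout (triggered entries first, observed entries after) is assumed from the discussion rather than copied from the implementation.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for dds_entity / dds_attachment / dds_waitset. */
typedef struct { int id; bool triggered; } mock_entity;
typedef struct { mock_entity *entity; } mock_attachment;
typedef struct {
  mock_attachment *entities; /* [0, ntriggered) triggered, rest observed */
  size_t nentities;
  size_t ntriggered;
} mock_waitset;

static bool mock_is_triggered (const mock_entity *e)
{
  return e->triggered;
}

/* Re-check only the entries that were triggered last time: O(ntriggered)
   instead of O(nentities). Entries beyond the old prefix are not inspected. */
static void recheck_triggered_prefix (mock_waitset *ws)
{
  const size_t previous_ntriggered = ws->ntriggered;
  ws->ntriggered = 0;
  for (size_t i = 0; i < previous_ntriggered; i++)
  {
    if (mock_is_triggered (ws->entities[i].entity))
    {
      /* Keep still-triggering entries packed at the front of the array. */
      mock_attachment tmp = ws->entities[i];
      ws->entities[i] = ws->entities[ws->ntriggered];
      ws->entities[ws->ntriggered++] = tmp;
    }
  }
}
```

The cost of the loop depends only on how many entities were triggered at the previous wait, not on how many are attached.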

ivanpauno commented 1 month ago

One might observe one of ntriggered .. nentities-1 being in the "triggered" state because of a concurrent event that causes it to trigger, but in that case it would be moved to the triggered part of the array later, after unlocking the waitset's mutex.

Yes, agreed!

I'll let it sit in my brain for a bit longer, but that's more because I always worry about accidentally breaking fundamental pieces of code because I overlooked something.

Understood, this kind of change can generate tricky issues...

I am also now wondering whether it would then not make more sense to do it slightly differently: not resetting ntriggered to 0, but scanning 0 .. ntriggered-1, moving non-triggering ones to the end, and decrementing ntriggered. Something like (not tried this at all!):
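A minimal sketch of that idea, written for illustration and not the snippet from the original comment. The `mock_*` types and `mock_is_triggered()` are hypothetical stand-ins for the real Cyclone DDS structures.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for dds_entity / dds_attachment / dds_waitset. */
typedef struct { int id; bool triggered; } mock_entity;
typedef struct { mock_entity *entity; } mock_attachment;
typedef struct {
  mock_attachment *entities; /* [0, ntriggered) triggered, rest observed */
  size_t nentities;
  size_t ntriggered;
} mock_waitset;

static bool mock_is_triggered (const mock_entity *e)
{
  return e->triggered;
}

/* Scan only the triggered prefix; an entry that no longer triggers is
   swapped with the last entry of the prefix and the prefix shrinks by one. */
static void shrink_triggered_prefix (mock_waitset *ws)
{
  size_t i = 0;
  while (i < ws->ntriggered)
  {
    if (mock_is_triggered (ws->entities[i].entity))
    {
      i++;
    }
    else
    {
      /* Do not advance i: the entry swapped into slot i is still unchecked. */
      const size_t last = --ws->ntriggered;
      mock_attachment tmp = ws->entities[i];
      ws->entities[i] = ws->entities[last];
      ws->entities[last] = tmp;
    }
  }
}
```

In this variant no swaps happen at all when every previously triggered entity is still triggering, which is presumably the common case for a few "hot" entities.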

Seems valid to me. A small alternative to the current diff that skips the unnecessary swaps (not tested yet):

  size_t previous_ntriggered = ws->ntriggered;
  ws->ntriggered = 0;
  for (size_t i = 0; i < previous_ntriggered; i++)
  {
    if (is_triggered (ws->entities[i].entity))
    {
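      if (i > ws->ntriggered) {
        /* a non-triggering entry sits before this one: swap this one down */
        dds_attachment tmp = ws->entities[i];
        ws->entities[i] = ws->entities[ws->ntriggered];
        ws->entities[ws->ntriggered++] = tmp;
      } else {
        /* already in the right slot, no swap needed */
        ws->ntriggered++;
      }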
    }
  }

Any of the alternatives seems fine to me; I can update the PR to use any of them.

And on the more general point you raise of nentities getting really large ... at some point the linear scan in dds_waitset_observer gets painful ... Anyway, one thing at a time!

I also thought of that. One way is to store in the entity its current index in the waitset (it would live in dds_entity_observer) and then make sure it is updated on every possible swap:

typedef struct dds_entity_observer {
  dds_entity_callback_t m_cb;
  dds_entity_delete_callback_t m_delete_cb;
  struct dds_waitset *m_observer;
  size_t m_observer_my_index; /* [wait_lock of m_observer] */
  struct dds_entity_observer *m_next;
} dds_entity_observer;

The new m_observer_my_index would be passed as a parameter to dds_waitset_observer() for O(1) lookup. The issue is that swapping is not that easy:

      dds_attachment tmp = ws->entities[i];
      ws->entities[i] = ws->entities[ws->ntriggered];
      ws->entities[ws->ntriggered] = tmp;
      // if only one m_observer instead of m_observers
      ws->entities[i].entity->m_observer.m_observer_my_index = i;
      ws->entities[ws->ntriggered].entity->m_observer.m_observer_my_index = ws->ntriggered;
      ws->ntriggered++;

Because there may be multiple observers, the above does not work. A slightly unrelated question: is there any use case for multiple observers?

So maybe the attachment could have a pointer to index:

typedef struct dds_attachment {
  dds_entity *entity;
  size_t *entity_waitset_index; /* [wait_lock of dds_waitset storing the attachment], points to m_observer_my_index of the observer related to this waitset in entity->m_observers*/
  dds_entity_t handle;
  dds_attach_t arg;
} dds_attachment;
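As a rough illustration of how a swap could keep those back-pointers consistent, here is a minimal sketch. `mock_observer`, `mock_attachment` and `swap_attachments()` are hypothetical names for this example only; locking is omitted and assumed to be handled by the caller holding the waitset's lock.

```c
#include <stddef.h>

/* Hypothetical stand-in for the observer record holding m_observer_my_index. */
typedef struct mock_observer {
  size_t my_index; /* position of the entity's attachment in the waitset array */
} mock_observer;

/* Hypothetical stand-in for dds_attachment with the proposed index pointer. */
typedef struct mock_attachment {
  int handle;
  size_t *waitset_index; /* points at my_index of the observer for this waitset */
} mock_attachment;

/* Swap two attachments and write the new positions back through the
   pointers, so each observer can still find its slot in O(1). */
static void swap_attachments (mock_attachment *entities, size_t a, size_t b)
{
  mock_attachment tmp = entities[a];
  entities[a] = entities[b];
  entities[b] = tmp;
  *entities[a].waitset_index = a;
  *entities[b].waitset_index = b;
}
```

Because the attachment points directly at the right index field, the swap does not need to search the entity's observer list for the observer that belongs to this waitset.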

But that whole logic may be too intricate.

Of course, changing the waitset to a hashed set of untriggered entities plus an array of triggered entities also solves the issue. Though I sort of like the simplicity of the current approach, with only one array and no hashed set.

ivanpauno commented 2 weeks ago

Hi @ivanpauno, I'm going to merge it as-is: it has the advantage of being super simple while getting most of the benefit.

Thanks for merging! Yes, I was not planning to make the other changes in this PR.

I would expect that in most cases where one has many entities attached to a waitset, there will usually be at most a few "hot" ones, and so my expectation is that there is very little to be gained in dds_wait_impl.

Yes, agreed. I guess the only potential bad case is if you have a lot of entities that are triggered at approximately the same rate.

Improving things further really means changing the code that runs when the trigger happens. There, all the intricacies you note mean it is better to work on that in a separate effort.

Thinking a bit more about that, it may be possible to store only an array of triggered entities and not track un-triggered ones at all. With that, all the operations should look like:

BTW, I did not check the code while writing this, but I think that may be possible...