Stiffstream / sobjectizer

An implementation of Actor, Publish-Subscribe, and CSP models in one rather small C++ framework. With performance, quality, and stability proved by years in the production.
https://stiffstream.com/en/products/sobjectizer.html
Other
481 stars 47 forks source link

Either passing channel or channel name #53

Closed ilpropheta closed 1 year ago

ilpropheta commented 1 year ago

Hi, quick design & style-related question: I have a bunch of agents that publish some signals to some channels (every agent to its own channel):

agent1 -> channel1
agent2 -> channel2
agent3 -> channel3
...

Such agents are dynamic during the lifetime of the application. Signals are sent sporadically.

Now, I need a global agent - let's call it Monitor - to subscribe to all of them, even if they are not known in advance. My idea is to simply have a known channel the Monitor subscribes to - let's call it NotificationChannel. Every time a new agent comes out, it sends a notification to NotificationChannel, this way Monitor can subscribe to the specific channel.

Monitor subscribes to NotificationChannel
...
agent1 sends a notification containing channel1 to NotificationChannel
Monitor subscribes to channel1
...

Two questions:

Many thanks!

eao197 commented 1 year ago

Hi!

(design-related) what do you think of this approach?

There is a danger that agent_i will send a message to channel_i before Monitor receive and process a notification_i from the NotificationChannel.

I would prefer to use synchronous notification from agent_i to Monitor (via https://github.com/Stiffstream/so5extra/wiki/so5extra-1.5-Synchronous-Interaction for example).

(style-related) what would you pass to that NotificationChannel, a reference to the channel itself (e.g. mbox_t) or just the channel name?

It depends. A named mbox can only be MPMC mbox. But if you want (for any reason) to use MPSC mbox as NotificationChannel then you have to share a reference (mbox_t) between agents somehow.

eao197 commented 1 year ago

I think the following scenario:

Monitor subscribes to NotificationChannel
...
agent1 sends a notification containing channel1 to NotificationChannel
Monitor subscribes to channel1
...

has to be extended if agents/channels added and removed dynamically. There should also be a notification about disappearance of agent_i/channel_i. So the communication should look like:

Monitor subscribes to NotificationChannel
...
agent1 sends an appearance notification containing channel1 to NotificationChannel
Monitor subscribes to channel1
...
agent1 sends a disappearance notification containing channel1 to NotificationChannel
Monitor unsubscribes from channel1
...

Without unsubscription you'll get a form of memory leak because subscriptions that no more needed will remain for the Monitor.

eao197 commented 1 year ago

There is also a hardcore way that can be used in the scenario you described, if agent_i send ordinary messages to channel_i. I mean simple cases without using any form of enveloped messages (like revocable messages, for example).

A single mbox can be used for delivering all messages to the Monitor. A message like that will be send to that mbox:

struct msg_message_from_channel final : public so_5::message_t
{
  int channel_id_; // Or something else to differentiate sources.
  so_5::message_ref_t msg_; // The original message.

  ... // Initializing constructor.
};

Then a special mbox wrapper has to be created for enveloping the original channel_i (by using proxy mbox from so5extra for example). That wrapper overrides mbox's do_deliver_message method. Something like that:

class my_cc_mbox_wrapper final : public so_5::extra::mboxes::proxy::simple_t
{
  const int channel_id_;
  const so_5::mbox_t monitor_mbox_;

public:
  my_cc_mbox_wrapper(so_5::mbox_t channel, int channel_id, so_5::mbox_t monitor)
    : so_5::extra::mboxes::proxy::simple_t{std::move(channel)}
    , channel_id_{channel_id}
    , monitor_mbox_{std::move(monitor)}
  {}

  do_deliver_message(
    const std::type_index & msg_type,
    const ::so_5::message_ref_t & message,
    unsigned int overlimit_reaction_deep ) override
    {
      // Actual delivery.
      underlying_mbox().do_deliver_message(
        msg_type,
        message,
    overlimit_reaction_deep );
      // Sending a copy the the monitor.
      so_5::send<msg_message_from_channel>(monitor_mbox_, channel_id_, message);
  }
};

The Monitor agent should subscribe to msg_message_from_channel messages and has to extract actual message pointer from raw so_5::message_ref_t (and here some SObjectizer's internals should be used).

void monitor::evt_message_from_channel(mhood_t<msg_message_from_channel> cmd)
{
  // Let's assume that the original message has type `my_message`.
  const auto & original_content =
    so_5::message_payload_type<my_message>::payload_reference(cmd->msg_);
  ... // Handling of the original message.
}

This sketch assumes that only one type of message (my_message) will be sent to channels. And that my_message is delivered as is, without any enveloping.

ilpropheta commented 1 year ago

Thanks @eao197 for your advices and insights.

I think my scenario is a bit simpler and I should not have some of the issues you have described:

However, that "idle time" I mentioned in the second point is just a side effect due to hardware and I don't want to rely on it. So this might be an issue. I thought about your last proposal, however I think it's not applicable since my application exploits named channels for flexibility. Wrapping a channel into another is not an option for me as long as it's not reachable by name.

Another solution I imagined consists in installing a "redirect_agent_i" that subscribes to channel_i before agent_i can send messages to it. Once agent_i sends a signal to channel_i, redirect_agent_i wraps that into a message containing some sort of id and sends it to the notificationChannel.

Let me recap:

Monitor subscribes to notificationChannel
...
agent1 is going to be created
creation of redirect1 which subscribes to channel1
agent1 fully created
agent1 sends a signal to channel1
redirect1 takes the signal, adds some identity and sends it to notificationChannel
...

redirect_i has to be added for every agent_i, however it should cost ~nothing since it will wake up sporadically.

Please let me know your thoughts.

About this:

It depends. A named mbox can only be MPMC mbox. But if you want (for any reason) to use MPSC mbox as NotificationChannel then you have to share a reference (mbox_t) between agents somehow.

Not relevant for the proposed approach, but in general I am interested in this "style" matter. I am happy with passing mbox_t as part of a message so I just wanted your confirmation that is something "idiomatic" in sobjectizer code.

eao197 commented 1 year ago

I have to add a bit of clarification: a wrapping mbox can be created by agent_i directly. For example, let's assume that you have several named channels and agent_i selects a name by some criteria:

class sender_agent : public so_5::agent_t
{
  static std::string make_channel_name_to_use(const configuration & cfg) {...}

  const so_5::mbox_t channel_;

public:
  sender_agent(context_t ctx, const configuration & cfg)
    : so_5::agent_t{std::move(ctx)}
    , channel_{so_environment().create_mbox(make_channel_name_to_use(cfg))}
  {...}
  ...
};

in that case sender_agent can create a wrapper for the channel by itself:

class sender_agent : public so_5::agent_t
{
  static std::string make_channel_name_to_use(const configuration & cfg) {...}

  static so_5::mbox_t get_and_wrap_channel(
    so_5::environment_t & env,
    const configuration & cfg)
  {
    so_5::mbox_t original_channel = env.create_mbox(make_channel_name_to_use(cfg));
    return std::make_unique<my_cc_mbox_wrapper>(
      original_channel,
      some_channel_id /* let's assume we get it somehow */,
      env.create_mbox("monitor_mbox") /* let's assume the monitor uses that mbox */ );
  }

  const so_5::mbox_t channel_;

public:
  sender_agent(context_t ctx, const configuration & cfg)
    : so_5::agent_t{std::move(ctx)}
    , channel_{get_and_wrap_channel(so_environment(), cfg)}
  {...}
  ...
};

In that case sender_agent will work with wrapped mbox just like with the source mbox (there won't be visible differences).

Not relevant for the proposed approach, but in general I am interested in this "style" matter. I am happy with passing mbox_t as part of a message so I just wanted your confirmation that is something "idiomatic" in sobjectizer code.

I don't think there is "idiomatic sobjectizer code" :) I would start with the approach that is simple at the current moment. So if a named mbox is OK that is fine.

Let me to think about a redirector agents some more time.

ilpropheta commented 1 year ago

Yes, that was clear to me. I only have two concerns:

Apart from those, I think this solves all the other issues.

eao197 commented 1 year ago

redirect_i has to be added for every agent_i, however it should cost ~nothing since it will wake up sporadically.

It seems that the idea with redirect_i agent is a really good one. You can create and register redirect_i agent in the same coop with agent_i, so it can't miss any message that agent_i will send to channel_i.

ilpropheta commented 1 year ago

It seems that the idea with redirect_i agent is a really good one. You can create and register redirect_i agent in the same coop with agent_i, so it can't miss any message that agent_i will send to channel_i.

Thanks for your feedback. Related question: is this true even if such agents are in different coops which share the very same parent?

In practice:

parent_coop
       |__coop_B
             |_agent_1
       |__coop_C
             |_redirect_agent_1

I don't think it's the case, but I am asking for confirmation.

eao197 commented 1 year ago

It agents agent_1 and redirect_agent_1 in different coops then they will start independently (even if they have the same parent). So agent_1 can start its work before redirect_agent_1 completes its so_define_agent.

ilpropheta commented 1 year ago

Gotcha. Thanks for all your tips. I close the issue, feel free to add more comments if you have any.