Texera / texera

Collaborative Machine-Learning-Centric Data Analytics Using Workflows
https://texera.github.io
Apache License 2.0

C++ Actor Framework (CAF) evaluation #631

Closed shengquan-ni closed 5 years ago

shengquan-ni commented 6 years ago

Evaluate the C++ Actor Framework (CAF).

shengquan-ni commented 6 years ago

1st Weekly Update

Installing CAF:

git clone https://github.com/actor-framework/actor-framework
cd actor-framework
./configure
make
make install [as root, optional]
make test

(Requires GCC 4.8 or later as the compiler.)

Issue when running CAF program:

The CAF shared object files (.so) compiled on my Ubuntu machine differ from the ones compiled on the Google Cloud machines. When I tried to run my CAF program on the Google Cloud machines, it could not find some symbols in the .so files. After I replaced those .so files with the ones built on my Ubuntu machine, the program ran normally.

Solution: change the OS image of the Google Cloud machines from Ubuntu 14.04 to Ubuntu 16.04 (Xenial image).

Manager & Workers system architecture:

[Image: Manager and Workers system architecture]

How it works:

In the system shown above, there are two types of actors: Manager and Worker. The example has only one Manager (there may be more in the future). The Manager maintains a list of Workers (this could also be CAF's predefined actor_pool class) and initializes it when the system spawns the Manager. Similarly, each Worker keeps the address of the Manager it serves so that it can send communications back.

We also have three types of communication: message, work and switch.

A "message" is a string carrying information for the receiver. Either side can send one (e.g. a Worker sends a message when it finishes its work).

From the Manager's view: [image] From the Worker's view: [image]

A "work" is a general serializable object. Only the Manager can send work to Workers, to have them do a specific task. From the Manager's view: [image] From the Worker's view: [image]

A "switch" is another general serializable object. Only the Manager can send a switch to Workers, to have them change their behavior. From the Manager's view: [image] From the Worker's view: [image]
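Since the original code screenshots are not available here, the three kinds of communication can be sketched in plain C++17 without CAF. This is only an illustration under stated assumptions: the type names `Message`, `Work` and `Switch`, their fields, and the `Worker::handle` function are hypothetical, and the real system sends these through CAF actors rather than direct function calls.

```cpp
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-ins for the three kinds of communication.
struct Message { std::string text; };        // informational string, either direction
struct Work    { std::vector<int> payload; }; // unit of work, Manager -> Worker only
struct Switch  { std::string behavior; };     // behavior change, Manager -> Worker only

using Communication = std::variant<Message, Work, Switch>;

// Helper that lets std::visit dispatch over a set of lambdas.
template <class... Fs> struct overloaded : Fs... { using Fs::operator()...; };
template <class... Fs> overloaded(Fs...) -> overloaded<Fs...>;

struct Worker {
    std::string behavior = "default";

    // Handles one communication and returns the Message reported back to the Manager.
    Message handle(const Communication& c) {
        return std::visit(overloaded{
            [&](const Message&) { return Message{"ack"}; },
            [&](const Work& w) {
                int sum = 0;
                for (int x : w.payload) sum += x;  // placeholder for the real work
                return Message{"finished work, result=" + std::to_string(sum)};
            },
            [&](const Switch& s) {
                behavior = s.behavior;             // change how later work is handled
                return Message{"switched to " + s.behavior};
            }
        }, c);
    }
};
```

Calling `Worker{}.handle(Work{{1, 2, 3}})` yields a `Message` reporting the finished work, which mirrors the rule that Workers answer with a message when they finish.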

How the system runs:

  1. Execution starts from a fixed entry point called "caf_main" (all CAF programs use this as the entry point). Before calling any user-defined code, CAF initializes itself as well as other required components (e.g. the middleman) and stores them in a class called "actor_system".

     What is the middleman? The middleman is the main component of the I/O module and enables distribution. It transparently manages proxy actor instances representing remote actors, maintains connections to other nodes, and takes care of serialization of messages (page 68 of the CAF manual).

    • On manager machines, the system spawns a Manager and publishes it on a user-specified port.
    • On worker machines, the system spawns a Worker and tries to connect to a Manager through a user-specified host and port.
  2. Users can send messages or work through the Manager to all connected Workers or to one specific connected Worker.
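Step 2 above (sending to all connected Workers or to one specific Worker) can be sketched without CAF as a Manager that keeps an indexed list of Workers. The names `Manager`, `connect`, `send_to` and `broadcast` are hypothetical, and the Workers here are in-process objects rather than remote actors reached through the middleman.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical in-process stand-in for a connected Worker.
struct Worker {
    std::vector<std::string> inbox;
};

struct Manager {
    std::vector<Worker> workers;

    // Registers a new Worker and returns its index.
    std::size_t connect() {
        workers.emplace_back();
        return workers.size() - 1;
    }

    // Sends to one specific Worker; returns false for an unknown index.
    bool send_to(std::size_t idx, const std::string& msg) {
        if (idx >= workers.size()) return false;
        workers[idx].inbox.push_back(msg);
        return true;
    }

    // Sends the same message to every connected Worker.
    void broadcast(const std::string& msg) {
        for (auto& w : workers) w.inbox.push_back(msg);
    }
};
```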

Inheritance:

CAF supports a limited form of inheritance. We can define composable behaviors and build new behaviors by composing existing ones. This differs from true inheritance: we cannot inherit the member variables of actors, only their behaviors.
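The idea of building a new behavior out of existing ones can be illustrated in plain C++17. This sketch does not use CAF's own composable-behavior classes; `Behavior`, `or_else`, `greeter` and `echo` are hypothetical names, and a behavior is modeled simply as a handler that either accepts a message or declines it.

```cpp
#include <functional>
#include <optional>
#include <string>
#include <utility>

// A behavior either handles a message (returns a reply) or declines (nullopt).
using Behavior = std::function<std::optional<std::string>(const std::string&)>;

// Composes two behaviors: try `first`, and fall back to `second` if it declines.
Behavior or_else(Behavior first, Behavior second) {
    return [first = std::move(first), second = std::move(second)](const std::string& msg) {
        if (auto reply = first(msg)) return reply;
        return second(msg);
    };
}

// Handles only the message "hello".
Behavior greeter() {
    return [](const std::string& msg) -> std::optional<std::string> {
        if (msg == "hello") return std::string("hi there");
        return std::nullopt;
    };
}

// Handles every message by echoing it.
Behavior echo() {
    return [](const std::string& msg) -> std::optional<std::string> {
        return "echo: " + msg;
    };
}
```

`or_else(greeter(), echo())` answers "hello" with the greeter and everything else with the echo. Only the handlers are combined; any state a behavior captures stays private to it, which matches the limitation described above.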

shengquan-ni commented 6 years ago

2nd Weekly Update

Introducing RapidJSON (a third-party library under the MIT license):

RapidJSON is a JSON parser and generator for C++. It was inspired by RapidXml.

Step-by-step demonstration:

https://www.youtube.com/watch?v=HXvK5hyRc5M

shengquan-ni commented 6 years ago

3rd Weekly Update

About architecture change

Current architecture:

[Image: current architecture]

Pros:

Cons:

Two ways to modify the current architecture to avoid bottleneck:

Spark-like:

[Image: Spark-like architecture]

Pros:

Cons:

Multiple Agents (a live example: DtCraft):

[Image: Multiple Agents architecture]

Pros:

Cons:

Performance measure

My machine information:

shengquan-ni commented 6 years ago

4th Weekly Update

Pause & Resume functionality

I've implemented the pause & resume functionality in my demo distributed system, based on two special message types: Pause and Resume.

The whole process in detail:

When the Manager receives a signal to pause the execution of Agent A, it sends a Pause message to Agent A. (Resume works in exactly the same way.)

[=](pause_atom,int idx)
{
    auto& running_agents = self->state.running_agents;
    if (running_agents.find(idx)!=running_agents.end())
        self->send(running_agents[idx], pause_atom::value);
    else
        aout(self) << "invalid Agent index" << endl;
}

The Agent then propagates the Pause message to its Workers. It iterates over its Workers, sends each one the Pause message, and waits up to 200 ms for a response. A Worker that does not respond in time is treated as failed, and the Agent sends to it again, repeating until every Worker has acknowledged. On each retry the Agent sends either Pause or Resume, depending on the current workflow state, because the user may ask to resume the workflow while only some of the Workers have paused. (Resume works in exactly the same way.)

[=](pause_atom)
{
    self->state.is_paused = true;
    for (auto i : self->state.workers)
    {
        self->request(i, chrono::milliseconds(200), pause_atom::value).await(
        [=]()
        {
            self->state.workers.erase(i);
            self->state.paused_workers.insert(i);
            if (self->state.workers.empty())
                aout(self) << "all Workers paused!" << endl;
        },
        [=](const error& err)
        {
            if (self->state.is_paused)
                self->send(self, pause_atom::value, i);
            else
                self->send(self, resume_atom::value, i);
            aout(self) << self->system().render(err) << endl;
        });
    }
    aout(self) << "sent pause to all Workers" << endl;
},
[=](pause_atom, const Worker& worker)
{
    self->request(worker, chrono::milliseconds(200), pause_atom::value).await(
    [=]()
    {
        self->state.workers.erase(worker);
        self->state.paused_workers.insert(worker);
        if (self->state.workers.empty())
            aout(self) << "all Workers paused!" << endl;
    },
    [=](const error& err)
    {
        if (self->state.is_paused)
            self->send(self, pause_atom::value, worker);
        else
            self->send(self, resume_atom::value, worker);
        aout(self) << self->system().render(err) << endl;
    });
}
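The retry logic can be checked in isolation with a small CAF-free simulation. Everything here is a hypothetical stand-in: `FlakyWorker` models a Worker whose first few acknowledgements time out, and `pause_all` is a synchronous version of the Agent's loop. Unlike the handlers above, which retry indefinitely, this sketch caps the number of rounds, and it does not model a Resume arriving in the middle of the retries.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical Worker whose first `drops` pause requests time out.
struct FlakyWorker {
    int drops = 0;
    bool paused = false;

    // Returns true if the Worker acknowledged the pause within the deadline.
    bool request_pause() {
        if (drops > 0) { --drops; return false; }
        paused = true;
        return true;
    }
};

// Sends Pause to every Worker and retries only the ones that failed.
// Returns the number of rounds used, or -1 if some Worker never acknowledged.
int pause_all(std::map<std::string, FlakyWorker>& workers, int max_rounds) {
    std::set<std::string> pending;
    for (const auto& entry : workers) pending.insert(entry.first);

    int rounds = 0;
    while (!pending.empty() && rounds < max_rounds) {
        ++rounds;
        std::set<std::string> failed;
        for (const auto& name : pending)
            if (!workers.at(name).request_pause()) failed.insert(name);
        pending = std::move(failed);  // next round targets only the failures
    }
    return pending.empty() ? rounds : -1;
}
```

With three Workers that drop 0, 2 and 1 requests respectively, the loop needs three rounds before all of them are paused.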

When a Worker receives the Pause message, it sets its need_pause flag to true. Whenever it receives new work or wants to continue working, that flag stops it until a Resume message sets the flag back to false.

[=](work_atom,int idx,vector<vector<string>>& input)
{
    // check if the work is valid
    if (idx == -1) return;
    // check if the work needs to be paused
    if (self->state.need_pause)
    {
        self->state.pending_work.emplace_back(make_pair(idx, move(input)));
        return;
    }
    // do the work...
},
[=](pause_atom) 
{
    self->state.need_pause = true;
},
[=](resume_atom)
{
    self->state.need_pause = false;
    //release the pending work
    while (!self->state.pending_work.empty())
    {
        auto temp = self->state.pending_work.back();
        self->send(self, work_atom::value, temp.first,move(temp.second));
        self->state.pending_work.pop_back();
    }
}
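The buffering behavior of the Worker can be sketched without CAF as follows. The struct and its member names other than `need_pause` are hypothetical. One deliberate difference: the snippet above takes items from the back of `pending_work`, so buffered work is re-sent newest first, whereas this sketch drains from the front on the assumption that arrival order should be preserved.

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

struct Worker {
    bool need_pause = false;
    std::deque<std::string> pending;  // work received while paused
    std::vector<std::string> done;    // work processed, in processing order

    void on_work(std::string item) {
        if (need_pause) {             // paused: buffer instead of processing
            pending.push_back(std::move(item));
            return;
        }
        done.push_back(std::move(item));  // placeholder for the real work
    }

    void on_pause() { need_pause = true; }

    void on_resume() {
        need_pause = false;
        // Release buffered work oldest first.
        while (!pending.empty()) {
            std::string item = std::move(pending.front());
            pending.pop_front();
            on_work(std::move(item));
        }
    }
};
```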

Why I didn't use:

Connection between Agent and Workers:

After inspecting the network traffic of the Manager, I found that all packets between the Agent and its Workers go through the Manager. This is because the Agent and Workers are all local actors: they do not expose themselves to the network, so they cannot reach each other directly. Their messages must pass through an intermediate actor, the Manager. As a result, if the Manager shuts down unexpectedly, the bridge connecting the Agent and Workers breaks and they can no longer communicate.

To solve this problem, the Agent publishes itself after initialization.

How should we design Workers (How large should an entity be)?

Just one type of Worker handles everything?

Pros:

Cons:

Use different types of Worker to handle different operators?

Pros:

Cons:

shengquan-ni commented 6 years ago

5th Weekly Update

Remaining Problems

Priority-aware mailbox (This feature in CAF has a bug)

Solution 1: Use a delegate actor instead of calling remote_spawn.

Solution 2: Modify the source code to enable priority-aware mailbox by default.

template <class Handle, class T, class... Ts>
struct dyn_spawn_class_helper {
  Handle& result;
  actor_config& cfg;
  void operator()(Ts... xs) {
    CAF_ASSERT(cfg.host);
    // changing "no_spawn_options" to "priority_aware" may work
    result = cfg.host->system().spawn_class<T, no_spawn_options>(cfg, xs...);
  }
};
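To show why a priority-aware mailbox matters for pausing, here is a minimal CAF-free sketch of a two-level mailbox. It is not CAF's implementation; `Mailbox`, `push` and `pop` are hypothetical names. The point is only that a high-priority Pause is delivered before normal messages that were queued earlier.

```cpp
#include <deque>
#include <optional>
#include <string>
#include <utility>

struct Mailbox {
    std::deque<std::string> high;    // urgent control messages, e.g. pause
    std::deque<std::string> normal;  // regular work

    void push(std::string msg, bool high_priority = false) {
        (high_priority ? high : normal).push_back(std::move(msg));
    }

    // Returns the next message, always draining the high-priority queue first.
    std::optional<std::string> pop() {
        auto& q = !high.empty() ? high : normal;
        if (q.empty()) return std::nullopt;
        std::string msg = std::move(q.front());
        q.pop_front();
        return msg;
    }
};
```

If two work items are queued and a Pause is then pushed with `high_priority = true`, the first `pop()` returns the Pause, so the actor sees it before starting the queued work.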

Instant pause

On normal workers: with the priority-aware mailbox enabled, they should be able to pause immediately.

On blocking workers (doable): create a monitor actor and a boolean flag that both the monitor actor and the blocking worker can access (but only the monitor actor may modify). The worker's blocking behavior then loops on while(flag).

code snippet:

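// requires #include <atomic>
class Blocker : public event_based_actor
{
public:
    Blocker(actor_config& cfg, actor a) : event_based_actor(cfg)
    {
        // atomic, because the Monitor writes the flag from another thread
        // while this actor spins on it
        flag_ptr = new atomic<bool>(true);
        // hand the flag's address to the Monitor
        send(a, reinterpret_cast<uintptr_t>(flag_ptr));
        send(this, 1);
    }

    behavior make_behavior() override {
        return
        {
            [=](int)
            {
                cout << "blocking start" << endl;
                // busy-wait until the Monitor clears the flag
                while (flag_ptr->load())
                {
                        ;
                }
                cout << "blocking end" << endl;
            }
        };
    }
private:
    atomic<bool>* flag_ptr;
};

class Monitor : public blocking_actor
{
public:
    Monitor(actor_config& cfg) : blocking_actor(cfg) {}
    void act() override
    {
        bool blocking = true;
        receive_while(blocking)
        (
            [&](bool flag)
            {
                // ignore updates that arrive before the flag's address
                if (flag_ptr != nullptr)
                    flag_ptr->store(flag);
            },
            [&](uintptr_t flag)
            {
                flag_ptr = reinterpret_cast<atomic<bool>*>(flag);
            },
            [&](const exit_msg&)
            {
                blocking = false;
            }
        );
    }
private:
    atomic<bool>* flag_ptr = nullptr;
};

shengquan-ni commented 6 years ago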

6th Weekly Update

Shortcomings of CAF

Actor Creation

shengquan-ni commented 5 years ago

Finished the evaluation; closing the issue.