spcl / fmi

Function Message Interface (FMI): library for message-passing and collective communication for serverless functions.
https://mcopik.github.io/projects/fmi/

Examples of how to send a string using FMI::Communicator and how to deploy FMI on distributed nodes #20

Open YingHREN opened 2 months ago

YingHREN commented 2 months ago

Hi, this is excellent work. Could you give me some examples of how to send a string using FMI::Communicator (I tried Data<void*>, but the bcast seems to fail) and how to deploy FMI on distributed nodes? I think I should deploy TCPPunch on one node and give different arguments to the different nodes?

constexpr int num_peers = 4;
std::vector<FMI::Comm::Data<void*>> d(num_peers);
const char* message = "abc";

d[0] = FMI::Comm::Data<void*>(const_cast<char*>(message), std::strlen(message) + 1);

int peer_id = 0;
for (int i = 1; i < num_peers; i++) {
    int pid = fork();
    if (pid == 0) {
        peer_id = i;
        break;
    }
}

auto ch = std::make_unique<FMI::Communicator>(peer_id, num_peers, config_path, comm_name);
ch->bcast(d[peer_id], 0);

char* received_data = reinterpret_cast<char*>(d[peer_id].get());

if (std::strcmp(received_data, "abc") == 0) {
    std::cout << "Peer " << peer_id << " received correct data: " << received_data << std::endl;
} else {
    std::cerr << "Peer " << peer_id << " received incorrect data: " << received_data << std::endl;
}

This code only outputs "Peer 0 received correct data: abc"; no other process prints anything.

YingHREN commented 2 months ago

constexpr int num_peers = 4;
std::vector<FMI::Comm::Data<void*>> d(num_peers);
const char* message = "abc";

int peer_id = 0;
for (int i = 1; i < num_peers; i++) {
    int pid = fork();
    if (pid == 0) {
        peer_id = i;
        break;
    }
}

FMI::Comm::Data<void*> data;
char* data_raw = new char[std::strlen(message) + 1];
data = FMI::Comm::Data<void*>(data_raw, std::strlen(message) + 1);
if (peer_id == 0) {
    data = FMI::Comm::Data<void*>(const_cast<char*>(message), std::strlen(message) + 1);
}
auto ch = std::make_unique<FMI::Communicator>(peer_id, num_peers, config_path, comm_name);
ch->bcast<void*>(data, 0);

char* received_data = reinterpret_cast<char*>(data.get());

I fixed it with the code above.

mcopik commented 2 months ago

@YingHREN Hi, I'm glad to hear you find our project useful!

Example of starting multiple processes by invoking Lambdas; you can just change the lambda invoke to starting a process with arguments:

peers=YOUR_NUMBER_OF_PEERS
ARRAY=()
for peernum in $(seq 1 $peers); do
    peer_id=$(($peernum - 1))
    aws lambda invoke --region eu-central-1 --cli-read-timeout 600 --function-name fmibenchmark --cli-binary-format raw-in-base64-out --payload '{"timestamp": "'$timestamp'", "numPeers": '"$peers"', "peerID":'"$peer_id"', "benchmark":"'$benchmark'" }' &
    PID=$!
    ARRAY+=($PID)
done

for i in "${ARRAY[@]}"; do
  echo "Wait $i"
  wait $i
done

YingHREN commented 2 months ago

Thank you for your reply! I have two more questions.

First, do you support all-to-all, or could I just implement it using multiple scatters?

Second, regarding the bcast operation: your paper shows that the results for Redis and TCP are very close with 8 receivers. In contrast, the Sonic paper (https://www.usenix.org/system/files/atc21-mahgoub.pdf), which you also mention in your paper, says that as the fanout degree increases, the latency of Redis or S3 becomes lower than that of direct transfer. Your paper says that TCP is the best in all situations. What do you think about this point?

mcopik commented 2 months ago

@YingHREN I don't think we implemented all-to-all, but you should be able to emulate it with existing algorithms :-)
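One way to read "emulate it with existing algorithms" is to run one scatter per root, as the question suggests. The sketch below only simulates this in memory to show the data movement; it does not call FMI, and the names Blocks, scatter_from, and all_to_all_via_scatters are hypothetical helpers introduced for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// send[p][q] is the block that peer p wants to deliver to peer q.
using Blocks = std::vector<std::vector<int>>;

// In-memory stand-in for one scatter: the root hands block q of its
// send row to peer q. In a real deployment this would be a collective
// call that every peer participates in.
void scatter_from(const Blocks& send, Blocks& recv, std::size_t root) {
    assert(send[root].size() == send.size());  // one block per peer
    for (std::size_t q = 0; q < send.size(); ++q) {
        recv[q][root] = send[root][q];
    }
}

// All-to-all emulated as one scatter per root.
// Afterwards recv[q][p] holds the block that peer p sent to peer q.
Blocks all_to_all_via_scatters(const Blocks& send) {
    const std::size_t n = send.size();
    Blocks recv(n, std::vector<int>(n, 0));
    for (std::size_t root = 0; root < n; ++root) {
        scatter_from(send, recv, root);
    }
    return recv;
}
```

The result is the transpose of the send matrix. With P peers this takes P collective calls one after another, so it is simple but probably not the fastest option.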

Regarding bcast, it seems that the Sonic paper uses an algorithm where a single root sends the data to all recipients:

With low degrees of parallelism (i.e., low values of K), Direct-Passing achieves lower latency than Remote-Storage. This is because copying the data directly across the two VMs is faster than copying the data twice over the network, to and from the remote storage. However, with increasing values of K, Direct-Passing suffers from the limited network bandwidth of the VM of the sending λ, which becomes the bottleneck as it tries to copy the intermediate data to K VMs simultaneously.

We also note in our paper that distributing data from one sender to multiple recipients scales well with storage like S3, since it is naturally distributed across multiple servers. However, for TCP, we use a classic HPC algorithm where multiple processes distribute data through a binary tree. This avoids the bottleneck created by the limited bandwidth of a single link.

Source: https://github.com/spcl/fmi/blob/main/src/comm/PeerToPeer.cpp#L14
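To make the difference concrete, here is a small sketch that compares how many copies the busiest peer has to send in a flat broadcast and in a binary-tree broadcast. It is a generic illustration, not the code from the linked file: it assumes the root is rank 0 and heap-style numbering (children of rank r are 2r+1 and 2r+2), and all function names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Children of `rank` in a binary tree rooted at rank 0 (assumed numbering).
std::vector<std::size_t> tree_children(std::size_t rank, std::size_t num_peers) {
    std::vector<std::size_t> children;
    for (std::size_t c = 2 * rank + 1; c <= 2 * rank + 2 && c < num_peers; ++c) {
        children.push_back(c);
    }
    return children;
}

// Largest number of copies any single peer sends in the tree broadcast.
std::size_t max_sends_tree(std::size_t num_peers) {
    std::size_t worst = 0;
    for (std::size_t r = 0; r < num_peers; ++r) {
        worst = std::max(worst, tree_children(r, num_peers).size());
    }
    return worst;
}

// In a flat broadcast the root alone sends to every other peer.
std::size_t max_sends_flat(std::size_t num_peers) {
    assert(num_peers >= 1);
    return num_peers - 1;
}

// Walk the tree from the root and check that every peer gets the data.
bool tree_reaches_all(std::size_t num_peers) {
    std::vector<bool> has_data(num_peers, false);
    if (num_peers > 0) has_data[0] = true;
    for (std::size_t r = 0; r < num_peers; ++r) {
        if (!has_data[r]) continue;
        for (std::size_t c : tree_children(r, num_peers)) has_data[c] = true;
    }
    return std::all_of(has_data.begin(), has_data.end(), [](bool b) { return b; });
}
```

For a root with 8 receivers (9 peers), max_sends_flat returns 8 while max_sends_tree returns 2, so no single link has to carry more than two copies of the payload. The price is that the data passes through several hops.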

In our paper, TCP performed best for all experiments. Of course, there are many different algorithms for broadcast, e.g., pipeline operations for large payloads. I think that depending on the network performance, payload size, number of clients, different algorithms and mixes of storage and direct passing will perform best.

https://en.wikipedia.org/wiki/Broadcast_(parallel_pattern)

YingHREN commented 2 months ago

Thank you for your explanation!

I am writing an all2all over a ring to make the best use of the bandwidth. I want to send data to and receive data from other nodes at the same time, so I use two threads. However, it often fails with an FMI::Utils::Timeout error. I think the reason is that one process is still receiving when another process has already ended? Could you give me some suggestions? Should I add a scheduler to synchronize all peers after every round before the next round starts? (I already set the timeout in the JSON to 10000 = 10 s.)

void FMI::Comm::PeerToPeer::all2all(channel_data sendbuf, channel_data recvbuf) {
    std::size_t single_buffer_size = sendbuf.len / num_peers;
    std::vector<char> tmp_recvbuf(single_buffer_size);

    Utils::peer_num send_peer = (peer_id + 1) % num_peers;
    Utils::peer_num recv_peer = (peer_id - 1 + num_peers) % num_peers;

    std::memcpy(recvbuf.buf + peer_id * single_buffer_size, 
                sendbuf.buf + peer_id * single_buffer_size, 
                single_buffer_size);

    for (int round = 0; round < num_peers - 1; ++round) {
        // std::thread send_thread([&]() {
        //     send({sendbuf.buf + send_peer * single_buffer_size, single_buffer_size}, send_peer);
        //     std::cout << "round: " << round << " peer_id: " << peer_id << " send to: " << send_peer << std::endl;
        // });

        std::thread recv_thread([&]() {
            recv({tmp_recvbuf.data(), single_buffer_size}, recv_peer);
            std::cout << "round: " << round << " peer_id: " << peer_id << " recv from: " << recv_peer << std::endl;
        });

        send({sendbuf.buf + send_peer * single_buffer_size, single_buffer_size}, send_peer);
        std::cout << "round: " << round << " peer_id: " << peer_id << " send to: " << send_peer << std::endl;

        recv_thread.join();

        std::memcpy(recvbuf.buf + recv_peer * single_buffer_size, tmp_recvbuf.data(), single_buffer_size);
        tmp_recvbuf.clear();
        send_peer = (send_peer + 1) % num_peers;
        recv_peer = (recv_peer - 1 + num_peers) % num_peers;
    }
}

round: 0 peer_id: 0 send: 1round: 
0 peer_id: 0 recv: 1
round: 0 peer_id: 1 recv: 0
round: 0 peer_id: 1 send: 0
Peer 1 received data: 2
Peer 1 received data: 2
[2024-09-17 08:53:54.944196] [0x00007e3bc3643640] [error]   0: Error when receiving: Connection refused
Peer 0 received data: 1
Peer 0 received data: 0

mcopik commented 2 months ago

@YingHREN timeout is used here to detect broken TCP connections. Are you starting two processes - on Lambda functions? Or are you debugging it locally?

A dedicated thread should not be necessary - just call send and then follow with a blocking recv.

YingHREN commented 2 months ago

I am debugging locally. As for blocking send and recv in all2all, I wrote a pairwise all2all and it runs well! However, it seems that AWS provides the full bandwidth in both the send and the receive direction, so maybe using two threads would be better? https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-network-bandwidth.html

Instance bandwidth specifications apply to both inbound and outbound traffic for the instance. For example, if an instance specifies up to 10 Gbps of bandwidth, that means it has up to 10 Gbps of bandwidth for inbound traffic, and up to 10 Gbps for outbound traffic. The network bandwidth that's available to an EC2 instance depends on several factors, as follows.

mcopik commented 2 months ago

@YingHREN Since you are trying it locally, it is likely that there's simply a deadlock: one rank/process posts a receive that has no matching send in the algorithm, or the sender should post the send but it's not doing it correctly.

Regarding the blocking: send is implemented using a send on the TCP socket, so it should not block unless you saturate the OS send buffers. I'd recommend first implementing it without threads and only then trying optimizations, to verify that they actually improve things :) A single-threaded solution will be easier to debug.
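Before running a single-threaded version, it can help to check offline that the schedule has no unmatched receive. The sketch below encodes the schedule from the all2all code posted earlier in this thread (in round r, peer p sends to p+1+r and receives from p-1-r, modulo the number of peers) and verifies that every send has a matching receive in the same round. Step, step_for, and schedule_is_matched are hypothetical helpers, and nothing here calls FMI.

```cpp
#include <cassert>
#include <cstddef>

// Partner pair for one peer in one round.
struct Step {
    std::size_t send_to;
    std::size_t recv_from;
};

// Schedule of the all2all posted above: in round r, peer p sends to
// (p + 1 + r) mod n and receives from (p - 1 - r) mod n.
Step step_for(std::size_t peer, std::size_t round, std::size_t n) {
    assert(n > 1 && peer < n && round + 1 < n);
    const std::size_t shift = round + 1;
    return {(peer + shift) % n, (peer + n - shift) % n};
}

// True if, in every round, the peer that p sends to expects to receive
// from p. An unmatched pair would leave one side waiting forever.
bool schedule_is_matched(std::size_t n) {
    for (std::size_t round = 0; round + 1 < n; ++round) {
        for (std::size_t p = 0; p < n; ++p) {
            const Step mine = step_for(p, round, n);
            const Step theirs = step_for(mine.send_to, round, n);
            if (theirs.recv_from != p) return false;
        }
    }
    return true;
}
```

If this check passes, each peer can call send and then a blocking recv once per round, under the condition stated above that send does not block until the OS send buffers are saturated. For large blocks that condition may not hold, and the order of send and recv would then need more care.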

YingHREN commented 1 month ago

Thanks. I finished the single-threaded solution for all2all, and it seems to work well for now. As for the two-threaded send and recv, I guess the reason for the failure is the socket management in the TCPpunch server. Maybe the "sockets vector"

void FMI::Comm::Direct::check_socket(FMI::Utils::peer_num partner_id, std::string pair_name) {
    if (sockets.empty()) {
        sockets = std::vector<int>(num_peers, -1);
    }
    if (sockets[partner_id] == -1) {
        try {
            sockets[partner_id] = pair(pair_name, hostname, port, max_timeout);
        } catch (Timeout) {
            throw Utils::Timeout();
        }

        struct timeval timeout;
        timeout.tv_sec = max_timeout / 1000;

access when creating or getting the sockets is the problem, because the partner ID is the same for both threads and the error is "connection refused". I changed a few things, but it is still not working.

mcopik commented 1 month ago

@YingHREN Yes, the implementation assumes that there's one socket per endpoint. Since socket operations are thread-safe, you can use them from multiple threads.

https://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid/1981439#1981439
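As a standalone illustration of that point, the sketch below has one endpoint send and receive on the same socket from two different threads while a peer echoes the data back. It is not FMI code: it uses a local socketpair over AF_UNIX instead of a TCP connection so that it runs without any setup, and send_all, recv_all, and exchange_with_two_threads are hypothetical helpers.

```cpp
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <string>
#include <thread>

// Send exactly `len` bytes; returns false on error.
bool send_all(int fd, const char* data, std::size_t len) {
    assert(len == 0 || data != nullptr);
    std::size_t done = 0;
    while (done < len) {
        const ssize_t n = ::send(fd, data + done, len - done, 0);
        if (n <= 0) return false;
        done += static_cast<std::size_t>(n);
    }
    return true;
}

// Receive exactly `len` bytes; returns false on error or early close.
bool recv_all(int fd, char* data, std::size_t len) {
    assert(len == 0 || data != nullptr);
    std::size_t done = 0;
    while (done < len) {
        const ssize_t n = ::recv(fd, data + done, len - done, 0);
        if (n <= 0) return false;
        done += static_cast<std::size_t>(n);
    }
    return true;
}

// fds[0] is used by two threads at once (one sends, one receives);
// fds[1] is the peer, which echoes the message back.
std::string exchange_with_two_threads(const std::string& msg) {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return std::string();
    std::string reply(msg.size(), '\0');

    std::thread peer([&] {
        std::string buf(msg.size(), '\0');
        if (recv_all(fds[1], &buf[0], buf.size())) {
            send_all(fds[1], buf.data(), buf.size());
        }
    });

    bool received = false;
    // The receiver is already blocked in recv when the send below runs.
    std::thread receiver([&] { received = recv_all(fds[0], &reply[0], reply.size()); });

    const bool sent = send_all(fds[0], msg.data(), msg.size());
    if (!sent) shutdown(fds[0], SHUT_RDWR);  // unblock both threads on failure

    receiver.join();
    peer.join();
    close(fds[0]);
    close(fds[1]);
    return (sent && received) ? reply : std::string();
}
```

Calling exchange_with_two_threads("hello") should return "hello". Note that this only covers concurrent use of an already connected socket; creating the connection, as check_socket does on first use, is a separate step that two threads would still have to coordinate.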