openucx / ucx

Unified Communication X (mailing list - https://elist.ornl.gov/mailman/listinfo/ucx-group)
http://www.openucx.org

How to Use Shared Memory for Intra-node Inter-process Data Transfer? #10016

Open Clownier opened 3 months ago

Clownier commented 3 months ago

We have found that deploying multiple data-transfer modules on each server in the cluster leads to latency tails on the order of seconds in RDMA data transfers across the cluster, as detailed in issue #9976. After reviewing some research papers, we suspect the root cause may be loopback traffic among the modules running on the same server.

We are therefore exploring whether Inter-Process Communication (IPC) mechanisms can be used to bypass loopback traffic within a single machine, without major modifications to our existing project. Our project currently uses the tag-matching approach, similar to the hello world example, for multi-server to multi-server (including self) data transfers.

We would like guidance on how to adapt our existing logic to use shared memory for connections on the same machine, and would also like to know whether there are any relevant examples or tutorials we could use as a reference. Your help would be greatly appreciated.

yosefe commented 3 months ago

@Clownier Shared memory transports should be enabled by default. Can you pls provide the following information:

  1. How ucp_init and ucp_ep_create are used (e.g which parameters/flags are passed)
  2. Run the program with "UCX_INFO=y" and "UCX_PROTO_INFO=y" and provide the output
Clownier commented 3 months ago
@yosefe

  1. ucp_init and ucp_ep_create use the following parameters/flags:
    
    /* Create context */
    ucp_params_t ucp_params;
    ucp_params.field_mask   = UCP_PARAM_FIELD_FEATURES |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_SIZE;
    ucp_params.features     = UCP_FEATURE_TAG |
                              UCP_FEATURE_STREAM |
                              UCP_FEATURE_WAKEUP;
    ucp_params.request_init = RequestInit;
    ucp_params.request_size = sizeof(UcxRequest);
    ucs_status_t status = ucp_init(&ucp_params, nullptr, &context_);
    if (status != UCS_OK) {
        UCX_LOGV_WARNING("ucp init failed :%s", ucs_status_string(status));
        return false;
    }
    /* Create worker */
    ucp_worker_params_t worker_params;
    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
    status = ucp_worker_create(context_, &worker_params, &worker_);

  2. It seems that our version does not support the UCX_INFO=y and UCX_PROTO_INFO=y variables: when we run io_demo with them set, UCX reports them as unused:

    [1721287668.093594] [DEMO] Starting io_demo pid 14174 on
    [1721287668.093667] [DEMO] Command line: io_demo -p 8002
    [1721287668.093694] [DEMO] UCX library path: io_demo
    [1721287668.108859] [UCX] created context 0x6d4b70 with TAG
    [1721287668.110699] [] parser.c:1909 UCX WARN unused env variables: UCX_INFO,UCX_PROTO_INFO (set UCX_WARN_UNUSED_ENV_VARS=n to suppress this warning)
    [1721287668.110729] [UCX] created worker 0x6e0dc0
    [1721287668.110915] [UCX] started listener 0x6dce60 on 0.0.0.0:8002
    [1721287669.110932] [DEMO] active: 0/0, buffers:0
    [1721287670.110933] [DEMO] active: 0/0, buffers:0
    [1721287671.110936] [DEMO] active: 0/0, buffers:0
    ^CRun-time signal handling: 2
    [1721287672.107352] [UCX] destroy connections
    [1721287672.107382] [UCX] iomsg receive request 0x7199d8 failed: Request canceled



3. The environment, business context, and version information we are using are the same as those mentioned in issue https://github.com/openucx/ucx/issues/9976.
- UCX version used: v1.12.0
- To allow the process to communicate with other machines over RDMA, the code sets the environment variable UCX_TLS with setenv("UCX_TLS", "rc_x,tcp", 1);
Clownier commented 3 months ago

@yosefe Alternatively, to reframe the problem: how can we make connections to other servers use UCX's RDMA transport and connections to this server use UCX's TCP transport within a single process?
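The split-context idea in this question (the thread later creates a separate ucp_context, configured with ucp_config_modify(), for the local-host connection) needs some rule for deciding which transports a given peer should use. A minimal sketch, assuming IPv4 peers and that "local" means an address in 127.0.0.0/8; the helper names are hypothetical and not part of UCX, and the transport strings simply mirror the ones used in this thread:

```cpp
#include <arpa/inet.h>   // inet_pton, ntohl
#include <netinet/in.h>  // sockaddr_in
#include <sys/socket.h>  // AF_INET
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: true if an IPv4 peer address is in 127.0.0.0/8.
// It does NOT detect the host's own non-loopback addresses (e.g. the IP of
// its RDMA-facing interface); a real version would also consult getifaddrs().
bool is_ipv4_loopback(const sockaddr_in& peer) {
    const std::uint32_t host_order = ntohl(peer.sin_addr.s_addr);
    return (host_order >> 24) == 127;
}

// Hypothetical policy: the TLS string to pass to
// ucp_config_modify(config, "TLS", ...) for the context used to reach `peer`.
std::string tls_for_peer(const sockaddr_in& peer) {
    return is_ipv4_loopback(peer) ? "tcp,shm" : "rc_x,tcp";
}

// Build a sockaddr_in from a dotted-quad string; returns false on parse error.
bool make_ipv4(const char* text, sockaddr_in& out) {
    out = sockaddr_in{};
    out.sin_family = AF_INET;
    return inet_pton(AF_INET, text, &out.sin_addr) == 1;
}
```

As yosefe notes, shared-memory transports are enabled by default, so a single context whose transport list includes sm may make this per-peer split unnecessary; the sketch only shows how the split could be expressed if it is kept.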

yosefe commented 3 months ago
  1. can you pls post also the params for ucp_ep_create?
  2. regardless, this version is pretty old, I'd strongly recommend upgrading to v1.17.0
  3. setting UCX_TLS=rc_x,tcp disables shared memory transports. I'd suggest to not set UCX_TLS at all (what happens if you don't?) or set it to UCX_TLS=rc_x,tcp,sm
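Since the application sets UCX_TLS itself with setenv() before ucp_init(), the suggestion above can be applied in code. A minimal sketch, assuming "sm" and "shm" are both shared-memory aliases (both spellings appear in this thread); the helper names are hypothetical:

```cpp
#include <cassert>
#include <cstdlib>   // setenv, unsetenv, getenv (POSIX)
#include <sstream>
#include <string>

// True if the comma-separated UCX_TLS list already names "sm" or "shm".
bool has_shm_alias(const std::string& tls) {
    std::stringstream ss(tls);
    std::string item;
    while (std::getline(ss, item, ',')) {
        if (item == "sm" || item == "shm") {
            return true;
        }
    }
    return false;
}

// Appends ",sm" to a non-empty transport list that lacks a shared-memory
// alias, e.g. "rc_x,tcp" -> "rc_x,tcp,sm".
std::string with_shared_memory(const std::string& tls) {
    return has_shm_alias(tls) ? tls : tls + ",sm";
}

// Hypothetical replacement for setenv("UCX_TLS", "rc_x,tcp", 1).
// Must run before ucp_config_read()/ucp_init().
void configure_ucx_tls(const std::string& tls) {
    if (tls.empty()) {
        unsetenv("UCX_TLS");  // "do not set UCX_TLS at all": use UCX defaults
    } else {
        setenv("UCX_TLS", with_shared_memory(tls).c_str(), 1);
    }
}
```

Calling configure_ucx_tls("rc_x,tcp") exports UCX_TLS=rc_x,tcp,sm, which is the second option suggested above; calling it with an empty string corresponds to the first option.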
Clownier commented 3 months ago

@yosefe Thank you for your reply. We had been configuring transports through UCX_TLS and had not noticed ucp_config. We have now removed UCX_TLS=rc_x,tcp and instead call ucp_config_modify(ucp_config, "TLS", "tcp,shm"); for the local-host connection. As a result, local loopback traffic goes over TCP instead of RDMA, and nload shows the local traffic on device lo. The configuration is as follows:

/********* Server *************/
    ucp_params_t ucp_params;
    ucp_params.field_mask   = UCP_PARAM_FIELD_FEATURES |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_SIZE;
    ucp_params.features     = UCP_FEATURE_TAG |
                              UCP_FEATURE_STREAM |
                              UCP_FEATURE_WAKEUP;
    ucp_params.request_init = RequestInit;
    ucp_params.request_size = sizeof(UcxRequest);
    ucs_status_t status = ucp_init(&ucp_params, NULL, &context_);
    ucp_worker_params_t worker_params;
    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
    status = ucp_worker_create(context_, &worker_params, &worker_);

    ucp_ep_params_t ep_params;
    ep_params.field_mask   = UCP_EP_PARAM_FIELD_CONN_REQUEST;
    ep_params.conn_request = conn_req;
    ep_params.field_mask      |= UCP_EP_PARAM_FIELD_ERR_HANDLER |
                                 UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE;
    ep_params.err_mode         = UCP_ERR_HANDLING_MODE_PEER;
    ep_params.err_handler.cb   = ErrorCallback;
    ep_params.err_handler.arg  = reinterpret_cast<void*>(this);
    status = ucp_ep_create(context_.worker(), &ep_params, &ep_);  /* reuse `status` declared above */

/********* client not connect to local host *************/
    ucp_params_t ucp_params;
    ucp_params.field_mask   = UCP_PARAM_FIELD_FEATURES |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_SIZE;
    ucp_params.features     = UCP_FEATURE_TAG |
                              UCP_FEATURE_STREAM |
                              UCP_FEATURE_WAKEUP;
    ucp_params.request_init = RequestInit;
    ucp_params.request_size = sizeof(UcxRequest);
    ucs_status_t status = ucp_init(&ucp_params, NULL, &context_);
    ucp_worker_params_t worker_params;
    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
    status = ucp_worker_create(context_, &worker_params, &worker_);

    ucp_ep_params_t ep_params;
    ep_params.field_mask       = UCP_EP_PARAM_FIELD_FLAGS       |
                                 UCP_EP_PARAM_FIELD_SOCK_ADDR;
    ep_params.flags            = UCP_EP_PARAMS_FLAGS_CLIENT_SERVER;
    ep_params.sockaddr.addr    = saddr;
    ep_params.sockaddr.addrlen = addrlen;
    // create endpoint
    ep_params.field_mask      |= UCP_EP_PARAM_FIELD_ERR_HANDLER |
                                 UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE;
    ep_params.err_mode         = UCP_ERR_HANDLING_MODE_PEER;
    ep_params.err_handler.cb   = ErrorCallback;
    ep_params.err_handler.arg  = reinterpret_cast<void*>(this);
    status = ucp_ep_create(context_.worker(), &ep_params, &ep_);  /* reuse `status` declared above */

/********* client connect to local host *************/

    ucp_config_t* ucp_config = nullptr;
    ucs_status_t status = ucp_config_read(NULL, NULL, &ucp_config);
    status = ucp_config_modify(ucp_config, "TLS", "tcp,shm");
    ucp_params_t ucp_params;
    ucp_params.field_mask   = UCP_PARAM_FIELD_FEATURES |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_SIZE;
    ucp_params.features     = UCP_FEATURE_TAG |
                              UCP_FEATURE_STREAM |
                              UCP_FEATURE_WAKEUP;
    ucp_params.request_init = RequestInit;
    ucp_params.request_size = sizeof(UcxRequest);
    status = ucp_init(&ucp_params, ucp_config, &context_);
    ucp_config_release(ucp_config);  /* config is no longer needed after ucp_init() */

    ucp_ep_params_t ep_params;
    ep_params.field_mask       = UCP_EP_PARAM_FIELD_FLAGS       |
                                 UCP_EP_PARAM_FIELD_SOCK_ADDR;
    ep_params.flags            = UCP_EP_PARAMS_FLAGS_CLIENT_SERVER;
    ep_params.sockaddr.addr    = saddr;
    ep_params.sockaddr.addrlen = addrlen;
    // create endpoint
    ep_params.field_mask      |= UCP_EP_PARAM_FIELD_ERR_HANDLER |
                                 UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE;
    ep_params.err_mode         = UCP_ERR_HANDLING_MODE_PEER;
    ep_params.err_handler.cb   = ErrorCallback;
    ep_params.err_handler.arg  = reinterpret_cast<void*>(this);
    status = ucp_ep_create(context_.worker(), &ep_params, &ep_);  /* reuse `status` declared above */

However, after configuring ucp_config_modify(ucp_config, "TLS", "tcp,shm");, the loopback traffic goes through TCP but latency increases, which significantly reduces our overall business performance (IOPS and bandwidth). So we would like to ask: how can we make local loopback traffic use shared-memory transports, and what is an intuitive way to verify that shared memory is actually being used?
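nload is one way to check; the same lo byte counters it displays can also be sampled from code. A minimal sketch, assuming Linux and the /proc/net/dev text format; the helper names are hypothetical. Note that a small lo delta over a run only shows the data did not travel over TCP loopback; it does not by itself identify which UCX transport carried it (an RDMA transport looping through the NIC would also leave lo nearly idle):

```cpp
#include <cassert>
#include <cstdint>
#include <fstream>
#include <optional>
#include <sstream>
#include <string>

struct IfaceBytes {
    std::uint64_t rx_bytes = 0;
    std::uint64_t tx_bytes = 0;
};

// Parse the byte counters for one interface from text in /proc/net/dev format.
std::optional<IfaceBytes> parse_iface_bytes(const std::string& proc_net_dev,
                                            const std::string& iface) {
    std::istringstream lines(proc_net_dev);
    std::string line;
    while (std::getline(lines, line)) {
        const auto colon = line.find(':');
        if (colon == std::string::npos) {
            continue;  // the two header lines contain no ':'
        }
        std::string name = line.substr(0, colon);
        name.erase(0, name.find_first_not_of(" \t"));  // strip leading spaces
        if (name != iface) {
            continue;
        }
        std::istringstream fields(line.substr(colon + 1));
        IfaceBytes out;
        std::uint64_t skip = 0;
        fields >> out.rx_bytes;
        for (int i = 0; i < 7; ++i) {
            fields >> skip;  // rx packets, errs, drop, fifo, frame, compressed, multicast
        }
        fields >> out.tx_bytes;
        if (!fields) {
            return std::nullopt;
        }
        return out;
    }
    return std::nullopt;
}

// Read the live counters, e.g. read_iface_bytes("lo").
std::optional<IfaceBytes> read_iface_bytes(const std::string& iface) {
    std::ifstream in("/proc/net/dev");
    if (!in) {
        return std::nullopt;  // not Linux, or /proc not mounted
    }
    std::stringstream buf;
    buf << in.rdbuf();
    return parse_iface_bytes(buf.str(), iface);
}
```

Sampling read_iface_bytes("lo") before and after a run and comparing the tx_bytes delta with the amount of data the application wrote gives the same signal as the nload observations in this thread.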

In addition, we would also like to upgrade UCX, but in testing we found that v1.12 cannot establish connections with later versions (we tested v1.14): the client reports "Operation rejected by remote peer" and the server reports "ucp_ep_create() failed: Invalid parameter". This makes it impossible for us to upgrade one module or one machine at a time. Since our service is already in production, stopping all services and restarting them together is unacceptable. Do you have any suggestions in this regard?

Clownier commented 3 months ago

@yosefe We use

sudo UCX_TLS=tcp,sm,shm io_demo -p 8002
sudo UCX_TLS=tcp,sm,shm io_demo 127.0.0.1:8002 -o write -i 20000000 -w 512 -d 4096:4096

to test on the same machine, expecting the two processes to communicate through shared memory, but in fact they appear to use TCP over lo, as shown by nload.

How can we make it transfer data through shared memory?

yosefe commented 3 months ago

@Clownier can you try setting UCX_SYSV_ERROR_HANDLING=y in addition to UCX_TLS=tcp,sm ?
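Because the application already sets UCX variables with setenv() before ucp_init(), the knob suggested here could be set in code instead of on the command line. A minimal sketch; the helper names are hypothetical. overwrite = 0 is chosen so a value exported in the shell (as in the io_demo runs in this thread) still takes precedence, and UCX_MM_ERROR_HANDLING is included because it is the variant tested in the reply below. A name that a given UCX version does not recognize is reported in the "unused env variables" warning, as seen earlier in this thread. Why error handling matters here is an assumption, not confirmed in the thread: the endpoints above request UCP_ERR_HANDLING_MODE_PEER, so a transport's error-handling capability may affect whether UCX selects it.

```cpp
#include <cassert>
#include <cstdlib>   // setenv, getenv (POSIX setenv)
#include <cstring>   // strcmp

// Hypothetical helper: export a UCX variable unless it is already set.
bool set_ucx_default(const char* name, const char* value) {
    return setenv(name, value, /*overwrite=*/0) == 0;
}

// Hypothetical: apply the shared-memory error-handling knobs discussed here.
// To be safe, call it before ucp_config_read()/ucp_init(). UCX_TLS is left
// to the existing setenv() or ucp_config_modify() call.
bool apply_shm_error_handling_defaults() {
    const bool sysv_ok = set_ucx_default("UCX_SYSV_ERROR_HANDLING", "y");
    const bool mm_ok   = set_ucx_default("UCX_MM_ERROR_HANDLING", "y");
    return sysv_ok && mm_ok;
}
```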

Clownier commented 3 months ago

@yosefe Thanks for your reply. We used io_demo for testing: the server simply runs io_demo, and the client commands and results are as follows:

  1. UCX_TLS=tcp,shm,sm UCX_MM_ERROR_HANDLING=n io_demo 127.0.0.1 -o write -i 20000000 -w 512 -d 4096:4096
     a. io_demo: [1722321594.284402] [DEMO] total min:106690 max:106690 total:106690 | read 0 MBs min:0(127.0.0.1) max:0 total:0 | write 416.719 MBs min:106690(127.0.0.1) max:106690 total:106690 | active:1/1 buffers:1
     b. nload, Device lo [127.0.0.1]: Curr: 3.72 GBit/s
  2. UCX_TLS=tcp,shm,sm,self io_demo 127.0.0.1 -o write -i 20000000 -w 512 -d 4096:4096
     a. io_demo: [1722321889.070131] [DEMO] total min:105170 max:105170 total:105170 | read 0 MBs min:0(127.0.0.1) max:0 total:0 | write 410.795 MBs min:105170(127.0.0.1) max:105170 total:105170 | active:1/1 buffers:1
     b. nload, Device lo [127.0.0.1]: Curr: 3.70 GBit/s
  3. UCX_TLS=tcp,shm,sm UCX_MM_ERROR_HANDLING=y io_demo 127.0.0.1 -o write -i 20000000 -w 512 -d 4096:4096
     a. io_demo: [1722321703.059312] [DEMO] total min:215660 max:215660 total:215660 | read 0 MBs min:0(127.0.0.1) max:0 total:0 | write 842.372 MBs min:215660(127.0.0.1) max:215660 total:215660 | active:1/1 buffers:1
     b. nload, Device lo [127.0.0.1]: Curr: 624.10 MBit/s
  4. UCX_TLS=rc_x,tcp,shm,sm io_demo 127.0.0.1 -o write -i 20000000 -w 512 -d 4096:4096
     a. io_demo: [1722321957.636130] [DEMO] total min:566370 max:566370 total:566370 | read 0 MBs min:0(127.0.0.1) max:0 total:0 | write 2212.29 MBs min:566370(127.0.0.1) max:566370 total:566370 | active:1/1 buffers:16
     b. nload, Device lo [127.0.0.1]: Curr: 9.27 kBit/s
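Comparing the two measurements in each run shows how much of the written data still crosses lo. A minimal sketch of that arithmetic, assuming io_demo's "MBs" means 10^6 bytes/s and nload's Bit/s figures use decimal prefixes; if either tool uses binary units the ratios shift by a few percent. The function name is hypothetical:

```cpp
#include <cassert>
#include <cmath>

// Fraction of io_demo's reported write throughput that also appears on lo.
// io_demo_mb_per_s: the "write ... MBs" figure; lo_bits_per_s: nload "Curr".
double lo_fraction(double io_demo_mb_per_s, double lo_bits_per_s) {
    const double lo_mb_per_s = lo_bits_per_s / 8.0 / 1e6;  // bits -> MB
    return lo_mb_per_s / io_demo_mb_per_s;
}
```

With the numbers above, runs 1 and 2 give a ratio of about 1.12 (all data on lo, the excess presumably being TCP/IP overhead and unit differences), run 3 gives about 0.093 (roughly 9% of the data still on lo), and run 4 gives about 5e-7 (essentially nothing on lo).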

After adding UCX_MM_ERROR_HANDLING=y, it seems that most of the data is transferred through shared memory, but we could not find any documentation explaining why. Why does this setting cause this change? In addition, although with UCX_MM_ERROR_HANDLING=y the bandwidth roughly doubles compared to TCP (842 MB/s vs. about 417 MB/s), it is still far below the RDMA bandwidth (about 2212 MB/s). Is this the upper limit of shared-memory transport? Is there any other way to bring shared-memory performance closer to RDMA?