rapidsai / ucx-py

Python bindings for UCX
https://ucx-py.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

How to create_listener/create_endpoint through PCIe or shared memory when two kube pods are on one machine? #1084

Open MoFHeka opened 6 days ago

MoFHeka commented 6 days ago

The only examples show how to transfer data through a network card.

pentschev commented 1 day ago

There's no such thing as a listener through shared memory, and I'm not sure what PICE means; perhaps you mean PCIe? A listener is necessarily bound to some sort of networking interface with an IP address and a port, just like a socket. However, this is only the means of establishing a connection between two processes: UCX will still use shared memory if it determines that shared memory performs better than the other transports available between those two processes. You can confirm this by setting UCX_TLS to limit UCX to the transports you want it to use. For example, running the internal UCX-Py benchmark with TCP only, I see the following result:

$ UCX_TLS=tcp python -m ucp.benchmarks.send_recv --no-detailed-report --n-iter 10000 --n-bytes 100000 --backend ucp-async -c 0 -b 1 --no-error-handling
Server Running at 10.33.225.163:57742
Client connecting to server at 10.33.225.163:57742
Roundtrip benchmark
================================================================================
Iterations                | 10000
Bytes                     | 97.66 kiB
Object type               | numpy
Reuse allocation          | False
Transfer API              | TAG
UCX_TLS                   | tcp
UCX_NET_DEVICES           | all
================================================================================
Device(s)                 | CPU-only
Server CPU                | 1
Client CPU                | 0
================================================================================
Bandwidth (average)       | 470.28 MiB/s
Bandwidth (median)        | 482.63 MiB/s
Latency (average)         | 202786 ns
Latency (median)          | 197598 ns
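The TCP-only run above, and the tcp,sm run that follows, can also be launched from Python, for example to compare several UCX_TLS selections in one go. A minimal sketch, assuming UCX-Py and its `ucp.benchmarks.send_recv` module are installed and that the report is printed on standard output; the helpers `benchmark_command` and `run_benchmark` are hypothetical, and the flags are copied from the commands in this thread. Each run gets its own copy of the environment with UCX_TLS set, so only the benchmark process sees the restriction.

```python
import os
import subprocess
import sys
from typing import List


def benchmark_command(n_iter: int = 10000, n_bytes: int = 100000) -> List[str]:
    """Build the send_recv benchmark command line used in this thread (hypothetical helper)."""
    return [
        sys.executable, "-m", "ucp.benchmarks.send_recv",
        "--no-detailed-report",
        "--n-iter", str(n_iter),
        "--n-bytes", str(n_bytes),
        "--backend", "ucp-async",
        "-c", "0",  # client CPU, reported as "Client CPU | 0"
        "-b", "1",  # server CPU, reported as "Server CPU | 1"
        "--no-error-handling",  # error handling must be off for shared memory to be used
    ]


def run_benchmark(tls: str, n_iter: int = 10000, n_bytes: int = 100000) -> str:
    """Run the benchmark with UCX restricted to `tls` and return its report text."""
    env = dict(os.environ, UCX_TLS=tls)  # only the child process sees this UCX_TLS
    result = subprocess.run(
        benchmark_command(n_iter, n_bytes),
        env=env,
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the benchmark fails
    )
    return result.stdout
```

For example, `print(run_benchmark("tcp"))` and `print(run_benchmark("tcp,sm"))` would reproduce the two configurations discussed here.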

If I now enable shared memory as well with UCX_TLS=tcp,sm (note that TCP is necessary to open a listener and establish communication between endpoints), I see the following result:

$ UCX_TLS=tcp,sm python -m ucp.benchmarks.send_recv --no-detailed-report --n-iter 10000 --n-bytes 100000 --backend ucp-async -c 0 -b 1 --no-error-handling
Server Running at 10.33.225.163:58855
Client connecting to server at 10.33.225.163:58855
Roundtrip benchmark
================================================================================
Iterations                | 10000
Bytes                     | 97.66 kiB
Object type               | numpy
Reuse allocation          | False
Transfer API              | TAG
UCX_TLS                   | tcp,sm
UCX_NET_DEVICES           | all
================================================================================
Device(s)                 | CPU-only
Server CPU                | 1
Client CPU                | 0
================================================================================
Bandwidth (average)       | 1.03 GiB/s
Bandwidth (median)        | 1.06 GiB/s
Latency (average)         | 90168 ns
Latency (median)          | 87888 ns

This is now more than twice as fast as the TCP-only case above (roughly 2.2 times the average bandwidth and less than half the average latency), because UCX automatically switches to shared memory.
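To check a comparison like this numerically, the `Key | value` lines of the report can be parsed and the bandwidths normalized to MiB/s. A minimal sketch; `parse_report`, `to_mib_per_s`, `to_ns` and `compare` are hypothetical helpers, and only the units that appear in the output in this thread (MiB/s, GiB/s, ns) are handled.

```python
from typing import Dict

# Binary units as printed by the benchmark (1 GiB/s = 1024 MiB/s).
_BANDWIDTH_TO_MIB = {"MiB/s": 1.0, "GiB/s": 1024.0}


def parse_report(text: str) -> Dict[str, str]:
    """Collect the 'Key | value' lines of a send_recv report into a dict."""
    fields = {}
    for line in text.splitlines():
        key, sep, value = line.partition("|")
        if sep:  # skips banners, '=====' separators and other lines without '|'
            fields[key.strip()] = value.strip()
    return fields


def to_mib_per_s(value: str) -> float:
    number, unit = value.split()
    return float(number) * _BANDWIDTH_TO_MIB[unit]


def to_ns(value: str) -> float:
    number, unit = value.split()
    if unit != "ns":
        raise ValueError(f"unexpected latency unit: {unit}")
    return float(number)


def compare(tcp_report: str, sm_report: str) -> Dict[str, float]:
    """Ratios of the tcp,sm run relative to the TCP-only run (higher is better for both)."""
    tcp, sm = parse_report(tcp_report), parse_report(sm_report)
    return {
        "bandwidth_speedup": to_mib_per_s(sm["Bandwidth (average)"])
        / to_mib_per_s(tcp["Bandwidth (average)"]),
        "latency_reduction": to_ns(tcp["Latency (average)"])
        / to_ns(sm["Latency (average)"]),
    }
```

With the average values reported above (470.28 MiB/s and 202786 ns for TCP only, 1.03 GiB/s and 90168 ns for tcp,sm), both ratios come out at roughly 2.2.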

If you nevertheless need to communicate without a network interface, you can still do so by creating an endpoint directly to a worker, without a listener; this test is a good example of how to do it. Essentially, you need to get the worker's address, transfer it through some communication channel (such as a queue in a multiprocess application), and then create an endpoint to the remote worker. After that, everything should work in much the same way, except that you also need to specify a tag yourself and force it.
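The listener-less steps above can be sketched as follows. This is a minimal sketch, not the referenced test: it assumes UCX-Py's `ucp.get_worker_address()`, `ucp.create_endpoint_from_worker_address()` and the `tag`/`force_tag` arguments of `Endpoint.send`/`Endpoint.recv`, and that the returned address object can be pickled through a `multiprocessing.Queue`. The function names, the tag value and the message length are hypothetical. The two processes exchange worker addresses over two queues, each creates an endpoint to the other, and the client sends a small NumPy array on a forced tag that the server receives with the same forced tag. A `multiprocessing.Queue` only connects processes started from the same parent, so between two pods it would have to be replaced by another channel.

```python
import asyncio
import multiprocessing as mp

TAG = 42  # hypothetical tag; both sides must use the same value with force_tag=True
N = 10    # hypothetical message length


def server(server_addr_q, client_addr_q) -> None:
    import numpy as np
    import ucp

    async def run() -> None:
        server_addr_q.put(ucp.get_worker_address())  # publish this worker's address
        client_address = client_addr_q.get()         # blocking get is acceptable in a sketch
        ep = await ucp.create_endpoint_from_worker_address(
            client_address, endpoint_error_handling=False  # needed for shared memory
        )
        msg = np.empty(N, dtype=np.int64)
        await ep.recv(msg, tag=TAG, force_tag=True)  # match the client's forced tag
        print("server received:", msg)

    asyncio.run(run())


def client(server_addr_q, client_addr_q) -> None:
    import numpy as np
    import ucp

    async def run() -> None:
        server_address = server_addr_q.get()
        client_addr_q.put(ucp.get_worker_address())
        ep = await ucp.create_endpoint_from_worker_address(
            server_address, endpoint_error_handling=False
        )
        await ep.send(np.arange(N, dtype=np.int64), tag=TAG, force_tag=True)

    asyncio.run(run())


def main() -> None:
    # "spawn" starts fresh interpreters, so each child initializes its own UCX context.
    ctx = mp.get_context("spawn")
    server_addr_q, client_addr_q = ctx.Queue(), ctx.Queue()
    procs = [
        ctx.Process(target=server, args=(server_addr_q, client_addr_q)),
        ctx.Process(target=client, args=(server_addr_q, client_addr_q)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Call `main()` from an `if __name__ == "__main__":` block, as the "spawn" start method requires.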

Also, a note specifically on shared memory: UCX does not currently support endpoint error handling with shared memory, so you must disable it by specifying endpoint_error_handling=False in create_listener/create_endpoint/create_endpoint_from_worker_address (and --no-error-handling in the benchmark above). Otherwise, UCX will quietly disable shared memory and fall back to another transport that supports error handling (such as TCP).
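Putting this note together with the listener explanation at the top: a minimal sketch of a server pod listening on its IP address and a client pod connecting to it, with error handling turned off whenever shared memory may be selected. It assumes UCX-Py's `ucp.create_listener(callback, port=..., endpoint_error_handling=...)` and `await ucp.create_endpoint(host, port, endpoint_error_handling=...)`. The helper `endpoint_error_handling_for`, the port number and the reading of `UCX_TLS` (comma-separated names, a leading `^` to exclude, `sm`/`shm` or individual shared-memory transports such as `posix`, `sysv` and `cma`, default `all`) are assumptions for illustration. UCX_TLS must be set before UCX-Py initializes.

```python
import asyncio
import os
from typing import Optional

# Names that may select shared memory transports in UCX_TLS (assumed list).
_SHARED_MEMORY = {"all", "sm", "shm", "posix", "sysv", "cma", "knem", "xpmem"}

PORT = 13337  # hypothetical port the server pod listens on


def endpoint_error_handling_for(tls: Optional[str] = None) -> bool:
    """Return the endpoint_error_handling value to use for a given UCX_TLS.

    Error handling is disabled whenever shared memory may be selected, since
    UCX would otherwise quietly fall back to a transport such as TCP.
    """
    if tls is None:
        tls = os.environ.get("UCX_TLS", "all")
    tls = tls.strip()
    if tls.startswith("^"):
        # Exclusion list: shared memory stays possible unless "sm"/"shm" is excluded.
        excluded = {t.strip() for t in tls[1:].split(",")}
        shared_memory_possible = not (excluded & {"sm", "shm"})
    else:
        allowed = {t.strip() for t in tls.split(",")}
        shared_memory_possible = bool(allowed & _SHARED_MEMORY)
    return not shared_memory_possible


async def serve_once() -> None:
    """Server pod: accept one connection and receive a small message."""
    import numpy as np
    import ucp

    done = asyncio.Event()

    async def handler(ep) -> None:
        msg = np.empty(10, dtype=np.int64)
        await ep.recv(msg)
        print("received:", msg)
        await ep.close()
        done.set()

    listener = ucp.create_listener(
        handler, port=PORT, endpoint_error_handling=endpoint_error_handling_for()
    )
    await done.wait()
    listener.close()


async def send_once(server_ip: str) -> None:
    """Client pod: connect to the server pod's IP address and send a small message."""
    import numpy as np
    import ucp

    ep = await ucp.create_endpoint(
        server_ip, PORT, endpoint_error_handling=endpoint_error_handling_for()
    )
    await ep.send(np.arange(10, dtype=np.int64))
    await ep.close()
```

With `UCX_TLS=tcp,sm` set in both pods, the server would run `asyncio.run(serve_once())` and the client `asyncio.run(send_once(server_ip))`, where `server_ip` is the server pod's address. The connection is still established over TCP; whether shared memory is then used depends on UCX finding it usable between the two pods.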