rapidsai / ucx-py

Python bindings for UCX
https://ucx-py.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

EndpointReuse avoiding collisions on tag send/recv #586

Open felipeblazing opened 4 years ago

felipeblazing commented 4 years ago

We would have liked blazing to have its own ucx endpoint for sending and receiving tag messages in ucx. This is not possible if we want to leverage cuda_ipc.

In order to use cuda_ipc we need to reuse the same endpoints between processes so that the IPC handles are shared and a process does not try to open the same memory handle more than once. Because we are sharing an endpoint, care must be taken if we plan on having different listeners on it: when we probe for tags, the tags of one listener must not match those of another.

Right now, blazing works with UCX by polling for messages that begin a transmission, looking only at the last 2 bytes of the tag. If the last two bytes of the tag are 0x0000, the message is treated as the beginning of a dataframe transmission. The tag mask is something like 0x0000...FFFF. We realize that this could be very problematic: on a shared endpoint there is a chance of probing messages that dask was sending instead of ones meant for us.
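To make the collision concrete, here is a minimal sketch of masked tag matching, where only the bits set in the mask take part in the comparison. The 0xFFFF mask and the 0x0000 marker come from the description above; the 64-bit example tags and the `tag_matches` helper are hypothetical and only illustrate the arithmetic, they are not taken from blazing or dask.

```python
# Only the 0xFFFF mask and the 0x0000 "begin" marker come from the discussion;
# every tag value below is a made-up example.
BEGIN_MASK = 0x000000000000FFFF  # compare only the last 2 bytes of the tag
BEGIN_TAG = 0x0000000000000000   # last 2 bytes == 0x0000 means "begin transmission"


def tag_matches(message_tag: int, expected_tag: int, mask: int) -> bool:
    """Masked comparison: bits outside `mask` are ignored."""
    return (message_tag & mask) == (expected_tag & mask)


blazing_begin = 0x0000000000AB0000  # hypothetical blazing "begin" message
blazing_data = 0x0000000000AB0001   # hypothetical blazing follow-up message
other_library = 0xDA5C000000010000  # hypothetical tag sent by another library

print(tag_matches(blazing_begin, BEGIN_TAG, BEGIN_MASK))  # intended match
print(tag_matches(blazing_data, BEGIN_TAG, BEGIN_MASK))   # correctly ignored
print(tag_matches(other_library, BEGIN_TAG, BEGIN_MASK))  # accidental match
```

Any tag whose last two bytes happen to be zero passes the probe, regardless of who sent it, because the mask throws away all the other bits.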

It seems like we need a way to ensure that when we are reusing endpoints we cannot accidentally intercept each other's messages. Perhaps we can use some of the bits in the tag to indicate the originator of a message, so that each library only picks up messages whose bits match its own origin. Right now it feels like it would not be hard to accidentally intercept messages from cuml or dask.

Something like this: the last n bits of a tag indicate which library the tag belongs to. When you create your listener using EndpointReuse.create_listener, the listener you get back carries the bits that you must set when you generate the tags you want to communicate with. This way each library could be sure not to intercept messages that were not meant for it when using the ucp_tag_probe methods.
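A rough sketch of that layout, assuming n = 8 bits for the library field and a 64-bit tag. The names `LIB_BITS`, `make_tag`, `owner_of` and `is_mine` are hypothetical and not part of ucx-py; how the bits would actually be handed out by EndpointReuse.create_listener is left open here.

```python
LIB_BITS = 8                     # assumption: n = 8 bits identify the library
LIB_MASK = (1 << LIB_BITS) - 1   # mask covering the last n bits of the tag
PAYLOAD_BITS = 64 - LIB_BITS     # bits left over for the library's own use


def make_tag(lib_id: int, payload: int) -> int:
    """Build a 64-bit tag whose last LIB_BITS bits carry the library id."""
    if not 0 <= lib_id <= LIB_MASK:
        raise ValueError("lib_id does not fit in the library field")
    if not 0 <= payload < (1 << PAYLOAD_BITS):
        raise ValueError("payload does not fit in the remaining tag bits")
    return (payload << LIB_BITS) | lib_id


def owner_of(tag: int) -> int:
    """Extract the library id from the last LIB_BITS bits of a tag."""
    return tag & LIB_MASK


def is_mine(tag: int, lib_id: int) -> bool:
    """True if the tag was generated for the given library id."""
    return owner_of(tag) == lib_id


tag = make_tag(lib_id=2, payload=0x1234)
print(hex(tag), is_mine(tag, 2), is_mine(tag, 1))
```

When probing, a library would then pass its own id as the expected tag and include `LIB_MASK` in the tag mask, so tags carrying a different id never match.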

pentschev commented 4 years ago

@madsbk @quasiben @jakirkham do you guys have any ideas here? I suggested having part of the tag indicate who owns the message, e.g., Dask, cuML or BlazingSQL. A huge downside of that approach is that every library would need to do some kind of tag handling, which can make things more confusing.

Right now for virtually any workflow using CUDA IPC we need to reuse the endpoints, so I don't see any other way we can resolve both issues. Any suggestions are appreciated. :)

felipeblazing commented 4 years ago

@cjnolet Do you guys have a solution for this in cuML, or will you be subject to the same potential pitfalls and tag collisions?

felipeblazing commented 4 years ago

My ultimate solution would be for UCX itself to be aware that this IPC handle already exists and to manage that at the ucp_worker level, but I believe I heard from people who know more about UCX than I do that it's currently the endpoint which manages that lifecycle.

quasiben commented 4 years ago

I can see where endpoint reuse across libraries could be a problem, as you've outlined @felipeblazing. I don't think cuML is using EndpointReuse, however; instead it relies on getting the endpoints from dask or generating them itself? Again, @cjnolet would know.

Do you have thoughts on what would then be required in dask/cuML/blazing to support byte matching?

lf = ucx_create_listener(..., bytes=0x0001)
ep = await ucx_create_endpoint(ip, port, ..., bytes=0x0001)

And then have some social contract for other libraries to not use 0x0001?

felipeblazing commented 4 years ago

> And then have some social contract for other libraries to not use 0x0001?

This is one option. We get everyone together, make a social agreement, and keep a file in the github repo that records those agreements; people can open a PR to get a value allocated to them. Another option would be to require libraries to register with some process-global context that gives them the bits to use. So dask would ask and get 0x0000, then cuml would ask and get 0x0001, and blazing would ask and get 0x0002.
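The second option could look roughly like the sketch below. `TagIdRegistry` and its `register` method are hypothetical names, not an existing ucx-py API, and the 8-bit field width is an assumption.

```python
import threading


class TagIdRegistry:
    """Hypothetical process-global context that hands out library ids."""

    def __init__(self, lib_bits: int = 8):
        self._capacity = 1 << lib_bits  # number of ids that fit in the field
        self._ids = {}                  # library name -> allocated id
        self._lock = threading.Lock()   # registration may come from any thread

    def register(self, library: str) -> int:
        """Return the id for `library`, allocating the next free one if new."""
        with self._lock:
            if library in self._ids:
                return self._ids[library]
            if len(self._ids) >= self._capacity:
                raise RuntimeError("no library ids left in the tag field")
            lib_id = len(self._ids)
            self._ids[library] = lib_id
            return lib_id


registry = TagIdRegistry()
for name in ("dask", "cuml", "blazing"):
    print(name, hex(registry.register(name)))
```

One caveat with this sketch: ids are handed out in registration order, so the sender and receiver processes only agree on them if libraries register in the same order everywhere. The file-in-the-repo option avoids that by fixing the values ahead of time.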

> Do you have thoughts on what would then be required in dask/cuML/blazing to support byte matching?

Off the top of my head, we would need to agree on a part of the tag that we ALL have to match on, and every library would need a unique value set in that part of the tag. For blazing this wouldn't be a very big change, and I don't think it would be too onerous for most libraries to do this.