It4innovations / hyperqueue

Scheduler for sub-node tasks for HPC systems with batch scheduling
https://it4innovations.github.io/hyperqueue
MIT License

Adding a function to get results back from Python API function #715

Open RINO-GAELICO opened 2 weeks ago

RINO-GAELICO commented 2 weeks ago

It seems that the Client in the Python API is able to wait for jobs using the wait_for_jobs() function, but it is not able to receive the results returned by that same job.

```python
job = Job()
job.function(my_function)
submitted = client.submit(job)
client.wait_for_jobs(submitted)
# ?? there is no function to receive results
```

Kobzol commented 2 weeks ago

Hi! We call the feature that you're describing (the ability for a task to return data that is then used as an input for downstream tasks) data transfer.

Although this has been partially implemented in HQ in the past, it has never been exposed as a user-facing feature. Mostly because we were primarily focusing on the command line interface, where data transfers are not that relevant.

In the Python API, data transfers are of course incredibly useful; however, HQ does not support them yet. The Python API is highly experimental and we'll probably be making large changes to it, in part also to incorporate data transfers, which are definitely on our roadmap. This needs a lot of design work, because data transfers have highly nontrivial interactions with fault tolerance, multi-node tasks and other concepts.

If you absolutely need data transfers now, you can either employ some hack (e.g. transfer data through the filesystem), or use a different tool, e.g. Dask with the PBS/Slurm plugin (Dask jobqueue) that can automatically create allocations for you.
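
For illustration, this is roughly what the Dask jobqueue route looks like (a sketch only; the queue name and resource values are placeholders for your site):

```python
# Sketch of the Dask jobqueue alternative: workers run inside Slurm
# allocations that the cluster object creates for you, and futures carry
# function results back to the client. Resource values are placeholders.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

def my_function(x):
    return x * 2

cluster = SLURMCluster(queue="regular", cores=4, memory="16GB",
                       walltime="00:30:00")
cluster.scale(jobs=2)  # submit two Slurm allocations for workers
client = Client(cluster)

future = client.submit(my_function, 21)
print(future.result())  # 42 -- the return value travels back automatically
```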

RINO-GAELICO commented 2 weeks ago

Thank you @Kobzol for your answer. I am working at NERSC during the summer with @tylern4 and I was interested in developing a feature that could allow for returning the value from Python functions executed via HyperQueue. The idea is to have functionality similar to what libraries like Python multiprocessing, Parsl/Globus Compute, and HPE's Dragon provide with Python futures.
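
For reference, this is the pattern those libraries provide, shown here with multiprocessing (one of the libraries named above); the idea would be for HQ's submit to return something similarly future-like:

```python
# The requested behavior, illustrated with multiprocessing: submit a
# function and get its return value back through a future-like handle.
from multiprocessing import Pool

def my_function(x):
    return x * 2

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        async_result = pool.apply_async(my_function, (21,))
        print(async_result.get())  # 42 -- no HQ Python API equivalent yet
```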

I was looking at the way the Python function is wrapped up and pickled here. As an initial step, I am considering capturing the return value of the task and finding a way to either write it to a file or return it directly to the Client. Would you have any guidance or suggestions on how to approach this? Any insights on where to start or potential pitfalls to avoid would be greatly appreciated.

Kobzol commented 2 weeks ago

So, just to clarify, implementing data transfers in HQ is highly non-trivial and it needs a lot of design work first. HyperQueue actually has a partial implementation of data transfers, but it is not currently exposed to users, because the hard part is the design work needed to make it compatible with the programming model of HQ. @spirali and I plan to work on data transfers in the upcoming months, but it is not our highest priority at the moment. In theory, we could mentor someone to work on them, but probably not during this summer (I definitely don't have time for mentoring; maybe @spirali has, IDK).

That being said, if you just want to hack something together for your personal usage (rather than something that would be upstreamed directly into HQ), there are definitely some approaches you could take! It depends on how you want to consume the outputs of tasks. For example, you could modify the code that runs a task to take the output of the task function, serialize it (e.g. with [cloud]pickle), and store it to disk under the name/ID of the task. And then you can deserialize it again and use it in the dependent tasks, but how exactly you should do that is the hard (design) part :)
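
A minimal sketch of that idea, assuming a shared filesystem visible to both workers and the client (the wrapper, the results directory, and the naming scheme are made up for illustration; none of this is HQ API):

```python
# Hypothetical wrapper: serialize the task function's return value to a
# file named after the task, then load it on the client (or in a dependent
# task) once the job has finished. The results directory is an assumption.
import cloudpickle  # pip install cloudpickle; plain pickle works for simple values
from pathlib import Path

RESULTS_DIR = Path("/scratch/hq-results")

def capture_result(fn, task_name):
    """Wrap `fn` so its return value is serialized under `task_name`."""
    def wrapper(*args, **kwargs):
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)
        result = fn(*args, **kwargs)
        (RESULTS_DIR / f"{task_name}.pkl").write_bytes(
            cloudpickle.dumps(result))
    return wrapper

def load_result(task_name):
    """Deserialize a previously stored result."""
    return cloudpickle.loads(
        (RESULTS_DIR / f"{task_name}.pkl").read_bytes())
```

On the submitting side this would look like `job.function(capture_result(my_function, "t0"))`, followed by `wait_for_jobs()` and then `load_result("t0")` on the client.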

RINO-GAELICO commented 2 weeks ago

@Kobzol Thank you for clarifying that. Given what you said, for the moment it would be nice to implement something that can be used at NERSC. I'd like to follow your suggestion and try to modify the code that runs the task to take the output of the function. Should I be looking inside crates/pyhq/python/hyperqueue, or should I go check the tako framework and how it serializes/deserializes tasks?

spirali commented 2 weeks ago

Hi,

There are several possible options:

1) Saving results into a file is relatively easy. One problem is that you may potentially create many files. Another problem is that unneeded files have to be deleted outside of HQ.

2) Sending data to the client immediately. HQ does not provide any direct connections from worker to client (it may not always be possible; consider creating a connection from a compute node to your laptop, where the client may run). It would be possible to extend the server to act as a proxy, but we generally want to avoid sending data transfers through the HQ server, as its main purpose is orchestration of the computation.

But in general, you may create a connection on your own, from a finishing task back to the Python code that submitted the job to HQ (a rough sketch follows after this list). I am actually using this in one of my experimental pipelines. But it is quite special w.r.t. HQ usage, because it assumes that some code has to run on the client for the whole computation time.

3) Implementing a complete data layer in HQ that allows peer-to-peer connections between computing tasks. This is actually my goal in the following months, but it is super difficult, especially w.r.t. resilience. (Assume that you have computed the result of task A on a worker and the allocation is killed by SLURM; then a new allocation is spawned and task B wants to read the result from A ...)
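
Here is a rough sketch of option 2's do-it-yourself variant (all names and the length-prefixed framing are made up; it assumes compute nodes can open a TCP connection to wherever the client runs, which is site-dependent):

```python
# Hypothetical sketch: the submitting process listens on a socket, and a
# wrapper around the task function connects back and sends the pickled
# result. Worker-to-client connectivity is assumed, not guaranteed.
import os
import pickle
import socket
import struct

def send_result(result):
    """Called at the end of a task; host/port are passed via environment."""
    payload = pickle.dumps(result)
    addr = (os.environ["RESULT_HOST"], int(os.environ["RESULT_PORT"]))
    with socket.create_connection(addr) as sock:
        sock.sendall(struct.pack("!Q", len(payload)) + payload)

def recv_exact(conn, n):
    """Read exactly n bytes from a socket."""
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed early")
        buf += chunk
    return buf

def receive_result(server):
    """Called on the client once per expected task result."""
    conn, _ = server.accept()
    with conn:
        (length,) = struct.unpack("!Q", recv_exact(conn, 8))
        return pickle.loads(recv_exact(conn, length))
```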

Note: There is a streaming layer in HQ used for streaming stdout/stderr. It has its own difficulties, and once the complete data layer is implemented, the streaming code will be removed.

RINO-GAELICO commented 1 week ago

Thanks @spirali, I will try to work on task_main() https://github.com/It4innovations/hyperqueue/blob/4a37594fabe54934d2ea5e2a77b204112e29e359/crates/pyhq/python/hyperqueue/task/function/init.py#L51 and see if I can pipe the results back to the Client. On a different note, I noticed that submitting multiple Python functions results in a considerable delay due to the respawning of the Python environment for each function/task. Do you have a recommendation on how to improve the runtime of multiple functions?

Kobzol commented 1 week ago

This is very interesting to me! What is your use case, and how many tasks are you spawning? Starting the interpreter should take a few tens of milliseconds, so the overhead should only start showing up if you start a larger number of tasks.

We do have a plan for "stateful workers", where the Python interpreter would be cached between task invocations, but it's still far off.

tylern4 commented 1 week ago

As a test use case, @RINO-GAELICO is looking at distributed inference from some PyTorch models. So it's loading torch that starts to become a bottleneck.

```console
$ time python -c 'print("hello")'
hello
python -c 'print("hello")'  0.06s user 0.03s system 96% cpu 0.098 total
# vs
$ time python -c 'import torch; print("hello")'
hello
python -c 'import torch; print("hello")'  14.53s user 0.92s system 594% cpu 2.600 total
```

I suspect it will probably be the same for other complex Python packages, so stateful workers could really speed up Python processes.

Kobzol commented 1 week ago

Right. Yeah, loading torch or tensorflow at the beginning of the script will definitely be a bottleneck. Something like an actor framework would probably be much better for that (e.g. https://distributed.dask.org/en/latest/actors.html).
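
For example, with Dask actors the expensive import is paid once per worker process and then reused across calls (a hypothetical sketch; the model class is made up for illustration):

```python
# Sketch of the actor pattern: torch is imported once in the actor's
# constructor, and subsequent method calls reuse the warm process.
from dask.distributed import Client

class Model:
    def __init__(self):
        import torch              # pay the seconds-long import cost once
        self.torch = torch

    def infer(self, xs):
        return self.torch.tensor(xs).sum().item()

client = Client()                 # local cluster, for illustration only
model = client.submit(Model, actor=True).result()
print(model.infer([1.0, 2.0, 3.0]).result())  # method calls return futures
```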

Could you talk a bit more about your use case? Distributed ML inference sounds like something that is very poorly suited for HQ, to be honest :) At least in a latency-bound situation, which seems like what you're describing. If the inference is bandwidth-bound (e.g. I need to run inference on a million images on disk), then HQ would be useful, but in that case you probably wouldn't even be thinking about the startup cost.

tylern4 commented 1 week ago

I'm interested in how we support FaaS-like use cases on HPC platforms, and HQ seemed interesting as a backend for running functions, since it already works on HPC systems and because I'm interested in Rust. The actual use case for our HPC users might not be torch or tensorflow, but loading a whole bunch of Python libraries, doing some computation, and getting back a result might be, so that's really what we're trying to test.

The project is a bit of an investigation of which tools work well on our HPC system and for what types of tasks. So, like you say, maybe HQ isn't the right tool for an inference engine, but we're planning to look at other use cases as well, including compiled programs, which might work better.

Kobzol commented 1 week ago

HyperQueue is primarily designed for batch computing, i.e. you have a task graph with many tasks that you want to compute, you submit it to HQ, and then you wait until everything is done. The primary metric is throughput. FaaS is a very different paradigm, where you receive computational requests in an unknown order and at an unknown time, and you need to have some infrastructure prepared to start executing the task. The primary metric is latency. While HQ could in theory be used for this, it's not designed for such use cases at all, and it's probably better to use a different tool.