Helmholtz-AI-Energy / perun

Perun is a Python package that measures the energy consumption of your applications.
https://perun.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Is there a Python API to collect metrics without writing? #140

Open pfackeldey opened 1 month ago

pfackeldey commented 1 month ago

Dear perun devs,

thank you very much for this package!

I want to use perun to track the energy consumption of multiple functions, but without printing or writing the measurements immediately. I'd like to collect all of them in one Python process and build a summary table myself. Could you explain how to achieve that?

Best, Peter

JuanPedroGHM commented 1 month ago

Hi @pfackeldey,

Could you add a bit more context about what you want to do?

perun only writes its output when the script or function being monitored exits. Besides the text report, it also creates an HDF5 file that contains all the raw data, along with statistics for each hardware device and compute node used. An easy way to explore the data is https://myhdf5.hdfgroup.org/ or any library that can read HDF5 files.
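
For a quick look at that file from Python, here is a minimal sketch using h5py (the file path below is a placeholder; point it at whatever file your run produced):

import h5py

# Placeholder path: use the HDF5 file written by your perun run.
with h5py.File("perun_results/main.hdf5", "r") as f:
    # Walk the file and print every group/dataset name with its attributes.
    def show(name, obj):
        print(name, dict(obj.attrs))
    f.visititems(show)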

Another way to obtain the raw data is to change the output format to csv, either on the command line:

perun monitor --format csv main.py

You can also change it in the function decorator or in the configuration file. More details are in the documentation.
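
As a sketch of the configuration-file route (the file name .perun.ini and the exact section/option names are assumptions on my part; please verify them against the documentation):

; .perun.ini -- assumed file name and options, check the docs
[output]
format = csv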

At the moment, the only official API is the decorator interface and the command line.
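
For reference, the decorator simply marks a function, and the script is then launched through the command line (my_function and my_script.py are placeholders):

from perun import monitor

@monitor()
def my_function():
    ...

# launched with: perun monitor my_script.py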

pfackeldey commented 1 month ago

Hi @JuanPedroGHM,

I'd like to measure the energy consumption of a dask cluster. Keep in mind that these are typically distributed, so I want to track the energy consumption of each dask worker. Once the processing is done, I'd like to collect all the metrics and accumulate them. Using perun python_script.py won't work, as the dask workers do not live inside this Python process. Using @monitor() will print (or write) the resource consumption within each worker. What I'm interested in is keeping the measured metrics in memory (e.g. as a dict / struct / ...) rather than writing or printing them. Something like:

metrics = monitor(dask_task)

Then I'd be able to collect these metrics objects and accumulate them once the dask cluster is done with processing. A metrics object could even support __iadd__ and __add__, so accumulation would be straightforward.
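
Purely to illustrate the pattern I have in mind (this Metrics class and its fields are hypothetical, not an existing perun API):

from dataclasses import dataclass

@dataclass
class Metrics:
    # Hypothetical per-task measurement container, not part of perun.
    energy_joules: float = 0.0
    runtime_s: float = 0.0

    def __add__(self, other: "Metrics") -> "Metrics":
        return Metrics(self.energy_joules + other.energy_joules,
                       self.runtime_s + other.runtime_s)

    def __iadd__(self, other: "Metrics") -> "Metrics":
        self.energy_joules += other.energy_joules
        self.runtime_s += other.runtime_s
        return self

# Accumulating results gathered back from all dask workers:
total = Metrics()
for m in [Metrics(12.5, 3.0), Metrics(8.1, 2.5)]:
    total += m
print(total)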

Does that make sense to you?

Best, Peter

JuanPedroGHM commented 1 month ago

Hi @pfackeldey,

perun works by spawning a sampling subprocess on each compute node, provided the nodes can communicate using MPI. That means the raw data contains no information about the individual processes or functions being run. Information about individual functions is only extracted after the sampling subprocess exits and perun gathers the data from all nodes, using the start and exit timestamps of the functions decorated with @monitor().

What this means is that there is currently no way to extract energy values or any other metric from a function while the application is still running; they are only available after the full program has exited. perun is not really meant to be used as a live monitoring tool.

With the right setup, perun can work fairly well with dask, as long as dask uses MPI in the background as its distribution library, although some timestamps of function execution might be off because of how dask schedules and executes the computational graph.

I can provide some example setups where we managed to benchmark some dask scripts using perun if you are interested.

Best, Juan

pfackeldey commented 1 month ago

Oh, got it! Sure, please share your example setups to benchmark some dask scripts!

JuanPedroGHM commented 1 month ago

Hi @pfackeldey,

Here is a single file that measures the time it takes dask to run PCA on chunks of a large 2D array. It depends on dask_mpi, mpi4py, and OpenMPI to properly set up a dask cluster that is compatible with perun. The PCA implementation is from scikit-learn.

import os

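# NOTE: mpi4py must be imported and dask_mpi initialized before the distributed
# Client is created; initialize() turns the MPI ranks into the dask scheduler,
# the rank that runs this script, and the workers.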
from mpi4py import MPI
comm = MPI.COMM_WORLD

from dask_mpi import initialize
nthreads = int(os.environ.get("N_THREADS", 1))
initialize(comm=comm, nthreads=nthreads)

import dask
import dask.distributed
import dask.array as da
from sklearn.decomposition import PCA

from perun import monitor

def main():
    # Initialize the Dask-MPI environment
    with dask.distributed.Client() as client:
        print(f"Rank {comm.Get_rank()} ->  Connected to Dask client:", client)

        # Define a Dask array with random data
        print(f"Rank {comm.Get_rank()} -> Computing array")
        x = da.random.random((100000, 100000), chunks=(5000, 5000))
        print(x)
        print(f"Rank {comm.Get_rank()} -> Computed array array")

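        # Wrap the block-processing function so it captures comm; the @monitor()
        # decorator marks it so perun can attribute timestamps (and thus energy)
        # to this function after the run.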
        def create_processing_function(comm):
            @monitor()
            def process_function(data):
                print(f"Rank {comm.Get_rank()} -> Processing data")
                pca = PCA(n_components=10)
                return pca.fit_transform(data)
            return process_function

        pca = create_processing_function(comm)
        # Use dask to run the monitored PCA on every block in parallel.
        # map_blocks assumes output blocks keep the input shape unless told
        # otherwise; PCA reduces each (5000, 5000) block to (5000, 10), so the
        # new chunk shape is passed explicitly.
        result = x.map_blocks(pca, dtype=float, chunks=(5000, 10)).compute()

        print("Result of the computation:", result)

    print(f"Rank {comm.Get_rank()} -> Client closed")

if __name__ == "__main__":
    main()
    print("MPI Closed")

To start the script with perun and MPI, use the following command:

N_THREADS=8 mpirun -n 4 perun monitor main.py
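
With dask_mpi, the four MPI ranks are split up: one rank runs the scheduler, one runs the script itself, and the remaining ranks become workers, so increase -n if you want more workers.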

Let me know if this works for you.