Lightning-AI / pytorch-lightning

Pretrain, finetune and deploy AI models on multiple GPUs, TPUs with zero code changes.
https://lightning.ai
Apache License 2.0

How to log artifacts on rank > 0? #19453

Open rob-hen opened 6 months ago

rob-hen commented 6 months ago

Bug description

I am using a GPU to generate a result file (e.g. a video, text file, image, ...). Then I call self.logger.experiment.log_artifact(self.logger.run_id, artifact_path, log_folder) to log the file. However, this does not work in a multi-GPU setup, because the logger on ranks > 0 is set to DummyLogger. In the multi-GPU setup I would like each GPU to generate one file and log all of them into the same folder (I make sure there are no naming collisions). How can this be achieved?
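For context, a minimal sketch of the setup being described (generate_result and the "results" artifact subfolder are hypothetical, only meant to illustrate the reported behavior):

```python
def validation_step(self, batch, batch_idx):
    artifact_path = self.generate_result(batch)  # hypothetical helper; writes one file per rank
    # works on rank 0, but on ranks > 0 the logger is a DummyLogger, so the file is never uploaded
    self.logger.experiment.log_artifact(self.logger.run_id, artifact_path, "results")
```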

cc @awaelchli @borda

What version are you seeing the problem on?

v1.9

How to reproduce the bug

No response

Error messages and logs

# Error messages and logs here please

Environment

Current environment

```
#- Lightning Component (e.g. Trainer, LightningModule, LightningApp, LightningWork, LightningFlow):
#- PyTorch Lightning Version (e.g., 1.5.0):
#- Lightning App Version (e.g., 0.5.2):
#- PyTorch Version (e.g., 2.0):
#- Python version (e.g., 3.9):
#- OS (e.g., Linux):
#- CUDA/cuDNN version:
#- GPU models and configuration:
#- How you installed Lightning (`conda`, `pip`, source):
#- Running environment of LightningApp (e.g. local, cloud):
```

More info

No response

rob-hen commented 6 months ago

At least for logging with MLflow to local storage, I created a workaround by broadcasting the artifact folder and manually copying the data into that folder. But of course this will not work with other interfaces.
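For reference, a minimal sketch of such a workaround, assuming a local MLflow file store (the standard mlruns layout) and a hypothetical self.my_output_files list holding the files this rank produced:

```python
import shutil
from pathlib import Path

def on_validation_epoch_end(self):
    artifact_dir = ""
    if self.trainer.is_global_zero:
        # local MLflow file store layout: <save_dir>/<experiment_id>/<run_id>/artifacts
        artifact_dir = str(
            Path(self.logger.save_dir) / self.logger.experiment_id / self.logger.run_id / "artifacts"
        )
    # broadcast the folder from rank 0 so every rank can copy its own files into it
    artifact_dir = self.trainer.strategy.broadcast(artifact_dir, src=0)
    for file in self.my_output_files:  # hypothetical: the files this rank generated
        shutil.copy(file, artifact_dir)
```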

awaelchli commented 6 months ago

Hey @rob-hen, the behavior you described is intentional, so it's not a bug. Logging occurs on rank 0. In general, if results from other ranks should be included in the logs, they have to be synchronized/collected before logging, for example via self.all_gather() in the LightningModule, or by using torchmetrics if it's a metric.
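For the metric case, a minimal sketch of the all_gather route (compute_score is a hypothetical per-rank computation; the log_metric call assumes an MLflow logger on rank 0):

```python
def validation_step(self, batch, batch_idx):
    local_score = compute_score(batch)        # hypothetical per-rank result (a tensor)
    gathered = self.all_gather(local_score)   # every rank receives a tensor with a leading world_size dimension
    if self.trainer.is_global_zero:
        # only rank 0 has the real logger, so reduce and log there
        self.logger.experiment.log_metric(self.logger.run_id, "val_score", gathered.mean().item())
```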

In the multi-GPU setup, I would like each GPU to generate one file, and log all into the same folder (I take care there is no naming collision). How can this be achieved?

If you write a file on each GPU process, and take care of the naming collisions, you can replace

self.logger.experiment.log_artifact( self.logger.run_id, artifact_path, log_folder)

with

wandb.log_artifact(artifact_path)

(assuming you are using wandb; the call is analogous with another backend's client library)

But since this will run on every process, make sure the logged file path is unique to the rank of the process.

This will do what you asked for, but personally I wouldn't do this. I would create the files in the same folder and then log them on rank 0 only using self.logger.experiment for sanity.

rob-hen commented 6 months ago

Thanks for your answer @awaelchli

I am not using wandb but MLflow. Since all workers use a shared file system, I could have each worker write into a common folder, then synchronize on the finished results, and then have rank 0 loop through the results and store everything via self.logger.experiment.log_artifact.

What's the best way to do the synchronization? The files are created during validation_step, so I would just need to synchronize in on_validation_epoch_end.

awaelchli commented 6 months ago

Sure, the logger doesn't matter.

Since all workers use a shared file system, I could have each worker write into a common folder, then synchronize on the finshed result, and then have rank 0 looping through the results and store everything via self.logger.experiment.log_artifact.

Yes, I agree. No special synchronization is needed in that case. You would just call self.logger.experiment.log_artifact in on_validation_epoch_end with a for-loop over the files listed in that folder.

rob-hen commented 6 months ago

So your answer implies that on_validation_epoch_end is already synchronized across all ranks, is that correct?

awaelchli commented 6 months ago

No, it's not guaranteed to run at the same time on every rank. So in this case you would need to add a barrier. Sorry, I forgot that:

```python
def on_validation_epoch_end(self):
    self.trainer.strategy.barrier()  # wait here until every rank has finished writing its files
    for file in os.listdir(log_folder):  # log_folder: the shared folder the ranks wrote into
        self.logger.experiment.log_artifact(self.logger.run_id, os.path.join(log_folder, file))
```
rob-hen commented 6 months ago

@awaelchli
Is there also a way to make only rank 0 wait until all ranks have finished validation_step? The other ranks should not have to wait.

awaelchli commented 6 months ago

Yes, good point. Just add another barrier at the end :)

```python
def on_validation_epoch_end(self):
    self.trainer.strategy.barrier()  # all processes must meet here and shake hands before continuing execution
    for file in os.listdir(log_folder):  # log_folder: the shared folder the ranks wrote into
        self.logger.experiment.log_artifact(self.logger.run_id, os.path.join(log_folder, file))
    self.trainer.strategy.barrier()  # all processes must meet here and shake hands before continuing execution
```
rob-hen commented 6 months ago

I think that solution is addressing something else.

What I mean is: the GPUs with rank > 0 should not have to wait. They can create their files in validation_step, move on to on_validation_epoch_end, and do some computations there without waiting for all other ranks to finish validation_step.

Only rank 0 needs to wait until all GPUs have entered on_validation_epoch_end. Then it can send each file to the logger.

rob-hen commented 6 months ago

Something like:

```python
def on_validation_epoch_end(self):
    self.trainer.strategy.register_entrance()  # register that this rank has entered this area, without blocking

    if self.trainer.is_global_zero:
        # wait until num_ranks ranks have entered this area; if fewer have entered so far, block
        self.trainer.strategy.stop_until_entrance(num_ranks=self.trainer.world_size)
        # send the files to the logger
```
awaelchli commented 6 months ago

These statements aren't correct. There is always exactly one process per GPU. You can't have collective operations or synchronization under a guard like if self.trainer.is_global_zero:. This will just create a deadlock.

Also, num_ranks = self.trainer.world_size always holds: the rank is the process index, and the total count of ranks/processes is by definition the world size. Can you try the suggestion in https://github.com/Lightning-AI/pytorch-lightning/issues/19453#issuecomment-1941435882?

rob-hen commented 6 months ago

With num_ranks = self.trainer.world_size, I meant that stop_until_entrance is told how many ranks (num_ranks) it has to wait for before continuing. So we want to wait until all self.trainer.world_size ranks have finished, but only rank 0 should do the waiting.

My idea is: each GPU that goes through self.trainer.strategy.register_entrance() increments a counter on rank 0. Rank 0 then checks the value of the counter with stop_until_entrance. If the counter is below num_ranks, rank 0 blocks and waits.

I am not sure if this is possible to implement.

rob-hen commented 6 months ago

Can you try the suggestion in https://github.com/Lightning-AI/pytorch-lightning/issues/19453#issuecomment-1941435882?

That will work, but I am asking whether it can be done more efficiently.

As only rank 0 has the logger, shouldn't it be:

```python
def on_validation_epoch_end(self):
    self.trainer.strategy.barrier()  # all processes must meet here and shake hands before continuing execution
    if self.trainer.is_global_zero:
        for file in os.listdir(log_folder):  # log_folder: the shared folder the ranks wrote into
            self.logger.experiment.log_artifact(self.logger.run_id, os.path.join(log_folder, file))
    self.trainer.strategy.barrier()  # all processes must meet here and shake hands before continuing execution
```
awaelchli commented 6 months ago

Sure, yes. If listing the directory is expensive, then that will be a bit more efficient. However, since the processes need to wait for process 0 anyway, it doesn't matter if they run a loop that doesn't do anything. I mainly wrote it like that for readability.

awaelchli commented 6 months ago

Did you manage to make it work? Did you have any further questions about this?

rob-hen commented 6 months ago

@awaelchli I actually found a solution to the approach I was proposing.

Since I have a shared folder, each rank creates a file in the shared folder to signal that it has gone through validation_step.

When rank 0 enters on_validation_epoch_end, it looks into the shared folder and counts the files there. If the number of files is less than the expected number (which must equal self.trainer.world_size), rank 0 sleeps for t seconds via time.sleep() and then checks again.

This way only rank 0 is blocked, not the other ranks. Maybe this mechanism could be integrated into PyTorch Lightning?
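A minimal sketch of that mechanism (the signal folder, the poll interval, and self.result_dir are hypothetical; the sentinel files would also need to be cleaned up between validation runs):

```python
import os
import time

def on_validation_epoch_end(self):
    signal_dir = "/shared/val_signals"  # hypothetical shared folder visible to all ranks
    os.makedirs(signal_dir, exist_ok=True)
    # every rank drops a sentinel file without blocking
    open(os.path.join(signal_dir, f"rank_{self.trainer.global_rank}.done"), "w").close()

    if self.trainer.is_global_zero:
        # only rank 0 waits until every rank has signalled completion
        while len(os.listdir(signal_dir)) < self.trainer.world_size:
            time.sleep(5)
        # then upload every generated result file
        for file in os.listdir(self.result_dir):  # self.result_dir: hypothetical folder with the generated files
            self.logger.experiment.log_artifact(self.logger.run_id, os.path.join(self.result_dir, file))
```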