aehrc / cvt2distilgpt2

Improving Chest X-Ray Report Generation by Leveraging Warm-Starting
GNU General Public License v3.0

Regarding Monitoring using Neptune.ai #18

Open yihp opened 1 month ago

yihp commented 1 month ago

I added my username and API token to the configuration file:

...
neptune_username: yhpmaster
neptune_api_key: **
...

But when I train, the following error occurs:

...
[root@b62ad5cd2324 home]# dlhpcstarter -t cxrmate -c config/train/longitudinal_gt_prompt_tf.yaml --stages_module tools.stages --train
Seed set to 0
PTL no. devices: 2.
PTL no. nodes: 1.
/usr/local/lib/python3.8/site-packages/lightning/fabric/connector.py:571: precision=16 is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
Description, Special token, Index
bos_token, [BOS], 1
eos_token, [EOS], 2
unk_token, [UNK], 0
sep_token, [SEP], 3
pad_token, [PAD], 4
cls_token, [BOS], 1
mask_token, [MASK], 5
additional_special_token, [NF], 6
additional_special_token, [NI], 7
additional_special_token, [PMT], 8
additional_special_token, [PMT-SEP], 9
additional_special_token, [NPF], 10
additional_special_token, [NPI], 11
trainable params: 147,456 || all params: 80,916,528 || trainable%: 0.1822
/usr/local/lib/python3.8/site-packages/transformers/models/convnext/feature_extraction_convnext.py:28: FutureWarning: The class ConvNextFeatureExtractor is deprecated and will be removed in version 5 of Transformers. Please use ConvNextImageProcessor instead.
  warnings.warn(
Resuming training from experiments/cxrmate/longitudinal_gt_prompt_tf/trial_0/last.ckpt.
You are using a CUDA device ('Z100L') that has Tensor Cores. To properly utilize them, you should set torch.set_float32_matmul_precision('medium' | 'high') which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
[rank: 0] Seed set to 0
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
[rank: 1] Seed set to 0
PTL no. devices: 2.
PTL no. nodes: 1.
Description, Special token, Index
bos_token, [BOS], 1
eos_token, [EOS], 2
unk_token, [UNK], 0
sep_token, [SEP], 3
pad_token, [PAD], 4
cls_token, [BOS], 1
mask_token, [MASK], 5
additional_special_token, [NF], 6
additional_special_token, [NI], 7
additional_special_token, [PMT], 8
additional_special_token, [PMT-SEP], 9
additional_special_token, [NPF], 10
additional_special_token, [NPI], 11
trainable params: 147,456 || all params: 80,916,528 || trainable%: 0.1822
/usr/local/lib/python3.8/site-packages/transformers/models/convnext/feature_extraction_convnext.py:28: FutureWarning: The class ConvNextFeatureExtractor is deprecated and will be removed in version 5 of Transformers. Please use ConvNextImageProcessor instead.
  warnings.warn(
Resuming training from experiments/cxrmate/longitudinal_gt_prompt_tf/trial_0/last.ckpt.
[rank: 1] Seed set to 0
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0710 11:32:46.296280 294 ProcessGroupNCCL.cpp:686] [Rank 1] ProcessGroupNCCL initialization options:NCCL_ASYNC_ERROR_HANDLING: 1, NCCL_DESYNC_DEBUG: 0, NCCL_ENABLE_TIMING: 0, NCCL_BLOCKING_WAIT: 0, TIMEOUT(ms): 1800000, USE_HIGH_PRIORITY_STREAM: 0, TORCH_DISTRIBUTED_DEBUG: OFF, NCCL_DEBUG: OFF, ID=213279264
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0710 11:32:46.305738 90 ProcessGroupNCCL.cpp:686] [Rank 0] ProcessGroupNCCL initialization options:NCCL_ASYNC_ERROR_HANDLING: 1, NCCL_DESYNC_DEBUG: 0, NCCL_ENABLE_TIMING: 0, NCCL_BLOCKING_WAIT: 0, TIMEOUT(ms): 1800000, USE_HIGH_PRIORITY_STREAM: 0, TORCH_DISTRIBUTED_DEBUG: OFF, NCCL_DEBUG: OFF, ID=223330240

distributed_backend=nccl
All distributed processes registered. Starting with 2 processes

I0710 11:32:46.678165 90 ProcessGroupNCCL.cpp:1340] NCCL_DEBUG: N/A
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 386, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 1042, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 414, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/local/lib/python3.8/ssl.py", line 500, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/local/lib/python3.8/ssl.py", line 1040, in _create
    self.do_handshake()
  File "/usr/local/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 787, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/site-packages/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 386, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 1042, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 414, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(
  File "/usr/local/lib/python3.8/site-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/local/lib/python3.8/ssl.py", line 500, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/local/lib/python3.8/ssl.py", line 1040, in _create
    self.do_handshake()
  File "/usr/local/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 124, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 291, in _get_incoming_response
    inner_response = self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/bravado/requests_client.py", line 266, in result
    response = self.session.send(
  File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/neptune/common/backends/utils.py", line 79, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/utils.py", line 123, in create_swagger_client
    return SwaggerClient.from_url(
  File "/usr/local/lib/python3.8/site-packages/bravado/client.py", line 98, in from_url
    spec_dict = loader.load_spec(spec_url)
  File "/usr/local/lib/python3.8/site-packages/bravado/swagger_model.py", line 101, in load_spec
    response = request(
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 270, in result
    incoming_response = self._get_incoming_response(timeout)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 126, in wrapper
    self.future._raise_connection_error(exception)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 91, in _raise_connection_error
    self._raise_error(BravadoConnectionError, 'ConnectionError', exception)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 79, in _raise_error
    six.reraise(
  File "/usr/local/lib/python3.8/site-packages/six.py", line 718, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 124, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/bravado/http_future.py", line 291, in _get_incoming_response
    inner_response = self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/bravado/requests_client.py", line 266, in result
    response = self.session.send(
  File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
bravado.http_future.RequestsFutureAdapterConnectionError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/dlhpcstarter", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/site-packages/dlhpcstarter/main.py", line 126, in main
    submit(args, cmd_line_args, stages_fnc)
  File "/usr/local/lib/python3.8/site-packages/dlhpcstarter/main.py", line 21, in submit
    stages_fnc(args)
  File "/home/tools/stages.py", line 85, in stages
    trainer.fit(model, ckpt_path=ckpt_path)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 543, in fit
    call._call_and_handle_interrupt(
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 43, in _call_and_handle_interrupt
    return trainer.strategy.launcher.launch(trainer_fn, *args, trainer=trainer, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/strategies/launchers/subprocess_script.py", line 105, in launch
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 579, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/trainer/trainer.py", line 948, in _run
    call._call_setup_hook(self)  # allow user to set up LightningModule in accelerator environment
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/trainer/call.py", line 88, in _call_setup_hook
    if hasattr(logger, "experiment"):
  File "/usr/local/lib/python3.8/site-packages/lightning/fabric/loggers/logger.py", line 118, in experiment
    return fn(self)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/loggers/neptune.py", line 380, in experiment
    return self.run
  File "/usr/local/lib/python3.8/site-packages/lightning/fabric/loggers/logger.py", line 118, in experiment
    return fn(self)
  File "/usr/local/lib/python3.8/site-packages/lightning/pytorch/loggers/neptune.py", line 388, in run
    self._run_instance = neptune.init_run(**self._neptune_init_args)
  File "/usr/local/lib/python3.8/site-packages/neptune/metadata_containers/run.py", line 410, in __init__
    super().__init__(
  File "/usr/local/lib/python3.8/site-packages/neptune/metadata_containers/metadata_container.py", line 160, in __init__
    self._backend: NeptuneBackend = get_backend(mode=mode, api_token=api_token, proxies=proxies)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/factory.py", line 31, in get_backend
    return HostedNeptuneBackend(credentials=Credentials.from_token(api_token=api_token), proxies=proxies)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/hosted_neptune_backend.py", line 183, in __init__
    http_client, client_config = create_http_client_with_auth(
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/utils.py", line 242, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/hosted_client.py", line 147, in create_http_client_with_auth
    client_config = get_client_config(credentials=credentials, ssl_verify=ssl_verify, proxies=proxies)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/utils.py", line 242, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/neptune/common/backends/utils.py", line 79, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/hosted_client.py", line 125, in get_client_config
    backend_client = _get_token_client(credentials=credentials, ssl_verify=ssl_verify, proxies=proxies)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/utils.py", line 242, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/neptune/internal/backends/hosted_client.py", line 115, in _get_token_client
    create_swagger_client(
  File "/usr/local/lib/python3.8/site-packages/neptune/common/backends/utils.py", line 163, in wrapper
    raise NeptuneConnectionLostException(last_exception) from last_exception
neptune.common.exceptions.NeptuneConnectionLostException:

----NeptuneConnectionLostException---------------------------------------------------------

The connection to the Neptune server was lost. If you are using the asynchronous (default) connection mode, Neptune continues to locally track your metadata and continuously tries to re-establish a connection to the Neptune servers. If the connection is not re-established, you can upload your data later with the Neptune Command Line Interface tool: neptune sync -p workspace_name/project_name

What should I do?

You can find detailed instructions on the following doc pages:

You may also want to check the following docs page:

Need help?-> https://docs.neptune.ai/getting_help

I0710 11:39:34.716589 90 ProcessGroupNCCL.cpp:874] [Rank 0] Destroyed 1communicators on CUDA device 0 ...

I tried pinging app.neptune.ai, and it works.

Have you encountered this problem before? Do you need to do some initialization work for Neptune.ai?
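A successful ping only shows that ICMP packets reach the host, whereas the traceback above fails later, during the TLS handshake of an HTTPS request. A minimal sketch, using only the Python standard library, for checking whether that handshake completes from the training machine; the helper name check_tls and the 5 second timeout are assumptions for illustration and are not part of Neptune or dlhpcstarter:

```python
import socket
import ssl


def check_tls(host, port=443, timeout=5.0):
    """Open a TCP connection and attempt a TLS handshake.

    Returns (True, negotiated TLS version) on success, or
    (False, error description) if the connection or handshake fails.
    """
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, tls.version()
    except OSError as exc:
        # ConnectionResetError (Errno 104) and ssl.SSLError are both OSError subclasses.
        return False, f'{type(exc).__name__}: {exc}'
```

Calling check_tls('app.neptune.ai') inside the same container that runs dlhpcstarter should show whether HTTPS traffic gets through. If it reports a ConnectionResetError while ping succeeds, the likely culprit is something on the network path (a proxy or a filtering device, for example) rather than missing Neptune initialization, though that is only a guess from the traceback.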

anicolson commented 1 month ago

Hi @yihp, could it be related to this? https://github.com/Lightning-AI/pytorch-lightning/issues/10604

Otherwise, are you submitting jobs to a cluster with a firewall? I have found Neptune to be problematic and have moved away from it...

yihp commented 1 month ago

Thank you for your reply! I have turned off the firewall, but the problem is still not solved. How should I configure wandb? Can I configure it directly in train_mimic_cxr_chen_cvt2distilgpt2.yaml?

anicolson commented 1 month ago

@yihp, I have not explored wandb, but I have tried mlflow...

Here's how I set up MLflow in trainer_instance (https://github.com/csiro-mlai/dl_hpc_starter_pack/blob/main/src/dlhpcstarter/trainer.py):

import hashlib
import inspect
import logging
import os
import signal
import time
import uuid
from datetime import timedelta
from typing import Optional

import torch
from lightning.pytorch import Trainer
from lightning.pytorch.callbacks import (
    Callback,
    EarlyStopping,
    LearningRateMonitor,
    ModelCheckpoint,
)
from lightning.pytorch.loggers import MLFlowLogger, NeptuneLogger
from lightning.pytorch.loggers.csv_logs import CSVLogger
from lightning.pytorch.loggers.tensorboard import TensorBoardLogger
from lightning.pytorch.loops.utilities import _is_max_limit_reached
from lightning.pytorch.plugins.environments import SLURMEnvironment
from lightning.pytorch.strategies import DeepSpeedStrategy

from .cluster import ClusterSubmit

logging.getLogger(
    "neptune.new.internal.operation_processors.async_operation_processor",
).setLevel(logging.CRITICAL)

def trainer_instance(
    task: str,
    config_name: str,
    trial: int,
    monitor: Optional[str] = None,
    monitor_mode: str = 'min',
    early_stopping: bool = False,
    patience: int = 0,
    min_delta: float = 0.0,
    divergence_threshold: Optional[float] = None,
    exp_dir: Optional[str] = None,
    exp_dir_trial: Optional[str] = None,
    sched_inter: Optional[str] = None,  # 'step', 'epoch', or None.
    save_top_k: int = 1,
    every_n_epochs: Optional[int] = 1,
    every_n_train_steps: Optional[int] = None,
    neptune_api_key: Optional[str] = None,
    neptune_username: Optional[str] = None,
    neptune_mode: Optional[str] = 'async',
    num_nodes: int = 1,
    devices: Optional[int] = 1,
    submit: bool = False,
    mbatch_size: Optional[int] = None,
    accumulated_mbatch_size: Optional[int] = None,
    deterministic: bool = True,
    num_sanity_val_steps: int = 0,
    loggers: Optional[list] = None,
    callbacks: Optional[list] = None,  # [RichProgressBar()]
    plugins: Optional[list] = None,
    enable_progress_bar: Optional[bool] = None,
    one_epoch_only: bool = False,
    learning_rate_monitor: bool = False,
    # disable_slurm_environment: bool = False,
    mlflow_logger: bool = False,
    **kwargs,
) -> Trainer:
    """
    Creates an instance of lightning.pytorch.Trainer using key callbacks and loggers. Also changes some
    defaults for the init of lightning.pytorch.Trainer.

    Parameters for lightning.pytorch.Trainer are described here:
        https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#trainer-class-api
    These will be captured by kwargs and passed to lightning.pytorch.Trainer

    Argument/s:
        task - name of the task.
        config_name - name of the configuration.
        trial - trial identifier.
        monitor - metric to monitor for EarlyStopping and ModelCheckpoint.
        monitor_mode - whether the metric to be monitored is to be maximised or
            minimised.
        early_stopping - stop training when a monitored metric has stopped
            improving.
        patience - no. of epochs with no improvement after which training will
            be stopped.
        min_delta - minimum change in the monitored quantity to qualify as an
            improvement.
        divergence_threshold - stop training as soon as the monitored quantity becomes worse than this threshold.
        exp_dir - experiment directory.
        exp_dir_trial - experiment directory for the trial. All outputs are saved to this path.
        sched_inter - learning rate scheduler interval ('step' or 'epoch').
        save_top_k - best k models saved according to the monitored metric. If
            0, no models are saved. If -1, all models are saved.
        every_n_epochs - save model every n epochs.
        every_n_train_steps - save model every n training steps.
        neptune_api_key - API key, found on neptune.ai, for NeptuneLogger.
        neptune_username - Username on neptune.ai, for NeptuneLogger.
        neptune_mode - https://docs.neptune.ai/api/connection_modes/.
        num_nodes - number of nodes for the job.
        devices - number of devices per node.
        submit - submit to cluster manager.
        mbatch_size - mini-batch size of dataloaders.
        accumulated_mbatch_size - desired accumulated mini-batch size.
        deterministic - ensures that the training is deterministic.
        num_sanity_val_steps - runs n validation batches before starting the training routine.
        loggers - loggers for Trainer.
        callbacks - callbacks for Trainer.
        plugins - plugins for Trainer.
        kwargs - keyword arguments for Trainer.
        enable_progress_bar - show the progress bar (will be turned off for submissions).
        one_epoch_only - perform only one epoch of training.
        learning_rate_monitor - add the LearningRateMonitor as a callback.
        mlflow_logger - add the MLFlowLogger as a logger (runs are stored locally under exp_dir).
        # disable_slurm_environment - disable Lightning's SLURMEnvironment.
    """
    accumulate_grad_batches = None
    loggers = [] if loggers is None else loggers
    callbacks = [] if callbacks is None else callbacks
    plugins = [] if plugins is None else plugins

    if submit:
        plugins.append(SLURMEnvironment(auto_requeue=False))
        enable_progress_bar = False if enable_progress_bar is None else enable_progress_bar

    # Unsure if Lightning's SLURMEnvironment features fault-tolerant training:
    # if disable_slurm_environment:

    #     # See: https://github.com/Lightning-AI/lightning/issues/6389#issuecomment-1377759948
    #     class DisabledSLURMEnvironment(SLURMEnvironment):
    #         def detect() -> bool:
    #             return False

    #         @staticmethod
    #         def _validate_srun_used() -> None:
    #             return

    #         @staticmethod
    #         def _validate_srun_variables() -> None:
    #             return

    #     plugins.append(DisabledSLURMEnvironment(auto_requeue=False))

    # Deepspeed has its own autocast capabilities:
    # if 'strategy' in kwargs and 'precision' in kwargs:
    #     if 'deepspeed' in kwargs['strategy'] and kwargs['precision'] == 16:
    #         raise ValueError('DeepSpeed and "precision=16" are incompatible as DeepSpeed has its own autocast functionality.')

    # DeepSpeed config:
    if 'deepspeed_config' in kwargs:
        kwargs['strategy'] = DeepSpeedStrategy(
            **kwargs['deepspeed_config']
        )

    # Loggers
    loggers.append(CSVLogger(exp_dir_trial))

    # Remove 'lightning_logs' structure for tensorboard to allow different sessions to be grouped:
    loggers.append(
        TensorBoardLogger(
            exp_dir_trial, 
            default_hp_metric=False, 
            version='', 
            name='tensorboard', 
        )
    )  
    # class TensorboardLogConfigTrial(Callback):
    #     def __init__(self, config, trial):
    #         self.config = config
    #         self.trial = trial
    #     def setup(self, trainer, pl_module, stage):
    #         trainer.loggers[-1].log_hyperparams({'config': self.config, 'trial': self.trial})
    # callbacks.append(TensorboardLogConfigTrial(config=config_name, trial=trial))

    if neptune_api_key is not None:
        name = f'{config_name}_t_{trial}'
        custom_run_id = str(
            int.from_bytes(
                hashlib.sha256(name.encode(encoding="ascii",errors="ignore")).digest()[:12], 'little'
            )
        ) 
        assert len(custom_run_id) <= 32, '"custom_run_id" must be less than or equal to 32 characters'
        assert neptune_username is not None, 'You must specify your neptune.ai username.'
        loggers.append(
            NeptuneLogger(
                api_key=neptune_api_key,
                project=f'{neptune_username}/{task.replace("_", "-")}',
                name=name,
                tags=[config_name, f'trial_{trial}'],
                prefix='log',
                custom_run_id=custom_run_id,
                capture_stdout=False,
                capture_stderr=False,
                capture_hardware_metrics=True,
                capture_traceback=False,
                log_model_checkpoints=False,
                source_files=[],
                flush_period=60,
                mode=neptune_mode,
            )
        )

    if mlflow_logger:
        run_name = f'{config_name}_t_{trial}'
        run_id_path = os.path.join(exp_dir_trial, 'mlflow_run_id.txt')
        if os.path.exists(run_id_path):
            with open(run_id_path, 'r') as f:
                run_id = f.read()
        else:
            run_id = None
        loggers.append(
            MLFlowLogger(
                experiment_name=task,
                save_dir=os.path.join(exp_dir, task, '.mlruns'),
                log_model=False,
                tracking_uri=None,
                run_name=run_name,
                run_id=run_id,
                tags={'config': config_name, 'trial': trial},
            )
        )
        if not os.path.exists(run_id_path):
            with open(run_id_path, 'w') as f:
                f.write(loggers[-1].version)

    # Model checkpointing
    assert (every_n_epochs is not None) or (every_n_train_steps is not None), 'Neither "every_n_epochs" nor ' \
        '"every_n_train_steps" is set. No checkpoints will be saved.'
    assert save_top_k != 0, '"save_top_k" is 0, therefore, no checkpoints will be saved.'

    if every_n_epochs:
        save_top_k = save_top_k if monitor is not None else -1
        callbacks.append(
            ModelCheckpoint(
                dirpath=exp_dir_trial,
                monitor=monitor,
                mode=monitor_mode,
                save_top_k=save_top_k,
                every_n_epochs=every_n_epochs,
                filename='{epoch:d}-{step:d}-{' + monitor + ':f}' if monitor else '{epoch:d}-{step:d}',
                save_last=True,
                enable_version_counter=False,
            )
        )
        # if 'strategy' in kwargs:
        #     if isinstance(kwargs['strategy'], str):
        #         if 'deepspeed_stage_3' in kwargs['strategy']: 
        #             callbacks[-1].FILE_EXTENSION = ""
        #     elif isinstance(kwargs['strategy'], DeepSpeedStrategy):
        #         if kwargs['deepspeed_config']['stage'] == 3:
        #             callbacks[-1].FILE_EXTENSION = ""

    # if every_n_train_steps:
    #     if resumable:
    #         raise ValueError('Cannot resume training from a checkpoint that ended before the epoch ended. Fault '
    #                          'tolerant training needs to be implemented for this: '
    #                          'https://pytorch-lightning.readthedocs.io/en/latest/clouds'
    #                          '/fault_tolerant_training_expert.html#enable-fault-tolerant-behavior-on-your-own-cluster')
    #     callbacks.append(
    #         ModelCheckpoint(
    #             dirpath=exp_dir_trial,
    #             monitor=monitor,
    #             mode=monitor_mode,
    #             save_top_k=save_top_k,
    #             every_n_train_steps=every_n_train_steps,
    #             filename='{step:d}-{' + monitor + ':f}',
    #             save_last=False,  # cannot resume from this checkpoint.
    #         )
    #     )

    # Early stopping
    if early_stopping:
        # if 'strategy' in kwargs:
        #     assert 'deepspeed' not in kwargs['strategy'], 'DeepSpeed does not work with early stopping currently.'
        callbacks.append(
            EarlyStopping(
                monitor=monitor,
                mode=monitor_mode,
                min_delta=min_delta,
                patience=patience,
                divergence_threshold=divergence_threshold,
                verbose=True,
            )
        )

    # Learning rate monitor:
    if learning_rate_monitor:
        callbacks.append(LearningRateMonitor(log_momentum=True, log_weight_decay=True))

    # Perform only one epoch of training:
    if one_epoch_only:
        assert not early_stopping, 'one_epoch_only is not set up for early_stopping'
        assert 'val_check_interval' not in kwargs, 'one_epoch_only is not set up for val_check_interval'

        class OneEpochOnlyCallback(Callback):
            def __init__(self, submit):
                self.start_time = time.time()
                self.submit = submit
            def on_validation_epoch_end(self, trainer, pl_module):
                trainer.should_stop = True
                pl_module.trainer.should_stop = True
            def on_train_end(self, trainer, pl_module):
                elapsed_time = (time.time() - self.start_time) / 3600
                print(f'Training epoch elapsed time (hours): {elapsed_time}')
            def teardown(self, trainer, pl_module, stage):
                done = _is_max_limit_reached(trainer.fit_loop.epoch_progress.current.processed, trainer.fit_loop.max_epochs)
                if stage == 'fit' and self.submit and not done:
                    rank_0 = True
                    if torch.distributed.is_initialized():
                        if torch.distributed.get_rank() != 0:
                            rank_0 = False
                    if rank_0:
                        ClusterSubmit.sig_handler('one_epoch_only', None)
        callbacks.append(OneEpochOnlyCallback(submit=submit))

    # Accumulate gradient batches
    if accumulated_mbatch_size:
        total_devices = devices * num_nodes if devices * num_nodes > 0 else 1
        accumulate_grad_batches = accumulated_mbatch_size / (mbatch_size * total_devices)
        assert accumulate_grad_batches.is_integer(), f'Desired accumulated_mbatch_size ({accumulated_mbatch_size}) ' \
                                                     f'can not be attained with mbatch_size={mbatch_size}, devices=' \
                                                     f'{devices}, and num_nodes={num_nodes}'
        accumulate_grad_batches = int(accumulate_grad_batches)
    else:
        accumulate_grad_batches = 1

    # Remove keyword arguments not associated with lightning.pytorch.Trainer.
    # Parameters associated with lightning.pytorch.Trainer:
    #   https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#trainer-class-api
    kwargs = {k: v for k, v in kwargs.items() if k in inspect.signature(Trainer).parameters}

    # lightning.pytorch.Trainer
    print(f'PTL no. devices: {devices}.')
    print(f'PTL no. nodes: {num_nodes}.')
    return Trainer(
        default_root_dir=exp_dir_trial, # Needed for hpc_ckpt save path. 
        logger=loggers,
        callbacks=callbacks,
        plugins=plugins,
        devices=devices,
        num_nodes=num_nodes,
        accumulate_grad_batches=accumulate_grad_batches,
        deterministic=deterministic,
        num_sanity_val_steps=num_sanity_val_steps,
        enable_progress_bar=enable_progress_bar,
        **kwargs,
    )
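The gradient accumulation step in trainer_instance is plain arithmetic: the desired accumulated mini-batch size divided by the number of samples processed per step across all devices. A standalone sketch of that calculation; the function name and the example numbers are illustrative and not part of dlhpcstarter, and it raises ValueError where the original uses an assert:

```python
def accumulation_steps(accumulated_mbatch_size, mbatch_size, devices=1, num_nodes=1):
    """Mirror the accumulate_grad_batches calculation in trainer_instance."""
    total_devices = devices * num_nodes if devices * num_nodes > 0 else 1
    steps = accumulated_mbatch_size / (mbatch_size * total_devices)
    if not steps.is_integer():
        raise ValueError(
            f'accumulated_mbatch_size ({accumulated_mbatch_size}) cannot be attained with '
            f'mbatch_size={mbatch_size}, devices={devices}, and num_nodes={num_nodes}'
        )
    return int(steps)


# Two GPUs on one node, 8 samples per GPU per step, 32 samples per optimizer step:
print(accumulation_steps(32, 8, devices=2))  # 2
```

With the two devices shown in the log above, a mini-batch size of 8 already yields 16 samples per step, so two accumulation steps reach 32.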

So in your stages.py, you can build the MLFlowLogger yourself and pass it to trainer_instance() via loggers, e.g.,

import os

from lightning.pytorch.loggers import MLFlowLogger

...
...
...
run_name = f'{args.config_name}_t_{args.trial}'
run_id_path = os.path.join(args.exp_dir_trial, 'mlflow_run_id.txt')
if os.path.exists(run_id_path):
    with open(run_id_path, 'r') as f:
        run_id = f.read()
else:
    run_id = None
loggers = [
    MLFlowLogger(
        experiment_name=args.task,
        save_dir=os.path.join(args.exp_dir, args.task, '.mlruns'),
        log_model=False,
        tracking_uri=None,
        run_name=run_name,
        run_id=run_id,
        tags={'config': args.config_name, 'trial': args.trial},
    )
]
if not os.path.exists(run_id_path):
    with open(run_id_path, 'w') as f:
        f.write(loggers[-1].version)

trainer = trainer_instance(loggers=loggers, **vars(args))

You will have to look up how to use MLflow; it stores everything locally.
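Because tracking_uri is None, Lightning's MLFlowLogger writes runs to the local directory given by save_dir. A small sketch that rebuilds that directory path and prints a command for browsing it; the helper name and example values are made up, and it assumes the mlflow package (which provides the mlflow ui command and its --backend-store-uri option) is installed on the machine where you view the results:

```python
import os
import shlex


def mlflow_ui_command(exp_dir, task):
    """Build a shell command that opens the MLflow UI on the local run store."""
    # Same path that is passed as save_dir to MLFlowLogger in the snippet above.
    store = os.path.join(exp_dir, task, '.mlruns')
    return f'mlflow ui --backend-store-uri {shlex.quote(store)}'


print(mlflow_ui_command('experiments', 'cxrmate'))
```

Run the printed command from the directory that exp_dir is relative to, then open the address that MLflow prints in a browser.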

You should be able to get it to work. You can do a similar thing if you have wandb set up with Lightning: https://lightning.ai/docs/pytorch/1.4.1/api/pytorch_lightning.loggers.wandb.html
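For wandb, the same pattern should carry over: build the logger in stages.py and hand it to trainer_instance() through loggers. A hedged sketch, assuming the wandb package is installed and that you are either logged in or using offline mode. WandbLogger and its project, name, id, save_dir, and offline arguments come from Lightning, and resume is forwarded to wandb.init(); the helper names and the choice of run ID are illustrative only:

```python
def wandb_logger_kwargs(task, config_name, trial, exp_dir_trial, offline=False):
    """Keyword arguments for lightning.pytorch.loggers.WandbLogger."""
    run_name = f'{config_name}_t_{trial}'
    return {
        'project': task,
        'name': run_name,
        'id': run_name,            # stable ID so a resumed job logs to the same run
        'resume': 'allow',         # forwarded to wandb.init()
        'save_dir': exp_dir_trial,
        'offline': offline,        # True keeps all data local, e.g. behind a firewall
    }


def build_wandb_logger(args, offline=False):
    """Create the logger from the dlhpcstarter args namespace."""
    # Imported lazily so wandb_logger_kwargs can be used without wandb installed.
    from lightning.pytorch.loggers import WandbLogger

    return WandbLogger(**wandb_logger_kwargs(
        args.task, args.config_name, args.trial, args.exp_dir_trial, offline=offline,
    ))
```

In stages.py this would be used as trainer = trainer_instance(loggers=[build_wandb_logger(args)], **vars(args)). The trainer_instance signature shown above has options for Neptune and MLflow but none for wandb, so a setting in the YAML file alone would presumably not enable it.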

anicolson commented 1 month ago

Or upgrade to the latest dlhpcstarter (pip install --upgrade dlhpcstarter) and set mlflow_logger: true in your config file. This repo was made for an earlier version of dlhpcstarter, so we'll have to hope nothing breaks :)