aimhubio / aim

Aim 💫 — An easy-to-use & supercharged open-source experiment tracker.
https://aimstack.io
Apache License 2.0

RuntimeError: dictionary keys changed during iteration #3181

GuHongyang opened this issue 4 months ago (status: open)

GuHongyang commented 4 months ago

🐛 Bug

When using ray.tune for hyperparameter optimization with the AimLoggerCallback enabled, the following error occurs:

Traceback (most recent call last):
  File "/root/data/mamba-reid/tst.py", line 92, in <module>
    results = tuner.fit()
              ^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/tuner.py", line 377, in fit
    return self._local_tuner.fit()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py", line 476, in fit
    analysis = self._fit_internal(trainable, param_space)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py", line 592, in _fit_internal
    analysis = run(
               ^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/tune.py", line 994, in run
    runner.step()
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 685, in step
    if not self._actor_manager.next(timeout=0.1):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 223, in next
    self._actor_task_events.resolve_future(future)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/event_manager.py", line 118, in resolve_future
    on_result(result)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 766, in on_result
    self._actor_task_resolved(
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 299, in _actor_task_resolved
    tracked_actor_task._on_result(tracked_actor, result)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1229, in _on_result
    raise TuneError(traceback.format_exc())
ray.tune.error.TuneError: Traceback (most recent call last):
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1220, in _on_result
    on_result(trial, *args, **kwargs)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1947, in _on_trial_reset
    self._actor_started(tracked_actor, log="REUSED")
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1131, in _actor_started
    self._callbacks.on_trial_start(
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/callback.py", line 398, in on_trial_start
    callback.on_trial_start(**info)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/logger.py", line 147, in on_trial_start
    self.log_trial_start(trial)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/aim.py", line 110, in log_trial_start
    self._trial_to_run[trial] = self._create_run(trial)
                                ^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/aim.py", line 91, in _create_run
    run = Run(
          ^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 70, in wrapper
    _SafeModeConfig.exception_callback(e, func)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 47, in reraise_exception
    raise e
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 68, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/run.py", line 859, in __init__
    super().__init__(run_hash, repo=repo, read_only=read_only, experiment=experiment, force_resume=force_resume)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/run.py", line 308, in __init__
    self._checkins = RunStatusReporter(self.hash, LocalFileManager(self.repo.path))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py", line 439, in __init__
    self.flush(block=True)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py", line 671, in flush
    for flag_name in flag_names:
RuntimeError: dictionary keys changed during iteration

Specifically, the failing code is in /root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py:

    def flush(
        self,
        flag_name: Optional[str] = None,
        block: bool = True,
    ) -> None:
        """
        Flush the last check-in.
        If `flag_name` is specified, only the check-ins for the given flag
        will be flushed.
        Otherwise, all the check-ins will be flushed. In this case, the order
        of (active) check-ins (per flag name) will be preserved.
        """
        logger.debug(f"notifying {self}")

        with self.reporter_lock:
            flag_names = [flag_name] if flag_name is not None else self.timed_tasks
            with self.flush_condition:
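                # NOTE: when `flag_name` is None, `flag_names` above is the
                # `self.timed_tasks` dict itself; if its key set changes while
                # this loop runs (e.g. via `_schedule()` or the writer thread),
                # Python raises "dictionary keys changed during iteration".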
                for flag_name in flag_names:
                    logger.debug(f"flushing {flag_name}")
                    # We add a new task with the highest priority to flush the
                    # last check-in. This task will be processed by the writer
                    # thread immediately.
                    self._schedule(TimedTask(when=0, flag_name=flag_name))

                # As there may be no flag names at all, the queue may be
                # untouched. In this case, we need to notify the writer thread
                # explicitly.
                with self.refresh_condition:
                    self.refresh_condition.notify()

                # If `block` is set, we wait until the writer thread finishes
                # flushing the last check-in.
                if block:
                    logger.debug("blocking until the writer finishes...")
                    self.flush_condition.wait()
                    logger.debug("done")

To reproduce

ray==2.31.0, aim==3.22.0, Python 3.11
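
A matching environment can be created along these lines (assumed PyPI package names; the TuneBOHB searcher additionally needs the BOHB dependencies):

    pip install "ray[tune]==2.31.0" aim==3.22.0 torch hpbandster ConfigSpace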

import os.path as osp
import tempfile
import time
from random import random

import numpy as np
import torch
from ray import train, tune, init
from ray.train import Checkpoint
from ray.tune.logger.aim import AimLoggerCallback
from ray.tune.schedulers import HyperBandForBOHB
from ray.tune.search.bohb import TuneBOHB

# project-local helpers (stub sketch after the script)
from utils import setup_logger, setup_seed

def my_train(config):

    setup_seed(777) 

    growth_rate = config['a'] # [1.0, 10.0]
    saturation_level = config['b'] # [1.0, 10.0]
    noise_level = config['c'] # [0.01, 0.1]
    noise_wo = config['d'] # {1.0, 0.0}

    x_values = np.linspace(0, 10, 12)

    start = 0
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            checkpoint_dict = torch.load(osp.join(checkpoint_dir, "checkpoint.pt"))
            start = checkpoint_dict["epoch"] + 1

    best_mAP = -1.0

    for epoch in range(start, len(x_values)):
        setup_seed(epoch)
        x = x_values[epoch]
        base_performance = 1 / (1 + np.exp(-growth_rate * (x - saturation_level)))
        random_noise = np.random.normal(0, noise_level)
        performance = base_performance + random_noise * (1.0 - noise_wo)

        time.sleep(random()*5.0)
        best_mAP = max(best_mAP, performance)

        with tempfile.TemporaryDirectory() as tempdir:
            torch.save(
                {"epoch": epoch},
                osp.join(tempdir, "checkpoint.pt"),
            )
            train.report(metrics={'mAP': performance, 'best_mAP':best_mAP}, checkpoint=Checkpoint.from_directory(tempdir))

if __name__ == '__main__':
    # init(local_mode=True)
    lg, savedir = setup_logger(exp='tune_tst')

    config = {
            "a": tune.uniform(1.0, 10.0),
            "b": tune.uniform(1.0, 10.0),
            "c": tune.uniform(0.01, 0.1),
            "d": tune.choice([1.0, 0.0])
        }
    algo = TuneBOHB(
        points_to_evaluate=[
                            {"a": 2.59095, "b": 5.6506, "c": 0.0967916, "d": 0.0},
                            {"a": 1.0, "b": 1.0, "c": 0.01, "d": 0.0},
                            {"a": 1.0, "b": 1.0, "c": 0.1, "d": 1.0},
                            ],
        seed=0,
        max_concurrent=4) 

    sche = HyperBandForBOHB(
        time_attr="training_iteration",
        max_t=12,
        reduction_factor=2)

    tuner = tune.Tuner(
        tune.with_resources(my_train, {"cpu": 0.1}),    
        param_space=config,
        tune_config=tune.TuneConfig(num_samples=50, metric='best_mAP', mode='max', search_alg=algo, scheduler=sche, reuse_actors=True),
        run_config=train.RunConfig(name="tst", storage_path=osp.abspath(savedir), callbacks=[AimLoggerCallback()], failure_config=train.FailureConfig(fail_fast=True)))
    results = tuner.fit()

    lg.info(results.get_best_result().config)
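
The script imports setup_logger and setup_seed from a project-local utils module. For a self-contained repro they can be stubbed roughly as follows (a sketch; the real helpers may do more):

import logging
import os
import random as py_random

import numpy as np
import torch

def setup_seed(seed):
    # Seed every RNG the training function touches.
    py_random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

def setup_logger(exp):
    # Return (logger, save_dir), matching how the script uses it.
    savedir = os.path.join("runs", exp)
    os.makedirs(savedir, exist_ok=True)
    logging.basicConfig(level=logging.INFO)
    return logging.getLogger(exp), savedir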

Expected behavior

I can solve this problem by replacing self.timed_tasks with list(self.timed_tasks.keys()), but will there be any side effects?

        with self.reporter_lock:
            flag_names = [flag_name] if flag_name is not None else list(self.timed_tasks.keys())
mihran113 commented 4 months ago

Hey @GuHongyang! Thanks a lot for opening the issue. The strange thing is that I couldn't reproduce this on my end with Python 3.11, aim 3.22, and ray 2.31. Still, it's a situation where it's better to be safe than sorry, so it would be great if you could open a PR with the fix you've proposed, as it won't have any side effects.

VassilisVassiliadis commented 4 days ago

@mihran113 we had this happen a few times so I opened a PR here: https://github.com/aimhubio/aim/pull/3256

It simply applies the fix @GuHongyang proposed in the bug description.