ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray component: syncer.py] Last sync command failed with the following error #44320

Open FunkyFrog1 opened 7 months ago

FunkyFrog1 commented 7 months ago

What happened + What you expected to happen

I tested two different models on my computer. Everything was fine at the beginning, but after running for a long time it started to report a WARNING.

2024-03-27 19:25:00,284 WARNING syncer.py:406 -- Last sync command failed with the following error:
Traceback (most recent call last):
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\syncer.py", line 404, in _launch_sync_process
    self.wait()
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\syncer.py", line 474, in wait
    raise e
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\syncer.py", line 472, in wait
    self._sync_process.wait(timeout=timeout or self.sync_timeout)
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\syncer.py", line 173, in wait
    raise exception
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\syncer.py", line 136, in entrypoint
    result = self._fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\storage.py", line 212, in _upload_to_fs_path
    _pyarrow_fs_copy_files(local_path, fs_path, destination_filesystem=fs)
  File "Q:\Anaconda\envs\brain\Lib\site-packages\ray\train\_internal\storage.py", line 110, in _pyarrow_fs_copy_files
    return pyarrow.fs.copy_files(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "Q:\Anaconda\envs\brain\Lib\site-packages\pyarrow\fs.py", line 272, in copy_files
    _copy_files_selector(source_fs, source_sel,
  File "pyarrow\_fs.pyx", line 1627, in pyarrow._fs._copy_files_selector
  File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
FileNotFoundError: [WinError 3] Failed copying 'C:/Users/yangping/AppData/Local/Temp/ray/session_2024-03-27_19-18-19_227437_3712/artifacts/2024-03-27_19-18-28/train_seeg_2024-03-27_19-18-19/driver_artifacts/train_seeg_2277bdb9_1_batch=8,epoch=150,lr=0.0000,momentum=0.0075_2024-03-27_19-18-28/events.out.tfevents.1711538314.IBRR-SCNU' to 'C:/Users/yangping/ray_results/train_seeg_2024-03-27_19-18-19/train_seeg_2277bdb9_1_batch=8,epoch=150,lr=0.0000,momentum=0.0075_2024-03-27_19-18-28/events.out.tfevents.1711538314.IBRR-SCNU'. Detail: [Windows error 3]

And I tried restarting my code. This time it reported the warning immediately, and I couldn't suppress it with import warnings; warnings.filterwarnings("ignore").
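Presumably that is because the message comes from Ray's logging in syncer.py rather than the warnings module, so a logging filter would be needed instead, e.g. (assuming the logger name follows the module path shown in the traceback; this only hides the message, it does not fix the failing sync):

import logging

# Assumed logger name, based on the module path in the traceback above.
logging.getLogger("ray.train._internal.syncer").setLevel(logging.ERROR)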

It looks like a sync problem, possibly caused by running two models at once, so I tried running a single model. It still happened, with the same report.

Here is my code.

import os.path
from functools import partial
import random

import torch
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch
from torch.nn import MSELoss, SmoothL1Loss
import torch.optim as optim

import numpy as np
from ray import tune, train
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search import ConcurrencyLimiter

from model import Feature_aligner
from dataloader import PreprocessDataLoader

def train_fn(model, optimizer, train_loader, loss_func):
    model.train()
    train_loss_list = []
    for (seeg, depth_z) in train_loader:
        seeg, depth_z = seeg[:, :, :250].to(device), depth_z.to(device)
        optimizer.zero_grad()

        z = model(seeg)

        loss = loss_func(z, depth_z)

        loss.backward()
        train_loss_list.append(loss.item())

        optimizer.step()

    return np.mean(train_loss_list)

def val_fn(model, val_loader, loss_func):
    model.eval()

    val_loss_list = []
    with torch.no_grad():
        for (seeg, depth_z) in val_loader:
            seeg, depth_z = seeg[:, :, :250].to(device), depth_z.to(device)

            z = model(seeg)

            val_loss_list.append(loss_func(z, depth_z).item())

    return np.mean(val_loss_list)

def loss_fn(x, pred_x, l1_lambda):
    MSE = MSELoss()
    l1_loss = l1_lambda * torch.norm(x, p=1)
    loss = MSE(x, pred_x) + l1_loss
    return loss

def training(config, current_dir):
    preprocess_data = PreprocessDataLoader(current_dir=current_dir)
    train_loader, val_loader = preprocess_data.get_data(config['batch'])

    model = Feature_aligner().to(device)

    optimizer = optim.SGD(
        model.parameters(),
        lr=config['lr'],
        momentum=config['momentum']
    )

    loss_func = partial(loss_fn, l1_lambda=config['l1_lambda'])

    for i in range(config['epoch']):
        train_loss = train_fn(
            model=model,
            optimizer=optimizer,
            train_loader=train_loader,
            loss_func=loss_func
        )

        val_loss = val_fn(
            model=model,
            val_loader=val_loader,
            loss_func=loss_func
        )

        train.report({
            'train_loss': train_loss,
            'val_loss': val_loss,
            'avg_loss': (train_loss + val_loss) / 2
        })

def main(num_samples):
    current_dir = os.getcwd()

    # initial_params: starting points for HyperOptSearch, defined elsewhere in my project (not shown here)
    algo = HyperOptSearch(points_to_evaluate=initial_params)
    # algo = BayesOptSearch(utility_kwargs={"kind": "ucb", "kappa": 2.5, "xi": 0.0})
    # algo = OptunaSearch(metric=["val_loss"], mode=["min"])

    algo = ConcurrencyLimiter(algo, max_concurrent=4)

    scheduler = AsyncHyperBandScheduler()

    search_config = {
        'epoch': 200,
        'batch': 8,
        'lr': tune.loguniform(1e-10, 1e-3),
        'momentum': tune.uniform(0.8, 1),
        'l1_lambda': tune.loguniform(1e-10, 1e-4)
    }

    tuner = tune.Tuner(
        tune.with_resources(partial(training, current_dir=current_dir), {"gpu": 0.25}),
        # partial(training, current_dir=current_dir),
        tune_config=tune.TuneConfig(
            metric='val_loss',
            mode='min',
            num_samples=num_samples,
            search_alg=algo,
            scheduler=scheduler,
        ),
        param_space=search_config,
    )
    results = tuner.fit()

if __name__ == "__main__":
    device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    print('device:', device)
    set_seed(42)  # seeding helper defined elsewhere in my project (not shown here)

    main(6)

Versions / Dependencies

Windows

Python 3.11.7

ray 2.10.0
torch 2.1.1
tensorboard 2.16.2
tensorboard-data-server 0.7.2
tensorboardX 2.6.2.2

Reproduction script

import os
from functools import partial

import torch
import torch.optim as optim

from ray import tune, train
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.hyperopt import HyperOptSearch

from model import Feature_aligner
from dataloader import PreprocessDataLoader

def train_fn(model, optimizer, train_loader, loss_func):
    pass

def val_fn(model, val_loader, loss_func):
    pass

def loss_fn(x, pred_x, l1_lambda):
    pass

def training(config, current_dir):
    preprocess_data = PreprocessDataLoader(current_dir=current_dir)
    train_loader, val_loader = preprocess_data.get_data(config['batch'])

    model = Feature_aligner().to(device)

    optimizer = optim.SGD(
        model.parameters(),
        lr=config['lr'],
        momentum=config['momentum']
    )

    loss_func = partial(loss_fn, l1_lambda=config['l1_lambda'])

    for i in range(config['epoch']):
        train_loss = train_fn(
            model=model,
            optimizer=optimizer,
            train_loader=train_loader,
            loss_func=loss_func
        )

        val_loss = val_fn(
            model=model,
            val_loader=val_loader,
            loss_func=loss_func
        )

        train.report({
            'train_loss': train_loss,
            'val_loss': val_loss,
            'avg_loss': (train_loss + val_loss) / 2
        })

def main(num_samples):
    current_dir = os.getcwd()

    # initial_params: starting points for HyperOptSearch, defined elsewhere in my project (not shown here)
    algo = HyperOptSearch(points_to_evaluate=initial_params)
    # algo = BayesOptSearch(utility_kwargs={"kind": "ucb", "kappa": 2.5, "xi": 0.0})
    # algo = OptunaSearch(metric=["val_loss"], mode=["min"])

    algo = ConcurrencyLimiter(algo, max_concurrent=4)

    scheduler = AsyncHyperBandScheduler()

    search_config = {
        'epoch': 200,
        'batch': 8,
        'lr': tune.loguniform(1e-10, 1e-3),
        'momentum': tune.uniform(0.8, 1),
        'l1_lambda': tune.loguniform(1e-10, 1e-4)
    }

    tuner = tune.Tuner(
        tune.with_resources(partial(training, current_dir=current_dir), {"gpu": 0.25}),
        # partial(training, current_dir=current_dir),
        tune_config=tune.TuneConfig(
            metric='val_loss',
            mode='min',
            num_samples=num_samples,
            search_alg=algo,
            scheduler=scheduler,
        ),
        param_space=search_config,
    )
    results = tuner.fit()

if __name__ == "__main__":
    device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    print('device:', device)

    main(6)

Issue Severity

None

justinvyu commented 7 months ago

@FunkyFrog1 Has this error only recently appeared after you upgraded to Ray 2.10?

Could you also check if the file that is being copied actually exists? Another hypothesis is that the parent directory creation is not working well on Windows -- note that Windows support for Ray Tune is not tested thoroughly at the moment.
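For example, a quick check along these lines (with the source path copied verbatim from the traceback) would show whether the file exists and how long the path is:

import os

# Source path taken from the traceback above, split across lines for readability.
src = (
    "C:/Users/yangping/AppData/Local/Temp/ray/"
    "session_2024-03-27_19-18-19_227437_3712/artifacts/2024-03-27_19-18-28/"
    "train_seeg_2024-03-27_19-18-19/driver_artifacts/"
    "train_seeg_2277bdb9_1_batch=8,epoch=150,lr=0.0000,momentum=0.0075_2024-03-27_19-18-28/"
    "events.out.tfevents.1711538314.IBRR-SCNU"
)

print("exists:", os.path.exists(src))
# Paths longer than the classic 260-character MAX_PATH limit can also trigger
# WinError 3 on Windows when long-path support is disabled, so the length is
# worth checking as well.
print("length:", len(src))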

FunkyFrog1 commented 6 months ago

Rebooting my computer fixed it. I am certain the file exists; it may be due to the Windows server system.