ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
34.2k stars 5.81k forks source link

[Tune] Tune Job hangs out and can't finish the tune job #23858

Open tianlinzx opened 2 years ago

tianlinzx commented 2 years ago

What happened + What you expected to happen

I use tune to optimize hyperparameters ,the num_samples is set to 200, and it hangs out after 8 trials. I have tried this serveral times. `(run pid=1032772) Number of trials: 42/200 (30 PENDING, 4 RUNNING, 8 TERMINATED) (run pid=1032772) +-------------------------+------------+--------------------+-----------+---------------+--------+--------------+--------+------------------+ (run pid=1032772) | Trial name | status | loc | dropout | hidden_size | lr | num_layers | iter | total time (s) | (run pid=1032772) |-------------------------+------------+--------------------+-----------+---------------+--------+--------------+--------+------------------| (run pid=1032772) | train_model_6ba95_00001 | RUNNING | 10.244.1.242:59816 | 0.5 | 100 | 0.0001 | 1 | | | (run pid=1032772) | train_model_6ba95_00002 | RUNNING | 10.244.1.241:49849 | 0.8 | 200 | 1e-05 | 4 | | | (run pid=1032772) | train_model_6ba95_00008 | RUNNING | 10.244.1.241:68242 | 0.6 | 10 | 1e-05 | 3 | | | (run pid=1032772) | train_model_6ba95_00011 | RUNNING | 10.244.1.242:90467 | 0.5 | 200 | 0.001 | 1 | | | (run pid=1032772) | train_model_6ba95_00012 | PENDING | | 0.5 | 500 | 1e-06 | 3 | | | (run pid=1032772) | train_model_6ba95_00013 | PENDING | | 0.6 | 300 | 1e-06 | 4 | | | (run pid=1032772) | train_model_6ba95_00014 | PENDING | | 0.6 | 500 | 0.0001 | 2 | | | (run pid=1032772) | train_model_6ba95_00015 | PENDING | | 0.5 | 100 | 0.001 | 1 | | | (run pid=1032772) | train_model_6ba95_00016 | PENDING | | 0.7 | 10 | 1e-06 | 3 | | | (run pid=1032772) | train_model_6ba95_00017 | PENDING | | 0.5 | 300 | 0.0001 | 4 | | | (run pid=1032772) | train_model_6ba95_00018 | PENDING | | 0.8 | 10 | 1e-06 | 1 | | | (run pid=1032772) | train_model_6ba95_00019 | PENDING | | 0.6 | 500 | 1e-05 | 1 | | | (run pid=1032772) | train_model_6ba95_00000 | TERMINATED | 10.244.1.242:59782 | 0.5 | 500 | 1e-05 | 4 | 1 | 834.522 | (run pid=1032772) | train_model_6ba95_00003 | TERMINATED | 10.244.1.241:49850 | 0.4 | 100 | 0.0001 | 2 | 1 | 496.198 | (run pid=1032772) | train_model_6ba95_00004 | TERMINATED | 10.244.1.241:56024 | 0.8 | 100 | 0.0001 | 3 | 1 | 432.626 | (run pid=1032772) | train_model_6ba95_00005 | TERMINATED | 10.244.1.242:66059 | 0.4 | 10 | 1e-06 | 4 | 1 | 432.914 | (run pid=1032772) | train_model_6ba95_00006 | TERMINATED | 10.244.1.241:62137 | 0.6 | 500 | 0.001 | 2 | 1 | 544.445 | (run pid=1032772) | train_model_6ba95_00007 | TERMINATED | 10.244.1.242:72172 | 0.6 | 100 | 0.001 | 4 | 1 | 456.016 | (run pid=1032772) | train_model_6ba95_00009 | TERMINATED | 10.244.1.242:78266 | 0.8 | 100 | 0.0001 | 1 | 1 | 381.592 | (run pid=1032772) | train_model_6ba95_00010 | TERMINATED | 10.244.1.242:84370 | 0.6 | 10 | 0.0001 | 3 | 1 | 399.034 | (run pid=1032772) +-------------------------+------------+--------------------+-----------+---------------+--------+--------------+--------+------------------+ hanged

(run pid=1032772) ... 22 more trials not shown (22 PENDING)`

Versions / Dependencies

Ray 1.11.0 or Ray 1.12.0RC1 pyqlib

Reproduction script


from warnings import simplefilter
simplefilter(action='ignore', category=FutureWarning)

import os
import ray

from qlib.utils import init_instance_by_config
from qlib.tests.data import GetData
from qlib.workflow.record_temp import SignalRecord, PortAnaRecord, SigAnaRecord
from qlib.utils import flatten_dict
import qlib
import pickle
# region in [REG_CN, REG_US]
from qlib.constant import REG_CN
from ray import tune
from hyperopt import hp
from ray.tune.suggest.hyperopt import HyperOptSearch

from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict
import numpy as np
from pathlib import Path

market = "csi300"
benchmark = "SH000300"
provider_uri = "~/cn_data"

class qlib_model():
    def __init__(self, task):
        qlib.init(provider_uri=provider_uri, region=REG_CN)
        GetData().qlib_data(target_dir=provider_uri, region=REG_CN, exists_skip=True)

        # path=Path("./dataset.pkl")
        # if path.exists() == False:
        #     print("exit###########################exit###########################")
        #     dataset = init_instance_by_config(task["dataset"])
        #     dataset.config(dump_all=True, recursive=True)
        #     dataset.to_pickle(path="./dataset.pkl")
        # else:
        #     print("open^^^^^^^^^^^^^^^^^^^^^^^^^open^^^^^^^^^^^^^^^^^^^^^^^^^")
        #     with open("./dataset.pkl", "rb") as file_dataset:
        #         self.dataset = pickle.load(file_dataset)
        self.dataset = init_instance_by_config(task["dataset"])
        self.model = init_instance_by_config(task["model"])

    def train(self, task):
        qlib.init(provider_uri=provider_uri, region=REG_CN)
        from qlib.workflow import R
        # 模型训练实验
        with R.start(experiment_name="train_model"):
            R.log_params(**flatten_dict(task))
            self.model.fit(self.dataset)
            R.save_objects(trained_model=self.model)
            rid = R.get_recorder().id
            self.rid = rid

    def backtest(self):
        qlib.init(provider_uri=provider_uri, region=REG_CN)
        from qlib.workflow import R
        with R.start(experiment_name="backtest_analysis"):
            port_analysis_config = {
                "strategy": {
                    "class":"TopkDropoutStrategy",
                    "module_path":"qlib.contrib.strategy",
                    "kwargs": {
                        "signal":(self.model, self.dataset),
                        "topk":20,
                        "n_drop": 5,
                    },
                },

                "backtest": {
                    "start_time": None,
                    "end_time": None,
                    "account": 100000000,
                    "benchmark": 'sh000300',
                    "exchange_kwargs":{
                        #"verbose": False,
                        "limit_threshold": np.inf,
                        "deal_price": "close",
                        "open_cost": 0.00015,
                        "close_cost": 0.00015,
                        "min_cost": 20,
                    },
                },
            }

            recorder = R.get_recorder(recorder_id=self.rid, experiment_name="train_model")
            trained_model = recorder.load_object("trained_model")

            sr = SignalRecord(model=trained_model, dataset=self.dataset, recorder=recorder)
            sr.generate()

            # Signal Analysis
            sar = SigAnaRecord(recorder)
            sar.generate()    

            # backtest & analysis
            par = PortAnaRecord(recorder=recorder, config=port_analysis_config)
            par.generate()

            analysis_df = recorder.load_object("portfolio_analysis/port_analysis_1day.pkl")
            score = analysis_df.loc["excess_return_without_cost"].loc["annualized_return"]
            return score

def train_model(config):
    data_handler_config = {
        "start_time" : "2010-01-01",
        "end_time" : "2021-06-01",
        "fit_start_time" : "2010-01-01",
        "fit_end_time" : "2016-12-31",
        "instruments": market,
        "infer_processors": [
            # {"class": "FilterCol", 
            # "kwargs": {
            #     "fields_group": "feature",
            #     "col_list": ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5", "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"]}}, 
            {"class": "RobustZScoreNorm", 
            "kwargs": {"fields_group": "feature", "clip_outlier": "true"}}, 
            {"class": "Fillna", "kwargs": {"fields_group": "feature"}}], 
        "learn_processors": [
            {"class": "DropnaLabel"},
            {"class": "CSRankNorm", "kwargs": {"fields_group": "label"}}], 

        "label": ["Ref($close, -2) / Ref($close, -1) - 1"] ,
    }

    task = {
        "model": {
            "class": "GATs",
            "module_path": "qlib.contrib.model.pytorch_gats_ts",
            "kwargs": {
                "d_feat": 158,
                "hidden_size":  config["hidden_size"], #64
                "num_layers": config["num_layers"], #2
                "dropout": config["dropout"], #0.7
                "n_epochs": 5,
                "lr": config["lr"], #1e-4
                "early_stop": 5,
                "metric": "loss",
                "loss": "mse",
                # "base_model": "LSTM",
                # "model_path": "./csi300_lstm_ts.pkl",
                # "GPU":0,
        },
    },

    "dataset": {
        "class": "TSDatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {
                "class": "Alpha158",
                "module_path": "qlib.contrib.data.handler",
                "kwargs": data_handler_config,
            },

            "segments":{
                "train":("2010-01-01","2016-12-31"),
                "valid":("2017-01-01","2018-12-31"),
                "test":("2019-01-01","2021-06-01"),
            },

            "step_len": 20

            },
        },
    }

    NetworkActor = qlib_model(task)
    NetworkActor.train(task)
    score = NetworkActor.backtest()
    tune.report(annualized_return=score)

if __name__ == "__main__":
    try:
        if ray.is_initialized() is not True:
            ray.init(address="ray://10.1.164.29:30008", runtime_env={"pip": ["numpy==1.22.0"]})

        search_space = {
            "hidden_size": tune.choice([10, 100, 200, 300, 500]),
            "num_layers": tune.choice([1, 2, 3, 4]),
            "dropout": tune.choice([0.4, 0.5, 0.6, 0.7, 0.8]),
            "lr": tune.choice([1e-3, 1e-4, 1e-5, 1e-6]),

            # "hidden_size": tune.randint(10, 500),
            # "num_layers": tune.randint(1, 8),
            # "dropout": tune.uniform(0.1, 0.9),
            # "lr": tune.qloguniform(1e-6, 1e-3, 5e-7),# round to increments of 5e-5
        }

        # hyperopt_search = HyperOptSearch(search_space, metric="annualized_return", mode="max")

        analysis = tune.run(train_model, config=search_space, num_samples=200, resources_per_trial={"cpu": 2, "gpu": 0.5})

        print("---------------------------------")
        print(analysis.best_config)
        print(analysis.results_df)
    finally:
        ray.shutdown()
tianlinzx commented 2 years ago

Here is the source code qlib_ray_gat_158_tune_v2.py.zip .

amogkam commented 2 years ago

Hey @tianlinzx do you mind also sharing your cluster configuration or the cluster yaml that you used? Thanks!

amogkam commented 2 years ago

And is there anything else shown in the stdout when tune hangs?

tianlinzx commented 2 years ago

@amogkam We just use the default helm file https://github.com/ray-project/ray/blob/master/deploy/charts/ray/Chart.yaml and made some changes to default values https://github.com/ray-project/ray/blob/master/deploy/charts/ray/values.yaml.

The is no other error msg,that's why it confused me .

BTW, I observed the job hangs but the process is still running . Is there any mechanism to enforce killing process when job hangs or timeout ?

tianlinzx commented 2 years ago

@amogkam Here is the configuration file cluster.txt .

amogkam commented 2 years ago

Hey @tianlinzx thanks for providing all the info! I tried out your code and was able to reproduce the hanging behavior after 10 successful trials:

image

Looking at the call stack, it seems that the trials are in fact hanging, and in particular on termination of joblib multiprocessing during dataset creation:

Thread 0x7F3D79E3B700 (active): "Thread-3"
    poll (multiprocessing/popen_fork.py:28)
    wait (multiprocessing/popen_fork.py:48)
    join (multiprocessing/process.py:140)
    _terminate_pool (multiprocessing/pool.py:617)
    __call__ (multiprocessing/util.py:201)
    terminate (multiprocessing/pool.py:548)
    terminate (joblib/pool.py:329)
    terminate (joblib/_parallel_backends.py:243)
    terminate (joblib/_parallel_backends.py:476)
    _terminate_backend (joblib/parallel.py:759)
    __call__ (joblib/parallel.py:1066)
    dataset_processor (qlib/data/data.py:526)
    dataset (qlib/data/data.py:787)
    features (qlib/data/data.py:1051)
    load_group_df (qlib/data/dataset/loader.py:212)
    <dictcomp> (qlib/data/dataset/loader.py:137)
    load (qlib/data/dataset/loader.py:137)
    setup_data (qlib/data/dataset/handler.py:141)
    setup_data (qlib/data/dataset/handler.py:570)
    __init__ (qlib/data/dataset/handler.py:97)
    __init__ (qlib/data/dataset/handler.py:434)
    __init__ (qlib/contrib/data/handler.py:178)
    init_instance_by_config (qlib/utils/__init__.py:336)
    __init__ (qlib/data/dataset/__init__.py:115)
    __init__ (qlib/data/dataset/__init__.py:565)
    init_instance_by_config (qlib/utils/__init__.py:336)
    __init__ (tune_job.py:44)
    train_model (tune_job.py:171)
    _trainable_func (ray/tune/function_runner.py:639)
    _resume_span (ray/util/tracing/tracing_helper.py:462)
    entrypoint (ray/tune/function_runner.py:352)
    run (ray/tune/function_runner.py:277)
    _bootstrap_inner (threading.py:926)
    _bootstrap (threading.py:890)

It seems like the qlib library is using joblib.Parallel during dataset_processing https://github.com/microsoft/qlib/blob/v0.8.4/qlib/data/data.py#L526.

qlib also sets the joblib backend to multiprocessing: https://github.com/microsoft/qlib/blob/v0.8.4/qlib/config.py#L123, resulting in multiprocessing being used within each Ray actor.

In general, it's not best practice to mix multiprocessing with Ray (https://discuss.ray.io/t/best-solution-to-have-multiprocess-working-in-actor/2165), so I would recommend seeing if there's a way in qlib to disable multiprocessing, or instead of using the multiprocessing backend for joblib.Parallel, to use the loky backend instead, which is the default.

Another option is to use the Ray backend for joblib (more info here: https://docs.ray.io/en/latest/ray-more-libs/joblib.html) which would more seamlessly fit with Tune.

tianlinzx commented 2 years ago

@amogkam Thanks for your reply. Will try to modify the qlib library later. Temporally close this issue.

tianlinzx commented 2 years ago

@amogkam Hi Amog ,I have modified my code but the job hangs again after 8 rounds of tune job trials. My colleage will upload the lastest source code and screen dump later on . Would you please have look ?

1059692261 commented 2 years ago

@amogkam Hi Amog. As you suggested, we first tried to use the loky backend for qlib in Ray tune, but we abandoned this solution soon due to a serious memory leak. Then we chose to use the Ray backend for joblib with Tune, however the jobs still hang after 7 successful trials(please ignore the error one). Would you please take a look at our code to see if there's anything going wrong? And could you please share that where and how we should inspect the whole call stack? here are our code and trial results: 1 2 qlib_ray_gat_158_tune.zip Thanks in advance!

amogkam commented 2 years ago

Hey @tianlinzx @1059692261, while Tune is hanging, can you do a ray stack on each of the nodes and share the output?

tianlinzx commented 2 years ago

@amogkam Here is the result of ray stack. reproduce code is qlib_ray_gat_158_tune.zip 微信图片_2 微信图片_3

amogkam commented 2 years ago

Hey @tianlinzx- from the second screenshot it looks like joblib is still using the multiprocessing backend. It seems like it's being hardcoded by qlib here: https://github.com/microsoft/qlib/blob/v0.8.4/qlib/config.py#L123.