Closed mikacashman closed 3 years ago
Thanks for writing this up! Could you please share the code used as well? 🙂
Adding code:
The code is specific to my HPC setup. It consists of (1) a bsub run script that sets up the dask-scheduler before calling (2) the Python code. It relies on a conda environment, so the parts personal to my account sit behind a switch (ISMIKA). I can add the input files used for building this issue report if needed/desired. The job launches from a batch node and the dask-based code runs on six 32 GB V100 GPUs.
The Python code runs the cuML and/or scikit-learn models, selected via command-line flags.
(1) bsub job script
#!/usr/bin/env bash
#BSUB -P SYB106
#BSUB -W 1:30
#BSUB -alloc_flags "gpumps smt4"
#BSUB -nnodes 1
#BSUB -J rfr-mnmg-long-hm-run1
#BSUB -o rfr-mnmg-long-hm-run1.%J.out
#BSUB -q batch-hm
## FILES
inpath="/gpfs/alpine/syb105/world-shared/mcashman/RAPIDS-MNMG/input_data"
infile="long.tsv"
#Options: small.tsv, long.tsv
## Required conda setup for Mikaela
ISMIKA=true
if [ "$ISMIKA" = true ] ; then
# Use for conda env issues, reload conda
source /gpfs/alpine/syb105/proj-shared/Personal/mcashman/scripts/conda_summit.sh
## Clean env
conda deactivate
module purge
fi
## Setup
module load gcc/7.4.0
module load python/3.7.0-anaconda3-5.3.0
module load cuda/10.1.243
source activate /gpfs/alpine/world-shared/stf011/nvrapids_0.14_gcc_7.4.0
## Dask setup
export PATH=/gpfs/alpine/world-shared/stf011/nvrapids_0.14_gcc_7.4.0/bin:$PATH
#dask workers
WORKERS_PER_NODE=6 #6
#should be equal to -nnodes
NODES=1
WORKERS=$(($WORKERS_PER_NODE*$NODES))
echo WORKERS=$WORKERS
#set your project id
PROJ_ID=syb106
dask_dir=$MEMBERWORK/$PROJ_ID/dask
if [ ! -d "$dask_dir" ]
then
mkdir $dask_dir
fi
export CUPY_CACHE_DIR=$dask_dir
export OMP_PROC_BIND=FALSE
# clean previous contents
rm -fr ${dask_dir}/*
# Several dask schedulers could run in the same batch node by different users,
# create a random port to reduce port collisions
PORT_SCHED=$(shuf -i 4000-6000 -n 1)
PORT_DASH=$(shuf -i 7000-8999 -n 1)
# saving ports to use them if launching jupyter lab
echo $PORT_SCHED >> ${dask_dir}/port_sched
echo $PORT_DASH >> ${dask_dir}/port_dash
HOSTNAME=$(hostname)
IP_ADDRESS=$(hostname -I | awk '{print $2}')
echo 'Running scheduler in'
echo $IP_ADDRESS:$PORT_SCHED
echo
echo 'Running dashboard in'
echo $IP_ADDRESS:$PORT_DASH
dask-scheduler --port ${PORT_SCHED} --dashboard-address ${PORT_DASH} --interface ib0 --scheduler-file ${dask_dir}/my-scheduler.json &
echo 'BENCHMARK (min sleep)'
sleep 60
echo '...awake'
echo
echo 'Running worker(s) in: '
jsrun -n 1 -c 1 hostname
##=HM (30GB device mem lim)
jsrun -c 1 -g 1 -n ${WORKERS} -r 6 -a 1 --bind rs --smpiargs="off" dask-cuda-worker --scheduler-file ${dask_dir}/my-scheduler.json --local-directory ${dask_dir} --nthreads 1 --memory-limit 85GB --device-memory-limit 30GB --death-timeout 180 --interface ib0 --enable-nvlink &
#echo $hostname
echo 'BENCHMARK (min sleep)'
sleep 60
echo '...awake'
cd /gpfs/alpine/syb105/proj-shared/Personal/mcashman/Projects/RAPIDS
echo 'BENCHMARK'
#jsrun -c 1 -g 1 -n ${WORKERS} -r 6 -a 1 --smpiargs="none"
python rfr_mnmg_V2.py -in $inpath/$infile --cuml #--skilearn
(2) python code
import os
import numpy as np
import time
import sklearn
import pandas as pd
import cudf
import cuml
import cupy
from sklearn.metrics import accuracy_score
from sklearn import model_selection, datasets
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf
from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRFC
from cuml.dask.ensemble import RandomForestRegressor as cumlDaskRFR
from sklearn.ensemble import RandomForestClassifier as sklRFC
from sklearn.ensemble import RandomForestRegressor as sklRFR
def main():
    ## Setup arguments
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-in', action="store", dest="inpath",
                        type=str, required=True, help="path of input file")
    parser.add_argument('--skilearn', action='store_true', dest="runSkl",
                        required=False, help="Flag for running the scikit-learn model")
    parser.add_argument('--cuml', action='store_true', dest="runCuml",
                        required=False, help="Flag for running the cuML model")
    args = parser.parse_args()
    if not (args.runSkl or args.runCuml):
        print("Invalid argument, please select a model to run\n--skilearn and/or --cuml\n")
        exit(1)
    ## Load data
    print("Starting input read...", flush=True)
    stime = time.time()
    data_type = np.float32
    try:
        with open(args.inpath, 'r') as f:
            test_data = cudf.read_csv(f, sep='\t')
            #test_data = np.loadtxt(f, dtype=float, skiprows=1)
    except EnvironmentError:  # parent of IOError, OSError *and* WindowsError where available
        print(f'ERROR: can not open input file\n\t{args.inpath}')
        exit(1)
    print(f'Time to load data: {time.time() - stime}', flush=True)
    ## Sample and check input data
    print(f'Type of test_data: {type(test_data)}')
    print(f'----Sample of input data:\n{test_data.iloc[0:4,0:4]}\n', flush=True)
    ## Split data into X and y
    test_data_f32 = test_data.astype('float32')
    X = test_data_f32.iloc[0:, 0:-1]
    y = test_data_f32.pheno0
    ## Test prints (optional)
    print(f'X: {type(X)} shape: {X.shape}')
    print(f'y: {type(y)} shape: {y.shape}')
    print(f'dtypes\nX: {X.dtypes}\ny: {y.dtype}\n', flush=True)
    # Random Forest building parameters
    max_depth = 20
    n_bins = 8
    n_trees = 1000
    ## Split train-test
    X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=0.2)
    ## IF SKLEARN
    if args.runSkl:
        # Convert data to pandas
        stime = time.time()
        X_train_pd = X_train.to_pandas()
        X_test_pd = X_test.to_pandas()
        y_train_pd = y_train.to_pandas()
        y_test_pd = y_test.to_pandas()
        print(f'sklearn time to convert: {time.time()-stime}', flush=True)
        # Use all available CPU cores
        stime = time.time()
        skl_model = sklRFR(max_depth=max_depth, n_estimators=n_trees, n_jobs=-1)
        skl_model.fit(X_train_pd, y_train_pd)
        print(f'sklearn fit time: {time.time()-stime}', flush=True)
        # Predict
        stime = time.time()
        skl_y_pred = skl_model.predict(X_test_pd)
        print(f'sklearn predict time: {time.time()-stime}', flush=True)
    ## IF CUML
    if args.runCuml:
        # Partition with Dask
        stime = time.time()
        n_partitions = n_workers  # n_workers is set at module scope in the __main__ block below
        print(f'number of partitions = number of workers = {n_partitions}', flush=True)
        # In this case, each worker will train on a 1/n_partitions fraction of the data
        X_train_dask = dask_cudf.from_cudf(X_train, npartitions=n_partitions)
        y_train_dask = dask_cudf.from_cudf(y_train, npartitions=n_partitions)
        # Attempt to fix chunks error - Mika
        X_test_dask = dask_cudf.from_cudf(X_test, npartitions=n_partitions)
        # Persist to cache the data in active memory
        X_train_dask, y_train_dask = dask_utils.persist_across_workers(
            client, [X_train_dask, y_train_dask], workers=workers)
        print(f'cuml setup time {time.time()-stime}', flush=True)
        # Build model
        stime = time.time()
        cuml_model = cumlDaskRFR(max_depth=max_depth, n_estimators=n_trees,
                                 n_streams=n_streams, n_bins=n_bins)
        cuml_model.fit(X_train_dask, y_train_dask)
        wait(cuml_model.rfs)  # Allow asynchronous training tasks to finish
        print(f'cuml fit time {time.time()-stime}', flush=True)
        # Predict
        stime = time.time()
        cuml_y_pred = cuml_model.predict(X_test_dask)
        print(f'cuml predict time {time.time()-stime}', flush=True)
    print("===== Accuracy Metrics =====", flush=True)
    # Due to randomness in the algorithm, you may see slight variation in accuracies
    if args.runSkl:
        print("-----SKLearn")
        print(f'y_test_pd: {type(y_test_pd)} OF {y_test_pd.dtype}')
        print(f'skl_y_pred: {type(skl_y_pred)} OF {skl_y_pred.dtype}')
        print("SKLearn MSE: ", sklearn.metrics.mean_squared_error(y_test_pd, skl_y_pred))
        print("SKLearn r2: ", sklearn.metrics.r2_score(y_test_pd, skl_y_pred))
    if args.runCuml:
        print("-----CuML")
        print(f'y_test: {type(y_test)} OF {y_test.dtype}')
        print(f'cuml_y_pred: {type(cuml_y_pred)} OF {cuml_y_pred.dtype}')
        print("CuML MSE: ", cuml.metrics.regression.mean_squared_error(y_test, cuml_y_pred.compute()))
        print("CuML r2: ", cuml.metrics.regression.r2_score(y_test, cuml_y_pred.compute()))
        #print(f'Type of cuml_y_pred.compute: {type(cuml_y_pred.compute())}')
        #,convert_dtype=True))
    print("DONE", flush=True)
if __name__ == '__main__':
    print("Internal benchmark", flush=True)
    file = os.getenv('MEMBERWORK') + '/syb106/dask/my-scheduler.json'
    client = Client(scheduler_file=file)
    print("Client information: ", client, flush=True)
    # Query the client for all connected workers
    workers = client.has_what().keys()
    print(f'workers: {workers}')
    n_workers = len(workers)
    print(f'n_workers: {n_workers}')
    n_streams = 8  # Performance optimization
    main()
    client.shutdown()
Hey @mikacashman, given the amount of ORNL-specific setup, it's a bit tricky to reproduce your workload.
My guess is that your first error (no dask workers) shows up intermittently because of collisions on the scheduler port -- i.e., whenever the shuffled port in the 4000-6000 range is already in use (possibly by other dask schedulers or workers on the shared batch node). One way to confirm would be to run a series of tests with hard-coded ports that are known to be unused.
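For example, a minimal sketch of probing for a free port before launching the scheduler (untested on Summit; it assumes bash's /dev/tcp pseudo-device is available, and pick_free_port is only an illustrative helper name):
# Sketch only: keep the first shuffled port that nothing on this host is listening on.
# Probes via localhost; adjust the host if a listener is bound only to the ib0 address.
pick_free_port () {
  for candidate in $(shuf -i 4000-6000); do
    if ! (exec 3<>"/dev/tcp/localhost/${candidate}") 2>/dev/null; then
      echo "${candidate}"
      return 0
    fi
  done
  return 1
}
PORT_SCHED=$(pick_free_port)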
Error #2 may have a similar cause to the one above, though again it's hard to say definitively without trying your exact setup.
Lastly, #3 is a missed heartbeat due to a closed connection, which is fairly benign.
I am going to close this. We have been able to mostly eliminate two of the errors, and the still-prevalent error (3) remains benign. The key seems to be being careful not to run more than one of these jobs at a time on the same batch node (or alongside any other dask jobs, since the batch nodes are shared). There is also a notable lag between a job dying and all of its dask scheduler state being cleaned up, so a longer wait time between jobs has proved beneficial. Thanks for the comments.
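One small, untested addition that might shorten that lag: an explicit cleanup trap in the job script, so the backgrounded dask-scheduler and dask-cuda-worker processes and the scheduler file do not outlive the job. A minimal sketch, reusing the script's existing dask_dir variable:
# Sketch only: kill the backgrounded dask processes and remove the scheduler file
# when the job script exits (or is terminated), instead of relying on the next
# run's rm -fr to find a clean slate.
cleanup () {
  kill $(jobs -p) 2>/dev/null
  rm -f "${dask_dir}/my-scheduler.json"
}
trap cleanup EXIT
trap 'cleanup; exit 1' TERM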
Hi there, I have been working with NVIDIA RAPIDS support through ORNL on Slack and was advised to post my issues here.
I have been observing some flaky behavior when running an MNMG version of the RF regression code I am working on (using dask). I have made multiple back-to-back runs on two different data sets (at least 5 runs on each) and recorded the behavior. Below are reports on three such flaky errors I frequently encounter (some fatal to functionality, some not). The non-fatal ones do not always kill the job, which makes them difficult to manage.
Error#1 - 0 workers (sometimes I end up with 0 workers when I should have 6)
Error#2 - Address in use (similar trace repeated for 5/6 workers in this example)
Error#3 - CommClosedError (this one doesn't appear fatal)
Some run information in addition to the environment details at the end: this is being run on the Summit supercomputer at ORNL (PowerPC, V100 GPUs). My run script starts a dask scheduler plus workers on 6 GPUs. I have a 60-second sleep after running dask-scheduler and another 60-second sleep after the jsrun command, before running the RAPIDS python script. I can provide further information or code if needed.
I have two further questions (feel free to direct me to open separate issues if desired).
Question#1: Is it possible to run multiple of these dask-based jobs in parallel? When I have tried, I end up with more than 6 workers being reported (running 5 jobs at the same time leads to 30 workers, i.e. 5 jobs * 6 workers) and then errors. Is there any way to run several of these dask-scheduler-based jobs at once? I have tried adding a unique identifier to the dask scheduler file and using a unique directory for each job's dask files, but neither method worked for me.
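A minimal sketch of one way to keep concurrent jobs separated (untested; $LSB_JOBID is set by LSF for each bsub job, and DASK_SCHED_FILE is only an illustrative variable name). Note that rfr_mnmg_V2.py currently hard-codes the scheduler path, so the Python side would also need to read the same per-job path for this to work:
# Sketch only: key the dask directory and scheduler file on the LSF job id so that
# jobs sharing a batch node never see each other's scheduler or workers.
dask_dir=$MEMBERWORK/$PROJ_ID/dask/$LSB_JOBID
mkdir -p "$dask_dir"
export DASK_SCHED_FILE="${dask_dir}/my-scheduler.json"   # illustrative name, not a dask setting
# pass "$DASK_SCHED_FILE" to both dask-scheduler and dask-cuda-worker via --scheduler-file,
# and have the python code read os.environ["DASK_SCHED_FILE"] instead of its hard-coded path.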
Question#2: Is there a way to redirect the dask-related output to a different file than standard out? The dask-scheduler reports a lot of information (which I need to keep for errors such as these), but my bash and python prints get buried quickly. Of course I could print to a specific file from bash/python, but I would rather simply redirect the dask output if there is a known way to do that.
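One simple option is plain shell redirection of the backgrounded commands in the job script, so the scheduler and worker chatter lands in its own log file instead of the job's stdout. A minimal, untested sketch based on the dask-scheduler line above (the dask-cuda-worker jsrun line can be redirected the same way, e.g. to ${dask_dir}/workers.log):
# Sketch only: keep dask's logging out of the batch job's stdout.
dask-scheduler --port ${PORT_SCHED} --dashboard-address ${PORT_DASH} --interface ib0 \
  --scheduler-file ${dask_dir}/my-scheduler.json > ${dask_dir}/scheduler.log 2>&1 &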
Thanks for any guidance in advance.