atronchi closed this issue 1 year ago
cc @WeichenXu123
Hi, if you are loading the model, you should use SparkXGBClassifierModel.load instead of SparkXGBClassifier.load. The former loads the fitted model while the latter loads the estimator.
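For example (a minimal sketch; the path is illustrative):

from xgboost.spark import SparkXGBClassifierModel

# Load the fitted model with the Model class, not the estimator class.
model = SparkXGBClassifierModel.load("/tmp/xgboost-pyspark-model")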
Other than this, I can't reproduce the error:
ls -l /tmp/xgboost-pyspark-model/*
total 4
-rw-r--r-- 1 jiamingy jiamingy 1445 Aug 10 19:49 part-00000
-rw-r--r-- 1 jiamingy jiamingy 0 Aug 10 19:49 _SUCCESS
Thank you @trivialfis, I agree with the root cause you pointed out.
I dug around a bit in the sources, and it appears that the trained model has no 'xgb_model' param, so when we go to retrieve the init_booster with .getOrDefault it returns None: https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/spark/core.py#L1380
That's for training continuation (see the sketch below); I don't see an issue with this. Could you please share a reproducible script?
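For context, here is what that param is for (a sketch of training continuation; train_df is assumed to be an existing DataFrame with "features" and "label" columns):

from xgboost.spark import SparkXGBClassifier

# First round of training.
first_model = SparkXGBClassifier().fit(train_df)

# xgb_model carries an initial booster for continued training. When it was
# never set, getOrDefault("xgb_model") returns None, which simply means
# "train from scratch" rather than an error.
continued_model = SparkXGBClassifier(xgb_model=first_model.get_booster()).fit(train_df)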
Here's a notebook that reproduces the result:
{
"cells": [
{
"cell_type": "markdown",
"id": "6c9f1c0d-3afd-4f1a-94fa-7d906bc293ea",
"metadata": {},
"source": [
"# Spark XGBoost Cross Validation Classifier Demo\n",
"Largely borrowing from https://xgboost.readthedocs.io/en/latest/python/examples/spark_estimator_examples.html"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "4473b415-b307-47e7-acf7-056c287dc0ca",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requesting 2 executors for app spark-xgb-demo\n"
]
},
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - hive</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://spark-d408f489f636a31e-driver-svc.spark-applications.svc:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.3.0.48-aiml</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>k8s://https://10.100.0.1:443</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>spark-xgb-demo</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x7f7d0d059970>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"num_executors = 2\n",
"app_name = 'spark-xgb-demo'\n",
"print(f'Requesting {num_executors} executors for app {app_name}')\n",
"\n",
"# For XGBoost it is CRITICAL to disable dynamicAllocation, since the barrier execution mode requires \n",
"# keeping ALL nodes for the job to complete successfully.\n",
"\n",
"# See notes on arrow: https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html\n",
"spark = SparkSession.builder \\\n",
" .config('spark.driver.maxResultSize', '8G') \\\n",
" .config('spark.executor.memory', '16G') \\\n",
" .config('spark.executor.cores', '1') \\\n",
" .config('spark.executor.instances', str(num_executors)) \\\n",
" .config('spark.dynamicAllocation.enabled', 'false') \\\n",
" .config('spark.sql.shuffle.partitions', str(num_executors)) \\\n",
" .config('spark.default.parallelism', str(num_executors)) \\\n",
" .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \\\n",
" .config('spark.sql.execution.arrow.pyspark.fallback.enabled', 'true') \\\n",
" .appName(app_name) \\\n",
" .enableHiveSupport() \\\n",
" .getOrCreate()\n",
"spark.sparkContext.setLogLevel('ERROR')\n",
"spark"
]
},
{
"cell_type": "markdown",
"id": "2596b98d-3a92-4405-9d9a-44f51e53595b",
"metadata": {
"tags": []
},
"source": [
"# Load data"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "37960519-aca4-4df7-91dd-01640632dbc4",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import sklearn.datasets\n",
"from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
"from pyspark.ml.linalg import Vectors\n",
"from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import rand\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel\n",
"\n",
"\n",
"def create_spark_df(X, y):\n",
" return spark.createDataFrame(\n",
" spark.sparkContext.parallelize(\n",
" [(Vectors.dense(features), float(label)) for features, label in zip(X, y)]\n",
" ),\n",
" [\"features\", \"label\"],\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "f6562f34-849b-4163-b0b9-c55d6784c6b8",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"iris shapes: (150, 4), (150,), num classes: 3\n"
]
}
],
"source": [
"from pyspark.sql import functions as F\n",
"\n",
"# load iris dataset (classification dataset)\n",
"iris_X, iris_y = sklearn.datasets.load_iris(return_X_y=True)\n",
"print(f'iris shapes: {iris_X.shape}, {iris_y.shape}, num classes: {len(set(iris_y))}')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "e8002ca4-4384-4912-9349-dc05fe979094",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"spark.sparkContext.setJobGroup('load', 'iris')\n",
"iris_spark_df = create_spark_df(iris_X, iris_y)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "13433b97-cc90-4f7b-b3ff-8d4f100f3415",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"spark.sparkContext.setJobGroup('xgb', 'classifier')\n",
"xgb_classifier = SparkXGBClassifier()\n",
"xgb_classifier_model = xgb_classifier.fit(iris_spark_df)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "9d555445-4c1b-4ca4-b9c4-29f10783e4d9",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"classifier f1=1.0\n"
]
}
],
"source": [
"transformed_iris_spark_df = xgb_classifier_model.transform(iris_spark_df)\n",
"classifier_evaluator = MulticlassClassificationEvaluator(metricName=\"f1\")\n",
"print(f\"classifier f1={classifier_evaluator.evaluate(transformed_iris_spark_df)}\")"
]
},
{
"cell_type": "markdown",
"id": "b145de3d-8cda-46e4-9c1c-4c7ffd1cc601",
"metadata": {
"tags": []
},
"source": [
"# Debug with persisting model is broken\n",
"* https://github.com/dmlc/xgboost/issues/9446\n",
"* https://xgboost.readthedocs.io/en/stable/tutorials/spark_estimator.html#model-persistence"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "74b89427-3aff-4c26-8c23-680d8b0ecda0",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"/tmp/xgb_classifier_model:\n",
"metadata model\n",
"\n",
"/tmp/xgb_classifier_model/metadata:\n",
"_SUCCESS\n",
"\n",
"/tmp/xgb_classifier_model/model:\n",
"_SUCCESS\n"
]
}
],
"source": [
"xgb_classifier_model_path = \"/tmp/xgb_classifier_model\"\n",
"xgb_classifier_model.write().save(xgb_classifier_model_path)\n",
"\n",
"!ls -R {xgb_classifier_model_path} # seems... empty?"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "6ffb3205-dad4-448d-8ced-9ab24cd24d2c",
"metadata": {
"tags": []
},
"outputs": [
{
"ename": "ValueError",
"evalue": "RDD is empty",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mSparkXGBClassifierModel\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload\u001b[49m\u001b[43m(\u001b[49m\u001b[43mxgb_classifier_model_path\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/python/pyspark/ml/util.py:353\u001b[0m, in \u001b[0;36mMLReadable.load\u001b[0;34m(cls, path)\u001b[0m\n\u001b[1;32m 350\u001b[0m \u001b[38;5;129m@classmethod\u001b[39m\n\u001b[1;32m 351\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mload\u001b[39m(\u001b[38;5;28mcls\u001b[39m, path: \u001b[38;5;28mstr\u001b[39m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m RL:\n\u001b[1;32m 352\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Reads an ML instance from the input path, a shortcut of `read().load(path)`.\"\"\"\u001b[39;00m\n\u001b[0;32m--> 353\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mcls\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mload\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpath\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py:1509\u001b[0m, in \u001b[0;36mSparkXGBModelReader.load\u001b[0;34m(self, path)\u001b[0m\n\u001b[1;32m 1503\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mload\u001b[39m(\u001b[38;5;28mself\u001b[39m, path: \u001b[38;5;28mstr\u001b[39m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_SparkXGBModel\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n\u001b[1;32m 1504\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 1505\u001b[0m \u001b[38;5;124;03m Load metadata and model for a :py:class:`_SparkXGBModel`\u001b[39;00m\n\u001b[1;32m 1506\u001b[0m \n\u001b[1;32m 1507\u001b[0m \u001b[38;5;124;03m :return: SparkXGBRegressorModel or SparkXGBClassifierModel instance\u001b[39;00m\n\u001b[1;32m 1508\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m-> 1509\u001b[0m _, py_model \u001b[38;5;241m=\u001b[39m \u001b[43m_SparkXGBSharedReadWrite\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloadMetadataAndInstance\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1510\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcls\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mpath\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mlogger\u001b[49m\n\u001b[1;32m 1511\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1512\u001b[0m py_model \u001b[38;5;241m=\u001b[39m cast(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_SparkXGBModel\u001b[39m\u001b[38;5;124m\"\u001b[39m, py_model)\n\u001b[1;32m 1514\u001b[0m xgb_sklearn_params \u001b[38;5;241m=\u001b[39m py_model\u001b[38;5;241m.\u001b[39m_gen_xgb_params_dict(\n\u001b[1;32m 1515\u001b[0m gen_xgb_sklearn_estimator_param\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[1;32m 1516\u001b[0m )\n",
"File \u001b[0;32m/usr/local/lib/python3.8/site-packages/xgboost/spark/core.py:1399\u001b[0m, in \u001b[0;36m_SparkXGBSharedReadWrite.loadMetadataAndInstance\u001b[0;34m(pyspark_xgb_cls, path, sc, logger)\u001b[0m\n\u001b[1;32m 1386\u001b[0m \u001b[38;5;129m@staticmethod\u001b[39m\n\u001b[1;32m 1387\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mloadMetadataAndInstance\u001b[39m(\n\u001b[1;32m 1388\u001b[0m pyspark_xgb_cls: Union[Type[_SparkXGBEstimator], Type[_SparkXGBModel]],\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 1391\u001b[0m logger: logging\u001b[38;5;241m.\u001b[39mLogger,\n\u001b[1;32m 1392\u001b[0m ) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Tuple[Dict[\u001b[38;5;28mstr\u001b[39m, Any], Union[_SparkXGBEstimator, _SparkXGBModel]]:\n\u001b[1;32m 1393\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 1394\u001b[0m \u001b[38;5;124;03m Load the metadata and the instance of an xgboost.spark._SparkXGBEstimator or\u001b[39;00m\n\u001b[1;32m 1395\u001b[0m \u001b[38;5;124;03m xgboost.spark._SparkXGBModel.\u001b[39;00m\n\u001b[1;32m 1396\u001b[0m \n\u001b[1;32m 1397\u001b[0m \u001b[38;5;124;03m :return: a tuple of (metadata, instance)\u001b[39;00m\n\u001b[1;32m 1398\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m-> 1399\u001b[0m metadata \u001b[38;5;241m=\u001b[39m \u001b[43mDefaultParamsReader\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloadMetadata\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1400\u001b[0m \u001b[43m \u001b[49m\u001b[43mpath\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mexpectedClassName\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mget_class_name\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpyspark_xgb_cls\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1401\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1402\u001b[0m pyspark_xgb \u001b[38;5;241m=\u001b[39m pyspark_xgb_cls()\n\u001b[1;32m 1403\u001b[0m DefaultParamsReader\u001b[38;5;241m.\u001b[39mgetAndSetParams(pyspark_xgb, metadata)\n",
"File \u001b[0;32m~/python/pyspark/ml/util.py:565\u001b[0m, in \u001b[0;36mDefaultParamsReader.loadMetadata\u001b[0;34m(path, sc, expectedClassName)\u001b[0m\n\u001b[1;32m 554\u001b[0m \u001b[38;5;250m\u001b[39m\u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 555\u001b[0m \u001b[38;5;124;03mLoad metadata saved using :py:meth:`DefaultParamsWriter.saveMetadata`\u001b[39;00m\n\u001b[1;32m 556\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 562\u001b[0m \u001b[38;5;124;03m If non empty, this is checked against the loaded metadata.\u001b[39;00m\n\u001b[1;32m 563\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 564\u001b[0m metadataPath \u001b[38;5;241m=\u001b[39m os\u001b[38;5;241m.\u001b[39mpath\u001b[38;5;241m.\u001b[39mjoin(path, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mmetadata\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m--> 565\u001b[0m metadataStr \u001b[38;5;241m=\u001b[39m \u001b[43msc\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtextFile\u001b[49m\u001b[43m(\u001b[49m\u001b[43mmetadataPath\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfirst\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 566\u001b[0m loadedVals \u001b[38;5;241m=\u001b[39m DefaultParamsReader\u001b[38;5;241m.\u001b[39m_parseMetaData(metadataStr, expectedClassName)\n\u001b[1;32m 567\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m loadedVals\n",
"File \u001b[0;32m~/python/pyspark/rdd.py:1906\u001b[0m, in \u001b[0;36mRDD.first\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1904\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m rs:\n\u001b[1;32m 1905\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m rs[\u001b[38;5;241m0\u001b[39m]\n\u001b[0;32m-> 1906\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mRDD is empty\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n",
"\u001b[0;31mValueError\u001b[0m: RDD is empty"
]
}
],
"source": [
"SparkXGBClassifierModel.load(xgb_classifier_model_path)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "0cfdfebe-8ade-4a38-bc74-9479757faad2",
"metadata": {},
"outputs": [],
"source": [
"spark.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark 3.2",
"language": "python",
"name": "spark_32_python_kubernetes"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.15"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
I converted the notebook to a script and replaced the ls with a Python directory walk (a plain script can be easier to debug):
import os

for root, subdirs, files in os.walk(xgb_classifier_model_path):
    for f in files:
        print(os.path.join(root, f))
Here's the output:
iris shapes: (150, 4), (150,), num classes: 3
2023-08-15 19:21:38,598 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0-dev on 1 workers with
booster params: {'objective': 'multi:softprob', 'device': 'cpu', 'num_class': 3, 'nthread': 1}
train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[19:21:40] task 0 got new rank 0 (0 + 1) / 1]
2023-08-15 19:21:42,131 INFO XGBoost-PySpark: _fit Finished xgboost training!
classifier f1=1.0
xgb_classifier_model_path: /tmp/xgb_classifier_model
/tmp/xgb_classifier_model/model/._SUCCESS.crc
/tmp/xgb_classifier_model/model/_SUCCESS
/tmp/xgb_classifier_model/model/.part-00000.crc
/tmp/xgb_classifier_model/model/part-00000
/tmp/xgb_classifier_model/metadata/._SUCCESS.crc
/tmp/xgb_classifier_model/metadata/_SUCCESS
/tmp/xgb_classifier_model/metadata/.part-00000.crc
/tmp/xgb_classifier_model/metadata/part-00000
My environment: pyspark 3.4.1, xgboost 2.0.0-dev (b82e78c1693f63c49cb4d0be62a220698d43f880)
Oooh interesting, I'm using only pySpark 3.2 and 3.3 so far. Let me see about upgrading to 3.4.
A colleague pointed out that my Spark session is configured for writing to S3, which apparently breaks writing local parquet. With my existing configs, providing an S3 path instead of a local path worked out of the box as described in the docs.
I think this results from the configs I use for Apache Iceberg:
In hindsight, it might be worth noting in the model-persistence docs that if the Spark catalog is configured for S3, you should use an S3 path instead of a local path to persist and load models.
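Something like the following worked for me (a sketch; the bucket name is hypothetical, and the write/load calls are the same ones used above):

s3_model_path = "s3a://my-bucket/models/xgb_classifier_model"  # hypothetical bucket
xgb_classifier_model.write().save(s3_model_path)
loaded_model = SparkXGBClassifierModel.load(s3_model_path)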
Thank you for sharing. Would you like to open a PR for the doc?
Following the documentation for model persistence with SparkXGBClassifier, saving the model results in each of the metadata and model subdirectories under the given path containing only an empty file named _SUCCESS. Attempting to load the model from that path results in:
ValueError: RDD is empty
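A minimal sequence reproducing this (a sketch; train_df is assumed to be a prepared DataFrame with "features" and "label" columns):

from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel

model = SparkXGBClassifier().fit(train_df)
model.write().save("/tmp/xgb_classifier_model")
# Both metadata/ and model/ contain only an empty _SUCCESS file, and then:
SparkXGBClassifierModel.load("/tmp/xgb_classifier_model")  # ValueError: RDD is empty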
Running xgboost 2.0.0.dev on PySpark 3.3 / Python 3.8.
See also: https://stackoverflow.com/questions/75370396/saving-sparkxgboost-model-yields-empty-directory/76860032#76860032