使用 PySpark 的分布式 XGBoost
从 1.7.0 版本开始,xgboost 支持 pyspark estimator API。
注意
该集成仅在 Linux 发行版上经过测试。
XGBoost PySpark Estimator
SparkXGBRegressor
SparkXGBRegressor 是一个 PySpark ML estimator。它基于 XGBoost python 库实现了 XGBoost 回归算法,并且可以在 PySpark Pipeline 和 CrossValidator/TrainValidationSplit/OneVsRest 等 PySpark ML 元算法中使用。
我们可以像下面这样创建一个 SparkXGBRegressor
estimator
from xgboost.spark import SparkXGBRegressor
xgb_regressor = SparkXGBRegressor(
features_col="features",
label_col="label",
num_workers=2,
)
上面的代码片段创建了一个 spark estimator,它可以拟合 spark 数据集,并返回一个 spark model,该 model 可以转换 spark 数据集并生成带有预测列的数据集。我们可以将几乎所有 xgboost sklearn estimator 参数设置为 SparkXGBRegressor
的参数,但 spark estimator 禁止某些参数,例如 nthread
,并且某些参数被替换为 pyspark 特定参数,例如 weight_col
、validation_indicator_col
,详情请参阅 SparkXGBRegressor
文档。
以下代码片段展示了如何训练 spark xgboost regressor 模型,首先我们需要准备一个训练数据集,它是一个包含“label”列和“features”列(或多个列)的 spark dataframe,“features”列必须是 pyspark.ml.linalg.Vector
类型或 spark array 类型或一个特征列名称列表。
xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)
以下代码片段展示了如何使用 spark xgboost regressor model 预测测试数据,首先我们需要准备一个测试数据集,它是一个包含“features”和“label”列的 spark dataframe,“features”列必须是 pyspark.ml.linalg.Vector
类型或 spark array 类型。
transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)
上面的代码片段返回一个 transformed_test_spark_dataframe
,其中包含输入数据集的列和一个表示预测结果的附加列“prediction”。
SparkXGBClassifier
SparkXGBClassifier
estimator 具有与 SparkXGBRegressor
相似的 API,但它有一些 pyspark 分类器特有的参数,例如 raw_prediction_col
和 probability_col
参数。相应地,默认情况下,SparkXGBClassifierModel
转换测试数据集时会生成包含 3 个新列的结果数据集
“prediction”:表示预测的标签。
“raw_prediction”:表示输出的 margin 值。
“probability”:表示每个标签的预测概率。
XGBoost PySpark GPU 支持
XGBoost PySpark 完全支持 GPU 加速。用户不仅能够实现高效训练,还可以利用 GPU 加速整个 PySpark 流水线,包括 ETL 和推理。在下面的章节中,我们将介绍一个在支持 GPU 的 Spark standalone 集群上进行训练的示例。首先,我们需要安装一些额外的软件包,然后将 device
参数设置为 cuda
或 gpu
。
准备必要的软件包
除了 PySpark 和 XGBoost 模块,我们还需要用于处理 Spark dataframe 的 cuDF 包。我们建议使用 Conda 或 Virtualenv 来管理 PySpark 作业的 python 依赖。有关 PySpark 依赖管理的更多详情,请参阅 How to Manage Python Dependencies in PySpark。
简而言之,要使用 virtualenv 和 pip 创建一个可以发送到远程集群的 Python 环境
python -m venv xgboost_env
source xgboost_env/bin/activate
pip install pyarrow pandas venv-pack xgboost
# https://docs.rapids.org.cn/install#pip-install
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
venv-pack -o xgboost_env.tar.gz
使用 Conda
conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
conda pack -f -o xgboost_env.tar.gz
编写 PySpark 应用
下面的代码片段是使用 PySpark 训练 xgboost 模型的简单示例。请注意,我们使用特征名称列表作为输入,而不是向量类型。参数 "device=cuda"
明确指示训练将在 GPU 上执行。
from xgboost.spark import SparkXGBRegressor
spark = SparkSession.builder.getOrCreate()
# read data into spark dataframe
train_data_path = "xxxx/train"
train_df = spark.read.parquet(data_path)
test_data_path = "xxxx/test"
test_df = spark.read.parquet(test_data_path)
# assume the label column is named "class"
label_name = "class"
# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]
# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
features_col=feature_names,
label_col=label_name,
num_workers=2,
device="cuda",
)
# train and return the model
model = regressor.fit(train_df)
# predict on test data
predict_df = model.transform(test_df)
predict_df.show()
与其他分布式接口一样,device
参数不支持指定序号,因为 GPU 由 Spark 而不是 XGBoost 管理(正确用法:device=cuda
,错误用法:device=cuda:0
)。
提交 PySpark 应用
假设您已经配置了支持 GPU 的 Spark standalone 集群。否则,请参阅 spark standalone configuration with GPU support。
从 XGBoost 2.0.1 开始,默认启用阶段级调度。因此,如果您使用的是 Spark standalone 集群版本 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount"
配置为一个分数。这将使得在 ETL 阶段可以并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"`。但是,如果您使用的 XGBoost 版本低于 2.0.1 或 Spark standalone 集群版本低于 3.4.0,您仍然需要将
"spark.task.resource.gpu.amount"
设置为等于 "spark.executor.resource.gpu.amount"
。
注意
目前,XGBoost 中的阶段级调度功能仅限于 Spark standalone 集群模式。但是,一旦 Spark 3.5.1 正式发布,我们计划将其兼容性扩展到 YARN 和 Kubernetes。
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--archives xgboost_env.tar.gz#environment \
xgboost_app.py
上面的命令提交了使用 pip 或 conda 创建的 python 环境的 xgboost pyspark 应用,指定每个 executor 请求 1 个 GPU 和 12 个 CPU。因此您可以看到,在 ETL 阶段,每个 executor 总共将有 12 个任务并发执行。
模型持久化
与标准 PySpark ml estimator 类似,可以使用 save
和 load
方法来持久化和重用模型
regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# save the model
model.save("/tmp/xgboost-pyspark-model")
# load the model
model2 = SparkXGBRankerModel.load("/tmp/xgboost-pyspark-model")
要导出 XGBoost 使用的底层 booster 模型
regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# the same booster object returned by xgboost.train
booster: xgb.Booster = model.get_booster()
booster.predict(...)
booster.save_model("model.json") # or model.ubj, depending on your choice of format.
此 booster 不仅与其他 Python 接口共享,还被所有 XGBoost 绑定使用,包括 C、Java 和 R 包。最后,可以直接从已保存的 spark estimator 中提取 booster 文件,而无需通过 getter。
import xgboost as xgb
bst = xgb.Booster()
# Loading the model saved in previous snippet
bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")
加速 xgboost pyspark 的整个流水线
借助 RAPIDS Accelerator for Apache Spark,您可以利用 GPU 加速 xgboost pyspark 的整个流水线(ETL、训练、转换),无需进行任何代码修改。同样,您可以选择将 "spark.task.resource.gpu.amount"
设置为一个分数,从而在 ETL 阶段并行执行更多任务。更多详情请参阅 提交 PySpark 应用。
下面显示了一个带有额外 spark 配置和依赖的提交命令示例
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
--archives xgboost_env.tar.gz#environment \
xgboost_app.py
启用 rapids 插件时,需要 JVM rapids 插件和 cuDF Python 包。更多配置选项以及插件的详细信息可以在上面的 RAPIDS 链接中找到。
进阶用法
XGBoost 需要将输入数据集重新分区到 num_workers 数量,以确保同时运行 num_workers 个训练任务。但是,重新分区是一项代价高昂的操作。
如果存在直接从源读取数据并将其直接拟合到 XGBoost 而不引入 shuffle 阶段的场景,用户可以通过将 Spark 配置参数 spark.sql.files.maxPartitionNum
和 spark.sql.files.minPartitionNum
设置为 num_workers 来避免重新分区。这会指示 Spark 自动将数据集分区到所需的数量。
但是,如果输入数据集存在倾斜(即数据分布不均匀),将分区数设置为 num_workers 可能效率不高。在这种情况下,用户可以设置 force_repartition=true
选项,明确强制 XGBoost 重新分区数据集,即使分区数已经等于 num_workers。这可以确保数据在 workers 之间均匀分布。