使用 PySpark 进行分布式 XGBoost

从 1.7.0 版本开始,xgboost 支持 pyspark 估计器 API。

注意

此集成仅在 Linux 发行版上进行过测试。

XGBoost PySpark 估计器

SparkXGBRegressor

SparkXGBRegressor 是一个 PySpark ML 估计器。它实现了基于 XGBoost Python 库的 XGBoost 分类算法,可用于 PySpark 流水线和 PySpark ML 元算法,如 CrossValidator/TrainValidationSplit/OneVsRest。

我们可以像这样创建一个 SparkXGBRegressor 估计器

from xgboost.spark import SparkXGBRegressor
xgb_regressor = SparkXGBRegressor(
  features_col="features",
  label_col="label",
  num_workers=2,
)

上述代码片段创建了一个 spark 估计器,它可以对 spark 数据集进行拟合,并返回一个 spark 模型,该模型可以转换 spark 数据集并生成带预测列的数据集。我们可以将几乎所有 xgboost sklearn 估计器参数设置为 SparkXGBRegressor 参数,但某些参数如 nthread 在 spark 估计器中是被禁止的,并且某些参数被 pyspark 特定参数替换,例如 weight_colvalidation_indicator_col,详情请参阅 SparkXGBRegressor 文档。

以下代码片段展示了如何训练 spark xgboost 回归器模型,首先我们需要准备一个训练数据集,作为包含“label”列和“features”列(或多列)的 spark 数据框,“features”列(或多列)必须是 pyspark.ml.linalg.Vector 类型或 spark 数组类型或特征列名称列表。

xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)

以下代码片段展示了如何使用 spark xgboost 回归器模型预测测试数据,首先我们需要准备一个测试数据集,作为包含“features”和“label”列的 spark 数据框,“features”列必须是 pyspark.ml.linalg.Vector 类型或 spark 数组类型。

transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)

上述代码片段返回一个 transformed_test_spark_dataframe,其中包含输入数据集列和一个附加列“prediction”,表示预测结果。

SparkXGBClassifier

SparkXGBClassifier 估计器与 SparkXGBRegressor 具有相似的 API,但它有一些 pyspark 分类器特有的参数,例如 raw_prediction_colprobability_col 参数。相应地,默认情况下,SparkXGBClassifierModel 转换测试数据集将生成具有 3 个新列的结果数据集

  • “prediction”:表示预测标签。

  • “raw_prediction”:表示输出边际值。

  • “probability”:表示每个标签的预测概率。

XGBoost PySpark GPU 支持

XGBoost PySpark 完全支持 GPU 加速。用户不仅能够启用高效训练,还能将 GPU 用于整个 PySpark 流水线,包括 ETL 和推理。在下面的章节中,我们将通过一个在支持 GPU 的 Spark 独立集群上进行训练的示例。首先,我们需要安装一些额外的软件包,然后将 device 参数设置为 cudagpu

准备必要的包

除了 PySpark 和 XGBoost 模块,我们还需要 cuDF 包来处理 Spark 数据框。我们建议使用 Conda 或 Virtualenv 来管理 PySpark 任务的 Python 依赖项。有关 PySpark 依赖项管理的更多详细信息,请参阅 如何在 PySpark 中管理 Python 依赖项

简而言之,要使用 virtualenv 和 pip 创建一个可以发送到远程集群的 Python 环境

python -m venv xgboost_env
source xgboost_env/bin/activate
pip install pyarrow pandas venv-pack xgboost
# https://docs.rapids.org.cn/install#pip-install
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
venv-pack -o xgboost_env.tar.gz

使用 Conda

conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
conda pack -f -o xgboost_env.tar.gz

编写你的 PySpark 应用程序

下面的代码片段是使用 PySpark 训练 xgboost 模型的一个小例子。请注意,我们使用特征名称列表而不是向量类型作为输入。参数 "device=cuda" 特别指示训练将在 GPU 上执行。

from xgboost.spark import SparkXGBRegressor
spark = SparkSession.builder.getOrCreate()

# read data into spark dataframe
train_data_path = "xxxx/train"
train_df = spark.read.parquet(data_path)

test_data_path = "xxxx/test"
test_df = spark.read.parquet(test_data_path)

# assume the label column is named "class"
label_name = "class"

# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]

# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
  features_col=feature_names,
  label_col=label_name,
  num_workers=2,
  device="cuda",
)

# train and return the model
model = regressor.fit(train_df)

# predict on test data
predict_df = model.transform(test_df)
predict_df.show()

像其他分布式接口一样,device 参数不支持指定序号,因为 GPU 由 Spark 而非 XGBoost 管理(正确:device=cuda,错误:device=cuda:0)。

提交 PySpark 应用程序

假设您已配置支持 GPU 的 Spark 独立集群。否则,请参阅 带 GPU 支持的 spark 独立配置

从 XGBoost 2.0.1 开始,阶段级调度将自动启用。因此,如果您使用的是 Spark 独立集群版本 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount" 配置为小数。这将允许在 ETL 阶段并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"。但是,如果您使用的是早于 2.0.1 的 XGBoost 版本或低于 3.4.0 的 Spark 独立集群版本,您仍然需要将 "spark.task.resource.gpu.amount" 设置为等于 "spark.executor.resource.gpu.amount"

注意

目前,XGBoost 中的阶段级调度功能仅限于 Spark 独立集群模式。然而,我们计划在 Spark 3.5.1 正式发布后将其兼容性扩展到 YARN 和 Kubernetes。

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

上述命令提交了 xgboost pyspark 应用程序,其中包含由 pip 或 conda 创建的 python 环境,并指定每个执行器请求 1 个 GPU 和 12 个 CPU。因此,您可以看到,在 ETL 阶段,每个执行器将并发执行总共 12 个任务。

模型持久化

与标准 PySpark ml 估计器类似,可以使用 saveload 方法来持久化和重用模型

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# save the model
model.save("/tmp/xgboost-pyspark-model")
# load the model
model2 = SparkXGBRankerModel.load("/tmp/xgboost-pyspark-model")

导出 XGBoost 使用的底层 booster 模型

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# the same booster object returned by xgboost.train
booster: xgb.Booster = model.get_booster()
booster.predict(...)
booster.save_model("model.json") # or model.ubj, depending on your choice of format.

此 Booster 不仅与其他 Python 接口共享,还被所有 XGBoost 绑定(包括 C、Java 和 R 包)使用。最后,可以直接从保存的 spark 估计器中提取 Booster 文件,而无需通过 getter。

import xgboost as xgb
bst = xgb.Booster()
# Loading the model saved in previous snippet
bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")

加速 xgboost pyspark 的整个流水线

借助 RAPIDS Accelerator for Apache Spark,您可以利用 GPU 加速 xgboost pyspark 的整个流水线(ETL、训练、转换),而无需修改任何代码。同样,您可以将 "spark.task.resource.gpu.amount" 设置配置为分数,从而在 ETL 阶段并行执行更多任务。更多详细信息请参阅 提交 PySpark 应用程序

下面是一个示例提交命令,其中包含额外的 spark 配置和依赖项

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

启用 rapids 插件时,需要 JVM rapids 插件和 cuDF Python 包。更多配置选项可在上面的 RAPIDS 链接中找到,其中包含插件的详细信息。

高级用法

XGBoost 需要将输入数据集重新分区到 num_workers,以确保 num_workers 训练任务同时运行。然而,重新分区是一个代价高昂的操作。

如果存在从源读取数据并直接将其拟合到 XGBoost 而无需引入 shuffle 阶段的场景,用户可以通过将 Spark 配置参数 spark.sql.files.maxPartitionNumspark.sql.files.minPartitionNum 设置为 num_workers 来避免重新分区。这会告诉 Spark 自动将数据集分区为所需的数量。

然而,如果输入数据集存在偏差(即数据分布不均匀),将分区数设置为 num_workers 可能效率不高。在这种情况下,用户可以设置 force_repartition=true 选项以显式强制 XGBoost 对数据集进行重新分区,即使分区数已等于 num_workers。这可确保数据在 worker 之间均匀分布。