使用 PySpark 的分布式 XGBoost

从 1.7.0 版本开始,xgboost 支持 pyspark estimator API。

注意

该集成仅在 Linux 发行版上经过测试。

XGBoost PySpark Estimator

SparkXGBRegressor

SparkXGBRegressor 是一个 PySpark ML estimator。它基于 XGBoost python 库实现了 XGBoost 回归算法,并且可以在 PySpark Pipeline 和 CrossValidator/TrainValidationSplit/OneVsRest 等 PySpark ML 元算法中使用。

我们可以像下面这样创建一个 SparkXGBRegressor estimator

from xgboost.spark import SparkXGBRegressor
xgb_regressor = SparkXGBRegressor(
  features_col="features",
  label_col="label",
  num_workers=2,
)

上面的代码片段创建了一个 spark estimator,它可以拟合 spark 数据集,并返回一个 spark model,该 model 可以转换 spark 数据集并生成带有预测列的数据集。我们可以将几乎所有 xgboost sklearn estimator 参数设置为 SparkXGBRegressor 的参数,但 spark estimator 禁止某些参数,例如 nthread,并且某些参数被替换为 pyspark 特定参数,例如 weight_colvalidation_indicator_col,详情请参阅 SparkXGBRegressor 文档。

以下代码片段展示了如何训练 spark xgboost regressor 模型,首先我们需要准备一个训练数据集,它是一个包含“label”列和“features”列(或多个列)的 spark dataframe,“features”列必须是 pyspark.ml.linalg.Vector 类型或 spark array 类型或一个特征列名称列表。

xgb_regressor_model = xgb_regressor.fit(train_spark_dataframe)

以下代码片段展示了如何使用 spark xgboost regressor model 预测测试数据,首先我们需要准备一个测试数据集,它是一个包含“features”和“label”列的 spark dataframe,“features”列必须是 pyspark.ml.linalg.Vector 类型或 spark array 类型。

transformed_test_spark_dataframe = xgb_regressor_model.transform(test_spark_dataframe)

上面的代码片段返回一个 transformed_test_spark_dataframe,其中包含输入数据集的列和一个表示预测结果的附加列“prediction”。

SparkXGBClassifier

SparkXGBClassifier estimator 具有与 SparkXGBRegressor 相似的 API,但它有一些 pyspark 分类器特有的参数,例如 raw_prediction_colprobability_col 参数。相应地,默认情况下,SparkXGBClassifierModel 转换测试数据集时会生成包含 3 个新列的结果数据集

  • “prediction”:表示预测的标签。

  • “raw_prediction”:表示输出的 margin 值。

  • “probability”:表示每个标签的预测概率。

XGBoost PySpark GPU 支持

XGBoost PySpark 完全支持 GPU 加速。用户不仅能够实现高效训练,还可以利用 GPU 加速整个 PySpark 流水线,包括 ETL 和推理。在下面的章节中,我们将介绍一个在支持 GPU 的 Spark standalone 集群上进行训练的示例。首先,我们需要安装一些额外的软件包,然后将 device 参数设置为 cudagpu

准备必要的软件包

除了 PySpark 和 XGBoost 模块,我们还需要用于处理 Spark dataframe 的 cuDF 包。我们建议使用 Conda 或 Virtualenv 来管理 PySpark 作业的 python 依赖。有关 PySpark 依赖管理的更多详情,请参阅 How to Manage Python Dependencies in PySpark

简而言之,要使用 virtualenv 和 pip 创建一个可以发送到远程集群的 Python 环境

python -m venv xgboost_env
source xgboost_env/bin/activate
pip install pyarrow pandas venv-pack xgboost
# https://docs.rapids.org.cn/install#pip-install
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
venv-pack -o xgboost_env.tar.gz

使用 Conda

conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge
conda pack -f -o xgboost_env.tar.gz

编写 PySpark 应用

下面的代码片段是使用 PySpark 训练 xgboost 模型的简单示例。请注意,我们使用特征名称列表作为输入,而不是向量类型。参数 "device=cuda" 明确指示训练将在 GPU 上执行。

from xgboost.spark import SparkXGBRegressor
spark = SparkSession.builder.getOrCreate()

# read data into spark dataframe
train_data_path = "xxxx/train"
train_df = spark.read.parquet(data_path)

test_data_path = "xxxx/test"
test_df = spark.read.parquet(test_data_path)

# assume the label column is named "class"
label_name = "class"

# get a list with feature column names
feature_names = [x.name for x in train_df.schema if x.name != label_name]

# create a xgboost pyspark regressor estimator and set device="cuda"
regressor = SparkXGBRegressor(
  features_col=feature_names,
  label_col=label_name,
  num_workers=2,
  device="cuda",
)

# train and return the model
model = regressor.fit(train_df)

# predict on test data
predict_df = model.transform(test_df)
predict_df.show()

与其他分布式接口一样,device 参数不支持指定序号,因为 GPU 由 Spark 而不是 XGBoost 管理(正确用法:device=cuda,错误用法:device=cuda:0)。

提交 PySpark 应用

假设您已经配置了支持 GPU 的 Spark standalone 集群。否则,请参阅 spark standalone configuration with GPU support

从 XGBoost 2.0.1 开始,默认启用阶段级调度。因此,如果您使用的是 Spark standalone 集群版本 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount" 配置为一个分数。这将使得在 ETL 阶段可以并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"`。但是,如果您使用的 XGBoost 版本低于 2.0.1 或 Spark standalone 集群版本低于 3.4.0,您仍然需要将 "spark.task.resource.gpu.amount" 设置为等于 "spark.executor.resource.gpu.amount"

注意

目前,XGBoost 中的阶段级调度功能仅限于 Spark standalone 集群模式。但是,一旦 Spark 3.5.1 正式发布,我们计划将其兼容性扩展到 YARN 和 Kubernetes。

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

上面的命令提交了使用 pip 或 conda 创建的 python 环境的 xgboost pyspark 应用,指定每个 executor 请求 1 个 GPU 和 12 个 CPU。因此您可以看到,在 ETL 阶段,每个 executor 总共将有 12 个任务并发执行。

模型持久化

与标准 PySpark ml estimator 类似,可以使用 saveload 方法来持久化和重用模型

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# save the model
model.save("/tmp/xgboost-pyspark-model")
# load the model
model2 = SparkXGBRankerModel.load("/tmp/xgboost-pyspark-model")

要导出 XGBoost 使用的底层 booster 模型

regressor = SparkXGBRegressor()
model = regressor.fit(train_df)
# the same booster object returned by xgboost.train
booster: xgb.Booster = model.get_booster()
booster.predict(...)
booster.save_model("model.json") # or model.ubj, depending on your choice of format.

此 booster 不仅与其他 Python 接口共享,还被所有 XGBoost 绑定使用,包括 C、Java 和 R 包。最后,可以直接从已保存的 spark estimator 中提取 booster 文件,而无需通过 getter。

import xgboost as xgb
bst = xgb.Booster()
# Loading the model saved in previous snippet
bst.load_model("/tmp/xgboost-pyspark-model/model/part-00000")

加速 xgboost pyspark 的整个流水线

借助 RAPIDS Accelerator for Apache Spark,您可以利用 GPU 加速 xgboost pyspark 的整个流水线(ETL、训练、转换),无需进行任何代码修改。同样,您可以选择将 "spark.task.resource.gpu.amount" 设置为一个分数,从而在 ETL 阶段并行执行更多任务。更多详情请参阅 提交 PySpark 应用

下面显示了一个带有额外 spark 配置和依赖的提交命令示例

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit \
  --master spark://<master-ip>:7077 \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --packages com.nvidia:rapids-4-spark_2.12:24.04.1 \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
  --archives xgboost_env.tar.gz#environment \
  xgboost_app.py

启用 rapids 插件时,需要 JVM rapids 插件和 cuDF Python 包。更多配置选项以及插件的详细信息可以在上面的 RAPIDS 链接中找到。

进阶用法

XGBoost 需要将输入数据集重新分区到 num_workers 数量,以确保同时运行 num_workers 个训练任务。但是,重新分区是一项代价高昂的操作。

如果存在直接从源读取数据并将其直接拟合到 XGBoost 而不引入 shuffle 阶段的场景,用户可以通过将 Spark 配置参数 spark.sql.files.maxPartitionNumspark.sql.files.minPartitionNum 设置为 num_workers 来避免重新分区。这会指示 Spark 自动将数据集分区到所需的数量。

但是,如果输入数据集存在倾斜(即数据分布不均匀),将分区数设置为 num_workers 可能效率不高。在这种情况下,用户可以设置 force_repartition=true 选项,明确强制 XGBoost 重新分区数据集,即使分区数已经等于 num_workers。这可以确保数据在 workers 之间均匀分布。