XGBoost4J-Spark-GPU 教程

XGBoost4J-Spark-GPU 是一个开源库,旨在通过利用 RAPIDS Accelerator for Apache Spark 产品,加速 Apache Spark 集群上端到端的分布式 XGBoost 训练(使用 GPU)。

本教程将向您展示如何使用 XGBoost4J-Spark-GPU

使用 XGBoost4J-Spark-GPU 构建机器学习应用

将 XGBoost 添加到您的项目

在深入了解 XGBoost4J-Spark-GPU 的使用教程之前,建议参考 从 Maven 仓库安装 获取将 XGBoost4J-Spark-GPU 添加为项目依赖项的说明。我们提供稳定版本和快照版本,以方便您使用。

数据准备

在本节中,我们使用 Iris 数据集作为示例,展示如何使用 Apache Spark 转换原始数据集,使其符合 XGBoost 的数据接口要求。

Iris 数据集以 CSV 格式提供。每个实例包含 4 个特征:“萼片长度”、“萼片宽度”、“花瓣长度”和“花瓣宽度”。此外,它还包含“class”列,该列本质上是标签,有三个可能的值:“Iris Setosa”、“Iris Versicolour”和“Iris Virginica”。

使用 Spark 内置读取器读取数据集

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

val labelName = "class"
val schema = new StructType(Array(
    StructField("sepal length", DoubleType, true),
    StructField("sepal width", DoubleType, true),
    StructField("petal length", DoubleType, true),
    StructField("petal width", DoubleType, true),
    StructField(labelName, StringType, true)))

val xgbInput = spark.read.option("header", "false")
    .schema(schema)
    .csv(dataPath)

首先,我们创建 SparkSession 的实例,它是处理 DataFrame 的任何 Spark 应用的入口点。`schema` 变量定义了包裹 Iris 数据的 DataFrame 的结构。通过显式设置这个结构,我们可以定义列名及其类型;否则列名将是 Spark 派生的默认名称,例如 `_col0` 等。最后,我们可以使用 Spark 内置的 CSV 读取器将 Iris CSV 文件加载为名为 `xgbInput` 的 DataFrame。

Apache Spark 还包含许多用于其他格式的内置读取器,例如 ORC、Parquet、Avro、JSON。

转换原始 Iris 数据集

为了使 Iris 数据集能够被 XGBoost 识别,我们需要将字符串类型的标签(即“class”)编码为双精度浮点数类型的标签。

将字符串类型标签转换为双精度浮点数的其中一种方法是使用 Spark 内置的特征转换器 StringIndexer。但此功能在 RAPIDS Accelerator 中未加速,这意味着它将回退到 CPU 执行。因此,我们使用另一种方法来实现相同目标,代码如下

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spec = Window.orderBy(labelName)
val Array(train, test) = xgbInput
    .withColumn("tmpClassName", dense_rank().over(spec) - 1)
    .drop(labelName)
    .withColumnRenamed("tmpClassName", labelName)
    .randomSplit(Array(0.7, 0.3), seed = 1)

train.show(5)
+------------+-----------+------------+-----------+-----+
|sepal length|sepal width|petal length|petal width|class|
+------------+-----------+------------+-----------+-----+
|         4.3|        3.0|         1.1|        0.1|    0|
|         4.4|        2.9|         1.4|        0.2|    0|
|         4.4|        3.0|         1.3|        0.2|    0|
|         4.4|        3.2|         1.3|        0.2|    0|
|         4.6|        3.2|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+

通过窗口操作,我们将标签的字符串列映射到了标签索引。

训练

XGBoost4J-Spark-GPU 支持回归、分类和排序模型。尽管我们在本教程中使用 Iris 数据集演示如何使用 `XGBoost4J-Spark-GPU` 解决多类别分类问题,但在回归和排序问题中的用法与分类非常相似。

要训练 XGBoost 分类模型,首先需要定义一个 XGBoostClassifier

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map(
    "objective" -> "multi:softprob",
    "num_class" -> 3,
    "num_round" -> 100,
    "device" -> "cuda",
    "num_workers" -> 1)

val featuresNames = schema.fieldNames.filter(name => name != labelName)

val xgbClassifier = new XGBoostClassifier(xgbParam)
    .setFeaturesCol(featuresNames)
    .setLabelCol(labelName)

`device` 参数用于通知 XGBoost 使用 CUDA 设备而非 CPU。与单节点模式不同,GPU 由 Spark 管理而非 XGBoost。因此,不支持像 `cuda:1` 这样显式指定的设备序号。

训练 XGBoost 模型可用的参数可以在 这里 找到。与 XGBoost4J-Spark 包类似,除了默认参数集之外,XGBoost4J-Spark-GPU 也支持这些参数的驼峰命名法变体,以与 Spark 的 MLlib 命名约定保持一致。

具体而言, 此页面 中的每个参数在 XGBoost4J-Spark-GPU 中都有其对应的驼峰命名法形式。例如,要设置每棵树的 `max_depth`,您可以像上面代码片段中那样传递参数(作为包裹在 Map 中的 `max_depth`),或者您可以通过 XGBoostClassifier 中的 setter 方法进行设置。

val xgbClassifier = new XGBoostClassifier(xgbParam)
    .setFeaturesCol(featuresNames)
    .setLabelCol(labelName)
xgbClassifier.setMaxDepth(2)

注意

与 XGBoost4j-Spark 不同的是,后者接受 VectorUDT 类型的特征列和特征列名数组,而 XGBoost4J-Spark-GPU 仅通过 `setFeaturesCol(value: Array[String])` 接受特征列名数组。

设置 XGBoostClassifier 参数和特征/标签列后,我们可以通过使用输入 DataFrame 拟合 XGBoostClassifier 来构建转换器 XGBoostClassificationModel。此 `fit` 操作本质上是训练过程,生成的模型可用于其他任务,例如预测。

val xgbClassificationModel = xgbClassifier.fit(train)

预测

当我们获得一个模型,无论是 XGBoostClassificationModel、XGBoostRegressionModel 还是 XGBoostRankerModel,它都接收一个 DataFrame 作为输入,读取包含特征向量的列,对每个特征向量进行预测,并默认输出一个包含以下列的新 DataFrame

  • XGBoostClassificationModel 将为每个可能的标签输出边距(`rawPredictionCol`)、概率(`probabilityCol`)和最终预测标签(`predictionCol`)。

  • XGBoostRegressionModel 将输出预测标签(`predictionCol`)。

  • XGBoostRankerModel 将输出预测标签(`predictionCol`)。

val xgbClassificationModel = xgbClassifier.fit(train)
val results = xgbClassificationModel.transform(test)
results.show()

通过上面的代码片段,我们得到一个 DataFrame 作为结果,其中包含边距、每个类别的概率以及每个实例的预测结果。

+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|sepal length|sepal width|      petal length|        petal width|class|       rawPrediction|         probability|prediction|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|         4.5|        2.3|               1.3|0.30000000000000004|    0|[3.16666603088378...|[0.98853939771652...|       0.0|
|         4.6|        3.1|               1.5|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         4.8|        3.1|               1.6|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         4.8|        3.4|               1.6|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         4.8|        3.4|1.9000000000000001|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         4.9|        2.4|               3.3|                1.0|    1|[-2.1498908996582...|[0.00596602633595...|       1.0|
|         4.9|        2.5|               4.5|                1.7|    2|[-2.1498908996582...|[0.00596602633595...|       1.0|
|         5.0|        3.5|               1.3|0.30000000000000004|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.1|        2.5|               3.0|                1.1|    1|[3.16666603088378...|[0.98853939771652...|       0.0|
|         5.1|        3.3|               1.7|                0.5|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.1|        3.5|               1.4|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.1|        3.8|               1.6|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.2|        3.4|               1.4|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.2|        3.5|               1.5|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.2|        4.1|               1.5|                0.1|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.4|        3.9|               1.7|                0.4|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.5|        2.4|               3.8|                1.1|    1|[-2.1498908996582...|[0.00596602633595...|       1.0|
|         5.5|        4.2|               1.4|                0.2|    0|[3.25857257843017...|[0.98969423770904...|       0.0|
|         5.7|        2.5|               5.0|                2.0|    2|[-2.1498908996582...|[0.00280966912396...|       2.0|
|         5.7|        3.0|               4.2|                1.2|    1|[-2.1498908996582...|[0.00643939292058...|       1.0|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+

提交应用

假设您已经配置了支持 GPU 的 Spark standalone 集群。否则,请参考 支持 GPU 的 Spark standalone 配置

从 XGBoost 2.1.0 版本开始,会自动启用阶段级调度(stage-level scheduling)。因此,如果您使用的是 Spark standalone 集群版本 3.4.0 或更高版本,我们强烈建议将 `spark.task.resource.gpu.amount` 配置为一个分数。这将允许在 ETL 阶段并行运行多个任务。例如配置为 `spark.task.resource.gpu.amount=1/spark.executor.cores`。但是,如果您使用的 XGBoost 版本低于 2.1.0 或 Spark standalone 集群版本低于 3.4.0,您仍然需要将 `spark.task.resource.gpu.amount` 设置为等于 `spark.executor.resource.gpu.amount`。

假设应用主类为“Iris”,应用 jar 包为“iris-1.0.0.jar”,下面提供了一个示例,演示如何将 xgboost 应用提交到 Apache Spark Standalone 集群。

rapids_version=24.08.0
xgboost_version=$LATEST_VERSION
main_class=Iris
app_jar=iris-1.0.0.jar

spark-submit \
  --master $master \
  --packages com.nvidia:rapids-4-spark_2.12:${rapids_version},ml.dmlc:xgboost4j-spark-gpu_2.12:${xgboost_version} \
  --conf spark.executor.cores=12 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.08 \
  --conf spark.rapids.sql.csv.read.double.enabled=true \
  --conf spark.rapids.sql.hasNans=false \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --class ${main_class} \
   ${app_jar}
  • 首先,我们需要通过 `--packages` 指定 RAPIDS Accelerator 和 xgboost4j-spark-gpu 包

  • 其次,RAPIDS Accelerator 是一个 Spark 插件,因此我们需要通过指定 `spark.plugins=com.nvidia.spark.SQLPlugin` 来进行配置

有关 RAPIDS Accelerator 其他配置的详细信息,请参考 配置

有关 RAPIDS Accelerator 常见问题,请参考 常见问题

RMM 支持

在 3.0 版本中添加。

当使用 RMM 插件编译时(参见 从源代码构建),XGBoost spark 包可以根据 `spark.rapids.memory.gpu.pooling.enabled` 和 `spark.rapids.memory.gpu.pool` 自动重用 RMM 内存池。请注意,这两个提交选项都需要相应设置。此外,XGBoost 使用 NCCL 进行 GPU 通信,这需要一些 GPU 内存用于通信缓冲区,不应让 RMM 占用所有可用内存。内存池相关配置示例

spark-submit \
  --master $master \
  --conf spark.rapids.memory.gpu.allocFraction=0.5 \
  --conf spark.rapids.memory.gpu.maxAllocFraction=0.8 \
  --conf spark.rapids.memory.gpu.pool=ARENA \
  --conf spark.rapids.memory.gpu.pooling.enabled=true \
  ...