XGBoost4J-Spark-GPU 教程
XGBoost4J-Spark-GPU 是一个开源库,旨在通过利用 适用于 Apache Spark 的 RAPIDS Accelerator 产品,端到端地使用 GPU 加速 Apache Spark 集群上的分布式 XGBoost 训练。
本教程将向您展示如何使用 XGBoost4J-Spark-GPU。
使用 XGBoost4J-Spark-GPU 构建 ML 应用程序
将 XGBoost 添加到您的项目中
在深入研究使用 XGBoost4J-Spark-GPU 的教程之前,建议参阅 从 Maven 仓库安装,了解将 XGBoost4J-Spark-GPU 作为项目依赖项添加的说明。我们提供稳定版本和快照版本以方便您使用。
数据准备
在本节中,我们以 Iris 数据集为例,展示如何使用 Apache Spark 转换原始数据集并使其适应 XGBoost 的数据接口。
Iris 数据集以 CSV 格式提供。每个实例包含 4 个特征:“萼片长度”、“萼片宽度”、“花瓣长度”和“花瓣宽度”。此外,它还包含“类别”列,该列本质上是具有三个可能值:“Setosa 鸢尾”、“Versicolour 鸢尾”和“Virginica 鸢尾”的标签。
使用 Spark 内置阅读器读取数据集
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val spark = SparkSession.builder().getOrCreate()
val labelName = "class"
val schema = new StructType(Array(
StructField("sepal length", DoubleType, true),
StructField("sepal width", DoubleType, true),
StructField("petal length", DoubleType, true),
StructField("petal width", DoubleType, true),
StructField(labelName, StringType, true)))
val xgbInput = spark.read.option("header", "false")
.schema(schema)
.csv(dataPath)
首先,我们创建一个 SparkSession 实例,它是任何处理 DataFrames 的 Spark 应用程序的入口点。schema
变量定义了封装 Iris 数据的 DataFrame 的模式。通过显式设置此模式,我们可以定义列名及其类型;否则,列名将是 Spark 派生的默认名称,例如 _col0
等。最后,我们可以使用 Spark 的内置 CSV 阅读器将 Iris CSV 文件加载为名为 xgbInput
的 DataFrame。
Apache Spark 还包含许多用于其他格式(如 ORC、Parquet、Avro、JSON)的内置阅读器。
转换原始 Iris 数据集
为了使 Iris 数据集能够被 XGBoost 识别,我们需要将字符串类型的标签(即“类别”)编码为双精度浮点数类型的标签。
将字符串类型标签转换为双精度浮点数的一种方法是使用 Spark 的内置特征转换器 StringIndexer。但是此功能在 RAPIDS Accelerator 中未加速,这意味着它将回退到 CPU。相反,我们使用以下代码实现相同目标。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val spec = Window.orderBy(labelName)
val Array(train, test) = xgbInput
.withColumn("tmpClassName", dense_rank().over(spec) - 1)
.drop(labelName)
.withColumnRenamed("tmpClassName", labelName)
.randomSplit(Array(0.7, 0.3), seed = 1)
train.show(5)
+------------+-----------+------------+-----------+-----+
|sepal length|sepal width|petal length|petal width|class|
+------------+-----------+------------+-----------+-----+
| 4.3| 3.0| 1.1| 0.1| 0|
| 4.4| 2.9| 1.4| 0.2| 0|
| 4.4| 3.0| 1.3| 0.2| 0|
| 4.4| 3.2| 1.3| 0.2| 0|
| 4.6| 3.2| 1.4| 0.2| 0|
+------------+-----------+------------+-----------+-----+
通过窗口操作,我们已将标签的字符串列映射到标签索引。
训练
XGBoost4j-Spark-Gpu 支持回归、分类和排名模型。虽然我们在此教程中使用 Iris 数据集来演示如何使用 XGBoost4J-Spark-GPU
解决多分类问题,但回归和排名模型的用法与分类模型非常相似。
要训练用于分类的 XGBoost 模型,我们首先需要定义一个 XGBoostClassifier
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map(
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"device" -> "cuda",
"num_workers" -> 1)
val featuresNames = schema.fieldNames.filter(name => name != labelName)
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featuresNames)
.setLabelCol(labelName)
device
参数用于通知 XGBoost 应该使用 CUDA 设备而不是 CPU。与单节点模式不同,GPU 由 Spark 管理而不是由 XGBoost 管理。因此,不支持显式指定的设备序号,例如 cuda:1
。
用于训练 XGBoost 模型的可用参数可在此处找到。与 XGBoost4J-Spark 包类似,除了默认参数集之外,XGBoost4J-Spark-GPU 也支持这些参数的驼峰式变体,以与 Spark 的 MLlib 命名约定保持一致。
具体来说,此页面中的每个参数在 XGBoost4J-Spark-GPU 中都有其驼峰式等效形式。例如,要设置每棵树的 max_depth
,您可以像我们在上面的代码片段中所做的那样传递参数(作为封装在 Map 中的 max_depth
),或者您可以通过 XGBoostClassifer 中的设置器来完成
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol(featuresNames)
.setLabelCol(labelName)
xgbClassifier.setMaxDepth(2)
注意
与 XGBoost4j-Spark 接受 VectorUDT 类型的特征列和特征列名数组不同,XGBoost4j-Spark-GPU 仅通过 setFeaturesCol(value: Array[String])
接受特征列名数组。
设置 XGBoostClassifier 参数和特征/标签列后,我们可以通过使用输入 DataFrame 拟合 XGBoostClassifier 来构建一个转换器,即 XGBoostClassificationModel。此 fit
操作本质上是训练过程,生成的模型随后可用于预测等其他任务。
val xgbClassificationModel = xgbClassifier.fit(train)
预测
当我们得到一个模型,无论是 XGBoostClassificationModel、XGBoostRegressionModel 还是 XGBoostRankerModel,它都将一个 DataFrame 作为输入,读取包含特征向量的列,对每个特征向量进行预测,并默认输出一个包含以下列的新 DataFrame
XGBoostClassificationModel 将输出每个可能标签的边距 (
rawPredictionCol
)、概率 (probabilityCol
) 和最终预测标签 (predictionCol
)。XGBoostRegressionModel 将输出预测标签 (
predictionCol
)。XGBoostRankerModel 将输出预测标签 (
predictionCol
)。
val xgbClassificationModel = xgbClassifier.fit(train)
val results = xgbClassificationModel.transform(test)
results.show()
使用上面的代码片段,我们得到一个 DataFrame 作为结果,其中包含每个类的边距、概率以及每个实例的预测。
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
|sepal length|sepal width| petal length| petal width|class| rawPrediction| probability|prediction|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
| 4.5| 2.3| 1.3|0.30000000000000004| 0|[3.16666603088378...|[0.98853939771652...| 0.0|
| 4.6| 3.1| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.1| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.4| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.8| 3.4|1.9000000000000001| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 4.9| 2.4| 3.3| 1.0| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 4.9| 2.5| 4.5| 1.7| 2|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 5.0| 3.5| 1.3|0.30000000000000004| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 2.5| 3.0| 1.1| 1|[3.16666603088378...|[0.98853939771652...| 0.0|
| 5.1| 3.3| 1.7| 0.5| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 3.5| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.1| 3.8| 1.6| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 3.4| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 3.5| 1.5| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.2| 4.1| 1.5| 0.1| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.4| 3.9| 1.7| 0.4| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.5| 2.4| 3.8| 1.1| 1|[-2.1498908996582...|[0.00596602633595...| 1.0|
| 5.5| 4.2| 1.4| 0.2| 0|[3.25857257843017...|[0.98969423770904...| 0.0|
| 5.7| 2.5| 5.0| 2.0| 2|[-2.1498908996582...|[0.00280966912396...| 2.0|
| 5.7| 3.0| 4.2| 1.2| 1|[-2.1498908996582...|[0.00643939292058...| 1.0|
+------------+-----------+------------------+-------------------+-----+--------------------+--------------------+----------+
提交应用程序
假设您已配置支持 GPU 的 Spark 独立集群。否则,请参阅支持 GPU 的 Spark 独立配置。
从 XGBoost 2.1.0 开始,阶段级调度自动启用。因此,如果您使用的是 Spark 独立集群 3.4.0 或更高版本,我们强烈建议将 "spark.task.resource.gpu.amount"
配置为小数。这将允许在 ETL 阶段并行运行多个任务。一个示例配置是 "spark.task.resource.gpu.amount=1/spark.executor.cores"
。但是,如果您使用的是早于 2.1.0 的 XGBoost 版本或低于 3.4.0 的 Spark 独立集群版本,您仍然需要将 "spark.task.resource.gpu.amount"
设置为等于 "spark.executor.resource.gpu.amount"
。
假设应用程序主类是“Iris”,应用程序 jar 是“iris-1.0.0.jar”,下面提供了一个示例,演示如何将 xgboost 应用程序提交到 Apache Spark 独立集群。
rapids_version=24.08.0
xgboost_version=$LATEST_VERSION
main_class=Iris
app_jar=iris-1.0.0.jar
spark-submit \
--master $master \
--packages com.nvidia:rapids-4-spark_2.12:${rapids_version},ml.dmlc:xgboost4j-spark-gpu_2.12:${xgboost_version} \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
--conf spark.rapids.sql.csv.read.double.enabled=true \
--conf spark.rapids.sql.hasNans=false \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--class ${main_class} \
${app_jar}
首先,我们需要通过
--packages
指定RAPIDS Accelerator, xgboost4j-spark-gpu
包。其次,
RAPIDS Accelerator
是一个 Spark 插件,因此我们需要通过指定spark.plugins=com.nvidia.spark.SQLPlugin
来配置它。
有关其他 RAPIDS Accelerator
配置的详细信息,请参阅配置。
有关 RAPIDS Accelerator 常见问题解答
,请参阅常见问题解答。
RMM 支持
3.0 版本新增。
当与 RMM 插件一起编译时(参见从源代码构建),XGBoost Spark 包可以根据 spark.rapids.memory.gpu.pooling.enabled 和 spark.rapids.memory.gpu.pool 自动重用 RMM 内存池。请注意,这两个提交选项都需要相应设置。此外,XGBoost 使用 NCCL 进行 GPU 通信,这需要一些 GPU 内存用于通信缓冲区,并且不应让 RMM 占用所有可用内存。与内存池相关的示例配置
spark-submit \
--master $master \
--conf spark.rapids.memory.gpu.allocFraction=0.5 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.8 \
--conf spark.rapids.memory.gpu.pool=ARENA \
--conf spark.rapids.memory.gpu.pooling.enabled=true \
...