使用 Ray 分布式运行 XGBoost

Ray 是一个通用的分布式执行框架。Ray 可以用于将计算从单个节点扩展到数百个节点的集群,而无需更改任何代码。

Ray 的 Python 绑定附带了一系列维护良好的机器学习库,用于超参数优化和模型服务。

The XGBoost-Ray 项目提供了在 Ray 集群上运行 XGBoost 训练和预测作业的接口。它允许利用分布式数据表示形式,例如 Modin 数据帧,以及从云存储(例如 Parquet 文件)进行分布式加载。

XGBoost-Ray 与超参数优化库 Ray Tune 集成良好,并实现了先进的容错处理机制。使用 Ray,只需向集群添加新节点,即可将训练作业扩展到数百个节点。您还可以使用 Ray 来利用多 GPU XGBoost 训练。

安装和启动 Ray

可以从 PyPI 安装 Ray,如下所示:

pip install ray

如果您在单机上使用 Ray,则无需执行任何其他操作 - XGBoost-Ray 在使用时将自动启动一个本地 Ray 集群。

如果您想在集群上使用 Ray,可以使用 Ray 集群启动器

安装 XGBoost-Ray

XGBoost-Ray 也可以通过 PyPI 获取

pip install xgboost_ray

这将安装在 Ray 上运行 XGBoost 所需的所有依赖项,包括 Ray 本身(如果之前未安装的话)。

使用 XGBoost-Ray 进行训练和预测

XGBoost-Ray 使用与核心 XGBoost 相同的 API。只有两个区别:

  1. 您将使用 xgboost_ray.RayDMatrix 对象,而不是使用 xgboost.DMatrix

  2. 有一个额外的 xgboost_ray.RayParams 参数可用于配置分布式训练。

简单训练示例

要运行这个简单的示例,您需要安装 scikit-learn(使用 pip install sklearn)。

在本例中,我们将加载 乳腺癌数据集,并使用两个 actor 训练一个二分类器。

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    train_set,
    evals_result=evals_result,
    evals=[(train_set, "train")],
    verbose_eval=False,
    ray_params=RayParams(num_actors=2, cpus_per_actor=1))

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))

与非分布式 API 相比,唯一的区别是导入语句(使用 xgboost_ray 而不是 xgboost)、使用 RayDMatrix 而不是 DMatrix,以及传递一个 xgboost_ray.RayParams 对象。

返回的对象是一个常规的 xgboost.Booster 实例。

简单预测示例

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(pred_ray)

在本例中,数据将被分割到两个 actor 中。结果数组将按正确顺序集成这些数据。

RayParams 对象

RayParams 对象用于配置与分布式训练相关的各种设置。

多 GPU 训练

Ray 自动检测集群节点上的 GPU。要开始在多个 GPU 上训练,您只需设置 RayParams 对象的 gpus_per_actor 参数,以及用于多个 GPU 的 num_actors 参数

ray_params = RayParams(
    num_actors=4,
    gpus_per_actor=1,
)

这将在四个 GPU 上并行训练。

请注意,通常每个 actor 分配一个以上的 GPU 没有意义,因为 XGBoost 依赖于分布式库(如 Dask 或 Ray)来利用多 GPU 训练。

设置每个 actor 的 CPU 数量

XGBoost 原生利用多线程来加速计算。因此,如果您只在 CPU 上训练,每个节点使用多个 actor 可能没有好处。在这种情况下,假设您有一个由同构节点组成的集群,将每个 actor 的 CPU 数量设置为每个节点上可用的 CPU 数量,并将 actor 的数量设置为节点的数量。

如果您在单个节点上进行多 GPU 训练,请将可用 CPU 数量平均分配给所有 actor。例如,如果您有 16 个 CPU 和 4 个 GPU 可用,则每个 actor 应访问 1 个 GPU 和 4 个 CPU。

如果您使用的是异构节点集群(CPU 数量不同),您可能只想使用 最大公约数 作为每个 actor 的 CPU 数量。例如,如果您有一个由三个节点组成的集群,分别有 4、8 和 12 个 CPU,您可以启动 6 个 actor,每个 actor 有 4 个 CPU,以实现最大的 CPU 利用率。

容错

XGBoost-Ray 支持两种容错模式。在 非弹性训练 模式下,当训练 actor 死亡时(例如由于节点宕机),训练作业将停止,XGBoost-Ray 将等待 actor(或其资源)再次可用(这可能在不同的节点上),然后所有 actor 返回后继续训练。

弹性训练 模式下,当训练 actor 死亡时,其余 actor 将在没有死亡 actor 的情况下继续训练。如果 actor 返回,它将再次重新集成到训练中。

请注意,在弹性训练中,这意味着您将在一段时间内使用较少的数据进行训练。这样做的好处是,即使节点在训练运行的剩余时间内消失,您也可以继续训练,而无需等待它再次启动。在实践中,这通常会导致准确性略微下降,但与非弹性训练相比,训练时间会大大缩短。

两种训练模式都可以通过相应的 xgboost_ray.RayParams 参数进行配置。

超参数优化

XGBoost-Ray 与 超参数优化框架 Ray Tune 集成良好。Ray Tune 使用 Ray 启动具有不同超参数配置的多个分布式试验。如果与 XGBoost-Ray 一起使用,这些试验将启动它们自己的分布式训练作业。

XGBoost-Ray 自动将评估结果报告回 Ray Tune。您只需要做几件事:

  1. 将您的 XGBoost-Ray 训练调用放入一个接受参数配置的函数中(如下例中的 train_model)。

  2. 创建一个 xgboost_ray.RayParams 对象(如下例中的 ray_params)。

  3. 定义参数搜索空间(如下例中的 config 字典)。

  4. 调用 tune.run()
    • metric 参数应包含您想要优化的指标。通常,这由传递给 xgboost_ray.train()evals 参数的前缀以及在 XGBoost 参数中传递的 eval_metric 组成(如下例中的 train-error)。

    • mode 应为 minmax,取决于您是想最小化还是最大化该指标

    • 应使用 ray_params.get_tune_resources() 设置 resources_per_actor。这将确保每个试验都具有启动其分布式训练作业所需的资源。

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

num_actors = 4
num_cpus_per_actor = 1

ray_params = RayParams(
    num_actors=num_actors, cpus_per_actor=num_cpus_per_actor)

def train_model(config):
    train_x, train_y = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(train_x, train_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)
    bst.save_model("model.xgb")

from ray import tune

# Specify the hyperparameter search space.
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "eta": tune.loguniform(1e-4, 1e-1),
    "subsample": tune.uniform(0.5, 1.0),
    "max_depth": tune.randint(1, 9)
}

# Make sure to use the `get_tune_resources` method to set the `resources_per_trial`
analysis = tune.run(
    train_model,
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    resources_per_trial=ray_params.get_tune_resources())
print("Best hyperparameters", analysis.best_config)

Ray Tune 支持各种搜索算法和库(例如 BayesOpt、Tree-Parzen 估计器)智能调度器(如逐次减半)以及其他功能。有关更多信息,请参阅 Ray Tune 文档

附加资源