使用 Dask 的分布式 XGBoost

Dask 是一个基于 Python 构建的并行计算库。Dask 允许轻松管理分布式 worker,并擅长处理大型分布式数据科学工作流。XGBoost 中的实现源自 dask-xgboost,但具有一些扩展功能和不同的接口。本教程重点介绍 Dask 与 CPU 树算法的基本用法。有关基于 GPU 训练和内部工作原理的概述,请参阅 XGBoost 的新官方 Dask API

注意

此集成未经 Windows 测试。

目录

要求

Dask 可以使用 pip 或 conda 安装(有关更多信息,请参阅 dask 安装文档)。为了使用 GPU 加速 XGBoost,建议使用 dask-cuda 创建 GPU 集群。

概述

一个 Dask 集群由三个不同的组件组成:一个集中式调度器、一个或多个 worker 以及一个或多个充当用户面向集群提交任务的入口点的客户端。当将 XGBoost 与 Dask 一起使用时,需要从客户端调用 XGBoost Dask 接口。下面是一个小例子,说明了在 Dask 集群上运行 XGBoost 的基本用法

from xgboost import dask as dxgb

import dask.array as da
import dask.distributed

if __name__ == "__main__":
    cluster = dask.distributed.LocalCluster()
    client = dask.distributed.Client(cluster)

    # X and y must be Dask dataframes or arrays
    num_obs = 1e5
    num_features = 20
    X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
    y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

    dtrain = dxgb.DaskDMatrix(client, X, y)
    # or
    # dtrain = dxgb.DaskQuantileDMatrix(client, X, y)

    output = dxgb.train(
        client,
        {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
    )

这里我们首先使用 distributed.LocalCluster 在单节点模式下创建一个集群,然后将 distributed.Client 连接到此集群,为后续计算设置环境。请注意,集群构造受 __name__ == "__main__" 保护,这是必要的,否则可能会出现模糊错误。

然后我们创建一个 xgboost.dask.DaskDMatrix 对象并将其连同其他一些参数传递给 xgboost.dask.train(),这很像 XGBoost 的常规非 Dask 接口。与该接口不同,datalabel 必须是 Dask DataFrameDask Array 实例。

XGBoost 的 Dask 接口的主要区别在于我们传递 Dask 客户端作为执行计算的额外参数。请注意,如果客户端设置为 None,XGBoost 将使用 Dask 返回的默认客户端。

XGBoost 中实现了两组 API。第一组是上述示例中所示的函数式 API。给定数据和一组参数,train 函数返回一个模型和计算历史记录作为 Python 字典

{
  "booster": Booster,
  "history": dict,
}

对于预测,将 train 返回的 output 传递给 xgboost.dask.predict()

prediction = dxgb.predict(client, output, dtrain)
# Or equivalently, pass ``output['booster']``:
prediction = dxgb.predict(client, output['booster'], dtrain)

也可以省去 DaskDMatrix 的构造,当不需要 base_margin 等元信息时,这可以使计算稍微快一些

prediction = dxgb.predict(client, output, X)
# Use inplace version.
prediction = dxgb.inplace_predict(client, output, X)

这里 prediction 是一个 Dask Array 对象,如果输入是 DaskDMatrixda.Array,则包含模型的预测。当将 Dask 集合直接放入 predict 函数或使用 xgboost.dask.inplace_predict() 时,输出类型取决于输入数据。有关详细信息,请参阅下一节。

或者,XGBoost 还使用 DaskXGBClassifierDaskXGBRegressorDaskXGBRanker 和 2 种随机森林变体实现了 Scikit-Learn 接口。这个包装器类似于 XGBoost 中的单节点 Scikit-Learn 接口,以 Dask 集合作为输入,并具有一个额外的 client 属性。有关更多示例,请参阅以下部分和 XGBoost Dask 功能演练

运行预测

在前面的示例中,我们使用 DaskDMatrix 作为 predict 函数的输入。实际上,也可以直接在 ArrayDataFrame 等 Dask 集合上调用 predict 函数,并且可能具有更好的预测性能。当 DataFrame 用作预测输入时,结果是 Dask Series 而不是数组。此外,Dask 接口支持就地预测,这有助于减少内存使用和预测时间。

# dtrain is the DaskDMatrix defined above.
prediction = dxgb.predict(client, booster, dtrain)

或等效地

# where X is a dask DataFrame or dask Array.
prediction = dxgb.predict(client, booster, X)

此外,对于就地预测

# where X is a dask DataFrame or dask Array backed by cupy or cuDF.
booster.set_param({"device": "cuda"})
prediction = dxgb.inplace_predict(client, booster, X)

当输入是 da.Array 对象时,输出始终是 da.Array。但是,如果输入类型是 dd.DataFrame,则输出可以是 dd.Seriesdd.DataFrameda.Array,具体取决于输出形状。例如,当使用基于 SHAP 的预测时,返回值可以具有 3 或 4 维,在这种情况下,总是返回 Array

运行预测(无论是使用 predict 还是 inplace_predict)的性能对块的数量敏感。在内部,它是使用 da.map_blocksdd.map_partitions 实现的。当分区数量很大且每个分区只有少量数据时,调用预测的开销就会变得明显。另一方面,如果不使用 GPU,则每个块上用于预测的线程数很重要。目前,xgboost 对每个分区使用单线程。如果每个 worker 上的块数小于核心数,则 CPU worker 可能无法充分利用。

运行连续预测的一个简单优化是使用 distributed.Future

dataset = [X_0, X_1, X_2]
booster_f = client.scatter(booster, broadcast=True)
futures = []
for X in dataset:
    # Here we pass in a future instead of concrete booster
    shap_f = dxgb.predict(client, booster_f, X, pred_contribs=True)
    futures.append(shap_f)

results = client.gather(futures)

这仅在函数式接口上可用,因为 Scikit-Learn 包装器不知道如何为 booster 维护有效的 future。要从 Scikit-Learn 包装器对象获取 booster 对象

cls = dxgb.DaskXGBClassifier()
cls.fit(X, y)

booster = cls.get_booster()

Scikit-Learn 评估器接口

如前所述,还有另一个接口模仿 Scikit-Learn 评估器,具有更高级别的抽象。与函数式接口相比,该接口更易于使用,但限制更多。值得一提的是,尽管该接口模仿 Scikit-Learn 评估器,但它不适用于常规 Scikit-Learn 实用程序(如 GridSearchCV),因为 Scikit-Learn 不理解分布式 Dask 数据集合。

from distributed import LocalCluster, Client
from xgboost import dask as dxgb


def main(client: Client) -> None:
    X, y = load_data()
    clf = dxgb.DaskXGBClassifier(n_estimators=100, tree_method="hist")
    clf.client = client  # assign the client
    clf.fit(X, y, eval_set=[(X, y)])
    proba = clf.predict_proba(X)


if __name__ == "__main__":
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            main(client)

GPU 加速

对于大多数 GPU 用例,应使用 Dask-CUDA 项目来创建集群,该项目会自动为 worker 进程配置正确的设备序号。因此,用户不应指定序号(好:device=cuda,坏:device=cuda:1)。请参阅 使用 Dask 在 GPU 上训练的示例将 Scikit-Learn 回归器接口与 GPU 直方图树方法一起使用 以获取工作示例。

与其他集群协作

使用 Dask 的 LocalCluster 方便在本地机器上快速入门。但是,一旦您准备好扩展工作,就有多种方法可以在分布式集群上部署 Dask。例如,您可以将 Dask-CUDA 用于 GPU,您还可以使用 Dask Cloud Provider 在云中部署 Dask 集群。有关更全面的列表,请参阅 Dask 文档

在下面的示例中,KubeCluster 用于 在 Kubernetes 上部署 Dask

from dask_kubernetes.operator import KubeCluster  # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode

from dask.distributed import Client
from xgboost import dask as dxgb
import dask.array as da


def main():
  '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
    m = 1000
    n = 10
    kWorkers = 2                # assuming you have 2 GPU nodes on that cluster.
    # You need to work out the worker-spec yourself.  See document in dask_kubernetes for
    # its usage.  Here we just want to show that XGBoost works on various clusters.

    # See notes below for why we use pre-allocated cluster.
    with KubeCluster(
        name="xgboost-test",
        image="my-image-name:latest",
        n_workers=kWorkers,
        create_mode=CreateMode.CONNECT_ONLY,
        shutdown_on_close=False,
    ) as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = X.sum(axis=1)

            regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='hist', device="cuda")
            regressor.fit(X, y, eval_set=[(X, y)])


if __name__ == '__main__':
    # Launch the kube cluster on somewhere like GKE, then run this as client process.
    # main function will connect to that cluster and start training xgboost model.
    main()

不同的集群类可能存在细微差别,例如网络配置,或者特定的集群实现可能包含我们不知道的错误。如果发现此类情况并且该集群实现中没有解决文档,请打开一个 issue。

Kubernetes 集群的一个有趣方面是,在 Dask 工作流开始后,Pod 可能会变得可用,这可能会导致分布式 XGBoost 出现问题,因为 XGBoost 希望输入数据使用的节点在训练期间保持不变。要使用 Kubernetes 集群,必须等待所有 Pod 都联机后才能提交 XGBoost 任务。可以使用 Python 创建一个等待函数,或者在运行 Dask 工作流之前简单地使用 k8s 工具(如 kubectl)预分配一个集群。要预分配一个集群,我们可以首先使用 Dask Kubernetes 生成集群规范

import json

from dask_kubernetes.operator import make_cluster_spec

spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
    json.dump(spec, fd, indent=2)
kubectl apply -f ./cluster-spec.json

检查 Pod 是否可用

kubectl get pods

一旦所有 Pod 都已初始化,就可以像前面的示例一样运行 Dask XGBoost 工作流。重要的是要确保集群将参数 create_mode=CreateMode.CONNECT_ONLY 设置为,如果不想在单个作业后关闭集群,则可选地设置 shutdown_on_close=False

线程

XGBoost 通过设置 nthread 参数(Scikit-Learn 为 n_jobs)内置支持通过线程进行并行计算。如果设置了这些参数,它们将覆盖 Dask 中的配置。例如

with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:

为每个 Dask worker 分配了 4 个线程。然后,XGBoost 默认在每个进程中使用 4 个线程进行训练。但如果设置了 nthread 参数

output = dxgb.train(
    client,
    {"verbosity": 1, "nthread": 8, "tree_method": "hist"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)

XGBoost 将在每个训练过程中使用 8 个线程。

使用 asyncio

在 1.2.0 版本中添加。

XGBoost 的 Dask 接口支持 Python 中新的 asyncio,可以集成到异步工作流中。有关将 Dask 与异步操作一起使用的信息,请参阅 此 Dask 示例distributed 中的文档。要异步使用 XGBoost 的 Dask 接口,用于训练和预测的 client 必须在异步模式下运行,方法是在创建 client 时指定 asynchronous=True(示例如下)。然后,函数式接口提供的所有函数(包括 DaskDMatrix)都将返回协程,然后可以等待这些协程来检索它们的结果。请注意,XGBoost 是一个计算密集型应用程序,并行性比并发性更重要。asyncio 的支持更多是为了兼容性,而不是为了性能提升。

函数式接口

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    m = await dxgb.DaskDMatrix(client, X, y)
    output = await dxgb.train(client, {}, dtrain=m)

    with_m = await dxgb.predict(client, output, m)
    with_X = await dxgb.predict(client, output, X)
    inplace = await dxgb.inplace_predict(client, output, X)

    # Use ``client.compute`` instead of the ``compute`` method from dask collection
    print(await client.compute(with_m))

而对于 Scikit-Learn 接口,诸如 set_params 和访问类属性(如 evals_result())之类的简单方法不需要 await。其他涉及实际计算的方法将返回一个协程,因此需要等待

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    regressor = await dxgb.DaskXGBRegressor(verbosity=1, n_estimators=2)
    regressor.set_params(tree_method='hist')  # trivial method, synchronous operation
    regressor.client = client  #  accessing attribute, synchronous operation
    regressor = await regressor.fit(X, y, eval_set=[(X, y)])
    prediction = await regressor.predict(X)

    # Use `client.compute` instead of the `compute` method from dask collection
    print(await client.compute(prediction))

评估和提前停止

在 1.3.0 版本中添加。

Dask 接口允许使用存储在分布式集合(Dask DataFrame 或 Dask Array)中的验证集。这些验证集可用于评估和提前停止。

要启用提前停止,请传递一个或多个包含 DaskDMatrix 对象的验证集。

import dask.array as da
from xgboost import dask as dxgb

num_rows = 1e6
num_features = 100
num_partitions = 10
rows_per_chunk = num_rows / num_partitions

data = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

labels = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

X_eval = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

y_eval = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=data,
    label=labels
)

dvalid = dxgb.DaskDMatrix(
    client=client,
    data=X_eval,
    label=y_eval
)

result = dxgb.train(
    client=client,
    params={
        "objective": "reg:squarederror",
    },
    dtrain=dtrain,
    num_boost_round=10,
    evals=[(dvalid, "valid1")],
    early_stopping_rounds=3
)

当以这种方式将验证集提供给 xgboost.dask.train() 时,xgboost.dask.train() 返回的模型对象包含每个验证集在所有 boosting 轮次中的评估指标历史记录。

print(result["history"])
# {'valid1': OrderedDict([('rmse', [0.28857, 0.28858, 0.288592, 0.288598])])}

如果通过传递 early_stopping_rounds 启用了提前停止,您可以在返回的 booster 中检查最佳迭代。

booster = result["booster"]
print(booster.best_iteration)
best_model = booster[: booster.best_iteration]

其他自定义

XGBoost Dask 接口接受单节点 Python 接口中的其他高级功能,包括回调函数、自定义评估指标和目标

def eval_error_metric(predt, dtrain: xgb.DMatrix):
    label = dtrain.get_label()
    r = np.zeros(predt.shape)
    gt = predt > 0.5
    r[gt] = 1 - label[gt]
    le = predt <= 0.5
    r[le] = label[le]
    return 'CustomErr', np.sum(r)

# custom callback
early_stop = xgb.callback.EarlyStopping(
    rounds=early_stopping_rounds,
    metric_name="CustomErr",
    data_name="Train",
    save_best=True,
)

booster = dxgb.train(
    client,
    params={
        "objective": "binary:logistic",
        "eval_metric": ["error", "rmse"],
        "tree_method": "hist",
    },
    dtrain=D_train,
    evals=[(D_train, "Train"), (D_valid, "Valid")],
    feval=eval_error_metric,  # custom evaluation metric
    num_boost_round=100,
    callbacks=[early_stop],
)

超参数调整

请参阅 https://github.com/coiled/dask-xgboost-nyctaxi,获取使用 XGBoost 和 Dask 以及 Optuna 的一组示例。

学习排名

3.0.0 版本新增。

注意

尚未支持位置偏差消除。

出于性能原因,Dask 学习排名有两种操作模式。区别在于是否需要分布式全局排序。有关排名如何与分布式训练协同工作的通用信息,请参阅 分布式训练。下面我们将讨论一些 Dask 特有的功能。

首先,如果您使用 DaskQuantileDMatrix 接口或将 allow_group_split 设置为 TrueDaskXGBRanker,XGBoost 将尝试根据查询 ID 为每个 worker 排序和分组样本。此模式试图跳过全局排序,仅对 worker 本地数据进行排序,因此没有 worker 间数据混洗。请注意,即使是 worker 本地排序也是昂贵的,尤其是在内存使用方面,因为当使用 sort_values() 时没有溢出,并且我们需要连接数据。XGBoost 在实际执行排序操作之前,首先检查 QID 是否已排序。如果查询组相对连续,即查询组中的大多数样本彼此接近并且可能驻留在同一 worker 上,则可以选择此选项。如果您已对数据执行随机混洗,请不要使用此选项。

如果输入数据是随机的,那么我们无法保证同一组内的大部分数据位于同一 worker 上。对于大型查询组,这可能不是问题。但对于小型查询组,每个 worker 可能只从其组中获得一两个样本,这可能导致灾难性的性能。在这种情况下,我们可以根据查询组对数据进行分区,这是 DaskXGBRanker 的默认行为,除非将 allow_group_split 设置为 True。此模式除了对查询组 ID 进行编码操作之外,还对整个数据集执行排序和分组。加上分区碎片,此选项可能导致性能缓慢。请参阅 使用 Dask 接口进行学习排名 以获取工作示例。

故障排除

  • 在某些环境中,XGBoost 可能无法解析调度程序的 IP 地址,症状是用户在训练期间收到 OSError: [Errno 99] Cannot assign requested address 错误。一个快速的解决方法是明确指定地址。为此,使用集合 Config

    3.0.0 版本新增。

import dask
from distributed import Client
from xgboost import dask as dxgb
from xgboost.collective import Config

# let xgboost know the scheduler address
coll_cfg = Config(retry=1, timeout=20, tracker_host_ip="10.23.170.98", tracker_port=0)

with Client(scheduler_file="sched.json") as client:
    reg = dxgb.DaskXGBRegressor(coll_cfg=coll_cfg)
  • 请注意,XGBoost 需要与 Dask 不同的端口。默认情况下,在类似 Unix 的系统上,XGBoost 使用端口 0 查找可用端口,如果用户在受限的 Docker 环境中运行,这可能会失败。在这种情况下,请在容器中打开其他端口并如上文所示指定。

  • 如果您在使用 GPU 训练时遇到 NCCL 系统错误,通常包含错误消息 NCCL failure: unhandled system error,您可以使用 NCCL 文档 中列出的环境变量之一(例如 NCCL_SOCKET_IFNAME)来指定其网络配置。此外,您可以使用 NCCL_DEBUG 获取调试日志。

  • 如果 NCCL 在容器环境中初始化失败,可能是由于系统共享内存受限引起的。使用 Docker 时,可以尝试使用标志:–shm-size=4g

  • NCCL 尚未支持 MIG(多实例 GPU)。初始化时,您将收到一条错误消息,其中包含 Multiple processes within a communication group …

  • 从 2.1.0 版本开始,为了减少二进制 wheel 的大小,XGBoost 包(使用 pip 安装)从环境中加载 NCCL,而不是直接捆绑它。这意味着如果您遇到类似“Failed to load nccl …”的错误消息,则表明您的环境中未安装或未正确配置 NCCL。

    要解决此问题,您可以使用 pip 安装 NCCL

    pip install nvidia-nccl-cu12 # (or with any compatible CUDA version)
    

    XGBoost 的默认 conda 安装不应遇到此错误。如果您使用的是自定义 XGBoost,请确保以下之一为真

    • XGBoost 未使用 USE_DLOPEN_NCCL 标志编译。

    • 在初始化集合时,dmlc_nccl_path 参数设置为完整的 NCCL 路径。

    以下是一些解决 NCCL 依赖性问题的其他技巧

    • 检查 NCCL 安装路径并验证它是否正确安装。当使用 pip 安装 XGBoost 时,我们尝试通过在 Python 中使用 from nvidia.nccl import lib 来查找 NCCL。

    • 确保您安装了正确的 CUDA 版本。NCCL 需要兼容的 CUDA 版本才能正常运行。

    • 如果您没有将 XGBoost 用于分布式训练,但仍然看到此错误,请在 GitHub 上打开一个 issue。

    • 如果您继续遇到 NCCL 依赖性问题,请在 GitHub 上打开一个 issue。

IPv6 支持

在 1.7.0 版本中添加。

XGBoost 对 Linux 上的 Dask 接口初步支持 IPv6。由于大多数集群对 IPv6 的支持是部分(双栈而不是仅 IPv6),我们要求额外的用户配置,类似于 故障排除,以帮助 XGBoost 获取正确的地址信息

import dask
from distributed import Client
from xgboost import dask as dxgb
# let xgboost know the scheduler address, use the same bracket format as dask.
with dask.config.set({"xgboost.scheduler_address": "[fd20:b6f:f759:9800::]"}):
    with Client("[fd20:b6f:f759:9800::]") as client:
        reg = dxgb.DaskXGBRegressor(tree_method="hist")

当使用 GPU 时,XGBoost 采用 NCCL 作为底层通信框架,这可能需要通过环境变量进行一些额外的配置,具体取决于集群的设置。请注意,IPv6 支持仅限于 Unix。

记录评估结果

默认情况下,Dask 接口在调度程序进程中打印评估结果。这使得用户难以监控训练进度。我们可以使用回调函数定义自定义评估监视器。请参阅 将评估日志转发到客户端的示例,了解如何将日志转发到客户端进程的工作示例。在该示例中,有两种使用 Dask 内置方法的潜在解决方案,包括 distributed.Client.forward_logging()distributed.print()。它们都存在一些注意事项,但可以作为开发更复杂方法(例如写入文件)的良好起点。

为什么 DaskDMatrix 的初始化如此缓慢并抛出奇怪的错误

XGBoost 中的 Dask API 需要构建 DaskDMatrix。使用 Scikit-Learn 接口时,DaskDMatrixfitpredict 步骤期间隐式构建所有输入数据。您可能已经观察到 DaskDMatrix 的构建可能需要大量时间,并且有时会抛出似乎与 DaskDMatrix 无关的错误。这里简要解释一下原因。默认情况下,大多数 Dask 计算都是 惰性评估 的,这意味着除非您明确请求结果(例如,通过调用 compute()),否则不会执行计算。有关 Dask 详细信息,请参阅前面的链接,有关惰性评估的一般概念信息,请参阅 此 wikiDaskDMatrix 构造函数强制评估惰性计算,这意味着所有早期的计算(包括 dd.read_csv() 等操作)实际上都是在此处执行的。为了将 DaskDMatrix 中的计算与其他惰性计算隔离开来,可以在构建 DaskDMatrix 之前明确等待输入数据的结果。此外,Dask 的 诊断仪表板 可用于监视当前正在执行的操作。

可复现结果

在单节点模式下,只要底层平台相同,我们总是可以期望在运行之间获得相同的训练结果。然而,在分布式环境中很难获得可复现结果,因为任务可能会获得不同的机器分配或在不同会话期间拥有不同数量的可用资源。有一些启发式方法和指南可以实现它,但没有经过验证的方法来保证这种确定性行为。XGBoost 中的 Dask 接口尽最大努力提供可复现结果。本节重点介绍了一些已知标准,并尝试分享对该问题的一些见解。

XGBoost 主要执行两种不同的任务:训练和推理。在相同的软件和硬件以及相同的运行时配置下,推理是可复现的。本节的其余部分将重点关注训练。

许多挑战源于我们正在使用近似算法。用于查找直方图 bin 的草图算法是精确分位数算法的近似,分布式环境中的 AUC 指标是精确 AUC 分数的近似,浮点数是实数的近似。浮点数是一个问题,因为它们的求和不具有关联性,这意味着 \((a + b) + c\) 不一定等于 \(a + (b + c)\),即使此属性对实数成立。因此,每当我们改变求和的顺序时,结果可能会有所不同。这要求为了从 XGBoost 获得可复现的输出,整个管道必须是可复现的。

  • 软件堆栈在每次运行中都相同。这是不言而喻的。XGBoost 可能会在不同版本之间生成不同的输出。这是预期的,因为我们可能会更改超参数的默认值,或者生成不同浮点结果的并行策略。我们保证算法的正确性,但最终输出有很多回旋余地。对于许多依赖项,情况类似,例如,随机数生成器可能因平台而异。

  • 每次运行的硬件堆栈都相同。这包括 worker 的数量以及每个 worker 上可用资源的数量。XGBoost 可以使用不同数量的 worker 生成不同的结果。这是由前面提到的近似问题引起的。

  • 与硬件约束类似,网络拓扑也是最终输出的一个因素。如果改变拓扑,worker 的顺序可能会不同,从而导致浮点运算的顺序不同。

  • 管道各个位置使用的随机种子。

  • 数据分区需要可复现。这与每个 worker 上可用的资源有关。Dask 可能会根据其自身的调度策略为每次运行以不同方式分区数据。例如,如果您在运行 XGBoost 的第二次训练会话时集群中存在一些额外的任务,则某些 worker 的内存可能会受限,Dask 可能不会将 XGBoost 的训练数据推送到该 worker。数据分区的这种变化可能导致不同的输出模型。如果您正在使用共享 Dask 集群,则结果在运行之间可能会有所不同。

  • 在数据框上执行的操作需要可复现。有些操作(例如 DataFrame.merge)在 GPU 等并行硬件上不是确定性的,其中索引的顺序可能在每次运行之间有所不同。

由于上述标准,在分布式环境中训练模型与使用单节点训练模型时,预期会得到不同的结果。

内存使用

以下是使用 Dask 和 XGBoost 减少内存使用的一些实践。

  • 在分布式工作流中,最好直接由 Dask 集合加载数据,而不是由客户端进程加载。当无法避免使用客户端进程加载时,请使用 client.scatter 将数据从客户端进程分发到 worker。请参阅 [2] 获取一个很好的总结。

  • 当使用 GPU 输入(例如由 dask_cudf 加载的数据帧)时,您可以尝试使用 xgboost.dask.DaskQuantileDMatrix 作为 DaskDMatrix 的替代品,以减少整体内存使用。请参阅 使用 Dask 在 GPU 上训练的示例 以获取示例。

  • 尽可能使用就地预测。

参考文献

  1. https://github.com/dask/dask/issues/6833

  2. https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array