Dask 分布式 XGBoost

Dask 是一个基于 Python 构建的并行计算库。Dask 可以轻松管理分布式 worker,并且擅长处理大型分布式数据科学工作流。XGBoost 中的 Dask 实现源自 dask-xgboost,并扩展了一些功能和采用了不同的接口。本教程主要介绍在 CPU 树算法上使用 Dask 的基本用法。关于基于 GPU 的训练和内部工作原理,请参阅 XGBoost 的全新官方 Dask API

注意

该集成未在 Windows 上进行测试。

目录

要求

Dask 可以使用 pip 或 conda 安装(更多信息请参阅 dask 安装文档)。为了使用 GPU 加速 XGBoost,建议使用 dask-cuda 创建 GPU 集群。

概览

Dask 集群由三个不同的组件组成:一个集中式调度器、一个或多个 worker 以及一个或多个客户端(作为面向用户提交任务到集群的入口点)。在使用 Dask 和 XGBoost 时,需要在客户端调用 XGBoost Dask 接口。下面是一个小示例,演示了在 Dask 集群上运行 XGBoost 的基本用法。

from xgboost import dask as dxgb

import dask.array as da
import dask.distributed

if __name__ == "__main__":
    cluster = dask.distributed.LocalCluster()
    client = dask.distributed.Client(cluster)

    # X and y must be Dask dataframes or arrays
    num_obs = 1e5
    num_features = 20
    X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
    y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

    dtrain = dxgb.DaskDMatrix(client, X, y)
    # or
    # dtrain = dxgb.DaskQuantileDMatrix(client, X, y)

    output = dxgb.train(
        client,
        {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
    )

这里我们首先使用 distributed.LocalCluster 创建一个单节点模式的集群,然后将一个 distributed.Client 连接到该集群,设置好后续计算的环境。请注意,集群构建由 __name__ == "__main__" 保护,这是必要的,否则可能会出现一些难以理解的错误。

然后我们创建一个 xgboost.dask.DaskDMatrix 对象并将其以及其他一些参数传递给 xgboost.dask.train(),这与 XGBoost 通常的非 Dask 接口非常相似。与非 Dask 接口不同的是,datalabel 必须是 Dask DataFrameDask Array 实例。

XGBoost 的 Dask 接口的主要区别在于,我们将 Dask 客户端作为额外参数传递,用于执行计算。请注意,如果客户端设置为 None,XGBoost 将使用 Dask 返回的默认客户端。

XGBoost 中实现了两套 API。第一套是功能 API,如上例所示。给定数据和一组参数,train 函数返回一个模型和计算历史记录(一个 Python 字典)。

{
  "booster": Booster,
  "history": dict,
}

对于预测,将 train 返回的 output 传递给 xgboost.dask.predict()

prediction = dxgb.predict(client, output, dtrain)
# Or equivalently, pass ``output['booster']``:
prediction = dxgb.predict(client, output['booster'], dtrain)

也可以省去构造 DaskDMatrix 的步骤,当不需要 base_margin 等元信息时,这可以使计算稍微快一些。

prediction = dxgb.predict(client, output, X)
# Use inplace version.
prediction = dxgb.inplace_predict(client, output, X)

这里 prediction 是一个 Dask Array 对象,如果输入是 DaskDMatrixda.Array,则包含来自模型的预测结果。将 Dask 集合直接放入 predict 函数或使用 xgboost.dask.inplace_predict() 时,输出类型取决于输入数据。详见下一节。

另外,XGBoost 还使用 DaskXGBClassifierDaskXGBRegressorDaskXGBRanker 以及 2 个随机森林变体实现了 Scikit-Learn 接口。这个包装器类似于 XGBoost 中的单节点 Scikit-Learn 接口,以 Dask 集合作为输入,并有一个额外的 client 属性。更多示例请参阅以下章节和 XGBoost Dask 功能详解

运行预测

在前面的示例中,我们使用 DaskDMatrix 作为 predict 函数的输入。实际上,也可以直接在 Dask 集合(如 ArrayDataFrame)上调用 predict 函数,并且可能会获得更好的预测性能。当使用 DataFrame 作为预测输入时,结果是 Dask Series 而不是 Array。此外,Dask 接口支持原地预测 (in-place predict),这有助于减少内存使用和预测时间。

# dtrain is the DaskDMatrix defined above.
prediction = dxgb.predict(client, booster, dtrain)

或等价地

# where X is a dask DataFrame or dask Array.
prediction = dxgb.predict(client, booster, X)

对于原地预测也是如此

# where X is a dask DataFrame or dask Array backed by cupy or cuDF.
booster.set_param({"device": "cuda"})
prediction = dxgb.inplace_predict(client, booster, X)

当输入是 da.Array 对象时,输出始终是 da.Array。然而,如果输入类型是 dd.DataFrame,输出可以是 dd.Seriesdd.DataFrameda.Array,具体取决于输出形状。例如,当使用基于 SHAP 的预测时,返回值可能具有 3 或 4 个维度,在这种情况下始终返回 Array

运行预测(无论是使用 predict 还是 inplace_predict)的性能对块数敏感。内部实现使用了 da.map_blocksdd.map_partitions。当分区数量很大且每个分区的数据量很少时,调用预测的开销会变得明显。另一方面,如果不使用 GPU,每个块上用于预测的线程数也很重要。目前,XGBoost 对每个分区使用单线程。如果每个 worker 上的块数小于核心数,则 CPU worker 可能无法充分利用。

运行连续预测的一个简单优化是使用 distributed.Future

dataset = [X_0, X_1, X_2]
booster_f = client.scatter(booster, broadcast=True)
futures = []
for X in dataset:
    # Here we pass in a future instead of concrete booster
    shap_f = dxgb.predict(client, booster_f, X, pred_contribs=True)
    futures.append(shap_f)

results = client.gather(futures)

这仅在函数式接口上可用,因为 Scikit-Learn 包装器不知道如何维护有效的 booster Future。要从 Scikit-Learn 包装器对象获取 booster 对象:

cls = dxgb.DaskXGBClassifier()
cls.fit(X, y)

booster = cls.get_booster()

Scikit-Learn Estimator 接口

如前所述,还有另一个模仿 scikit-learn estimator 的更高抽象层接口。与函数式接口相比,该接口更容易使用,但限制更多。值得一提的是,尽管该接口模仿了 scikit-learn estimator,但它不适用于常规的 scikit-learn 工具,例如 GridSearchCV,因为 scikit-learn 不理解分布式 dask 数据集合。

from distributed import LocalCluster, Client
from xgboost import dask as dxgb


def main(client: Client) -> None:
    X, y = load_data()
    clf = dxgb.DaskXGBClassifier(n_estimators=100, tree_method="hist")
    clf.client = client  # assign the client
    clf.fit(X, y, eval_set=[(X, y)])
    proba = clf.predict_proba(X)


if __name__ == "__main__":
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            main(client)

GPU 加速

对于大多数使用 GPU 的情况,应使用 Dask-CUDA 项目创建集群,该项目会自动为 worker 进程配置正确的设备序号。因此,用户不应该指定序号(正确做法:device=cuda,错误做法:device=cuda:1)。请参阅 使用 Dask 在 GPU 上训练示例使用 scikit-learn 回归器接口和 GPU 直方图树方法 获取工作示例。

使用其他集群

使用 Dask 的 LocalCluster 方便在本地机器上快速上手。但一旦准备好扩展工作,有多种方法可以在分布式集群上部署 Dask。例如,您可以使用 Dask-CUDA 用于 GPU,并且可以使用 Dask Cloud Provider 在云中部署 Dask 集群。请参阅 Dask 文档获取更全面的列表

在下面的示例中,使用 KubeClusterKubernetes 上部署 Dask

from dask_kubernetes.operator import KubeCluster  # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode

from dask.distributed import Client
from xgboost import dask as dxgb
import dask.array as da


def main():
  '''Connect to a remote kube cluster with GPU nodes and run training on it.'''
    m = 1000
    n = 10
    kWorkers = 2                # assuming you have 2 GPU nodes on that cluster.
    # You need to work out the worker-spec yourself.  See document in dask_kubernetes for
    # its usage.  Here we just want to show that XGBoost works on various clusters.

    # See notes below for why we use pre-allocated cluster.
    with KubeCluster(
        name="xgboost-test",
        image="my-image-name:latest",
        n_workers=kWorkers,
        create_mode=CreateMode.CONNECT_ONLY,
        shutdown_on_close=False,
    ) as cluster:
        with Client(cluster) as client:
            X = da.random.random(size=(m, n), chunks=100)
            y = X.sum(axis=1)

            regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
            regressor.client = client
            regressor.set_params(tree_method='hist', device="cuda")
            regressor.fit(X, y, eval_set=[(X, y)])


if __name__ == '__main__':
    # Launch the kube cluster on somewhere like GKE, then run this as client process.
    # main function will connect to that cluster and start training xgboost model.
    main()

不同的集群类别可能存在微妙的差异,例如网络配置,或者特定的集群实现可能包含我们尚未意识到的 bug。如果在没有该集群实现文档中说明如何解决的情况下发现此类问题,请提交 issue。

Kubernetes 集群的一个有趣方面是 pod 可能在 Dask 工作流开始后才可用,这可能导致分布式 XGBoost 出现问题,因为 XGBoost 要求输入数据使用的节点在训练期间保持不变。要使用 Kubernetes 集群,必须等待所有 pod 在线后才能提交 XGBoost 任务。可以在 Python 中创建等待函数,或者简单地在使用 dask 工作流之前使用 k8s 工具(如 kubectl)预分配集群。要预分配集群,我们可以首先使用 dask kubernetes 生成集群 spec:

import json

from dask_kubernetes.operator import make_cluster_spec

spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
    json.dump(spec, fd, indent=2)
kubectl apply -f ./cluster-spec.json

检查 pod 是否可用

kubectl get pods

一旦所有 pod 初始化完成,即可运行 Dask XGBoost 工作流,如前例所示。重要的是确保集群设置参数 create_mode=CreateMode.CONNECT_ONLY,如果不想在单个作业后关闭集群,可选地设置 shutdown_on_close=False

线程

XGBoost 通过设置 nthread 参数(scikit-learn 为 n_jobs)内置支持通过线程进行并行计算。如果设置了这些参数,它们将覆盖 Dask 中的配置。例如:

with dask.distributed.LocalCluster(n_workers=7, threads_per_worker=4) as cluster:

每个 Dask worker 分配了 4 个线程。那么默认情况下 XGBoost 在每个进程中会使用 4 个线程进行训练。但如果设置了 nthread 参数:

output = dxgb.train(
    client,
    {"verbosity": 1, "nthread": 8, "tree_method": "hist"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)

XGBoost 在每个训练进程中将使用 8 个线程。

与 asyncio 协作

在 1.2.0 版本中添加。

XGBoost 的 Dask 接口支持 Python 中的新 asyncio,并且可以集成到异步工作流中。要将 Dask 与异步操作一起使用,请参阅 此 Dask 示例distributed 文档。要异步使用 XGBoost 的 Dask 接口,在创建 client 时必须通过指定 asynchronous=True 使其在异步模式下运行(示例如下)。然后,功能接口提供的所有函数(包括 DaskDMatrix)都将返回协程 (coroutines),然后可以对其进行 await 以获取结果。请注意,XGBoost 是一个计算密集型应用程序,并行比并发更重要。asyncio 支持更多是为了兼容性而不是性能提升。

函数式接口

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    m = await dxgb.DaskDMatrix(client, X, y)
    output = await dxgb.train(client, {}, dtrain=m)

    with_m = await dxgb.predict(client, output, m)
    with_X = await dxgb.predict(client, output, X)
    inplace = await dxgb.inplace_predict(client, output, X)

    # Use ``client.compute`` instead of the ``compute`` method from dask collection
    print(await client.compute(with_m))

对于 Scikit-Learn 接口,set_params 等简单方法和访问 evals_result() 等类属性不需要 await。其他涉及实际计算的方法将返回协程,因此需要进行 await。

async with dask.distributed.Client(scheduler_address, asynchronous=True) as client:
    X, y = generate_array()
    regressor = await dxgb.DaskXGBRegressor(verbosity=1, n_estimators=2)
    regressor.set_params(tree_method='hist')  # trivial method, synchronous operation
    regressor.client = client  #  accessing attribute, synchronous operation
    regressor = await regressor.fit(X, y, eval_set=[(X, y)])
    prediction = await regressor.predict(X)

    # Use `client.compute` instead of the `compute` method from dask collection
    print(await client.compute(prediction))

评估和 Early Stopping

在 1.3.0 版本中添加。

Dask 接口允许使用存储在分布式集合(Dask DataFrame 或 Dask Array)中的验证集。这些集合可用于评估和 Early Stopping。

要启用 Early Stopping,请传递一个或多个包含 DaskDMatrix 对象的验证集。

import dask.array as da
from xgboost import dask as dxgb

num_rows = 1e6
num_features = 100
num_partitions = 10
rows_per_chunk = num_rows / num_partitions

data = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

labels = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

X_eval = da.random.random(
    size=(num_rows, num_features),
    chunks=(rows_per_chunk, num_features)
)

y_eval = da.random.random(
    size=(num_rows, 1),
    chunks=(rows_per_chunk, 1)
)

dtrain = dxgb.DaskDMatrix(
    client=client,
    data=data,
    label=labels
)

dvalid = dxgb.DaskDMatrix(
    client=client,
    data=X_eval,
    label=y_eval
)

result = dxgb.train(
    client=client,
    params={
        "objective": "reg:squarederror",
    },
    dtrain=dtrain,
    num_boost_round=10,
    evals=[(dvalid, "valid1")],
    early_stopping_rounds=3
)

当验证集以这种方式提供给 xgboost.dask.train() 时,xgboost.dask.train() 返回的模型对象包含每个验证集在所有 boosting 轮次中的评估指标历史记录。

print(result["history"])
# {'valid1': OrderedDict([('rmse', [0.28857, 0.28858, 0.288592, 0.288598])])}

如果通过传递 early_stopping_rounds 也启用了 Early Stopping,则可以在返回的 booster 中检查最佳迭代次数。

booster = result["booster"]
print(booster.best_iteration)
best_model = booster[: booster.best_iteration]

其他自定义

XGBoost Dask 接口支持单节点 Python 接口中的其他高级功能,包括回调函数、自定义评估指标和目标函数。

def eval_error_metric(predt, dtrain: xgb.DMatrix):
    label = dtrain.get_label()
    r = np.zeros(predt.shape)
    gt = predt > 0.5
    r[gt] = 1 - label[gt]
    le = predt <= 0.5
    r[le] = label[le]
    return 'CustomErr', np.sum(r)

# custom callback
early_stop = xgb.callback.EarlyStopping(
    rounds=early_stopping_rounds,
    metric_name="CustomErr",
    data_name="Train",
    save_best=True,
)

booster = dxgb.train(
    client,
    params={
        "objective": "binary:logistic",
        "eval_metric": ["error", "rmse"],
        "tree_method": "hist",
    },
    dtrain=D_train,
    evals=[(D_train, "Train"), (D_valid, "Valid")],
    feval=eval_error_metric,  # custom evaluation metric
    num_boost_round=100,
    callbacks=[early_stop],
)

超参数调优

有关使用 Dask 和 Optuna 的 XGBoost 示例集,请参阅 https://github.com/coiled/dask-xgboost-nyctaxi

Learning to Rank (排序学习)

在 3.0.0 版本中添加。

注意

尚不支持位置去偏 (Position debiasing)。

出于性能原因,Dask 排序学习有两种操作模式。区别在于是否需要分布式全局排序。关于排序学习如何与分布式训练协作,请参阅 分布式训练。下面我们将讨论一些 Dask 特有的功能。

首先,如果您使用 DaskQuantileDMatrix 接口或将 DaskXGBRankerallow_group_split 设置为 True,XGBoost 将尝试根据查询 ID 对每个 worker 的样本进行排序和分组。此模式尝试跳过全局排序,仅对 worker 本地数据进行排序,因此没有 worker 间的数据 shuffle。请注意,即使是 worker 本地排序也是昂贵的,特别是在内存使用方面,因为在使用 sort_values() 时没有 spilling,并且我们需要连接数据。XGBoost 首先检查 QID 是否已排序,然后才实际执行排序操作。如果查询组相对连续,这意味着大多数查询组内的样本彼此靠近并且可能驻留在同一个 worker 上,则可以选择此模式。如果对数据进行了随机 shuffle,则不要使用此模式。

如果输入数据是随机的,那么我们无法保证同一组内的大部分数据都在同一个 worker 上。对于大型查询组,这可能不是问题。但对于小型查询组,每个 worker 可能只从所有组中获得一两个样本,这可能导致灾难性的性能。在这种情况下,我们可以根据查询组对数据进行分区,这是 DaskXGBRanker 的默认行为,除非将 allow_group_split 设置为 True。此模式除了对查询组 ID 进行编码操作外,还对整个数据集执行排序和分组。加上分区碎片化,此选项可能导致性能缓慢。请参阅 使用 Dask 接口进行排序学习 获取工作示例。

故障排除

  • 在某些环境中,XGBoost 可能无法解析调度器的 IP 地址,症状是在训练期间收到 OSError: [Errno 99] Cannot assign requested address 错误。快速解决方法是显式指定地址。为此,可以使用集体 Config

    在 3.0.0 版本中添加。

import dask
from distributed import Client
from xgboost import dask as dxgb
from xgboost.collective import Config

# let xgboost know the scheduler address
coll_cfg = Config(retry=1, timeout=20, tracker_host_ip="10.23.170.98", tracker_port=0)

with Client(scheduler_file="sched.json") as client:
    reg = dxgb.DaskXGBRegressor(coll_cfg=coll_cfg)
  • 请注意,XGBoost 需要与 dask 不同的端口。默认情况下,在类似 Unix 的系统上,XGBoost 使用端口 0 来查找可用端口,这在用户运行在受限的 docker 环境中时可能会失败。在这种情况下,请在容器中打开额外端口并按上面的代码片段指定。

  • 如果在启用 GPU 训练时遇到 NCCL 系统错误,通常包含错误消息 NCCL failure: unhandled system error,您可以使用 NCCL 文档 中列出的一个环境变量指定其网络配置,例如 NCCL_SOCKET_IFNAME。此外,您可以使用 NCCL_DEBUG 获取调试日志。

  • 如果在容器环境中 NCCL 初始化失败,可能是由于系统共享内存受限。使用 docker 时,可以尝试以下标志:--shm-size=4g

  • NCCL 尚未支持 MIG(Multi-Instance GPU)。初始化时会收到包含 Multiple processes within a communication group … 的错误消息。

  • 从 2.1.0 版本开始,为了减小二进制 wheel 的大小,XGBoost 包(使用 pip 安装)从环境中加载 NCCL,而不是直接捆绑它。这意味着如果您遇到类似“Failed to load nccl …”的错误消息,则表示您的环境中未安装或正确配置 NCCL。

    要解决此问题,可以使用 pip 安装 NCCL

    pip install nvidia-nccl-cu12 # (or with any compatible CUDA version)
    

    XGBoost 的默认 conda 安装应该不会遇到此错误。如果您使用定制的 XGBoost,请确保以下条件之一为真:

    • XGBoost 未使用 USE_DLOPEN_NCCL 标志编译。

    • 在初始化 collective 时,dmlc_nccl_path 参数设置为完整的 NCCL 路径。

    以下是排查 NCCL 依赖项问题的其他提示:

    • 检查 NCCL 安装路径并验证其是否正确安装。当 XGBoost 使用 pip 安装时,我们尝试通过在 Python 中使用 from nvidia.nccl import lib 来查找 NCCL。

    • 确保您安装了兼容的 CUDA 版本。NCCL 需要兼容的 CUDA 版本才能正常工作。

    • 如果您不使用分布式 XGBoost 训练但仍看到此错误,请在 GitHub 上提交 issue。

    • 如果继续遇到 NCCL 依赖项问题,请在 GitHub 上提交 issue。

IPv6 支持

在 1.7.0 版本中添加。

XGBoost 在 Linux 上的 Dask 接口初步支持 IPv6。由于大多数集群对 IPv6 的支持是部分的(双栈而非纯 IPv6),我们需要类似 故障排除 的额外用户配置,以帮助 XGBoost 获取正确的地址信息。

import dask
from distributed import Client
from xgboost import dask as dxgb
# let xgboost know the scheduler address, use the same bracket format as dask.
with dask.config.set({"xgboost.scheduler_address": "[fd20:b6f:f759:9800::]"}):
    with Client("[fd20:b6f:f759:9800::]") as client:
        reg = dxgb.DaskXGBRegressor(tree_method="hist")

使用 GPU 时,XGBoost 使用 NCCL 作为底层通信框架,根据集群设置,可能需要通过环境变量进行一些额外配置。请注意,IPv6 支持仅限于 Unix 系统。

记录评估结果

默认情况下,Dask 接口在调度器进程中打印评估结果。这使得用户难以监控训练进度。我们可以使用回调函数定义自定义评估监控器。有关如何将日志转发到客户端进程的工作示例,请参阅 将评估日志转发到客户端的示例。在该示例中,有两种使用 Dask 内置方法的潜在解决方案,包括 distributed.Client.forward_logging()distributed.print()。两者都有一些注意事项,但可以作为开发更复杂方法(例如写入文件)的良好起点。

DaskDMatrix 初始化缓慢并抛出奇怪错误的原因

XGBoost 中的 dask API 要求构造 DaskDMatrix。使用 Scikit-Learn 接口时,在 fitpredict 步骤中,会自动为所有输入数据隐式构造 DaskDMatrix。您可能已经注意到 DaskDMatrix 构造会花费大量时间,有时还会抛出看起来与 DaskDMatrix 无关的错误。原因简述如下:默认情况下,大多数 dask 计算都是延迟求值的,这意味着计算不会立即执行,直到您显式请求结果,例如调用 compute()。有关 dask 中的详细信息,请参阅前一个链接;有关延迟求值的通用概念信息,请参阅此 wikiDaskDMatrix 构造函数强制执行延迟计算,这意味着它是您所有先前的计算实际执行的地方,包括 dd.read_csv() 等操作。为了隔离 DaskDMatrix 中的计算与其他延迟计算,可以在构造 DaskDMatrix 之前显式等待输入数据的结果。此外,可以使用 dask 的诊断仪表板来监控当前正在执行的操作。

可重现的结果

在单节点模式下,只要底层平台相同,我们就可以始终期望获得相同的训练结果。然而,在分布式环境中很难获得可重现的结果,因为任务在不同的会话中可能获得不同的机器分配或拥有不同数量的可用资源。有一些启发式方法和指导方针可以实现这一点,但没有证明可行的方法来保证这种确定性行为。XGBoost 中的 Dask 接口会尽最大努力提供可重现的结果。本节重点介绍了一些已知标准,并试图分享一些关于该问题的见解。

XGBoost 主要执行两种不同的任务:训练和推理。在软件和硬件相同且运行时配置相同的情况下,推理是可重现的。本节的其余部分将重点介绍训练。

许多挑战源于我们使用的是近似算法。用于查找直方图 bin 的 sketching 算法是对精确分位数算法的近似,分布式环境中的 AUC 指标是对精确 AUC 分数的近似,而浮点数是对实数的近似。浮点数之所以成为问题,是因为其求和不具备结合律,这意味着 \((a + b) + c\) 不一定等于 \(a + (b + c)\),尽管此属性对于实数成立。因此,只要我们改变求和的顺序,结果就可能不同。这就要求为了获得可重现的 XGBoost 输出,整个流水线必须是可重现的。

  • 每次运行的软件栈相同。这是不言而喻的。不同版本的 XGBoost 可能会产生不同的输出。这是预期的,因为我们可能会更改超参数的默认值,或者并行策略生成不同的浮点结果。我们保证算法的正确性,但最终输出有很大的回旋余地。许多依赖项的情况也类似,例如,随机数生成器可能因平台而异。

  • 每次运行的硬件栈相同。这包括 worker 的数量以及每个 worker 上可用资源的数量。使用不同数量的 worker 训练,XGBoost 可能会产生不同的结果。这是由前面提到的近似问题引起的。

  • 与硬件限制类似,网络拓扑也是最终输出的一个因素。如果我们改变拓扑,worker 的顺序可能会不同,导致浮点运算的顺序不同。

  • 流水线各处使用的随机种子。

  • 数据的分区需要可重现。这与每个 worker 上的可用资源有关。Dask 可能根据其自身的调度策略为每次运行以不同方式对数据进行分区。例如,如果您在运行 XGBoost 的第二次训练会话时集群中还有其他任务,某些 worker 可能内存受限,并且 Dask 可能不会将 XGBoost 的训练数据推送到该 worker。这种数据分区的变化可能导致输出模型不同。如果您使用的是共享的 Dask 集群,那么每次运行结果很可能不同。

  • 在数据框上执行的操作需要可重现。有些操作(如 DataFrame.merge)在 GPU 等并行硬件上不是确定性的,因为索引的顺序在不同运行中可能不同。

由于上述标准,在分布式环境中训练模型与使用单节点训练模型预期会得到不同的结果。

内存使用

以下是一些使用 dask 和 xgboost 减少内存使用的方法。

  • 在分布式工作流中,最好直接通过 dask 集合加载数据,而不是通过客户端进程加载。当无法避免通过客户端进程加载时,使用 client.scatter 将数据从客户端进程分发到 workers。有关很好的总结,请参阅 [2]。

  • 使用 GPU 输入时(例如通过 dask_cudf 加载的数据框),可以尝试使用 xgboost.dask.DaskQuantileDMatrix 作为 DaskDMatrix 的替代品,以减少总体内存使用。有关示例,请参阅 使用 Dask 在 GPU 上训练示例

  • 尽可能使用原地预测 (in-place prediction)。

参考文献

  1. https://github.com/dask/dask/issues/6833

  2. https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array