[dask] Is distributed training globally "data parallel" and locally "feature parallel"?

Question

Is this a true statement?

When using distributed training in Python with xgb.dask, the data distribution is “data parallel”, meaning that each worker has a subset of the training data’s rows. Locally on each worker, the process of training a model is “feature parallel”, which means that candidate splits for different features are evaluated in parallel using multithreading.

As a result, the same feature + split combinations may be evaluated on multiple workers (using their local pieces of the data).

I ask because I’m trying to form good expectations for how training time with xgb.dask should change as I add more machines.

If my statement above is correct, I’d expect a decreasing rate of improvement from adding more workers because some duplicated work is being done (e.g. workers 1, 2, and 3 might all evaluate the split feat_1 > 0.75 on their local piece of the data).

Background

I have read the original XGBoost paper, where I saw this description of the approx tree method used in distributed training:

In order to reduce the cost of sorting, we propose to store the data in in-memory units, which we called block. Data in each block is stored in the compressed column (CSC) format, with each column sorted by the corresponding feature value. This input data layout only needs to be computed once before training, and can be reused in later iterations.

The block structure also helps when using the approximate algorithms. Multiple blocks can be used in this case, with each block corresponding to a subset of rows in the dataset.

Different blocks can be distributed across machines, or stored on disk in the out-of-core setting. Using the sorted structure, the quantile finding step becomes a linear scan over the sorted columns.

As far as I can tell, the DaskDMatrix isn’t equivalent to that and can’t be, since Dask collections are lazily loaded and xgb.dask tries to avoid moving any data off of the worker it’s loaded onto.
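For concreteness, here is a minimal sketch of the kind of setup I mean (the scheduler address, data shapes, and parameters are placeholders, not a recommendation):

```python
import dask.array as da
import xgboost as xgb
from dask.distributed import Client

# Connect to an existing Dask cluster (address is a placeholder).
client = Client("tcp://scheduler-address:8786")

# Random data as a stand-in; each chunk lives on whichever worker holds it.
X = da.random.random((1_000_000, 50), chunks=(100_000, 50))
y = da.random.random((1_000_000,), chunks=(100_000,))

# DaskDMatrix keeps references to the lazy collections; partitions stay on
# the workers that already hold them rather than being moved to the client.
dtrain = xgb.dask.DaskDMatrix(client, X, y)

output = xgb.dask.train(
    client,
    {"objective": "reg:squarederror", "tree_method": "hist"},
    dtrain,
    num_boost_round=100,
)
booster = output["booster"]  # output["history"] holds any evaluation logs
```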

I’m using these terms “data parallel” and “feature parallel” because that is how LightGBM describes them: https://lightgbm.readthedocs.io/en/latest/Features.html#optimization-in-parallel-learning.

Notes for reviewers

I have tried searching this discussion board, XGBoost issues, Stack Overflow, and the source code for xgboost and rabit and could not find an answer to this. Apologies in advance if this is covered somewhere already.

Thanks for your time and consideration!

Nice to see you here, @jameslamb!

No, there is no duplicated work when workers evaluate the same split candidate on their local piece of data.

Other factors may prevent linear speedup when adding more workers, but duplicated work is not the reason. One major factor for slowdown is the use of AllReduce to combine gradient histograms across multiple workers.


Thanks! How is it that XGBoost avoids the case where the same combination of (feature, threshold) is evaluated on multiple workers?

Is the list of candidate splits determined globally first somehow, and then the work of evaluating them divided among the workers?

Yes, the list of candidate splits (feature, threshold) is determined globally: we use approximate quantiles of each feature to generate it.
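(Just to illustrate the idea, this is not the actual implementation: XGBoost uses a distributed, weighted quantile sketch, but you can picture the per-feature candidates roughly like this.)

```python
import numpy as np

def candidate_thresholds(X, num_bins=256):
    """Toy version: per-feature candidate split thresholds from quantiles.

    X is a (rows, features) array. The real code builds an approximate,
    weighted quantile sketch across workers instead of exact quantiles.
    """
    probs = np.linspace(0.0, 1.0, num_bins + 1)[1:-1]  # interior quantiles
    return [np.quantile(X[:, j], probs) for j in range(X.shape[1])]
```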

No, the work of evaluating them is not divided among the workers. The split evaluation works as follows:

  1. Each worker generates gradient histograms for the data slice it’s assigned. The histograms let us query the partial sum of gradient pairs

sum_i (g_i, h_i)

where the sum is taken over the set of all data points for which the value of a particular feature falls in a particular range. Each range is of the form [q[j, k], q[j, k+1]], where q[j, k] is the k-th (approximate) quantile of feature j. As a result, the number of bins in the histograms is M * K * T, where M is the number of features, K is the number of quantiles (by default 256), and T is the number of tree nodes added so far in the tree.

  2. Workers perform AllReduce to combine the gradient histograms.

  3. Given the combined gradient histograms, workers are now able to choose the best split candidate.

  4. Workers partition their local data given the chosen split, i.e. all data rows get an updated partition (node) ID.

  5. Workers re-compute the gradient pairs for each data point using the new data partitions. Now go back to Step 1. (See the sketch below.)

(I believe LightGBM’s distributed training works in a similar way. Correct me if I’m wrong here.)
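Here is a rough Python sketch of the loop described above, just to make the structure concrete; `allreduce_sum` is a stand-in for the Rabit/collective call, and all names and shapes are illustrative rather than actual XGBoost internals:

```python
import numpy as np

def local_histogram(bin_idx, node_of_row, grad, hess, num_nodes, num_bins):
    """Step 1: per-worker histogram of (sum of g, sum of h) over
    (node, feature, bin), built only from this worker's rows."""
    num_rows, num_features = bin_idx.shape
    hist = np.zeros((num_nodes, num_features, num_bins, 2))
    for i in range(num_rows):
        for j in range(num_features):
            # bin_idx[i, j] = quantile bin of row i for feature j (precomputed)
            hist[node_of_row[i], j, bin_idx[i, j]] += (grad[i], hess[i])
    return hist

def allreduce_sum(local_hist):
    """Step 2 (placeholder): element-wise sum of histograms across workers.
    After the real AllReduce, every worker holds the identical global histogram."""
    return local_hist  # single-process stand-in

# Steps 3-5: each worker scans the same global histogram and therefore picks
# the same best (feature, threshold) for each node, re-assigns its local rows
# to the new children, recomputes (g_i, h_i), and loops back to Step 1.
```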

Now I do realize that Step 3 represents duplicated workload across multiple workers. But given the histograms, the work involved in Step 3 is minimal, relative to the work needed in Steps 4 and 5. The reason is that the histograms constitute sufficient statistics for computing loss values for all split candidates. For example, we can evaluate the threshold q[j, k] as follows:

  1. Compute the “left” sum L by summing all the bins for ranges (-inf, q[j, 0]], …, [q[j, k-1], q[j, k]].
  2. Compute the “right” sum R by summing all the bins for ranges [q[j, k], q[j, k+1]], …, [q[j, K-1], +inf).
  3. Now use L and R to compute the change in loss that would result from creating the new split with (feature=j, threshold=q[j,k]). There exists a closed-form formula for the loss change, and the formula only depends on L and R (sketched below).
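For reference, here is the closed-form formula from the XGBoost paper, assuming the usual L2 regularisation term lambda and complexity penalty gamma; L = (G_L, H_L) and R = (G_R, H_R) are the summed gradient/hessian pairs on each side of the threshold:

```python
def split_gain(G_L, H_L, G_R, H_R, lam=1.0, gamma=0.0):
    """Loss reduction from a candidate split, given only the summed
    gradients (G) and hessians (H) on the left and right of the threshold."""
    def score(G, H):
        return G * G / (H + lam)
    # 1/2 * [score(left) + score(right) - score(parent)] - gamma
    return 0.5 * (score(G_L, H_L) + score(G_R, H_R)
                  - score(G_L + G_R, H_L + H_R)) - gamma
```

So once the global histogram exists, evaluating every candidate for a node is just a prefix sum over the bins plus this formula per threshold.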

In practice, I would be more worried about the bottleneck in AllReduce, as we have to communicate histograms of size M * K * T. These can be quite big if we have high-dimensional data (large M) or if we grow deep trees (large T).
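To put rough, made-up numbers on that (assuming one double-precision (g, h) pair per bin):

```python
# Back-of-envelope size of one AllReduce payload (illustrative numbers only).
M = 1000   # number of features
K = 256    # quantile bins per feature (the default mentioned above)
T = 255    # tree nodes added so far, e.g. all split nodes of a full depth-8 tree

bytes_per_bin = 2 * 8                 # one (g, h) pair of 8-byte doubles
payload_bytes = M * K * T * bytes_per_bin
print(f"~{payload_bytes / 2**30:.2f} GiB per AllReduce")  # ~0.97 GiB
```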

P.S. If I had more time, I would love to write a whitepaper describing the algorithm in precise mathematical detail. But alas, my hands are currently full.


Thank you very much for the thorough answer!

Ok, this makes a lot more sense to me and I have a better idea of where the bottlenecks might be. I’m going to bookmark this page :grinning:
