Parallel CV of XGBoost in Spark


#1

Hello,
I’m using XGBoost inside a CrossValidator to find a “good”, reliable model, with the following configuration:

val xgbParam = Map(
    "booster" -> "gbtree",
    "objectiveType" -> "classification",
    "objective" -> "binary:logitraw",
    "evalMetric" -> "error",
    "numEarlyStoppingRounds" -> 10,
    "maximizeEvaluationMetrics" -> false,
    "verbosity" -> 0,
    "maxBin" -> 256,
    "num_parallel_tree" -> 4,
    "numWorkers" -> 100)

    
val xgbClassifier = new XGBoostClassifier(xgbParam)
    .setFeaturesCol(finalVectAssembler.getOutputCol)
    .setLabelCol(labelIndexer.getOutputCol)
    .setScalePosWeight(posLabelWeight)


val xgbPipeline = new Pipeline().setStages(Array(labelIndexer, vectAssembler, xgbClassifier))


val cvParamGrid = new ParamGridBuilder()
    .addGrid(xgbClassifier.maxDepth, Array(6, 12, 16))
    .addGrid(xgbClassifier.eta, Array(0.1, 0.3))
    .addGrid(xgbClassifier.numRound, Array(200))
    .addGrid(xgbClassifier.colsampleBytree, Array(1.0)) 
    .addGrid(xgbClassifier.alpha, Array(0.0, 0.3, 0.5, 0.7, 1.0)) 
    .addGrid(xgbClassifier.lambda, Array(1.0, 0.7, 0.5, 0.3, 0.0)) 
    .build()
    
val cvXGBoost = new CrossValidator()
    .setEstimator(xgbPipeline)
    .setEvaluator(new BinaryClassificationEvaluator().setLabelCol("labelCol_Index"))
    .setEstimatorParamMaps(cvParamGrid)
    .setNumFolds(5)

The problem I’m facing is that the XGBoost training is not parallelized: it runs as a single task (taking a lot of time), leaving most of the cluster’s resources (CPU and memory) unused. How can one fix this?

NB: My data has 200 partitions and my cluster has 150+ CPUs.

Thanks


#2

Hi, my guess is that setting “num_parallel_tree” is what keeps numWorkers from taking effect here. num_parallel_tree seems to exist to support boosted random forests. Maybe try setting it back to its default of 1 and see whether that works.


#3

Hi @jackie930 thanks for your answer.
I’ve tried this:

val xgbParam = Map(
    "booster" -> "gbtree",
    "objectiveType" -> "classification",
    "objective" -> "binary:logitraw",
    "evalMetric" -> "error",
    "numEarlyStoppingRounds" -> 10,
    "maximizeEvaluationMetrics" -> false,
    "verbosity" -> 0,
    "maxBin" -> 256,
    "num_parallel_tree" -> 1,
    "nthread" -> 3,
    "numWorkers" -> 30)

with “spark.task.cpus” set to “3”, but it’s much slower than the case where I set numWorkers to 1.
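For context, here is roughly how I understand those two settings fit together at submission time (a sketch only; the executor sizing and class name are illustrative assumptions, not something stated in this thread). Each XGBoost worker runs “nthread” threads inside a single Spark task, so spark.task.cpus should match nthread:

```
# Illustrative sketch: spark.task.cpus matches XGBoost's "nthread" (3),
# so each worker task is allotted the cores its training threads will use.
spark-submit \
  --conf spark.task.cpus=3 \
  --conf spark.executor.cores=3 \
  --class com.example.TrainXGBoost \
  app.jar
```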


#4

I found this link helpful: https://www.qubole.com/blog/machine-learning-xgboost-qubole-spark-cluster/


#5

Here are the changes I made to solve my problem, i.e. to be able to parallelize my XGBoost cross-validation:

  • Set the following Spark properties:

spark.task.cpus=3
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=#ClusterNodes

  • Separate the data preprocessing step from the modeling part:

val vectorizerPipeline = new Pipeline().setStages(Array(labelIndexer, vectAssembler))
val vectorizedData = vectorizerPipeline.fit(trainDF).transform(trainDF).cache()
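A possible variant of the caching line above (my assumption, not something tried in this thread): XGBoost4J-Spark repartitions its input to numWorkers partitions before training, so repartitioning once before caching may spare that shuffle on every CrossValidator fit:

```scala
// Variant: repartition to numWorkers (30, per the earlier param map) before
// caching, so each fit can reuse data whose partition count already matches
// what XGBoost4J-Spark would repartition to anyway.
val vectorizedData = vectorizerPipeline.fit(trainDF).transform(trainDF)
    .repartition(30)
    .cache()
```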

val cvParamGrid = new ParamGridBuilder()
    .addGrid(xgbClassifier.maxDepth, Array(6, 12, 16))
    .addGrid(xgbClassifier.eta, Array(0.1, 0.3))
    .addGrid(xgbClassifier.numRound, Array(200))
    .addGrid(xgbClassifier.colsampleBytree, Array(1.0)) 
    .addGrid(xgbClassifier.alpha, Array(0.0, 0.3, 0.5, 0.7, 1.0)) 
    .addGrid(xgbClassifier.lambda, Array(1.0, 0.7, 0.5, 0.3, 0.0)) 
    .build()

val cvXGBoost = new CrossValidator()
    .setEstimator(xgbClassifier) // => no need for a pipeline here; this reduces the number of tasks, i.e. the preprocessing steps are not repeated for every fold and parameter combination
    .setEvaluator(new BinaryClassificationEvaluator().setLabelCol("labelCol_Index"))
    .setEstimatorParamMaps(cvParamGrid)
    .setNumFolds(5)


val cvModel = cvXGBoost.fit(vectorizedData)
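To close the loop, the winning configuration can be read off the fitted CrossValidatorModel with the standard Spark ML API. A minimal sketch, assuming the names defined above are in scope:

```scala
// Each avgMetrics entry is the mean evaluator score (areaUnderROC by default
// for BinaryClassificationEvaluator) for the matching cvParamGrid entry.
val (bestMetric, bestIdx) = cvModel.avgMetrics.zipWithIndex.maxBy(_._1)
println(s"Best average metric: $bestMetric")
println(s"Best params:\n${cvXGBoost.getEstimatorParamMaps(bestIdx)}")

// Since the estimator is the classifier itself (no pipeline), the best
// model is the fitted XGBoostClassificationModel.
val bestXgbModel = cvModel.bestModel.asInstanceOf[XGBoostClassificationModel]
```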