Can I run XGBoost in Spark local mode? I ran the following code and got the error shown below. Any help would be appreciated!


#1

package com.tai.ctr

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

/**
 * Minimal XGBoost4J-Spark example: reads the Iris CSV, indexes the label column,
 * assembles the four numeric columns into a feature vector, trains an
 * `XGBoostClassifier` with Spark in `local[*]` mode and prints the schema of the
 * predictions on a held-out split.
 */
object XGBOOSTTest {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // String literals must use plain ASCII double quotes; typographic quotes
    // (as produced by some editors / forum software) do not compile.
    val spark = SparkSession.builder().appName("XGBOOSTTest").master("local[*]").getOrCreate()

    // Stop the session even when training throws, so a failed run does not
    // depend on the JVM shutdown hook to release Spark resources.
    try {
      val schema = new StructType(Array(
        StructField("sepal length", DoubleType, true),
        StructField("sepal width", DoubleType, true),
        StructField("petal length", DoubleType, true),
        StructField("petal width", DoubleType, true),
        StructField("class", StringType, true)
      ))

      val rawInput = spark.read.schema(schema).csv("src/main/resources/iris.data")

      // Transform the string class label into a numeric index to make xgboost happy.
      val stringIndexer = new StringIndexer()
        .setInputCol("class")
        .setOutputCol("classIndex")
        .fit(rawInput)

      val labelTransformed = stringIndexer.transform(rawInput).drop("class")

      // Compose all feature columns into a single vector column.
      val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
        .setOutputCol("features")
      val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")

      // 60% train, 20% + 10% evaluation sets, 10% test (unseeded, so splits vary per run).
      val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1))

      // NOTE(review): "num_workers" presumably must not exceed the number of task
      // slots available to local[*] -- confirm against the XGBoost4J-Spark docs.
      // NOTE(review): a "Tracker Process ends with exit code 1" failure is raised
      // outside this code (Rabit tracker); verify the tracker's runtime
      // prerequisites for the XGBoost4J-Spark version in use.
      val xgbParam = Map(
        "eta" -> 0.1f,
        "max_depth" -> 2,
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 100,
        "num_workers" -> 2,
        "eval_sets" -> Map("eval1" -> eval1, "eval2" -> eval2)
      )

      val xgbClassifier = new XGBoostClassifier(xgbParam)
        .setFeaturesCol("features")
        .setLabelCol("classIndex")

      val xgbClassificationModel = xgbClassifier.fit(train)

      val results = xgbClassificationModel.transform(test)

      results.printSchema()
    } finally {
      spark.stop()
    }
  }
}

error message:
19/05/17 07:18:29 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
19/05/17 07:18:30 INFO BlockManagerInfo: Removed broadcast_6_piece0 on 192.168.1.101:51559 in memory (size: 18.6 KB, free: 912.2 MB)
19/05/17 07:18:30 INFO BlockManagerInfo: Removed broadcast_8_piece0 on 192.168.1.101:51559 in memory (size: 18.6 KB, free: 912.2 MB)
19/05/17 07:18:30 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 192.168.1.101:51559 in memory (size: 18.6 KB, free: 912.2 MB)
19/05/17 07:18:30 INFO RabitTracker: Tracker Process ends with exit code 1
19/05/17 07:18:30 INFO XGBoostSpark: Rabit returns with exit code 1
Exception in thread "main" ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:511)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$1.apply(XGBoost.scala:404)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$1.apply(XGBoost.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:380)
    at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:196)
    at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:48)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
    at com.tai.ctr.XGBOOSTTest$.main(XGBOOSTTest.scala:48)
    at com.tai.ctr.XGBOOSTTest.main(XGBOOSTTest.scala)
19/05/17 07:18:30 ERROR RabitTracker: Uncaught exception thrown by worker:
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
    at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:728)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$1$$anon$1.run(XGBoost.scala:397)
19/05/17 07:18:31 INFO MemoryStore: Block rdd_39_1 stored as values in memory (estimated size 1544.0 B, free 911.3 MB)
19/05/17 07:18:31 INFO BlockManagerInfo: Added rdd_39_1 in memory on 192.168.1.101:51559 (size: 1544.0 B, free: 912.2 MB)
19/05/17 07:18:31 INFO MemoryStore: Block rdd_39_0 stored as values in memory (estimated size 1544.0 B, free 911.3 MB)
19/05/17 07:18:31 INFO BlockManagerInfo: Added rdd_39_0 in memory on 192.168.1.101:51559 (size: 1544.0 B, free: 912.2 MB)
19/05/17 07:18:31 INFO Executor: 1 block locks were not released by TID = 5:
[rdd_39_0]
19/05/17 07:18:31 INFO Executor: 1 block locks were not released by TID = 6:
[rdd_39_1]
19/05/17 07:18:31 INFO Executor: Finished task 1.0 in stage 5.0 (TID 6). 1052 bytes result sent to driver
19/05/17 07:18:31 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5). 1052 bytes result sent to driver
19/05/17 07:18:31 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 6) in 2186 ms on localhost (executor driver) (1/2)
19/05/17 07:18:31 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 2195 ms on localhost (executor driver) (2/2)
19/05/17 07:18:31 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
19/05/17 07:18:31 INFO DAGScheduler: ResultStage 5 (foreachPartition at XGBoost.scala:397) finished in 2.214 s
19/05/17 07:18:35 INFO SparkContext: Invoking stop() from shutdown hook

The error message


#2

Are you using the correct Spark version?


#3

My code is also failing at postTrackerReturnProcessing(XGBoost.scala:511). Did you figure out the issue?


#4

Same error here with XGBoost 0.82 + Spark 2.3:
User class threw exception: ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:511)
at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$1.apply(XGBoost.scala:404)
at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$1.apply(XGBoost.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:380)
at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:196)
at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:48)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at trainXgb.train$.main(train.scala:15)
at trainXgb.train.main(train.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)