I have 3 small datasets with the same column schema (less than 1 GB in total), and I want to train an XGBoost model for each of them in a Spark environment. Code snippet:
```scala
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Shared pipeline: index the string label, assemble the five feature
// columns into a vector, then train an XGBoost binary classifier.
def myPipeline(): Pipeline = {
  val indexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("class")
    .setHandleInvalid("keep")
  val assembler = new VectorAssembler()
    .setInputCols(Array("C1", "C2", "C3", "C4", "C5"))
    .setOutputCol("assembled_features")
  val xgbClassifier = new XGBoostClassifier()
    .setNumWorkers(2)
    .setEta(0.1)
    .setMaxDepth(6)
    .setNumRound(100)
    .setObjective("binary:logistic")
    .setFeaturesCol(assembler.getOutputCol)
    .setLabelCol("class")
  new Pipeline().setStages(Array(indexer, assembler, xgbClassifier))
}

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().getOrCreate()
  val d1 = spark.read.format("csv").load("path1")
  val d2 = spark.read.format("csv").load("path2")
  val d3 = spark.read.format("csv").load("path3")
  val data = Array(d1, d2, d3)
  val pipeline = myPipeline()
  data.foreach { d =>
    val model = pipeline.fit(d)
    // Score with the fitted model, not the unfitted pipeline.
    val prediction = model.transform(d)
    prediction.write.mode("overwrite").json(" ... ")
  }
}
```
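One thing that may be relevant: I don't pass a schema, so as far as I know every CSV column loads as `StringType`, and `VectorAssembler` only accepts numeric, boolean, and vector columns. Below is a sketch of the typed read I believe I'd need; whether the files have a header row, and whether `C1`–`C5` are doubles, are assumptions here:

```scala
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Explicit schema so the feature columns are numeric, not strings.
// DoubleType for C1..C5 is an assumption about the data.
val schema = StructType(
  Seq("C1", "C2", "C3", "C4", "C5").map(c => StructField(c, DoubleType)) :+
    StructField("label", StringType)
)
val typed = spark.read // `spark` is the SparkSession from main
  .format("csv")
  .option("header", "true") // assumed; the files may not have a header
  .schema(schema)
  .load("path1")
```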
Despite the large number of executors I allocated when submitting the Spark application, the training stage always fails with errors. By the way, I allocated 8 GB of memory to each executor when submitting the jobs.
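For reference, here is roughly what my submission settings amount to when written as SparkSession config; only the `8g` value is what I actually use, and the executor and core counts below are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Equivalent of my spark-submit resource flags; "8g" matches what I
// described above, the other two values are placeholders.
val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.executor.instances", "4") // placeholder
  .config("spark.executor.cores", "2")     // placeholder
  .getOrCreate()
```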
Does anybody know why? Thanks!