[Solved] Multi-node and multi-threading with XGBoost4J-Spark PySpark API

Hi,

Without early_stopping_rounds, my distributed XGBoost training with the new PySpark API works. However, it took much longer than single-node vanilla XGBoost. My observation is that SparkXGBClassifier uses only 1 core for training, while vanilla XGBoost uses all cores available on the machine, which explains the slowdown. I tried setting spark.task.cpus to a larger number according to the docs, but it didn't help. Here is my configuration:

    from pyspark.sql import SparkSession

    def setup_standalone(master_name):
        spark = SparkSession.builder.master(f'spark://{master_name}:7077')\
            .appName("xgboost_train")\
            .config("spark.driver.memory", '30g')\
            .config("spark.local.dir", "/mnt/tmp/spark")\
            .config("spark.executor.memory", "200g")\
            .config("spark.executor.memoryOverhead", "20g")\
            .config("spark.executor.cores", "40")\
            .config("spark.executor.instances", "4")\
            .getOrCreate()
        return spark

    xgb_parms = {
        'max_depth': 8,
        'learning_rate': 0.1,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'eval_metric': 'logloss',
        'tree_method': 'hist',
        'random_state': 48,
    }

    xgb_classifier = SparkXGBClassifier(**xgb_parms)

My standalone Spark cluster has 2 nodes, each with 160 cores and around 500 GiB of RAM. My code runs inside a Docker container, and I built xgboost 2.0.0-dev from source.

I guess the problem lies in my Spark configuration, but I couldn't find any documentation on how to configure a Spark cluster for running distributed XGBoost with the new PySpark API. It would be great if anyone on the forum could help me out on this. Thanks!!!

OK, I solved the problem by changing my Spark and XGBoost configurations. The key change was setting spark.task.cpus equal to spark.executor.cores, so each training task occupies a whole executor and can use all of its cores, and setting num_workers to match the number of tasks that can run concurrently. This is the working version:

    spark = SparkSession.builder.master(f'spark://{master_name}:7077')\
    .appName("Recsys2021_xgboost_train")\
    .config("spark.driver.memory", '30g')\
    .config("spark.local.dir", "/mnt/tmp/spark")\
    .config("spark.executor.memory", "30g")\
    .config("spark.executor.memoryOverhead", "50g")\
    .config("spark.executor.cores", "8")\
    .config("spark.executor.instances","16")\
    .config("spark.task.cpus","8")\
    .getOrCreate()
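A quick back-of-the-envelope check of why this configuration works (assuming, as the PySpark estimator does, one XGBoost worker per Spark task; the variable names below are just for illustration):

    # Resource math for the working configuration above.
    executor_instances = 16
    executor_cores = 8
    task_cpus = 8  # spark.task.cpus

    # Each task reserves task_cpus cores, so an executor runs this many tasks at once:
    tasks_per_executor = executor_cores // task_cpus
    # Concurrent tasks across the cluster = XGBoost workers that can train in parallel:
    concurrent_tasks = executor_instances * tasks_per_executor

    print(tasks_per_executor)  # 1
    print(concurrent_tasks)    # 16

    # Distributed training runs in barrier mode, so all workers must get a
    # task slot at the same time: num_workers must not exceed concurrent_tasks.
    assert 16 <= concurrent_tasks

With spark.task.cpus = 8, each of the 16 workers trains with 8 threads instead of 1, which is why the single-core behavior disappears.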


    xgb_parms = {
        'max_depth': 8,
        'learning_rate': 0.1,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'eval_metric': 'logloss',
        'tree_method': 'hist',
        'missing': 0.0,
        'random_state': 48,
        'num_workers': 16,
    }
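For anyone who wants to try this end to end before spinning up a cluster, here is a minimal local-mode sketch. It assumes pyspark and xgboost are installed; the toy data and the default column names ("features", "label") are illustrative only, and num_workers is dropped to 1 since there is only one task slot worth of parallel workers in this tiny session:

    from pyspark.ml.linalg import Vectors
    from pyspark.sql import SparkSession
    from xgboost.spark import SparkXGBClassifier

    spark = SparkSession.builder.master("local[2]").appName("xgb_demo").getOrCreate()

    # Tiny toy DataFrame with the estimator's default column names.
    train_df = spark.createDataFrame(
        [(Vectors.dense(0.0, 1.0), 0), (Vectors.dense(1.0, 0.0), 1),
         (Vectors.dense(0.1, 0.9), 0), (Vectors.dense(0.9, 0.1), 1)],
        ["features", "label"],
    )

    # Same shape of params as above, shrunk for local mode.
    xgb_parms = {
        'max_depth': 3,
        'learning_rate': 0.1,
        'eval_metric': 'logloss',
        'tree_method': 'hist',
        'num_workers': 1,
    }
    model = SparkXGBClassifier(**xgb_parms).fit(train_df)
    preds = model.transform(train_df).select("prediction").collect()
    print(len(preds))
    spark.stop()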

Hope it helps.