I run the code with pyspark shell:
pyspark --jars /root/hadoop/mysql-connector-java-5.1.37-bin.jar,/root/downloads/rapids-4-spark_2.12-21.10.0.jar,/root/downloads/cudf-21.10.0.jar,/root/downloads/rapids-4-spark-tools_2.12-21.10.0.jar,/root/downloads/xgboost4j_3.0-1.4.2-0.1.0.jar,/root/downloads/xgboost4j-spark_3.0-1.4.2-0.1.0.jar --master yarn --deploy-mode client --num-executors 2 --executor-cores 1 --executor-memory 1g --driver-memory 1g \
--conf spark.sql.session.timeZone=UTC \
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC \
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator \
--conf spark.resources.discoveryPlugin=com.nvidia.spark.ExclusiveModeGpuDiscoveryPlugin \
--conf spark.rapids.shims-provider-override=com.nvidia.spark.rapids.shims.spark312.SparkShimServiceProvider \
--conf spark.rapids.memory.gpu.pooling.enabled=true \
--conf spark.rapids.python.memory.gpu.allocFraction=0.8 \
--conf spark.rapids.python.memory.gpu.maxAllocFraction=0.9 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.5 \
--conf spark.executor.resource.gpu.discoveryScript=/root/hadoop/getGpusResources_executor.sh \
--conf spark.rapids.sql.enabled=true \
--conf spark.rapids.sql.incompatibleOps.enabled=true \
--conf spark.rapids.sql.concurrentGpuTasks=2 \
--conf spark.rapids.sql.udfCompiler.enabled=true \
--conf spark.rapids.sql.format.csv.read.enabled=true \
--conf spark.rapids.sql.format.csv.enabled=true \
--conf spark.rapids.sql.variableFloatAgg.enabled=true \
--conf spark.rapids.sql.explain=ALL \
--conf spark.rapids.sql.batchSizeBytes=512M \
--conf spark.sql.hive.convertMetastoreParquet=true \
--py-files /root/downloads/sparkxgb.zip
code:
import numpy as np
import pandas as pd
import os
import re
from sklearn import metrics
import matplotlib.pyplot as plt
import findspark
findspark.init()
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import *
from sparkxgb.xgboost import XGBoostEstimator
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Alternative to --py-files: ship the zip from the driver at runtime.
# spark.sparkContext.addPyFile("/root/downloads/sparkxgb.zip")

# Read the raw CSV; with only the "header" option set, every column is a string.
df_all = (
    spark.read
    .option("header", "true")
    .csv("bank.csv")
)

# Sanitize column names: drop '{', '(', ')', '}' and collapse dots/whitespace to '_'
# (Spark column names containing dots or spaces need backticks otherwise).
tran_tab = str.maketrans({x: None for x in list("{()}")})
df_all = df_all.toDF(
    *(re.sub(r"[.\s]+", "_", c).translate(tran_tab) for c in df_all.columns)
)

# Fill NA values.
df_all = df_all.na.fill(0)

# Drop columns not used for modeling.
# NOTE: the loop variable is 'c', not 'col', so the imported
# pyspark.sql.functions.col is not shadowed.
unused_col = ["day", "month"]
df_all = df_all.select([c for c in df_all.columns if c not in unused_col])

# Split columns by dtype: numeric features vs. string (categorical) columns.
numeric_features = [t[0] for t in df_all.dtypes if t[1] == "int"]
cols = df_all.columns
string_col = [t[0] for t in df_all.dtypes if t[1] != "int"]
# 'deposit' is the label, not a feature.
string_col = [x for x in string_col if x != "deposit"]

# Build one StringIndexer + OneHotEncoder per categorical column.
# They are stored in globals() under '<name>Indexer' / '<name>classVecIndexer'
# to mirror the original script's naming scheme.
for S in string_col:
    globals()[str(S) + "Indexer"] = (
        StringIndexer()
        .setInputCol(S)
        .setOutputCol(str(S) + "Index")
        .setHandleInvalid("keep")
    )
    globals()[str(S) + "classVecIndexer"] = OneHotEncoder(
        inputCols=[globals()[str(S) + "Indexer"].getOutputCol()],
        outputCols=[str(S) + "classVec"],
    )

# Assemble indexed, one-hot, and numeric columns into a single 'features' vector.
feature_col = [s + "Index" for s in string_col]
feature_col.extend([str(s) + "classVec" for s in string_col])
feature_col.extend(numeric_features)
vectorAssembler = (
    VectorAssembler()
    .setInputCols(feature_col)
    .setOutputCol("features")
)

# Index the label column.
label_stringIdx = StringIndexer(inputCol="deposit", outputCol="label")

# Define the XGBoost estimator.
# NOTE(review): XGBoostEstimator here wraps the Scala class
# ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator, which must exist in the
# JVM classpath jars — a missing/mismatched jar produces the
# "'JavaPackage' object is not callable" error shown below.
xgboost = XGBoostEstimator(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)
then I got an error:
TypeError Traceback (most recent call last)
in
2 featuresCol=“features”,
3 labelCol=“label”,
----> 4 predictionCol=“prediction”
5 )
/usr/local/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/init.py in wrapper(self, *args, **kwargs)
112 raise TypeError(“Method %s forces keyword arguments.” % func.name)
113 self._input_kwargs = kwargs
–> 114 return func(self, **kwargs)
115 return wrapper
116
/tmp/spark-d0b4f019-5ed6-4b4f-8100-c6de976c6a27/userFiles-7b721153-c4b0-4c68-8d94-b7864250a319/sparkxgb.zip/sparkxgb/xgboost.py in init(self, checkpoint_path, checkpointInterval, missing, nthread, nworkers, silent, use_external_memory, baseMarginCol, featuresCol, labelCol, predictionCol, weightCol, base_score, booster, eval_metric, num_class, num_round, objective, seed, alpha, colsample_bytree, colsample_bylevel, eta, gamma, grow_policy, max_bin, max_delta_step, max_depth, min_child_weight, reg_lambda, scale_pos_weight, sketch_eps, subsample, tree_method, normalize_type, rate_drop, sample_type, skip_drop, lambda_bias)
116
117 super(XGBoostEstimator, self).init()
–> 118 self._java_obj = self._new_java_obj(“ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator”, self.uid)
119 self._create_params_from_java()
120 self._setDefault(
/usr/local/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/ml/wrapper.py in _new_java_obj(java_class, *args)
64 java_obj = getattr(java_obj, name)
65 java_args = [_py2java(sc, arg) for arg in args]
—> 66 return java_obj(*java_args)
67
68 @staticmethod
TypeError: ‘JavaPackage’ object is not callable
I think this error is caused by an incompatible combination of PySpark and the sparkxgb.zip wrapper: the Python wrapper instantiates ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator, but that class does not exist in the xgboost4j-spark 1.4.2 jars on the classpath (it was removed after 0.90), so the JVM lookup returns a JavaPackage instead of a constructor.
Where can I find a version of the sparkxgb.zip package that matches my jars?