The JAR files are now created! But here is the current error, which I have not yet solved:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "<path>\spark-2.4.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred: Traceback (most recent call last):
File "<path>\spark-2.4.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "<path>\spark-2.4.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-8-2f8cc998c6cc> in <module>
16 growPolicy='lossguide',
17 numWorkers=executors_per_node*nodes,
---> 18 nthread=cores_per_executor
19 )
~\Documents\spark-2.4.3-bin-hadoop2.7\python\pyspark\__init__.py in wrapper(self, *args, **kwargs)
108 raise TypeError("Method %s forces keyword arguments." % func.__name__)
109 self._input_kwargs = kwargs
--> 110 return func(self, **kwargs)
111 return wrapper
112
~\AppData\Local\Temp\spark-605454e4-6f56-4a13-978a-7d3e1bb4a678\userFiles-a31377bb-1156-4ab0-97b7-4eacff20e871\sparkxgb_0.83.zip\sparkxgb\xgboost.py in __init__(self, alpha, baseMarginCol, baseScore, cacheTrainingSet, checkpointInterval, checkpointPath, colsampleBylevel, colsampleBytree, contribPredictionCol, customEval, customObj, eta, evalMetric, featuresCol, gamma, growPolicy, interactionConstraints, labelCol, reg_lambda, lambdaBias, leafPredictionCol, maxBins, maxDeltaStep, maxDepth, maxLeaves, maximizeEvaluationMetrics, minChildWeight, missing, monotoneConstraints, normalizeType, nthread, numClass, numEarlyStoppingRounds, numRound, numWorkers, objective, objectiveType, predictionCol, probabilityCol, rateDrop, rawPredictionCol, sampleType, scalePosWeight, seed, sketchEps, skipDrop, subsample, threshold, timeoutRequestWorkers, trackerConf, trainTestRatio, treeLimit, treeMethod, useExternalMemory, verbosity, weightCol)
112 super(XGBoostClassifier, self).__init__()
113 self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier", self.uid)
--> 114 self._create_params_from_java()
115 self._setDefault() # We get our defaults from the embedded Scala object, so no need to specify them here.
116 kwargs = self._input_kwargs
~\Documents\spark-2.4.3-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _create_params_from_java(self)
147 SPARK-10931: Temporary fix to create params that are defined in the Java obj but not here
148 """
--> 149 java_params = list(self._java_obj.params())
150 from pyspark.ml.param import Param
151 for java_param in java_params:
~\Documents\spark-2.4.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\Documents\spark-2.4.3-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~\Documents\spark-2.4.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
334 raise Py4JError(
335 "An error occurred while calling {0}{1}{2}".
--> 336 format(target_id, ".", name))
337 else:
338 type = answer[1]
Py4JError: An error occurred while calling o61.params
My guess is that the 'from sparkxgb import XGBoostClassifier' import is not set up properly. The "Answer from Java side is empty" part makes me think the driver JVM dies (or cannot find the Scala class) when o61.params is called.
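A minimal check I can think of (a sketch, assuming sc is the SparkContext created in the code below) to see whether the driver JVM can resolve the Scala class at all:

# Sketch: ask the driver JVM to resolve the XGBoost4J-Spark class directly.
# If the jars are missing from the driver classpath, this should raise a
# visible py4j ClassNotFoundException instead of the connection just dying.
clazz = sc._jvm.java.lang.Class.forName(
    "ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier")
print(clazz.getName())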
The code I am using is:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
import os, sys
import math, time  # needed below for the memory arithmetic and the sleep
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
%env SPARK_JARS='<path>\.m2\repository\ml\dmlc\xgboost4j\0.90\xgboost4j-0.90.jar;<path>\.m2\repository\ml\dmlc\xgboost4j-spark\0.90\xgboost4j-spark-0.90.jar pyspark-shell'
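# Assumption on my side: I am not sure SPARK_JARS is read by pyspark at all;
# the variant I have seen used is PYSPARK_SUBMIT_ARGS, set before the first
# SparkContext is created, with the jars comma-separated:
# os.environ['PYSPARK_SUBMIT_ARGS'] = ('--jars <path>\\xgboost4j-0.90.jar,'
#                                      '<path>\\xgboost4j-spark-0.90.jar pyspark-shell')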
import pyspark
...
executors_per_node = 7
nodes = 1
cores_per_executor = 8
task_per_core = 1
cache_size = 50
total_size = 340000
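# Worked example of the memory arithmetic below, with the values above
# (nodes=1, executors_per_node=7, total_size=340000, cache_size=50):
#   spark.executor.memory         = floor(340000/7) - 1024 - floor(50*1024/7)
#                                 = 48571 - 1024 - 7314 = 40233m
#   spark.memory.offHeap.size     = floor(50*1024/7) = 7314m
#   spark.executor.memoryOverhead = 7314 + 3000 = 10314m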
conf = SparkConf() \
    .set('spark.default.parallelism', f'{nodes*executors_per_node*cores_per_executor*task_per_core}') \
    .set('spark.executor.instances', '{:d}'.format(executors_per_node*nodes)) \
    .set('spark.files.maxPartitionBytes', '256m') \
    .set('spark.app.name', 'pyspark_final-xgboost-0.90-Egor') \
    .set('spark.rdd.compress', 'False') \
    .set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .set('spark.executor.cores', '{:d}'.format(cores_per_executor)) \
    .set('spark.executor.memory', '{:d}m'.format(int(math.floor(nodes*total_size/(nodes*executors_per_node)))-1024-int(math.floor(cache_size*1024/(nodes*executors_per_node))))) \
    .set('spark.task.cpus', f'{cores_per_executor}') \
    .set('spark.driver.memory', '24g') \
    .set('spark.memory.offHeap.enabled', 'True') \
    .set('spark.memory.offHeap.size', '{:d}m'.format(int(math.floor(cache_size*1024/(nodes*executors_per_node))))) \
    .set('spark.executor.memoryOverhead', '{:d}m'.format(int(math.floor(cache_size*1024/(nodes*executors_per_node)))+3000)) \
    .set('spark.sql.join.preferSortMergeJoin', 'False') \
    .set('spark.memory.storageFraction', '0.5') \
    .set('spark.executor.extraJavaOptions',
         '-XX:+UseParallelGC -XX:+UseParallelOldGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps') \
    .set("spark.jars", "<path>\\Documents\\IntelFederal\\spark\\xgboost-jars\\xgboost4j-spark-0.90-Egor.jar") \
    .set("spark.driver.extraClassPath", "<path>\\Documents\\IntelFederal\\spark\\xgboost-jars\\xgboost4j-spark-0.90-Egor.jar") \
    .set("spark.executor.extraClassPath", "<path>\\Documents\\IntelFederal\\spark\\xgboost-jars\\xgboost4j-spark-0.90-Egor.jar")
sc = SparkContext(conf=conf, master='local[*]')  # to run on a single node
sc.setLogLevel('ERROR')
spark = SQLContext(sc)
spark_home = os.environ.get('SPARK_HOME', None)
sc.addPyFile(os.path.join(spark_home, 'python/lib/sparkxgb_0.83.zip'))
time.sleep(10)
from sparkxgb import XGBoostClassifier # needs sparkxgb_0.83.zip
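# One thing I am unsure about: the Python wrapper zip is sparkxgb_0.83 while the
# jars are 0.90; I do not know whether the 0.83 wrapper is compatible with the
# 0.90 Scala classes.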
xgboost = XGBoostClassifier(
    featuresCol='features',
    labelCol='delinquency_12',
    numRound=100,
    maxDepth=8,
    maxLeaves=256,
    alpha=0.9,
    eta=0.1,
    gamma=0.1,
    subsample=1.0,
    reg_lambda=1.0,
    scalePosWeight=2.0,
    minChildWeight=30.0,
    treeMethod='hist',
    objective='reg:squarederror',
    growPolicy='lossguide',
    numWorkers=executors_per_node*nodes,
    nthread=cores_per_executor
)
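For completeness, once the constructor works this is how I would train (a sketch; train_df is a placeholder for my DataFrame with a 'features' vector column and the 'delinquency_12' label):

# model = xgboost.fit(train_df)
# predictions = model.transform(train_df)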
Thanks a lot for any help!