Hello,
I’m trying to learn how to use distributed XGBoost with Dask. I’m starting on my personal computer to learn the basics. My OS is Windows 10 Home Single Language, version 10.0.19042.
I’m having trouble running the simple example from https://xgboost.readthedocs.io/en/latest/python/dask-examples/cpu_training.html:
import xgboost as xgb
from xgboost.dask import DaskDMatrix
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask import array as da


def main(client):
    # generate some random data for demonstration
    m = 100000
    n = 100
    X = da.random.random(size=(m, n), chunks=100)
    y = da.random.random(size=(m, ), chunks=100)

    # DaskDMatrix acts like normal DMatrix, works as a proxy for local
    # DMatrix scatter around workers.
    dtrain = DaskDMatrix(client, X, y)

    # Use train method from xgboost.dask instead of xgboost. This
    # distributed version of train returns a dictionary containing the
    # resulting booster and evaluation history obtained from
    # evaluation metrics.
    output = xgb.dask.train(client,
                            {'verbosity': 1,
                             'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])
    bst = output['booster']
    history = output['history']

    # you can pass output directly into `predict` too.
    prediction = xgb.dask.predict(client, bst, dtrain)
    print('Evaluation history:', history)
    return prediction


if __name__ == '__main__':
    # or use other clusters for scaling
    with LocalCluster(n_workers=4, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            main(client)
When I execute this for the first time (after turning my computer on), it runs, but with the following warnings:
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:237: Connect to (failed): [192.168.0.122]
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:37:10] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:240: Retry connect to ip(retry time 4): [192.168.0.122]
[12:37:31] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:139: Failed to shutdown due to[12:37:31] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001E1E2698F70>>, <Task finished name='Task-2959' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-3ebb87a72df4eb244bd4b9e64b754adf')>)
Traceback (most recent call last):
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
ret = callback()
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
future.result()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
response = await retry_operation(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
return await retry(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
return await coro()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
raise exc.with_traceback(tb)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
result = handler(comm, **msg)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
ws._executing = {
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-3ebb87a72df4eb244bd4b9e64b754adf'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x00000198FB4B8F70>>, <Task finished name='Task-2939' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-4ad3894cd5e661104c45809b7b1ccf1c')>)
Traceback (most recent call last):
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
ret = callback()
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
future.result()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
response = await retry_operation(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
return await retry(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
return await coro()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
raise exc.with_traceback(tb)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
result = handler(comm, **msg)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
ws._executing = {
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-4ad3894cd5e661104c45809b7b1ccf1c'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x00000286DB158F70>>, <Task finished name='Task-2960' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-8d70f98aa5613fdf3d70c9b4e7b7dc34')>)
Traceback (most recent call last):
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
ret = callback()
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
future.result()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
response = await retry_operation(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
return await retry(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
return await coro()
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
raise exc.with_traceback(tb)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
result = handler(comm, **msg)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
ws._executing = {
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-8d70f98aa5613fdf3d70c9b4e7b7dc34'
distributed.worker - WARNING - Heartbeat to scheduler failed
The warnings with timestamps [12:35:44] and [12:37:10] appear multiple times, but the last warning (with timestamp [12:37:31]) appears only once, at the end of the execution.
However, every time I try to run it again, it no longer works, and I get the following errors:
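The WSAError code 10060 in these warnings is the Windows socket timeout error (WSAETIMEDOUT), meaning a connection attempt to 192.168.0.122 received no reply. One way to check from plain Python whether a TCP connection to a given address succeeds is sketched below. This is only a diagnostic sketch using the standard library: the helper name `can_connect` is hypothetical, and the port has to be supplied by hand because the log does not show which port was used.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

Calling `can_connect("192.168.0.122", some_port)` with a port that a worker is known to listen on would show whether the machine can reach its own LAN address; `some_port` is a placeholder, not a value taken from the log.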
C:\Users\andre\Anaconda3\lib\site-packages\distributed\node.py:151: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 65096 instead
warnings.warn(
Windows is not officially supported for dask/xgboost, contribution are welcomed.
Windows is not officially supported for dask/xgboost, contribution are welcomed.
distributed.worker - WARNING - Run Failed
Function: _start_tracker
args: (1)
kwargs: {}
Traceback (most recent call last):
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 3734, in run
result = function(*args, **kwargs)
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 143, in _start_tracker
rabit_context = RabitTracker(hostIP=host, nslave=n_workers, use_logger=False)
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\tracker.py", line 161, in __init__
sock.bind((hostIP, _port))
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
Traceback (most recent call last):
File "C:\Users\andre\OneDrive\Documentos\Unicamp\Rafinha\xgboost\cpu_training.py", line 46, in <module>
main(client)
File "C:\Users\andre\OneDrive\Documentos\Unicamp\Rafinha\xgboost\cpu_training.py", line 28, in main
output = xgb.dask.train(client,
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 981, in train
return client.sync(_train_async, global_config=config.get_config(), **args)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\client.py", line 843, in sync
return sync(
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils.py", line 353, in sync
raise exc.with_traceback(tb)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils.py", line 336, in f
result[0] = yield future
File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
value = future.result()
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 847, in _train_async
_rabit_args = await _get_rabit_args(len(workers), client)
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 805, in _get_rabit_args
env = await client.run_on_scheduler(_start_tracker, n_workers)
File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\client.py", line 2382, in _run_on_scheduler
raise exc.with_traceback(tb)
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 143, in _start_tracker
rabit_context = RabitTracker(hostIP=host, nslave=n_workers, use_logger=False)
File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\tracker.py", line 161, in __init__
sock.bind((hostIP, _port))
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
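WinError 10048 is the Windows "address already in use" error (WSAEADDRINUSE), raised here when the tracker calls `sock.bind`. A minimal sketch for checking whether a given host and port can currently be bound is below. It uses only the standard library; `port_is_free` is a hypothetical helper name, and the port must be chosen by hand because the traceback does not show which port the tracker tried.

```python
import socket


def port_is_free(host: str, port: int) -> bool:
    """Return True if a TCP socket can currently be bound to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            # For example WinError 10048 on Windows when the address is in use.
            return False
        return True
```

Running this before and after the training script, with the same host IP that the tracker uses, would show whether a socket from the first run is still holding the port.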
I believe that the problem is related to xgboost, not dask alone, because when I run the following code, starting just Dask’s cluster and client, I do not get the above error:
from dask.distributed import Client
from dask.distributed import LocalCluster

if __name__ == '__main__':
    # or use other clusters for scaling
    with LocalCluster(n_workers=4, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            print(cluster)
            print(client)
Output:
C:\Users\andre\Anaconda3\lib\site-packages\distributed\node.py:151: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 49277 instead
warnings.warn(
LocalCluster(537ecb12, 'tcp://127.0.0.1:49278', workers=4, threads=16, memory=7.86 GiB)
<Client: 'tcp://127.0.0.1:49278' processes=4 threads=16, memory=7.86 GiB>
I tried passing the host parameter to LocalCluster (e.g. LocalCluster(n_workers=4, threads_per_worker=4, host='tcp://127.0.0.1:8787')), but it didn’t change anything.
It seems that the cluster is not being shut down after the code finishes. I tried explicitly adding
client.close()
cluster.close()
but that didn’t change anything either.
I have also noticed the message
“Windows is not officially supported for dask/xgboost, contribution are welcomed.”
Could this be an incompatibility between XGBoost and Windows?
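Since the answer may depend on the installed versions, a small sketch for collecting them is below. It relies only on the standard library (`importlib.metadata`, available from Python 3.8); the helper name `report_versions` and the choice of package names are assumptions about what is relevant here, not something taken from the XGBoost or Dask documentation.

```python
import platform
from importlib import metadata


def report_versions(packages=("xgboost", "dask", "distributed")):
    """Map each package name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The package is not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    print("Python", platform.python_version(), "on", platform.platform())
    for name, version in report_versions().items():
        print(name, version or "not installed")
```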
Thanks in advance!