[Dask/distributed xgboost] Error WinError 10048 with simple example

Hello,
I’m trying to learn how to use distributed xgboost with dask, starting on my personal computer to learn the basics. My OS is Windows 10 Home Single Language, version 10.0.19042.
I’m having trouble running the simple example found at https://xgboost.readthedocs.io/en/latest/python/dask-examples/cpu_training.html:

import xgboost as xgb
from xgboost.dask import DaskDMatrix
from dask.distributed import Client
from dask.distributed import LocalCluster
from dask import array as da


def main(client):
    # generate some random data for demonstration
    m = 100000
    n = 100
    X = da.random.random(size=(m, n), chunks=100)
    y = da.random.random(size=(m, ), chunks=100)

    # DaskDMatrix acts like normal DMatrix, works as a proxy for local
    # DMatrix scatter around workers.
    dtrain = DaskDMatrix(client, X, y)

    # Use train method from xgboost.dask instead of xgboost.  This
    # distributed version of train returns a dictionary containing the
    # resulting booster and evaluation history obtained from
    # evaluation metrics.
    output = xgb.dask.train(client,
                            {'verbosity': 1,
                             'tree_method': 'hist'},
                            dtrain,
                            num_boost_round=4, evals=[(dtrain, 'train')])
    bst = output['booster']
    history = output['history']

    # you can pass output directly into `predict` too.
    prediction = xgb.dask.predict(client, bst, dtrain)
    print('Evaluation history:', history)
    return prediction


if __name__ == '__main__':
    # or use other clusters for scaling
    with LocalCluster(n_workers=4, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            main(client)

When I run this for the first time after turning my computer on, it completes, but with the following warnings:

[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:237: Connect to (failed): [192.168.0.122]

[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060

[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
[12:35:44] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:446: failed in ReconnectLink [12:35:44] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060

[12:37:10] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:240: Retry connect to ip(retry time 4): [192.168.0.122]


[12:37:31] WARNING: C:/Users/Administrator/workspace/xgboost-win64_release_1.5.1/rabit/src/allreduce_base.cc:139: Failed to shutdown due to[12:37:31] C:\Users\Administrator\workspace\xgboost-win64_release_1.5.1\rabit\include\rabit/internal/utils.h:114: Socket Connect Error:WSAError-code=10060
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001E1E2698F70>>, <Task finished name='Task-2959' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-3ebb87a72df4eb244bd4b9e64b754adf')>)
Traceback (most recent call last):
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
    response = await retry_operation(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
    return await retry(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
    return await coro()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
    raise exc.with_traceback(tb)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
    result = handler(comm, **msg)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
    ws._executing = {
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
    parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-3ebb87a72df4eb244bd4b9e64b754adf'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x00000198FB4B8F70>>, <Task finished name='Task-2939' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-4ad3894cd5e661104c45809b7b1ccf1c')>)
Traceback (most recent call last):
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
    response = await retry_operation(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
    return await retry(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
    return await coro()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
    raise exc.with_traceback(tb)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
    result = handler(comm, **msg)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
    ws._executing = {
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
    parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-4ad3894cd5e661104c45809b7b1ccf1c'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x00000286DB158F70>>, <Task finished name='Task-2960' coro=<Worker.heartbeat() done, defined at C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py:941> exception=KeyError('dispatched_predict-8d70f98aa5613fdf3d70c9b4e7b7dc34')>)
Traceback (most recent call last):
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 950, in heartbeat
    response = await retry_operation(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 385, in retry_operation
    return await retry(
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils_comm.py", line 370, in retry
    return await coro()
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 661, in send_recv
    raise exc.with_traceback(tb)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\core.py", line 497, in handle_comm
    result = handler(comm, **msg)
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3647, in heartbeat_worker
    ws._executing = {
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\scheduler.py", line 3648, in <dictcomp>
    parent._tasks[key]: duration for key, duration in executing.items()
KeyError: 'dispatched_predict-8d70f98aa5613fdf3d70c9b4e7b7dc34'
distributed.worker - WARNING - Heartbeat to scheduler failed

The warnings timestamped [12:35:44] and [12:37:10] appear multiple times, but the last warning (timestamped [12:37:31]) appears only once, at the end of the run.
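
WSAError code 10060 is the Winsock connection-timeout error (WSAETIMEDOUT), so one thing worth checking is whether a TCP connection to the address in the warnings (192.168.0.122) can be opened at all from the same machine. Below is a minimal standard-library sketch of such a check. `can_connect` is a hypothetical helper, not part of xgboost or dask, and the port to test is an assumption to fill in, since the log does not show which port the workers tried.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts (10060 on Windows), refused connections,
        # and unreachable hosts.
        return False


if __name__ == "__main__":
    # Demonstration against a throwaway local listener, so the sketch
    # runs without any cluster.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        server.listen(1)
        demo_port = server.getsockname()[1]
        print(can_connect("127.0.0.1", demo_port))  # True while the listener is up
```

If a check like this fails for the LAN address but succeeds for 127.0.0.1, a firewall or network-interface setting may be involved, although that is only one possible explanation.
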
However, every later attempt to run the script fails with the following errors:

C:\Users\andre\Anaconda3\lib\site-packages\distributed\node.py:151: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 65096 instead
  warnings.warn(
Windows is not officially supported for dask/xgboost, contribution are welcomed.
Windows is not officially supported for dask/xgboost, contribution are welcomed.
distributed.worker - WARNING - Run Failed
Function: _start_tracker
args:     (1)
kwargs:   {}
Traceback (most recent call last):
  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\worker.py", line 3734, in run
    result = function(*args, **kwargs)
  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 143, in _start_tracker
    rabit_context = RabitTracker(hostIP=host, nslave=n_workers, use_logger=False)
  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\tracker.py", line 161, in __init__
    sock.bind((hostIP, _port))
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
Traceback (most recent call last):

  File "C:\Users\andre\OneDrive\Documentos\Unicamp\Rafinha\xgboost\cpu_training.py", line 46, in <module>
    main(client)

  File "C:\Users\andre\OneDrive\Documentos\Unicamp\Rafinha\xgboost\cpu_training.py", line 28, in main
    output = xgb.dask.train(client,

  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 981, in train
    return client.sync(_train_async, global_config=config.get_config(), **args)

  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\client.py", line 843, in sync
    return sync(

  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils.py", line 353, in sync
    raise exc.with_traceback(tb)

  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\utils.py", line 336, in f
    result[0] = yield future

  File "C:\Users\andre\Anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()

  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 847, in _train_async
    _rabit_args = await _get_rabit_args(len(workers), client)

  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 805, in _get_rabit_args
    env = await client.run_on_scheduler(_start_tracker, n_workers)

  File "C:\Users\andre\Anaconda3\lib\site-packages\distributed\client.py", line 2382, in _run_on_scheduler
    raise exc.with_traceback(tb)

  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\dask.py", line 143, in _start_tracker
    rabit_context = RabitTracker(hostIP=host, nslave=n_workers, use_logger=False)

  File "C:\Users\andre\Anaconda3\lib\site-packages\xgboost\tracker.py", line 161, in __init__
    sock.bind((hostIP, _port))

OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
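
WinError 10048 is the Winsock "address already in use" error (WSAEADDRINUSE): the `sock.bind` call in the traceback above was given an address and port that another socket still holds. Below is a minimal standard-library sketch for checking whether a given port can currently be bound. `can_bind` is a hypothetical helper, not part of xgboost or dask, and the port the tracker actually tried is not shown in the log, so it has to be supplied by whoever runs the check.

```python
import errno
import socket

# "Address already in use" codes: 98 on Linux, 48 on macOS,
# 10048 (WSAEADDRINUSE) on Windows.
ADDRESS_IN_USE = {errno.EADDRINUSE, 98, 48, 10048}


def can_bind(host, port):
    """Return True if a TCP socket can currently be bound to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            in_use = (exc.errno in ADDRESS_IN_USE
                      or getattr(exc, "winerror", None) == 10048)
            if in_use:
                return False
            raise  # any other error is unexpected, so surface it
        return True


if __name__ == "__main__":
    # Hold a port open, then show that a second bind to it is rejected.
    holder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    holder.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    holder.listen(1)
    held_port = holder.getsockname()[1]
    print(can_bind("127.0.0.1", held_port))  # False while the port is held
    holder.close()
```

If a port reports as unavailable after the script has exited, some process is presumably still holding it; on Windows, `netstat -ano` lists listening ports together with the owning process ID.
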

I believe that the problem is related to xgboost, not dask alone, because when I run the following code, starting just Dask’s cluster and client, I do not get the above error:

if __name__ == '__main__':
    # or use other clusters for scaling
    with LocalCluster(n_workers=4, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            print(cluster)
            print(client)

Output:

C:\Users\andre\Anaconda3\lib\site-packages\distributed\node.py:151: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 49277 instead
  warnings.warn(
LocalCluster(537ecb12, 'tcp://127.0.0.1:49278', workers=4, threads=16, memory=7.86 GiB)
<Client: 'tcp://127.0.0.1:49278' processes=4 threads=16, memory=7.86 GiB>

I tried passing the host parameter to LocalCluster (e.g. LocalCluster(n_workers=4, threads_per_worker=4, host='tcp://127.0.0.1:8787')), but it made no difference.
It seems that the cluster is not being shut down after the code finishes. I tried explicitly adding

client.close()
cluster.close()

but that made no difference either.
I have also noticed the message

“Windows is not officially supported for dask/xgboost, contribution are welcomed.”

Could this be an incompatibility between xgboost and Windows?

Thanks in advance!

As the message states, Windows is not officially supported for the Dask functionality of XGBoost.

OK, I’ll try it in a Linux virtual machine.
Thank you!