How to configure two servers with different IPs in different networks into one Dask cluster?

I'm trying to set up a Dask cluster with Docker containers running on nodes in different networks.

Here is my simplified configuration with one worker (the public IPs are made up).

Node configurations

1. Node 1 (Scheduler)

  1. Docker instance
    IP: 172.17.0.2
    Scheduler port: 8786
    Dashboard port: 8787

  2. Host
    IP: 123.456.78.910 (public IP)
    Port forwarding: 28786 → 8786, 28787 → 8787
    All ports open

2. Node 2 (Worker)

  1. Docker instance
    IP: 172.17.0.2
    Worker port: 8786

  2. Host
    IP: 123.456.78.911 (public IP)
    Port forwarding: 28786
    Only specific ports open (22, 28786, 28787, ..) ← I think this might be the cause of the problem
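As a sanity check (not shown in the logs below), a small snippet like this, run from the worker host, can confirm that the scheduler's forwarded port on the placeholder public IP above is actually reachable over TCP. This is just a connectivity sketch, not part of the Dask setup itself.

```python
# Sanity-check sketch (run from the worker host): verify the scheduler's
# forwarded port on the placeholder public IP above is reachable over TCP.
import socket

try:
    with socket.create_connection(("123.456.78.910", 28786), timeout=5):
        print("scheduler port 28786 is reachable")
except OSError as exc:
    print(f"cannot reach scheduler port 28786: {exc}")
```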

Tried

1. Register the worker with the scheduler

// Scheduler container terminal

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://172.17.0.2:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:28786', name: tcp://172.17.0.2:28786, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:28786
distributed.core - INFO - Starting established connection
// Worker container terminal

$ dask-worker 123.456.78.910:28786 --worker-port 28786 --no-nanny
distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:28786
distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:28786
distributed.worker - INFO -          dashboard at:           172.17.0.2:43989
distributed.worker - INFO - Waiting to connect to: tcp://123.456.78.910:28786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         40
distributed.worker - INFO -                Memory:                 502.55 GiB
distributed.worker - INFO -       Local Directory: /root/dask-worker-space/worker-y9tym3ma
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://123.456.78.910:28786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

I think the address (172.17.0.2) should be 123.456.78.911 in the scheduler log:

distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:28786', name: tcp://172.17.0.2:28786, memory: 0, processing: 0>

but adding the host option does not work:

// Worker container terminal

$ dask-worker 123.456.78.910:28786 --host 123.456.78.911 --worker-port 28786 --no-nanny
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/bin/dask-worker", line 33, in <module>
    sys.exit(load_entry_point('distributed==2021.9.1', 'console_scripts', 'dask-worker')())
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 465, in go
    main()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 451, in main
    loop.run_sync(run)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/tornado/ioloop.py", line 530, in run_sync
    return future_cell[0].result()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 445, in run
    await asyncio.gather(*nannies)
  File "/opt/conda/envs/rapids/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/worker.py", line 1172, in start
    await self.listen(start_address, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/core.py", line 401, in listen
    listener = await listen(
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/comm/core.py", line 206, in _
    await self.start()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/comm/tcp.py", line 470, in start
    sockets = netutil.bind_sockets(
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/tornado/netutil.py", line 161, in bind_sockets
    sock.bind(sockaddr)
OSError: [Errno 99] Cannot assign requested address
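If I understand the error, the container can only bind to addresses assigned to one of its own interfaces (here 172.17.0.2); 123.456.78.911 belongs to the host, so the bind fails with errno 99 (EADDRNOTAVAIL). A minimal reproduction outside Dask (the placeholder IP stands in for the host's real public address):

```python
# Minimal reproduction of the OSError above: binding to an address that is
# not assigned to any local interface fails with errno 99 (EADDRNOTAVAIL),
# which is what --host 123.456.78.911 asks the worker to do inside a
# container that only owns 172.17.0.2.
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.bind(("123.456.78.911", 28786))  # the host's public IP, not the container's
except OSError as exc:
    print(exc)  # [Errno 99] Cannot assign requested address
finally:
    s.close()
```

So it looks like what I actually need is to keep listening on the container address while advertising the public address to the scheduler; `dask-worker` seems to have a `--contact-address` option for that, but I have not been able to verify that it solves this setup.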

2. Python script

// This script is run in the Scheduler container

from dask.distributed import Client

client = Client("localhost:8786")
print(client)

names   = [str(i) for i in range(10)]
futures = client.map(lambda name: f"task-{name}", names, key=names)
results = client.gather(futures)


############### Blocked in here ###############


print("\n- Results")
for i in results:
    print(i)
// Python output

- Distributed scheduler:<Client: 'tcp://172.17.0.2:8786' processes=1 threads=40, memory=502.55 GiB>
distributed.client - WARNING - Couldn't gather 10 keys, rescheduling {'0': ('tcp://172.17.0.2:28786',), '8': ('tcp://172.17.0.2:28786',), '2': ('tcp://172.17.0.2:28786',), '4': ('tcp://172.17.0.2:28786',), '1': ('tcp://172.17.0.2:28786',), '9': ('tcp://172.17.0.2:28786',), '5': ('tcp://172.17.0.2:28786',), '6': ('tcp://172.17.0.2:28786',), '7': ('tcp://172.17.0.2:28786',), '3': ('tcp://172.17.0.2:28786',)}
// Scheduler output

distributed.scheduler - ERROR - Couldn't gather keys {'0': ['tcp://172.17.0.2:28786'], '8': ['tcp://172.17.0.2:28786'], '2': ['tcp://172.17.0.2:28786'], '4': ['tcp://172.17.0.2:28786'], '1': ['tcp://172.17.0.2:28786'], '9': ['tcp://172.17.0.2:28786'], '5': ['tcp://172.17.0.2:28786'], '6': ['tcp://172.17.0.2:28786'], '7': ['tcp://172.17.0.2:28786'], '3': ['tcp://172.17.0.2:28786']} state: ['memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory'] workers: ['tcp://172.17.0.2:28786']
NoneType: None
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.17.0.2:28786', name: tcp://172.17.0.2:28786, memory: 10, processing: 0>
distributed.core - INFO - Removing comms to tcp://172.17.0.2:28786
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 0
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 8
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 2
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 4
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 1
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 9
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 5
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 6
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 7
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://172.17.0.2:28786'], 3
NoneType: None
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:28786', name: tcp://172.17.0.2:28786, memory: 10, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:28786
distributed.core - INFO - Starting established connection

It seems the scheduler registers the worker but cannot actually reach it at the address it recorded (tcp://172.17.0.2:28786).
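To double-check which address the scheduler has recorded for the worker, the client can be queried directly (a small diagnostic using the same connection as the script above):

```python
# Diagnostic sketch: list the worker addresses the scheduler currently
# knows about, to confirm it is pointing at the Docker-internal address.
from dask.distributed import Client

client = Client("localhost:8786")
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("name"))
```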

How can I connect Docker containers in different networks into one Dask cluster?

python

docker

dask

distributed-computing

dask-distributed
