1717from distributed .deploy .utils import nprocesses_nthreads
1818from distributed .metrics import time
1919from distributed .utils import parse_ports , sync , tmpfile
20- from distributed .utils_test import popen , terminate_process , wait_for_port
20+ from distributed .utils_test import gen_cluster , popen , terminate_process , wait_for_port
2121
2222
2323def test_nanny_worker_ports (loop ):
24- with popen (["dask-scheduler" , "--port" , "9359" , "--no-dashboard" ]) as sched :
24+ with popen (["dask-scheduler" , "--port" , "9359" , "--no-dashboard" ]):
2525 with popen (
2626 [
2727 "dask-worker" ,
@@ -34,7 +34,7 @@ def test_nanny_worker_ports(loop):
3434 "5273" ,
3535 "--no-dashboard" ,
3636 ]
37- ) as worker :
37+ ):
3838 with Client ("127.0.0.1:9359" , loop = loop ) as c :
3939 start = time ()
4040 while True :
@@ -50,6 +50,7 @@ def test_nanny_worker_ports(loop):
5050 )
5151
5252
53+ @pytest .mark .slow
5354def test_nanny_worker_port_range (loop ):
5455 with popen (["dask-scheduler" , "--port" , "9359" , "--no-dashboard" ]) as sched :
5556 nprocs = 3
@@ -69,7 +70,7 @@ def test_nanny_worker_port_range(loop):
6970 nanny_port ,
7071 "--no-dashboard" ,
7172 ]
72- ) as worker :
73+ ):
7374 with Client ("127.0.0.1:9359" , loop = loop ) as c :
7475 start = time ()
7576 while len (c .scheduler_info ()["workers" ]) < nprocs :
@@ -89,7 +90,7 @@ def get_port(dask_worker):
8990
9091
9192def test_nanny_worker_port_range_too_many_workers_raises (loop ):
92- with popen (["dask-scheduler" , "--port" , "9359" , "--no-dashboard" ]) as sched :
93+ with popen (["dask-scheduler" , "--port" , "9359" , "--no-dashboard" ]):
9394 with popen (
9495 [
9596 "dask-worker" ,
@@ -111,7 +112,7 @@ def test_nanny_worker_port_range_too_many_workers_raises(loop):
111112
112113
113114def test_memory_limit (loop ):
114- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
115+ with popen (["dask-scheduler" , "--no-dashboard" ]):
115116 with popen (
116117 [
117118 "dask-worker" ,
@@ -120,7 +121,7 @@ def test_memory_limit(loop):
120121 "2e3MB" ,
121122 "--no-dashboard" ,
122123 ]
123- ) as worker :
124+ ):
124125 with Client ("127.0.0.1:8786" , loop = loop ) as c :
125126 while not c .nthreads ():
126127 sleep (0.1 )
@@ -131,7 +132,7 @@ def test_memory_limit(loop):
131132
132133
133134def test_no_nanny (loop ):
134- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
135+ with popen (["dask-scheduler" , "--no-dashboard" ]):
135136 with popen (
136137 ["dask-worker" , "127.0.0.1:8786" , "--no-nanny" , "--no-dashboard" ]
137138 ) as worker :
@@ -157,11 +158,11 @@ def test_no_reconnect(nanny, loop):
157158 start = time ()
158159 while worker .poll () is None :
159160 sleep (0.1 )
160- assert time () < start + 10
161+ assert time () < start + 30
161162
162163
163164def test_resources (loop ):
164- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
165+ with popen (["dask-scheduler" , "--no-dashboard" ]):
165166 with popen (
166167 [
167168 "dask-worker" ,
@@ -170,7 +171,7 @@ def test_resources(loop):
170171 "--resources" ,
171172 "A=1 B=2,C=3" ,
172173 ]
173- ) as worker :
174+ ):
174175 with Client ("127.0.0.1:8786" , loop = loop ) as c :
175176 while not c .scheduler_info ()["workers" ]:
176177 sleep (0.1 )
@@ -182,7 +183,7 @@ def test_resources(loop):
182183@pytest .mark .parametrize ("nanny" , ["--nanny" , "--no-nanny" ])
183184def test_local_directory (loop , nanny ):
184185 with tmpfile () as fn :
185- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
186+ with popen (["dask-scheduler" , "--no-dashboard" ]):
186187 with popen (
187188 [
188189 "dask-worker" ,
@@ -192,7 +193,7 @@ def test_local_directory(loop, nanny):
192193 "--local-directory" ,
193194 fn ,
194195 ]
195- ) as worker :
196+ ):
196197 with Client ("127.0.0.1:8786" , loop = loop , timeout = 10 ) as c :
197198 start = time ()
198199 while not c .scheduler_info ()["workers" ]:
@@ -206,9 +207,7 @@ def test_local_directory(loop, nanny):
206207@pytest .mark .parametrize ("nanny" , ["--nanny" , "--no-nanny" ])
207208def test_scheduler_file (loop , nanny ):
208209 with tmpfile () as fn :
209- with popen (
210- ["dask-scheduler" , "--no-dashboard" , "--scheduler-file" , fn ]
211- ) as sched :
210+ with popen (["dask-scheduler" , "--no-dashboard" , "--scheduler-file" , fn ]):
212211 with popen (
213212 ["dask-worker" , "--scheduler-file" , fn , nanny , "--no-dashboard" ]
214213 ):
@@ -221,7 +220,7 @@ def test_scheduler_file(loop, nanny):
221220
222221def test_scheduler_address_env (loop , monkeypatch ):
223222 monkeypatch .setenv ("DASK_SCHEDULER_ADDRESS" , "tcp://127.0.0.1:8786" )
224- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
223+ with popen (["dask-scheduler" , "--no-dashboard" ]):
225224 with popen (["dask-worker" , "--no-dashboard" ]):
226225 with Client (os .environ ["DASK_SCHEDULER_ADDRESS" ], loop = loop ) as c :
227226 start = time ()
@@ -231,7 +230,7 @@ def test_scheduler_address_env(loop, monkeypatch):
231230
232231
233232def test_nprocs_requires_nanny (loop ):
234- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
233+ with popen (["dask-scheduler" , "--no-dashboard" ]):
235234 with popen (
236235 ["dask-worker" , "127.0.0.1:8786" , "--nprocs=2" , "--no-nanny" ]
237236 ) as worker :
@@ -242,31 +241,29 @@ def test_nprocs_requires_nanny(loop):
242241
243242
244243def test_nprocs_negative (loop ):
245- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
246- with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs=-1" ]) as worker :
244+ with popen (["dask-scheduler" , "--no-dashboard" ]):
245+ with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs=-1" ]):
247246 with Client ("tcp://127.0.0.1:8786" , loop = loop ) as c :
248247 c .wait_for_workers (cpu_count (), timeout = "10 seconds" )
249248
250249
251250def test_nprocs_auto (loop ):
252- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
253- with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs=auto" ]) as worker :
251+ with popen (["dask-scheduler" , "--no-dashboard" ]):
252+ with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs=auto" ]):
254253 with Client ("tcp://127.0.0.1:8786" , loop = loop ) as c :
255254 procs , _ = nprocesses_nthreads ()
256255 c .wait_for_workers (procs , timeout = "10 seconds" )
257256
258257
259258def test_nprocs_expands_name (loop ):
260- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
261- with popen (
262- ["dask-worker" , "127.0.0.1:8786" , "--nprocs" , "2" , "--name" , "0" ]
263- ) as worker :
264- with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs" , "2" ]) as worker :
259+ with popen (["dask-scheduler" , "--no-dashboard" ]):
260+ with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs" , "2" , "--name" , "0" ]):
261+ with popen (["dask-worker" , "127.0.0.1:8786" , "--nprocs" , "2" ]):
265262 with Client ("tcp://127.0.0.1:8786" , loop = loop ) as c :
266263 start = time ()
267264 while len (c .scheduler_info ()["workers" ]) < 4 :
268265 sleep (0.2 )
269- assert time () < start + 10
266+ assert time () < start + 30
270267
271268 info = c .scheduler_info ()
272269 names = [d ["name" ] for d in info ["workers" ].values ()]
@@ -281,7 +278,7 @@ def test_nprocs_expands_name(loop):
281278 "listen_address" , ["tcp://0.0.0.0:39837" , "tcp://127.0.0.2:39837" ]
282279)
283280def test_contact_listen_address (loop , nanny , listen_address ):
284- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
281+ with popen (["dask-scheduler" , "--no-dashboard" ]):
285282 with popen (
286283 [
287284 "dask-worker" ,
@@ -293,7 +290,7 @@ def test_contact_listen_address(loop, nanny, listen_address):
293290 "--listen-address" ,
294291 listen_address ,
295292 ]
296- ) as worker :
293+ ):
297294 with Client ("127.0.0.1:8786" ) as client :
298295 while not client .nthreads ():
299296 sleep (0.1 )
@@ -313,14 +310,14 @@ def func(dask_worker):
313310@pytest .mark .parametrize ("nanny" , ["--nanny" , "--no-nanny" ])
314311@pytest .mark .parametrize ("host" , ["127.0.0.2" , "0.0.0.0" ])
315312def test_respect_host_listen_address (loop , nanny , host ):
316- with popen (["dask-scheduler" , "--no-dashboard" ]) as sched :
313+ with popen (["dask-scheduler" , "--no-dashboard" ]):
317314 with popen (
318315 ["dask-worker" , "127.0.0.1:8786" , nanny , "--no-dashboard" , "--host" , host ]
319316 ) as worker :
320317 with Client ("127.0.0.1:8786" ) as client :
321318 while not client .nthreads ():
322319 sleep (0.1 )
323- info = client .scheduler_info ()
320+ client .scheduler_info ()
324321
325322 # roundtrip works
326323 assert client .submit (lambda x : x + 1 , 10 ).result () == 11
@@ -341,7 +338,7 @@ def test_dashboard_non_standard_ports(loop):
341338 except ImportError :
342339 proxy_exists = False
343340
344- with popen (["dask-scheduler" , "--port" , "3449" ]) as s :
341+ with popen (["dask-scheduler" , "--port" , "3449" ]):
345342 with popen (
346343 [
347344 "dask-worker" ,
@@ -351,7 +348,7 @@ def test_dashboard_non_standard_ports(loop):
351348 "--host" ,
352349 "127.0.0.1" ,
353350 ]
354- ) as proc :
351+ ):
355352 with Client ("127.0.0.1:3449" , loop = loop ) as c :
356353 c .wait_for_workers (1 )
357354 pass
@@ -406,14 +403,13 @@ def test_bokeh_deprecation():
406403 pass
407404
408405
409- @pytest .mark .asyncio
410- async def test_integer_names (cleanup ):
411- async with Scheduler (port = 0 ) as s :
412- with popen (["dask-worker" , s .address , "--name" , "123" ]) as worker :
413- while not s .workers :
414- await asyncio .sleep (0.01 )
415- [ws ] = s .workers .values ()
416- assert ws .name == 123
406+ @gen_cluster (nthreads = [])
407+ async def test_integer_names (s ):
408+ with popen (["dask-worker" , s .address , "--name" , "123" ]):
409+ while not s .workers :
410+ await asyncio .sleep (0.01 )
411+ [ws ] = s .workers .values ()
412+ assert ws .name == 123
417413
418414
419415@pytest .mark .asyncio
@@ -438,7 +434,7 @@ class MyWorker(Worker):
438434 else :
439435 env ["PYTHONPATH" ] = tmpdir
440436
441- async with Scheduler (port = 0 ) as s :
437+ async with Scheduler (dashboard_address = ":0" ) as s :
442438 async with Client (s .address , asynchronous = True ) as c :
443439 with popen (
444440 [
@@ -449,7 +445,7 @@ class MyWorker(Worker):
449445 "myworker.MyWorker" ,
450446 ],
451447 env = env ,
452- ) as worker :
448+ ):
453449 await c .wait_for_workers (1 )
454450
455451 def worker_type (dask_worker ):
0 commit comments