Imagine I have a dask grid with 10 workers & 40 cores in total. This is a shared grid, so I don't want to fully saturate it with my work. I have 1000 tasks to do, and I want to submit (and have actively running) a maximum of 20 tasks at a time.
To be concrete,
from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x
>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
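(Note: in newer releases of dask.distributed, Executor has been renamed Client. On a recent version, the equivalent connection, assuming the same scheduler address, would be:)

from dask.distributed import Client
e = Client('127.0.0.1:8786')  # drop-in replacement for Executor in newer releases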
If I set up a system of Queues:
>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
This will work, BUT it will just dump ALL of my tasks onto the grid, saturating it. Ideally I could write something like:
e.scatter(input_q, max_submit=20)
It seems that the example from the docs here would allow me to use a maxsize queue. But from a user's perspective it looks like I would still have to deal with the backpressure myself. Ideally dask would take care of this automatically.

Use maxsize=
You're very close. All of scatter, gather, and map take the same maxsize= keyword argument that Queue takes. So a simple workflow might be as follows:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue                  # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')           # Connect to cluster

futures = e.map(inc, q, maxsize=20)      # Map inc over data
results = e.gather(futures)              # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())              # this blocks waiting for all results
All of q, futures, and results are Python Queue objects. The q and results queues don't have a limit, so they'll greedily pull in as much as they can. The futures queue, however, has a maximum size of 20, so it will only allow 20 futures in flight at any given time. Once the leading future is complete it will immediately be consumed by the gather function and its result will be placed into the results queue. This frees up space in futures and causes another task to be submitted.
Note that this isn't exactly what you wanted. These queues are ordered, so a future will only get popped off when it's at the front of the queue. If all of the in-flight futures have finished except for the first, they'll still stay in the queue, taking up space. Given this constraint you might want to choose a maxsize= slightly larger than your desired 20 items.
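For example, to keep roughly 20 tasks genuinely in flight despite this head-of-line blocking, you could leave a little headroom (the 25 here is an arbitrary illustration, not a tuned value):

futures = e.map(inc, q, maxsize=25)  # ~20 desired in flight, plus slack for stragglers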
Here we do a simple map->gather pipeline with no logic in between. You could also put other map computations in here, or even pull futures out of the queues and do custom work with them on your own. It's easy to break out of the mold provided above.
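For instance, reusing inc and double from the question, you could chain two throttled map stages so that each queue exerts its own backpressure (a sketch along the lines of the answer above, not tested against any particular version):

inc_q = e.map(inc, q, maxsize=20)            # at most 20 inc tasks in flight
double_q = e.map(double, inc_q, maxsize=20)  # second stage consumes the first
results = e.gather(double_q)                 # final results queue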
The solution posted on GitHub was very useful: https://github.com/dask/distributed/issues/864
Solution:

from distributed import as_completed  # c below is assumed to be a connected distributed Client

inputs = iter(inputs)
# Prime the pump: submit the first maxsize tasks up front.
futures = [c.submit(func, next(inputs)) for _ in range(maxsize)]
ac = as_completed(futures)

for finished_future in ac:
    # Submit a new future for each one that finishes,
    # so at most maxsize tasks are in flight at a time.
    try:
        new_future = c.submit(func, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    result = finished_future.result()
    ...  # do stuff with result
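The same pattern, wrapped up as a self-contained generator (a sketch assuming a modern dask.distributed Client; throttled_map is an illustrative name, not an API from the issue):

from itertools import islice
from dask.distributed import Client, as_completed

def throttled_map(client, func, inputs, maxsize=20):
    # Yield results of func(x) for each x in inputs, keeping at most
    # maxsize tasks in flight at any given time.
    inputs = iter(inputs)
    futures = [client.submit(func, x) for x in islice(inputs, maxsize)]
    ac = as_completed(futures)
    for finished in ac:
        try:
            ac.add(client.submit(func, next(inputs)))  # refill the freed slot
        except StopIteration:
            pass  # inputs exhausted; just drain the remaining futures
        yield finished.result()

client = Client('127.0.0.1:8786')
results = list(throttled_map(client, lambda x: x + 1, range(1000)))

Note that results arrive in completion order, not input order; if ordering matters, attach the input to the result (e.g. return (x, func(x)) from the task).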
Query:
However, for determining which workers are free in order to throttle the tasks, I am trying to use the client.has_what() API. It seems the load on workers does not get reflected immediately, unlike what is shown on the status UI page; at times it takes quite a bit of time for has_what() to reflect any data.
Is there another API that can be used to determine the number of free workers, which can then be used to set the throttle range, similar to what the UI is using?
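I can't say exactly which call the dashboard uses, but note that has_what() reports where finished results live, so it stays empty until tasks complete. A hedged sketch, assuming a recent distributed Client: client.processing() (tasks currently running per worker) and client.nthreads() (threads per worker) reflect scheduler state more promptly and can be combined to estimate free capacity. estimate_free_threads below is an illustrative helper, not a dask API:

def estimate_free_threads(client):
    # Threads per worker minus tasks currently processing on it.
    nthreads = client.nthreads()        # {worker_address: thread_count}
    processing = client.processing()    # {worker_address: task_keys}
    return {w: max(0, n - len(processing.get(w, ())))
            for w, n in nthreads.items()}

free = estimate_free_threads(client)    # per-worker free threads
total_free = sum(free.values())         # overall headroom for throttling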