How to throttle a large number of tasks without using all workers

Tags: python, dask

Imagine I have a dask grid with 10 workers and 40 cores total. This is a shared grid, so I don't want to fully saturate it with my work. I have 1000 tasks to do, and I want to submit (and have actively running) a maximum of 20 tasks at a time.

To be concrete,

from time import sleep
from random import random

def inc(x):
    sleep(random() * 2)
    return x + 1

def double(x):
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

If I set up a system of Queues

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

This will work, but it will just dump all of my tasks onto the grid at once, saturating it. Ideally I could write:

e.scatter(input_q, max_submit=20)

It seems that the example from the docs here would allow me to use a maxsize queue. But it looks like, from a user perspective, I would still have to handle the backpressure myself. Ideally dask would take care of this automatically.
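For reference, a minimal sketch of enforcing such a limit by hand, using a counting semaphore that releases a slot as each task finishes (this assumes the dask.distributed Client API and Future.add_done_callback, not any built-in throttle):

from threading import Semaphore
from dask.distributed import Client

client = Client('127.0.0.1:8786')
slots = Semaphore(20)                 # at most 20 tasks in flight

futures = []
for x in range(1000):
    slots.acquire()                   # blocks while 20 tasks are running
    fut = client.submit(inc, x)       # inc as defined above
    fut.add_done_callback(lambda f: slots.release())
    futures.append(fut)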

Asked Aug 12 '16 by Jeff

2 Answers

Use maxsize=

You're very close. All of scatter, gather, and map take the same maxsize= keyword argument that Queue takes. So a simple workflow might be as follows:

Example

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # blocks until the next result arrives

All of q, futures, and results are Python Queue objects. The q and results queues don't have a limit, so they'll greedily pull in as much as they can. The futures queue however has a maximum size of 20, so it will only allow 20 futures in flight at any given time. Once the leading future is complete it will immediately be consumed by the gather function and its result will be placed into the results queue. This frees up space in futures and causes another task to be submitted.

Note that this isn't exactly what you wanted. These queues are ordered so futures will only get popped off when they're in the front of the queue. If all of the in-flight futures have finished except for the first they'll still stay in the queue, taking up space. Given this constraint you might want to choose a maxsize= slightly more than your desired 20 items.

Extending this

Here we do a simple map->gather pipeline with no logic in between. You could also put other map computations in here or even pull futures out of the queues and do custom work with them on your own. It's easy to break out of the mold provided above.
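For instance, a sketch of a two-stage pipeline in the same queue-based style, reusing inc and double from the question (every stage accepts the same maxsize= backpressure knob):

futures_1 = e.map(inc, q, maxsize=20)              # first stage, at most 20 in flight
futures_2 = e.map(double, futures_1, maxsize=20)   # second stage, chained on the first
results = e.gather(futures_2)                      # a Queue of final results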

Answered by MRocklin


The solution posted on GitHub was very useful: https://github.com/dask/distributed/issues/864

Solution:

from itertools import islice
from dask.distributed import Client, as_completed

c = Client('127.0.0.1:8786')

inputs = iter(inputs)

# Prime the scheduler with at most maxsize tasks
# (islice avoids a StopIteration if inputs has fewer items)
futures = [c.submit(func, x) for x in islice(inputs, maxsize)]
ac = as_completed(futures)

for finished_future in ac:
    # Submit a replacement task as each one finishes,
    # keeping at most maxsize tasks in flight
    try:
        new_future = c.submit(func, next(inputs))
        ac.add(new_future)
    except StopIteration:
        pass
    result = finished_future.result()
    ... # do stuff with result
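Wrapped up as a reusable generator, the same pattern might look like the sketch below (throttled_map is a hypothetical name; note that results are yielded in completion order, not input order):

from itertools import islice
from dask.distributed import Client, as_completed

def throttled_map(client, func, inputs, maxsize=20):
    # Yield func(x) results, keeping at most maxsize tasks in flight
    inputs = iter(inputs)
    futures = [client.submit(func, x) for x in islice(inputs, maxsize)]
    ac = as_completed(futures)
    for finished in ac:
        try:
            ac.add(client.submit(func, next(inputs)))  # refill the freed slot
        except StopIteration:
            pass                                       # inputs exhausted
        yield finished.result()

For example, list(throttled_map(Client('127.0.0.1:8786'), inc, range(1000))) would run all 1000 tasks with at most 20 active at once.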

Query:

However, to determine which workers are free when throttling the tasks, I am trying to use the client.has_what() API. It seems the load on workers is not reflected immediately, unlike what is shown on the status UI page. At times it takes quite a while for has_what to reflect any data.

Is there another API that can be used to determine the number of free workers, which could then be used to set the throttle range, similar to what the UI uses?
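One possibility, sketched below, is Client.processing(), which maps each worker address to the tasks it is currently executing; counting workers with an empty list gives a rough idle-worker estimate (whether it updates faster than has_what is an assumption to verify):

def count_free_workers(client):
    # client.processing() -> {worker_address: tasks currently executing}
    processing = client.processing()
    return sum(1 for tasks in processing.values() if not tasks)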

Answered by Ameet Shah