How to pass multiple arguments to dask.distributed.Client().map?

import dask.distributed
def f(x, y):
    return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])

Does not work.

[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
 <Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((1, 2))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((2, 3))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
asked Mar 02 '19 at 23:03 by mathtick



2 Answers

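You do not quite have the signature right; perhaps the docs are not clear (suggestions welcome). Like Python's built-in map(), Client.map() takes a variable number of iterables, one per positional argument of the function, and the i-th task receives the i-th element of each. It does not take a single iterable of argument tuples. You should phrase this as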

client.map(f, (1, 2), (2, 3))

or, if you wanted to stay closer to your original pattern

client.map(f, *[(1, 2), (2, 3)])
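Note that both calls produce f(1, 2) and f(2, 3) only because transposing [(1, 2), (2, 3)] happens to give back the same pairs. For an arbitrary list of argument tuples, a sketch of the general approach is to transpose it with zip(*...) first, so that each iterable holds the values for one parameter. The check below uses Python's built-in map(), which follows the same one-iterable-per-argument convention, so it runs without a cluster; the list `pairs` is illustrative data, and the dask call appears only in a comment.

```python
def f(x, y):
    return x, y

# Each tuple is one set of arguments (x, y) for a single call.
pairs = [(1, 2), (3, 4), (5, 6)]

# zip(*pairs) transposes the data into one tuple per parameter:
# xs = (1, 3, 5), ys = (2, 4, 6)
xs, ys = zip(*pairs)

# Built-in map pairs up the i-th element of every iterable, the same
# convention Client.map uses; with dask this would be
#     futures = client.map(f, *zip(*pairs))
# By contrast, map(f, *pairs) would call f(1, 3, 5) and raise TypeError.
results = list(map(f, xs, ys))
print(results)  # [(1, 2), (3, 4), (5, 6)]
```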
answered Oct 29 '22 at 12:10 by mdurant



OK, the documentation is definitely a bit confusing on this one, and I couldn't find an example that clearly demonstrated the problem. So let me break it down:

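from dask.distributed import Client

client = Client()  # local cluster, as in the question

def test_fn(a, b, c, d, **kwargs):
    return a + b + c + d + kwargs["special"]

# Unpack a list of four iterables: one iterable per parameter a, b, c, d
futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]

# Passing the same four iterables directly is equivalent
futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]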

Things to note:

  1. It doesn't matter whether you use lists or tuples; as above, you can even mix them.
  2. You have to group arguments by their position. So if you're passing in 4 sets of arguments, the first list will contain the first argument from all 4 sets. (In this case, the "first" call to test_fn gets a=b=c=d=1.)
  3. Extra keyword arguments (like special) are passed through to the function, with the same value for every call.

Now that I think about it, this isn't that surprising. I think it's just following Python's concurrent.futures.ProcessPoolExecutor.map() signature.

PS. Even though the documentation says the return value is a "List, iterator, or Queue of futures, depending on the type of the inputs", you can actually get this error: "Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit".
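The workaround the error message suggests can be sketched as a plain loop over submit, which takes one set of arguments per call and so also handles the question's list of tuples directly. The helper `submit_all` is hypothetical; it only assumes an object with a submit(fn, *args, **kwargs) method that returns futures, which a dask Client provides. So that the sketch runs without a cluster, the usage below substitutes the standard library's ThreadPoolExecutor, whose submit has the same shape.

```python
from concurrent.futures import ThreadPoolExecutor

def submit_all(client, fn, arg_tuples, **kwargs):
    """Hypothetical helper: submit fn(*args, **kwargs) once per tuple.

    arg_tuples may be any iterable, including a generator, because it is
    consumed by an ordinary for loop rather than handed to map().
    """
    return [client.submit(fn, *args, **kwargs) for args in arg_tuples]

def f(x, y):
    return x, y

# A generator (an iterator), which per the error above Client.map rejects.
pairs = ((i, i + 1) for i in range(1, 3))

# Stand-in for dask.distributed.Client(); with dask, pass the Client
# instance here instead.
with ThreadPoolExecutor() as client:
    futures = submit_all(client, f, pairs)
    results = [fut.result() for fut in futures]

print(results)  # [(1, 2), (2, 3)]
```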

answered Oct 29 '22 at 14:10 by Alexei Andreev
