
Local use of dask: to Client() or not to Client()?

I am trying to understand the use patterns for Dask on a local machine.

Specifically,

  • I have a dataset that fits in memory
  • I'd like to do some pandas operations
    • groupby...
    • date parsing
    • etc.

pandas runs these operations on a single core, and they are taking hours for me. My machine has 8 cores, so I'd like to use Dask to parallelize these operations as much as possible.

My question is as follows: what is the difference between these two ways of doing this in Dask?

import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris()

(1)

import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame(iris.data, columns=iris.feature_names),
    npartitions=2
)

df.mean().compute()

(2)

import dask.dataframe as dd
from distributed import Client

client = Client()

df = client.persist(
    dd.from_pandas(
        pd.DataFrame(iris.data, columns=iris.feature_names),
        npartitions=2
    )
)

df.mean().compute()

What is the benefit of one use pattern over the other? Why should I use one over the other?

asked May 30 '18 23:05 by Jonathan


1 Answer

Version (2) has two differences compared to version (1): the choice to use the distributed scheduler, and persist. These are separate factors. There is a lot of documentation about both (https://distributed.readthedocs.io/en/latest/quickstart.html and http://dask.pydata.org/en/latest/dataframe-performance.html#persist-intelligently), so this answer can be kept brief.

1) The distributed scheduler is newer and smarter than the older threaded and multiprocessing schedulers. As the name suggests, it can use a cluster, but it also works on a single machine. Although its latency when calling .compute() is generally higher, it is in many ways more efficient, has more advanced features such as real-time dynamic programming, and offers more diagnostics, such as the dashboard. When created with Client(), by default you get a number of processes equal to the number of cores, but you can choose the number of processes and threads, and you can get close to the original threads-only situation with Client(processes=False).

2) Persisting means evaluating a computation and storing the result in memory, so that further computations that depend on it are faster. You can also persist without the distributed client (dask.persist). It effectively trades memory for performance, because you don't need to re-evaluate the computation each time something depends on it. If you go on to perform only one computation on the intermediate, as in the example, it should make no difference to performance.

answered Sep 28 '22 18:09 by mdurant