How to parallelize groupby() in dask?

I tried:

df.groupby('name').agg('count').compute(num_workers=1)
df.groupby('name').agg('count').compute(num_workers=4)

They take the same amount of time. Why doesn't num_workers have any effect?

Thanks

asked Apr 09 '19 by Robin1988



1 Answer

By default, Dask uses a multi-threaded scheduler, which runs every task inside a single process on your machine. For pandas-style operations the GIL often keeps those threads from running truly in parallel, which is why changing num_workers makes no visible difference here. (Note that Dask is still worth using in this mode if your data doesn't fit in memory.)
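As a quick way to see this, you can also switch the scheduler for a single compute() call without setting up a cluster. This is a minimal sketch, assuming a data.csv file with a name column as in the question:

import dask.dataframe as dd

df = dd.read_csv("data.csv")

# Default threaded scheduler: num_workers sets the thread count, but the
# GIL usually prevents a real speed-up for this kind of work.
df.groupby("name").agg("count").compute(scheduler="threads", num_workers=4)

# Multiprocessing scheduler: num_workers now means separate processes,
# so CPU-bound work can actually run in parallel.
df.groupby("name").agg("count").compute(scheduler="processes", num_workers=4)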

If you want to spread the work across several processes (and thus several cores), you have to use a different scheduler:

import time

from dask import dataframe as dd
from dask.distributed import LocalCluster, Client

df = dd.read_csv("data.csv")

def group(num_workers):
    # Time a groupby-count computed with the given number of workers.
    start = time.time()
    res = df.groupby("name").agg("count").compute(num_workers=num_workers)
    end = time.time()
    return res, end - start

# First run: default threaded scheduler (single process).
print(group(4))

# Second run: a local cluster of worker processes, registered as the
# default scheduler for subsequent compute() calls.
clust = LocalCluster()
clt = Client(clust, set_as_default=True)
print(group(4))

Here, I create a local cluster of four parallel worker processes (because I have a quad-core machine) and then register a default scheduling client that runs Dask operations on that cluster. With a 1.5 GB, two-column CSV file, the standard threaded groupby takes around 35 seconds on my laptop, whereas the multiprocess one only takes around 22 seconds.
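If you want control over the cluster layout instead of relying on the defaults, LocalCluster accepts explicit sizing arguments. A minimal sketch; the counts below are illustrative for a quad-core machine, not a recommendation:

from dask.distributed import LocalCluster, Client

# Four single-threaded worker processes, one per physical core.
clust = LocalCluster(n_workers=4, threads_per_worker=1)
clt = Client(clust, set_as_default=True)
print(clt.dashboard_link)  # URL of the dashboard for watching tasks run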

answered Sep 28 '22 by Olivier CAYROL