I recently found the dask module, which aims to be an easy-to-use Python parallel processing module. A big selling point for me is that it works with pandas.
After reading a bit of its manual page, I can't find a way to do this trivially parallelizable task:
ts.apply(func)            # for pandas Series
df.apply(func, axis = 1)  # for pandas DF row apply
At the moment, to achieve this in dask, AFAIK,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
which is ugly syntax and is actually slower than outright
df.apply(func, axis = 1) # for pandas DF row apply
Any suggestions?
Edit: Thanks, @MRocklin, for the map function. It seems to be slower than plain pandas apply. Is this related to the pandas GIL-releasing issue, or am I doing it wrong?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k)  # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func)             # 0.43 sec
ds.map(slow_func).compute()    # 2.04 sec
map_partitions
You can apply your function to all of the partitions of your dataframe with the map_partitions function.
df.map_partitions(func, columns=...)
Note that func will be given only part of the dataset at a time, not the entire dataset like with pandas apply (which presumably you wouldn't want if you want to do parallelism).
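For example, here is a minimal sketch of what a full map_partitions round trip can look like. The toy frame, the column names, and this particular func are made up for illustration, and I'm assuming a dask version that takes the newer meta keyword (the columns= keyword above is the older spelling of the same hint):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(8), 'y': range(8)})    # toy pandas frame
ddf = dd.from_pandas(pdf, npartitions=4)              # split into 4 pandas partitions

def func(part):                                       # func receives one whole pandas partition
    return part['x'] + part['y']                      # ordinary pandas code, run per partition

result = ddf.map_partitions(func, meta=('x', 'int64')).compute()

Each call of func here sees a pandas DataFrame of roughly len(pdf)/npartitions rows, and compute() concatenates the partition results back together.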
map / apply
You can map a function row-wise across a series with map
df.mycolumn.map(func)
You can map a function row-wise across a dataframe with apply
df.apply(func, axis=1)
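To make these concrete, here is a hedged sketch wired up to the slow_func from the question; the meta argument (name and dtype are arbitrary) is just a hint for versions of dask that otherwise have to guess the output type:

import numpy as np
import pandas as pd
import dask.dataframe as dd

def slow_func(k):                                # same function as in the question
    A = np.random.normal(size=k)
    return sum(1 if a > 0 else -1 for a in A)

s = pd.Series([10000] * 120)
ds = dd.from_pandas(s, npartitions=3)

# element-wise over a dask Series, analogous to pandas s.apply(slow_func)
out = ds.map(slow_func, meta=('x', 'int64')).compute()

# row-wise over a dask DataFrame, analogous to pandas df.apply(func, axis=1)
ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}), npartitions=2)
rows = ddf.apply(lambda row: row['a'] + row['b'], axis=1, meta=('ab', 'int64')).compute()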
As of version 0.6.0, dask.dataframe parallelizes with threads. Custom Python functions will not receive much benefit from thread-based parallelism. You could try processes instead:
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
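As a rough sketch of the processes route with the setup from the question (assuming a dask version that accepts scheduler='processes'; older releases spelled this get=dask.multiprocessing.get), keep in mind that partitions get shipped to worker processes, so very small workloads can still lose to plain pandas on that overhead:

import numpy as np
import pandas as pd
import dask.dataframe as dd

def slow_func(k):
    A = np.random.normal(size=k)
    s = 0
    for a in A:
        s += 1 if a > 0 else -1
    return s

s = pd.Series([10000] * 1200)          # larger than the original example so the per-process overhead can pay off
ds = dd.from_pandas(s, npartitions=4)

# each partition is mapped in a separate worker process, sidestepping the GIL
result = ds.map(slow_func, meta=('x', 'int64')).compute(scheduler='processes')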
apply
However, you should really avoid apply with custom Python functions, both in Pandas and in Dask. This is often a source of poor performance. If you can find a way to do your operation in a vectorized manner, then your Pandas code may be 100x faster and you won't need dask.dataframe at all.
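For instance, here is a sketch of what a vectorized rewrite of the slow_func from the question could look like (the function names are mine); the counting moves from a per-element Python loop into numpy:

import numpy as np
import pandas as pd

s = pd.Series([10000] * 120)

def slow_func(k):                        # original: explicit Python loop over k samples
    A = np.random.normal(size=k)
    out = 0
    for a in A:
        out += 1 if a > 0 else -1
    return out

def vectorized_func(k):                  # same computation, done inside numpy
    A = np.random.normal(size=k)
    return int((A > 0).sum() - (A <= 0).sum())

s.apply(vectorized_func)                 # typically much faster than s.apply(slow_func)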
numba
For your particular problem you might consider numba. This significantly improves your performance.
In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k)  # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba

In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Disclaimer: I work for the company that makes both numba and dask and employs many of the pandas developers.