I have a computation which runs much slower within a Dask/Distributed worker than when run locally. I can reproduce it without any I/O involved, so I can rule out data transfer as the cause. The following code is a minimal reproducible example:
import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster

def gen_data(N=5000000):
    """ Dummy data generator """
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df

def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42

def run_test():
    """ Test computation """
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)

class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)

if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()
Running the test computation locally takes ~10 seconds, but it takes ~30 seconds from within Dask/Distributed. I also noticed that the Dask/Distributed workers output a lot of log messages like
distributed.core - WARNING - Event loop was unresponsive for 1.03s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...
which are surprising, because it's not clear what is holding the GIL in this example.
Why is there such a big performance difference? And what can I do to get the same performance?
Disclaimer: Self-answering for documentation purposes...
This is caused by a surprising behavior of Pandas. By default, the Pandas __setitem__ handlers perform checks to detect chained assignments, leading to the famous SettingWithCopyWarning. When dealing with a copy, these checks issue a call to gc.collect. Thus, code that calls __setitem__ excessively results in excessive gc.collect calls. This can have a significant impact on performance in general, but the problem is much worse within a Dask/Distributed worker, because the garbage collector has to deal with many more Python data structures than when running stand-alone. Most likely these hidden garbage collection calls are also the source of the GIL-holding warnings.
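To observe these hidden collections, one option (a minimal sketch, not part of the original example; whether the __setitem__ checks actually call gc.collect depends on your Pandas version) is to register a gc.callbacks hook and count collections while mutating groupby groups in place, mirroring the structure of the question:

import gc
import numpy as np
import pandas as pd

n_collections = 0

def count_collections(phase, info):
    # Invoked by the interpreter around every collection,
    # including explicit gc.collect() calls.
    global n_collections
    if phase == "start":
        n_collections += 1

gc.callbacks.append(count_collections)

# Same structure as the question: mutate groupby groups in place.
df = pd.DataFrame({str(c): np.random.uniform(size=500000) for c in range(10)})
df["id"] = np.random.choice(range(100), size=len(df))
for key, group_df in df.groupby("id"):
    for c in range(10):
        group_df[str(c)] = np.random.uniform(size=len(group_df))

gc.callbacks.remove(count_collections)
print("garbage collections observed:", n_collections)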
Therefore, the solution is to avoid these excessive gc.collect calls. There are two ways (both are sketched in the code below):

1. Avoid calling __setitem__ on copies: Arguably the best solution, but it requires some understanding of where copies are produced. In the above example this can be achieved by changing the function call to do_something_on_df(group_df.copy()).
2. Setting pd.options.mode.chained_assignment = None at the beginning of the computation also disables the gc.collect calls.

In both cases the test computation runs much faster than before, taking ~3.5 seconds both locally and under Dask/Distributed. This also gets rid of the GIL-holding warnings.
Note: I have filed an issue for this on GitHub.