Why is a computation much slower within a Dask/Distributed worker?

I have a computation which runs much slower within a Dask/Distributed worker compared to running it locally. I can reproduce it without any I/O going on, so I can rule out that it has to do with transferring data. The following code is a minimal reproducing example:

import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster


def gen_data(N=5000000):
    """ Dummy data generator """
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df


def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test():
    """ Test computation """
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)


class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)

if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()

Running the test computation locally takes ~10 seconds, but it takes ~30 seconds from within Dask/Distributed. I also noticed that the Dask/Distributed workers output a lot of log messages like

distributed.core - WARNING - Event loop was unresponsive for 1.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...

which are surprising, because it's not clear what is holding the GIL in this example.

Why is there such a big performance difference? And what can I do to get the same performance?

Disclaimer: Self answering for documentation purposes...

asked Dec 12 '17 by bluenote10


1 Answer

This behavior is the result of a surprising quirk in Pandas. By default, Pandas' __setitem__ handlers perform checks to detect chained assignment, which is what produces the well-known SettingWithCopyWarning. When the target DataFrame is a copy, these checks internally issue a call to gc.collect. As a result, code that uses __setitem__ heavily triggers excessive gc.collect calls. This can hurt performance in general, but the problem is much worse within a Dask/Distributed worker, because the garbage collector has to traverse many more Python data structures than in a stand-alone process. Most likely these hidden garbage collection calls are also the source of the GIL-holding warnings.
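
To see this effect directly, one can count how often gc.collect is called during the group-wise mutations. The snippet below is a minimal sketch that monkey-patches gc.collect with a counting wrapper; the exact count (and whether these calls happen at all) depends on your Pandas version, since newer releases have changed the chained-assignment machinery.

import gc

import numpy as np
import pandas as pd

# Sketch: count gc.collect calls triggered by __setitem__ on group copies.
# Purely illustrative; results vary with the Pandas version.
collect_calls = 0
_original_collect = gc.collect

def counting_collect(*args, **kwargs):
    global collect_calls
    collect_calls += 1
    return _original_collect(*args, **kwargs)

gc.collect = counting_collect
try:
    df = pd.DataFrame({str(c): np.random.uniform(size=100000) for c in range(10)})
    df["id"] = np.random.choice(range(100), size=len(df))
    for _, group_df in df.groupby("id"):
        # Assigning to a column of the group (a copy of a slice) runs the
        # chained-assignment checks, which may call gc.collect internally.
        group_df["0"] = np.random.uniform(size=len(group_df))
finally:
    gc.collect = _original_collect

print("gc.collect was called", collect_calls, "times")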

Therefore, the solution is to avoid these excessive gc.collect calls. There are two ways (both are sketched in the example below):

  • Avoid using __setitem__ on copies: Arguably the best solution, but it requires some understanding of where copies are produced. In the above example this can be achieved by changing the function call to do_something_on_df(group_df.copy()).
  • Disable the chained assignment checks: Simply placing pd.options.mode.chained_assignment = None at the beginning of the computation also disables the gc.collect calls.
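
A minimal sketch combining both workarounds is shown below. It mirrors the functions from the question; the scheduler address is just the example value from above and should be replaced with your own.

import numpy as np
import pandas as pd
from dask.distributed import Client


def do_something_on_df(df):
    """ Same dummy in-place mutation as in the question """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test_fixed():
    # Option 2: disable the chained-assignment checks (this also suppresses
    # SettingWithCopyWarning and the hidden gc.collect calls).
    pd.options.mode.chained_assignment = None

    df = pd.DataFrame({str(c): np.random.uniform(size=5000000) for c in range(10)})
    df["id"] = np.random.choice(range(100), size=len(df))

    for key, group_df in df.groupby("id"):
        # Option 1: pass an explicit copy so __setitem__ never operates on a
        # potentially chained assignment in the first place.
        do_something_on_df(group_df.copy())


if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")  # scheduler address from the question
    client.submit(run_test_fixed).result()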

In both cases the test computation runs much faster than before, taking ~3.5 seconds both locally and under Dask/Distributed. This also gets rid of the GIL-holding warnings.

Note: I have filed an issue for this on GitHub.

answered Oct 13 '22 by bluenote10