 

Dask: How would I parallelize my code with dask delayed?

This is my first venture into parallel processing, and I have been looking into Dask, but I am having trouble actually coding it.

I have had a look at their examples and documentation, and I think dask.delayed will work best. I attempted to wrap my functions with delayed(function_name) or to add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods since it is written in Python and for its (supposed) simplicity. I know Dask doesn't parallelize the for loop itself, but they say it can work inside a loop.

My code passes files through a function that feeds inputs to other functions, and looks like this:

    from dask import delayed

    filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
    for count, name in enumerate(filenames):
        name = name.split('.')[0]
        ....

then do some pre-processing, e.g.:

    # unpacking a Delayed into two results requires nout=2
    pre_result1, pre_result2 = delayed(read_files_and_do_some_stuff, nout=2)(name)

then I call a constructor and pass the pre_results into the function calls:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                                      input_data=pre_result1, model1=pre_result2)

What I do here is pass each file through the for loop, do some pre-processing, and then pass the results into two models.

Thoughts or tips on how to parallelize this? I began getting odd errors and had no idea how to fix the code. The code does work as is. I use a bunch of pandas DataFrames, Series, and NumPy arrays, and I would prefer not to go back and change everything to work with dask.dataframe, etc.

The code in my comment may be difficult to read. Here it is in a more formatted way.

In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

    from dask import delayed
    import pandas as pd
    from sklearn.metrics import mean_squared_error as mse

    filenames = ['file1.csv']

    for count, name in enumerate(filenames):
        file1 = pd.read_csv(name)
        df = pd.DataFrame(file1)
        prediction = df['Close'][:-1]
        observed = df['Close'][1:]
        mean_squared_error = delayed(mse)(observed, prediction)
asked Mar 02 '17 by Monty


People also ask

How do you parallelize Dask's delayed tasks?

You achieve parallelism by having many delayed calls, not by using only a single one: Dask will not look inside a function decorated with @dask.delayed and parallelize that code internally. To accomplish that, it needs your help to find good places to break up a computation. A version with only a single delayed task cannot be parallelized.
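
As a minimal sketch of that point (process_file here is a hypothetical stand-in for per-file work):

    import dask
    from dask import delayed

    def process_file(name):
        # hypothetical per-file work
        return len(name)

    filenames = ['1.csv', '2.csv', '3.csv']

    # One delayed call around the whole loop: a single task,
    # so Dask has nothing to run in parallel.
    single = delayed(lambda names: [process_file(n) for n in names])(filenames)

    # One delayed call per file: many tasks that can run in parallel.
    many = [delayed(process_file)(n) for n in filenames]

    print(dask.compute(single)[0])  # one task
    print(dask.compute(*many))      # three tasks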

How do you parallelize normal Python code using Dask?

We can easily parallelize normal Python code using Dask by following these steps: identify the time-consuming functions that are executed more than once; convert them from normal to lazy using dask.delayed; any function that calls these lazy functions then becomes lazy as well.
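
A minimal sketch of those steps, where slow_load and slow_score are hypothetical stand-ins for the time-consuming functions:

    import dask
    from dask import delayed

    @delayed
    def slow_load(name):
        # stand-in for an expensive load step
        return list(range(len(name)))

    @delayed
    def slow_score(data):
        # receives a lazy result, so this call is lazy too
        return sum(data)

    # Building the graph is cheap; nothing has executed yet.
    lazy = [slow_score(slow_load(n)) for n in ['a.csv', 'b.csv', 'c.csv']]
    results = dask.compute(*lazy)  # runs the tasks in parallel
    print(results)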

What is the Dask delayed API in Python?

The dask.delayed API gives a simple way to parallelize Python code: it converts a normal function into a lazy one, whose execution is deferred until you explicitly ask for the result.
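
For instance, a minimal illustration of that conversion:

    from dask import delayed

    def inc(x):
        return x + 1

    lazy_inc = delayed(inc)  # wrap a normal function
    y = lazy_inc(10)         # returns a Delayed object; nothing runs yet
    print(y)                 # Delayed('inc-...')
    print(y.compute())       # 11 -- the actual value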

How do you run multiple lazy functions in parallel in Dask?

If there is more than one final lazy result, use Dask's compute() function: pass all of the lazy objects to it, and it runs them in parallel and returns a result for each of them.
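
A small sketch of that pattern:

    import dask
    from dask import delayed

    # three independent lazy results
    lazy_sums = [delayed(sum)(range(n)) for n in (10, 100, 1000)]

    # a single compute() call runs all of them in parallel
    # and returns one concrete result per lazy input
    results = dask.compute(*lazy_sums)
    print(results)  # (45, 4950, 499500)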


2 Answers

You need to call dask.compute to eventually compute the result. See the dask.delayed documentation.

Sequential code

    import pandas as pd
    from sklearn.metrics import mean_squared_error as mse

    filenames = [...]

    results = []
    for count, name in enumerate(filenames):
        file1 = pd.read_csv(name)
        df = pd.DataFrame(file1)  # isn't this already a dataframe?
        prediction = df['Close'][:-1]
        observed = df['Close'][1:]
        mean_squared_error = mse(observed, prediction)
        results.append(mean_squared_error)

Parallel code

    import dask
    import pandas as pd
    from sklearn.metrics import mean_squared_error as mse

    filenames = [...]

    delayed_results = []
    for count, name in enumerate(filenames):
        df = dask.delayed(pd.read_csv)(name)
        prediction = df['Close'][:-1]
        observed = df['Close'][1:]
        mean_squared_error = dask.delayed(mse)(observed, prediction)
        delayed_results.append(mean_squared_error)

    results = dask.compute(*delayed_results)
answered Oct 24 '22 by MRocklin


A much clearer solution, IMO, than the accepted answer is this snippet:

    from dask import compute, delayed
    import pandas as pd
    from sklearn.metrics import mean_squared_error as mse

    filenames = [...]

    def compute_mse(file_name):
        df = pd.read_csv(file_name)
        prediction = df['Close'][:-1]
        observed = df['Close'][1:]
        return mse(observed, prediction)

    delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
    mean_squared_errors = compute(*delayed_results, scheduler="processes")
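
A note on the scheduler="processes" argument here: Dask's process-based scheduler sidesteps the GIL, which helps when the per-file work is pure Python, but it pays a serialization cost to move data between processes. The default threaded scheduler can be the better choice when the heavy lifting happens in libraries like pandas and NumPy that release the GIL.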
answered Oct 24 '22 by Vitalis