This is my first venture into parallel processing, and I have been looking into Dask, but I am having trouble actually coding it.
I have had a look at their examples and documentation, and I think dask.delayed will work best. I attempted to wrap my functions with delayed(function_name) or add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods since it is written in Python and for its (supposed) simplicity. I know Dask doesn't parallelize the for loop itself, but they say it can work inside a loop.
My code passes files through a function that contains inputs to other functions and looks like this:
```python
from dask import delayed

filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames):
    name = name.split('.')[0]
    ....
```
then do some pre-processing, e.g.:

```python
# nout=2 lets the single Delayed result unpack into two values
pre_result1, pre_result2 = delayed(read_files_and_do_some_stuff, nout=2)(name)
```
then I call a constructor and pass the pre-processing results into the function calls:

```python
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                                  input_data=pre_result1, model1=pre_result2)
```
What I do here is pass the file into the for loop, do some pre-processing, and then pass the file into two models.
Thoughts or tips on how to parallelize this? I began getting odd errors, and I had no idea how to fix the code. The code does work as is. I use a bunch of pandas DataFrames, Series, and NumPy arrays, and I would prefer not to go back and change everything to work with dask.dataframe, etc.
The code in my comment may be difficult to read. Here it is in a more formatted way.
In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
```python
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

filenames = ['file1.csv']
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)
```
You achieve parallelism by having many delayed calls, not by using only a single one: Dask will not look inside a function decorated with @dask.delayed and parallelize that code internally. To accomplish that, it needs your help to find good places to break up a computation. The first version only has one delayed task, and so cannot parallelize.
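To make that concrete, here is a minimal sketch (slow_square is a hypothetical stand-in for real work): wrapping the whole loop in a single delayed call produces one task, while delaying each call produces many tasks that Dask can schedule in parallel.

```python
import dask
from dask import delayed

def slow_square(x):
    # hypothetical stand-in for a time-consuming function
    return x ** 2

# One delayed call wrapping everything: a single task, so no parallelism.
one_task = delayed(lambda xs: [slow_square(x) for x in xs])(range(10))

# One delayed call per input: ten tasks that Dask can run in parallel.
many_tasks = [delayed(slow_square)(x) for x in range(10)]

print(dask.compute(one_task))     # ([0, 1, 4, ..., 81],)
print(dask.compute(*many_tasks))  # (0, 1, 4, ..., 81)
```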
We can parallelize normal Python code with Dask by following these steps:

1. Identify the time-consuming functions in the code that are executed more than once.
2. Convert those functions from normal to lazy using dask.delayed. Any function that calls a lazy function becomes lazy as well.

The dask.delayed API is deliberately simple: it converts a normal function into a lazy one. If there is more than one final lazy result, pass all of them to dask.compute(); it runs them in parallel and returns a concrete result for each, as in the sketch below.
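A minimal sketch of that pattern, with hypothetical add and mul functions standing in for expensive work:

```python
import dask
from dask import delayed

@delayed
def add(a, b):
    return a + b

@delayed
def mul(a, b):
    return a * b

# Two independent final lazy results: passing both to dask.compute
# runs them in parallel and returns a tuple of concrete values.
total, product = dask.compute(add(1, 2), mul(3, 4))
print(total, product)  # 3 12
```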
You need to call dask.compute to eventually compute the result. See dask.delayed documentation.
Sequential version:

```python
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)
    results.append(mean_squared_error)
```
Parallel version:

```python
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)
```
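Here results is a plain tuple with one mean squared error (a float) per file. The Delayed objects only materialize when dask.compute runs, which is why printing one of them beforehand shows Delayed('mean_squared_error-...') rather than a number.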
A much clearer solution, IMO, than the accepted answer is this snippet:

```python
from dask import compute, delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse

filenames = [...]

def compute_mse(file_name):
    df = pd.read_csv(file_name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    return mse(observed, prediction)

delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")
```
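The scheduler="processes" argument runs the tasks in separate worker processes, which sidesteps the GIL for CPU-bound work; for workloads dominated by file I/O, Dask's default threaded scheduler is usually sufficient.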