I am new to dask and I found so nice to have a module that makes it easy to get parallelization. I am working on a project where I was able to parallelize in a single machine a loop as you can see here . However, I would like to move over to dask.distributed
. I applied the following changes to the class above:
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
from .cutoff import Cosine
from collections import OrderedDict
import dask
-import dask.multiprocessing
+from dask.distributed import Client
import time
@@ -141,13 +141,14 @@ class Gaussian(object):
for image in images.items():
computations.append(self.fingerprints_per_image(image))
+ client = Client()
if self.scaler is None:
- feature_space = dask.compute(*computations, scheduler='processes',
+ feature_space = dask.compute(*computations, scheduler='distributed',
num_workers=self.cores)
feature_space = OrderedDict(feature_space)
else:
stacked_features = dask.compute(*computations,
- scheduler='processes',
+ scheduler='distributed',
num_workers=self.cores)
stacked_features = numpy.array(stacked_features)
Doing so generates this error:
File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
I have tried different ways of adding if __name__ == '__main__':
without any success. This can be reproduced by running this example. I would appreciate if anyone could help me to figure this out. I have no clue on how I should change my code to make it work.
Thanks.
Edit: The example is cu_training.py
.
The Client
command starts up new processes, so it will have to be within the if __name__ == '__main__':
block as described in this SO question or this GitHub issue
This is the same as with the multiprocessing module
I faced several issues in my code even after including main if __name__ == '__main__':
.
I am using several python files and modules and multiprocess is used by only one function for some sampling operation. The only fix that worked for me is to include main at the very first file & first line of the whole code (even including imports). The following worked well:
if __name__ == '__main__':
from mjrl.utils.gym_env import GymEnv
from mjrl.policies.gaussian_mlp import MLP
from mjrl.baselines.quadratic_baseline import QuadraticBaseline
from mjrl.baselines.mlp_baseline import MLPBaseline
from mjrl.algos.npg_cg import NPG
from mjrl.algos.dapg import DAPG
from mjrl.algos.behavior_cloning import BC
from mjrl.utils.train_agent import train_agent
from mjrl.samplers.core import sample_paths
import os
import json
import mjrl.envs
import mj_envs
import time as timer
import pickle
import argparse
import numpy as np
# ===============================================================================
# Get command line arguments
# ===============================================================================
parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
parser.add_argument('--output', type=str, required=True, help='location to store results')
parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
args = parser.parse_args()
JOB_DIR = args.output
if not os.path.exists(JOB_DIR):
os.mkdir(JOB_DIR)
with open(args.config, 'r') as f:
job_data = eval(f.read())
assert 'algorithm' in job_data.keys()
assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
EXP_FILE = JOB_DIR + '/job_config.json'
with open(EXP_FILE, 'w') as f:
json.dump(job_data, f, indent=4)
# ===============================================================================
# Train Loop
# ===============================================================================
e = GymEnv(job_data['env'])
policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With