I'd like to train tens of small neural networks in parallel on the CPU in Keras with the TensorFlow backend. By default TensorFlow splits the batches over the cores when training a single network, but my average core utilization is only around 50%. So it seems like a good idea to assign the complete training of one neural net to a single core, so less data has to be moved around.
I can't seem to find how to specify this.
Also note that the neural nets each have a different architecture, so combining everything into a single graph would lead to sparser matrices and slower execution.
There are some key points to making this work: each process builds and trains its nets in its own TensorFlow graph and session, and the trained models are handed back to the parent via model.save(file_name), not regular pickling (Keras models don't pickle cleanly).
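For illustration, this is the save/load round trip each worker relies on; the file name here is just a placeholder:

    from keras.models import Sequential, load_model
    from keras.layers import Dense

    model = Sequential()
    model.add(Dense(units=10, activation='softmax', input_dim=100))
    model.compile(loss='categorical_crossentropy', optimizer='sgd')

    model.save('worker0_net0.h5')             # architecture + weights + optimizer state in one HDF5 file
    restored = load_model('worker0_net0.h5')  # ready to use again in the receiving process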
Implementation: extend the Python Process class:
from keras.layers import Dense
from keras.models import Sequential
from multiprocessing import Process, Queue
import tensorflow as tf
from train_val_set import TrainValSet
class NNProcess(Process):
def __init__(self, process_id: int, nr_nets: int, ret_queue: Queue):
super(NNProcess, self).__init__()
self.process_id = process_id
self.neural_nets = []
self.train_val_set = None
self.nr_nets = nr_nets
self.ret_queue = ret_queue
def set_train_val(self, train_val_set: TrainValSet):
self.train_val_set = train_val_set
def get_session_config(self):
num_cores = 1
num_CPU = 1
num_GPU = 0
config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
inter_op_parallelism_threads=num_cores, allow_soft_placement=False,
device_count={'CPU': num_CPU, 'GPU': num_GPU})
return config
def run(self):
print("process " + str(self.process_id) + " starting...")
with tf.Session(graph=tf.Graph(), config=self.get_session_config()) as session:
self.init_nets()
self.compile()
self.fit_nets(self.train_val_set)
for i in range(0, self.nr_nets):
                # include process_id so parallel workers don't overwrite each other's files
                file_name = self.neural_nets[i].name + "_" + str(self.process_id) + "_" + str(i) + ".h5"
self.neural_nets[i].save(file_name)
self.ret_queue.put(file_name)
print("process " + str(self.process_id) + " finished.")
def compile(self):
for neural_net in self.neural_nets:
neural_net.compile(loss='categorical_crossentropy',
optimizer='sgd',
metrics=['accuracy'])
def init_nets(self):
for i in range(0, self.nr_nets):
model = Sequential()
model.add(Dense(units=64, activation='relu', input_dim=100))
model.add(Dense(units=10, activation='softmax'))
self.neural_nets.append(model)
def fit_nets(self, train_val_set: TrainValSet):
for i in range(0, self.nr_nets):
            self.neural_nets[i].fit(train_val_set.x_train, train_val_set.y_train,
                                    validation_data=(train_val_set.x_val, train_val_set.y_val),
                                    # epochs and batch_size are placeholders; tune for your task
                                    epochs=10,
                                    batch_size=32)
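If you also want to pin each worker to a specific core (the question's stated goal), one Linux-only option is os.sched_setaffinity, called at the top of run(). This is an optional sketch, not part of the class above; it assumes one worker per core and that core ids 0..n-1 exist:

    import os

    def pin_to_core(core_id: int):
        # Linux-only: restrict the calling process (pid 0 = current) to a single CPU core
        os.sched_setaffinity(0, {core_id})

    # e.g. as the first statement of NNProcess.run():
    #     pin_to_core(self.process_id)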
Helper class:
from pandas import DataFrame
class TrainValSet:
def __init__(self, df_train: DataFrame, df_val: DataFrame):
self.x_train, self.y_train = self.get_x_y(df_train)
self.x_val, self.y_val = self.get_x_y(df_val)
def get_x_y(self, df: DataFrame):
X = df.iloc[:, 0:-1].values
y = df.iloc[:, -1].values
return X, y
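One detail to watch: the models are compiled with categorical_crossentropy, which expects one-hot targets, while get_x_y returns the raw label column. If your CSVs store integer class labels, a conversion along these lines (an assumption about your data layout) is needed in __init__:

    from keras.utils import to_categorical

    # after calling get_x_y, assuming integer class labels 0..9:
    self.y_train = to_categorical(self.y_train, num_classes=10)
    self.y_val = to_categorical(self.y_val, num_classes=10)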
Main file:
import pandas as pd
from multiprocessing import Manager
import tensorflow as tf
from keras import backend as K
from train_val_set import TrainValSet
from nn_process import NNProcess
def load_train_val_test_datasets(dataset_dir: str, dataset_name: str):
df_train = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_train.csv", header=None)
df_val = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_val.csv", header=None)
df_test = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_test.csv", header=None)
return df_train, df_val, df_test
# config for prediction and evaluation only
def get_session_config(num_cores):
num_CPU = 1
num_GPU = 0
config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
inter_op_parallelism_threads=num_cores, allow_soft_placement=True,
device_count={'CPU': num_CPU, 'GPU': num_GPU})
return config
def train_test(nr_nets: int, nr_processes: int):
    df_train, df_val, df_test = load_train_val_test_datasets('datasets/', 'MNIST')  # dataset_dir here is illustrative
train_val_set = TrainValSet(df_train, df_val)
nets_per_proc = int(nr_nets/nr_processes)
nn_queue = Manager().Queue()
processes = []
for i in range(0, nr_processes):
nn_process = NNProcess(i, nets_per_proc, nn_queue)
nn_process.set_train_val(train_val_set)
processes.append(nn_process)
for nn_process in processes:
nn_process.start()
for nn_process in processes:
nn_process.join()
tf_session = tf.Session(config=get_session_config(4))
K.set_session(tf_session)
# ...
# load neural nets from files
# do predictions
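The commented-out loading step could look roughly like this; the file names come off nn_queue, and the test-set handling mirrors TrainValSet (both are assumptions about the surrounding code):

    from keras.models import load_model

    trained_nets = []
    while not nn_queue.empty():
        trained_nets.append(load_model(nn_queue.get()))

    x_test = df_test.iloc[:, 0:-1].values
    for net in trained_nets:
        predictions = net.predict(x_test)
        # ... evaluate or ensemble the predictions as needed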