
Train multiple neural nets in parallel on CPU in Keras

I'd like to train tens of small neural networks in parallel on the CPU in Keras with the TensorFlow backend. By default TensorFlow splits the batches over the cores when training a single network, but my average core utilization is only around 50%. So it seems like a good idea to assign the complete training of one neural net to one core, so that less data has to be moved around.

I can't seem to find how to specify this.

Also note that the neural nets have different architectures, so combining everything into a single graph would lead to sparser matrices and slower execution.

Asked Feb 24 '18 by Anton


1 Answer

There are some key points to making this work:

  • Use processes, not threads. Threads give asynchronous but not parallel execution in Python, so only one CPU core would be used.
  • For practical purposes, building, compiling and fitting a neural net should all happen in the same process.
  • A separate TensorFlow graph and session need to be initialized for each process.
  • After training, you will likely want to serialize the nets for later use. It's important to use Keras' model.save(file_name), not regular pickling (see the save/load sketch below the list).
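
As a minimal sketch of that last point (the file name net_0.h5 is made up for illustration): model.save writes the architecture, weights and optimizer state to a single HDF5 file, and keras.models.load_model restores all of it, which regular pickling does not reliably do for Keras models.

from keras.layers import Dense
from keras.models import Sequential, load_model

# A toy model standing in for one of the nets trained below.
model = Sequential()
model.add(Dense(units=64, activation='relu', input_dim=100))
model.add(Dense(units=10, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='sgd', metrics=['accuracy'])

model.save("net_0.h5")             # architecture + weights + optimizer state
restored = load_model("net_0.h5")  # ready to use, no re-compilation needed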

Implementation:

Extend Python's Process class:

from keras.layers import Dense
from keras.models import Sequential
from multiprocessing import Process, Queue
import tensorflow as tf

from train_val_set import TrainValSet


class NNProcess(Process):
    def __init__(self, process_id: int, nr_nets: int, ret_queue: Queue):
        super(NNProcess, self).__init__()
        self.process_id = process_id
        self.neural_nets = []
        self.train_val_set = None
        self.nr_nets = nr_nets
        self.ret_queue = ret_queue

    def set_train_val(self, train_val_set: TrainValSet):
        self.train_val_set = train_val_set

    def get_session_config(self):
        # One thread and one CPU device per process, so each process
        # (and therefore each batch of nets) stays on a single core.
        num_cores = 1
        num_CPU = 1
        num_GPU = 0

        config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
                                inter_op_parallelism_threads=num_cores, allow_soft_placement=False,
                                device_count={'CPU': num_CPU, 'GPU': num_GPU})

        return config

    def run(self):
        print("process " + str(self.process_id) + " starting...")

        with tf.Session(graph=tf.Graph(), config=self.get_session_config()) as session:
            self.init_nets()
            self.compile()
            self.fit_nets(self.train_val_set)
            for i in range(0, self.nr_nets):
                # Include the process id in the file name so nets saved by
                # different processes cannot collide; save as HDF5, not pickle.
                file_name = "net_" + str(self.process_id) + "_" + str(i) + ".h5"
                self.neural_nets[i].save(file_name)
                self.ret_queue.put(file_name)
        print("process " + str(self.process_id) + " finished.")

    def compile(self):
        for neural_net in self.neural_nets:
            neural_net.compile(loss='categorical_crossentropy',
                          optimizer='sgd',
                          metrics=['accuracy'])

    def init_nets(self):
        for i in range(0, self.nr_nets):
            model = Sequential()
            model.add(Dense(units=64, activation='relu', input_dim=100))
            model.add(Dense(units=10, activation='softmax'))
            self.neural_nets.append(model)

    def fit_nets(self, train_val_set: TrainValSet):
        for i in range(0, self.nr_nets):
            # epochs and batch_size are placeholders; tune them for your data.
            self.neural_nets[i].fit(train_val_set.x_train, train_val_set.y_train,
                                    validation_data=(train_val_set.x_val, train_val_set.y_val),
                                    epochs=10, batch_size=32)

Helper class:

from pandas import DataFrame


class TrainValSet:
    def __init__(self, df_train: DataFrame, df_val: DataFrame):
        self.x_train, self.y_train = self.get_x_y(df_train)
        self.x_val, self.y_val = self.get_x_y(df_val)

    def get_x_y(self, df: DataFrame):
        # Convention: every column except the last is a feature,
        # the last column is the label.
        X = df.iloc[:, 0:-1].values
        y = df.iloc[:, -1].values

        return X, y
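
For illustration, TrainValSet expects plain DataFrames in which the last column is the label (the values below are made up):

from pandas import DataFrame

from train_val_set import TrainValSet

df_train = DataFrame([[0.1, 0.2, 0],
                      [0.3, 0.4, 1]])
df_val = DataFrame([[0.5, 0.6, 1]])

tv = TrainValSet(df_train, df_val)
print(tv.x_train.shape)  # (2, 2): two rows, two feature columns
print(tv.y_train)        # [0 1]: labels taken from the last column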

Main file:

import pandas as pd
from multiprocessing import Manager
import tensorflow as tf
from keras import backend as K

from train_val_set import TrainValSet
from nn_process import NNProcess


def load_train_val_test_datasets(dataset_dir: str, dataset_name: str):
    df_train = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_train.csv", header=None)
    df_val = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_val.csv", header=None)
    df_test = pd.read_csv(dataset_dir + dataset_name + "/" + dataset_name + "_test.csv", header=None)

    return df_train, df_val, df_test


# config for prediction and evaluation only
def get_session_config(num_cores):
    num_CPU = 1
    num_GPU = 0

    config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
                            inter_op_parallelism_threads=num_cores, allow_soft_placement=True,
                            device_count={'CPU': num_CPU, 'GPU': num_GPU})

    return config


def train_test(nr_nets: int, nr_processes: int):
    # 'datasets/' is a placeholder directory; point it at your own data layout.
    df_train, df_val, df_test = load_train_val_test_datasets('datasets/', 'MNIST')
    train_val_set = TrainValSet(df_train, df_val)
    nets_per_proc = nr_nets // nr_processes

    nn_queue = Manager().Queue()

    processes = []

    for i in range(0, nr_processes):
        nn_process = NNProcess(i, nets_per_proc, nn_queue)
        nn_process.set_train_val(train_val_set)
        processes.append(nn_process)

    for nn_process in processes:
        nn_process.start()

    for nn_process in processes:
        nn_process.join()

    tf_session = tf.Session(config=get_session_config(4))
    K.set_session(tf_session)

    # ...
    # load neural nets from files
    # do predictions
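
One way to fill in those last two steps, as a sketch (the helpers load_models and predict_all and the x_test argument are illustrative, not part of the original answer):

from keras.models import load_model


def load_models(nn_queue):
    # Drain the queue of file names put there by the NNProcess workers
    # and restore each saved net.
    neural_nets = []
    while not nn_queue.empty():
        neural_nets.append(load_model(nn_queue.get()))
    return neural_nets


def predict_all(neural_nets, x_test):
    # Collect every net's predictions on the test features.
    return [net.predict(x_test) for net in neural_nets]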
Answered Oct 07 '22 by Anton