I had assumed that most frameworks like Keras/TensorFlow automatically use all CPU cores, but in practice it seems they do not. I could only find a few sources on how to use the full capacity of the CPU during the deep-learning process. I found an article about using
from multiprocessing import Pool
import psutil
import ray
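Roughly, the pattern in that kind of article looks like the sketch below (my own paraphrase from memory, not the article's exact code; train_fn and its config argument are just placeholders):
import psutil
import ray

num_cpus = psutil.cpu_count(logical=False)  # number of physical cores
ray.init(num_cpus=num_cpus)

@ray.remote
def train_fn(config):
    # build and fit a Keras model here, then return its score
    return config

# launch one task per hyperparameter set; ray schedules them across the cores
futures = [train_fn.remote(cfg) for cfg in [{'units': 800}, {'units': 960}]]
results = ray.get(futures)
ray.shutdown()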
On the other hand, based on this answer about using a Keras model in multiple processes, there is no trace of the above-mentioned libraries. Is there a more elegant way to take advantage of multiprocessing with Keras, given how widely it is used?
For instance, how can I modify the simple RNN implementation below so that training uses at least 50% of the CPU's capacity?
Should I run a second model, such as the LSTM that I have commented out below, as a parallel task? In other words, can we run multiple models simultaneously and thereby use more of the CPU's capacity?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM, SimpleRNN
from keras.layers import Activation
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i%3==2]
Y_train= df[index]
df = df.values
#making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)
Y_train=np.array(Y_train)
df=np.array(df)
look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
#Shape of train and test data
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#predict
Y_train=np.array(trainY)
Y_test=np.array(testX)
Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)
train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)
# create and fit the Simple LSTM model as 2nd model for multi-tasking
#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#Y_train=np.array(trainY)
#Y_test=np.array(testX)
#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)
#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)
#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
ax=plt.plot(hist_RNN.history['loss'] ,label='Train loss')
ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()
# LSTM subplot (uncomment together with the LSTM block above)
#plt.subplot(1, 2, 2)
#ax=plt.plot(hist_LSTM.history['loss'] ,label='Train loss')
#ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
#plt.xlabel('Training steps (Epochs = 50)')
#plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
#plt.title('LSTM Loss on Train and Test data')
#plt.legend()
plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()
Note that I don't have access to CUDA; I only have access to a powerful server without a graphics card. My aim is to take advantage of multiprocessing and multithreading to use the maximum capacity of the CPU instead of about 30%, i.e. just one core, even though the machine is quad-core! Any advice would be greatly appreciated. I have uploaded a formatted CSV dataset.
Update: my hardware configuration is the following:
Many programming languages support multithreading and multiprocessing, which enable concurrent execution of code: different tasks can be managed simultaneously from a central program instead of strictly one after another.
Multiprocessing uses two or more CPU cores to increase computing power, whereas multithreading runs multiple threads of execution inside a single process. Multithreading focuses on spawning several threads from one process, whereas multiprocessing gains speed by adding separate processes that can run on separate cores.
Multiprocessing outshines threading in cases where the program is CPU intensive and doesn't have to do any IO or user interaction. Any program that just crunches numbers will see a large speedup from multiprocessing; in Python, threading will probably even slow it down, because the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time.
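To make that concrete, here is a small, self-contained sketch (my own, not part of the question's model code) comparing a CPU-bound job run with a thread pool versus a process pool; on a multi-core machine the process pool finishes several times faster, while the thread pool is held back by the GIL.
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool  # thread-based drop-in replacement

def crunch(n):
    # pure CPU work: no IO, no waiting
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    jobs = [10_000_000] * 4

    start = time.time()
    with ThreadPool(4) as tp:
        tp.map(crunch, jobs)
    print(f'threads:   {time.time() - start:.1f}s')   # roughly serial, because of the GIL

    start = time.time()
    with Pool(4) as pp:
        pp.map(crunch, jobs)
    print(f'processes: {time.time() - start:.1f}s')   # close to a 4x speedup on 4 cores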
It's a good thing that training one model doesn't use 100% of your CPU! That leaves room to train multiple models in parallel and speed up your overall training time.
NB: If you just want to speed up this single model, look into GPUs or changing the hyperparameters like batch size and number of neurons (layer size).
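(Side note, not something the original answer relied on: if the goal is to push one model across more cores, the TensorFlow backend can be told how many threads to use. A minimal sketch, assuming the standalone Keras with a TF 1.x backend that the question's imports suggest; the thread counts are only illustrative.)
import tensorflow as tf
from keras import backend as K

config = tf.ConfigProto(
    intra_op_parallelism_threads=4,   # threads used inside a single op (e.g. a matmul)
    inter_op_parallelism_threads=2)   # threads used to run independent ops in parallel
K.set_session(tf.Session(config=config))
# build and fit the Keras model as usual after this point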
Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine). The multiprocessing.Pool basically creates a pool of jobs that need doing. The processes will pick up these jobs and run them. When a job is finished, the process will pick up another job from the pool.
import time
import signal
import multiprocessing
def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)
num_workers = 4
hyperparams = [800, 960, 1100]
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)
Output:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]
This is easily demonstrated with the time.sleep in the code. You'll see that all 3 processes start the training job, and then they all finish at about the same time. If this was single processed, you'd have to wait for each to finish before starting the next (yawn!).
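For anyone who wants to see that effect without Keras installed, here is a stripped-down, self-contained version of the same demonstration (my own sketch, not the answer's original code): each "training job" just sleeps for 5 seconds, yet the whole pool finishes in roughly 5 seconds rather than 15.
import time
import multiprocessing

def fake_train(layer_size):
    # stand-in for model.fit(): pretend training takes 5 seconds
    time.sleep(5)
    return {'size': layer_size, 'score': 1.0}

if __name__ == '__main__':
    start = time.time()
    with multiprocessing.Pool(4) as pool:
        scores = pool.map(fake_train, [800, 960, 1100])
    print(scores)
    print(f'Wall-clock time: {time.time() - start:.1f}s')  # about 5s, not about 15s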
EDIT: The OP also wanted the full code. This is difficult on Stack Overflow because I can't test in your environment or with your data. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports, but this is as close as you'll get to "runnable" and "full" code.
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model of the given type ('rnn' or 'lstm')
    and returns the train/test scores (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()
    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)
    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}
# Your code
# ---------
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values
# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)
Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)
# My Code
# -------
num_workers = 2
model_types = ['rnn', 'lstm']
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)
Output of the program:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
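One caveat worth adding (my note, not part of the original answer): the D:\ path suggests Windows, where multiprocessing starts workers with the "spawn" method and re-imports the script in every worker. The pool creation therefore has to sit behind a main guard, and it's good practice to close the pool when you're done, e.g.:
if __name__ == '__main__':
    num_workers = 2
    model_types = ['rnn', 'lstm']
    with multiprocessing.Pool(num_workers, init_worker) as pool:
        scores = pool.map(train_model, model_types)
    print(scores)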