
Issue of batch sizes when using custom loss functions in Keras

I am doing a slight modification of a standard neural network by defining a custom loss function. The custom loss function depends not only on y_true and y_pred, but also on the training data. I implemented it using the wrapping solution described here.

Specifically, I wanted to define a custom loss function that is the standard mse plus the mse between the input and the square of y_pred:

def custom_loss(x_true):
    def loss(y_true, y_pred):
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true))
    return loss

Then I compile the model using

model_custom.compile(loss = custom_loss( x_true=training_data ), optimizer='adam')

fit the model using

model_custom.fit(training_data, training_label, epochs=100, batch_size = training_data.shape[0])

All of the above works fine, because the batch size is actually the total number of training samples.

But if I set a different batch_size (e.g., 10) when I have 1000 training samples, I get an error:

Incompatible shapes: [1000] vs. [10].

It seems that Keras is able to automatically adjust the size of the inputs to its own loss functions based on the batch size, but cannot do so for the custom loss function.

Do you know how to solve this issue?

Thank you!

==========================================================================

* Update: the batch size issue is solved, but another issue occurred

Thank you, Ori, for the suggestion of concatenating the input and output layers! It "worked", in the sense that the code runs under any batch size. However, it seems that the result from training the new model is wrong... Below is a simplified version of the code to demonstrate the problem:

import numpy as np
import scipy.io
import keras
from keras import backend as K
from keras.models import Model
from keras.layers import Input, Dense, Activation
from numpy.random import seed
from tensorflow import set_random_seed

def custom_loss(y_true, y_pred): # this is essentially the mean_squared_error
    mse = K.mean( K.square( y_pred[:,2] - y_true ) )
    return mse

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

# build the standard neural network with one hidden layer
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mean_squared_error', optimizer='adam')            # compile the model
model_standard.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_custom = Input(shape=(2,))                                             # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=[input_custom], outputs=[output_custom])         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
model_custom.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_custom = model_custom.predict(testing_data)               # make prediction

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )

Basically, I have a standard one-hidden-layer neural network, and a custom one-hidden-layer neural network whose output layer is concatenated with the input layer. For testing purposes, I did not use the concatenated input layer in the custom loss function, because I wanted to see if the custom network can reproduce the standard neural network. Since the custom loss function is equivalent to the standard 'mean_squared_error' loss, both networks should have the same training results (I also reset the random seeds to make sure that they have the same initialization).

However, the training results are very different. It seems that the concatenation makes the training process different? Any ideas?

Thank you again for all your help!

Final update: Ori's approach of concatenating input and output layers works, and is verified by using the generator. Thanks!!

asked Nov 10 '18 by Yuanzhang Xiao


People also ask

What batch size should I use in Keras?

If you plan on working with larger image sizes, then you can go down to 16, but 32 is a good starting point. If you have the luxury of going to 64, that's also fine. Let's quickly show a Keras example so you can test this yourself on your own data if you want.

How is custom loss function implemented?

Creating custom loss functions in Keras: a custom loss function can be created by defining a function that takes the true values and predicted values as required parameters. The function should return an array of losses. The function can then be passed at the compile stage.
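
For instance, a minimal sketch of this pattern (assuming Keras 2.x with the TensorFlow backend; model here is a placeholder for an already-built Keras model):

from keras import backend as K

def my_mse(y_true, y_pred):
    # return one loss value per sample, averaging over the last axis only
    return K.mean(K.square(y_pred - y_true), axis=-1)

# pass the function object (not a string) at the compile stage
model.compile(loss=my_mse, optimizer='adam')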

What does a loss function return?

But according to the source code, the loss function should actually return an array of losses, one for every sample in the batch. For example, the mean_squared_error function in the source code returns an array, not a scalar. The call() method of LossFunctionWrapper also returns a loss value for each sample.
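
A quick way to see this per-sample behaviour (a small sketch, assuming Keras 2.x with the TensorFlow backend):

import numpy as np
from keras import backend as K
from keras.losses import mean_squared_error

y_true = K.constant(np.array([[1.0, 2.0], [3.0, 4.0]]))
y_pred = K.constant(np.array([[1.5, 2.0], [3.0, 3.0]]))

# mean_squared_error averages over the last axis only,
# so it returns one loss value per sample in the batch
print(K.eval(mean_squared_error(y_true, y_pred)))  # -> [0.125, 0.5]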


1 Answer

The problem is that when compiling the model, you set x_true to be a static tensor the size of the whole training set, while the inputs to Keras loss functions are y_true and y_pred, each of size [batch_size, :].

As I see it, there are two ways you can solve this. The first is to use a generator to create the batches, so that you control which indices are evaluated each time, and in the loss function you can slice the x_true tensor to fit the samples being evaluated:

def custom_loss(x_true):
    def loss(y_true, y_pred):
        # relevant_samples is a placeholder for whatever slicing logic
        # picks the rows of x_true that belong to the current batch
        x_true_samples = relevant_samples(x_true)
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true_samples))
    return loss

This solution can be complicated, so what I would suggest is a simpler workaround -
Concatenate the input layer with the output layer, so that your new output will be of the form original_output , input.

Now you can use a new modified loss function:

def loss(y_true, y_pred):
    return K.mean(K.square(y_pred[:,:output_shape] - y_true[:,:output_shape]) +
                  K.square(y_true[:,:output_shape] - y_pred[:,output_shape:]))

Now your new loss function will take into account both the input data and the prediction.
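
Wired into the setup from the question, this could look roughly as follows (a sketch, not a verified implementation, assuming a 2-dimensional input, a 1-dimensional target, and the concatenation order [original_output, input]):

from keras import backend as K
from keras.layers import Input, Dense, concatenate
from keras.models import Model

output_shape = 1  # width of the real output; the remaining columns carry the input

def custom_loss(y_true, y_pred):
    y_hat = y_pred[:, :output_shape]   # the actual prediction
    x_in  = y_pred[:, output_shape:]   # the input, carried along in the output
    return K.mean(K.square(y_hat - y_true) + K.square(y_true - x_in))

inputs = Input(shape=(2,))
hidden = Dense(10, activation='relu')(inputs)
prediction = Dense(output_shape, activation='linear')(hidden)
outputs = concatenate([prediction, inputs])  # new output: [original_output, input]

model = Model(inputs=inputs, outputs=outputs)
model.compile(loss=custom_loss, optimizer='adam')

You can then call model.fit(training_data, training_label, ...) exactly as before; only the loss function needs the extra columns.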

Edit:
Note that while you set the seeds, your models are not exactly the same: as you did not use a generator, you let Keras choose the batches, and it might pick different samples for different models.
As your model does not converge, different samples can lead to different results.

I added a generator to your code to control which samples are picked for training; now you can see that both results are the same:

def custom_loss(y_true, y_pred): # this is essentially the mean_squared_error
    mse = keras.losses.mean_squared_error(y_true, y_pred[:,2])
    return mse


def generator(x, y, batch_size):
    curIndex = 0
    batch_x = np.zeros((batch_size, 2))
    batch_y = np.zeros((batch_size, 1))
    while True:
        for i in range(batch_size):
            batch_x[i] = x[curIndex, :]
            batch_y[i] = y[curIndex, :]
            curIndex += 1
            if curIndex == 5000:  # wrap around after the last training sample
                curIndex = 0
        yield batch_x, batch_y

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

batch_size = 32



# build the standard neural network with one hidden layer
seed(seed_numpy)
set_random_seed(seed_tensorflow)

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mse', optimizer='adam')            # compile the model
#model_standard.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_standard.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss
seed(seed_numpy)
set_random_seed(seed_tensorflow)


input_custom = Input(shape=(2,))                                               # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=input_custom, outputs=output_custom)         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
#model_custom.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_custom.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_custom = model_custom.predict(testing_data)

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )

answered by Dinari