Issue of batch sizes when using custom loss functions in Keras

I am doing a slight modification of a standard neural network by defining a custom loss function. The custom loss function depends not only on y_true and y_pred, but also on the training data. I implemented it using the wrapping solution described here.

Specifically, I wanted to define a custom loss function that is the standard mse plus the mse between the input and the square of y_pred:

def custom_loss(x_true)
    def loss(y_true, y_pred):
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true))
    return loss

Then I compile the model using

model_custom.compile(loss = custom_loss( x_true=training_data ), optimizer='adam')

fit the model using

model_custom.fit(training_data, training_label, epochs=100, batch_size = training_data.shape[0])

All of the above works fine, because the batch size is actually the number of all the training samples.

But if I set a different batch_size (e.g., 10) when I have 1000 training samples, there will be an error

Incompatible shapes: [1000] vs. [10].

It seems that Keras is able to automatically adjust the size of the inputs to its own loss function base on the batch size, but cannot do so for the custom loss function.

Do you know how to solve this issue?

Thank you!


* Update: the batch size issue is solved, but another issue occurred

Thank you, Ori, for the suggestion of concatenating the input and output layers! It "worked", in the sense that the codes can run under any batch size. However, it seems that the result from training the new model is wrong... Below is a simplified version of the codes to demonstrate the problem:

import numpy as np
import scipy.io
import keras
from keras import backend as K
from keras.models import Model
from keras.layers import Input, Dense, Activation
from numpy.random import seed
from tensorflow import set_random_seed

def custom_loss(y_true, y_pred): # this is essentially the mean_square_error
    mse = K.mean( K.square( y_pred[:,2] - y_true ) )
    return mse

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

# build the standard neural network with one hidden layer

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mean_squared_error', optimizer='adam')            # compile the model
model_standard.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss

input_custom = Input(shape=(2,))                                             # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=[input_custom], outputs=[output_custom])         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
model_custom.fit(training_data, training_label, epochs=50, batch_size = 500) # train the model
testing_label_pred_custom = model_custom.predict(testing_data)               # make prediction

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )

Basically, I have a standard one-hidden-layer neural network, and a custom one-hidden-layer neural network whose output layer is concatenated with the input layer. For testing purpose, I did not use the concatenated input layer in the custom loss function, because I wanted to see if the custom network can reproduce the standard neural network. Since the custom loss function is equivalent to the standard 'mean_squared_error' loss, both networks should have the same training results (I also reset the random seeds to make sure that they have the same initialization).

However, the training results are very different. It seems that the concatenation makes the training process different? Any ideas?

Thank you again for all your help!

Final update: Ori's approach of concatenating input and output layers works, and is verified by using the generator. Thanks!!

1 Answers

The problem is that when compiling the model, you set x_true to be a static tensor, in the size of all the samples. While the input for keras loss functions are the y_true and y_pred, where each of them is of size [batch_size, :].

As I see it there are 2 options you can solve this, the first one is using a generator for creating the batches, in such a way that you will have control over which indices are evaluated each time, and at the loss function you could slice the x_true tensor to fit the samples being evaluated:

def custom_loss(x_true)
    def loss(y_true, y_pred):
        x_true_samples = relevant_samples(x_true)
        return K.mean(K.square(y_pred - y_true) + K.square(y_true - x_true_samples))
    return loss

This solution can be complicated, what I would suggest is a simpler workaround -
Concatenate the input layer with the output layer, such that your new output will be of the form original_output , input.

Now you can use a new modified loss function:

def loss(y_true, y_pred):
    return K.mean(K.square(y_pred[:,:output_shape] - y_true[:,:output_shape]) +
                  K.square(y_true[:,:output_shape] - y_pred[:,outputshape:))

Now your new loss function will take in account both the input data, and the prediction.

Note that while you set the seed, your models are not exactly the same, and as you did not use a generator, you let keras choose the batches, and for different models he might pick different samples.
As your model does not converge, different samples can lead to different results.

I added a generator to your code, to verify the samples we pick for training, now you can see both results are the same:

def custom_loss(y_true, y_pred): # this is essentially the mean_square_error
    mse = keras.losses.mean_squared_error(y_true, y_pred[:,2])
    return mse

def generator(x, y, batch_size):
    curIndex = 0
    batch_x = np.zeros((batch_size,2))
    batch_y = np.zeros((batch_size,1))
    while True:
        for i in range(batch_size):            
            batch_x[i] = x[curIndex,:]
            batch_y[i] = y[curIndex,:]
            i += 1;
            if i == 5000:
                i = 0
        yield batch_x, batch_y

# set the seeds so that we get the same initialization across different trials
seed_numpy = 0
seed_tensorflow = 0

# generate data of x = [ y^3 y^2 ]
y = np.random.rand(5000+1000,1) * 2 # generate 5000 training and 1000 testing samples
x = np.concatenate( ( np.power(y, 3) , np.power(y, 2) ) , axis=1 )

training_data  = x[0:5000:1,:]
training_label = y[0:5000:1]
testing_data   = x[5000:6000:1,:]
testing_label  = y[5000:6000:1]

batch_size = 32

# build the standard neural network with one hidden layer

input_standard = Input(shape=(2,))                                               # input
hidden_standard = Dense(10, activation='relu', input_shape=(2,))(input_standard) # hidden layer
output_standard = Dense(1, activation='linear')(hidden_standard)                 # output layer

model_standard = Model(inputs=[input_standard], outputs=[output_standard])     # build the model
model_standard.compile(loss='mse', optimizer='adam')            # compile the model
#model_standard.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_standard.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_standard = model_standard.predict(testing_data)             # make prediction

# get the mean squared error
mse_standard = np.sum( np.power( testing_label_pred_standard - testing_label , 2 ) ) / 1000

# build the neural network with the custom loss

input_custom = Input(shape=(2,))                                               # input
hidden_custom = Dense(10, activation='relu', input_shape=(2,))(input_custom) # hidden layer
output_custom_temp = Dense(1, activation='linear')(hidden_custom)            # output layer
output_custom = keras.layers.concatenate([input_custom, output_custom_temp])

model_custom = Model(inputs=input_custom, outputs=output_custom)         # build the model
model_custom.compile(loss = custom_loss, optimizer='adam')                   # compile the model
#model_custom.fit(training_data, training_label, epochs=50, batch_size = 10) # train the model
model_custom.fit_generator(generator(training_data,training_label,batch_size),  steps_per_epoch= 32, epochs= 100)
testing_label_pred_custom = model_custom.predict(testing_data)

# get the mean squared error
mse_custom = np.sum( np.power( testing_label_pred_custom[:,2:3:1] - testing_label , 2 ) ) / 1000

# compare the result
print( [ mse_standard , mse_custom ] )
