I built a custom architecture with Keras (a convnet). The network has 4 heads, each outputting a tensor of a different size. I am trying to write a custom loss function as a function of these 4 outputs. I have implemented custom losses before, but it was always either a different loss for each head or the same loss for each head. In this case, I need to combine the 4 outputs to calculate the loss.
I am used to the following:
def custom_loss(y_true, y_pred):
    return something
model.compile(optimizer, loss=custom_loss)
but in my case, I would need y_pred to be a list of the 4 outputs. I could pad the outputs with zeros and add a concatenate layer to my model, but I was wondering if there is an easier way around it.
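For illustration, the concatenation workaround I have in mind would look roughly like this (flat heads with made-up sizes, so no zero-padding is actually needed here; the loss body is only a placeholder):
import tensorflow as tf
from tensorflow.keras import layers

head_sizes = [10, 8, 4, 1]  # made-up head widths, purely for illustration

inputs = tf.keras.Input(shape=(64,))
hidden = layers.Dense(128, activation='relu')(inputs)
heads = [layers.Dense(size)(hidden) for size in head_sizes]

# concatenate the heads so the model exposes a single output tensor
merged = layers.Concatenate(axis=-1)(heads)
model = tf.keras.Model(inputs, merged)

def custom_loss(y_true, y_pred):
    # slice the concatenated prediction back into the four heads
    out1, out2, out3, out4 = tf.split(y_pred, head_sizes, axis=-1)
    # ...combine the four pieces however the real loss requires;
    # the expression below is only a placeholder
    return tf.reduce_mean(tf.square(out1)) + tf.reduce_mean(tf.abs(out4))

model.compile(optimizer='adam', loss=custom_loss)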
My loss function is rather complex, so can I write something like:
model.add_loss(custom_loss(input1, input2, output1, output2))
where custom_loss is defined as:
def custom_loss(input1, input2, output1, output2):
    return loss
You could pack your outputs together in a tf.ExtensionType and unpack them again in the loss function.
I made a Colab Notebook that demonstrates how to do this in TensorFlow 2.8.0: https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY
Pros of using this approach vs add_loss():
Cons: tf.ExtensionTypes don't work with TensorFlow Serving.
I'm adding the full code here, just in case I accidentally delete the Colab Notebook:
import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0
print(tf.__version__)
class PackedTensor(tf.experimental.BatchableExtensionType):
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no meaning in this context, so we use a dummy
    # to stop Keras from complaining
    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:
        def __init__(self, shape, dtype=tf.float32):
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape
        dtype: tf.DType = tf.constant(1.).dtype
# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
@tf.experimental.dispatch_for_api(tf.shape)
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    return tf.shape(input.output_0)

@tf.experimental.dispatch_for_api(tf.cast)
def packed_cast(x: PackedTensor, dtype: str, name=None):
    return x
class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """This custom loss is designed for models with a PackedTensor as their
    single output, i.e. with the attributes output_0 and output_1. The loss
    trains the model so that output_0 represents the predicted class of the
    input image, while output_1 is trained to always be zero (as a dummy).
    """
    def __init__(self, *args, **kwargs):
        super(SCCEWithExtraOutput, self).__init__(*args, **kwargs)
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        return scce + tf.abs(output_1)
# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

def normalize_img(image, label):
    """Normalizes images: `uint8` -> `float32`."""
    return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)

ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
# create a layer that packs the two outputs into a single PackedTensor
class PackingLayer(tf.keras.layers.Layer):
    def call(self, inputs, training=None):
        first_output, second_output = inputs
        packed_output = PackedTensor(first_output, second_output)
        return packed_output
# define the model
#
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                         |----> Dense(1) ------^
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

hidden = flatten_layer(inputs)
hidden = hidden_layer(hidden)
first_output = first_output_layer(hidden)
second_output = second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=SCCEWithExtraOutput(),
    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
model.fit(
    ds_train,
    epochs=1,
    # validation_data=ds_test,
)
model.save("savedmodel")

for index, sample in enumerate(ds_train):
    predicted_packed_tensor = model(sample[0])
    print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
    print(type(predicted_packed_tensor))
    if index > 10:
        break
# prove that we can also load the model and run inference in a completely
# new process. Note that since the class PackedTensor does not exist in that
# process, the model now returns a
# tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".
import subprocess

script = """
import tensorflow as tf
import tensorflow_datasets as tfds

model = tf.saved_model.load("savedmodel")

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

def normalize_img(image, label):
    return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)

for index, sample in enumerate(ds_train):
    predicted = model(sample[0])
    print(predicted.output_0.shape, predicted.output_1.shape)
    print(type(predicted))
    if index > 5:
        break
"""

pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):
    print(line)
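To adapt this to the four heads in your model, the extension type would presumably just carry four fields instead of two, and the custom loss would combine them. A rough sketch (the field names and the loss body are placeholders; you would still need the same Spec/dispatch boilerplate as above):
import tensorflow as tf

class FourOutputsTensor(tf.experimental.BatchableExtensionType):
    __name__ = 'extension_type_colab.FourOutputsTensor'
    output_0: tf.Tensor
    output_1: tf.Tensor
    output_2: tf.Tensor
    output_3: tf.Tensor

    # same dummy shape/dtype trick as in PackedTensor above
    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

class CombinedLoss(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        # y_pred is a FourOutputsTensor; combine its four tensors however
        # your real loss requires (the sum below is only a placeholder)
        return (tf.reduce_mean(tf.abs(y_pred.output_0))
                + tf.reduce_mean(tf.abs(y_pred.output_1))
                + tf.reduce_mean(tf.abs(y_pred.output_2))
                + tf.reduce_mean(tf.abs(y_pred.output_3)))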
You could try the model.add_loss() function. The idea is to construct your custom loss as a tensor instead of a function, add it to the model, and compile the model without further specifying a loss. See also this implementation of a variational autoencoder, where a similar idea is used.
Example:
import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np
# Some random training data
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)
# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)
# Two outputs
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)
# Two additional 'inputs' for the labels
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))
# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])
# Construct your custom loss as a tensor
loss = K.mean(mse(output_1, label_layer_1) * mse(output_2, label_layer_2))
# Add loss to model
model.add_loss(loss)
# Compile without specifying a loss
model.compile(optimizer='sgd')
dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)
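One caveat with this pattern: the label tensors are inputs to the model, so at prediction time you either have to feed dummy labels or build a second Model that reuses the trained layers but only takes the real input. A small sketch of the latter:
# reuse the trained layers, but expose only the real input so that
# prediction does not require (dummy) labels
inference_model = Model(inputs=input_layer, outputs=[output_1, output_2])
pred_1, pred_2 = inference_model.predict(np.random.rand(5, 20))
print(pred_1.shape, pred_2.shape)  # (5, 4) (5, 1)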