Keras custom loss as a function of multiple outputs

I built a custom architecture with keras (a convnet). The network has 4 heads, each outputting a tensor of different size. I am trying to write a custom loss function as a function of this 4 outputs. I have been implementing cusutom losses before, but it was either a different loss for each head or the same loss for each head. In this case, I need to combine the 4 outputs to calculate the loss.

I am used to the following:

def custom_loss(y_true, y_pred):
    return something
model.compile(optimizer, loss=custom_loss)

but in my case, I would need y_pred to be a list of the 4 outputs. I can pad the outputs with zeros and add a concatenate layer in my model, but I was wondering if there was an easier way around.


My loss function is rather complex, can I write something like:

model.add_loss(custom_loss(input1, input2, output1, output2))

where custom loss is defined as:

def custom_loss(input1, input2, output1, output2):
    return loss
You could pack your outputs together in a tf.ExtensionType, and unpack it again in the loss function.

I made a Colab Notebook that demonstrates how to do this in tensorflow 2.8.0. (https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY)

Pros of using this approach vs add_loss():

  • No need to define "dummy" labels at inference time.
  • No need to define the loss within the model.
  • pretty nice


  • your model now outputs an object with the outputs as fields instead of the tensors directly (which is maybe a pro for your use case).
  • At the time of writing this answer, tf.ExtensionTypess don't work with Tensorflow Serving)

I'm adding the full code here, just in case I accidentally delete the Colab Notebook:

import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0

class PackedTensor(tf.experimental.BatchableExtensionType):
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no meaning in this context, so we use a dummy
    # to stop Keras from complaining

    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:

        def __init__(self, shape, dtype=tf.float32):
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape 
        dtype: tf.DType = tf.constant(1.).dtype

# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    return tf.shape(input.col_ids)

def packed_cast(x: PackedTensor, dtype: str, name=None):
    return x

class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """ This custom loss function is designed for models with an PackedTensor as
    a single output, so with attributes outputs_0 and outputs_1. This loss will 
    train a model so that outputs_0 represent the predicted class of the input
    image, and outputs_1 will be trained to always be zero (as a dummy). 
    def __init__(self, *args, **kwargs):
        super(SCCEWithExtraOutput, self).__init__(*args, **kwargs)
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        return scce + tf.abs(output_1)

# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    split=['train', 'test'],
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

# create a layer to combine to pack the outputs in a PackedTensor
class PackingLayer(tf.keras.layers.Layer):
  def call(self, inputs, training=None):
    first_output, second_output = inputs
    packed_output = PackedTensor(first_output, second_output)
    return packed_output

# define the model
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                           |--> Dense(1)  ----^ 
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

hidden = flatten_layer(inputs)
hidden = hidden_layer(hidden)
first_output = first_output_layer(hidden)
second_output = second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))
model = tf.keras.Model(inputs=inputs, outputs=outputs)

    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],

    # validation_data=ds_test,

for index, sample in enumerate(ds_train):
  predicted_packed_tensor = model(sample[0])
  print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
  if index > 10:

# prove we can also load and infer the model in a completely new process
# notice that as the class PackedTensor does not exist in this process,
# the model now returns a tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".

import subprocess

script = """
import tensorflow as tf
import tensorflow_datasets as tfds
model = tf.saved_model.load("savedmodel")
(ds_train, ds_test), ds_info = tfds.load(
    split=['train', 'test'],
def normalize_img(image, label):
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)

for index, sample in enumerate(ds_train):
  predicted = model(sample[0])
  print(predicted.output_0.shape, predicted.output_1.shape)
  if index > 5:
pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):

You could try the model.add_loss() function. The idea is to construct your custom loss as a tensor instead of a function, add it to the model, and compile the model without further specifying a loss. See also this implementation of a variational autoencoder where a similar idea is used.


import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np

# Some random training data
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)

# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)

# Two outputs
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)

# Two additional 'inputs' for the labels
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))

# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])

# Construct your custom loss as a tensor
loss = K.mean(mse(output_1, label_layer_1) * mse(output_2, label_layer_2))

# Add loss to model

# Compile without specifying a loss

dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)
