I want to create a TensorFlow neural network model using the Functional API, but I'm not sure how to split the input in two. I wanted to do something like this: given an input, its first half goes to the first branch of the network and its second half goes to the second branch; each half is passed through its own layers until the two are concatenated, go through another layer, and finally reach the output. I thought of something like the snippet of code below, along with a quick sketch.
import tensorflow as tf
from tensorflow.keras.layers import Dense

def define_model(self):
    # The input is a 1D vector containing 7 elements, split as 4 and 3.
    input1 = tf.keras.Input(shape=(4,))
    input2 = tf.keras.Input(shape=(3,))
    layer1_1 = Dense(4, activation=tf.nn.leaky_relu)(input1)
    layer2_1 = Dense(4, activation=tf.nn.leaky_relu)(layer1_1)
    layer1_2 = Dense(4, activation=tf.nn.leaky_relu)(input2)
    layer2_2 = Dense(3, activation=tf.nn.leaky_relu)(layer1_2)
    concat_layer = tf.keras.layers.concatenate([layer2_1, layer2_2], axis=0)
    layer3 = Dense(6, activation=tf.nn.leaky_relu)(concat_layer)
    output = Dense(4)(layer3)  # no activation
    self.model = tf.keras.Model(inputs=[input1, input2], outputs=output)
    self.model.compile(loss='mean_squared_error', optimizer='rmsprop')
    return self.model
First of all, should I add any Dropout or BatchNormalization layers in this model?
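In case it helps to see what I mean, I imagined they would slot into a branch something like this (just a sketch on my part; the 0.2 rate and the placement are arbitrary guesses):
from tensorflow.keras.layers import BatchNormalization, Dropout

layer1_1 = Dense(4, activation=tf.nn.leaky_relu)(input1)
layer1_1 = BatchNormalization()(layer1_1)  # normalize this branch's activations
layer1_1 = Dropout(0.2)(layer1_1)          # 0.2 is an arbitrary placeholder rate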
Also, the first 4 elements of the input array are binary (like [1,0,0,1] or [0,1,1,1]), while the other 3 can be any real number. Should I treat the first branch of the network differently from the second, given that the first operates on inputs in the 0 <= x <= 1 range while the second doesn't?
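For the real-valued branch, I was also wondering whether a Normalization layer would help (again just a guess on my part; in recent TF versions it lives at tf.keras.layers.Normalization, older versions keep it under experimental.preprocessing, and real_valued_samples below is a placeholder for my actual data):
import numpy as np
from tensorflow.keras.layers import Normalization

real_valued_samples = np.random.randn(100, 3)  # placeholder for the real-valued half of the data
norm = Normalization()           # learns per-feature mean and variance
norm.adapt(real_valued_samples)  # must be adapted before training
layer1_2 = Dense(4, activation=tf.nn.leaky_relu)(norm(input2))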
It sounds right, but I can't easily test whether it works, as I would have to rework a LOT of my code to generate enough data to train it. Am I going in the right direction, or should I be doing something different? Would this code work at all?
EDIT: I'm having issues during training. Suppose that I want to train the model like this (the values don't matter all that much; what's important is the data type):
import numpy as np

# This snippet generates training data - nothing real, just test examples.
# Also, I changed the output layer from 4 elements to just 1 to test it.
A1 = [np.array([[1., 0, 0, 1]]), np.array([[0, 1., 0]])]
B1 = np.array([7])

c = np.array([[5, -4, 1, -1], [2, 3, -1]], dtype=object)
A2 = [[np.random.randint(2, size=[1, 4]), np.random.randint(2, size=[1, 3])] for i in range(1000)]
B2 = np.array([np.sum(A2[i][0]*c[0]) + np.sum(A2[i][1]*c[1]) for i in range(1000)])

model.fit(A1, B1, epochs=50, verbose=False)  # this works!
model.fit(A2, B2, epochs=50, verbose=False)  # but this doesn't.
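(For reference, A2 here ends up as a plain Python list of 1000 [array, array] pairs rather than two batched arrays, which I suspect is part of the problem:)
print(len(A2), A2[0][0].shape, A2[0][1].shape)  # 1000 (1, 4) (1, 3)
print(B2.shape)                                 # (1000,)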
FINAL EDIT: here are the predict() and predict_on_batch() functions.
def predict(a, b):
    pred = m.predict([a, b])
    return pred

def predict_b(c, d):
    preds = m.predict_on_batch([c, d])
    return preds
#a, b, c and d must look like this:
a = [np.array([0,1,0,1])]
b = [np.array([0,0,1])]
c = [np.array([1, 0, 0, 1]),
np.array([0, 1, 1, 1]),
np.array([0, 1, 0, 0]),
np.array([1, 0, 0, 0]),
np.array([0, 0, 1, 0])]
d = [np.array([1, 0, 1]),
np.array([0, 0, 1]),
np.array([0, 1, 1]),
np.array([1, 1, 1]),
np.array([0, 0, 0])]
#notice that all of those should follow the same pattern, which is a list of arrays.
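Equivalently (if I understand Keras's input handling correctly), each list can be stacked into a single 2-D batch array per input and passed the same way:
preds = m.predict([np.stack(c), np.stack(d)])  # input shapes (5, 4) and (5, 3)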
The rest of the code is under M. Innat's answer.
There are some issues with your code. I will try to answer the main one here and set aside side questions, such as whether you should use Dropout or BatchNormalization layers in your model, because that's out of the scope of your main question.
If you try to build your model using m = define_model(), I'm pretty sure you will encounter the following error:
layer2_1 = Dense(4, activation=tf.nn.leaky_relu)(layer1_1)
layer2_2 = Dense(3, activation=tf.nn.leaky_relu)(layer1_2)
concat_layer = tf.keras.layers.concatenate([layer2_1, layer2_2], axis = 0)
ValueError: A `Concatenate` layer requires inputs with matching shapes
except for the concat axis. Got inputs shapes: [(None, 4), (None, 3)]
The correct axis should be the default -1 (equivalently 1 for these 2-D tensors), not 0, when concatenating outputs of nonidentical shape (e.g. Dense(4) and Dense(3)). You can either make the output shapes match (change Dense(3) to Dense(4)) or set axis = 1. Let's pick one (matching your sketch):
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras import layers

def define_model():
    input1 = Input(shape=(4,))
    input2 = Input(shape=(3,))
    layer1_1 = layers.Dense(4, activation=tf.nn.leaky_relu)(input1)
    layer2_1 = layers.Dense(4, activation=tf.nn.leaky_relu)(layer1_1)
    layer1_2 = layers.Dense(4, activation=tf.nn.leaky_relu)(input2)
    layer2_2 = layers.Dense(3, activation=tf.nn.leaky_relu)(layer1_2)
    concat_layer = layers.concatenate([layer2_1, layer2_2], axis=1)
    layer3 = layers.Dense(6, activation=tf.nn.leaky_relu)(concat_layer)
    output = layers.Dense(4)(layer3)
    model = tf.keras.Model(inputs=[input1, input2], outputs=output)
    model.compile(loss='mean_squared_error', optimizer='rmsprop')
    return model

m = define_model()
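As a quick sanity check that the graph now builds and produces the expected output shape (a minimal sketch, assuming numpy is imported as np):
dummy = m.predict([np.zeros((1, 4)), np.zeros((1, 3))])
print(dummy.shape)  # (1, 4): batch of one, final Dense(4) layer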
For more detail, see the resulting output shapes when choosing the axis param:
x1 = tf.keras.layers.Dense(8)(np.arange(10).reshape(5, 2))
x2 = tf.keras.layers.Dense(8)(np.arange(10, 20).reshape(5, 2))
print(x1.shape, x2.shape)
# (5, 8) (5, 8)
# using axis = 0
concatted = tf.keras.layers.Concatenate(axis=0)([x1, x2])
concatted.shape
# TensorShape([10, 8])
# using axis = 1
concatted = tf.keras.layers.Concatenate(axis=1)([x1, x2])
concatted.shape
# TensorShape([5, 16])
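Note that leaving axis at its default of -1 is equivalent to axis = 1 for these 2-D tensors:
# default axis = -1
concatted = tf.keras.layers.Concatenate()([x1, x2])
concatted.shape
# TensorShape([5, 16])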
Test Model
The input that works for you:
A1_i = np.array([[1.,0, 0,1]])
A1_j = np.array([[0, 1., 0]])
B1 = np.array([4])
print(type(A1_i), type(A1_j), type(B1))
print(A1_i.shape, A1_j.shape, B1.shape)
m.fit([A1_i, A1_j], B1, epochs = 2, verbose=2)
<class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.ndarray'>
(1, 4) (1, 3) (1,)
Epoch 1/2
584ms/step - loss: 15.9902
Epoch 2/2
4ms/step - loss: 15.8900
<tensorflow.python.keras.callbacks.History at 0x7fb1b484b890>
The other parts that didn't work for you have several issues. First, each model input should be a numpy array, not a Python list of array pairs. Second, the training data need to be generated to match the model's two-input structure. The correct way should be as follows:
c = np.array([[5,-4,1,-1],[2,3,-1]], dtype = object)
A2_i = np.random.randint(10, size = [100,4])
A2_j = np.random.randint(10, size = [100,3])
B2 = np.array( [np.sum(A2_i[i][0]*c[0]) +
np.sum(A2_j[i][1]*c[1]) for i in range(100)])
print(type(A2_i), type(A2_j), type(B2))
print(A2_i.shape, A2_j.shape, B2.shape)
m.fit([A2_i, A2_j], B2, epochs = 2, verbose=2)
<class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.ndarray'>
(100, 4) (100, 3) (100,)
Epoch 1/2
4ms/step - loss: 683.9537
Epoch 2/2
4ms/step - loss: 681.0673
<tensorflow.python.keras.callbacks.History at 0x7fb1600a8d50>
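Once trained, prediction follows the same two-array input format, e.g. (reusing the arrays above):
preds = m.predict([A2_i[:5], A2_j[:5]])  # two arrays, shapes (5, 4) and (5, 3)
print(preds.shape)  # (5, 4)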