I am currently studying LSTM. I found a code from a book to predict sin&cos mixed curve. However, I'm stuck in its inference function.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import os
LOG_DIR = os.path.join(os.path.dirname(__file__), "log")
if os.path.exists(LOG_DIR) is False:
os.mkdir(LOG_DIR)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
def inference(x, n_batch, maxlen=None, n_hidden=None, n_out=None):
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.01)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.zeros(shape, dtype=tf.float32)
return tf.Variable(initial)
cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
initial_state = cell.zero_state(n_batch, tf.float32)
state = initial_state
outputs = []
with tf.variable_scope('LSTM'):
for t in range(maxlen):
if t > 0:
tf.get_variable_scope().reuse_variables()
(cell_output, state) = cell(x[:, t, :], state)
outputs.append(cell_output)
output = outputs[-1]
V = weight_variable([n_hidden, n_out])
c = bias_variable([n_out])
y = tf.matmul(output, V) + c #
return y
def loss(y, t):
mse = tf.reduce_mean(tf.square(y - t))
return mse
def training(loss):
optimizer = tf.train.AdamOptimizer(learning_rate=0.001, beta1=0.9, beta2=0.999)
train_step = optimizer.minimize(loss)
return train_step
class EarlyStopping():
def __init__(self, patience=0, verbose=0):
self._step = 0
self._loss = float('inf')
self.patience = patience
self.verbose = verbose
def validate(self, loss):
if self._loss < loss:
self._step += 1
if self._step > self.patience:
if self.verbose:
print('early stopping')
return True
else:
self._step = 0
self._loss = loss
return False
def sin(x, T):
return np.sin(12.0 * np.pi * x / T)
def cos(x, T):
return np.cos(17.0 * np.pi * x / T) / 3
def toy_problem(x, T, ampl=0.05):
x = np.arange(0, 2 * T + 1)
noise = ampl * np.random.uniform(low=-1.0, high=1.0, size=len(x))
return sin(x, T) + cos(x, T) + noise
if __name__ == '__main__':
T = 500
x = np.arange(0, 2 * T + 1)
length_of_sequence = 2 * T
maxlen = 300 # length of one sequential data set
f = toy_problem(x, T)
data = []
target = []
for i in range(0, length_of_sequence - maxlen + 1):
data.append(f[i:i + maxlen]) # e.g, [1,2,3,4]
target.append(f[i + maxlen]) # [5] <- next value
X = np.array(data).reshape(len(data), maxlen, 1) # store sequential data & coordinate num of dimension
Y = np.array(target).reshape(len(data), 1) # store target data & coordinate num of dimension
"""
divide training data and validation data
"""
N_train = int(len(data) * 0.9)
N_validation = len(data) - N_train
X_train, X_validation, Y_train, Y_validation = \
train_test_split(X, Y, test_size=N_validation)
# data distribution function
"""
Model Configuration
"""
n_in = len(X[0][0]) # 1
n_hidden = 30 # num of units(not layers) in a hidden layer
n_out = len(Y[0]) # 1
x = tf.placeholder(tf.float32, shape=[None, maxlen, n_in])
t = tf.placeholder(tf.float32, shape=[None, n_out])
n_batch = tf.placeholder(tf.int32, shape=[])
y = inference(x, n_batch, maxlen=maxlen, n_hidden=n_hidden, n_out=n_out)
loss = loss(y, t)
train_step = training(loss)
early_stopping = EarlyStopping(patience=10, verbose=1) # create instance of EarlyStopping
history = {
'val_loss': []
}
"""
Model Learning
"""
epochs = 500 # num of times to train NN using a whole training data set
batch_size = 10 # size which divide a whole training data set
init = tf.global_variables_initializer() # initialization
sess = tf.Session() # instance of Session func
tf.summary.FileWriter(LOG_DIR, sess.graph) # create log file to show in Tensor Board
summary_op = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter("./log/RNN2/", sess.graph_def)
sess.run(init) # run a graph
n_batches = N_train // batch_size # num of batches
for epoch in range(epochs):
X_, Y_ = shuffle(X_train, Y_train) # we need to shuffle training data to train NN well
for i in range(n_batches): # train with mini batch method
start = i * batch_size
end = start + batch_size
sess.run(train_step, feed_dict={
# Training with "batch size" number of sequential data
x: X_[start:end],
t: Y_[start:end],
n_batch: batch_size
}) # after training, the NN parameters keep remaining (I guess)
# after one epoch,compute the loss
val_loss = loss.eval(session=sess, feed_dict={
# .eval() returns values
x: X_validation,
t: Y_validation,
n_batch: N_validation
})
history['val_loss'].append(val_loss)
print('epoch:', epoch, ' validation loss:', val_loss)
if early_stopping.validate(val_loss):
break
'''
Prediction with output
'''
truncate = maxlen
Z = X[:1] # get the first part of original data
print "Z", Z
original = [f[i] for i in range(maxlen)]
predicted = [None for i in range(maxlen)]
# predict the future from the last sequential data
z_ = Z[-1:]
for i in range(length_of_sequence - maxlen + 1):
y_ = y.eval(session=sess, feed_dict={
x: z_,
n_batch: 1
})
# compute new sequential data using prediction output which is created by trained network
sequence_ = np.concatenate((z_.reshape(maxlen, n_in)[1:], y_), axis=0).reshape(1, maxlen, n_in)
z_ = sequence_
predicted.append(y_.reshape(-1))
'''
Visualization
'''
plt.rc('font', family='serif')
plt.figure()
plt.ylim([-2.0, 2.0])
# plt.plot(sin(x,T), linestyle='dotted', color='#aaaaaa')
# plt.plot(cos(x,T), linestyle='dotted', color='#aaaaaa')
plt.plot(toy_problem(x, T), linestyle='dotted', color='#aaaaaa')
plt.plot(original, color='black')
plt.plot(predicted, color='red')
plt.show()
In inference function, we have
cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
initial_state = cell.zero_state(n_batch, tf.float32)
state = initial_state
outputs = []
with tf.variable_scope('LSTM'):
for t in range(maxlen):
if t > 0:
tf.get_variable_scope().reuse_variables()
(cell_output, state) = cell(x[:, t, :], state)
outputs.append(cell_output)
output = outputs[-1]
But I have no idea what cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
and (cell_output, state) = cell(x[:, t, :], state)
does.
I'm trying to understand but if anybody has a clue.
This is standard code of using the RNN utilities of Tensorflow. You can read more here.
Shortly,
cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
creates a LSTM layer and instantiates variables for all gates.
(cell_output, state) = cell(x[:, t, :], state)
is the effective run of the layer providing as input sequence each element of the dimension 1 of the Tensor x
(i.e. x[:, t, :]
). In other words, x is a 3-dimensional Tensor and it is sliced along the dimension 1 (columns), each slice is an element of the sequence given as input to the LSTM layer.
cell_output
is the output of the layer for each slice
outputs
contains maxlen
values, and they are the outputs of the layer for each element (x[:, t, :]
) of the input sequence.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With