 

What does BasicLSTMCell do?

I am currently studying LSTMs. I found code in a book that predicts a mixed sine/cosine curve, but I'm stuck on its inference function.

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import os


LOG_DIR = os.path.join(os.path.dirname(__file__), "log")
if not os.path.exists(LOG_DIR):
    os.mkdir(LOG_DIR)

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


def inference(x, n_batch, maxlen=None, n_hidden=None, n_out=None):
    def weight_variable(shape):
        initial = tf.truncated_normal(shape, stddev=0.01)
        return tf.Variable(initial)

    def bias_variable(shape):
        initial = tf.zeros(shape, dtype=tf.float32)
        return tf.Variable(initial)

    cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
    initial_state = cell.zero_state(n_batch, tf.float32)

    state = initial_state
    outputs = []
    with tf.variable_scope('LSTM'):
        for t in range(maxlen):
            if t > 0:
                tf.get_variable_scope().reuse_variables()
            (cell_output, state) = cell(x[:, t, :], state)
            outputs.append(cell_output)

    output = outputs[-1]

    V = weight_variable([n_hidden, n_out])
    c = bias_variable([n_out])
    y = tf.matmul(output, V) + c  # linear readout of the last LSTM output

    return y


def loss(y, t):
    mse = tf.reduce_mean(tf.square(y - t))
    return mse


def training(loss):
    optimizer = tf.train.AdamOptimizer(learning_rate=0.001, beta1=0.9, beta2=0.999)
    train_step = optimizer.minimize(loss)
    return train_step


class EarlyStopping():
    def __init__(self, patience=0, verbose=0):
        self._step = 0
        self._loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def validate(self, loss):
        if self._loss < loss:
            self._step += 1
            if self._step > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self._step = 0
            self._loss = loss

        return False


def sin(x, T):
    return np.sin(12.0 * np.pi * x / T)


def cos(x, T):
    return np.cos(17.0 * np.pi * x / T) / 3


def toy_problem(x, T, ampl=0.05):
    x = np.arange(0, 2 * T + 1)
    noise = ampl * np.random.uniform(low=-1.0, high=1.0, size=len(x))
    return sin(x, T) + cos(x, T) + noise


if __name__ == '__main__':
    T = 500
    x = np.arange(0, 2 * T + 1)
    length_of_sequence = 2 * T
    maxlen = 300  # length of one sequential data set
    f = toy_problem(x, T)

    data = []
    target = []

    for i in range(0, length_of_sequence - maxlen + 1):
        data.append(f[i:i + maxlen])  # e.g. [1,2,3,4]
        target.append(f[i + maxlen])  # [5] <- next value

    X = np.array(data).reshape(len(data), maxlen, 1)  # store sequential data & coordinate num of dimension
    Y = np.array(target).reshape(len(data), 1)  # store target data & coordinate num of dimension

    """ 
    divide training data and validation data
    """
    N_train = int(len(data) * 0.9)
    N_validation = len(data) - N_train

    X_train, X_validation, Y_train, Y_validation = \
        train_test_split(X, Y, test_size=N_validation)
    # data distribution function



    """ 
    Model Configuration
    """
    n_in = len(X[0][0])  # 1
    n_hidden = 30  # num of units(not layers) in a hidden layer
    n_out = len(Y[0])  # 1

    x = tf.placeholder(tf.float32, shape=[None, maxlen, n_in])
    t = tf.placeholder(tf.float32, shape=[None, n_out])
    n_batch = tf.placeholder(tf.int32, shape=[])

    y = inference(x, n_batch, maxlen=maxlen, n_hidden=n_hidden, n_out=n_out)
    loss = loss(y, t)
    train_step = training(loss)

    early_stopping = EarlyStopping(patience=10, verbose=1)  # create instance of EarlyStopping
    history = {
        'val_loss': []
    }

    """ 
    Model Learning
    """
    epochs = 500  # num of times to train NN using a whole training data set
    batch_size = 10  # size which divide a whole training data set

    init = tf.global_variables_initializer()  # initialization
    sess = tf.Session()  # instance of Session func
    tf.summary.FileWriter(LOG_DIR, sess.graph)  # create log file to show in Tensor Board
    summary_op = tf.summary.merge_all()
    summary_writer = tf.summary.FileWriter("./log/RNN2/", sess.graph_def)
    sess.run(init)  # run a graph

    n_batches = N_train // batch_size  # num of batches

    for epoch in range(epochs):
        X_, Y_ = shuffle(X_train, Y_train)  # we need to shuffle training data to train NN well

        for i in range(n_batches):  # train with mini batch method
            start = i * batch_size
            end = start + batch_size

            sess.run(train_step, feed_dict={
                # Training with "batch size" number of sequential data
                x: X_[start:end],
                t: Y_[start:end],
                n_batch: batch_size
            })  # after training, the NN parameters keep remaining (I guess)

        # after one epoch,compute the loss
        val_loss = loss.eval(session=sess, feed_dict={
            # .eval() returns values
            x: X_validation,
            t: Y_validation,
            n_batch: N_validation
        })
        history['val_loss'].append(val_loss)
        print('epoch:', epoch, ' validation loss:', val_loss)

        if early_stopping.validate(val_loss):
            break

    '''
    Prediction with output
    '''
    truncate = maxlen
    Z = X[:1]  # get the first part of original data
    print "Z", Z

    original = [f[i] for i in range(maxlen)]
    predicted = [None for i in range(maxlen)]

    # predict the future from the last sequential data
    z_ = Z[-1:]
    for i in range(length_of_sequence - maxlen + 1):
        y_ = y.eval(session=sess, feed_dict={
            x: z_,
            n_batch: 1
        })
        # compute new sequential data using prediction output which is created by trained network
        sequence_ = np.concatenate((z_.reshape(maxlen, n_in)[1:], y_), axis=0).reshape(1, maxlen, n_in)
        z_ = sequence_
        predicted.append(y_.reshape(-1))

    '''
    Visualization
    '''
    plt.rc('font', family='serif')
    plt.figure()
    plt.ylim([-2.0, 2.0])
    # plt.plot(sin(x,T), linestyle='dotted', color='#aaaaaa')
    # plt.plot(cos(x,T), linestyle='dotted', color='#aaaaaa')
    plt.plot(toy_problem(x, T), linestyle='dotted', color='#aaaaaa')
    plt.plot(original, color='black')
    plt.plot(predicted, color='red')
    plt.show()

In inference function, we have

cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
initial_state = cell.zero_state(n_batch, tf.float32)

state = initial_state
outputs = []
with tf.variable_scope('LSTM'):
    for t in range(maxlen):
        if t > 0:
            tf.get_variable_scope().reuse_variables()
        (cell_output, state) = cell(x[:, t, :], state)
        outputs.append(cell_output)

output = outputs[-1]

But I have no idea what cell = tf.contrib.rnn.BasicLSTMCell(n_hidden) and (cell_output, state) = cell(x[:, t, :], state) do.

I'm trying to understand it; if anybody has a clue, please share.

asked Sep 09 '17 by soshi shimada

1 Answer

This is the standard pattern for using TensorFlow's RNN utilities. You can read more here.

In short:

  1. cell = tf.contrib.rnn.BasicLSTMCell(n_hidden) creates an LSTM layer (a cell with n_hidden units) and instantiates the variables for all of its gates.

  2. (cell_output, state) = cell(x[:, t, :], state) actually runs the layer, feeding it one element of the input sequence at a time. x is a 3-dimensional Tensor, and x[:, t, :] is the slice taken along dimension 1 (the time steps); each slice is one element of the sequence given as input to the LSTM layer (a rough sketch of what one such step computes follows this list).

  3. cell_output is the output of the layer for the slice at time step t.

  4. outputs contains maxlen values: the outputs of the layer for each element x[:, t, :] of the input sequence.
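To make this more concrete, here is a rough NumPy sketch of what one (cell_output, state) = cell(x[:, t, :], state) step computes. This is a simplified illustration, not TensorFlow's exact BasicLSTMCell implementation (it ignores details such as the forget_bias offset and the exact gate ordering), and the names lstm_step, W and b are only illustrative:

import numpy as np

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def lstm_step(x_t, state, W, b):
    # x_t: [batch, n_in]; state = (c_prev, h_prev), each [batch, n_hidden]
    # W: [n_in + n_hidden, 4 * n_hidden]; b: [4 * n_hidden]
    c_prev, h_prev = state
    z = np.concatenate([x_t, h_prev], axis=1) @ W + b        # [batch, 4 * n_hidden]
    i, g, f, o = np.split(z, 4, axis=1)                      # input gate, candidate, forget gate, output gate
    c_new = sigmoid(f) * c_prev + sigmoid(i) * np.tanh(g)    # updated cell state
    h_new = sigmoid(o) * np.tanh(c_new)                      # new hidden state = the cell's output
    return h_new, (c_new, h_new)                             # mirrors (cell_output, state)

# toy dimensions matching the question: n_in = 1, n_hidden = 30
batch, maxlen, n_in, n_hidden = 2, 5, 1, 30
rng = np.random.default_rng(0)
W = rng.normal(scale=0.01, size=(n_in + n_hidden, 4 * n_hidden))
b = np.zeros(4 * n_hidden)
x = rng.normal(size=(batch, maxlen, n_in))

state = (np.zeros((batch, n_hidden)), np.zeros((batch, n_hidden)))  # like cell.zero_state(n_batch, tf.float32)
outputs = []
for t in range(maxlen):                                      # same unrolling loop as in inference()
    cell_output, state = lstm_step(x[:, t, :], state, W, b)
    outputs.append(cell_output)

output = outputs[-1]                                          # last output, fed to the final linear layer
print(output.shape)                                           # (2, 30)

In TensorFlow 1.x, the hand-written unrolling loop in inference() could also be replaced by tf.nn.static_rnn(cell, tf.unstack(x, maxlen, axis=1), initial_state=initial_state), which performs the same per-time-step calls internally.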

answered Oct 14 '22 by Giuseppe Marra