Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to retrieve examples from multiple tfrecords in tensorflow while using initializable iterator

I have multiple tfrecord files named Train_DE_01.tfrecords through Train_DE_34.tfrecords and Devel_DE_01.tfrecords through Devel_DE_14.tfrecords; that is, I have a training dataset and a validation dataset. My aim is to iterate over the examples of the tfrecords such that each batch contains 2 examples from Train_DE_01.tfrecords, 2 from Train_DE_02.tfrecords, ... and 2 from Train_DE_34.tfrecords. In other words, when the batch size is 68, I need 2 examples from each tfrecord file. In my code, I have used an initializable iterator as follows:

# file_name: This is a place_holder that will contain the name of the files of the tfrecords.
def load_sewa_data(file_name, batch_size):
    """Build a batched input pipeline over the given tfrecord file(s).

    Args:
        file_name: The tfrecord file name(s) handed to ``TFRecordDataset``
            (a placeholder holding the file names, per the post).
        batch_size: Number of parsed examples per batch.

    Returns:
        The seven tensors unpacked from one batch (names, detected, arousal,
        valence, liking, istalkings, images) followed by the initializable
        iterator that produces them.
    """
    with tf.name_scope('sewa_tf_records'):
        records = tf.data.TFRecordDataset(file_name)
        parsed = records.map(_parse_sewa_example)
        dataset = parsed.batch(batch_size)

        # The iterator must be initialised by the caller before use.
        iterator = dataset.make_initializable_iterator(shared_name='sewa_iterator')

        names, detected, arousal, valence, liking, istalkings, images = iterator.get_next()

        # Debug output of the symbolic tensors.
        print(names, detected, arousal, valence, liking, istalkings, images)

        return names, detected, arousal, valence, liking, istalkings, images, iterator

After running names through a session using sess.run(), I found that the first 68 examples are all fetched from Train_DE_01.tfrecords; subsequent examples keep being fetched from the same tfrecord file until all the examples in Train_DE_01.tfrecords have been consumed.

I have also tried using the zip() function of the Dataset API with a reinitializable iterator, as follows:

def load_devel_sewa_tfrecords(filenames_dev, test_batch_size):
    """Build one zipped dataset over all development tfrecord files.

    Every file in ``filenames_dev`` gets its own ``TFRecordDataset``, parsed
    with ``_parse_devel_function`` and batched by ``test_batch_size``. The
    per-file datasets are zipped, so one element of the result holds one
    batch from each file.

    Returns:
        The zipped ``tf.data.Dataset``.
    """
    with tf.name_scope('TFRecordsDevel'):
        per_file_datasets = [
            tf.data.TFRecordDataset(path)
            .map(_parse_devel_function)
            .batch(test_batch_size)
            for path in filenames_dev
        ]
        return tf.data.Dataset.zip(tuple(per_file_datasets))


def load_train_sewa_tfrecords(filenames_train, train_batch_size):
    """Build one zipped dataset over all training tfrecord files.

    Every file in ``filenames_train`` gets its own ``TFRecordDataset``,
    parsed with ``_parse_train_function`` and batched by
    ``train_batch_size``. The per-file datasets are zipped, so one element
    of the result holds one batch from each file.

    Returns:
        The zipped ``tf.data.Dataset``.
    """
    with tf.name_scope('TFRecordsTrain'):
        per_file_datasets = [
            tf.data.TFRecordDataset(path)
            .map(_parse_train_function)
            .batch(train_batch_size)
            for path in filenames_train
        ]
        return tf.data.Dataset.zip(tuple(per_file_datasets))


def load_sewa_dataset(filenames_train, train_batch_size, filenames_dev, test_batch_size):
    """Create a reinitializable iterator shared by the train and dev datasets.

    The iterator is built from the output types and shapes of the zipped
    training dataset, and one initializer op is created for each dataset.
    The per-file batches returned by the iterator are concatenated along
    axis 0, field by field.

    NOTE: the zipped datasets have one component per input file, so file
    lists of different lengths give different structures. The post reports
    a ``ValueError`` for 34 training files versus 14 development files.

    Returns:
        names, detected, arousal, valence, liking, istalkings, images,
        training_init_op, validation_init_op.
    """
    train_dataset = load_train_sewa_tfrecords(filenames_train, train_batch_size)
    dev_dataset = load_devel_sewa_tfrecords(filenames_dev, test_batch_size)

    shared_iterator = tf.data.Iterator.from_structure(
        train_dataset.output_types,
        train_dataset.output_shapes,
    )

    training_init_op = shared_iterator.make_initializer(train_dataset)
    validation_init_op = shared_iterator.make_initializer(dev_dataset)

    with tf.name_scope('inputs'):
        # One entry per tfrecord file; each entry holds the seven parsed
        # fields (name, detected, arousal, valence, liking, istalking,
        # images), each of them a batch.
        per_file_batches = shared_iterator.get_next(name='next_batch')

        field_names = ('names', 'detected', 'arousal', 'valence',
                       'liking', 'istalkings', 'images')

        # Regroup from "per file -> fields" to "per field -> files".
        columns = [
            [file_batch[position] for file_batch in per_file_batches]
            for position in range(len(field_names))
        ]

        # Stitch the per-file batches of every field into one batch.
        names, detected, arousal, valence, liking, istalkings, images = [
            tf.concat(column, axis=0, name=label)
            for column, label in zip(columns, field_names)
        ]

        return names, detected, arousal, valence, liking, istalkings, images, training_init_op, validation_init_op

Now if I try the following:

# Open a session, run the training initializer op, then fetch and print one
# batch of names. (training_init_op and names are presumably the values
# returned by load_sewa_dataset.)
sess = tf.Session()
sess.run(training_init_op)
print(sess.run(names))

I got the following error:

ValueError: The two structures don't have the same number of elements.

which makes sense, because the number of training files is 34 while the number of validation files is 14.

I would like to know how I can achieve this goal.

Any help is much appreciated!!

like image 618
I. A Avatar asked Oct 16 '22 08:10

I. A


1 Answers

Here is the workaround that I found, using tf.cond.

In order to retrieve 2 examples from each tfrecord file, I used the zip method of the tf.data.Dataset API as follows:

def load_train_sewa_tfrecords(filenames_train, train_batch_size):
    """Return training tensors that draw one batch from every tfrecord file.

    Every file in ``filenames_train`` gets its own ``TFRecordDataset``,
    parsed with ``_parse_train_function`` and batched by
    ``train_batch_size``. The datasets are zipped and read through an
    initializable iterator, and the per-file batches are concatenated along
    axis 0, field by field.

    Returns:
        names, detected, arousal, valence, liking, istalkings, images, and
        the initializable iterator (to be initialised by the caller).
    """
    with tf.name_scope('TFRecordsTrain'):
        per_file_datasets = [
            tf.data.TFRecordDataset(path)
            .map(_parse_train_function)
            .batch(train_batch_size)
            for path in filenames_train
        ]
        dataset_train_all = tf.data.Dataset.zip(tuple(per_file_datasets))
        iterator_train_all = dataset_train_all.make_initializable_iterator()

    with tf.name_scope('inputs_train'):
        # One entry per tfrecord file; each entry holds the seven parsed
        # fields (name, detected, arousal, valence, liking, istalking,
        # images), each of them a batch.
        per_file_batches = iterator_train_all.get_next(name='next_batch')

        field_names = ('names', 'detected', 'arousal', 'valence',
                       'liking', 'istalkings', 'images')

        # Regroup from "per file -> fields" to "per field -> files".
        columns = [
            [file_batch[position] for file_batch in per_file_batches]
            for position in range(len(field_names))
        ]

        # Stitch the per-file batches of every field into one batch.
        names, detected, arousal, valence, liking, istalkings, images = [
            tf.concat(column, axis=0, name=label)
            for column, label in zip(columns, field_names)
        ]

        return names, detected, arousal, valence, liking, istalkings, images, iterator_train_all

I am going to have a similar method for the development set; alternatively, I can change the parameters passed to the method so that the same method can be used twice (that is not the issue here).

Then:

# Development inputs. This assumes load_devel_sewa_tfrecords has been written
# like load_train_sewa_tfrecords, i.e. it returns the seven tensors plus its
# own initializable iterator.
names_dev, detected_dev, arousal_dev, valence_dev, liking_dev, istalkings_dev, images_dev, iterator_dev_all = \
    load_devel_sewa_tfrecords(filenames_dev, sewa_batch_size)

# Training inputs: seven tensors plus the training iterator.
names_train, detected_train, arousal_train, valence_train, liking_train, istalkings_train, images_train, iterator_train_all = \
    load_train_sewa_tfrecords(filenames_train, sewa_batch_size)

# Both image tensors go through the same pre-processing helper.
images_train = pre_process_sewa_images(images_train)
images_dev = pre_process_sewa_images(images_dev)


def return_train_sewa():
    """Branch function for ``tf.cond``: hand back the training tensors."""
    train_tensors = (
        names_train,
        detected_train,
        arousal_train,
        valence_train,
        liking_train,
        istalkings_train,
        images_train,
    )
    return train_tensors


def return_dev_sewa():
    """Branch function for ``tf.cond``: hand back the development tensors."""
    dev_tensors = (
        names_dev,
        detected_dev,
        arousal_dev,
        valence_dev,
        liking_dev,
        istalkings_dev,
        images_dev,
    )
    return dev_tensors


# Pick the training or the development tensors at run time, driven by the
# phase_train placeholder.
names, detected, arousal, valence, liking, istalkings, images_sewa = tf.cond(phase_train, return_train_sewa, return_dev_sewa)

sewa_inputs = []

sess = tf.Session()

import numpy as np
for e in range(epochs):
    # Both iterators are initialised before each pass: the input tensors of
    # both tf.cond branches are created outside the branch functions, so a
    # single sess.run appears to pull from both pipelines.
    # NOTE(review): if that is the case, a pass ends as soon as either
    # iterator is exhausted, not only the selected one. Confirm.
    sess.run(iterator_train_all.initializer)
    sess.run(iterator_dev_all.initializer)

    i = 0
    total = 0

    try:
        while True:
            i += 1
            names_np, detected_np, arousal_np, valence_np, liking_np, istalkings_np = \
                sess.run([names, detected, arousal, valence, liking, istalkings], feed_dict={phase_train: True})
            total += np.shape(names_np)[0]
            print("total =", total, " | i =", i)
    except tf.errors.OutOfRangeError:
        # Only end-of-data terminates the pass. Any other exception
        # (including KeyboardInterrupt) is a real failure and propagates
        # instead of being reported as the end of the data.
        print("end of train...")

    i_d = 0
    total_d = 0

    # Rewind both pipelines before the development pass.
    sess.run(iterator_train_all.initializer)
    sess.run(iterator_dev_all.initializer)
    try:
        while True:
            i_d += 1
            names_np, detected_np, arousal_np, valence_np, liking_np, istalkings_np = \
                sess.run([names, detected, arousal, valence, liking, istalkings], feed_dict={phase_train: False})
            total_d += np.shape(names_np)[0]
            print("total_d =", total_d, " | i_d =", i_d)
            print(names_np)
    except tf.errors.OutOfRangeError:
        # Same rule as the training pass: stop only on exhausted data.
        print("End of devel")

Note that it is mandatory to run both initializers, sess.run(iterator_train_all.initializer) and sess.run(iterator_dev_all.initializer), before sess.run([names, ...]). My guess is that with tf.cond both the training and the validation examples are retrieved, but tf.cond returns only one of them based on the phase_train placeholder, which determines whether we are in training or testing mode.

The proof of that: when I inserted names = tf.Print(input_=[names], data=[names], message='dev names') into load_devel_sewa_tfrecords, just before the return, I got:

dev names['Devel_01' 'Devel_01' 'Devel_02'...]

printed out in the console, i.e., while evaluating the training dataset, TensorFlow was evaluating the devel dataset at the same time, but tf.cond returned only the examples from the training tfrecords.

Hope this answer helps!!

like image 186
I. A Avatar answered Oct 21 '22 05:10

I. A