Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

TensorFlow: How can I evaluate a validation data queue multiple times during training?

tl;dr

How can I evaluate a validation set after every K training iterations, using separate queues for training and validation data, without resorting to separate tf.Sessions in multiple processes? There doesn't seem to be a clean way to achieve this, given my particular problem, and my current workaround (which I thought would work) gives me undefined behavior. Help!

The whole story

I want to evaluate a validation set every K training iterations, and I cannot figure out how to implement this properly in TensorFlow. This should be one of the most common operations, yet it feels that TensorFlow's API/architecture is working against me here or is at least making things unnecessarily difficult.

My assumptions are:

  • [A1] The multi-process model for training/validation as described here https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines is not applicable to my problem, as I have to assume there is not enough GPU memory available to load the variables twice.
  • [A2] I want to evaluate on the validation set every K training iterations.
  • [A3] Both training and validation data cannot be simply read from disk, but are generated on-the-fly. This makes it impossible to reliably pre-compute the size of the validation set in advance.
  • [A4] The validation set is too large to pre-compute and store onto disk.
  • [A5] The effective validation set size is not necessarily a multiple of the batch size.

The training input pipeline is set up as follows:

  • A tf.train.slice_input_producer() generates a (shuffled) list of filenames, each referring to raw input data.
  • A custom data generation function generates a variable number of training exemplars/labels from each chunk of raw input data.
  • The generated training exemplars/labels are queued via tf.train.shuffle_batch() before being fed into the network.

Due to [A3], [A4], [A5], the validation input pipeline is set up in an almost identical way, except that the final input queue is generated via tf.train.batch(), since shuffling is not desirable. Due to the above assumptions, a feed_dict based approach is also infeasible, and also seemingly incompatible with using a higher level function such as tf.train.batch.

However, a straightforward implementation using two different sets of queues for training and validation does not work. As far as I understand, I have two options:

  • [B1] Set the num_epochs argument of the validation tf.train.slice_input_producer to None.

    In this case, the validation set is cycled through endlessly, but I would need to know the size of the validation set in advance to explicitly limit the number of batches to evaluate per run through the validation set. Furthermore, if the validation set size is not divisible by the batch size, I will always pull a bit more in the last batch. As this would shift the order of evaluation of the validation data each time, this is not acceptable.

  • [B2] Set the num_epochs argument of the validation tf.train.slice_input_producer to 1, and additionally set the allow_smaller_final_batch argument of the tf.train.batch function to True.

    In this case, the validation set is cycled through exactly once, after which the respective queue is closed forever. By default, this will make evaluating the validation set two or more times impossible. Since I do not know of a good way to reopen a queue in TensorFlow, I need to work around this limitation.

Due to the greater limitations of option [B1], I chose to work around the issues of option [B2] instead. The (pseudo-)code outlining my current approach is as follows:

The training loop should be fairly canonical. Every K iterations, a function to evaluate the validation set is called. Note that I only start the queues that have a name starting with "train_"; these is the queue set up for collecting generated training data. In order to do this, I created two helper functions, get_queues_by_name and start_queue_runners.

def train_loop(train_ops, vali_ops, ...):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("train")
        threads = start_queue_runners(sess, coord, queues)

        try:
            for step in range(start_iteration, num_train_iterations):
                # Runs the session on validation set
                if step % K == 0:
                    validation_results = run_validation(vali_ops, snapshot_file)

                # TRAINING:
                # ...

        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

The helper functions look like this:

def get_queues_by_name(name):
    """Retrieves all queues that contain the string given by 'name'"""
    all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
    return [q for q in all_queues if name in q.name]


def start_queue_runners(session, coordinator, queues):
    """Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
    with session.graph.as_default():
        threads = []
        for queue in queues:
            log("Queue", "Starting queue '%s'" % queue.name, level=2)
            threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
    return threads

In the run_validation function, my chosen workaround against the issue of a closed queue is to create a new tf.Session. I also only start the threads associated with the queue collecting validation set data.

def run_validation(ops, snapshot_file):  # Called inside train_loop()
    results = None
    loader = tf.train.Saver()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("eval")
        threads = start_queue_runners(sess, coord, queues)

        # Performs the inference in batches
        try:
            # Evaluate validation set:
            results = eval_in_batches(ops, sess)
        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

    return results

I do not know whether creating a new tf.Session here is a good idea, but it seems like the only way to accomplish restarting the validation queue. Ideally, I also wouldn't want to re-load the model snapshot, as this seems conceptually unnecessary.

The issue with this code is that I see erratic/undefined behavior during running, such as NaN's or Inf's appearing inside the network during validation set evaluation. This seems to occur predominantly when the validation set queue is being filled at the same time as the training set queue is still being filled (since the training queue is open during validation set evaluation). For example, this very often happens if I evaluate the validation set at iteration 0 (when both queues still need to be filled). It almost seems as if the training/validation queues share some global state, although they are running in a different session.

Can someone explain why this is happening, and how I can solve this more cleanly while taking my above assumptions [A1]-[A5] into account?

like image 635
kmhofmann Avatar asked Dec 16 '16 15:12

kmhofmann


1 Answers

I'm currently facing a similar problem. So far I avoided any queues at all and just feeding in the data via the feed_dict but I'm obviously loosing some performance by not using queues and parallelism (although I'm still happy with the current speed as I did the same in Theano earlier). Now I want to redesign this and use queues and stumbled upon this problem. There are this, this, this related issues.

I'm currently thinking about doing it this way:

  • In training, I want to use a RandomShuffleQueue which makes it even more complicated. I think I will just ignore the problem and once the reader thread which enqueues tensors into the queue finishes, I will let the training stop, so I loose the remaining up-to capacity items for this epoch and just use it for the next epoch. Maybe to make it deterministic I check in the train-thread that I still read from the queue until there are only min_after_dequeue items left.

  • In evaluation, I want to use the same graph and the same session. I can use tf.cond to read from another separate queue instead of the RandomShuffleQueue. Or I could use feed_dict in evaluation. If I would use a separate queue, I would use a FIFOQueue and carefully track that I do the right amount of steps. I could also introduce another dummy tensor which I enqueue into the queue which gives me a end_of_epoch flag or so, so then I know in the eval-thread when to stop.


In TensorFlow 1.2, there will be the tf.contrib.data interface (issue comment, documentation overview, API documentation), which provides the tf.contrib.data.Dataset API which also supports shuffling similar as tf.RandomShuffleQueue and batching and looping over multiple epochs. Also, you access the data by creating an iterator over it and you can reset the iterator. Some related StackOverflow questions are here and here.

like image 74
Albert Avatar answered Oct 13 '22 23:10

Albert