How can I evaluate a validation set after every K training iterations, using separate queues for training and validation data, without resorting to separate tf.Sessions
in multiple processes? There doesn't seem to be a clean way to achieve this, given my particular problem, and my current workaround (which I thought would work) gives me undefined behavior. Help!
I want to evaluate a validation set every K training iterations, and I cannot figure out how to implement this properly in TensorFlow. This should be one of the most common operations, yet it feels that TensorFlow's API/architecture is working against me here or is at least making things unnecessarily difficult.
My assumptions are:
The training input pipeline is set up as follows:
tf.train.slice_input_producer()
generates a (shuffled) list of filenames, each referring to raw input data.tf.train.shuffle_batch()
before being fed into the network.Due to [A3], [A4], [A5], the validation input pipeline is set up in an almost identical way, except that the final input queue is generated via tf.train.batch()
, since shuffling is not desirable. Due to the above assumptions, a feed_dict based approach is also infeasible, and also seemingly incompatible with using a higher level function such as tf.train.batch
.
However, a straightforward implementation using two different sets of queues for training and validation does not work. As far as I understand, I have two options:
[B1] Set the num_epochs
argument of the validation tf.train.slice_input_producer
to None
.
In this case, the validation set is cycled through endlessly, but I would need to know the size of the validation set in advance to explicitly limit the number of batches to evaluate per run through the validation set. Furthermore, if the validation set size is not divisible by the batch size, I will always pull a bit more in the last batch. As this would shift the order of evaluation of the validation data each time, this is not acceptable.
[B2] Set the num_epochs
argument of the validation tf.train.slice_input_producer
to 1
, and additionally set the allow_smaller_final_batch
argument of the tf.train.batch
function to True
.
In this case, the validation set is cycled through exactly once, after which the respective queue is closed forever. By default, this will make evaluating the validation set two or more times impossible. Since I do not know of a good way to reopen a queue in TensorFlow, I need to work around this limitation.
Due to the greater limitations of option [B1], I chose to work around the issues of option [B2] instead. The (pseudo-)code outlining my current approach is as follows:
The training loop should be fairly canonical. Every K iterations, a function to evaluate the validation set is called.
Note that I only start the queues that have a name starting with "train_"; these is the queue set up for collecting generated training data. In order to do this, I created two helper functions, get_queues_by_name
and start_queue_runners
.
def train_loop(train_ops, vali_ops, ...):
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("train")
threads = start_queue_runners(sess, coord, queues)
try:
for step in range(start_iteration, num_train_iterations):
# Runs the session on validation set
if step % K == 0:
validation_results = run_validation(vali_ops, snapshot_file)
# TRAINING:
# ...
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
The helper functions look like this:
def get_queues_by_name(name):
"""Retrieves all queues that contain the string given by 'name'"""
all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
return [q for q in all_queues if name in q.name]
def start_queue_runners(session, coordinator, queues):
"""Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
with session.graph.as_default():
threads = []
for queue in queues:
log("Queue", "Starting queue '%s'" % queue.name, level=2)
threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
return threads
In the run_validation
function, my chosen workaround against the issue of a closed queue is to create a new tf.Session
. I also only start the threads associated with the queue collecting validation set data.
def run_validation(ops, snapshot_file): # Called inside train_loop()
results = None
loader = tf.train.Saver()
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("eval")
threads = start_queue_runners(sess, coord, queues)
# Performs the inference in batches
try:
# Evaluate validation set:
results = eval_in_batches(ops, sess)
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
return results
I do not know whether creating a new tf.Session
here is a good idea, but it seems like the only way to accomplish restarting the validation queue. Ideally, I also wouldn't want to re-load the model snapshot, as this seems conceptually unnecessary.
The issue with this code is that I see erratic/undefined behavior during running, such as NaN's or Inf's appearing inside the network during validation set evaluation. This seems to occur predominantly when the validation set queue is being filled at the same time as the training set queue is still being filled (since the training queue is open during validation set evaluation). For example, this very often happens if I evaluate the validation set at iteration 0 (when both queues still need to be filled). It almost seems as if the training/validation queues share some global state, although they are running in a different session.
Can someone explain why this is happening, and how I can solve this more cleanly while taking my above assumptions [A1]-[A5] into account?
I'm currently facing a similar problem. So far I avoided any queues at all and just feeding in the data via the feed_dict
but I'm obviously loosing some performance by not using queues and parallelism (although I'm still happy with the current speed as I did the same in Theano earlier). Now I want to redesign this and use queues and stumbled upon this problem. There are this, this, this related issues.
I'm currently thinking about doing it this way:
In training, I want to use a RandomShuffleQueue
which makes it even more complicated. I think I will just ignore the problem and once the reader thread which enqueues tensors into the queue finishes, I will let the training stop, so I loose the remaining up-to capacity
items for this epoch and just use it for the next epoch. Maybe to make it deterministic I check in the train-thread that I still read from the queue until there are only min_after_dequeue
items left.
In evaluation, I want to use the same graph and the same session. I can use tf.cond
to read from another separate queue instead of the RandomShuffleQueue
. Or I could use feed_dict
in evaluation. If I would use a separate queue, I would use a FIFOQueue
and carefully track that I do the right amount of steps. I could also introduce another dummy tensor which I enqueue into the queue which gives me a end_of_epoch
flag or so, so then I know in the eval-thread when to stop.
In TensorFlow 1.2, there will be the tf.contrib.data
interface (issue comment, documentation overview, API documentation), which provides the tf.contrib.data.Dataset
API which also supports shuffling similar as tf.RandomShuffleQueue
and batching and looping over multiple epochs. Also, you access the data by creating an iterator over it and you can reset the iterator. Some related StackOverflow questions are here and here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With