I have a model that runs over a set of images and calculates some stats on them - for simplicity, say it just outputs the average image from the set (it does more than that in practice). I have a number of directories that contain images and I want to get the output for each directory. Each directory has a variable number of images in it.
I have constructed the graph, output variable and loss function once for my script. The inputs are batched using slightly adapted code from here. I adapted it to take an array of paths which I feed in using a placeholder of variable size. I got the inspiration for that from here.
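For reference, here is a simplified sketch of the kind of pipeline I mean (the names, image size and PNG decoding below are illustrative, not my exact code):

import tensorflow as tf

# variable-size placeholder that the paths for one directory are fed through
image_paths_initializer = tf.placeholder(dtype=tf.string, shape=[None])
# variable holding the current directory's paths, initialised from the placeholder
image_paths = tf.Variable(image_paths_initializer, validate_shape=False,
                          trainable=False, collections=[])

# string_input_producer creates a filename queue plus a QueueRunner to fill it
filename_queue = tf.train.string_input_producer(image_paths, shuffle=True)

reader = tf.WholeFileReader()
_, file_contents = reader.read(filename_queue)
image = tf.image.decode_png(file_contents, channels=3)
image.set_shape([64, 64, 3])  # illustrative fixed size so shuffle_batch knows the shape

# shuffle_batch adds a second queue (the RandomShuffleQueue) and another QueueRunner
image_batch = tf.train.shuffle_batch([image], batch_size=16,
                                     capacity=200, min_after_dequeue=50)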
Then I loop over the directories and run the following:
sess.run(image_paths.initializer, feed_dict={image_paths_initializer: image_paths})
queue_threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# ... compute the stats for this directory, then shut the queue runners down:
coord.request_stop()
coord.join(queue_threads)
coord.clear_stop()
The problem is that when it comes to the second directory the queue runner threads refuse to start (I can see this by debugging the queue_threads variable). This gives errors like:
Compute status: Aborted: FIFOQueue '_1_input_producer' is closed.
Compute status: Aborted: RandomShuffleQueue '_0_shuffle_batch/random_shuffle_queue' is closed.
If I don't close the threads (and don't start them a second time) then they don't produce files from the new directory - they ignore the variable assignment op (the image_paths initializer in the loop above). Is it just not possible to restart the queues like this?
I have tried setting up the queues in their own separate session and pulling the batches from them, but that gives me various CUDA / memory errors. With some debug stops added I can get it to run quite far before it fails - but I don't know whether it is even possible to add control dependencies between disjoint sessions/graphs?
It would be possible to start from scratch for each new directory, but that adds a lot of overhead to the process that I am trying to avoid. I have done similar things before (i.e. resetting variables and rerunning with different inputs) without queues, and it saves a lot of time, so I know that part works.
Can any of you wonderful SO folk think of a way out of this?
string_input_producer is a FIFOQueue + QueueRunner. You get more control if you use a FIFOQueue and enqueue things manually. Something like this:
import tensorflow as tf

# queue we will feed filenames into by hand, instead of via a QueueRunner
filename_queue = tf.FIFOQueue(100, tf.string)
enqueue_placeholder = tf.placeholder(dtype=tf.string)
enqueue_op = filename_queue.enqueue(enqueue_placeholder)

config = tf.ConfigProto()
config.operation_timeout_in_ms = 2000  # for debugging queue hangs
sess = tf.InteractiveSession(config=config)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# push the filenames for the first directory, then run your stats ops
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir1/0"})
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir1/1"})
# do stats for /temp/dir1

# push the filenames for the second directory and repeat
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir2/0"})
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir2/1"})
# do stats for /temp/dir2

coord.request_stop()
coord.join(threads)
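To plug this into the rest of your pipeline, point your reader at this queue in place of the one created by string_input_producer - roughly like this (the reader and decode step below are just an assumed example; use whatever your current pipeline does):

reader = tf.WholeFileReader()
_, file_contents = reader.read(filename_queue)
image = tf.image.decode_png(file_contents, channels=3)
# ... feed `image` into your existing batching / stats ops as before

Since you control every enqueue yourself, you also know exactly how many images went in for each directory, so you can run the downstream ops the right number of times per directory without ever stopping and restarting the queue runner threads.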