Tensorflow: Batching input queues then changing the queue source

I have a model that runs over a set of images and calculates some stats on them. For simplicity, say it just outputs the average image of the set (in practice it does more than that). I have a number of directories that each contain images, and I want the output for each directory. Each directory holds a variable number of images.

I have constructed the graph, output variable and loss function once for my script. The inputs are batched using slightly adapted code from here: I adapted it to take an array of paths, which I feed in through a placeholder of variable size. I got the inspiration for that from here.

Then I loop over the directories and run the following steps (a rough sketch of this loop is shown after the list):

  1. Initialize the variables (this resets the output variable, clearing the results calculated for the previous directory)
  2. Set the image paths variable to the current files array from the new directory: sess.run(image_paths.initializer, feed_dict={image_paths_initializer: image_paths})
  3. Start the queues running: queue_threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  4. Run for a number of epochs to get results
  5. Close the threads: coord.request_stop(); coord.join(queue_threads); coord.clear_stop()
  6. Return and save the results, then move on to the next directory...
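
For concreteness, here is a minimal sketch of that loop. The names (directories, image_paths, image_paths_initializer, update_op, output, num_epochs, results) are stand-ins for whatever my real script defines, not actual code from it:

import tensorflow as tf

# directories is assumed to be a list of per-directory filename arrays
for dir_files in directories:
    sess.run(tf.global_variables_initializer())                  # 1. reset the output variable
    sess.run(image_paths.initializer,
             feed_dict={image_paths_initializer: dir_files})     # 2. point the queue at the new files
    coord = tf.train.Coordinator()
    queue_threads = tf.train.start_queue_runners(sess=sess, coord=coord)  # 3. start the queues
    for _ in range(num_epochs):                                  # 4. run to accumulate the stats
        sess.run(update_op)
    coord.request_stop()                                         # 5. close the threads
    coord.join(queue_threads)
    coord.clear_stop()
    results.append(sess.run(output))                             # 6. collect results, next directory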

The problem is that when it comes to the second directory, the queue runner threads refuse to start (I can see this by inspecting the queue_threads variable). This gives errors like:

Compute status: Aborted: FIFOQueue '_1_input_producer' is closed.
Compute status: Aborted: RandomShuffleQueue '_0_shuffle_batch/random_shuffle_queue' is closed.

If I don't close the threads (and don't start them a second time), they don't produce files from the new directory; they ignore the variable assignment op in step 2. Is it simply not possible to restart the queues like this?

I have tried setting up the queues in their own separate session and pulling the batches from them, but that gives me various CUDA / memory errors. With debug stops added I can get it to run quite far before it hits these, but I don't know whether it is even possible to add control dependencies between disjoint sessions/graphs?

It would be possible to start from scratch for each new directory, but that adds a lot of overhead, which is exactly what I am trying to avoid. I have done something similar before (i.e. resetting variables and rerunning with different inputs) without queues and it saves a lot of time, so I know that part works.

Can any of you wonderful SO folk think of a way out of this?

asked by lopsided

1 Answer

string_input_producer is a FIFOQueue plus a QueueRunner. You get more control if you use a FIFOQueue directly and enqueue the filenames manually. Something like this:

import tensorflow as tf

# A plain FIFOQueue that we feed by hand instead of via string_input_producer.
filename_queue = tf.FIFOQueue(100, tf.string)
enqueue_placeholder = tf.placeholder(dtype=tf.string)
enqueue_op = filename_queue.enqueue(enqueue_placeholder)

config = tf.ConfigProto()
config.operation_timeout_in_ms = 2000  # for debugging queue hangs
sess = tf.InteractiveSession(config=config)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

# Push the first directory's filenames, then compute its stats.
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir1/0"})
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir1/1"})

# do stats for /temp/dir1

# Push the second directory's filenames into the same queue and repeat.
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir2/0"})
sess.run([enqueue_op], feed_dict={enqueue_placeholder: "/temp/dir2/1"})

# do stats for /temp/dir2

coord.request_stop()
coord.join(threads)
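
For context, here is a rough sketch of how such a manually fed queue might be wired into a reading/batching pipeline and driven one directory at a time. The reader, decoder, image size and batch size are assumptions about the asker's setup, not part of the original answer, and in a real script these ops would be created before tf.train.start_queue_runners is called:

import os
import tensorflow as tf

# Hypothetical downstream pipeline: read and decode whatever filename_queue yields.
reader = tf.WholeFileReader()
_, file_contents = reader.read(filename_queue)
image = tf.image.decode_png(file_contents, channels=3)
image = tf.image.resize_images(image, [64, 64])       # fixed size so batching works
image_batch = tf.train.batch([image], batch_size=8,
                             allow_smaller_final_batch=True)

# Enqueue every file of one directory, compute its stats, then move on.
for directory in ["/temp/dir1", "/temp/dir2"]:
    for fname in sorted(os.listdir(directory)):
        sess.run(enqueue_op,
                 feed_dict={enqueue_placeholder: os.path.join(directory, fname)})
    # ... run the stats / averaging ops that consume image_batch here ...

# When completely finished, closing the queue lets any pending smaller batch
# flush and makes further dequeues raise OutOfRangeError instead of hanging.
sess.run(filename_queue.close())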
answered by Yaroslav Bulatov