What's going on in tf.train.shuffle_batch and tf.train.batch?

I use binary data to train a DNN.

But tf.train.shuffle_batch and tf.train.batch confuse me.

This is my code and I will do some tests on it.

First, Using_Queues_Lib.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 100
REAL32_BYTES=4


def read_dataset(filename_queue, data_length, label_length):
  class Record(object):
    pass
  result = Record()

  result_data  = data_length * REAL32_BYTES
  result_label = label_length * REAL32_BYTES
  record_bytes = result_data + result_label

  # each record is a fixed-length run of raw bytes: data then label
  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  # decode the raw bytes into a flat float32 vector and slice it apart
  record_bytes = tf.decode_raw(value, tf.float32)
  result.data  = tf.strided_slice(record_bytes, [0], [data_length])
  result.label = tf.strided_slice(record_bytes, [data_length], [data_length + label_length])
  return result


def _generate_data_and_label_batch(data, label, min_queue_examples, batch_size, shuffle):
  num_preprocess_threads = 16  # more threads only speed up the pipeline
  if shuffle:
    data_batch, label_batch = tf.train.shuffle_batch(
        [data, label], batch_size=batch_size,
        num_threads=num_preprocess_threads,
        capacity=min_queue_examples + batch_size,
        min_after_dequeue=min_queue_examples)
  else:
    data_batch, label_batch = tf.train.batch(
        [data, label], batch_size=batch_size,
        num_threads=num_preprocess_threads,
        capacity=min_queue_examples + batch_size)
  return data_batch, label_batch

def inputs(data_dir, batch_size,data_length,label_length):
  filenames = [os.path.join(data_dir, 'test_data_SE.dat')]
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

  filename_queue = tf.train.string_input_producer(filenames)

  read_input = read_dataset(filename_queue,data_length,label_length)

  read_input.data.set_shape([data_length])   # important: fix the static shape
  read_input.label.set_shape([label_length]) # important: fix the static shape


  min_fraction_of_examples_in_queue = 0.4
  min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN *
                       min_fraction_of_examples_in_queue)
  print ('Filling queue with %d samples before starting to train. '
     'This will take a few minutes.' % min_queue_examples)

  return _generate_data_and_label_batch(read_input.data, read_input.label,
                                     min_queue_examples, batch_size,
                                     shuffle=True)

Second, Using_Queues.py:

import Using_Queues_Lib
import tensorflow as tf
import numpy as np
import time


max_steps=10
batch_size=100
data_dir=r'.'
data_length=2
label_length=1

#----------- Save arrays to a binary file -----------
import struct
def WriteArrayFloat(file, data):
  fout = open(file, 'wb')
  fout.write(struct.pack('<' + str(data.flatten().size) + 'f',
                         *data.flatten().tolist()))
  fout.close()
#-----------------------------------------------------

def add_layer(inputs, in_size, out_size, activation_function=None):
  Weights = tf.Variable(tf.truncated_normal([in_size, out_size]))
  biases = tf.Variable(tf.zeros([1, out_size]) + 0.1)
  Wx_plus_b = tf.matmul(inputs, Weights) + biases
  if activation_function is None:
    outputs = Wx_plus_b
  else:
    outputs = activation_function(Wx_plus_b)
  return outputs

data_train, labels_train = Using_Queues_Lib.inputs(
    data_dir=data_dir, batch_size=batch_size,
    data_length=data_length, label_length=label_length)

xs=tf.placeholder(tf.float32,[None,data_length])
ys=tf.placeholder(tf.float32,[None,label_length])

l1 = add_layer(xs, data_length, 5, activation_function=tf.nn.sigmoid)
l2 = add_layer(l1, 5, 5, activation_function=tf.nn.sigmoid)
prediction = add_layer(l2, 5, label_length, activation_function=None)

loss = tf.reduce_mean(tf.square(ys - prediction))
train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss)

sess=tf.InteractiveSession()
tf.global_variables_initializer().run()

tf.train.start_queue_runners()

for i in range(max_steps):
  start_time = time.time()
  data_batch, label_batch = sess.run([data_train, labels_train])
  sess.run(train_step, feed_dict={xs: data_batch, ys: label_batch})
  duration = time.time() - start_time
  if i % 1 == 0:
    examples_per_sec = batch_size / duration
    sec_per_batch = float(duration)
    WriteArrayFloat(r'./data/' + str(i) + '.bin',
                    np.concatenate((data_batch, label_batch), axis=1))
    format_str = ('step %d, loss=%.8f (%.1f examples/sec; %.3f sec/batch)')
    loss_value = sess.run(loss, feed_dict={xs: data_batch, ys: label_batch})
    print(format_str % (i, loss_value, examples_per_sec, sec_per_batch))

The data is here, and it was generated by Mathematica:

file = OpenWrite["test_data_SE.dat", BinaryFormat -> True];
data = Flatten@Table[{x, y, x*y}, {x, -1, 1, .05}, {y, -1, 1, .05}];
BinaryWrite[file, data, "Real32", ByteOrdering -> -1];
Close[file];

Length of data: 1681 records of {x, y, x*y}.
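
For readers without Mathematica, here is a minimal NumPy sketch (my own, not from the question) that should produce an equivalent little-endian float32 file; the filename matches the one Using_Queues_Lib.py reads:

import numpy as np

# 41 x 41 grid over [-1, 1] with step 0.05; one {x, y, x*y} record per point
xs = np.arange(-1.0, 1.0 + 1e-9, 0.05)
records = np.array([[x, y, x * y] for x in xs for y in xs], dtype=np.float32)
print(records.shape)  # (1681, 3)

# write as little-endian float32, matching "Real32", ByteOrdering -> -1
records.astype('<f4').tofile('test_data_SE.dat')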

The data looks like this:

[image: table of the generated data records]

Plot of the data (the red-to-green color encodes the order in which the records occur in the file):

[image: scatter plot of the data, colored red to green by file order]

Running Using_Queues.py produces ten batches, and I draw each batch in this graph (batch_size=100 and min_queue_examples=40):

[image: ten batches, batch_size=100, min_queue_examples=40]

If batch_size=1024 and min_queue_examples=40:

[image: batches with batch_size=1024, min_queue_examples=40]

If batch_size=100 and min_queue_examples=4000:

[image: batches with batch_size=100, min_queue_examples=4000]

If batch_size=1024 and min_queue_examples=4000:

[image: batches with batch_size=1024, min_queue_examples=4000]

And even if batch_size=1681 and min_queue_examples=4000:

[image: batches with batch_size=1681, min_queue_examples=4000]

The region is not filled with points.

Why?

So why does changing min_queue_examples make the batches more random? How should the value of min_queue_examples be determined?

What's going on in tf.train.shuffle_batch?

asked Mar 26 '17 by partida

2 Answers

shuffle_batch is nothing but an asynchronous mechanism built on a RandomShuffleQueue. You first have to understand what asynchronism is; then shuffle_batch should be very straightforward to understand, with a little help from the official documentation (https://www.tensorflow.org/versions/r1.3/programmers_guide/threading_and_queues). Say you want to design a system that can read and write data at the same time. Most people design it like this:

1) Create one thread for reading data and one thread for writing data. The reading thread removes one element from the queue (dequeue), and the writing thread adds one element to the queue as the outcome of writing (enqueue).

2) Use blocking queues to manage the synchronization between the reading and writing threads. You don't want the reading thread to read the same data the writing thread is still writing; when the queue is empty, the reading thread should hang (block) and wait for data to be written (enqueued) by the writing thread; and when the queue is full, the writing thread should wait for the reading thread to pop data out of the queue (dequeue).

In the TensorFlow input pipeline, things are no different. There are basically two sets of threads working: one adds training examples to a queue, and the other takes training examples from the queue for training. That is exactly how slice_input_producer, string_input_producer, and shuffle_batch are designed.
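
To see the pattern outside TensorFlow, here is a minimal plain-Python sketch (my own illustration): the standard library's queue.Queue blocks on put() when full and on get() when empty, which is exactly the synchronization described above:

import threading
import queue

q = queue.Queue(maxsize=10)  # bounded blocking queue
NUM_ITEMS = 100

def writer():
    # enqueue: put() blocks whenever the queue is full
    for i in range(NUM_ITEMS):
        q.put(i)

def reader():
    # dequeue: get() blocks whenever the queue is empty
    for _ in range(NUM_ITEMS):
        item = q.get()
        print('read', item)

w = threading.Thread(target=writer)
r = threading.Thread(target=reader)
w.start(); r.start()
w.join(); r.join()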

I wrote a little program to break things down, so you can understand the TensorFlow input pipeline, shuffle_batch, and the effect of the min_after_dequeue and batch_size parameters:

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

test_size = 2000
input_data = tf.range(test_size)

xi = [x for x in range(0, test_size, 50)[1:]]
yi = [int(test_size * x) for x in np.array(range(1, 100, 5)) / 100.0]
zi = np.zeros(shape=(len(yi), len(xi)))
with tf.Session() as sess:
    for idx, batch_size in enumerate(xi):
        for idy, min_after_dequeue in enumerate(yi):
            # synchronization example 1: create a FIFO queue; one thread
            # adds many training examples at a time to the queue, and the other
            # takes one example at a time out of the queue.
            # this is similar to what slice_input_producer does.
            fifo_q = tf.FIFOQueue(capacity=test_size, dtypes=tf.int32,
                                  shapes=[[]])
            en_fifo_q = fifo_q.enqueue_many(input_data)
            single_data = fifo_q.dequeue()
            # synchronization example 2: create a random shuffle queue; one thread
            # adds one training example at a time to the queue, and the other
            # takes many examples out of the queue at a time as a batch.
            # this is similar to what shuffle_batch does.
            rf_queue = tf.RandomShuffleQueue(capacity=test_size,
                                             min_after_dequeue=min_after_dequeue,
                                             shapes=single_data.get_shape(),
                                             dtypes=single_data.dtype)
            rf_enqueue = rf_queue.enqueue(single_data)
            batch_data = rf_queue.dequeue_many(batch_size)

            # now let's create threads for the enqueue operations (writing threads).
            # enqueue threads have to be started first; the tf session will
            # take care of your training (reading thread), which runs when you call sess.run.
            # the tf coordinator is nothing but a thread manager that takes care of
            # the life cycle of the created threads.
            qr_fifo = tf.train.QueueRunner(fifo_q, [en_fifo_q] * 8)
            qr_rf = tf.train.QueueRunner(rf_queue, [rf_enqueue] * 4)
            coord = tf.train.Coordinator()
            fifo_queue_threads = qr_fifo.create_threads(sess, coord=coord, start=True)
            rf_queue_threads = qr_rf.create_threads(sess, coord=coord, start=True)
            shuffle_pool = []
            num_steps = int(np.ceil(test_size / float(batch_size)))
            for i in range(num_steps):
                shuffle_data = sess.run([batch_data])
                shuffle_pool.extend(shuffle_data[0].tolist())
            # evaluate the unique rate of each combination of batch_size and min_after_dequeue.
            # a unique rate of 1.0 indicates each example is shuffled uniformly;
            # a unique rate < 1.0 means that some examples were dequeued more than once.
            unique_rate = len(np.unique(shuffle_pool)) / float(test_size)
            print(min_after_dequeue, batch_size, unique_rate)
            zi[idy, idx] = unique_rate
            # stop threads.
            coord.request_stop()
            coord.join(rf_queue_threads)
            coord.join(fifo_queue_threads)

print(xi, yi, zi)
plt.clf()
plt.title('shuffle_batch_example')
plt.ylabel('min_after_dequeue')
plt.xlabel('batch_size')
xxi, yyi = np.meshgrid(xi, yi)
plt.pcolormesh(xxi, yyi, zi)
plt.colorbar()
plt.show()

If you run the above code, you should see the plot:

[image: pcolormesh of unique_rate over batch_size (x) and min_after_dequeue (y)]

We can clearly see that as batch_size gets larger, the unique rate gets higher, and as min_after_dequeue gets smaller, the unique rate also gets higher. The unique rate is an indicator I calculate to monitor how many duplicated samples shuffle_batch generates on the fly across mini-batches.

answered Nov 11 '22 by weiyixie


The sampling function that tf.train.shuffle_batch() (and hence tf.RandomShuffleQueue) uses is a bit subtle. The implementation uses tf.RandomShuffleQueue.dequeue_many(batch_size), whose (simplified) implementation is as follows:

  • While the number of elements dequeued is less than batch_size:
    • Wait until the queue contains at least min_after_dequeue + 1 elements.
    • Select an element from the queue uniformly at random, remove it from the queue, and add it to the output batch.

The other thing to note is how elements are added to the queue, which uses a background thread running tf.RandomShuffleQueue.enqueue() on the same queue:

  • Wait until the current size of the queue is less than its capacity.
  • Add the element to the queue.

As a result, the capacity and min_after_dequeue properties of the queue (plus the distribution of the input data being enqueued) determine the population from which tf.train.shuffle_batch() will sample. It appears that the data in your input files is ordered, so you are relying completely on the tf.train.shuffle_batch() function for randomness.
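
To make that concrete, here is a small pure-Python simulation of this dequeue/enqueue interplay (a sketch under simplifying assumptions, not TensorFlow's actual implementation: it alternates refilling the queue and sampling instead of running threads concurrently):

import random

def simulated_shuffle_batch(dataset, batch_size, capacity, min_after_dequeue):
    """Roughly simulate tf.train.shuffle_batch on an ordered dataset."""
    queue, batches, it = [], [], iter(dataset)
    while True:
        # background enqueue thread: fill the queue up to capacity
        while len(queue) < capacity:
            try:
                queue.append(next(it))
            except StopIteration:
                break
        # dequeue_many(batch_size): pick uniformly at random, never letting
        # the queue drop to min_after_dequeue or fewer elements
        batch = []
        while len(batch) < batch_size and len(queue) > min_after_dequeue:
            batch.append(queue.pop(random.randrange(len(queue))))
        if not batch:  # input exhausted and queue drained as far as allowed
            return batches
        batches.append(batch)

# ordered input, parameters like the question's first experiment
batches = simulated_shuffle_batch(range(1681), batch_size=100,
                                  capacity=140, min_after_dequeue=40)
print(batches[0][:10])  # early batches contain only small (early) indices:
                        # the "sliding window" effect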

Taking your visualizations in turn:

  1. If capacity and min_after_dequeue are small relative to the dataset, the "shuffling" will select random elements from a small population resembling a "sliding window" across the dataset. With some small probability you will see old elements in the dequeued batch.

  2. If batch_size is large and min_after_dequeue is small relative to the dataset, the "shuffling" will again be selecting from a small "sliding window" across the dataset.

  3. If min_after_dequeue is large relative to batch_size and the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  4. If min_after_dequeue and batch_size are large relative to the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  5. In the case where min_after_dequeue is 4000 and batch_size is 1681, note that the expected number of copies of each element in the queue when it samples is 4000 / 1681 = 2.38, so it is more likely that some elements will be sampled more than once (and less likely that you will sample each unique element exactly once); see the sketch below.
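
A rough back-of-the-envelope check of that last case (my own sketch, assuming the queue holds about 4000 / 1681 ≈ 2.38 copies of each distinct record when sampling starts):

import numpy as np

rng = np.random.default_rng(0)
n_distinct, queue_size, batch_size = 1681, 4000, 1681

# pool of ~4000 elements holding ~2.38 copies of each distinct value
pool = np.arange(queue_size) % n_distinct

# draw one batch of 1681 without replacement and count distinct values
batch = rng.choice(pool, size=batch_size, replace=False)
print(len(np.unique(batch)) / n_distinct)  # ~0.7: duplicates crowd out
                                           # unique points, leaving gaps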

answered Nov 11 '22 by mrry