
TensorFlow: Load data in multiple threads on CPU

I have a Python class, SceneGenerator, with several member functions for preprocessing and a generator function, generate_data(). Its basic structure is:

class SceneGenerator(object):
    def __init__(self):
        # some inits
        pass

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing.
        """
        while True:
            # opening files, selecting data
            X, y = self.preprocess(some_params, filenames, ...)

            yield X, y

I passed sceneGenerator.generate_data() to Keras's model.fit_generator() to read the data from disk, preprocess it, and yield it. In Keras, this runs on multiple CPU threads when the workers parameter of model.fit_generator() is greater than 1.

I now want to use the same SceneGenerator class in TensorFlow. My current approach is this:

sceneGenerator = SceneGenerator(some_params)
for X, y in sceneGenerator.generate_data():

    feed_dict = {ops['data']: X,
                 ops['labels']: y,
                 ops['is_training_pl']: True
                 }
    # Three fetches, so unpack exactly three results.
    _, loss, prediction = sess.run([optimization_op, loss_op, pred_op],
                                   feed_dict=feed_dict)

This, however, is slow and does not use multiple threads. I found the tf.data.Dataset API and some documentation, but I have not managed to implement it.

Edit: Note that I do not work with images, so the image-loading mechanisms based on file paths etc. do not apply here. My SceneGenerator loads data from hdf5 files, and not complete datasets but, depending on the initialization parameters, only parts of a dataset. I would love to keep the generator function as it is and learn how it can be used directly as input for TensorFlow and run on multiple threads on the CPU. Rewriting the data from the hdf5 files to csv is not a good option because it would duplicate a lot of data.

Edit 2: I think something similar to this could help: parallelising tf.data.Dataset.from_generator

Merlin1896 asked Nov 30 '17 08:11




2 Answers

Assuming you're using the latest TensorFlow (1.4 at the time of this writing), you can keep the generator and use the tf.data.* API as follows (I chose arbitrary values for the number of threads, prefetch buffer size, batch size and output data types):

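import tensorflow as tf  # written against the TF 1.x API

NUM_THREADS = 5
sceneGen = SceneGenerator()
# Wrap the Python generator; output_types must match what generate_data() yields.
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data, output_types=(tf.float32, tf.int32))
# Identity map, used only to request parallel calls; prefetch keeps a buffer of ready elements.
dataset = dataset.map(lambda x, y: (x, y), num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)
X, y = dataset.make_one_shot_iterator().get_next()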

To show that it's actually multiple threads extracting from the generator, I modified your class as follows:

import threading


class SceneGenerator(object):
  def __init__(self):
    # some inits
    pass

  def generate_data(self):
    """
    Generator. Yield data X and labels y after some preprocessing
    """
    while True:
      # opening files, selecting data
      X, y = threading.get_ident(), 2  # self.preprocess(some_params, filenames, ...)
      yield X, y

This way, creating a TensorFlow session and fetching one batch shows the IDs of the threads that produced the data. On my PC, running:

sess = tf.Session()
print(sess.run([X, y]))

prints

[array([  8460.,   8460.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  16200.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  15912.,   8460.,   8460.,   6552.,  15912.,  15912.,
          8460.,   8460.,  15912.,   9956.,  16200.,   9956.,  16200.,
         15912.,  15912.,   9956.,  16200.,  15912.,  16200.,  16200.,
         16200.,   6552.,  16200.,  16200.,   9956.,   6552.,   6552.], dtype=float32),
 array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])]
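The same observation can be reproduced without TensorFlow. The following is a minimal standard-library sketch, not what tf.data does internally: several threads pull from one shared generator that yields the ID of the thread running it. A Python generator raises `ValueError: generator already executing` if it is resumed while another thread is still inside it, so the sketch guards `next()` with a lock. `ThreadSafeIterator`, `toy_generate_data` and `pull` are hypothetical names used only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadSafeIterator:
    """Serialize next() calls on a wrapped iterator with a lock."""

    def __init__(self, iterator):
        self._iterator = iterator
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._iterator)


def toy_generate_data():
    """Hypothetical stand-in for generate_data(): yields (thread id, label)."""
    while True:
        yield threading.get_ident(), 2


def pull(shared, count):
    """Return (calling thread id, yielded sample) pairs."""
    caller = threading.get_ident()
    return [(caller, next(shared)) for _ in range(count)]


shared = ThreadSafeIterator(toy_generate_data())
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(pull, shared, 10) for _ in range(4)]
    records = [r for f in futures for r in f.result()]

# The generator body always runs on whichever thread called next().
matches = all(caller == x for caller, (x, _) in records)
```

In this sketch the lock covers the whole `next()` call, so work done inside the generator body still runs one sample at a time. Several thread IDs in the output show which thread made each call, not that the calls overlapped.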

Note: You might want to experiment with removing the map call (used here only to get multiple threads) and check whether the prefetch buffer alone removes the bottleneck in your input pipeline. Even with a single thread, input preprocessing is often faster than the actual graph execution, so the buffer is enough to let preprocessing run as fast as it can.
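To see why a buffer alone can help, here is a minimal standard-library sketch of the idea behind prefetching (it is an illustration, not how tf.data implements it): one background thread fills a bounded queue while the consumer drains it, so producing the next element overlaps with consuming the current one. `prefetch` and `toy_generator` are hypothetical names, and the sketch assumes the generator does not raise.

```python
import queue
import threading


def prefetch(generator, buffer_size):
    """Yield items from `generator`, produced ahead of time by one background thread."""
    buffer = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the stream

    def producer():
        for item in generator:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()  # blocks while the buffer is empty
        if item is done:
            return
        yield item


def toy_generator(n):
    """Hypothetical stand-in for SceneGenerator.generate_data()."""
    for i in range(n):
        yield i, 2  # (X, y)


batches = list(prefetch(toy_generator(100), buffer_size=8))
```

The order of elements is preserved because there is a single producer. If the consumer is slower than the producer, the queue stays full and the consumer never waits, which is the situation the note above describes.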

GPhilo answered Nov 08 '22 06:11


Running a session with a feed_dict is indeed pretty slow:

Feed_dict does a single-threaded memcpy of contents from Python runtime into TensorFlow runtime.

A faster way to feed the data is by using tf.train.string_input_producer + *Reader + tf.train.Coordinator, which will batch the data in multiple threads. For that, you read the data directly into tensors, e.g., here's a way to read and process a csv file:

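import tensorflow as tf  # written against the TF 1.x API

# record_defaults, N_FEATURES and BATCH_SIZE are assumed to be defined elsewhere.
def batch_generator(filenames):
  # Queue of input file names, read line by line (header skipped).
  filename_queue = tf.train.string_input_producer(filenames)
  reader = tf.TextLineReader(skip_header_lines=1)
  _, value = reader.read(filename_queue)

  # Parse one csv line; column 4 is a string that is encoded as 1.0 / 0.0.
  content = tf.decode_csv(value, record_defaults=record_defaults)
  content[4] = tf.cond(tf.equal(content[4], tf.constant('Present')),
                       lambda: tf.constant(1.0),
                       lambda: tf.constant(0.0))

  features = tf.stack(content[:N_FEATURES])
  label = content[-1]

  # Shuffle and batch through a queue filled by background threads.
  data_batch, label_batch = tf.train.shuffle_batch([features, label],
                                                   batch_size=BATCH_SIZE,
                                                   capacity=20*BATCH_SIZE,
                                                   min_after_dequeue=10*BATCH_SIZE)
  return data_batch, label_batch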

This function takes the list of input files, creates the reader and the data transformations, and returns tensors that evaluate to the contents of these files. Your scene generator is likely to do different transformations, but the idea is the same.
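To make the per-record transformation concrete, here is the same logic applied to single csv lines in plain Python, without TensorFlow. The answer does not show `record_defaults`, the value of `N_FEATURES` or the data, so `N_FEATURES = 9`, the float/int conversions and the sample rows below are assumptions made up for illustration; `parse_row` is a hypothetical helper.

```python
import csv
import io

N_FEATURES = 9  # assumed number of feature columns (not given in the answer)


def parse_row(fields):
    """Mirror the graph for one csv record: encode column 4, split features and label."""
    content = list(fields)
    content[4] = 1.0 if content[4] == "Present" else 0.0
    features = [float(v) for v in content[:N_FEATURES]]  # assumed numeric features
    label = int(content[-1])  # assumed integer label
    return features, label


# Made-up sample with a header line, skipped like skip_header_lines=1.
sample = io.StringIO(
    "a,b,c,d,status,f,g,h,i,label\n"
    "160,12.0,5.73,23.11,Present,49,25.3,97.2,52,1\n"
    "144,0.01,4.41,28.61,Absent,55,28.87,2.06,63,0\n"
)
reader = csv.reader(sample)
next(reader)  # skip the header
rows = [parse_row(r) for r in reader]
```

Each element of `rows` is a `(features, label)` pair, which corresponds to one record before `tf.train.shuffle_batch` groups records into batches.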

Next, you start a tf.train.Coordinator to parallelize this:

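# Build the input tensors first; `filenames` is your list of csv paths.
data_batch, label_batch = batch_generator(filenames)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # Start the background threads that fill the input queues.
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(10):  # generate 10 batches
        features, labels = sess.run([data_batch, label_batch])
        print(features)
    coord.request_stop()
    coord.join(threads)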

In my experience, this approach feeds the data much faster and makes it possible to use all of the available GPU power. A complete working example can be found here.

Maxim answered Nov 08 '22 05:11