
TFRecordReader seems extremely slow, and multi-threaded reading is not working

My training process uses the TFRecord format for the train and eval datasets.

I benchmarked the reader: it reads only 8000 records/second, and the I/O speed (as seen from the iotop command) is just 400KB-500KB/s.

I'm using the C++ version of protobuf, installed as described here:

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/get_started/os_setup.md#protobuf-library-related-issues

A minimal reproducible example:

def read_and_decode(filename_queue):
    # One reader op: each call to read() consumes a single record.
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    return serialized_example

serialized_example = read_and_decode(filename_queue)
batch_serialized_example = tf.train.shuffle_batch(
    [serialized_example],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue)
features = tf.parse_example(
    batch_serialized_example,
    features={
        "label": tf.FixedLenFeature([], tf.float32),
        "ids": tf.VarLenFeature(tf.int64),
        "values": tf.VarLenFeature(tf.float32),
    })
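
For context, the snippet assumes a filename queue created beforehand; a minimal sketch of that setup (the filename "train.tfrecord" is a placeholder, not from the original post):

import tensorflow as tf

# Assumed setup (not shown above): a queue of input filenames
# that the reader op dequeues from.
filename_queue = tf.train.string_input_producer(["train.tfrecord"], shuffle=True)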

What I have already tried:

I tried setting num_threads in tf.train.shuffle_batch, but it does not help.

With num_threads set to 2, it runs at 8000 records/s; increasing the thread number actually makes it slower. (I removed all ops that cost CPU; the graph just reads data.)

My server has 24 CPU cores.

asked Jan 14 '17 by ericyue


3 Answers

The issue here is that there's a fixed cost overhead to each session.run, and filling the queue with many tiny examples one at a time will be slow.

In particular, each session.run takes about 100-200 usec, so you can only do about 5k-10k session.run calls per second.

This problem is obvious when doing Python profiling (python -m cProfile), but hard to see when starting from a timeline profile or a CPU profile.
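
You can measure the per-call overhead directly; a minimal sketch (TF1-style; the loop count is arbitrary and the numbers will vary by machine):

import time
import tensorflow as tf

# Minimal graph: fetching a single constant, so each run() measures
# almost pure session.run overhead.
x = tf.constant(1)

with tf.Session() as sess:
    sess.run(x)  # warm-up
    n = 10000
    start = time.time()
    for _ in range(n):
        sess.run(x)
    elapsed = time.time() - start
    print("%.1f usec per session.run, ~%d calls/sec"
          % (elapsed / n * 1e6, n / elapsed))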

The work-around is to use enqueue_many to add things to your queue in batches. I took your benchmark from https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8 and modified it to enqueue many items per .run call, which gives a 10x speed-up.

The modification is to change the tf.train.shuffle_batch call as follows:

if enqueue_many:
    reader = tf.TFRecordReader(
        options=tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
    queue_batch = []
    # Build many read ops so a single session.run pulls many records at once.
    for i in range(enqueue_many_size):
        _, serialized_example = reader.read(filename_queue)
        queue_batch.append(serialized_example)
    batch_serialized_example = tf.train.shuffle_batch(
        [queue_batch],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True)

For complete source, check here: https://github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py

It's hard to optimize this to go much faster, since now most of the time is spent in queue operations. Looking at a stripped-down version which just adds integers to a queue, you get similar speed, and looking at the timeline, the time is spent in dequeue ops.
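
A sketch of such a stripped-down, queue-only benchmark (assumptions of mine: a FIFOQueue of integers, 4 enqueue threads, arbitrary sizes; this is not the original benchmark code):

import time
import tensorflow as tf

# Queue-only benchmark: no file I/O, just enqueue/dequeue of integers.
batch = tf.zeros([100], dtype=tf.int64)
queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.int64], shapes=[[]])
enqueue_op = queue.enqueue_many([batch])
dequeue_op = queue.dequeue_many(100)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
    threads = qr.create_threads(sess, coord=coord, start=True)
    n = 1000
    start = time.time()
    for _ in range(n):
        sess.run(dequeue_op)
    elapsed = time.time() - start
    print("%d records/sec" % (n * 100 / elapsed))
    coord.request_stop()
    coord.join(threads)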

[timeline screenshot: time dominated by dequeue ops]

Each dequeue op takes about 60 usec, but on average there are 5 running in parallel, so you get an effective 12 usec per dequeue, i.e. under 100k dequeues per second; that means you'll get <200k examples per second in the best case.

answered Nov 15 '22 by Yaroslav Bulatov


Here's a simple speedup building on Yaroslav's answer:

TensorFlow has a built-in function, tf.TFRecordReader.read_up_to, that reads multiple records in each session.run() call, thereby removing the excess overhead caused by multiple calls.

enqueue_many_size = SOME_ENQUEUE_MANY_SIZE
reader = tf.TFRecordReader(
    options=tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
_, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)

As with Yaroslav's answer, you need to set enqueue_many=True so that the batch function knows it is accepting multiple records.
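
To complete the pipeline, the batched serialized strings can then be parsed exactly as in the question:

# Parse the whole batch of serialized Examples at once,
# using the same feature spec as in the question.
features = tf.parse_example(
    batch_serialized_example,
    features={
        "label": tf.FixedLenFeature([], tf.float32),
        "ids": tf.VarLenFeature(tf.int64),
        "values": tf.VarLenFeature(tf.float32),
    })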

This was very fast in my use case.

answered Nov 15 '22 by Erik Shilts


An addendum to Yaroslav's answer: you can use tf.python_io.tf_record_iterator to iterate through the examples and append them to a list, which you can then pass to tf.train.shuffle_batch with enqueue_many=True:

queue_batch = []
options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB)
for serialized_example in tf.python_io.tf_record_iterator(filename, options=options):
    queue_batch.append(serialized_example)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)

It seems that trying to iterate through examples by re-using a single reader.read() tensor results in one read per batch, i.e. the nth batch will contain batch_size copies of the nth record rather than batch_size unique records.
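
For illustration, this is presumably the failing pattern being described (a sketch; the key point is that the list holds batch_size references to the same tensor):

# Anti-pattern (sketch): a single read op duplicated batch_size times.
_, serialized_example = reader.read(filename_queue)
queue_batch = [serialized_example] * batch_size  # same tensor repeated
# With enqueue_many=True, each session.run evaluates the one read op once
# and enqueues batch_size copies of that single record.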

answered Nov 15 '22 by Aderlar