My training process uses the TFRecord format for the train and eval datasets. I benchmarked the reader and it only reaches 8000 records/second, and the I/O speed (as seen from the iotop command) is just 400-500 KB/s.
I'm using the C++ version of protobuf, as described here:
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/get_started/os_setup.md#protobuf-library-related-issues
Here is my reading code:
def read_and_decode(filename_queue):
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    return serialized_example

serialized_example = read_and_decode(filename_queue)
batch_serialized_example = tf.train.shuffle_batch(
    [serialized_example],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue)
features = tf.parse_example(
    batch_serialized_example,
    features={
        "label": tf.FixedLenFeature([], tf.float32),
        "ids": tf.VarLenFeature(tf.int64),
        "values": tf.VarLenFeature(tf.float32),
    })
I tried setting num_threads in tf.train.shuffle_batch, but it doesn't help. With 2 threads it runs at 8000 records/s, and increasing the thread number makes it slower. (I removed all CPU-heavy ops; the graph just reads data.) My server has 24 CPU cores.
The issue here is that there's a fixed cost overhead to each session.run call, and filling the queue with many tiny examples one at a time will be slow. In particular, each session.run takes about 100-200 usec, so you can only do about 5k-10k session.run calls per second.
This problem is obvious when profiling from Python (python -m cProfile), but hard to see if you start from a timeline profile or a CPU profile.
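As a sanity check, you can measure this fixed per-call overhead directly. Below is a minimal sketch of such a measurement (my own illustration, not part of the original benchmark), timing session.run on a trivial fetch:

import time
import tensorflow as tf

# Time session.run on a trivial constant fetch; since evaluating the op
# itself is essentially free, the measured time is the per-call overhead.
x = tf.constant(1)
with tf.Session() as sess:
    sess.run(x)  # warm-up call
    n = 10000
    start = time.time()
    for _ in range(n):
        sess.run(x)
    print("%.1f usec per session.run call" % ((time.time() - start) / n * 1e6))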
The work-around is to use enqueue_many to add things to your queue in batches. I took your benchmark from https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8 and modified it to enqueue many items per .run call, and that gives a 10x speed-up. The modification is to change the tf.train.shuffle_batch call as follows:
if enqueue_many:
    reader = tf.TFRecordReader(options=tf.python_io.TFRecordOptions(
        tf.python_io.TFRecordCompressionType.ZLIB))
    queue_batch = []
    for i in range(enqueue_many_size):
        _, serialized_example = reader.read(filename_queue)
        queue_batch.append(serialized_example)
    batch_serialized_example = tf.train.shuffle_batch(
        [queue_batch],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True)
For the complete source, check here: https://github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py
It's hard to optimize this to go much faster since most of the time is now spent in queue operations. Looking at a stripped-down version which just adds integers to a queue, you get similar speed, and looking at the timeline, the time is spent in dequeue ops. Each dequeue op takes about 60 usec, but on average 5 are running in parallel, so you get 12 usec per dequeue. That means you'll get <200k examples per second in the best case.
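For reference, here is a minimal sketch of such a stripped-down queue benchmark (my own reconstruction, not Yaroslav's exact code), pushing plain integers through a FIFOQueue and timing repeated dequeue calls:

import time
import tensorflow as tf

# Pre-fill a FIFOQueue with dummy integers, then time repeated dequeue
# calls; this isolates queue-operation cost from TFRecord reading.
queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.int64])
enqueue_op = queue.enqueue_many([tf.zeros([10000], dtype=tf.int64)])
dequeue_op = queue.dequeue()

with tf.Session() as sess:
    sess.run(enqueue_op)
    n = 5000
    start = time.time()
    for _ in range(n):
        sess.run(dequeue_op)
    print("%.1f usec per dequeue" % ((time.time() - start) / n * 1e6))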
Here's a simple speedup building on Yaroslav's answer: TensorFlow has a built-in function, tf.TFRecordReader.read_up_to, that reads multiple records in each session.run() call, thereby removing the excess overhead caused by multiple calls.
enqueue_many_size = SOME_ENQUEUE_MANY_SIZE
reader = tf.TFRecordReader(options=tf.python_io.TFRecordOptions(
    tf.python_io.TFRecordCompressionType.ZLIB))
_, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)
As with Yaroslav's answer, you need to set enqueue_many=True so that the batch function knows it is accepting multiple records. This was very fast in my use case.
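To make this concrete, here is an end-to-end usage sketch of the read_up_to approach (the filename, batch sizes, and step count are placeholder values of my own, not from the answer):

import tensorflow as tf

# Hypothetical input file; substitute your own ZLIB-compressed TFRecords.
filename_queue = tf.train.string_input_producer(["train.tfrecords"])
reader = tf.TFRecordReader(options=tf.python_io.TFRecordOptions(
    tf.python_io.TFRecordCompressionType.ZLIB))
# Read up to 1000 records per run of the enqueue op (illustrative size).
_, queue_batch = reader.read_up_to(filename_queue, 1000)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=128,
    num_threads=4,
    capacity=20000,
    min_after_dequeue=10000,
    enqueue_many=True)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for _ in range(100):  # arbitrary step count for illustration
        batch = sess.run(batch_serialized_example)
    coord.request_stop()
    coord.join(threads)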
An addendum to Yaroslav's answer: you can use tf.python_io.tf_record_iterator to iterate through the examples in order to append them to a list, which you can then pass to tf.train.shuffle_batch with enqueue_many=True:
queue_batch = []
for serialized_example in tf.python_io.tf_record_iterator(
        filename,
        options=tf.python_io.TFRecordOptions(
            tf.python_io.TFRecordCompressionType.ZLIB)):
    queue_batch.append(serialized_example)

batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)
It seems that trying to iterate through examples by using reader.read() will result in one read per batch, i.e. the nth batch will be batch_num copies of the nth record rather than batch_num unique records.
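One way to reproduce the duplicate-record behavior described above (my own illustration, not code from the answer) is to build the list from a single reader.read() tensor; every entry then refers to the same graph op, so each enqueued batch holds copies of one record:

# Anti-pattern sketch: all list entries are the same tensor, so each run
# evaluates the read op once and every element of the batch is identical.
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)  # a single read op
queue_batch = [serialized_example] * enqueue_many_size  # same tensor repeated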