Hi I am studying the dataset API in tensorflow now and I have a question regarding to the dataset.map() function which performs data preprocessing.
file_name = ["image1.jpg", "image2.jpg", ......]
im_dataset = tf.data.Dataset.from_tensor_slices(file_names)
im_dataset = im_dataset.map(lambda image:tuple(tf.py_func(image_parser(), [image], [tf.float32, tf.float32, tf.float32])))
im_dataset = im_dataset.batch(batch_size)
iterator = im_dataset.make_initializable_iterator()
The dataset takes in image names and parse them into 3 tensors (3 infos about the image).
If I have a very larger number of images in my training folder, preprocessing them is gonna take a long time. My question is that, since Dataset API is said to be designed for efficient input pipeline, the preprocessing is done for the whole dataset before I feed them to my workers (let's say GPUs), or it only preprocess one batch of image each time I call iterator.get_next()?
To iterate over the dataset several times, use . repeat() . We can enumerate each batch by using either Python's enumerator or a build-in method. The former produces a tensor, which is recommended.
Feeding: Python code provides the data when running each step. Reading from files: an input pipeline reads the data from files at the beginning of a TensorFlow graph. Preloaded data: a constant or variable in the TensorFlow graph holds all the data (for small data sets).
If your preprocessing pipeline is very long and the output is small, the processed data should fit in memory. If this is the case, you can use tf.data.Dataset.cache
to cache the processed data in memory or in a file.
From the official performance guide:
The
tf.data.Dataset.cache
transformation can cache a dataset, either in memory or on local storage. If the user-defined function passed into the map transformation is expensive, apply the cache transformation after the map transformation as long as the resulting dataset can still fit into memory or local storage. If the user-defined function increases the space required to store the dataset beyond the cache capacity, consider pre-processing your data before your training job to reduce resource usage.
Here is an example where each pre-processing takes a lot of time (0.5s). The second epoch on the dataset will be much faster than the first
def my_fn(x):
time.sleep(0.5)
return x
def parse_fn(x):
return tf.py_func(my_fn, [x], tf.int64)
dataset = tf.data.Dataset.range(5)
dataset = dataset.map(parse_fn)
dataset = dataset.cache() # cache the processed dataset, so every input will be processed once
dataset = dataset.repeat(2) # repeat for multiple epochs
res = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
for i in range(10):
# First 5 iterations will take 0.5s each, last 5 will not
print(sess.run(res))
If you want to write the cached data to a file, you can provide an argument to cache()
:
dataset = dataset.cache('/tmp/cache') # will write cached data to a file
This will allow you to only process the dataset once, and run multiple experiments on the data without reprocessing it again.
Warning: You have to be careful when caching to a file. If you change your data, but keep the /tmp/cache.*
files, it will still read the old data that was cached. For instance, if we use the data from above and change the range of the data to be in [10, 15]
, we will still obtain data in [0, 5]
:
dataset = tf.data.Dataset.range(10, 15)
dataset = dataset.map(parse_fn)
dataset = dataset.cache('/tmp/cache')
dataset = dataset.repeat(2) # repeat for multiple epochs
res = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
for i in range(10):
print(sess.run(res)) # will still be in [0, 5]...
Always delete the cached files whenever the data that you want to cache changes.
Another issue that may arise is if you interrupt the script before all the data is cached. You will receive an error like this:
AlreadyExistsError (see above for traceback): There appears to be a concurrent caching iterator running - cache lockfile already exists ('/tmp/cache.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator.
Make sure that you let the whole dataset be processed to have an entire cache file.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With