I am trying to optimize my data input pipeline. The dataset is a set of 450 TFRecord files of ~70 MB each, hosted on GCS. The job is executed with GCP ML Engine. There is no GPU.
Here is the pipeline:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )
With the mapped function:
def _bit_to_float(string_batch: tf.Tensor):
    # Unpack each byte of the string into 8 float bits (MSB first).
    return tf.reshape(
        tf.math.floormod(
            tf.dtypes.cast(
                tf.bitwise.right_shift(
                    tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
                    tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
                ),
                tf.float32
            ),
            2
        ),
        (tf.shape(string_batch)[0], -1)
    )
def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )
I tried to follow the best practices of the data pipeline performance tutorial and to vectorize my mapped function (as advised by mrry).
With these settings, while data is downloaded at high speed (bandwidth around 200 MB/s), the CPU is under-used (14%) and training is very slow (more than 1 hour per epoch).
I tried several parameter configurations, changing the interleave() arguments (num_parallel_calls, cycle_length) and the TFRecordDataset arguments (num_parallel_reads).
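To compare configurations, the pipeline can be timed in isolation; here is a minimal sketch (this benchmark helper and the file pattern are hypothetical, not part of the original job):

import time

def benchmark(dataset, num_batches=100):
    # Iterate over the dataset without any model attached, to measure
    # input-pipeline throughput alone.
    start = time.perf_counter()
    for _ in dataset.take(num_batches):
        pass
    elapsed = time.perf_counter() - start
    print(f"{num_batches} batches in {elapsed:.1f} s "
          f"({num_batches / elapsed:.1f} batches/s)")

benchmark(build_dataset("gs://my-bucket/train-*.tfrecord"))  # hypothetical pattern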
The fastest configuration uses this set of parameters:

- interleave.num_parallel_calls: 1
- interleave.cycle_length: 8
- TFRecordDataset.num_parallel_reads: 8

With this one, one epoch takes only ~20 minutes to run. However, CPU usage is only at 50% while bandwidth consumption is around 55 MB/s. Why does tf.data.experimental.AUTOTUNE not find the best value to speed up the training?

Kind regards, Alexis.
After some more experimentation, I came to the following solution:

- Remove the interleave step, which is already handled by TFRecordDataset if num_parallel_reads is greater than 0.
- Update the mapped function to only do parse_example and decode_raw, returning a tuple of the form ((features, boolean_features), label).
- cache after the map.
- Move the _bit_to_float function into the model as a component.

Finally, here is the data pipeline code:
import multiprocessing

import tensorflow as tf


def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000  # ~70 MB, the size of one TFRecord file
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )
def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )
def build_model(input_shape):
    # N and M are placeholders for the number of float features and of
    # packed boolean bytes; `actual_model` is the real network and
    # `dataset` is the module defining _bit_to_float below.
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)
    model = keras.Model([feature, bool_feature], output)
    return model
def _bit_to_float(string_batch: tf.Tensor):
    # Unpack each uint8 into its 8 bits (MSB first) and cast to float32.
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)
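As a sanity check, here is what this _bit_to_float produces for a tiny batch of already-decoded bytes (the input values are illustrative):

# Two samples, one packed byte each: 0b10100101 and 0b00000001.
packed = tf.constant([[0xA5], [0x01]], dtype=tf.uint8)
print(_bit_to_float(packed).numpy())
# [[1. 0. 1. 0. 0. 1. 0. 1.]
#  [0. 0. 0. 0. 0. 0. 0. 1.]]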
Thanks to all these optimizations, this seems to be a good first setup. But CPU and bandwidth are still not fully used, so any advice is still welcome!
So, after some benchmarking I came across what I think is our best input pipeline:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )
def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
So, what's new:

- TFRecordDataset interleaving is a legacy one, so the interleave function is better.
- batch before map is a good habit (vectorizing your function) and reduces the number of times the mapped function is called.
- No need for repeat anymore. Since TF 2.0, the Keras model API supports the dataset API and can use the cache (see the SO post); a minimal sketch follows below.
- Switch from a VarLenFeature to a FixedLenSequenceFeature, removing a useless call to tf.sparse.to_dense.
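To illustrate the repeat point, here is a minimal sketch of training directly on this dataset (the file pattern is hypothetical, N and M are the placeholder feature counts from build_model, and the input shape is a guess):

dataset = build_dataset("gs://my-bucket/train-*.tfrecord")  # hypothetical pattern
model = build_model(input_shape=(-1, N + 8 * M))  # assuming a flat concatenated input
model.compile(optimizer="adam", loss="binary_crossentropy")
# No repeat() needed: Keras treats one full pass over the dataset as one epoch,
# and the cache() is reused from the second epoch onwards.
model.fit(dataset, epochs=10)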
Hope this can help. Advice is still welcome.
A note on these transformations: passing AUTOTUNE prompts the tf.data runtime to tune the value dynamically at runtime. The prefetch transformation provides benefits any time there is an opportunity to overlap the work of a "producer" with the work of a "consumer": prefetch() produces a dataset that reads ahead from its input dataset, so buffered prefetching ensures that data can be taken from disk without I/O becoming blocking. More broadly, the tf.data API makes it possible to build complex input pipelines from simple, reusable pieces in a few lines of code, and to handle amounts of data that do not fit in the memory of low-end machines.
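A toy sketch of that producer/consumer overlap, where the sleep calls stand in for real I/O and training work (the durations are illustrative):

import time
import tensorflow as tf

def slow_read(i):
    time.sleep(0.01)  # simulate a slow disk/network read (the "producer")
    return i

ds = tf.data.Dataset.range(100).map(
    lambda i: tf.py_function(slow_read, [i], tf.int64),
    num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(tf.data.experimental.AUTOTUNE)  # read ahead while the consumer works

for _ in ds:
    time.sleep(0.01)  # simulate a training step (the "consumer")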
Mentioning the solution and the important observations of @AlexisBRENON in the answer section, for the benefit of the community. The important observations and the pipeline code with improved performance are exactly those given in the answer above.