How to improve data input pipeline performance?

I am trying to optimize my data input pipeline. The dataset consists of 450 TFRecord files of ~70MB each, hosted on GCS. The job is executed with GCP ML Engine. There is no GPU.

Here is the pipeline:

import tensorflow as tf

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )

With the mapped function:

def _bit_to_float(string_batch: tf.Tensor):
    # Decode each string into its raw bytes: shape (batch, num_bytes)
    bytes_batch = tf.io.decode_raw(string_batch, tf.uint8)
    # Shift every byte right by 7..0 to expose each bit: shape (batch, num_bytes, 8)
    shifts = tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    shifted = tf.bitwise.right_shift(tf.expand_dims(bytes_batch, 2), shifts)
    # Keep the lowest bit of each value and flatten to (batch, num_bytes * 8)
    bits = tf.math.floormod(tf.dtypes.cast(shifted, tf.float32), 2)
    return tf.reshape(bits, (tf.shape(string_batch)[0], -1))
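
For reference, this function unpacks each byte of the serialized boolean feature into 8 floats in {0, 1}, most significant bit first. A minimal sanity check (my own illustration, not part of the original job):

# 0x05 == 0b00000101, so one input byte expands to 8 float "bits"
bits = _bit_to_float(tf.constant([b"\x05"]))
print(bits)  # tf.Tensor([[0. 0. 0. 0. 0. 1. 0. 1.]], shape=(1, 8), dtype=float32)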


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )

I tried to follow the best practices from the data pipeline tutorial and to vectorize my mapped function (as advised by mrry).

With these settings, data is downloaded at high speed (bandwidth is around 200MB/s), but the CPU is under-used (14%) and training is very slow (more than one hour per epoch).

I tried several parameter configurations, changing the interleave() arguments (num_parallel_calls, cycle_length) and the TFRecordDataset arguments (num_parallel_reads).

The fastest configuration uses this set of parameters (a sketch of how they plug into the pipeline follows the list):

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_reads: 8
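
For clarity, here is roughly how these parameters plug into the pipeline (my own sketch, assuming the TFRecordDataset parameter refers to its num_parallel_reads argument):

dataset = tf.data.Dataset.list_files(file_pattern).interleave(
    lambda path: tf.data.TFRecordDataset(path, num_parallel_reads=8),
    cycle_length=8,        # draw blocks from 8 files in round-robin
    num_parallel_calls=1,  # the interleave transformation itself stays sequential
)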

With this configuration, one epoch takes only ~20 minutes to run. However, CPU usage is only at 50% and bandwidth consumption is around 55MB/s.

Questions:

  1. How can I optimize the pipeline to reach 100% CPU usage (and around 100MB/s of bandwidth consumption)?
  2. Why does tf.data.experimental.AUTOTUNE not find the best values to speed up training?

Kind regards, Alexis.


Edit

After some more experimentation, I came to the following solution.

  1. Remove the interleave step, which is already handled by TFRecordDataset if num_parallel_reads is greater than 0.
  2. Update the mapped function to only do tf.io.parse_single_example and tf.io.decode_raw, returning a tuple ((dense_float, bits_to_float), label).
  3. Cache after the map step.
  4. Move the _bit_to_float function into the model itself.

Finally, here is the data pipeline code:

import multiprocessing

import tensorflow as tf

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
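        # ~70MB read buffer, i.e. about one whole TFRecord file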
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
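    # N, M, and actual_model are placeholders for the number of dense float
    # features, the number of packed boolean bytes (each byte yields 8 model
    # inputs), and the existing core model, respectively.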
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)
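
This version takes the already-decoded uint8 tensor instead of a raw string, so it can run inside the model on batched inputs. A quick check (my own illustration):

bits = _bit_to_float(tf.constant([[5]], dtype=tf.uint8))
# 0x05 == 0b00000101 -> [[0. 0. 0. 0. 0. 1. 0. 1.]], shape (1, 8), dtype float32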

Thanks to all these optimizations:

  • Bandwidth consumption is around 90MB/s
  • CPU usage is around 20%
  • The first epoch takes 20 minutes
  • Subsequent epochs take 5 minutes each

So this seems to be a good first setup. But CPU and bandwidth are still under-used, so any advice is still welcome!


Edit Bis

So, after some benchmarking, I came across what I think is our best input pipeline:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

So, what's new:

  • According to this GitHub issue, the interleaving done by TFRecordDataset itself is a legacy feature, so the interleave function is better.
  • batch before map is a good habit (it vectorizes your function) and reduces the number of times the mapped function is called.
  • No need for repeat anymore. Since TF 2.0, the Keras model API supports the dataset API and can make use of cache (see the SO post, and the sketch after this list).
  • Switching from a VarLenFeature to a FixedLenSequenceFeature removes a useless call to tf.sparse.to_dense.
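
To illustrate the third point, a minimal sketch of how Keras can consume the finite, cached dataset directly since TF 2.0 (the model, file pattern, and epoch count are placeholders of my own):

# No repeat() needed: Keras re-initializes the dataset iterator at each epoch,
# and cache() makes every epoch after the first read from memory.
model.fit(build_dataset("gs://my-bucket/train-*.tfrecord"), epochs=10)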

Hope this can help. Advice is still welcome.

AlexisBRENON asked Sep 19 '19



1 Answer

Mentioning the solution and the important observations of @AlexisBRENON in the answer section, for the benefit of the community.

Below are the important observations:

  1. According to this GitHub issue, the interleaving done by TFRecordDataset itself is a legacy feature, so the interleave function is better.
  2. batch before map is a good habit (it vectorizes your function) and reduces the number of times the mapped function is called.
  3. No need for repeat anymore. Since TF 2.0, the Keras model API supports the dataset API and can make use of cache (see the SO post).
  4. Switching from a VarLenFeature to a FixedLenSequenceFeature removes a useless call to tf.sparse.to_dense.

The code for the pipeline, with improved performance in line with the above observations, is given below:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
Tensorflow Support answered Oct 23 '22