Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Distribute data from `tf.data.Dataset` to multiple workers (e.g. for Horovod)

With Horovod, you basically run N independent instances (so it is a form of between-graph replication), and they communicate via special Horovod ops (basically broadcast + reduce).

Now let's say either instance 0, or some other external instance loads your data (via tf.data.Dataset). How would you distribute the iterator.get_next() to each instance? Using Horovod broadcast would be inefficient, as you would copy all the data to all instances.

Having the dataset in every instance, and doing all the loading in there, and then using shard on the dataset would also be inefficient, as you would load the data everywhere, and then throw away (N-1)/N of it. So that's why would also not want sharding, and instead have the dataset loading only in a single (producer/dataset worker) instance, which then distributes the batches on all the train workers.

I guess the TF MultiDeviceIterator provides some similar functionality (or basically exactly that) but I'm not sure whether that works together with Horovod, and how you would set it up?

Or maybe you can make the distribution somehow via TF workers (guide? (Maybe that is how you would configure MultiDeviceIterator as well?)

If possible, this should be via TensorFlow operations / functions (there are many related functions which might already give me this, but I might not know about them, or have misunderstood how it works). Or maybe the answer is that TensorFlow does not provide any such functionality yet? (This would still be useful to know. Then I would implement my own solution in C++, wrapped as a TensorFlow. But before doing so, it would be good to know whether this is really necessary.)

(Related is also this Horovod issue.)

(This question is actually a bit more generic than just Horovod, although Horovod might be a good example. You might have this problem always for distributed TensorFlow?)

(I collected an overview of all the distributed TensorFlow terminology and aspects here, mostly for clarification.)

(Related are (maybe?) also this, this, this, this or this questions.)

like image 511
Albert Avatar asked May 23 '20 17:05

Albert


People also ask

Is TF data dataset a generator?

data. Dataset objects as generators for the training of a machine learning model on Tensorflow, with parallelized processing. The tf. data pipeline is now the gold standard for building an efficient data pipeline for machine learning applications with TensorFlow.

What does TF data experimental autotune do?

AUTOTUNE , which will prompt the tf. data runtime to tune the value dynamically at runtime. Note that the prefetch transformation provides benefits any time there is an opportunity to overlap the work of a "producer" with the work of a "consumer."

What does TF data dataset do?

Applies a transformation function to this dataset. apply enables chaining of custom Dataset transformations, which are represented as functions that take one Dataset argument and return a transformed Dataset . A function that takes one Dataset argument and returns a Dataset .


2 Answers

As you said, copying the data in each instance and sharding the data for each instance would be impractical.

One solution would then be to separate the data in a data process and have each instance pull data from the data process as shown in the figure below. For example, this communication can be established using a queue.

In such a system, the data process would load the dataset, preprocess it into batches and push the batches into a queue. Each training instance would then pull batches from this queue. For example, you could pass the queue as a generator into the dataset API (see tf.data.Dataset.from_generator). Also, if batches are not produced fast enough, it is possible to create more data processes to increase the batches throughput.

Depending on your use case, the implementation specifics will vary. For more information, you can look up Networking and Interprocess communication and Multiprocessing pipes and queues.

                                                             Training        
                                                         +--------------+  ++
                                                         |              |   |
                                                    +----+  Instance 1  |   |
                                                    |    |              |   |
                                                    |    +--------------+   |
                                                    |                       |
                      Preprocessing                 |                       |
                  +--------------------+            +---->      X           |
                  |                    |            |                       |
             Load |                    | Batches    +           X           |
    Dataset+------>    Data Process    +--------->Queue                     |  N instances
                  |                    |            +           X           |  Distributed training
                  |                    |            |                       |  For example, using
                  +--------------------+            +---->      X           |  Horovod broadcast + reduce
                                                    |                       |
                                                    |        Training       |
                                                    |    +--------------+   |
                                                    |    |              |   |
                                                    +----+  Instance N  |   |
                                                         |              |   |
                                                         +--------------+  ++

For a tensorflow implementation, you could use tf.data.Dataset.shard with tf.data.TFRecordDataset.

The documentation addresses your inefficiency concern using TFRecords:

Important caveats:

  • Be sure to shard before you use any randomizing operator (such as shuffle).

  • Generally it is best if the shard operator is used early in the dataset pipeline. For example, when reading from a set of TFRecord files, shard before converting the dataset to input samples. This avoids reading every file on every worker. The following is an example of an efficient sharding strategy within a complete pipeline:

d = Dataset.list_files(pattern)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
like image 126
Maurice Qch Avatar answered Oct 12 '22 19:10

Maurice Qch


I would reccomend taking a looking at YogaDL. It allows you to cache your dataset such that during training (or re-training) you will only access the data you need on that shard, rather than throwing away (N-1)/N your data reads.

like image 1
Aaron Avatar answered Oct 12 '22 18:10

Aaron