Parallelize tf.from_generator using tf.contrib.data.parallel_interleave

I have a bunch of JSON array files (AVRO, to be accurate), and each of them yields multiple samples for training a Keras model. Using ideas from @GPhilo and @jsimsa, I came up with the code below to parallelize my input pipeline. However, I am unable to figure out how to design generator(n) so that it divides the work of processing the files. The code fails inside parse_file(f) because the function expects a string file path, not a Tensor:

N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit(ds.make_one_shot_iterator(), steps_per_epoch=NUM_TRAIN_SAMPLES // batch_size)
  • What is the correct way to design generator(n) here?
  • Is this an optimized way to design my input pipeline using parallel_interleave and flat_map?
Nitin asked Sep 05 '18 07:09


1 Answer

It seems to me you're complicating your life unnecessarily with the generator. This is how I'd implement your input pipeline:

def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
# Shuffle individual examples before batching (batching first would shuffle whole batches)
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()

To test it, I define a dummy parse_file as:

import numpy as np

i = 0
def parse_file(f):
    global i
    i += 1
    # Mimics variable-length examples_x, examples_y
    return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32)

which I feed into a basic loop that shows what the iterator returns:

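# Build the get_next op once; calling it inside the loop would add a new op to the graph on every iteration
x, y = it.get_next()
sess = tf.Session()
try:
    while True:
        vx, vy = sess.run([x, y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass
sess.close()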

Running the code above prints:

[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]

Explanation of the pipeline

Essentially, I leave the parallelization issue to map, where I can pass the number of threads it should run. No need for generators iterating over ranges and those extra complications.
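As a rough analogy only (plain Python, not TensorFlow), the map-then-flat_map idea can be sketched with a thread pool: each worker parses one whole file, and the per-file results are then flattened into individual examples. The parse_file body below is a hypothetical stand-in that fabricates values from the file name instead of reading anything.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain

N = 2  # worker count, playing the role of num_parallel_calls=N
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]

def parse_file(path):
    # Hypothetical stand-in: one fake example per character of the file name.
    xs = [float(len(path))] * len(path)
    ys = [0.0] * len(path)
    return xs, ys

def load_examples(paths, workers):
    # "map" step: parse whole files concurrently; pool.map keeps input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        per_file = list(pool.map(parse_file, paths))
    # "flat_map" step: unpack each file's (xs, ys) into individual (x, y) pairs.
    return list(chain.from_iterable(zip(xs, ys) for xs, ys in per_file))

examples = load_examples(files_to_process, N)
print(len(examples), examples[0])
```

The point of the sketch is the division of labour: the files themselves are the unit of parallel work, so no hand-written sharding by index is needed.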

I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which in your case doesn't really make sense because you have already loaded all values into memory by the time parse_file returns. parallel_interleave makes sense if you generate the values lazily (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if the contents of each file fit in memory, go for map.
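To make the interleave idea concrete, here is a simplified, sequential sketch in plain Python. It is not how tf.contrib.data.parallel_interleave is implemented (the real op pulls from its inputs in parallel); it only illustrates the idea of keeping up to cycle_length lazily produced inputs open and cycling through them. The names interleave, make_iter and records are hypothetical.

```python
from itertools import islice

_DONE = object()  # sentinel marking that no sources are left to open

def interleave(sources, make_iter, cycle_length):
    """Round-robin over at most `cycle_length` open iterators at a time."""
    pending = iter(sources)
    # Open the first `cycle_length` inputs.
    active = [make_iter(s) for s in islice(pending, cycle_length)]
    while active:
        still_open = []
        for it in active:
            try:
                yield next(it)
                still_open.append(it)
            except StopIteration:
                # This input is exhausted: open the next source, if any.
                nxt = next(pending, _DONE)
                if nxt is not _DONE:
                    still_open.append(make_iter(nxt))
        active = still_open

def records(name):
    # Hypothetical lazy source: two records per "file".
    return iter([name + "-0", name + "-1"])

order = list(interleave(["f1", "f2", "f3"], records, cycle_length=2))
print(order)  # ['f1-0', 'f2-0', 'f1-1', 'f2-1', 'f3-0', 'f3-1']
```

Because each input is consumed one element at a time, this pattern only pays off when the inputs are produced lazily, which is the distinction drawn above.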

The tf.py_func limitations do not affect your trained network, only the input pipeline. Ideally, you'll have a different pipeline for training and for your final use of the network. You only need to take care of the limitations in the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.


Version with Generator

If your JSON files are very big and their content won't fit in memory, you can use a generator, but slightly different from the approach you began with. The idea is, the generator goes through the JSON file and yields one record at a time. Then, the generator has to be your parse_file function. As an example, let's assume you have the following parse_file generator:

i = 3
def parse_file(filename):
    global i
    i += 1
    ctr = 0
    while ctr < i:
        yield ctr, ctr
        ctr += 1  # advance the counter; without this the loop never terminates

In this case, the pipeline would look as follows:

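def wrap_generator(filename):
    # from_generator expects a callable, so pass the function itself and forward
    # the filename through args (assumes a TensorFlow version whose from_generator
    # accepts args; the filename then reaches parse_file as a bytes object)
    return tf.data.Dataset.from_generator(parse_file, (tf.int32, tf.int32), args=(filename,))

files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
# No flat_map here: the generator already yields one (scalar) record at a time
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()

Note that here we need to use parallel_interleave because we turn the generators into Dataset instances from which we extract values. Since each generator already yields individual records, the flat_map step is no longer needed; the rest stays the same.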
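For illustration, a real record-streaming parse_file might look roughly like the sketch below. It assumes newline-delimited JSON with hypothetical "x" and "y" fields, which is not the AVRO format from the question, so treat the parsing details as placeholders. It also accepts the path as bytes, which is how string arguments arrive when they are forwarded through from_generator's args.

```python
import json
import os
import tempfile

def parse_file(filename):
    # Paths forwarded by TensorFlow arrive as bytes; plain str also works.
    if isinstance(filename, bytes):
        filename = filename.decode("utf-8")
    with open(filename, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            record = json.loads(line)
            # Yield one (x, y) record at a time so the file is never fully in memory.
            yield record["x"], record["y"]

# Small demo on a temporary file with two made-up records.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.jsonl")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write('{"x": 1.0, "y": 0}\n{"x": 2.5, "y": 1}\n')
    records = list(parse_file(path.encode("utf-8")))

print(records)  # [(1.0, 0), (2.5, 1)]
```

For actual AVRO input, the loop body would be replaced by whatever reader you already use inside parse_file; only the one-record-per-yield shape matters for the pipeline.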

Feeding this to the same sample loop as above prints:

[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
GPhilo answered Nov 05 '22 17:11