I have a bunch of JSON array files (AVRO, to be accurate), and each of them yields multiple samples for training a Keras model. Using ideas from @GPhilo and from @jsimsa, I came up with the following to parallelize my input pipeline. What I can't figure out is how to design generator(n) to divide the work of processing files: the code fails inside parse_file(f), because that function expects a string file path and not a Tensor.
N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)
...
myTfKerasModel.fit(ds.make_one_shot_iterator(), NUM_TRAIN_SAMPLES // batch_size)
How do I design generator(n) correctly here? And is my use of parallel_interleave and flat_map the right approach?
It seems to me you're complicating your life unnecessarily with the generator. This is how I'd implement your input pipeline:
def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
it = dataset.make_one_shot_iterator()
To test it, I define a dummy parse_file as:
i = 0
def parse_file(f):
    global i
    i += 1
    # mimics variable-length examples_x, examples_y
    return np.asarray([i] * i, dtype=np.float32), np.asarray([i] * i, dtype=np.float32)
which I feed into a basic loop that shows what the iterator returns:
sess = tf.Session()
x, y = it.get_next()  # build the get_next op once, outside the loop
try:
    while True:
        vx, vy = sess.run([x, y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass
sess.close()
Running the code above prints:
[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]
Essentially, I leave the parallelization issue to map, where I can pass the number of threads it should run. No need for generators iterating over ranges and those extra complications.
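As a side note, if your TensorFlow version has it (tf.data.experimental.AUTOTUNE appeared around TF 1.13, if I remember correctly), you can also let the runtime pick the level of parallelism instead of hard-coding N:

# same map call as above, but letting tf.data tune the thread count itself
dataset = files.map(parse_file_tf, num_parallel_calls=tf.data.experimental.AUTOTUNE)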
I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which in your case doesn't really make sense: you have already loaded all values in memory when you run parse_file.
parallel_interleave makes sense if you slowly generate the values (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if your dataset fits in memory, go for map.
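For reference, here is a minimal sketch of that lazy case, assuming some hypothetical TFRecord files:

# hypothetical TFRecord filenames; each one becomes a lazily-read dataset,
# and parallel_interleave pulls records from up to N of them at a time
record_files = tf.data.Dataset.from_tensor_slices(["a.tfrecord", "b.tfrecord"])
records = record_files.apply(tf.contrib.data.parallel_interleave(
    tf.data.TFRecordDataset, cycle_length=N))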
About the tf.py_func limitations: they do not affect your trained network, only the input pipeline. Ideally, you'll have a different pipeline for your training and for your final use of the network. You only need to take care of the limitations during the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.
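One limitation that can bite inside the training pipeline itself: tensors returned by tf.py_func have no static shape, which some downstream ops and Keras layers rely on. If that happens, you can re-attach the shape yourself by extending parse_file_tf. A sketch, where FEATURE_DIM is a hypothetical placeholder for your per-example feature size:

def parse_file_tf(filename):
    x, y = tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
    # py_func outputs lose their static shape; re-declare what parse_file guarantees
    x.set_shape([None, FEATURE_DIM])  # FEATURE_DIM: hypothetical feature size
    y.set_shape([None, FEATURE_DIM])
    return x, y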
If your JSON files are very big and their content won't fit in memory, you can still use a generator, but one slightly different from the approach you began with.
The idea is: the generator goes through the JSON file and yields one record at a time. Then, the generator has to be your parse_file function. As an example, let's assume you have the following parse_file generator:
i = 3
def parse_file(filename):
    global i
    i += 1
    ctr = 0
    while ctr < i:
        ctr += 1
        # yield one record at a time; length-1 arrays so the flat_map
        # below has a dimension to slice along
        yield np.asarray([i], dtype=np.float32), np.asarray([i], dtype=np.float32)
In this case, the pipeline would look as follows:
def wrap_generator(filename):
    # from_generator expects a callable; passing the filename through `args`
    # also means parse_file receives a plain string, not a Tensor
    return tf.data.Dataset.from_generator(
        parse_file, (tf.float32, tf.float32), args=(filename,))

files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()
Note that here we need to use parallel_interleave, because we turn the generators into Dataset instances from which we extract values. The rest stays the same.
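(One knob worth knowing here: parallel_interleave also takes a sloppy argument. If you don't need a deterministic element order, sloppy=True lets workers emit elements as soon as they are ready instead of waiting their turn.)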
Feeding this to the same sample loop as above prints:
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
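Either way, once the dataset is built, recent TF 1.x versions of Keras (1.9 and later, if I remember correctly) can consume it directly in fit, so you don't need the make_one_shot_iterator() call from your snippet. A sketch, with NUM_EPOCHS as a placeholder for your epoch count:

# repeat() so the one-shot pipeline survives multiple epochs
myTfKerasModel.fit(dataset.repeat(),
                   steps_per_epoch=NUM_TRAIN_SAMPLES // batch_size,
                   epochs=NUM_EPOCHS)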