I'm trying to solve a classification problem whose input data has 32 features and 16 labels (classes) using a Deep Neural Network (DNN).
The data looks like this:
# Input data
shape=(32,), dtype=float32,
np.array([-0.9349509 , 0.24052018, -0.29364416, 1.2375807 , -0.15996791,
0.32468656, 0.43856472, 0.00573635, -0.48105922, 0.09342893,
0.63206947, 0.44424117, 0.31256443, 0.09699771, 0.31004518,
0.8685253 , 0.74120486, 0.65404135, -0.4084895 , 0.07065713,
0.33964285, -0.20022233, -0.29489437, 0.40699714, 0.27028704,
0.74895304, -0.4846958 , 0.22371463, 0.3422047 , -0.24379562,
0.38614622, 0.01282159])
# Label (Class)
shape=(16,), dtype=int64, np.array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
I want to train my NN on over 100 million samples (a very large dataset), so it's not possible to load the whole dataset into a Python array.
After googling, I found that tf.TFRecord
helps me get around this capacity problem.
I followed the tutorial on the official TensorFlow site to write the TFRecord file, but could not find out how to load the TFRecord into a Keras model.
I used a writer to write the dataset to a TFRecord file.
# Open the writer as a context manager so the file is flushed and closed even
# if serialization raises part-way through the loop.
with tf.io.TFRecordWriter(filenames) as writer:
    for i in range(number_of_sample):
        ...  # placeholder: obtain input_data and label for sample i
        writer.write(serialize_example(input_data, label))
The serialize_example
function is as follows:
def serialize_example(input_data, label):
    """Serialize one sample into a tf.Example byte string.

    Args:
        input_data: Iterable of floats, stored under the 'feature' key.
        label: Iterable of ints, stored under the 'label' key.

    Returns:
        The serialized tf.train.Example, ready to be written to a file.
    """
    # Wrap each field in the tf.train.Feature type matching its value list.
    feature_field = tf.train.Feature(
        float_list=tf.train.FloatList(value=input_data))
    label_field = tf.train.Feature(
        int64_list=tf.train.Int64List(value=label))

    # Bundle the named fields into a Features message and build the Example.
    features = tf.train.Features(
        feature={'feature': feature_field, 'label': label_field})
    return tf.train.Example(features=features).SerializeToString()
After writing the TFRecord, I figured out how to read the file back by parsing its serialized strings:
# Read the serialized tf.Example records from the TFRecord file.
dataset = tf.data.TFRecordDataset(filenames=[filenames])
# Decode every record with _parse_function, using 8 parallel calls.
parsed_dataset = dataset.map(_parse_function, num_parallel_calls=8)
# Shuffle, then group into batches of 10.
# NOTE(review): buffer_size=number_of_sample makes the shuffle buffer as large
# as the whole dataset; with ~100 million samples that presumably will not fit
# in memory -- consider a much smaller buffer.
# NOTE(review): each element is still a single dict holding both 'feature' and
# 'label'. Keras `model.fit` appears to expect (inputs, targets) tuples from a
# dataset, which would explain the IndexError on `targets[i]` -- TODO confirm
# against the tf.keras.Model.fit documentation.
final_dataset = parsed_dataset.shuffle(buffer_size=number_of_sample).batch(10)
# Show the element structure (shapes and dtypes) of the parsed dataset.
print(parsed_dataset)
# <ParallelMapDataset shapes: {feature: (32,), label: (16,)},
#  types: {feature: tf.float32, label: tf.int64}>
# Pull a single parsed record from the dataset and print its representation.
for parsed_record in parsed_dataset.take(1):
    print(repr(parsed_record))
'''
{'feature': <tf.Tensor: id=10730, shape=(32,), dtype=float32, numpy=
array([ 0.3584828 , 0.43238872, 0.84813404, -0.23866414, -0.3381694 ,
-0.6825514 , -0.20499012, -0.60198826, 0.12879704, -0.6152373 ,
0.21901904, 0.10998161, 0.04357208, -0.19996743, -0.24080099,
-0.6282675 , 0.57822317, 0.10296232, -0.25011575, -0.3454151 ,
0.6235647 , -0.12194595, -0.18114032, -1.4484204 , -0.11394399,
-0.20868362, -0.00653742, 0.677903 , 0.09619896, -0.6428113 ,
-0.59125495, 0.22995417], dtype=float32)>,
'label': <tf.Tensor: id=10731, shape=(16,), dtype=int64,
numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])>}
'''
The _parse_function
function is as follows:
# Schema of one serialized record: fixed-length 'feature' (32 float32 values)
# and 'label' (16 int64 values) entries.
feature_description = {
    'feature': tf.io.FixedLenFeature(shape=[32], dtype=tf.float32),
    'label': tf.io.FixedLenFeature(shape=[16], dtype=tf.int64),
}


def _parse_function(example_proto):
    """Decode one serialized tf.Example using `feature_description`.

    Args:
        example_proto: A serialized tf.Example record.

    Returns:
        The result of tf.io.parse_single_example for this record: a dict
        keyed by 'feature' and 'label'.
    """
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    return parsed
So far everything seemed to go smoothly, but when I tried to feed this dataset into the Keras model, it produced an error.
Model definition and training:
# Classifier: 32 input features -> Dense(1024) -> softmax over
# `expected_output` units.
inputs = keras.Input(shape=(32,), name='feature')
# NOTE(review): with activation='linear' this hidden layer adds no
# non-linearity; confirm that this is intended.
hidden = layers.Dense(1024, activation='linear', name='dense_input')(inputs)
outputs = layers.Dense(
    expected_output, activation='softmax', name='label')(hidden)
model = keras.Model(inputs=inputs, outputs=outputs)

model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy', 'categorical_crossentropy'],
)
# NOTE(review): final_dataset yields dicts, not (inputs, targets) tuples, so
# this call presumably has no targets to compute the loss against -- TODO
# confirm against the tf.keras.Model.fit documentation.
model.fit(final_dataset)
The output is:
1/Unknown - 0s 15ms/step
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
<ipython-input-41-bb3547f32c4a> in <module>()
8 loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])
9
---> 10 model.fit(final_dataset )
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
726 max_queue_size=max_queue_size,
727 workers=workers,
--> 728 use_multiprocessing=use_multiprocessing)
729
730 def evaluate(self,
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
322 mode=ModeKeys.TRAIN,
323 training_context=training_context,
--> 324 total_epochs=epochs)
325 cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
326
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
121 step=step, mode=mode, size=current_batch_size) as batch_logs:
122 try:
--> 123 batch_outs = execution_function(iterator)
124 except (StopIteration, errors.OutOfRangeError):
125 # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
84 # `numpy` translates Tensors to values in Eager mode.
85 return nest.map_structure(_non_none_constant_value,
---> 86 distributed_function(input_fn))
87
88 return execution_function
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
455
456 tracing_count = self._get_tracing_count()
--> 457 result = self._call(*args, **kwds)
458 if tracing_count == self._get_tracing_count():
459 self._call_counter.called_without_tracing()
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
501 # This is the first call of __call__, so we have to initialize.
502 initializer_map = object_identity.ObjectIdentityDictionary()
--> 503 self._initialize(args, kwds, add_initializers_to=initializer_map)
504 finally:
505 # At this point we know that the initialization is complete (or less
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
406 self._concrete_stateful_fn = (
407 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
--> 408 *args, **kwds))
409
410 def invalid_creator_scope(*unused_args, **unused_kwds):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
1846 if self.input_signature:
1847 args, kwargs = None, None
-> 1848 graph_function, _, _ = self._maybe_define_function(args, kwargs)
1849 return graph_function
1850
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
2148 graph_function = self._function_cache.primary.get(cache_key, None)
2149 if graph_function is None:
-> 2150 graph_function = self._create_graph_function(args, kwargs)
2151 self._function_cache.primary[cache_key] = graph_function
2152 return graph_function, args, kwargs
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
2039 arg_names=arg_names,
2040 override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041 capture_by_value=self._capture_by_value),
2042 self._function_attributes,
2043 # Tell the ConcreteFunction to clean up its graph once it goes out of
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
913 converted_func)
914
--> 915 func_outputs = python_func(*func_args, **func_kwargs)
916
917 # invariant: `func_outputs` contains only Tensors, CompositeTensors,
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
356 # __wrapped__ allows AutoGraph to swap in a converted function. We give
357 # the function a weak reference to itself to avoid a reference cycle.
--> 358 return weak_wrapped_fn().__wrapped__(*args, **kwds)
359 weak_wrapped_fn = weakref.ref(wrapped_fn)
360
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
71 strategy = distribution_strategy_context.get_strategy()
72 outputs = strategy.experimental_run_v2(
---> 73 per_replica_function, args=(model, x, y, sample_weights))
74 # Out of PerReplica outputs reduce or pick values to return.
75 all_outputs = dist_utils.unwrap_output_dict(
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
758 fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
759 convert_by_default=False)
--> 760 return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
761
762 def reduce(self, reduce_op, value, axis):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
1785 kwargs = {}
1786 with self._container_strategy().scope():
-> 1787 return self._call_for_each_replica(fn, args, kwargs)
1788
1789 def _call_for_each_replica(self, fn, args, kwargs):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
2130 self._container_strategy(),
2131 replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132 return fn(*args, **kwargs)
2133
2134 def _reduce_to(self, reduce_op, value, destinations):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
290 def wrapper(*args, **kwargs):
291 with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292 return func(*args, **kwargs)
293
294 if inspect.isfunction(func) or inspect.ismethod(func):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
262 y,
263 sample_weights=sample_weights,
--> 264 output_loss_metrics=model._output_loss_metrics)
265
266 if reset_metrics:
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
309 sample_weights=sample_weights,
310 training=True,
--> 311 output_loss_metrics=output_loss_metrics))
312 if not isinstance(outs, list):
313 outs = [outs]
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
250 output_loss_metrics=output_loss_metrics,
251 sample_weights=sample_weights,
--> 252 training=training))
253 if total_loss is None:
254 raise ValueError('The model cannot be run '
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
164
165 if hasattr(loss_fn, 'reduction'):
--> 166 per_sample_losses = loss_fn.call(targets[i], outs[i])
167 weighted_losses = losses_utils.compute_weighted_loss(
168 per_sample_losses,
IndexError: list index out of range
I don't know which list
the error refers to.
» Code examples / Quick Keras Recipes / How to train a Keras model on TFRecord files Description: Loading TFRecords for computer vision models. TFRecords store a sequence of binary records, read linearly. They are useful format for storing data because they can be read efficiently. Learn more about TFRecords here.
The TFRecord file format is a simple record-oriented binary format. If your input data is on disk or you are working with large data, TensorFlow recommends using the TFRecord format. Doing so can significantly improve the performance of your input pipeline.
The tf.data API supports a variety of file formats so that you can process large datasets that do not fit in memory. For example, the TFRecord file format is a simple record-oriented binary format that many TensorFlow applications use for training data.
There is a multitude of ways of getting your images from the disk into the model: writing a custom generator, using Keras’ built-in tools, or loading them from a NumPy array. To make loading and parsing image data efficient, we can resort to TFRecords as the underlying file format.
I'm doing something similar in TF 2.0, with a couple of differences that may address your issues. Separate parsed_record into features and label:
# Split the parsed dict into the model input and its target.
feature = parsed_record['feature']
label = parsed_record['label']
To continue getting batches from a dataset use ds.repeat:
# Dataset.repeat takes a repeat count, not a dataset: shuffle and batch first,
# then repeat the batched dataset indefinitely.
ds = ds.shuffle(buffer_size=number_of_sample).batch(batch_size).repeat()
My full input pipeline looks like:
def _parse_function_same_side(example_proto):
    """Parse one serialized example into a `(features, labels)` pair.

    Args:
        example_proto: A serialized tf.Example record.

    Returns:
        A tuple `(features, labels)`:
            features: The 4 int64 values stored under "features".
            labels: One-hot encoding of the value stored under "label",
                with depth `len(hero_vocab)`.
    """
    schema = {
        "features": tf.io.FixedLenFeature(4, tf.int64),
        "label": tf.io.FixedLenFeature(1, tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, schema)
    # Turn the integer label into a one-hot vector over the vocabulary.
    one_hot_labels = tf.one_hot(parsed['label'], depth=len(hero_vocab))
    return parsed['features'], one_hot_labels
def _input_fn(input_filenames, num_epochs=None,
              shuffle=True, batch_size=50, compression_type=""):
    """Build a batched tf.data pipeline of `(features, labels)` pairs.

    Args:
        input_filenames: TFRecord file name(s) passed to
            tf.data.TFRecordDataset.
        num_epochs: Number of times to repeat the dataset; None (the default)
            repeats it indefinitely.
        shuffle: Whether to shuffle the records (buffer of 10000 elements).
        batch_size: Number of records per batch.
        compression_type: Compression type passed to tf.data.TFRecordDataset.

    Returns:
        The resulting tf.data.Dataset.
    """
    ds = tf.data.TFRecordDataset(
        input_filenames, compression_type=compression_type)
    # Parse with the function that returns a (features, labels) tuple, so the
    # dataset can be fed to the model directly.
    ds = ds.map(_parse_function_same_side)
    # Only shuffle when the shuffle flag is set.
    if shuffle:
        ds = ds.shuffle(10000)
    # Group the records into batches of batch_size.
    ds = ds.batch(batch_size)
    # Honour num_epochs instead of ignoring it; repeat(None) repeats forever,
    # which keeps the previous behaviour for the default argument.
    ds = ds.repeat(num_epochs)
    return ds
After this I just directly feed the dataset to my model.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With