
In TensorFlow 2.0, how do I feed TFRecord data to a Keras model?

I'm trying to solve a classification problem with a deep neural network (DNN). Each input has 32 features, and each label is a one-hot vector over 16 classes.

The data looks like this:

# Input data
shape=(32,), dtype=float32, 
np.array([-0.9349509 ,  0.24052018, -0.29364416,  1.2375807 , -0.15996791,
        0.32468656,  0.43856472,  0.00573635, -0.48105922,  0.09342893,
        0.63206947,  0.44424117,  0.31256443,  0.09699771,  0.31004518,
        0.8685253 ,  0.74120486,  0.65404135, -0.4084895 ,  0.07065713,
        0.33964285, -0.20022233, -0.29489437,  0.40699714,  0.27028704,
        0.74895304, -0.4846958 ,  0.22371463,  0.3422047 , -0.24379562,
        0.38614622,  0.01282159])
# Label (Class)
shape=(16,), dtype=int64, np.array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

I want to train the network on more than 100 million samples, so loading the whole dataset into a Python array is not possible.
After some searching, I found that the TFRecord format can get me around this capacity problem.

I followed the tutorial on the official TensorFlow site to write a TFRecord file, but I could not find out how to load the TFRecord into a Keras model.

TFRecord Writing Procedure

I used a tf.io.TFRecordWriter to write the dataset to a TFRecord file.

writer = tf.io.TFRecordWriter(filenames)

for i in range(number_of_sample):
    ...
    writer.write(serialize_example(input_data, label))
writer.close()

The serialize_example function is as follows:

def serialize_example(input_data, label):
    """
    Creates a tf.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        'label' : tf.train.Feature(int64_list=tf.train.Int64List(value=label))
    }

    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(
        features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
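
Before writing millions of records, it can help to confirm that a few of them survive a write/read round trip. The following is a minimal sketch, assuming TensorFlow 2.x; the temporary file path and the random sample values are placeholders, not the real data.

```python
import os
import tempfile

import numpy as np
import tensorflow as tf


def serialize_example(input_data, label):
    """Same layout as above: 32 floats plus a 16-way one-hot label."""
    feature = {
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()


# Hypothetical output location, used only for this check.
path = os.path.join(tempfile.mkdtemp(), 'sample.tfrecord')

# Write three placeholder samples; sample i is labelled with class i.
with tf.io.TFRecordWriter(path) as writer:
    for class_index in range(3):
        input_data = np.random.rand(32).astype(np.float32)
        label = np.zeros(16, dtype=np.int64)
        label[class_index] = 1
        writer.write(serialize_example(input_data.tolist(), label.tolist()))

feature_description = {
    'feature': tf.io.FixedLenFeature([32], tf.float32),
    'label': tf.io.FixedLenFeature([16], tf.int64),
}

# Read the file back and recover the class index of every record.
parsed_labels = []
feature_lengths = []
for raw_record in tf.data.TFRecordDataset(path):
    parsed = tf.io.parse_single_example(raw_record, feature_description)
    parsed_labels.append(int(tf.argmax(parsed['label']).numpy()))
    feature_lengths.append(int(parsed['feature'].shape[0]))

print(parsed_labels, feature_lengths)
```

If the recovered class indices and feature lengths match what was written, the writer side is unlikely to be the source of a later training error.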

TFRecord Reading Procedure

After writing the TFRecord file, I worked out how to read it back by parsing the serialized records:

dataset = tf.data.TFRecordDataset(filenames=[filenames])
parsed_dataset = dataset.map(_parse_function, num_parallel_calls=8)
final_dataset = parsed_dataset.shuffle(buffer_size=number_of_sample).batch(10)

print(parsed_dataset)
# <ParallelMapDataset shapes: {feature: (32,), label: (16,)},
#  types: {feature: tf.float32, label: tf.int64}>

for parsed_record in parsed_dataset.take(1):
    print(repr(parsed_record))
'''
{'feature': <tf.Tensor: id=10730, shape=(32,), dtype=float32, numpy=
array([ 0.3584828 ,  0.43238872,  0.84813404, -0.23866414, -0.3381694 ,
       -0.6825514 , -0.20499012, -0.60198826,  0.12879704, -0.6152373 ,
        0.21901904,  0.10998161,  0.04357208, -0.19996743, -0.24080099,
       -0.6282675 ,  0.57822317,  0.10296232, -0.25011575, -0.3454151 ,
        0.6235647 , -0.12194595, -0.18114032, -1.4484204 , -0.11394399,
       -0.20868362, -0.00653742,  0.677903  ,  0.09619896, -0.6428113 ,
       -0.59125495,  0.22995417], dtype=float32)>, 
'label': <tf.Tensor: id=10731, shape=(16,), dtype=int64, 
numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])>}
'''

The _parse_function function is as follows:

# Create a description of the features.
feature_description = {
    'feature': tf.io.FixedLenFeature([32, ], tf.float32),
    'label' : tf.io.FixedLenFeature([16, ], tf.int64)
}

def _parse_function(example_proto):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)

Feed TFRecord (Training)

So far everything seemed to work, but when I tried to feed this dataset into the Keras model, it raised an error.

Model definition and training

inputs = keras.Input(shape=(32, ), name='feature')
x = layers.Dense(1024, activation='linear', name='dense_input')(inputs)
outputs = layers.Dense(expected_output, activation='softmax', name='label')(x)

model = keras.Model(inputs=inputs, outputs = outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001), 
loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])

model.fit(final_dataset)

The output is:

     1/Unknown - 0s 15ms/step
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-41-bb3547f32c4a> in <module>()
      8 loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])
      9 
---> 10 model.fit(final_dataset )

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    726         max_queue_size=max_queue_size,
    727         workers=workers,
--> 728         use_multiprocessing=use_multiprocessing)
    729 
    730   def evaluate(self,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    322                 mode=ModeKeys.TRAIN,
    323                 training_context=training_context,
--> 324                 total_epochs=epochs)
    325             cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
    326 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
    121         step=step, mode=mode, size=current_batch_size) as batch_logs:
    122       try:
--> 123         batch_outs = execution_function(iterator)
    124       except (StopIteration, errors.OutOfRangeError):
    125         # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
     84     # `numpy` translates Tensors to values in Eager mode.
     85     return nest.map_structure(_non_none_constant_value,
---> 86                               distributed_function(input_fn))
     87 
     88   return execution_function

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
    455 
    456     tracing_count = self._get_tracing_count()
--> 457     result = self._call(*args, **kwds)
    458     if tracing_count == self._get_tracing_count():
    459       self._call_counter.called_without_tracing()

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
    501       # This is the first call of __call__, so we have to initialize.
    502       initializer_map = object_identity.ObjectIdentityDictionary()
--> 503       self._initialize(args, kwds, add_initializers_to=initializer_map)
    504     finally:
    505       # At this point we know that the initialization is complete (or less

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
    406     self._concrete_stateful_fn = (
    407         self._stateful_fn._get_concrete_function_internal_garbage_collected(  # pylint: disable=protected-access
--> 408             *args, **kwds))
    409 
    410     def invalid_creator_scope(*unused_args, **unused_kwds):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
   1846     if self.input_signature:
   1847       args, kwargs = None, None
-> 1848     graph_function, _, _ = self._maybe_define_function(args, kwargs)
   1849     return graph_function
   1850 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
   2148         graph_function = self._function_cache.primary.get(cache_key, None)
   2149         if graph_function is None:
-> 2150           graph_function = self._create_graph_function(args, kwargs)
   2151           self._function_cache.primary[cache_key] = graph_function
   2152         return graph_function, args, kwargs

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
   2039             arg_names=arg_names,
   2040             override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041             capture_by_value=self._capture_by_value),
   2042         self._function_attributes,
   2043         # Tell the ConcreteFunction to clean up its graph once it goes out of

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
    913                                           converted_func)
    914 
--> 915       func_outputs = python_func(*func_args, **func_kwargs)
    916 
    917       # invariant: `func_outputs` contains only Tensors, CompositeTensors,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
    356         # __wrapped__ allows AutoGraph to swap in a converted function. We give
    357         # the function a weak reference to itself to avoid a reference cycle.
--> 358         return weak_wrapped_fn().__wrapped__(*args, **kwds)
    359     weak_wrapped_fn = weakref.ref(wrapped_fn)
    360 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
     71     strategy = distribution_strategy_context.get_strategy()
     72     outputs = strategy.experimental_run_v2(
---> 73         per_replica_function, args=(model, x, y, sample_weights))
     74     # Out of PerReplica outputs reduce or pick values to return.
     75     all_outputs = dist_utils.unwrap_output_dict(

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
    758       fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
    759                                 convert_by_default=False)
--> 760       return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    761 
    762   def reduce(self, reduce_op, value, axis):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
   1785       kwargs = {}
   1786     with self._container_strategy().scope():
-> 1787       return self._call_for_each_replica(fn, args, kwargs)
   1788 
   1789   def _call_for_each_replica(self, fn, args, kwargs):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
   2130         self._container_strategy(),
   2131         replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132       return fn(*args, **kwargs)
   2133 
   2134   def _reduce_to(self, reduce_op, value, destinations):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
    290   def wrapper(*args, **kwargs):
    291     with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292       return func(*args, **kwargs)
    293 
    294   if inspect.isfunction(func) or inspect.ismethod(func):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
    262       y,
    263       sample_weights=sample_weights,
--> 264       output_loss_metrics=model._output_loss_metrics)
    265 
    266   if reset_metrics:

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
    309           sample_weights=sample_weights,
    310           training=True,
--> 311           output_loss_metrics=output_loss_metrics))
    312   if not isinstance(outs, list):
    313     outs = [outs]

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
    250               output_loss_metrics=output_loss_metrics,
    251               sample_weights=sample_weights,
--> 252               training=training))
    253       if total_loss is None:
    254         raise ValueError('The model cannot be run '

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
    164 
    165         if hasattr(loss_fn, 'reduction'):
--> 166           per_sample_losses = loss_fn.call(targets[i], outs[i])
    167           weighted_losses = losses_utils.compute_weighted_loss(
    168               per_sample_losses,

IndexError: list index out of range

I don't know which list the error refers to.

구마왕 asked Nov 21 '19 09:11



1 Answer

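I'm doing something similar in TF 2.0, with a couple of differences that may address your issue. Keras's fit expects each dataset element to be an (inputs, targets) tuple, so a dict that holds both is probably treated as inputs with no targets, which would explain the IndexError at targets[i]. Split parsed_record into features and label: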

    feature, label = parsed_record['feature'], parsed_record['label']
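
Applied to the schema in the question, that split can happen inside the map function, so that every dataset element is already a (features, label) tuple. This is a minimal sketch, assuming TensorFlow 2.x; the names parse_as_tuple and make_record and the in-memory records are illustrative stand-ins, not part of the original code.

```python
import tensorflow as tf

feature_description = {
    'feature': tf.io.FixedLenFeature([32], tf.float32),
    'label': tf.io.FixedLenFeature([16], tf.int64),
}


def parse_as_tuple(example_proto):
    """Parses one serialized tf.Example and returns (features, label)."""
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    return parsed['feature'], parsed['label']


def make_record(class_index):
    """Builds one serialized tf.Example with placeholder values (hypothetical helper)."""
    label = [0] * 16
    label[class_index] = 1
    example = tf.train.Example(features=tf.train.Features(feature={
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=[0.5] * 32)),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
    }))
    return example.SerializeToString()


# In-memory stand-in for tf.data.TFRecordDataset(filenames).
records = tf.data.Dataset.from_tensor_slices(
    [make_record(i % 16) for i in range(20)])
dataset = records.map(parse_as_tuple).batch(10)

# Each element is now a (features, labels) pair rather than a dict.
features, labels = next(iter(dataset))
feature_shape = tuple(features.shape.as_list())
label_shape = tuple(labels.shape.as_list())
print(feature_shape, label_shape)
```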

To keep drawing batches from the dataset after the first pass, use ds.repeat:

    ds = ds.shuffle(buffer_size=number_of_sample).batch(batch_size).repeat()

My full input pipeline looks like:

import tensorflow as tf


def _parse_function_same_side(example_proto):
    """Extracts features and labels.

    Args:
        example_proto: A serialized tf.Example protocol buffer.

    Returns:
        A `tuple` `(features, labels)`:
            features: A tensor representing the features.
            labels: A tensor with the corresponding one-hot labels.
    """
    feature_description = {
        "features": tf.io.FixedLenFeature(4, tf.int64),
        "label": tf.io.FixedLenFeature(1, tf.int64)
    }

    parsed_features = tf.io.parse_single_example(example_proto, feature_description)

    features = parsed_features['features']

    # hero_vocab is my class vocabulary, defined elsewhere.
    labels = tf.one_hot(parsed_features['label'], depth=len(hero_vocab))
    return features, labels


def _input_fn(input_filenames, num_epochs=None,
              shuffle=True, batch_size=50, compression_type=""):

    ds = tf.data.TFRecordDataset(input_filenames, compression_type=compression_type)
    ds = ds.map(_parse_function_same_side)

    # Only shuffle if the shuffle flag is set.
    if shuffle:
        ds = ds.shuffle(10000)

    # Group records into batches of batch_size.
    ds = ds.batch(batch_size)

    # Repeat so batches can be drawn from the TFRecord data indefinitely.
    ds = ds.repeat()

    # Return the dataset.
    return ds

After this I just directly feed the dataset to my model.
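
Because a dataset built this way repeats indefinitely, fit needs steps_per_epoch to know where an epoch ends. Below is a minimal end-to-end sketch, assuming TensorFlow 2.x; random in-memory tensors stand in for the parsed TFRecord stream, and the sample count, hidden layer size, and epoch count are arbitrary choices for illustration.

```python
import numpy as np
import tensorflow as tf

num_samples, batch_size = 200, 10

# Placeholder data shaped like the question's: 32 features, 16 one-hot classes.
features = np.random.rand(num_samples, 32).astype(np.float32)
class_ids = np.random.randint(0, 16, size=num_samples)
labels = np.eye(16, dtype=np.float32)[class_ids]

# Elements are (features, labels) tuples, as Keras expects.
ds = tf.data.Dataset.from_tensor_slices((features, labels))
ds = ds.shuffle(num_samples).batch(batch_size).repeat()

inputs = tf.keras.Input(shape=(32,), name='feature')
x = tf.keras.layers.Dense(64, activation='relu')(inputs)
outputs = tf.keras.layers.Dense(16, activation='softmax', name='label')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# steps_per_epoch bounds each epoch, since the repeated dataset never ends.
history = model.fit(ds,
                    steps_per_epoch=num_samples // batch_size,
                    epochs=2,
                    verbose=0)
print(history.history['loss'])
```

With a real TFRecord pipeline, steps_per_epoch would typically be the number of records divided by the batch size; that count has to be tracked separately, since a TFRecord file does not store it.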

Josh Ziegler answered Oct 28 '22 23:10