Input pipeline w/ keras.utils.Sequence object or tf.data.Dataset?

I am currently using a tf.keras.utils.Sequence object to generate image batches for a CNN, with TensorFlow 2.2 and the Model.fit method. When I set use_multiprocessing=True in tf.keras.Model.fit(...), the following warning is thrown in each epoch:

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended

The model is optimizing just fine, as expected from the docs and the fact that I am using a Sequence-based generator. But if use_multiprocessing is going to be deprecated in favor of tf.data objects, I would like to be using the most up-to-date input pipeline. I currently use the following tf.keras.utils.Sequence-based generator, inspired by this article on good practices for partitioning large datasets: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

import numpy as np
from tensorflow import keras

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, data_dir, batch_size=32, dim=(128, 128),
                 n_channels=1, n_classes=2, shuffle=True, **augmentation_kwargs):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.data_dir = data_dir
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()
        self.augmentor = keras.preprocessing.image.ImageDataGenerator(**augmentation_kwargs)

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_IDs_temp)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        # Initialization
        X = np.empty((self.batch_size, *self.dim))
        y = np.empty((self.batch_size), dtype=int)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            X[i,] = np.load(self.data_dir + '/{}_stars.npy'.format(ID))

            # Store class
            y[i] = self.labels[ID]

        # Reshape to (batch, *dim, 1) and apply augmentation to the batch
        X, y = self.augmentor.flow(X.reshape(self.batch_size, *self.dim, 1), y=y,
                                   shuffle=False, batch_size=self.batch_size)[0]

        return X, y

All samples from all classes live in the data_dir directory, stored as individual .npy files. The IDs come from a list of strings, and the class labels are taken from a dictionary whose keys are those IDs -- as in the article.
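
For reference, here is a minimal sketch of how I drive this generator end to end. The dummy data and the toy CNN below are placeholders I made up just to reproduce the warning, not my actual data or model:

import os, tempfile
import numpy as np
from tensorflow import keras

# Write a few dummy .npy samples in the naming scheme DataGenerator expects
data_dir = tempfile.mkdtemp()
ids = ['sample{}'.format(i) for i in range(64)]
labels = {ID: np.random.randint(2) for ID in ids}
for ID in ids:
    np.save(os.path.join(data_dir, '{}_stars.npy'.format(ID)),
            np.random.rand(128, 128).astype('float32'))

train_gen = DataGenerator(ids, labels, data_dir, batch_size=32, dim=(128, 128))

# Toy CNN, just enough to call fit
model = keras.Sequential([
    keras.layers.Conv2D(8, 3, activation='relu', input_shape=(128, 128, 1)),
    keras.layers.GlobalAveragePooling2D(),
    keras.layers.Dense(2, activation='softmax'),
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

# use_multiprocessing=True is what triggers the warning quoted above
model.fit(train_gen, epochs=1, workers=2, use_multiprocessing=True)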

I really like the intuition of the Sequence generator setup, and I can easily generate random batches to check that it is behaving as I would expect. But how can I reproduce this setup with tf.data? How do I reproduce the multiprocessing batch generation of a Sequence generator with the interleave and prefetch methods of tf.data.Dataset? And/or can I simply ingest this Sequence-based generator with the tf.data.Dataset.from_generator() method? (A sketch of what I mean is below.)
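
For concreteness, the kind of wrapping I have in mind for that last option is sketched here. This is only a guess on my part: it uses the output_types/output_shapes arguments available in TF 2.2 (output_signature only arrived in TF 2.4), and the shapes assume dim=(128, 128) and n_channels=1:

import tensorflow as tf

def sequence_to_dataset(seq):
    # Re-yield the Sequence's ready-made batches from inside tf.data
    def batches():
        for i in range(len(seq)):
            yield seq[i]
    return tf.data.Dataset.from_generator(
        batches,
        output_types=(tf.float32, tf.int32),
        output_shapes=((None, 128, 128, 1), (None,)),
    ).prefetch(tf.data.experimental.AUTOTUNE)

dataset = sequence_to_dataset(train_gen)  # batches are already formed by the Sequence
model.fit(dataset, epochs=1)

One thing I am unsure about with this wrapping is that Keras would no longer call on_epoch_end automatically, so the between-epoch shuffle would have to be handled some other way.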

Many thanks in advance.

asked by Connor on Jun 12 '20

1 Answer

It may be too late to answer, but this is what I did and it works fine for me.

1. My class was like this:

import numpy as np
import librosa
import tensorflow as tf
from tensorflow.keras.utils import Sequence, to_categorical

class DataGen(Sequence):
    def __init__(self, df, sr=8000, seconds=3, batch_size=16, shuffle=True):
        self.files = np.array(df.filepath)
        self.label = np.array(df.label)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.sr = sr
        self.seconds = seconds
        self.dim = self.sr * self.seconds  # fixed number of samples per clip
        self.on_epoch_end()

    def __len__(self):
        return len(self.label) // self.batch_size

    def __getitem__(self, x):
        # Slice out the x-th batch of (shuffled) indexes
        indexes = self.indexes[x*self.batch_size:(x+1)*self.batch_size]
        return self.__getBatch__(indexes)

    def __getBatch__(self, indexes):
        X, y = [], []
        for i in indexes:
            wav = self.__loadFile__(self.files[i])
            X.append(librosa.feature.mfcc(y=wav, sr=self.sr).T)
            y.append(self.label[i])
        return tf.convert_to_tensor(X), to_categorical(y, num_classes=2)

    def __loadFile__(self, file):
        # Load mono audio and pad/trim it to exactly self.dim samples
        wav, _ = librosa.load(file, sr=self.sr, mono=True)
        if len(wav) > self.dim:
            return wav[:self.dim]
        return np.pad(wav, (0, self.dim - len(wav)), 'constant', constant_values=0)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.label))
        if self.shuffle:
            np.random.shuffle(self.indexes)

2. Then I changed it to a plain generator function like the following:

def gen(sr=8000, seconds=3, batch_size=16, shuffle=True):
    # NOTE: df (with .filepath and .label columns) comes from the enclosing
    # scope, since from_generator calls gen() with no arguments
    dim = sr * seconds

    def loadFile(file):
        wav, _ = librosa.load(file, sr=sr, mono=True)
        if len(wav) > dim:
            return wav[:dim]
        return np.pad(wav, (0, dim - len(wav)), 'constant', constant_values=0)

    while True:
        indexes = np.arange(len(df))
        if shuffle:
            np.random.shuffle(indexes)

        for x in range(len(df) // batch_size):
            X, y = [], []
            for i in indexes[x*batch_size:(x+1)*batch_size]:
                X.append(librosa.feature.mfcc(y=loadFile(df.filepath[i]), sr=sr).T)
                y.append(df.label[i])

            yield tf.convert_to_tensor(X), to_categorical(y, num_classes=2)

3. And it works fine:

dataset = tf.data.Dataset.from_generator(gen, (tf.dtypes.float32, tf.dtypes.int32))
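
If you are on TF >= 2.4 you can also declare the element spec with output_signature and chain prefetch so batches are prepared while the model trains. This variant is a sketch on my part: the feature shape assumes librosa's default of 20 MFCC coefficients, and no .batch() call is needed because gen already yields whole batches:

dataset = tf.data.Dataset.from_generator(
    gen,
    output_signature=(
        tf.TensorSpec(shape=(None, None, 20), dtype=tf.float32),  # (batch, frames, mfcc)
        tf.TensorSpec(shape=(None, 2), dtype=tf.float32),         # one-hot labels
    ),
).prefetch(tf.data.AUTOTUNE)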
answered by Walid Bousseta on Oct 07 '22