 

Keras thread-safe generator for model.fit_generator with Python 3.6.x

I'm using Keras 2.0.8 for a U-Net 2D medical segmentation project. Currently I'm struggling to create a custom thread-safe image generator (yielding X and y simultaneously). X and y are 4D arrays with shape n_img x n_col x n_row x T, where T is 4 for X and 1 for y (the 4 numerical labels are transformed into a one-hot encoding along the 4th dimension).

Here is my code:

import numpy as np

def gen_tr(X, y, batch_size):
    # number of full batches per epoch
    n = np.floor((len(X) - 1) / batch_size).astype(int)
    s = list(X.shape)
    s[0] = batch_size
    while 1:
        for i in range(n):
            j = 0
            X_b = np.zeros(s, dtype=np.float32)
            y_b = np.zeros(s, dtype=int)
            while j < batch_size:
                # augment the 4 input channels and the label map together
                data = distort_imgs(X[i*batch_size+j, :, :, 0, np.newaxis],
                                    X[i*batch_size+j, :, :, 1, np.newaxis],
                                    X[i*batch_size+j, :, :, 2, np.newaxis],
                                    X[i*batch_size+j, :, :, 3, np.newaxis],
                                    y[i*batch_size+j, :, :, 0, np.newaxis])
                X_i = np.concatenate(data[:4], axis=2)
                y_i = data[-1]
                # one-hot encode the labels {0, 1, 2, 4} into 4 channels
                y_i = np.concatenate((y_i == 0, y_i == 1, y_i == 2, y_i == 4),
                                     axis=2).astype(int)
                X_b[j] = X_i
                y_b[j] = y_i
                j += 1
            yield (X_b, y_b)

batch_size = 20
gen = gen_tr(X_train, Y_train, batch_size)
steps = np.floor((len(X_train) - 1) / batch_size).astype(int)
model.fit_generator(gen, steps_per_epoch=steps, epochs=5, verbose=1, shuffle=True,
                    max_queue_size=10, workers=2, use_multiprocessing=False)

And the error:

Exception in thread Thread-13:
Traceback (most recent call last):
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
    generator_output = next(self._generator)
ValueError: generator already executing

Traceback (most recent call last):

  File "<ipython-input-17-1a91cea3a91e>", line 7, in <module>
    max_queue_size=10,workers=2, use_multiprocessing=False)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
    generator_output = next(output_generator)

StopIteration

I've tried the solutions from keunwoochoi.wordpress.com and from Stanford (the same approach).

None of them worked. When I added:

import threading

class threadsafe_iter:
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return self.it.next()

def threadsafe_generator(f):
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g

@threadsafe_generator
# now goes my generator from above

I got this error:

Epoch 1/5
Exception in thread Thread-10:
Traceback (most recent call last):
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
    generator_output = next(self._generator)
  File "<ipython-input-12-24605a93d655>", line 17, in __next__
    return self.it.next()
AttributeError: 'generator' object has no attribute 'next'

Traceback (most recent call last):

  File "<ipython-input-13-b07830ef87c0>", line 5, in <module>
    max_queue_size=10,workers=2, use_multiprocessing=False)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
    generator_output = next(output_generator)

StopIteration

With workers=1 inside fit_generator everything works fine (using the plain generator, without the wrapper from those solutions), including calling next(gen) and gen.__next__() manually.

With a single thread the data generator's performance is insufficient, especially since I have multiple cores available...

Can someone help me with this problem? I'm kind of a newbie in Python's threading.

Edit: I have found a solution/workaround. Maybe a little bit too hacky for Keras, but it works. Inspired by zsdonghao. By splitting dataset augmentation into 10 chunks of 2750 samples each, I'm able to prepare the data quite quickly and utilize the GTX 1080 at almost 100%. RAM usage also doesn't exceed ~22 GB. Training one epoch takes about 14-15 min; data prep/augmentation takes 10-12 min in total. Compared to fit_generator with a single worker, that is more than a 3x reduction in time.

If it may help someone, here is the exact code:

import numpy as np
import time
import tensorlayer as tl
import pandas as pd

# X_train, y_train, X_test, y_test, model, model_name, weights and
# distort_imgs are defined earlier
batch_size = 20
epochs = 10
step_size = 2750
steps = np.floor((len(X_train) - 1) / step_size).astype(int)
s = list(X_train.shape)
train_all = pd.DataFrame()
eval_all = pd.DataFrame()

# training and evaluation
for i in range(epochs):
    start_time = time.clock()
    print('Epoch: {0:02d}'.format(i+1))
    for j in range(steps):
        ind = range(step_size*j, step_size*(j+1))
        # augment one chunk of 2750 samples in parallel threads
        data = tl.prepro.threading_data([_ for _ in zip(X_train[ind, :, :, 0, np.newaxis],
                                                        X_train[ind, :, :, 1, np.newaxis],
                                                        X_train[ind, :, :, 2, np.newaxis],
                                                        X_train[ind, :, :, 3, np.newaxis],
                                                        y_train[ind])], fn=distort_imgs, thread_count=None)
        X_s = data[:, 0:4, :, :, :]
        y_s = data[:, 4, :, :, :]
        X_s = X_s.transpose((0, 2, 3, 1, 4))
        X_s.shape = (step_size, s[1], s[2], s[3])
        # one-hot encode the labels {0, 1, 2, 4} into 4 channels
        y_s = np.concatenate((y_s == 0, y_s == 1, y_s == 2, y_s == 4),
                             axis=3).astype(int)
        # train for exactly one epoch on this chunk
        train = model.fit(X_s, y_s, class_weight=weights, verbose=0,
                          batch_size=batch_size, epochs=i+2, initial_epoch=i+1)
        train.history['epoch'] = i+1
        train.history['step'] = j+1
        train = pd.DataFrame(train.history)
        train_all = pd.concat([train_all, train], ignore_index=True)
        print(train.to_string(index=False))
    eval = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)
    eval = pd.DataFrame({'val_dice_coe': eval[0], 'val_dice_hard_coe': eval[1],
                         'val_iou_coe': eval[2], 'val_loss': eval[3]}, index=[0])
    eval['epoch'] = i+1
    eval_all = pd.concat([eval_all, eval], ignore_index=True)
    print(eval.to_string(index=False))
    model.save('{0}_ep_{1}.h5'.format(model_name, i+1))
    print('Epoch {0:02d} took: {1:.3f} min'.format(i+1, (time.clock()-start_time)/60))
asked Oct 01 '17 by Szymon Kocot


1 Answer

In Python 3, you should use next(self.it) instead of self.it.next().
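
For illustration, here is a minimal sketch of the wrapper from the question with only that one line changed (everything else is unchanged):

import threading

class threadsafe_iter:
    """Wraps an iterator so that calls to next() are serialized by a lock."""
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)  # Python 3: next(it), not it.next()

def threadsafe_generator(f):
    """Decorator that makes a generator function return a thread-safe iterator."""
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g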

You could also try using a Keras Sequence; it seems safer, since it's indexed, which preserves the correct order of the data when multiprocessing.
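
As a rough, untested sketch (assuming the same distort_imgs augmentation function, the 4-channel X and the {0, 1, 2, 4} label values from the question; the class name TrainSequence is just for illustration), a Sequence could look like this:

import numpy as np
from keras.utils import Sequence

class TrainSequence(Sequence):
    """Index-based batch loader; safe to use with workers > 1."""
    def __init__(self, X, y, batch_size):
        self.X, self.y, self.batch_size = X, y, batch_size

    def __len__(self):
        # number of batches per epoch
        return int(np.floor((len(self.X) - 1) / self.batch_size))

    def __getitem__(self, i):
        s = list(self.X.shape)
        s[0] = self.batch_size
        X_b = np.zeros(s, dtype=np.float32)
        y_b = np.zeros(s, dtype=int)
        for j in range(self.batch_size):
            k = i * self.batch_size + j
            data = distort_imgs(self.X[k, :, :, 0, np.newaxis],
                                self.X[k, :, :, 1, np.newaxis],
                                self.X[k, :, :, 2, np.newaxis],
                                self.X[k, :, :, 3, np.newaxis],
                                self.y[k, :, :, 0, np.newaxis])
            X_b[j] = np.concatenate(data[:4], axis=2)
            y_i = data[-1]
            # one-hot encode the labels {0, 1, 2, 4} into 4 channels
            y_b[j] = np.concatenate((y_i == 0, y_i == 1, y_i == 2, y_i == 4),
                                    axis=2).astype(int)
        return X_b, y_b

# hypothetical usage, mirroring the fit_generator call from the question
# seq = TrainSequence(X_train, Y_train, batch_size=20)
# model.fit_generator(seq, steps_per_epoch=len(seq), epochs=5,
#                     max_queue_size=10, workers=2, use_multiprocessing=False)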

Finally, it seems that workers affects only the generator itself, not the model. In my tests (I'm not good at threading either...) the only difference I could see with more workers was a bigger queue of preloaded data waiting for its turn to go into the model.

answered Nov 11 '22 by Daniel Möller