I'm using Keras 2.0.8 for a U-Net 2D medical segmentation project. Currently I'm struggling to create a custom thread-safe image generator (for X and y simultaneously). X and y are 4D matrices with shape n_img x n_col x n_row x T, where T is 4 for X and 1 for y (the 4 numerical labels are transformed into a one-hot encoding along the 4th dimension).
Here is my code:
def gen_tr(X, y, batch_size):
    n = np.floor((len(X) - 1) / batch_size).astype(int)
    s = list(X.shape)
    s[0] = batch_size
    while 1:
        for i in range(n):
            j = 0
            X_b = np.zeros(s, dtype=np.float32)
            y_b = np.zeros(s, dtype=int)
            while j < batch_size:
                data = distort_imgs(X[i*batch_size+j, :, :, 0, np.newaxis],
                                    X[i*batch_size+j, :, :, 1, np.newaxis],
                                    X[i*batch_size+j, :, :, 2, np.newaxis],
                                    X[i*batch_size+j, :, :, 3, np.newaxis],
                                    y[i*batch_size+j, :, :, 0, np.newaxis])
                X_i = np.concatenate(data[:4], axis=2)
                y_i = data[-1]
                y_i = np.concatenate((y_i == 0, y_i == 1, y_i == 2, y_i == 4),
                                     axis=2).astype(int)
                X_b[j] = X_i
                y_b[j] = y_i
                j += 1
            yield (X_b, y_b)

batch_size = 20
gen = gen_tr(X_train, Y_train, batch_size)
steps = np.floor((len(X_train) - 1) / batch_size).astype(int)
model.fit_generator(gen, steps_per_epoch=steps, epochs=5, verbose=1, shuffle=True,
                    max_queue_size=10, workers=2, use_multiprocessing=False)
And the error:
Exception in thread Thread-13:
Traceback (most recent call last):
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
generator_output = next(self._generator)
ValueError: generator already executing
Traceback (most recent call last):
File "<ipython-input-17-1a91cea3a91e>", line 7, in <module>
max_queue_size=10,workers=2, use_multiprocessing=False)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
return func(*args, **kwargs)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
generator_output = next(output_generator)
StopIteration
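If I understand correctly, the error occurs because with workers=2 two enqueuer threads call next() on the same generator object at the same time, and a plain Python generator is not thread-safe. A minimal sketch that reproduces the same ValueError (just an illustration, not part of my training code):
import threading
import time

def slow_gen():
    while True:
        time.sleep(0.01)   # stand-in for the augmentation work done inside the generator
        yield 0

g = slow_gen()

def consume():
    try:
        for _ in range(100):
            next(g)
    except ValueError as e:
        print(e)           # typically: "generator already executing"

threads = [threading.Thread(target=consume) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()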
I've tried the thread-safe generator solutions from keunwoochoi.wordpress.com and the Stanford tutorial (same approach).
None of them worked. When I added:
import threading

class threadsafe_iter:
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return self.it.next()

def threadsafe_generator(f):
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g

@threadsafe_generator
# now goes my generator from above
I got this error:
Epoch 1/5
Exception in thread Thread-10:
Traceback (most recent call last):
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
generator_output = next(self._generator)
File "<ipython-input-12-24605a93d655>", line 17, in __next__
return self.it.next()
AttributeError: 'generator' object has no attribute 'next'
Traceback (most recent call last):
File "<ipython-input-13-b07830ef87c0>", line 5, in <module>
max_queue_size=10,workers=2, use_multiprocessing=False)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
return func(*args, **kwargs)
File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
generator_output = next(output_generator)
StopIteration
With workers=1 inside fit_generator everything works fine (using the plain generator, without the code from those solutions), including calling next(gen) and gen.__next__() manually.
With a single worker the data generator's performance is insufficient, especially since I have multiple cores available...
Can someone help me with this problem? I'm kind of a newbie to Python's threading.
Edit: I have found a solution/workaround. Maybe a little bit too hacky for Keras, but it works. Inspired by zsdonghao. By splitting the dataset augmentation into 10 parts of 2750 samples I'm able to prepare the data quite quickly and utilize the GTX 1080 at almost 100%. RAM usage also doesn't exceed ~22 GB. Training one epoch takes about 14-15 min, and data prep/augmentation takes 10-12 min in total. Compared to fit_generator with a single worker, that is more than a threefold reduction in time.
In case it helps someone, here is the exact code:
import tensorlayer as tl
import pandas as pd

batch_size = 20
epochs = 10
step_size = 2750
steps = np.floor((len(X_train) - 1) / step_size).astype(int)
s = list(X_train.shape)
train_all = pd.DataFrame()
eval_all = pd.DataFrame()

# training and evaluation
for i in range(epochs):
    start_time = time.clock()
    print('Epoch: {0:02d}'.format(i + 1))
    for j in range(steps):
        ind = range(step_size * j, step_size * (j + 1))
        data = tl.prepro.threading_data([_ for _ in zip(X_train[ind, :, :, 0, np.newaxis],
                                                        X_train[ind, :, :, 1, np.newaxis],
                                                        X_train[ind, :, :, 2, np.newaxis],
                                                        X_train[ind, :, :, 3, np.newaxis],
                                                        y_train[ind])],
                                        fn=distort_imgs, thread_count=None)
        X_s = data[:, 0:4, :, :, :]
        y_s = data[:, 4, :, :, :]
        X_s = X_s.transpose((0, 2, 3, 1, 4))
        X_s.shape = (step_size, s[1], s[2], s[3])
        y_s = np.concatenate((y_s == 0, y_s == 1, y_s == 2, y_s == 4),
                             axis=3).astype(int)
        train = model.fit(X_s, y_s, class_weight=weights, verbose=0,
                          batch_size=batch_size, epochs=i + 2, initial_epoch=i + 1)
        train.history['epoch'] = i + 1
        train.history['step'] = j + 1
        train = pd.DataFrame(train.history)
        train_all = pd.concat([train_all, train], ignore_index=True)
        print(train.to_string(index=False))
    eval = model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)
    eval = pd.DataFrame({'val_dice_coe': eval[0], 'val_dice_hard_coe': eval[1],
                         'val_iou_coe': eval[2], 'val_loss': eval[3]}, index=[0])
    eval['epoch'] = i + 1
    eval_all = pd.concat([eval_all, eval], ignore_index=True)
    print(eval.to_string(index=False))
    model.save('{0}_ep_{1}.h5'.format(model_name, i + 1))
    print('Epoch {0:02d} took: {1:.3f} min'.format(i + 1, (time.clock() - start_time) / 60))
In Python 3, you should use next(self.it) instead of self.it.next().
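For reference, here is a minimal sketch of the wrapper with that single change applied (otherwise it is the same class as in your question):
import threading

class threadsafe_iter:
    """Wraps an iterator/generator so that next() calls are serialized by a lock."""
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # only one thread at a time may advance the underlying generator
        with self.lock:
            return next(self.it)   # Python 3: generators have no .next() method

def threadsafe_generator(f):
    """Decorator that makes a generator function return a locked iterator."""
    def g(*args, **kwargs):
        return threadsafe_iter(f(*args, **kwargs))
    return g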
You could also try using a Keras Sequence; it seems safer, since it is indexed, which preserves the correct order of the data when multiprocessing. A rough sketch is below.
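Roughly, something like this (an untested sketch; SegmentationSequence is just an illustrative name, and it reuses the distort_imgs augmentation and one-hot step from your question):
from keras.utils import Sequence   # in some versions: from keras.utils.data_utils import Sequence
import numpy as np

class SegmentationSequence(Sequence):
    def __init__(self, X, y, batch_size):
        self.X, self.y, self.batch_size = X, y, batch_size

    def __len__(self):
        # number of batches per epoch
        return int(np.floor((len(self.X) - 1) / self.batch_size))

    def __getitem__(self, idx):
        lo = idx * self.batch_size
        X_b = np.zeros((self.batch_size,) + self.X.shape[1:], dtype=np.float32)
        y_b = np.zeros((self.batch_size,) + self.X.shape[1:], dtype=int)
        for j, k in enumerate(range(lo, lo + self.batch_size)):
            data = distort_imgs(self.X[k, :, :, 0, np.newaxis],
                                self.X[k, :, :, 1, np.newaxis],
                                self.X[k, :, :, 2, np.newaxis],
                                self.X[k, :, :, 3, np.newaxis],
                                self.y[k, :, :, 0, np.newaxis])
            X_b[j] = np.concatenate(data[:4], axis=2)
            y_i = data[-1]
            y_b[j] = np.concatenate((y_i == 0, y_i == 1, y_i == 2, y_i == 4),
                                    axis=2).astype(int)
        return X_b, y_b

# usage sketch:
# seq = SegmentationSequence(X_train, Y_train, batch_size=20)
# model.fit_generator(seq, steps_per_epoch=len(seq), epochs=5,
#                     workers=2, use_multiprocessing=False)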
Finally, it seems that workers affects only the generator itself, not the model. In my tests (I'm not good at threading either...) the only difference I could see with more workers was a bigger queue of preloaded data waiting for its turn to go into the model.