To speed up data augmentation for training a neural network, I am trying to use some form of parallel processing to feed my GPU with data. At the moment the limitation is how fast I generate augmented data, not how fast the GPU trains the network.
If I try to use use_multiprocessing=True with a generator, I get the following error with Keras 2.2.0 in Python 3.6.6 under Windows 10 (v1083) 64-bit:
ValueError: Using a generator with use_multiprocessing=True is not supported on Windows (no marshalling of generators across process boundaries). Instead, use single thread/process or multithreading.
I found e.g. the following on GitHub, so this is expected behavior with Keras under Windows. That link seemed to suggest moving to a Sequence instead of a generator (even though the error message seems to suggest using multithreading; I could not figure out how to use multithreading with Keras instead of multiprocessing - I may have overlooked it in the documentation, but I just did not find it). So I used the code below (adapted from an example using a Sequence), but it also achieves no speed-up, and the variant with use_multiprocessing=True just freezes.
Am I missing something obvious here for how to get some form of parallel generator going?
Minimal (non-)working example:
from keras.utils import Sequence
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import to_categorical
import numpy as np

class DummySequence(Sequence):
    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
        return np.array(batch_x), np.array(batch_y)

x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)
seq = DummySequence(x, y, 10)

model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

print('single worker')
model.fit_generator(generator=seq,
                    steps_per_epoch=100,
                    epochs=2,
                    verbose=2,
                    workers=1)

print('achieves no speed-up')
model.fit_generator(generator=seq,
                    steps_per_epoch=100,
                    epochs=2,
                    verbose=2,
                    workers=6,
                    use_multiprocessing=False)

print('Does not run')
model.fit_generator(generator=seq,
                    steps_per_epoch=100,
                    epochs=2,
                    verbose=2,
                    workers=6,
                    use_multiprocessing=True)
In combination with a Sequence, using use_multiprocessing=False and workers= e.g. 4 does work.
I just realized that in the example code in the question I was not seeing the speed-up because the data was being generated too fast. By inserting a time.sleep(2) this becomes evident.
import time

class DummySequence(Sequence):
    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
        time.sleep(2)  # simulate slow data augmentation
        return np.array(batch_x), np.array(batch_y)

x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)
seq = DummySequence(x, y, 10)

model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

print('single worker')
model.fit_generator(generator=seq,
                    steps_per_epoch=10,
                    epochs=2,
                    verbose=2,
                    workers=1)

print('achieves speed-up!')
model.fit_generator(generator=seq,
                    steps_per_epoch=10,
                    epochs=2,
                    verbose=2,
                    workers=4,
                    use_multiprocessing=False)
On my laptop, this produced the following:
single worker
>>> model.fit_generator(generator=seq,
... steps_per_epoch = 10,
... epochs = 2,
... verbose=2,
... workers=1)
Epoch 1/2
- 20s - loss: 0.6984 - acc: 0.5000
Epoch 2/2
- 20s - loss: 0.6955 - acc: 0.5100
and
achieves speed-up!
>>> model.fit_generator(generator=seq,
... steps_per_epoch = 10,
... epochs = 2,
... verbose=2,
... workers=4,
... use_multiprocessing=False)
Epoch 1/2
- 6s - loss: 0.6904 - acc: 0.5200
Epoch 2/2
- 6s - loss: 0.6900 - acc: 0.5000
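The worker threads help here even though Python has a GIL, because time.sleep (like blocking I/O or many augmentation libraries that call into C code) releases the GIL while it waits. A standalone illustration of this effect, independent of Keras:

```python
import threading
import time

def io_like_task():
    # time.sleep releases the GIL, just like blocking I/O would
    time.sleep(0.5)

def run_sequential(n):
    start = time.perf_counter()
    for _ in range(n):
        io_like_task()
    return time.perf_counter() - start

def run_threaded(n):
    threads = [threading.Thread(target=io_like_task) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

print('sequential: %.2f s' % run_sequential(4))  # ~2 s
print('threaded:   %.2f s' % run_threaded(4))    # ~0.5 s
```

If io_like_task were a pure-Python CPU loop instead, the two timings would be roughly equal.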
Important notes:
You will probably want self.lock = threading.Lock() in __init__ and then with self.lock: in __getitem__. Try to do the absolute bare minimum required within the with self.lock: block; as far as I understand it, that would be any reference to self.xxxx (other threads are blocked while the with self.lock: block is running).
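A minimal sketch of what that locking could look like (my own illustration, not from the Keras docs; in real code this class would subclass keras.utils.Sequence, which is omitted here so the snippet runs standalone):

```python
import threading
import numpy as np

class LockedSequence:  # would be keras.utils.Sequence in real code
    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size
        self.lock = threading.Lock()

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        # hold the lock only while touching shared state (self.xxxx)
        with self.lock:
            batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
            batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
        # do the expensive augmentation OUTSIDE the lock, so other
        # worker threads are not blocked while it runs
        return np.array(batch_x), np.array(batch_y)
```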
Additionally, if you want multithreading to speed up calculations (i.e. CPU operations are the bottleneck), do not expect any speed-up. The global interpreter lock (GIL) will prevent that. Multithreading will only help you if the limitation is in I/O operations. Apparently, to speed up CPU-bound computations we need true multiprocessing, which Keras currently does not support with generators on Windows 10. Perhaps it is possible to hand-craft a multiprocessing generator (I have no idea).
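One rough sketch of what such a hand-crafted multiprocessing generator might look like, using a plain multiprocessing.Pool to precompute batches in worker processes (untested speculation on my part; make_batch and the batch shapes are made-up placeholders for your real augmentation, and on Windows the worker function must live in an importable module, hence the __main__ guard):

```python
import multiprocessing as mp
import numpy as np

def make_batch(idx):
    # placeholder for CPU-heavy augmentation of batch `idx`
    rng = np.random.default_rng(idx)
    x = rng.random((10, 3))
    y = (rng.random(10) > 0.5).astype(int)
    return x, y

def mp_generator(num_batches, processes=4):
    # yield batches computed in worker processes; imap preserves order
    with mp.Pool(processes) as pool:
        while True:  # Keras expects generators to loop forever
            for batch in pool.imap(make_batch, range(num_batches)):
                yield batch

if __name__ == '__main__':
    gen = mp_generator(10, processes=2)
    x, y = next(gen)
    print(x.shape, y.shape)  # (10, 3) (10,)
    gen.close()
```

Whether this actually plays well with fit_generator on Windows I cannot say; it sidesteps Keras's own use_multiprocessing machinery entirely.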
I tested your proposal against my solution with GPU/CPU monitoring. It looks like a single core runs more efficiently with more workers assigned, but no true multiprocessing is enabled.
TF 2.0
Keras 2.2.4