
Getting some form of keras multi-processing/threading to work on Windows

In order to speed up data augmentation for training a neural network, I am trying to have some form of parallel processing for feeding my GPU with data. At the moment the limitation is how fast I generate augmented data, not how fast the GPU trains the network.

If I try to use multiprocessing=True with a generator, I get the following error with keras 2.2.0 in Python 3.6.6 under Windows 10 (v1083) 64-bit:

ValueError: Using a generator with use_multiprocessing=True is not supported on Windows (no marshalling of generators across process boundaries). Instead, use single thread/process or multithreading.

I found, e.g., the following on GitHub, so this is expected behavior with keras under Windows. That link suggested moving to a Sequence instead of a generator (even though the error message suggests using multithreading; I also could not figure out how to use multithreading with keras instead of multiprocessing. I may have overlooked it in the documentation, but I just did not find it). So, I used the code below (modifying an example using a Sequence), but that also achieves no speed-up, and the variant with use_multiprocessing=True just freezes up.

Am I missing something obvious here for how to get some form of parallel generator going?

Minimal (non-)working example:

from keras.utils import Sequence
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import to_categorical
import numpy as np

class DummySequence(Sequence):

    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):        
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

        return np.array(batch_x), np.array(batch_y)



x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)

seq = DummySequence(x, y, 10)

model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

print('single worker')
model.fit_generator(generator=seq, 
                    steps_per_epoch = 100,
                    epochs = 2, 
                    verbose=2,
                    workers=1)
print('achieves no speed-up')
model.fit_generator(generator=seq, 
                    steps_per_epoch = 100,
                    epochs = 2, 
                    verbose=2,
                    workers=6,
                    use_multiprocessing=False)
print('Does not run')
model.fit_generator(generator=seq, 
                    steps_per_epoch = 100,
                    epochs = 2, 
                    verbose=2,
                    workers=6,
                    use_multiprocessing=True)
asked Aug 03 '18 by Björn

2 Answers

In combination with a Sequence, using use_multiprocessing=False and, e.g., workers=4 does work.

I just realized that the example code in the question was not showing any speed-up because the data was being generated too fast. Inserting a time.sleep(2) makes this evident.

import time

import numpy as np
from keras.utils import Sequence, to_categorical
from keras.models import Sequential
from keras.layers import Dense

class DummySequence(Sequence):

    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
        time.sleep(2)  # simulate slow data augmentation
        return np.array(batch_x), np.array(batch_y)

x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)

seq = DummySequence(x, y, 10)

model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

print('single worker')
model.fit_generator(generator=seq, 
                    steps_per_epoch = 10,
                    epochs = 2, 
                    verbose=2,
                    workers=1)

print('achieves speed-up!')
model.fit_generator(generator=seq, 
                    steps_per_epoch = 10,
                    epochs = 2, 
                    verbose=2,
                    workers=4,
                    use_multiprocessing=False)

This produced on my laptop the following:

single worker
>>> model.fit_generator(generator=seq,
...                     steps_per_epoch = 10,
...                     epochs = 2,
...                     verbose=2,
...                     workers=1)
Epoch 1/2
 - 20s - loss: 0.6984 - acc: 0.5000
Epoch 2/2
 - 20s - loss: 0.6955 - acc: 0.5100

and

achieves speed-up!
>>> model.fit_generator(generator=seq,
...                     steps_per_epoch = 10,
...                     epochs = 2,
...                     verbose=2,
...                     workers=4,
...                     use_multiprocessing=False)
Epoch 1/2
 - 6s - loss: 0.6904 - acc: 0.5200
Epoch 2/2
 - 6s - loss: 0.6900 - acc: 0.5000

Important notes: You will probably want self.lock = threading.Lock() in __init__ and then with self.lock: in __getitem__. Try to do the absolute bare minimum within the with self.lock: block; as far as I understand it, that is any reference to self.xxxx (other threads are blocked while the with self.lock: block is running).
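A minimal sketch of that locking pattern, using only the standard library (the class name LockedBatches is hypothetical and stands in for a keras Sequence subclass):

```python
import threading

class LockedBatches:
    def __init__(self, data, batch_size):
        self.data = data
        self.batch_size = batch_size
        self.lock = threading.Lock()  # created once in __init__

    def __len__(self):
        return (len(self.data) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, idx):
        # Hold the lock only while reading self.*; release it before heavy work.
        with self.lock:
            batch = self.data[idx * self.batch_size:(idx + 1) * self.batch_size]
        # ...expensive augmentation would go here, outside the lock...
        return batch

batches = LockedBatches(list(range(10)), 3)
results = {}
threads = [threading.Thread(target=lambda i=i: results.update({i: batches[i]}))
           for i in range(len(batches))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(x for b in results.values() for x in b))  # all 10 items, no loss
```

Keeping the locked region tiny matters: if augmentation itself ran under the lock, the worker threads would serialize and the speed-up would vanish.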

Additionally, if you want multithreading to speed up calculations (i.e. CPU operations are the bottleneck), do not expect any speed-up: the global interpreter lock (GIL) prevents it. Multithreading only helps if the limitation lies in I/O operations. Apparently, to speed up CPU-bound computation we need true multiprocessing, which keras currently does not support on Windows 10. Perhaps it is possible to hand-craft a multiprocessing generator (I have no idea).
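One hand-crafted possibility, sketched under assumptions (the names producer and batch_stream are hypothetical, and this is not something keras supports out of the box): compute batches in a separate process and hand them back through a multiprocessing.Queue. On Windows, processes are spawned rather than forked, so the worker function must be defined at module top level and the entry point must be guarded with if __name__ == "__main__":

```python
import multiprocessing as mp

def producer(q, n_batches, batch_size):
    # Runs in its own process, so this loop is not limited by the parent's GIL.
    for i in range(n_batches):
        batch = [i * batch_size + j for j in range(batch_size)]  # stand-in for augmentation
        q.put(batch)
    q.put(None)  # sentinel: no more batches

def batch_stream(n_batches, batch_size):
    q = mp.Queue(maxsize=4)  # bounded queue applies backpressure to the producer
    p = mp.Process(target=producer, args=(q, n_batches, batch_size))
    p.start()
    while True:
        batch = q.get()
        if batch is None:
            break
        yield batch
    p.join()

if __name__ == "__main__":  # required on Windows (spawn start method)
    print(list(batch_stream(3, 4)))
```

Such a generator could then be consumed on the main thread (workers=0), so keras itself never tries to pickle it across process boundaries.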

answered Oct 21 '22 by Björn


I tested your proposal against my setup with GPU/CPU monitoring.

  1. There is some speed increase, ~10% (440 s vs. 550 s), in my case.
  2. The CPU uses only one core at a time, and GPU load stays below 22%.

It looks like a single core runs more efficiently with more workers assigned; however, no true multiprocessing is enabled.

TF 2.0

Keras 2.2.4

answered Oct 21 '22 by Petr Ivanov