
Is the class generator (inheriting Sequence) thread safe in Keras/Tensorflow?

To make the training of a model faster, it seems to be good practice to generate batches on the CPU while the model trains on the GPU in parallel. For this purpose, a generator class can be written in Python that inherits from the Sequence class.

Here is the link to the documentation: https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence

The key statement in the documentation is:

Sequence are a safer way to do multiprocessing. This structure guarantees that the network will only train once on each sample per epoch which is not the case with generators.

And it gives a simple code example:

from tensorflow.keras.utils import Sequence  # missing from the original snippet
from skimage.io import imread
from skimage.transform import resize
import numpy as np
import math

# Here, `x_set` is list of path to the images
# and `y_set` are the associated classes.

class CIFAR10Sequence(Sequence):

    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

        return np.array([
            resize(imread(file_name), (200, 200))
               for file_name in batch_x]), np.array(batch_y)

To my understanding, what ideally needs to be done is to create an instance of this generator class and pass it to the fit_generator(...) function.

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=True,
                    workers=6)

Here is a quote from Keras documentation:

The use of keras.utils.Sequence guarantees the ordering and guarantees the single use of every input per epoch when using use_multiprocessing=True.

As written, I assume this setup is thread safe. Question 1) Is my assumption correct?

One confusing thing, though, is that the parameter use_multiprocessing cannot be set to True on Windows 10. Keras does not allow it; seemingly it can only be set to True on Linux. (I don't know how it behaves on other platforms.) But the workers parameter can still be set to a value greater than 0.

Let's have a look at the definition of these 2 parameters:

workers: Integer. Maximum number of processes to spin up when using process-based threading. If unspecified, workers will default to 1. If 0, will execute the generator on the main thread.

use_multiprocessing: Boolean. If True, use process-based threading. If unspecified, use_multiprocessing will default to False. Note that because this implementation relies on multiprocessing, you should not pass non-picklable arguments to the generator as they can't be passed easily to children processes.

So, by using the workers parameter, it seems to be possible to create multiple workers to speed up the training, independently of whether use_multiprocessing is True or not.
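To illustrate the distinction, here is a conceptual sketch of my own (not the actual Keras source): depending on the flag, the worker pool is built either from processes or from threads.

import threading
import multiprocessing

def start_workers(n_workers, use_multiprocessing, task):
    # Conceptually, use_multiprocessing selects the worker type: processes
    # bypass the GIL but require picklable arguments; threads share memory.
    # (On Windows, Process creation must run under `if __name__ == "__main__":`.)
    worker_cls = multiprocessing.Process if use_multiprocessing else threading.Thread
    workers = [worker_cls(target=task) for _ in range(n_workers)]
    for w in workers:
        w.start()
    return workers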

If one wants to use the generator class inheriting from Sequence (on Windows 10), one has to set use_multiprocessing to False as follows:

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=False,  # CHANGED
                    workers=6)

And there are still multiple workers running here because workers = 6.

Question 2) Is this setup still thread safe, or is the thread-safe characteristic lost after setting the use_multiprocessing parameter to False? The documentation does not make this clear to me.

Question 3) Still related to this topic: when training is done this way, with data generated on the CPU and training running on the GPU, and the model being trained is shallow, GPU utilization ends up very low while CPU utilization becomes significantly higher, because the GPU keeps waiting for data from the CPU. In such cases, is there a way to utilize some GPU resources for data generation as well?



2 Answers

No one who has seen this post seems to have the definitive answer, so I want to give the answer that worked for me. Because of the lack of documentation in this area, my answer might be missing some relevant details. Please feel free to add information that I do not mention here.

Seemingly, writing a generator class in Python that inherits from the Sequence class is simply not supported on Windows. (You can seemingly make it work on Linux.) To use it as intended, you need to set the parameter use_multiprocessing=True (with the class approach). But, as mentioned, that does not work on Windows, so you have to set use_multiprocessing to False there. Nevertheless, that does not mean parallel data loading is unavailable on Windows: even with use_multiprocessing=False, multiple workers can still run if you set the workers parameter to any value bigger than 1.

Example:

history = \
   merged_model.fit_generator(generator=train_generator,
                              steps_per_epoch=trainset_steps_per_epoch,
                              epochs=300,
                              verbose=1,
                              use_multiprocessing=False,
                              workers=3,
                              max_queue_size=4)

At this point, let's remember the Keras documentation again:

The use of keras.utils.Sequence guarantees the ordering and guarantees the single use of every input per epoch when using use_multiprocessing=True.

To my understanding, if use_multiprocessing=False, then the generator is no longer thread safe, which makes it difficult to write a generator class that inherits from Sequence.

To work around this problem, I wrote a generator myself and made it thread safe manually. Here is example pseudocode:

import tensorflow as tf
import threading

class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by
    serializing call to the `next` method of given iterator/generator.
    """
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):  # Python 3
        with self.lock:  # the lock must be held here, or the wrapper protects nothing
            return next(self.it)

    # In Python 2, define `next(self)` the same way.

def threadsafe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe.
    """
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g


@threadsafe_generator
def generate_data(tfrecord_file_path_list, batch_size):  # add any further arguments you need

    dataset = tf.data.TFRecordDataset(tfrecord_file_path_list)

    # example proto decode
    def _parse_function(example_proto):
      ...
      return batch_data

    # Parse the record into tensors.
    dataset = dataset.map(_parse_function)  

    dataset = dataset.shuffle(buffer_size=100000)

    # Repeat the input indefinitely
    dataset = dataset.repeat()

    # Generate batches
    dataset = dataset.batch(batch_size)

    # Create an initializable iterator
    iterator = dataset.make_initializable_iterator()

    # Tensor that yields the next batch when evaluated
    next_batch = iterator.get_next()

    iterator_init_op = iterator.make_initializer(dataset)

    with tf.Session() as sess:

        sess.run(iterator_init_op)

        while True:
            try:
                # Fetch under a different name so the tensor is not shadowed
                batch_data = sess.run(next_batch)
            except tf.errors.OutOfRangeError:
                break
            yield batch_data
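For completeness, the generator is then created and handed to fit_generator exactly as in the earlier snippet; the file names and batch size below are placeholders of my own:

train_generator = generate_data(["train_0.tfrecord", "train_1.tfrecord"], batch_size=64)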

One can debate whether doing it this way is really elegant, but it seems to work pretty well.
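As a sanity check that the decorator actually serializes access, here is a small self-contained demo of my own: several threads drain one shared generator, and every item comes out exactly once.

import threading

@threadsafe_generator
def count_up(n):
    for i in range(n):
        yield i

gen = count_up(1000)
results = []

def consume():
    # list.append is atomic under the GIL, so the shared list is safe here
    for item in gen:
        results.append(item)

threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert sorted(results) == list(range(1000))  # each item was yielded exactly once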

To summarize:

  • If writing your program on Windows, set use_multiprocessing to False.
  • (As of today, to my knowledge) writing a generator class that inherits from Sequence is not supported on Windows. (It is a Tensorflow/Keras problem, I guess.)
  • To work around the problem, write an ordinary generator, make your generator thread safe, and set workers to a number greater than 1.

Important note: In this setup, the generator runs on the CPU and the training is done on the GPU. One problem I observed is that if the model being trained is shallow enough, GPU utilization stays very low while CPU utilization gets high. If the model is shallow and the dataset small enough, it can be a good option to keep all the data in memory and run everything on the GPU; that should speed up training significantly.

If, for any reason, you would like to use the CPU and GPU simultaneously, my modest recommendation is to try Tensorflow's tf.data API, which significantly speeds up data preprocessing and batch preparation. If the generator is written only in Python, the GPU keeps waiting for data to continue with the training. Whatever one may say about the Tensorflow/Keras documentation, the tf.data implementation is really efficient code!
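For anyone who wants to try the tf.data route, here is a minimal sketch in the same TF1 style as the generator above; parse_fn stands in for your own record decoder, and the tuning values are illustrative only:

import tensorflow as tf

def make_dataset(tfrecord_paths, batch_size, parse_fn):
    dataset = tf.data.TFRecordDataset(tfrecord_paths)
    # Decode records in parallel on the CPU
    dataset = dataset.map(parse_fn, num_parallel_calls=4)
    dataset = dataset.shuffle(buffer_size=10000)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    # Keep the next batch ready while the GPU works on the current one
    dataset = dataset.prefetch(1)
    return dataset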

Anyone having more complete knowledge of the API who sees this post, please feel free to correct me in case I misunderstand anything or the API has been updated to solve the problems even on Windows.



I have a proposed "improved" solution that may interest others. Please note this is coming from my experience with Tensorflow 1.15 (I have yet to use version 2).

TL;DR

Install WSL version 2 on Windows, install Tensorflow in a Linux environment (e.g. Ubuntu) there (see the link below), and then set use_multiprocessing to True to get this to work.

NOTE: The Windows Subsystem for Linux (WSL) version 2 is only available on Windows 10, Version 1903, Build 18362 or higher. Be sure to upgrade your Windows version via Windows Update to get this to work.

See Install Tensorflow-GPU on WSL2

Long Answer

For multitasking and multithreading (i.e. parallelism and concurrency), there are two operations we must consider:

  • forking = a parent process creates a copy of itself (a child) that has an exact copy of all the memory segments it uses
  • spawning = a parent process starts an entirely new child process that does not inherit the parent's memory; anything the child needs must be pickled and sent over to it

Linux supports forking, but Windows does not. Windows only supports spawning.
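You can check which start method your own platform uses; this is a small illustration of my own:

import multiprocessing

# Prints 'fork' on Linux, 'spawn' on Windows (and on macOS since Python 3.8)
print(multiprocessing.get_start_method())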

The reason Windows hangs when using use_multiprocessing=True is that the Python multiprocessing module uses spawn on Windows. A spawned child does not share the parent's memory, and everything it needs must be pickled and sent to it; since a generator cannot be transferred this way, the child doesn't know what to do and the parent waits on it forever.

Answer 2: It is not threadsafe. On Windows, if you've ever attempted to use a data generator or a Sequence, you've probably seen an error like this:

ValueError: Using a generator with use_multiprocessing=True is not supported on Windows 
(no marshalling of generators across process boundaries). Instead, use single 
thread/process or multithreading.

marshalling means "transforming the memory representation of an object into a data format that is suitable for transmission." The error is saying that unlike Linux, which uses fork, use_multiprocessing=True doesn't work on Windows because Windows uses spawn and cannot transfer the generator to the child process.

At this point, you may be asking yourself:

"Wait...What about the Python Global Interpreter Lock (GIL)?..If Python only allows one thread to run at a time, why does it even have the threading module and why do we care about this in Tensorflow??!"

The answer lies in the difference between CPU-bound tasks and I/O-bound tasks:

  • CPU-bound tasks = those whose speed is limited by computation (data crunching)
  • I/O-bound tasks = those whose speed is limited by waiting for input or output from other processes (i.e. data transfer)

In programming, when we say two tasks are concurrent, we mean they can start, run, and complete in overlapping time. When we say they are parallel, we mean they are literally running at the same time.

So, the GIL prevents threads from running in parallel, but not concurrently. The reason this is important for Tensorflow is because concurrency is all about I/O operations (data transfer). A good dataflow pipeline in Tensorflow should try to be concurrent so that there's no lag time when data is being transferred to-and-from the CPU, GPU, and/or RAM and training finishes faster. (Rather than have a thread sit and wait until it gets data back from somewhere else, we can have it executing image preprocessing or something else until the data gets back.)


IMPORTANT ASIDE: The GIL exists in Python because everything in Python is an object. (This is why you can do "weird" things with "dunder/magic" methods, like (5).__add__(3) to get 8. NOTE: the parentheses are needed around 5 because 5. would otherwise be parsed as a float literal.) Python handles memory and garbage collection by counting all references made to each object. When the count drops to 0, Python deletes the object. If two threads tried to update the same reference count simultaneously, or if one thread finished faster than another, you could get a race condition and objects would be deleted "randomly". We could put locks on individual objects, but then we would be unable to prevent deadlocks. Losing parallel thread execution was seen by Guido (and by myself, though it is certainly arguable) as a minor loss, because we still maintain concurrent I/O operations, and tasks can still be run in parallel on different CPU cores (i.e. multiprocessing). Hence, this is (one reason) why Python has both the threading and multiprocessing modules.

Now back to thread safety. When running concurrent/parallel tasks, you have to watch out for additional hazards. Two big ones are:

  1. race conditions - operations don't take exactly the same time to compute each time a program is run (this is why, with timeit, we average over a number of runs). Because threads finish at different times depending on the run, you can get different results with each run.

  2. deadlock - to prevent two threads from accessing the same memory at the same time, we add a lock or mutex (mutual exclusion) so that other threads cannot touch that memory while one thread holds the lock. However, if two threads each hold a lock the other needs, and each depends on the other finishing in order to proceed, the program hangs. (See the sketch after this list.)
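Here is a minimal sketch of my own of the classic deadlock pattern; whether it actually hangs on a given run depends on timing:

import threading

lock_a, lock_b = threading.Lock(), threading.Lock()

def task_1():
    with lock_a:        # thread 1 holds lock_a ...
        with lock_b:    # ... and waits for lock_b
            pass

def task_2():
    with lock_b:        # thread 2 holds lock_b ...
        with lock_a:    # ... and waits for lock_a
            pass

t1 = threading.Thread(target=task_1)
t2 = threading.Thread(target=task_2)
t1.start(); t2.start()  # with unlucky timing, both threads wait forever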

I bring this up because Tensorflow needs to be able to pickle Python objects to make code run faster. (Pickling turns objects and data into a byte stream, much in the same way that an entire program's source code is converted into an exe on Windows.) The Tensorflow Iterator.__init__() method locks threads and contains a threading.Lock():

def __init__(self, n, batch_size, shuffle, seed):
    ...
    self.lock = threading.Lock()
    ...

The problem is that Python cannot pickle thread lock objects, and with spawn there is no fork to sidestep the pickling (i.e. Windows cannot marshall thread locks to child processes).

If you were to try to use a generator and pass it to fit_generator, you would get the error (see GitHub Issue #10842):

TypeError: can't pickle _thread.lock objects
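You can reproduce the underlying failure directly; this is my own two-line illustration:

import pickle
import threading

try:
    pickle.dumps(threading.Lock())
except TypeError as err:
    print(err)  # e.g. "cannot pickle '_thread.lock' object" (wording varies by Python version)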

So, while use_multiprocessing=True is threadsafe on Linux, it is not on Windows.

Solution: Around June 2020, Microsoft released version 2 of the Windows Subsystem for Linux (WSL). This was significant because it enabled GPU hardware acceleration. Version 1 was "simply" a driver between Windows NT and Linux, whereas WSL 2 actually runs a Linux kernel. Thus, you can now install Linux on Windows, open a bash shell from the command prompt, and (most importantly) access hardware. It is therefore now possible to install tensorflow-gpu on WSL. In addition, you'll be able to use fork.

Thus, I recommend:

  1. Installing WSL version 2 on Windows and adding your desired Linux environment
  2. Installing tensorflow-gpu in a virtual environment in the WSL Linux environment (see the link above)
  3. Retrying use_multiprocessing=True to see if it works.

CAVEAT: I haven't tested this yet to verify that it works, but to the best of my limited knowledge, I believe it should.

After this, answering Question 3 should be a simple matter of tuning the amount of concurrency against the amount of parallelism, and I recommend the TensorFlow Dev Summit 2018 video Training Performance: A user's guide to converge faster to see how to do that.
