 

Enqueuing a tf.RandomShuffleQueue from multiple processes using multiprocessing

I would like to use multiple processes (not threads) to do some preprocessing and enqueue the results to a tf.RandomShuffleQueue which can be used by my main graph for training.

Is there a way to do that?

My actual problem

I have converted my dataset into TFRecords split across 256 shards. I want to start 20 processes using multiprocessing and let each process handle a range of shards. Each process should read images, augment them and push them into a tf.RandomShuffleQueue, from which the input can be fed to a graph for training.

Some people advised me to go through the Inception example in TensorFlow. However, that is a very different situation: there, only the reading of the data shards is done by multiple threads (not processes), while the preprocessing (e.g. augmentation) takes place in the main thread.

asked Oct 29 '22 07:10 by Ujjwal


2 Answers

(This aims to solve your actual problem)

In another thread, someone told you that Python has the global interpreter lock (GIL) and that there would therefore be no speed benefit from multiple cores unless you used multiple processes.

This was probably what prompted your desire to use multiprocessing.

However, with TF, Python is normally used only to construct the "graph". The actual execution happens in native code (or on the GPU), where the GIL plays no role whatsoever.
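A rough standard-library analogy (not TensorFlow code): CPython's hashlib releases the GIL while hashing large buffers, so ordinary Python threads can keep several cores busy at once, much as TF's native kernels can. The buffer sizes, worker count and helper names below are arbitrary assumptions chosen for illustration.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor

# Eight 8 MiB buffers with distinct contents (illustrative data only).
CHUNKS = [bytes([i]) * (8 * 1024 * 1024) for i in range(8)]


def digest(chunk: bytes) -> str:
    # CPython's hashlib releases the GIL while hashing large inputs,
    # so several threads can hash different chunks at the same time.
    return hashlib.sha256(chunk).hexdigest()


def hash_all_threaded(chunks, workers=4):
    # Plain threads are enough here because the heavy work runs in C code.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(digest, chunks))


if __name__ == "__main__":
    threaded = hash_all_threaded(CHUNKS)
    serial = [digest(c) for c in CHUNKS]
    print(threaded == serial)  # True: same results, computed on threads
```

Timing the threaded and serial versions on a multi-core machine is one way to see the overlap; the sketch itself only checks that the results agree.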

In light of this, I recommend simply letting TF use multithreading. The number of threads used to parallelize individual ops can be controlled with the intra_op_parallelism_threads field of tf.ConfigProto, for example:

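import tensorflow as tf  # TF 1.x API (tf.compat.v1 in TF 2)

config = tf.ConfigProto(allow_soft_placement=True,
                        intra_op_parallelism_threads=20)
with tf.Session(graph=graph, config=config) as sess:  # graph: your tf.Graph
    pass  # ... run your training ops here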

(Side note: on, say, a 2-CPU, 32-core system, the best value may well be intra_op_parallelism_threads=16, depending on many factors.)

answered Nov 15 '22 07:11 by MWB


Comment: Pickling of TFRecords is not that important. I can pass a list of lists containing the names of ranges of sharded TFRecord files.

Therefore, I have to restart my decision process!

Comment: I can pass it to a Pool.map() as an argument.
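A minimal sketch of that idea, assuming the 256 shards and 20 processes from the question: split the shard names into 20 contiguous ranges and hand one range to each worker through Pool.map(). The shard file names, split_into_ranges and the process_shards worker are hypothetical placeholders, not part of any library.

```python
import multiprocessing as mp


def split_into_ranges(items, n_parts):
    """Split items into n_parts contiguous ranges whose sizes differ by at most one."""
    q, r = divmod(len(items), n_parts)
    ranges, start = [], 0
    for i in range(n_parts):
        end = start + q + (1 if i < r else 0)  # first r ranges get one extra item
        ranges.append(items[start:end])
        start = end
    return ranges


def process_shards(shard_names):
    # Placeholder worker: a real one would read, decode and augment the
    # records in these shards and return picklable data (e.g. NumPy arrays).
    return len(shard_names)


if __name__ == "__main__":
    # Hypothetical shard naming scheme; substitute your own file names.
    shards = ["train-%05d-of-00256" % i for i in range(256)]
    ranges = split_into_ranges(shards, 20)
    with mp.Pool(20) as pool:
        counts = pool.map(process_shards, ranges)
    print(counts)
```

With 256 shards and 20 workers, the first 16 ranges receive 13 shards each and the last 4 receive 12, since 256 = 20 × 12 + 16.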

Verify whether a multiprocessing.Queue() can handle this.
The result of a TensorFlow function is a Tensor object.
Try the following:

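import multiprocessing

# func and TFRecord stand for your own preprocessing function and input.
tensor_object = func(TFRecord)
manager = multiprocessing.Manager()
q = manager.Queue()
q.put(tensor_object)  # put() pickles the object; fails if it is not picklable
data = q.get()
print(data)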
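A standard-library way to run the same check without TensorFlow: both multiprocessing.Queue and Manager().Queue() pickle every object passed to put(), so a pickle round trip tells you in advance whether an object can travel between processes. The is_picklable helper is a hypothetical name. Since a graph-mode tf.Tensor is a symbolic handle into its graph rather than the data itself, returning plain data such as NumPy arrays or nested lists is generally the safer choice.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round trip.

    multiprocessing queues pickle whatever is put on them,
    so this is a quick pre-check before calling put().
    """
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


print(is_picklable([[1.0, 2.0], [3.0, 4.0]]))  # True
print(is_picklable(lambda x: x))                # False
```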

Comment: How do I make sure that all the processes enqueue to the same queue?

The simplest approach is to enqueue the results returned by Pool.map(...) after all processes have finished.
Alternatively, you can enqueue in parallel, queueing data from all processes as it arrives.
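A minimal sketch of the parallel variant using only the standard library: a Manager().Queue() proxy can be passed to Pool workers as an argument (a plain multiprocessing.Queue cannot; pickling it for a pool task raises RuntimeError), so every worker puts onto the same queue while the main process consumes. The produce and collect names and the (shard_id, k) items are hypothetical stand-ins for real preprocessing output.

```python
import multiprocessing as mp


def produce(args):
    """Worker: preprocess one shard and push each example onto the shared queue."""
    shard_id, per_shard, queue = args
    for k in range(per_shard):
        # Stand-in for one augmented example; real code would put a NumPy array.
        queue.put((shard_id, k))
    return shard_id


def collect(n_shards=4, per_shard=3, workers=4):
    with mp.Manager() as manager:
        queue = manager.Queue()  # a single queue shared by every worker
        jobs = [(i, per_shard, queue) for i in range(n_shards)]
        with mp.Pool(workers) as pool:
            result = pool.map_async(produce, jobs)
            # Consume while the workers are still producing; in the TF case
            # each item would be fed to an enqueue op on the RandomShuffleQueue.
            items = [queue.get() for _ in range(n_shards * per_shard)]
            result.get()  # re-raise any exception from the workers
    return items


if __name__ == "__main__":
    print(sorted(collect()))
```

Items arrive in whatever order the workers finish, which is harmless here because the RandomShuffleQueue shuffles them anyway.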

Either way, this depends on the data being picklable, as described above.


For instance:

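import multiprocessing as mp

import numpy as np
import tensorflow as tf  # TF 1.x API (tf.compat.v1 in TF 2)

def func(filename):
    # read() and augment() are hypothetical placeholders for your own
    # TFRecord parsing and augmentation code. Return NumPy arrays:
    # symbolic tf.Tensor objects cannot be pickled back to the parent.
    images = read(filename)
    return np.asarray(augment(images), dtype=np.float32)

def main_Tensor(arrays):
    image_shape = arrays[0].shape[1:]
    batch = tf.placeholder(tf.float32, shape=(None,) + image_shape)
    # Capacity covers every example, so enqueueing never blocks here.
    capacity = sum(len(a) for a in arrays)
    rsq = tf.RandomShuffleQueue(capacity=capacity, min_after_dequeue=0,
                                dtypes=[tf.float32], shapes=[image_shape])
    enqueue_op = rsq.enqueue_many([batch])
    with tf.Session() as sess:
        for a in arrays:
            # Building the op is not enough; it must be run in a session.
            sess.run(enqueue_op, feed_dict={batch: a})
        # ... dequeue from rsq in the training graph and run training here

if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        # map() blocks until every worker is done, so no join() is needed
        # (join() without close() would raise ValueError).
        arrays = pool.map(func, sharded_TFRecords)

    main_Tensor(arrays)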
answered Nov 15 '22 05:11 by stovfl