I would like to use multiple processes (not threads) to do some preprocessing and enqueue the results to a tf.RandomShuffleQueue, which can then be used by my main graph for training. Is there a way to do that?
I have converted my dataset into TFRecords split across 256 shards. I want to start 20 processes using multiprocessing and let each one process a range of shards. Each process should read the images, then augment them and push them into a tf.RandomShuffleQueue from which the input can be fed to a graph for training.
Some people advised me to look at the Inception example in TensorFlow. However, it is a very different situation, because there only the reading of the data shards is done by multiple threads (not processes), while the preprocessing (e.g. augmentation) takes place in the main thread.
(This aims to solve your actual problem)
In another topic, someone told you that Python has the global interpreter lock (GIL), and that there would therefore be no speed benefit from multiple cores unless you used multiple processes. This was probably what prompted your desire to use multiprocessing.
However, with TF, Python is normally used only to construct the "graph". The actual execution happens in native code (or on the GPU), where the GIL plays no role whatsoever.
In light of this, I recommend simply letting TF use multithreading. This can be controlled using the intra_op_parallelism_threads argument, such as:
with tf.Session(graph=graph,
                config=tf.ConfigProto(allow_soft_placement=True,
                                      intra_op_parallelism_threads=20)) as sess:
    # ...
(Side note: if you have, say, a 2-CPU, 32-core system, the best argument may very well be intra_op_parallelism_threads=16
, depending on a lot of factors)
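For the input pipeline specifically, TF can likewise do the reading, augmentation, and shuffling with multiple threads, all inside the graph. Below is a minimal sketch of a TF 1.x queue-based pipeline; the feature key image_raw and the decode/augment steps are assumptions you would adapt to your own TFRecords:

import tensorflow as tf

def input_pipeline(filenames, batch_size=32):
    # Queue of shard filenames; reader threads pull from it.
    filename_queue = tf.train.string_input_producer(filenames, shuffle=True)
    reader = tf.TFRecordReader()
    _, serialized = reader.read(filename_queue)
    # Adapt the feature spec to whatever your shards actually contain.
    features = tf.parse_single_example(
        serialized, features={'image_raw': tf.FixedLenFeature([], tf.string)})
    image = tf.image.decode_jpeg(features['image_raw'], channels=3)
    image = tf.image.resize_images(image, [256, 256])
    image = tf.random_crop(image, [224, 224, 3])      # augmentation ...
    image = tf.image.random_flip_left_right(image)    # ... runs in the graph
    # shuffle_batch feeds a RandomShuffleQueue from num_threads threads.
    return tf.train.shuffle_batch([image], batch_size=batch_size,
                                  capacity=10000, min_after_dequeue=5000,
                                  num_threads=20)

At run time you would call tf.train.start_queue_runners(sess=sess) once before pulling batches, so the enqueue threads actually start.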
Comment: The pickling of TFRecords is not that important. I can pass a list of lists containing the names of ranges of sharded TFRecord files.
Then I have to rethink the decision process!
Comment: I can pass it to a Pool.map() as an argument.
Verify whether a multiprocessing.Queue() can handle this.
Results of Tensor functions are Tensor objects.
Try the following:
import multiprocessing

tensor_object = func(TFRecord)         # func: your preprocessing returning a Tensor
q = multiprocessing.Manager().Queue()  # a queue that can be shared across processes
q.put(tensor_object)                   # requires tensor_object to be picklable
data = q.get()
print(data)
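To act on the comment above about passing shard-name ranges to Pool.map(), a minimal sketch could look like the following; the shard naming pattern and the worker process_shards are hypothetical placeholders:

import multiprocessing as mp

# Hypothetical shard names; adapt the pattern to your 256 files.
filenames = ['train-%05d-of-00256' % i for i in range(256)]

num_procs = 20
# Split the shard list into num_procs roughly equal ranges.
chunks = [filenames[i::num_procs] for i in range(num_procs)]

with mp.Pool(num_procs) as pool:
    results = pool.map(process_shards, chunks)  # process_shards: your worker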
Comment: How do I make sure that all the processes enqueue to the same queue?
This is simply done by enqueueing the results from Pool.map(...) after all processes have finished. Alternatively, we can enqueue in parallel, queueing data from all processes. But doing so depends on the data being picklable, as described above.
For instance:
import multiprocessing as mp
import tensorflow as tf

def func(filename):
    # read() and preprocess() are placeholders for your own I/O and augmentation.
    TFRecord = read(filename)
    tensor_obj = preprocess(TFRecord)
    return tensor_obj

def main_Tensor(tensor_objs):
    # Enqueue all results into a RandomShuffleQueue in the main process.
    rsq = tf.RandomShuffleQueue(capacity=1000, min_after_dequeue=100,
                                dtypes=[tf.float32])
    enqueue_ops = [rsq.enqueue(t) for t in tensor_objs]
    with tf.Session() as sess:
        for op in enqueue_ops:
            sess.run(op)

if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        # map() blocks until every worker has returned its result.
        tensor_objs = pool.map(func, sharded_TFRecords)
    main_Tensor(tensor_objs)
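Note that this pattern stands or falls with the results of func being picklable, as described above; if the raw Tensor objects turn out not to pickle cleanly, a common workaround is to return plain NumPy arrays from the workers and build the graph only in the main process.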