I should start by saying that I am completely new to any kind of parallelism/multithreading/multiprocessing programming.
Now, I have the chance to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I've spent a lot of time trying to understand how I should modify my code (if I have to) in order to exploit all of that computational power. Unfortunately, I didn't get anywhere. I hoped that TF would do this automatically, but when I launch my model and check the CPU usage with top, I see 100% CPU usage most of the time, with a few peaks at 200%.
If all the cores were used, I would expect to see a 100*64=6400% usage (correct?). How can I accomplish this?
Should I do something similar to what is explained here?
If that is the case, do I understand correctly that all the multithreading is only applied to calculations which involve Queue?
Is this really all that can be done to use all the computational power available (since it appears to me that queues are only used when reading and batching training samples)?
This is what my code looks like, if needed: (main.py)
# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *
import argparse
import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
testset_size = 1000
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300


def placeholder_inputs(batch_size):
    images_placeholder = tf.placeholder(
        tf.float32, shape=(testset_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(testset_size, 15))
    return images_placeholder, labels_placeholder


def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):
    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])
    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }
    return feed_dict


tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():
    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32)
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()

    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)
    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)

    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl,
                       test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):
        _, grad, loss_value = sess.run([train_op, grad_norm, loss],
                                       feed_dict={keep_prob: 0.5})
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)
        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)

        if step % 1 == 0:
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f '
                  % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f' % grad)
            # summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
            # summary_writer.add_summary(summary_str, step)
            # summary_writer.flush()

        if (step+1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step=global_step)

        if test_loss_val < 0.01:  # or grad < 0.01:
            print("Stopping condition reached.")
            break

    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')
cnn.py:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import tensorflow as tf

NUM_OUTPUT = 15
IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT


def inference(images, num_samples, keep_prob, reuse=None):
    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(
            name='weights', shape=[3, 30, 1, 5],
            initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')

    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
                           padding='VALID', name='pool1')
    # output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(
            name='weights', shape=[2, 2, 5, 5],
            initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        # output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')

    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
                           padding='VALID', name='pool2')
    # output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(
            name='weights', shape=[dim, 20],
            initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20], name='biases'))
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases,
                                     name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(
            name='weights', shape=[20, NUM_OUTPUT],
            initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], name='biases'))
        output = tf.matmul(fully_connected, weights) + biases

    return output


def loss(outputs, labels):
    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')
    return rmse_tot


def training(loss, starter_learning_rate, global_step):
    tf.scalar_summary(loss.op.name, loss)
    # optimizer = tf.train.AdamOptimizer()
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step,
                                               200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
    # train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op, grad_norm
freader_2.py:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import collections

import numpy as np
from six.moves import xrange
import tensorflow as tf


class XICSDataSet:
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):
        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)

        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)

        im_record_defaults = [[.0]]*self.height*self.width
        lb_record_defaults = [[.0]]*self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults,
                                      field_delim=' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults,
                                      field_delim=' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)

        depth_major = tf.reshape(features, [self.height, self.width, self.depth])

        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch(
            [depth_major, label], batch_size=self.batch_size,
            capacity=capacity, min_after_dequeue=min_after_dequeue)

        return example_batch, label_batch
TensorFlow has the ability to execute a given operator using multiple threads ("intra-operator parallelisation"), as well as different operators in parallel ("inter-operator parallelisation").
The TensorFlow Session object is multithreaded, so multiple threads can easily use the same session and run ops in parallel.
In short: yes, a thread can run on different cores. Not at the same time, of course - it's only one thread of execution — but it could execute on core C0 at time T0, and then on core C1 at time T1.
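To see which cores the scheduler is allowed to place a process (and therefore its threads) on, something like the following may help. It is only a sketch: `schedulable_cpus` is a made-up helper name, `os.sched_getaffinity` is available on Linux but not on every platform, and the fallback simply assumes no restriction.

```python
import os


def schedulable_cpus():
    """Return the sorted logical CPU ids this process may be scheduled on."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: pid 0 means "the calling process".
        return sorted(os.sched_getaffinity(0))
    # Assumption for other platforms: every logical CPU is allowed.
    return list(range(os.cpu_count() or 1))


print(schedulable_cpus())
```

If the list is shorter than the number of logical CPUs on the machine, the process has been pinned to a subset of cores (for example by a job scheduler), and no thread-pool setting will make it use the rest.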
Key Takeaways. Python is NOT a single-threaded language. Python processes typically use a single thread because of the GIL. Despite the GIL, libraries that perform computationally heavy tasks like numpy, scipy and pytorch utilise C-based implementations under the hood, allowing the use of multiple cores.
This is a comment, but I'm posting it as an answer because I don't have enough rep to post comments yet. Marco D.G.'s answer is correct; I just wanted to add the fun fact that with tf.device('/cpu:0') automatically tries to use all available cores. Happy flowing!
According to TensorFlow:
The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.
intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
inter_op_parallelism_threads: All ready nodes are scheduled in this pool.
These configurations are set via the tf.ConfigProto and passed to tf.Session in the config attribute as shown in the snippet below. For both configuration options, if they are unset or set to 0, they will default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores.
config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.Session(config=config)
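For the machine in the question (32 physical cores, 64 logical), the "physical cores" alternative could be sketched roughly as below. The helper names `physical_core_count` and `make_session_config` are made up for illustration, the default of 2 hardware threads per core is an assumption taken from the question, and the TensorFlow module is passed in as an argument so the sketch does not import it itself. It assumes a TF 1.x-style API where `tf.ConfigProto` exists.

```python
import os


def physical_core_count(logical_cores, threads_per_core=2):
    """Estimate physical cores from logical ones (assumes uniform hyperthreading)."""
    return max(1, logical_cores // threads_per_core)


def make_session_config(tf, logical_cores=None, threads_per_core=2):
    """Build a ConfigProto with both thread pools sized to the physical cores.

    `tf` is the imported TensorFlow module (TF 1.x-style API assumed).
    """
    if logical_cores is None:
        logical_cores = os.cpu_count() or 1
    n = physical_core_count(logical_cores, threads_per_core)
    config = tf.ConfigProto()
    config.intra_op_parallelism_threads = n
    config.inter_op_parallelism_threads = n
    return config


# Usage (hypothetical), replacing the plain tf.Session() in main.py:
#   import tensorflow as tf
#   sess = tf.Session(config=make_session_config(tf, logical_cores=64))
```

Whether this beats the default depends on the model; with very small convolutions like the ones in cnn.py, individual ops may simply not contain enough work to keep many threads busy.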
In versions of TensorFlow before 1.2, multi-threaded, queue-based input pipelines were recommended for performance. Beginning with TensorFlow 1.4, however, the tf.data module is recommended instead.
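To make the idea of a multi-threaded, queue-based input pipeline concrete, here is a standard-library analogy. It is not TensorFlow's implementation: `run_pipeline` and its parameters are invented for this sketch. Several reader threads parse space-separated lines into a bounded queue while the main thread dequeues them into batches, which is roughly the division of labour between queue runners and the training loop.

```python
import queue
import threading


def run_pipeline(lines, num_readers=4, batch_size=3, capacity=8):
    """Parse `lines` on several threads and return the batches the consumer built."""
    work = queue.Queue()
    for line in lines:
        work.put(line)

    parsed = queue.Queue(maxsize=capacity)  # bounded, like a queue's capacity
    done = object()                         # sentinel: one reader has finished

    def reader():
        while True:
            try:
                line = work.get_nowait()
            except queue.Empty:
                break
            parsed.put([float(x) for x in line.split()])
        parsed.put(done)

    threads = [threading.Thread(target=reader) for _ in range(num_readers)]
    for t in threads:
        t.start()

    batches, batch, finished = [], [], 0
    while finished < num_readers:
        item = parsed.get()                 # blocks until a reader delivers
        if item is done:
            finished += 1
            continue
        batch.append(item)
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)               # keep the final partial batch

    for t in threads:
        t.join()
    return batches
```

Note that in this pure-Python version the GIL prevents the parsing itself from running on several cores at once; the sketch only shows the structure. It also illustrates the point raised in the question: threads of this kind parallelise reading and batching, not the training computation.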
Yes, in Linux, you can check your CPU usage with top and press 1 to show the usage per CPU. Note: the percentage depends on the Irix/Solaris mode.
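The arithmetic behind the two modes can be sketched as follows. In Irix mode top reports a process's usage relative to a single CPU, so a multi-threaded process can exceed 100%; in Solaris mode the figure is divided by the number of CPUs. The function names are made up for illustration.

```python
def full_load_percent(logical_cpus, irix_mode=True):
    """Percentage top would show for a process saturating every logical CPU."""
    return 100 * logical_cpus if irix_mode else 100


def cores_in_use(top_percent, logical_cpus, irix_mode=True):
    """Rough number of logical CPUs a process keeps busy, given top's figure."""
    if irix_mode:
        return top_percent / 100
    return top_percent / 100 * logical_cpus


print(full_load_percent(64))    # the 6400% expected in the question
print(cores_in_use(200, 64))    # a 200% peak is about two busy logical CPUs
```

So with 64 logical CPUs in Irix mode, the 100% to 200% readings in the question correspond to only one or two busy cores.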