 

How to run TensorFlow on multiple cores and threads


I should start saying that I am completely new to any kind of parallelism/multithreading/multiprocessing programming.

Now I have the chance to run my TensorFlow CNN on a machine with 32 cores (each with 2 hyperthreads). I've spent a lot of time trying to understand how I should modify my code (if at all) in order to exploit all of that computational power. Unfortunately, I didn't get anywhere. I hoped that TF could do that automatically, but when I launch my model and check the CPU usage with top, I see 100% CPU usage most of the time, with a few 200% peaks.

If all the cores were used, I would expect to see a 100*64=6400% usage (correct?). How can I accomplish this?

Should I do something similar to what is explained here?

If that is the case, do I understand correctly that all the multithreading is only applied to calculations that involve queues?

Is this really all that can be done to use all the computational power available? (It appears to me that queues are only used when reading and batching training samples.)

This is what my code looks like, if needed: (main.py)

# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *

import argparse
import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
testset_size = 1000
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300

def placeholder_inputs(batch_size):
    images_placeholder = tf.placeholder(tf.float32, shape=(testset_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(testset_size, 15))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):
    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])

    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }

    return feed_dict

tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():

    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32)

    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()

    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)

    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):

        _, grad, loss_value = sess.run([train_op, grad_norm, loss], feed_dict={keep_prob: 0.5})

        tot_training_loss.append(loss_value)
        tot_grad.append(grad)

        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)

        if step % 1 == 0:
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f' % grad)
#            summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
#            summary_writer.add_summary(summary_str, step)
#            summary_writer.flush()

        if (step + 1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step=global_step)

        if test_loss_val < 0.01:  # or grad < 0.01:
            print("Stopping condition reached.")
            break

    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time) / 3600) + ' h')

cnn.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import tensorflow as tf

NUM_OUTPUT = 15

IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT

def inference(images, num_samples, keep_prob, reuse=None):

    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')

    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')
    # output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        # output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')

    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
    # output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20], name='biases'))
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20, NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], name='biases'))
        output = tf.matmul(fully_connected, weights) + biases

    return output


def loss(outputs, labels):

    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')

    return rmse_tot


def training(loss, starter_learning_rate, global_step):

    tf.scalar_summary(loss.op.name, loss)
#    optimizer = tf.train.AdamOptimizer()
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
#    train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op, grad_norm

freader_2.py:

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import collections
import numpy as np

from six.moves import xrange

import tensorflow as tf

class XICSDataSet:

    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):

        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)

        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]] * self.height * self.width
        lb_record_defaults = [[.0]] * self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim=' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim=' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)

        depth_major = tf.reshape(features, [self.height, self.width, self.depth])

        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity,
                                                            min_after_dequeue=min_after_dequeue)

        return example_batch, label_batch
asked Sep 09 '16 by bored_to_death


People also ask

Can TensorFlow use multiple cores?

TensorFlow has the ability to execute a given operator using multiple threads ("intra-operator parallelisation"), as well as different operators in parallel ("inter-operator parallelisation").
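
As a rough illustration of the two pools (a minimal sketch using the 1.x-era API that matches this question; the thread counts and shapes are arbitrary placeholders): a single large matmul can be sliced across intra-op threads, while two independent matmuls can additionally run concurrently on inter-op threads.

import tensorflow as tf  # 1.x-era API, matching the question

a = tf.random_normal([2000, 2000])
b = tf.random_normal([2000, 2000])
c = tf.matmul(a, a)  # each matmul can be split across intra-op threads
d = tf.matmul(b, b)  # c and d are independent, so they are candidates
                     # for inter-op parallelism

config = tf.ConfigProto(intra_op_parallelism_threads=8,   # placeholder values
                        inter_op_parallelism_threads=2)
with tf.Session(config=config) as sess:
    sess.run([c, d])  # c and d may execute concurrently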

Does TensorFlow use multiple threads?

The TensorFlow Session object is multithreaded, so multiple threads can easily use the same session and run ops in parallel.
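
A minimal sketch of that property (1.x-era API; the toy graph is a placeholder):

import threading
import tensorflow as tf  # 1.x-era API

x = tf.placeholder(tf.float32)
y = x * 2.0

sess = tf.Session()

def worker(value):
    # The same Session can be used from several threads; each call
    # to run() may execute concurrently with the others.
    print(sess.run(y, feed_dict={x: value}))

threads = [threading.Thread(target=worker, args=(float(i),)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sess.close()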

Can user threads run on different cores?

In short: yes, a thread can run on different cores. Not at the same time, of course, since it is only one thread of execution, but it could execute on core C0 at time T0 and then on core C1 at time T1.

Can Python threads run on multiple cores?

Key takeaways: Python is not a single-threaded language. Python processes typically use a single thread because of the GIL. Despite the GIL, libraries that perform computationally heavy tasks, such as numpy, scipy, and pytorch, use C-based implementations under the hood, allowing the use of multiple cores.
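
A minimal sketch of that effect with NumPy (assuming a BLAS build whose matrix multiply releases the GIL, which is the common case):

import threading
import numpy as np

a = np.random.rand(2000, 2000)
b = np.random.rand(2000, 2000)

def heavy():
    # The dot product runs in compiled BLAS code that releases the
    # GIL, so these threads can occupy different cores at once.
    np.dot(a, b)

threads = [threading.Thread(target=heavy) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()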


2 Answers

This is a comment, but I'm posting it as an answer because I don't have enough rep to post comments yet. Marco D.G.'s answer is correct; I just wanted to add the fun fact that with tf.device('/cpu:0'), TensorFlow automatically tries to use all available cores. Happy flowing!
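
For illustration, a minimal sketch of pinning the graph to the CPU device (1.x-era API; the shapes are placeholders):

import tensorflow as tf  # 1.x-era API

# Ops placed on /cpu:0 still use all the cores that the session's
# thread pools allow; there is no need to pin work per core.
with tf.device('/cpu:0'):
    a = tf.random_normal([1000, 1000])
    b = tf.matmul(a, a)

with tf.Session() as sess:
    sess.run(b)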

answered Oct 15 '22 by TheLoneDeranger


According to TensorFlow:

The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.

  • intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
  • inter_op_parallelism_threads: All ready nodes are scheduled in this pool.

These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. If either option is unset or set to 0, it will default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores.

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.Session(config=config)

In versions of TensorFlow before 1.2, multi-threaded, queue-based input pipelines were the recommended way to get input performance. Beginning with TensorFlow 1.4, however, the tf.data module is recommended instead.
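
As a hedged sketch only, this is roughly what a tf.data replacement for the queue-based feature reader above might look like (TF 1.4+ API; the file name, delimiter, and batch sizes are placeholders, not the asker's actual setup):

import tensorflow as tf  # TF 1.4+ API

def parse_line(line):
    # Hypothetical parser: split one space-delimited line into floats.
    fields = tf.string_split([line], ' ').values
    return tf.string_to_number(fields, tf.float32)

dataset = (tf.data.TextLineDataset(['features.txt'])  # placeholder file name
           .map(parse_line, num_parallel_calls=4)     # parallel preprocessing
           .shuffle(buffer_size=1000)
           .batch(1000)
           .prefetch(1))                              # overlap input with compute
iterator = dataset.make_one_shot_iterator()
next_batch = iterator.get_next()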


Yes, on Linux you can check your CPU usage with top and press 1 to show the usage per CPU. Note: the percentage shown depends on the Irix/Solaris mode.

answered Oct 15 '22 by Marco D.G.