Distributed Tensorflow: good example for synchronous training on CPUs

I am new to distributed TensorFlow and am looking for a good example of synchronous training on CPUs.

I have already tried the Distributed Tensorflow Example, and it performs asynchronous training successfully with 1 parameter server (1 machine with 1 CPU) and 3 workers (each worker is 1 machine with 1 CPU). However, I cannot get synchronous training to run correctly, even though I followed the SyncReplicasOptimizer tutorials (V1.0 and V2.0).

I inserted the official SyncReplicasOptimizer code into the working asynchronous example, but the training process is still asynchronous. My full code is below. All code related to synchronous training is enclosed between lines of asterisks (******).

import tensorflow as tf
import sys
import time

# cluster specification ----------------------------------------------------------------------
parameter_servers = ["xx1.edu:2222"]
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

# Parameters  ----------------------------------------------------------------------
N = 3 # number of replicas
learning_rate = 0.001
training_epochs = int(21/N)
batch_size = 100

# Network Parameters
n_input = 784 # MNIST data input (img shape: 28*28)
n_hidden_1 = 256 # 1st layer number of features
n_hidden_2 = 256 # 2nd layer number of features
n_classes = 10 # MNIST total classes (0-9 digits)

if FLAGS.job_name == "ps":
    # server.join() blocks forever, so report readiness before calling it
    print("--- Parameter Server Ready ---")
    server.join()
elif FLAGS.job_name == "worker":
    # Import MNIST data
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        # count the number of updates
        global_step = tf.get_variable('global_step', [], 
                                      initializer = tf.constant_initializer(0), 
                                      trainable = False,
                                      dtype = tf.int32)
        # tf Graph input
        x = tf.placeholder("float", [None, n_input])
        y = tf.placeholder("float", [None, n_classes])

        # Create model
        def multilayer_perceptron(x, weights, biases):
            # Hidden layer with RELU activation
            layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1'])
            layer_1 = tf.nn.relu(layer_1)
            # Hidden layer with RELU activation
            layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2'])
            layer_2 = tf.nn.relu(layer_2)
            # Output layer with linear activation
            out_layer = tf.matmul(layer_2, weights['out']) + biases['out']
            return out_layer

        # Store layers weight & bias
        weights = {
            'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
            'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
            'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes]))
        }
        biases = {
            'b1': tf.Variable(tf.random_normal([n_hidden_1])),
            'b2': tf.Variable(tf.random_normal([n_hidden_2])),
            'out': tf.Variable(tf.random_normal([n_classes]))
        }

        # Construct model
        pred = multilayer_perceptron(x, weights, biases)

        # Define loss and optimizer
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))

        # ************************* SyncReplicasOpt Version 1.0 *****************************************************
        ''' This optimizer collects gradients from all replicas, "summing" them, 
        then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. '''
        # Create the base optimizer that updates the variables (Adam here)
        opt = tf.train.AdamOptimizer(learning_rate=learning_rate)

        # Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables.
        opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N,
                                        replica_id=FLAGS.task_index, total_num_replicas=N)

        # Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally
        train = opt.minimize(cost, global_step=global_step)

        # You can now call get_init_tokens_op() and get_chief_queue_runner().
        # Note that get_init_tokens_op() must be called before creating session
        # because it modifies the graph.
        init_token_op = opt.get_init_tokens_op()
        chief_queue_runner = opt.get_chief_queue_runner()
        # **************************************************************************************

        # Test model
        correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct, "float"))

        # Build the op that initializes the variables (it is run later by the Supervisor)
        init_op = tf.initialize_all_variables()
        print("---Variable initializer created---")

    # **************************************************************************************
    is_chief = (FLAGS.task_index == 0)
    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=is_chief,
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             global_step=global_step,
                             save_model_secs=600)
    # **************************************************************************************

    with sv.prepare_or_wait_for_session(server.target) as sess:
        # **************************************************************************************        
        # After the session is created by the Supervisor and before the main while loop:
        if is_chief:
            sv.start_queue_runners(sess, [chief_queue_runner])
            # Insert initial tokens to the queue.
            sess.run(init_token_op)
        # **************************************************************************************
        # Statistics
        net_train_t = 0
        # Training
        for epoch in range(training_epochs):
            total_batch = int(mnist.train.num_examples/batch_size)
            # Loop over all batches
            for i in range(total_batch):
                batch_x, batch_y = mnist.train.next_batch(batch_size)
                # ======== net training time ========
                begin_t = time.time()
                sess.run(train, feed_dict={x: batch_x, y: batch_y})
                end_t = time.time()
                net_train_t += (end_t - begin_t)
                # ===================================
            # Calculate training accuracy
            # acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels})
            # print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc)
            print("Epoch:", '%04d' % (epoch+1))
        print("Training Finished!")
        print("Net Training Time: ", net_train_t, "second")
        # Testing
        print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels}))

    sv.stop()
    print("done")

Is anything wrong with my code? Or is there a good example I can follow?

asked Dec 23 '16 at 00:12 by leonardo_zz


1 Answer

I think your question is answered by the comments in TensorFlow issue #9596. The problem is caused by bugs in the new version of tf.train.SyncReplicasOptimizer(). You can use the old version of this API to avoid it.
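
One rough way to check which mode a run is actually in is to watch global_step: with synchronous aggregation it should advance once per aggregated update, while in asynchronous training every worker's update advances it. The sketch below is plain Python, not TensorFlow. The names simulate, sync_step and async_steps are hypothetical, the gradients are made-up constants, and averaging (rather than summing) the N gradients is an assumption.

```python
def sync_step(weights, worker_grads, learning_rate):
    """Average one gradient per worker, then apply a single update."""
    n = len(worker_grads)
    avg = [sum(g[i] for g in worker_grads) / n for i in range(len(weights))]
    return [w - learning_rate * a for w, a in zip(weights, avg)]


def async_steps(weights, worker_grads, learning_rate):
    """Apply each worker's gradient on arrival: one update per worker."""
    for g in worker_grads:
        weights = [w - learning_rate * gi for w, gi in zip(weights, g)]
    return weights


def simulate(num_workers, steps_per_worker, synchronous):
    """Return (weights, global_step) after every worker has run its steps."""
    weights = [0.0, 0.0]
    global_step = 0
    for _ in range(steps_per_worker):
        # Hypothetical constant gradient from each worker (illustration only)
        grads = [[1.0, -1.0] for _ in range(num_workers)]
        if synchronous:
            weights = sync_step(weights, grads, 0.1)
            global_step += 1            # one aggregated update
        else:
            weights = async_steps(weights, grads, 0.1)
            global_step += num_workers  # one update per worker
    return weights, global_step


if __name__ == "__main__":
    print("sync global_step: ", simulate(3, 5, True)[1])
    print("async global_step:", simulate(3, 5, False)[1])
```

With 3 workers running 5 steps each, the synchronous counter ends at 5 and the asynchronous one at 15, so a global_step close to N times the per-worker step count suggests the run is still asynchronous.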

Another solution comes from the Tensorflow Distributed Benchmarks. If you look at the source code, you will find that they synchronize the workers manually through TensorFlow queues. In my experiments, this benchmark behaves exactly as you expect.
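
The idea of synchronizing workers by hand through queues can be illustrated with a small simulation. This is only a sketch: Python threads and queue.Queue stand in for workers and TensorFlow queues, it is not the benchmark's code, and run_workers and barrier are hypothetical names. After each step a worker sends a token to every other worker and then waits until it has received num_workers - 1 tokens, so no worker starts step k+1 before every worker has started step k.

```python
import queue
import threading


def run_workers(num_workers, num_steps):
    """Run simulated workers in lockstep; return the (step, worker_id) log."""
    # One token queue per worker (stand-in for a shared TensorFlow queue)
    queues = [queue.Queue() for _ in range(num_workers)]
    log = []
    log_lock = threading.Lock()

    def barrier(worker_id):
        # Signal every other worker that this worker finished its step...
        for other, q in enumerate(queues):
            if other != worker_id:
                q.put(worker_id)
        # ...then block until num_workers - 1 tokens have arrived.
        for _ in range(num_workers - 1):
            queues[worker_id].get()

    def worker(worker_id):
        for step in range(num_steps):
            with log_lock:
                log.append((step, worker_id))  # placeholder for a training step
            barrier(worker_id)

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    for step, worker_id in run_workers(3, 2):
        print("step", step, "worker", worker_id)
```

The order of workers within a step varies between runs, but the step numbers in the log never go backwards.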

Hope these comments and resources can help you solve your problem. Thanks!

answered Oct 22 '22 at 13:10 by XINFENG XIE