
AlexNet distributed TensorFlow performance

Tags:

tensorflow

Running AlexNet with distributed TensorFlow does not scale in images/sec. I am using the AlexNet model from alexnet_benchmark.py, with a few modifications for distributed training, on EC2 G2 (NVIDIA GRID K520) instances. With the distributed code it processes 56 images/sec on a single GPU on a single host, but the same model without the distributed code processes 112 images/sec on a single GPU. This seems very odd. Can you review what could be wrong in this code when it is run distributed? The parameter server does not run on a GPU, but the workers are started with a CUDA_VISIBLE_DEVICES prefix.

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

# Create and start a server for the local task.
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    gpu = FLAGS.task_index % 4

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        #'/gpu:%d' % i
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        #worker_device='/gpu:%d' % gpu,
        cluster=cluster)):

        y, x = get_graph()

        y_ = tf.placeholder(tf.float32, [None, NUM_LABELS])

        cross_entropy = tf.reduce_mean( -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]) )

        # global_step is conventionally non-trainable
        global_step = tf.Variable(0, trainable=False)

        gradient_descent_opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)

        num_workers = len(worker_hosts)
        sync_rep_opt = tf.train.SyncReplicasOptimizer(gradient_descent_opt, replicas_to_aggregate=num_workers,
                replica_id=FLAGS.task_index, total_num_replicas=num_workers)

        train_op = sync_rep_opt.minimize(cross_entropy, global_step=global_step)

        init_token_op = sync_rep_opt.get_init_tokens_op()
        chief_queue_runner = sync_rep_opt.get_chief_queue_runner()

        #saver = tf.train.Saver()
        summary_op = tf.merge_all_summaries()

        init_op = tf.initialize_all_variables()
        saver = tf.train.Saver()

    is_chief = (FLAGS.task_index == 0)

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=is_chief,
                             #logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step)
                             #save_model_secs=600)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:

        if is_chief:
            sv.start_queue_runners(sess, [chief_queue_runner])
            sess.run(init_token_op)

        num_steps_burn_in = 1000
        total_duration = 0
        total_duration_squared = 0
        step = 0

        while step <= 40000:

            print('Iteration %d' % step)
            sys.stdout.flush()
            batch_xs, batch_ys = get_data(BATCH_SIZE)
            train_feed = {x: batch_xs, y_: batch_ys}

            start_time = time.time()

            _, step = sess.run([train_op, global_step], feed_dict=train_feed)

            duration = time.time() - start_time
            if step > num_steps_burn_in:
                total_duration += duration
                total_duration_squared += duration * duration

                if not step % 1000:
                    iterations = step - num_steps_burn_in
                    images_processed = BATCH_SIZE * iterations
                    print('%s: step %d, images processed: %d, images per second: %.3f, time taken: %.2f' %
                            (datetime.now(), iterations, images_processed, images_processed/total_duration, total_duration))
                    sys.stdout.flush()
    sv.stop()
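As a sanity check on the numbers being reported, the burn-in/throughput bookkeeping from the training loop can be exercised in isolation. This is a pure-Python sketch; the batch size and step durations below are made up for illustration and are not the values from the actual run:

```python
# Standalone sketch of the images/sec bookkeeping used in the loop above.
# Step durations here are fabricated; only the arithmetic matches the loop.

BATCH_SIZE = 64          # illustrative value, not necessarily the one used
NUM_STEPS_BURN_IN = 2    # small burn-in so the example stays short

def throughput(durations, batch_size, burn_in):
    """Return images/sec over all steps after the burn-in period."""
    total = sum(d for step, d in enumerate(durations, start=1) if step > burn_in)
    iterations = len(durations) - burn_in
    return batch_size * iterations / total

# Five fake step times: two slow burn-in steps, three measured 0.5 s steps.
durations = [2.0, 2.0, 0.5, 0.5, 0.5]
print('%.1f images/sec' % throughput(durations, BATCH_SIZE, NUM_STEPS_BURN_IN))
# 3 steps * 64 images / 1.5 s = 128.0 images/sec
```

Excluding the burn-in steps matters: the first iterations include graph setup and queue warm-up, which would otherwise drag the average down.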
Naveen Swamy asked Apr 20 '26 09:04


1 Answer

Your code looks good. Here are a few points to keep in mind:

  • The graphs created for single-node and multi-node runs are different, so comparing the two will show some variation: extra queues and synchronization ops are added to transfer gradient information between the parameter server and the workers.
  • Since AlexNet has relatively fast forward and backward passes, the overhead of transfers to and from the parameter server is more prominent. This may or may not show up with Inception V3 (leaning toward may not).
  • Your post mentions that you use separate EC2 instances for the parameter server and the worker; that is the best configuration. Running workers and parameter servers on the same node will definitely hurt performance a great deal.
  • As you increase the number of workers, you will eventually have to increase the number of parameter servers serving them. With Inception this becomes necessary beyond roughly 32 workers.
  • Keep in mind that beyond about 16 workers there is evidence that convergence may be affected.
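To see why adding parameter servers helps, here is a rough pure-Python sketch of the round-robin variable placement that tf.train.replica_device_setter performs when several ps tasks are available (the variable names and the two-server setup are hypothetical):

```python
from itertools import cycle

def place_variables(var_names, num_ps):
    """Assign each variable to a /job:ps/task:N device, round-robin."""
    placement = {}
    tasks = cycle(range(num_ps))
    for name in var_names:
        placement[name] = '/job:ps/task:%d' % next(tasks)
    return placement

# Hypothetical AlexNet-like variables spread over two parameter servers.
variables = ['conv1/w', 'conv1/b', 'conv2/w', 'conv2/b', 'fc6/w', 'fc6/b']
for name, device in place_variables(variables, num_ps=2).items():
    print(name, '->', device)
```

With a single ps task, every gradient update funnels through one network endpoint; spreading variables over more ps tasks divides that traffic across machines.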

My suggestion is to try distributed Inception V3. That topology should exhibit nearly perfect scalability compared to its single-node counterpart. If it does, your hardware setup is good; if it doesn't, double-check your hardware configuration.

If you're doing a scalability study, I recommend gathering your relative performance numbers starting from one parameter server and one worker on independent instances; comparing against a single-node run will have its own variations.
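For that baseline comparison, a simple efficiency metric makes the gap in the question concrete (56 and 112 images/sec are the figures from the post):

```python
def scaling_efficiency(distributed_ips, num_workers, single_node_ips):
    """Fraction of ideal throughput achieved: 1.0 means perfect scaling."""
    return distributed_ips / (num_workers * single_node_ips)

# Figures from the question: one worker, one GPU.
print('%.2f' % scaling_efficiency(56.0, 1, 112.0))  # 0.50 -> half of ideal
```

An efficiency well below 1.0 with a single worker, as here, points at fixed distributed overhead (queues, ps round-trips) rather than at a scaling limit.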

mriera answered Apr 22 '26 04:04


