TensorFlow Variables Are Not Initialized Using Between-graph Replication

I have the following Python code in test.py, which uses "between-graph replication" for distributed TensorFlow:

import argparse
import logging

import tensorflow as tf

log = logging.getLogger(__name__)

# Job Names
PARAMETER_SERVER = "ps"
WORKER_SERVER = "worker"

# Cluster Details
CLUSTER_SPEC = {
    PARAMETER_SERVER: ["localhost:2222"],
    WORKER_SERVER: ["localhost:1111", "localhost:1112"]}


def parse_command_arguments():
    """ Set up and parse the command line arguments passed for experiment. """
    parser = argparse.ArgumentParser(
        description="Parameters and Arguments for the Test.")
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    return parser.parse_args()


def start_server(job_name, task_index):
    """ Create a server based on a cluster spec. """
    cluster = tf.train.ClusterSpec(CLUSTER_SPEC)
    server = tf.train.Server(
        cluster, job_name=job_name, task_index=task_index)

    return server, cluster


def model():
    """ Build up a simple estimator model. """
    # Build a linear model and predict values
    W = tf.Variable([.3], tf.float32)
    b = tf.Variable([-.3], tf.float32)
    x = tf.placeholder(tf.float32)
    linear_model = W * x + b
    y = tf.placeholder(tf.float32)
    global_step = tf.get_variable('global_step', [],
                                  initializer=tf.constant_initializer(0),
                                  trainable=False)

    # Loss sub-graph
    loss = tf.reduce_sum(tf.square(linear_model - y))

    # optimizer
    optimizer = tf.train.GradientDescentOptimizer(0.01)
    train = optimizer.minimize(loss, global_step=global_step)

    init_op = tf.global_variables_initializer()
    log.info("Variables initialized ...")

    return W, b, loss, x, y, train, global_step, init_op


if __name__ == "__main__":
    # Initializing logging with level "INFO".
    logging.basicConfig(level=logging.INFO)

    # Parse arguments from command line.
    arguments = parse_command_arguments()
    job_name = arguments.job_name
    task_index = arguments.task_index

    # Start a server.
    server, cluster = start_server(job_name, task_index)

    if job_name == "ps":
        server.join()
    else:
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index,
                cluster=cluster)):
            W, b, loss, x, y, train, global_step, init_op = model()
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(arguments.task_index == 0 and (
                            arguments.job_name == 'worker'))) as sess:
            step = 0
            # training data
            x_train = [1, 2, 3, 4]
            y_train = [0, -1, -2, -3]
            while not sess.should_stop() and step < 1000:
                _, step = sess.run(
                    [train, global_step], {x: x_train, y: y_train})

            # evaluate training accuracy
            curr_W, curr_b, curr_loss = sess.run(
                [W, b, loss], {x: x_train, y: y_train})
            print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))

I ran the code as three separate processes on a single machine (a Mac Pro with CPUs only), in the following order:

  1. Parameter Server: $ python test.py --task_index 0 --job_name ps
  2. Worker 1: $ python test.py --task_index 0 --job_name worker
  3. Worker 2: $ python test.py --task_index 1 --job_name worker

and I found that the process for "Worker 2" hit an error:

$ python test.py --task_index 1 --job_name worker
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job worker -> {0 -> localhost:1111, 1 -> localhost:1112}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:211] Started server with target: grpc://localhost:1112
INFO:__main__:Variables initialized ...
I tensorflow/core/distributed_runtime/master_session.cc:993] Start master session 9912c75f2921fe13 with config: 

INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: Variable, Variable_1, global_step
INFO:tensorflow:Waiting for model to be ready.  Ready_for_local_init_op:  None, ready: Variables not initialized: Variable, Variable_1, global_step

and the process for "Worker 2" just froze there. The log shows that the TensorFlow variables for "Worker 2" were never successfully initialized, so I wonder whether there is a bug in how MonitoredTrainingSession coordinates variable initialization across TensorFlow sessions (or somewhere else), or whether I have missed something in my code.

NOTE: The code was run with TensorFlow 0.12.

Asked by Ruofan Kong, Mar 29 '17

1 Answer

I think this is "intended behavior" for the tf.train.MonitoredTrainingSession coordination protocol. In a recent answer, I explained how this protocol is geared towards long-running training jobs: a worker will sleep for 30 seconds between checks to see whether the variables have been initialized.

There is a race condition between Worker 1 running the initialization op and Worker 2 checking the variables; if Worker 2 "wins" the race, it will observe that some variables are uninitialized and enter a 30-second sleep before checking again.

However, the overall amount of computation in your program is so small that Worker 1 is able to complete its work and terminate within that 30-second window. When Worker 2 next checks whether the variables are initialized, it creates a new tf.Session that tries to connect to the other tasks, but Worker 1 is no longer running, so you will see a log message like this (repeated every 10 seconds or so):

I tensorflow/core/distributed_runtime/master.cc:193] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0

When the training job runs for substantially longer than 30 seconds, this is not a problem.
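
For reference, that 30-second interval is the recovery_wait_secs argument of tf.train.SessionManager, which the monitored-session machinery uses internally to poll a "ready op". The following is only a minimal illustrative sketch of that polling (not the fix I would recommend), assuming the graph and the server object from your script have already been built; it is not a drop-in replacement for MonitoredTrainingSession:

# Roughly what a non-chief worker does while logging "Waiting for model
# to be ready": poll a ready op until the chief has run the init op.
sm = tf.train.SessionManager(
    ready_op=tf.report_uninitialized_variables(),  # empty result => ready
    recovery_wait_secs=5)  # defaults to 30 seconds between checks
sess = sm.wait_for_session(server.target)  # blocks, re-checking periodically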

One workaround is to remove the interdependency between the workers by setting a "device filter". Since the individual workers in a typical between-graph configuration do not communicate with each other, you can tell TensorFlow to ignore the absence of another worker at session-creation time, using tf.ConfigProto:

# Each worker only needs to contact the PS task(s) and the local worker task.
config = tf.ConfigProto(device_filters=[
    '/job:ps', '/job:worker/task:%d' % arguments.task_index])

with tf.train.MonitoredTrainingSession(
    master=server.target,
    config=config,
    is_chief=(arguments.task_index == 0 and (
              arguments.job_name == 'worker'))) as sess:
  # ...
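
With the device filter in place, session creation on each worker only waits for the PS task(s) and that worker's own task, so Worker 2 no longer blocks on a Worker 1 that has already exited.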
Answered by mrry, Oct 26 '22