I'm confused about the distributed training process in TensorFlow.
My understanding is that TensorFlow feeds a batch_size of data to a worker, and the worker then updates the ps server. Is that right?
But while training, I noticed that the step numbers in the log look strange.
If I have only 2 workers, I think the right process should be something like:
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 200 xxxxxxx
[worker2] step 300 xxxxxxx
..... that is, every worker should print a different step to the log.
Actually, the logs are as below:
[worker1] step 0 xxxxxxx
[worker2] step 100 xxxxxxx
[worker1] step 100 xxxxxxx
[worker2] step 200 xxxxxxx
[worker1] step 300 xxxxxxx
... Why doesn't worker1 print step 200?
I am confused about how the jobs are assigned.
How does TensorFlow do distributed training? Does the chief worker split the data into batches and give one batch to a worker, which then updates the ps server? Or does every worker run over the whole dataset and update the ps server?
```
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # Read TFRecords files for training
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.train),
        num_epochs=epoch_number)
    serialized_example = read_and_decode(filename_queue)
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    features = tf.parse_example(
        batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    batch_labels = features["label"]
    batch_ids = features["ids"]
    batch_values = features["values"]

    # Read TFRecords files for validation
    validate_filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(FLAGS.eval),
        num_epochs=epoch_number)
    validate_serialized_example = read_and_decode(validate_filename_queue)
    validate_batch_serialized_example = tf.train.shuffle_batch(
        [validate_serialized_example],
        batch_size=validate_batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    validate_features = tf.parse_example(
        validate_batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
    # These should come from validate_features, not the training features
    validate_batch_labels = validate_features["label"]
    validate_batch_ids = validate_features["ids"]
    validate_batch_values = validate_features["values"]

    logits = inference(batch_ids, batch_values)
    batch_labels = tf.to_int64(batch_labels)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits,
                                                                   batch_labels)
    loss = tf.reduce_mean(cross_entropy, name='loss')
    print("Use the optimizer: {}".format(FLAGS.optimizer))
    optimizer = tf.train.FtrlOptimizer(learning_rate)
    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimizer.minimize(loss, global_step=global_step)

    # Initialize saver and summary
    steps_to_validate = FLAGS.steps_to_validate
    init_op = tf.initialize_all_variables()
    saver = tf.train.Saver(max_to_keep=2)
    keys_placeholder = tf.placeholder("float")
    keys = tf.identity(keys_placeholder)
    tf.add_to_collection("inputs", json.dumps({'key': keys_placeholder.name}))
    tf.add_to_collection("outputs", json.dumps({'key': keys.name,
                                                'softmax': inference_softmax.name,
                                                'prediction': inference_op.name}))
    summary_op = tf.merge_all_summaries()

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="./train_process/",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=60)

    # Create session to run graph
    with sv.managed_session(server.target) as sess:
        while not sv.should_stop():
            # Get coordinator and run queues to read data
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(coord=coord, sess=sess)
            start_time = datetime.datetime.now()
            try:
                while not coord.should_stop():
                    _, loss_value, step = sess.run([train_op, loss, global_step])
                    if step % steps_to_validate == 0:
                        accuracy_value, auc_value, summary_value = sess.run(
                            [accuracy, auc_op, summary_op])
                        end_time = datetime.datetime.now()
                        print("[{}] Task: {}, Step: {}, loss: {}, accuracy: {}, auc: {}".format(
                            end_time - start_time,
                            FLAGS.task_index,
                            step, loss_value, accuracy_value,
                            auc_value))
                        start_time = end_time
            except tf.errors.OutOfRangeError:
                print("Done training after reading all data")
            finally:
                coord.request_stop()
                print("coord stopped")
            # Wait for threads to exit
            coord.join(threads)
```
Here are the actual logs:
```
[0:00:17.115814] Task: 0, Step: 74600, loss: 0.303285002708, accuracy: 0.910000026226, auc: 0.946377456188
[0:00:03.804889] Task: 1, Step: 74700, loss: 0.287385582924, accuracy: 0.879999995232, auc: 0.946395516396
[0:00:03.778589] Task: 0, Step: 74800, loss: 0.247096762061, accuracy: 0.860000014305, auc: 0.946370542049
[0:00:03.772320] Task: 1, Step: 74900, loss: 0.264987647533, accuracy: 0.899999976158, auc: 0.946406364441
[0:00:03.795459] Task: 0, Step: 75000, loss: 0.228719010949, accuracy: 0.899999976158, auc: 0.946437120438
[0:00:01.902293] Task: 1, Step: 75000, loss: 0.217391207814, accuracy: 0.910000026226, auc: 0.946473121643
[0:00:01.942055] Task: 1, Step: 75100, loss: 0.284583866596, accuracy: 0.889999985695, auc: 0.946496844292
[0:00:03.860608] Task: 0, Step: 75200, loss: 0.273199081421, accuracy: 0.850000023842, auc: 0.946503221989
[0:00:03.800881] Task: 1, Step: 75300, loss: 0.189931258559, accuracy: 0.930000007153, auc: 0.946559965611
```
There aren't really official docs besides the HowTo, so a good way to figure out how things work is by studying examples.
The basic concept to understand is that there are three kinds of TensorFlow processes:
- The client: the Python process that builds the graph, connects to a local master (`Session()`) or a remote master (`Session("grpc://...")`), and issues `session.run` calls.
- The master: the process the client connects to, which figures out how to distribute the work among workers.
- The worker: the process that does the actual work. If your graph has a `with tf.device("/job:worker/task:0"):` block, then computation in that block should be executed on `task:0`.
When you create a new server with `server = tf.train.Server(...)`, the process that starts is both a worker and a master, but it's useful to understand the difference for debugging.
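To make that concrete, here's a minimal sketch (the host:port values are made up): each process that calls `tf.train.Server` contains a master and a worker, and the code that opens a `tf.Session` against `server.target` plays the role of the client.

```
import tensorflow as tf

# One shared cluster description; hypothetical addresses.
cluster = tf.train.ClusterSpec({
    "ps":     ["localhost:2222"],
    "worker": ["localhost:2223", "localhost:2224"],
})

# Each process starts one server; say this process is worker 0.
server = tf.train.Server(cluster, job_name="worker", task_index=0)

# The client part: build a graph and talk to the in-process master.
with tf.device("/job:ps/task:0"):
    counter = tf.Variable(0, name="counter")
increment = tf.assign_add(counter, 1)

with tf.Session(server.target) as sess:
    sess.run(tf.initialize_all_variables())
    print(sess.run(increment))
```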
The easiest example of distributed TF is when you have a single client, which starts an in-process master, and multiple workers. Here's one such example. In this usage, the main difference from the non-distributed version is that you write `with tf.device("/job:worker/task:1")` instead of `with tf.device("/gpu:1")` to tell it to execute that part of the graph on worker 1.
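A rough sketch of that in-graph pattern, assuming a two-worker cluster like the one above; a single client builds one graph and pins pieces of it to different workers:

```
import tensorflow as tf

# Shared state on the parameter server.
with tf.device("/job:ps/task:0"):
    w = tf.Variable(tf.ones([10]))

# One client, one graph: each chunk of work is pinned to a worker.
partial_sums = []
for i in range(2):
    with tf.device("/job:worker/task:%d" % i):
        x = tf.placeholder(tf.float32, [None, 10], name="x_%d" % i)
        partial_sums.append(tf.reduce_sum(x * w))

total = tf.add_n(partial_sums)
# A single sess.run(total, feed_dict=...) then drives both workers.
```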
It gets more complicated when you have multiple clients, as in the case of "between-graph replication." In the parameter-server example, you have multiple parallel training loops, where each loop corresponds to a separate client, i.e. a Python process issuing run calls. To see which worker the ops are actually placed on, look at the `with tf.device` annotations.
In your example you don't have explicit `with tf.device("/job:worker/task:...")` blocks in your snippet, but this part is done by `tf.device(tf.train.replica_device_setter(...))`. Essentially, instead of having a fixed device for all ops in the block, the code runs `replica_device_setter` for each op to generate the device to place it on. It places all variables onto the `/job:ps` tasks, and the rest of the ops on the current worker. The code for `replica_device_setter` has gotten a bit complicated over time, but you could use a simpler implementation of it to the same effect, as below:
```
def simple_setter(ps_device="/job:ps/task:0"):
    def _assign(op):
        node_def = op if isinstance(op, tf.NodeDef) else op.node_def
        # Variables go to the parameter server; everything else stays
        # on this worker.
        if node_def.op == "Variable":
            return ps_device
        else:
            return "/job:worker/task:%d" % FLAGS.task_index
    return _assign

...

with tf.device(simple_setter()):
    ...
```
When you run this, each Python process will create a slightly different version of the graph, except for the Variable nodes, which will look identical in each process (check with `tf.get_default_graph().as_graph_def()`).
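One way to check, assuming you run something like this in each worker process and diff the output:

```
import tensorflow as tf

# Print just the Variable nodes of the graph this client built;
# their names and devices should be identical across workers.
graph_def = tf.get_default_graph().as_graph_def()
for node in graph_def.node:
    if node.op == "Variable":
        print(node.name, node.device)
```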
When you have multiple clients running training loops, one issue is: who executes the tasks that need to be done only once for all clients? For instance, someone needs to run the initializers for all variables. You could put `sess.run(tf.initialize_all_variables())` in the client body, but with multiple clients running in parallel that means the initialization ops run more than once. So the solution is to designate one worker as the "chief" worker, and only have that worker run the operation.
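That's exactly what the `is_chief=(FLAGS.task_index == 0)` argument to `Supervisor` in your snippet does; stripped down, the pattern is roughly:

```
import tensorflow as tf

task_index = 0  # would normally come from FLAGS.task_index

global_step = tf.Variable(0, name="global_step", trainable=False)
init_op = tf.initialize_all_variables()

# The chief (task 0) runs init_op and writes checkpoints; the other
# workers wait inside managed_session until the variables exist.
sv = tf.train.Supervisor(is_chief=(task_index == 0),
                         logdir="/tmp/train_logs",  # hypothetical path
                         init_op=init_op,
                         global_step=global_step)
# with sv.managed_session(server.target) as sess: ...
```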
Also, there's no built-in distinction between `worker` and `ps` devices; it's just a convention that variables get assigned to `ps` devices and ops get assigned to `worker` devices. You could alternatively have only `worker` devices, and use a version of `replica_device_setter` that puts variables on the 0th worker.
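For example, one possible variant of the setter above (the names here are my own) that drops the `ps` job entirely and pins variables to worker 0:

```
def worker0_setter(task_index):
    def _assign(op):
        node_def = op if isinstance(op, tf.NodeDef) else op.node_def
        if node_def.op == "Variable":
            # No ps job at all: shared state lives on worker 0.
            return "/job:worker/task:0"
        return "/job:worker/task:%d" % task_index
    return _assign

with tf.device(worker0_setter(FLAGS.task_index)):
    ...  # build the model as before
```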
Here's a barebones example with `m` workers updating variables sharded over `n` PS tasks, which uses explicit device assignment instead of `replica_device_setter`.
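I can't reproduce that example here, but the idea, sketched with made-up shapes and only explicit device strings, is roughly:

```
import tensorflow as tf

num_ps = 2      # n parameter-server tasks
task_index = 0  # this worker's index, e.g. FLAGS.task_index

# Shard the parameters by hand: one slice of the weight vector per ps task.
shards = []
for i in range(num_ps):
    with tf.device("/job:ps/task:%d" % i):
        shards.append(tf.Variable(tf.zeros([500]), name="w_shard_%d" % i))

# Everything else is pinned to this worker; every worker builds the same
# ops against the same shared ps variables.
with tf.device("/job:worker/task:%d" % task_index):
    x = tf.placeholder(tf.float32, [1000])
    w = tf.concat(0, shards)  # old-style signature: tf.concat(concat_dim, values)
    loss = tf.reduce_sum(tf.square(x - w))
    train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss)
```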
To summarize: in your case `replica_device_setter` makes sure that `global_step` is a variable stored on the `ps` task, which makes it shared across all of your training loops. As for why you see the same value of `global_step` on both workers: there's nothing in your graph forcing `global_step` to be read after it's incremented. So if you run `sess.run([increment_global_step, fetch_global_step])` in parallel on two different workers, you could potentially see
worker 0: 0
worker 1: 0
worker 0: 2
worker 1: 2
and so on.
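If you want the printed step to reflect the increment from the same `run` call, one option (a sketch using the names from your snippet) is to force the read to happen after the update with a control dependency:

```
# Read global_step only after train_op has run, so the fetched value
# includes at least this worker's own increment.
with tf.control_dependencies([train_op]):
    step_after_update = tf.identity(global_step)

# In the training loop, fetch the dependent read instead of global_step:
_, loss_value, step = sess.run([train_op, loss, step_after_update])
```

The value can still jump by more than one between prints, since the other worker keeps incrementing the same shared variable concurrently.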