本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.
简介
如果你使用以下的代码来运行一个tensorflow的session, tf.Session只能知道本地机器的资源设备。
with tf.Session() as sess: sess.run(init_op) for _ in range(NUM_STEPS): sess.run(train_op)
TensorFlow的分布式框架是基于ps/worker 模式的,与单机代码差距比较大,相对来说,horovod 支持的allreduce 模式性能更好些,对单机代码的改动也小一些。TensorFlow高中低api,单机与分布式,ps与allreduce 代码长得完全不一样
基本概念
在Tensorflow分布式中,主要介绍几个概念
- Cluster : 是所有job的集合
- Job: 是任务的集合
- Task:是具体的任务
首先我们需要定义一个由参与分布式计算的机器组成的集群
tf.train.ClusterSpec({ "worker": [ "10.244.2.141:2222", "10.244.2.142:2222", ], "ps": [ "10.244.2.140:2222", ] })
可以通过脚本或者借助调度框架来动态构建 ClusterSpec。 “ps” 及 “worker” 为 job_name
进程和主服务
分工
- client 是访问服务的部分
- master是用来维护参数或者数据初始化的
- worker是用来执行具体任务的
- chief,集群中一般有多个worker,需要指定其中一个worker为主节点(cheif,默认worker0),chief节点会执行一些额外的工作,比如模型导出之类的。
客户端(client)进程负责构建计算图(graph),创建 tensorflow::Session 实例。客户端一般由 Python 或 C++ 编写。当客户端调用 Session.run() 时将向主进程(master)发送请求,主进程会选择特定工作进程(worker)完成实际计算。客户端、主进程和工作进程可以位于同一台机器实现本地计算,也可以位于不同机器即分布式计算。主进程和工作进程的集合称为服务端(server),一个客户端可以同多个服务端交互。服务端进程会创建 tf.train.Server 实例并持续运行。client 与 server 之间以 grpc 通信
在每一台机器上起一个tf.train.Server的服务,然后放在一个集群里,整个集群的server会通过网络通信。
模型复制
In-graph replication一般不用了,现在主要是Between-graph replication。多个client(每个worker 启动一个client 和 server,创建一个图,计算一个图),由中间的PS task来交互client之间的数据变化。
用代码实现一个worker task
cluster = tf.train.ClusterSpec(...) server = tf.train.Server(cluster,job_name="worker",task_index=0) with tf.Session(server.target) as sess: ...
当有两个worker task时,会创建两个同样名字的变量,然后放在PS中的内存中共享,当一个worker task更新了变量,那个对于另一个task也是可见的
用代码实现一个ps task
cluster = tf.train.ClusterSpec(...) server = tf.train.Server(cluster,job_name="ps",task_index=0) # block在这里,等待集群中其他节点的接入 server.join()
TensorFlow程序可以通过tf.device函数来指定运行每一个操作的设备,这个设备可以是本地的CPU或者GPU,也可以是某一台远程的服务器。
变量放置问题
这种变量共享设计就带来另外一个问题,我们如何选择地址来放置我们的变量?因为上一个例子中只有一个PS task,这样把所有的变量用设备字都放置在一个固定设备中固然可行,但有时我们想要实现多于1个的PS task时候怎幺办呢?比如我们想要分配变量更新的工作,或者想平衡worker task来取变量时候的网络负载时,很可能就要用到多个PS任务了。tf 提供了一系列策略 tf_train_replica_device_setter/GreedyLoadBalancingStrategy/replica_device_setter
容错性
- 最好的情况就是非Chief的worker task出错了,因为这些task实际上是无状态的。那幺当遇到了这种错误,当这样的一个worker task恢复的时候,它会重新与它的PS task中建立连接,并且重新开始之前崩溃过的进程。
- 比较差的一种情况就是PS task失败了,那幺就有一个麻烦,因为PS task是有状态的,所有的worker task需要依赖他们来发送他们的梯度并且取得新的参数值。所以这种情况下,他们的chief worker task负责监测这种错误,如果发生了这种错误,chief worker task就打断整个训练,并从上一个检查点恢复所有的PS tasks。
- 最糟糕的情况就是chief worker task失败了,打断训练,并在当它恢复了时候从上一个好的检查点恢复。
Fault tolerance 的API
MonitoredTrainingSession会自动帮你初始化参数,并且当PS 任务失败会自动恢复。
server = tf.train.Server(...) is_cheif = FLAGS.task_index == 0 with tf.train.MonitoredTrainingSession(master=server.target,is_chief=is_chief) as sess: while not sess.should_stop(): sess.run(train_op)
示例代码
#coding=utf-8 import time import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data # 数据的获取不是本章重点,这里直接导入 FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_string("job_name", "worker", "ps or worker") tf.app.flags.DEFINE_integer("task_id", 0, "Task ID of the worker/ps running the train") tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps机") tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker机,用逗号隔开") # 全局变量 MODEL_DIR = "./distribute_model_ckpt/" DATA_DIR = "./data/mnist/" BATCH_SIZE = 32 # main函数 def main(self): # ========== STEP1: 读取数据 ========== # mnist = input_data.read_data_sets(DATA_DIR, one_hot=True, source_url='http://yann.lecun.com/exdb/mnist/') # ========== STEP2: 声明集群 ========== # # 构建集群ClusterSpec和服务声明 ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts}) # 构建集群名单 server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id) # 声明服务 n_workers = len(worker_hosts) # worker机的数量 # ========== STEP3: ps机内容 ========== # # 分工,对于ps机器不需要执行训练过程,只需要管理变量。server.join()会一直停在这条语句上。 if FLAGS.job_name == "ps": with tf.device("/cpu:0"): server.join() # ========== STEP4: worker机内容 ========== # # 下面定义worker机需要进行的操作 is_chief = (FLAGS.task_id == 0) # 选取task_id=0的worker机作为chief # 通过replica_device_setter函数来指定每一个运算的设备。 # replica_device_setter会自动将所有参数分配到参数服务器上,将计算分配到当前的worker机上。 device_setter = tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster) # 这一台worker机器需要做的计算内容 with tf.device(device_setter): # 输入数据 x = tf.placeholder(name="x-input", shape=[None, 28*28], dtype=tf.float32) # 输入样本像素为28*28 y_ = tf.placeholder(name="y-input", shape=[None, 10], dtype=tf.float32) # MNIST是十分类 # 第一层(隐藏层) with tf.variable_scope("layer1"): weights = tf.get_variable(name="weights", shape=[28*28, 128], initializer=tf.glorot_normal_initializer()) biases = tf.get_variable(name="biases", shape=[128], initializer=tf.glorot_normal_initializer()) layer1 = tf.nn.relu(tf.matmul(x, weights) + biases, name="layer1") # 第二层(输出层) with tf.variable_scope("layer2"): weights = tf.get_variable(name="weights", shape=[128, 10], initializer=tf.glorot_normal_initializer()) biases = tf.get_variable(name="biases", shape=[10], initializer=tf.glorot_normal_initializer()) y = tf.add(tf.matmul(layer1, weights), biases, name="y") pred = tf.argmax(y, axis=1, name="pred") global_step = tf.contrib.framework.get_or_create_global_step() # 必须手动声明global_step否则会报错 # 损失和优化 cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, axis=1)) loss = tf.reduce_mean(cross_entropy) # **通过tf.train.SyncReplicasOptimizer函数实现函数同步更新** opt = tf.train.SyncReplicasOptimizer( tf.train.GradientDescentOptimizer(0.01), replicas_to_aggregate=n_workers, total_num_replicas=n_workers ) sync_replicas_hook = opt.make_session_run_hook(is_chief) train_op = opt.minimize(loss, global_step=global_step) if is_chief: train_op = tf.no_op() hooks = [sync_replicas_hook, tf.train.StopAtStepHook(last_step=10000)] # 把同步更新的hook加进来 config = tf.ConfigProto( allow_soft_placement=True, # 设置成True,那幺当运行设备不满足要求时,会自动分配GPU或者CPU。 log_device_placement=False, # 设置为True时,会打印出TensorFlow使用了哪种操作 ) # ========== STEP5: 打开会话 ========== # # 对于分布式训练,打开会话时不采用tf.Session(),而采用tf.train.MonitoredTrainingSession() # 详情参考:https://www.cnblogs.com/estragon/p/10034511.html with tf.train.MonitoredTrainingSession( master=server.target, is_chief=is_chief, checkpoint_dir=MODEL_DIR, hooks=hooks, save_checkpoint_secs=10, config=config) as sess: print("session started!") start_time = time.time() step = 0 while not sess.should_stop(): xs, ys = mnist.train.next_batch(BATCH_SIZE) # batch_size=32 _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x:xs, y_:ys}) if step > 0 and step % 100 == 0: duration = time.time() - start_time sec_per_batch = duration / global_step_value print("After %d training steps(%d global steps), loss on training batch is %g (%.3f sec/batch)" % (step, global_step_value, loss_value, sec_per_batch)) step += 1 if __name__ == "__main__": tf.app.run()
high level api
基于Tensorflow高阶API构建大规模分布式深度学习模型系列: 开篇
部署
tensorflow手工启动示例
TensorFlow 没有提供一次性启动整个集群的解决方案,所以用户需要在每台机器上逐个手动启动一个集群的所有ps 和worker 任务。为了能够以同一行代码启动不同的任务,我们需要将所有worker任务的主机名和端口、 所有ps任务的主机名和端口、当前任务的作业名称以及任务编号这4个集群配置项参数化。通过输入不同的命令行参数组合,用户就可以使用同一份代码启动每一个任务。
// 在在参数服务器上执行 python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0 // 在第一个worker节点上执行 python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0 // 在第二个worker节点上执行 python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1
与k8s 整合
Kubeflow实战系列: 利用TFJob运行分布式TensorFlow 未读
horovod
因为 TensorFlow的分布式框架是基于参数服务器的 ,这种结构容易造成网络堵塞;并且开源版 TensorFlow 的跨机通信是通过 gRPC + Protocol Buffers 实现的,这种方案的问题是,首先 gRPC 本身的效率就比较差,其次使用 Protocol Buffers 序列化就意味着节点间的所有交互必须经过内存,无法使用 GPUDirect RDMA,限制了速度提升;
Be First to Comment