Press "Enter" to skip to content

tensorflow分布式训练

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

简介

 

如果你使用以下的代码来运行一个tensorflow的session, tf.Session只能知道本地机器的资源设备。

 

with tf.Session() as sess:
  sess.run(init_op)
  for _ in range(NUM_STEPS):
    sess.run(train_op)

 

TensorFlow的分布式框架是基于ps/worker 模式的,与单机代码差距比较大,相对来说,horovod 支持的allreduce 模式性能更好些,对单机代码的改动也小一些。TensorFlow高中低api,单机与分布式,ps与allreduce 代码长得完全不一样

 

基本概念

 

在Tensorflow分布式中,主要介绍几个概念

 

 

    1. Cluster : 是所有job的集合

 

    1. Job: 是任务的集合

 

    1. Task:是具体的任务

 

 

首先我们需要定义一个由参与分布式计算的机器组成的集群

 

tf.train.ClusterSpec({
    "worker": [
      "10.244.2.141:2222",
      "10.244.2.142:2222",      
    ],
    "ps": [
        "10.244.2.140:2222",
    ]
})

 

可以通过脚本或者借助调度框架来动态构建 ClusterSpec。 “ps” 及 “worker” 为 job_name

 

进程和主服务

 

分布式tensorflow(一)

 

分工

 

 

    1. client 是访问服务的部分

 

    1. master是用来维护参数或者数据初始化的

 

    1. worker是用来执行具体任务的

 

    1. chief,集群中一般有多个worker,需要指定其中一个worker为主节点(cheif,默认worker0),chief节点会执行一些额外的工作,比如模型导出之类的。

 

 

客户端(client)进程负责构建计算图(graph),创建 tensorflow::Session 实例。客户端一般由 Python 或 C++ 编写。当客户端调用 Session.run() 时将向主进程(master)发送请求,主进程会选择特定工作进程(worker)完成实际计算。客户端、主进程和工作进程可以位于同一台机器实现本地计算,也可以位于不同机器即分布式计算。主进程和工作进程的集合称为服务端(server),一个客户端可以同多个服务端交互。服务端进程会创建 tf.train.Server 实例并持续运行。client 与 server 之间以 grpc 通信

 

在每一台机器上起一个tf.train.Server的服务,然后放在一个集群里,整个集群的server会通过网络通信。

 

 

模型复制

 

In-graph replication一般不用了,现在主要是Between-graph replication。多个client(每个worker 启动一个client 和 server,创建一个图,计算一个图),由中间的PS task来交互client之间的数据变化。

 

用代码实现一个worker task

 

cluster = tf.train.ClusterSpec(...)
server = tf.train.Server(cluster,job_name="worker",task_index=0)
with tf.Session(server.target) as sess:
  ...

 

当有两个worker task时,会创建两个同样名字的变量,然后放在PS中的内存中共享,当一个worker task更新了变量,那个对于另一个task也是可见的

 

用代码实现一个ps task

 

cluster = tf.train.ClusterSpec(...)
server = tf.train.Server(cluster,job_name="ps",task_index=0)
# block在这里,等待集群中其他节点的接入
server.join()

 

TensorFlow程序可以通过tf.device函数来指定运行每一个操作的设备,这个设备可以是本地的CPU或者GPU,也可以是某一台远程的服务器。

 

变量放置问题

 

这种变量共享设计就带来另外一个问题,我们如何选择地址来放置我们的变量?因为上一个例子中只有一个PS task,这样把所有的变量用设备字都放置在一个固定设备中固然可行,但有时我们想要实现多于1个的PS task时候怎幺办呢?比如我们想要分配变量更新的工作,或者想平衡worker task来取变量时候的网络负载时,很可能就要用到多个PS任务了。tf 提供了一系列策略 tf_train_replica_device_setter/GreedyLoadBalancingStrategy/replica_device_setter

 

容错性

 

 

    1. 最好的情况就是非Chief的worker task出错了,因为这些task实际上是无状态的。那幺当遇到了这种错误,当这样的一个worker task恢复的时候,它会重新与它的PS task中建立连接,并且重新开始之前崩溃过的进程。

 

    1. 比较差的一种情况就是PS task失败了,那幺就有一个麻烦,因为PS task是有状态的,所有的worker task需要依赖他们来发送他们的梯度并且取得新的参数值。所以这种情况下,他们的chief worker task负责监测这种错误,如果发生了这种错误,chief worker task就打断整个训练,并从上一个检查点恢复所有的PS tasks。

 

    1. 最糟糕的情况就是chief worker task失败了,打断训练,并在当它恢复了时候从上一个好的检查点恢复。

 

 

Fault tolerance 的API

 

MonitoredTrainingSession会自动帮你初始化参数,并且当PS 任务失败会自动恢复。

 

server = tf.train.Server(...)
is_cheif = FLAGS.task_index == 0
with tf.train.MonitoredTrainingSession(master=server.target,is_chief=is_chief) as sess:
  while not sess.should_stop():
    sess.run(train_op)

 

示例代码

 

#coding=utf-8
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data    # 数据的获取不是本章重点,这里直接导入
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("job_name", "worker", "ps or worker")
tf.app.flags.DEFINE_integer("task_id", 0, "Task ID of the worker/ps running the train")
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps机")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker机,用逗号隔开")
# 全局变量
MODEL_DIR = "./distribute_model_ckpt/"
DATA_DIR = "./data/mnist/"
BATCH_SIZE = 32
# main函数
def main(self):
    # ==========  STEP1: 读取数据  ========== #
    mnist = input_data.read_data_sets(DATA_DIR, one_hot=True, source_url='http://yann.lecun.com/exdb/mnist/')   
    # ==========  STEP2: 声明集群  ========== #
    # 构建集群ClusterSpec和服务声明
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts})    # 构建集群名单
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)    # 声明服务
    n_workers = len(worker_hosts)    # worker机的数量
    # ==========  STEP3: ps机内容  ========== #
    # 分工,对于ps机器不需要执行训练过程,只需要管理变量。server.join()会一直停在这条语句上。
    if FLAGS.job_name == "ps":
        with tf.device("/cpu:0"):
            server.join()
    # ==========  STEP4: worker机内容  ========== #
    # 下面定义worker机需要进行的操作
    is_chief = (FLAGS.task_id == 0)    # 选取task_id=0的worker机作为chief
    # 通过replica_device_setter函数来指定每一个运算的设备。
    # replica_device_setter会自动将所有参数分配到参数服务器上,将计算分配到当前的worker机上。
    device_setter = tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_id,
        cluster=cluster)
    # 这一台worker机器需要做的计算内容
    with tf.device(device_setter):
        # 输入数据
        x = tf.placeholder(name="x-input", shape=[None, 28*28], dtype=tf.float32)    # 输入样本像素为28*28
        y_ = tf.placeholder(name="y-input", shape=[None, 10], dtype=tf.float32)      # MNIST是十分类
        # 第一层(隐藏层)
        with tf.variable_scope("layer1"):
            weights = tf.get_variable(name="weights", shape=[28*28, 128], initializer=tf.glorot_normal_initializer())
            biases = tf.get_variable(name="biases", shape=[128], initializer=tf.glorot_normal_initializer())
            layer1 = tf.nn.relu(tf.matmul(x, weights) + biases, name="layer1")
        # 第二层(输出层)
        with tf.variable_scope("layer2"):
            weights = tf.get_variable(name="weights", shape=[128, 10], initializer=tf.glorot_normal_initializer())
            biases = tf.get_variable(name="biases", shape=[10], initializer=tf.glorot_normal_initializer())
            y = tf.add(tf.matmul(layer1, weights), biases, name="y")
        pred = tf.argmax(y, axis=1, name="pred")
        global_step = tf.contrib.framework.get_or_create_global_step()    # 必须手动声明global_step否则会报错
        # 损失和优化
        cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, axis=1))
        loss = tf.reduce_mean(cross_entropy)
        # **通过tf.train.SyncReplicasOptimizer函数实现函数同步更新**
        opt = tf.train.SyncReplicasOptimizer(
            tf.train.GradientDescentOptimizer(0.01),
            replicas_to_aggregate=n_workers,
            total_num_replicas=n_workers
        )
        sync_replicas_hook = opt.make_session_run_hook(is_chief)
        train_op = opt.minimize(loss, global_step=global_step)
        if is_chief:
            train_op = tf.no_op()
        hooks = [sync_replicas_hook, tf.train.StopAtStepHook(last_step=10000)]    # 把同步更新的hook加进来
        config = tf.ConfigProto(
            allow_soft_placement=True,    # 设置成True,那幺当运行设备不满足要求时,会自动分配GPU或者CPU。
            log_device_placement=False,   # 设置为True时,会打印出TensorFlow使用了哪种操作
        )
        # ==========  STEP5: 打开会话  ========== #
        # 对于分布式训练,打开会话时不采用tf.Session(),而采用tf.train.MonitoredTrainingSession()
        # 详情参考:https://www.cnblogs.com/estragon/p/10034511.html
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=is_chief,
                checkpoint_dir=MODEL_DIR,
                hooks=hooks,
                save_checkpoint_secs=10,
                config=config) as sess:
            print("session started!")
            start_time = time.time()
            step = 0
        
            while not sess.should_stop():
                xs, ys = mnist.train.next_batch(BATCH_SIZE)    # batch_size=32
                _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x:xs, y_:ys})
                if step > 0 and step % 100 == 0:
                    duration = time.time() - start_time
                    sec_per_batch = duration / global_step_value
                    print("After %d training steps(%d global steps), loss on training batch is %g (%.3f sec/batch)" % (step, global_step_value, loss_value, sec_per_batch))
                step += 1
if __name__ == "__main__":
    tf.app.run()

 

high level api

 

基于Tensorflow高阶API构建大规模分布式深度学习模型系列: 开篇

 

部署

 

tensorflow手工启动示例

 

TensorFlow 没有提供一次性启动整个集群的解决方案,所以用户需要在每台机器上逐个手动启动一个集群的所有ps 和worker 任务。为了能够以同一行代码启动不同的任务,我们需要将所有worker任务的主机名和端口、 所有ps任务的主机名和端口、当前任务的作业名称以及任务编号这4个集群配置项参数化。通过输入不同的命令行参数组合,用户就可以使用同一份代码启动每一个任务。

 

// 在在参数服务器上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="ps" --task_index=0
// 在第一个worker节点上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=0
// 在第二个worker节点上执行
python mnist_dist_test_k8s.py --ps_hosts=10.244.2.140:2222 --worker_hosts=10.244.1.134:2222,10.244.2.141:2222 --job_name="worker" --task_index=1

 

与k8s 整合

 

Kubeflow实战系列: 利用TFJob运行分布式TensorFlow 未读

 

horovod

 

因为 TensorFlow的分布式框架是基于参数服务器的 ,这种结构容易造成网络堵塞;并且开源版 TensorFlow 的跨机通信是通过 gRPC + Protocol Buffers 实现的,这种方案的问题是,首先 gRPC 本身的效率就比较差,其次使用 Protocol Buffers 序列化就意味着节点间的所有交互必须经过内存,无法使用 GPUDirect RDMA,限制了速度提升;

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注