分布式tensorflow搭建实践

工业中面临的常常是庞大的数据计算量,采用单一的tensorflow不能完全解决问题,需要结合hadoop,kafka,GPU,C++ 来进行加速和搭建系统。官方的whl安装包不能满足系统要求,通过源码编译安装来加快运行速度。

在此前请先搭建hadoop环境,安装bazel,我的环境 Ubuntu18.04,python3,hadoop2.7

一. 编译本地tensorflow安装包

1.clone tensorflow源代码到本地

2.进入 tensorflow目录下目录下 执行 ./configure

3.

1.png

需要注意的几点是:指定python3目录,开启jemalloc,hadoop,kafka接口支持(jemalloc用于管理内存分配,如果安装了cuda和MKL加速需要指定目录)

4.运行

bazel build -c opt //tensorflow/tools/pip_package:build_pip_package

编译耗时:至少60分钟

5.指定位置生成安装包并安装

bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg   pip install /tmp/tensorflow_pkg/tensorflow-1.8.0-cp36-cp36m-linux_x86_64.whl

6.分发安装包到其它机器并安装

7.程序使用多块显卡

c = []for d in ['/device:GPU:2', '/device:GPU:3']:  with tf.device(d):    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3])    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2])    c.append(tf.matmul(a, b))with tf.device('/cpu:0'):  sum = tf.add_n(c)# 创建一个 session ,并将 log_device_placement 设置为 True。sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))# 执行这个操作。print(sess.run(sum))

8.运行分布式tensorflow程序

import argparseimport sysimport tensorflow as tfFLAGS = Nonedef main(_):  ps_hosts = FLAGS.ps_hosts.split(",")  worker_hosts = FLAGS.worker_hosts.split(",")  # 从参数服务器和工作主机创建一个集群  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})  # 创建并启动本地任务的服务器  server = tf.train.Server(cluster,                           job_name=FLAGS.job_name,                           task_index=FLAGS.task_index)  if FLAGS.job_name == "ps":    server.join()  elif FLAGS.job_name == "worker":    # 默认情况下将操作分配给本地Worker    with tf.device(tf.train.replica_device_setter(        worker_device="/job:worker/task:%d" % FLAGS.task_index,        cluster=cluster)):      # 建立模型...      loss = ...      global_step = tf.contrib.framework.get_or_create_global_step()      train_op = tf.train.AdagradOptimizer(0.01).minimize(          loss, global_step=global_step)    # StopAtStepHook 在运行给定步骤后处理停止    hooks=[tf.train.StopAtStepHook(last_step=1000000)]    # MonitoredTrainingSession 负责会话初始化    # 从检查点恢复,保存到检查点,一旦完成或报错就关闭    with tf.train.MonitoredTrainingSession(master=server.target,                                           is_chief=(FLAGS.task_index == 0),                                           checkpoint_dir="/tmp/train_logs",                                           hooks=hooks) as mon_sess:      while not mon_sess.should_stop():        # 异步运行训练        # mon_sess.run 在被抢占 PS 的情况下处理 AbortedError        mon_sess.run(train_op)if __name__ == "__main__":  parser = argparse.ArgumentParser()  parser.register("type", "bool", lambda v: v.lower() == "true")  # 用于定义 tf.train.ClusterSpec 的标志  parser.add_argument(      "--ps_hosts",      type=str,      default="",      help="Comma-separated list of hostname:port pairs"  )  parser.add_argument(      "--worker_hosts",      type=str,      default="",      help="Comma-separated list of hostname:port pairs"  )  parser.add_argument(      "--job_name",      type=str,      default="",      help="One of 'ps', 'worker'"  )  # Flags for defining the tf.train.Server  parser.add_argument(      "--task_index",      type=int,      default=0,      help="Index of task within the job"  )  FLAGS, unparsed = parser.parse_known_args()  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

在脚本中启动多个训练:

python trainer.py      --ps_hosts=ps0.xxx.com:2222,ps1.xxx.com:2222      --worker_hosts=worker0.xxx.com:2222,worker1.xxx.com:2222      --job_name=ps --task_index=0

如果在hadoop上运行,数据改为读写HDFS文件路径

filename_queue = tf.train.string_input_producer([    "hdfs://namenode:8020/path/to/file1.csv",    "hdfs://namenode:8020/path/to/file2.csv",])

发表评论

电子邮件地址不会被公开。 必填项已用*标注