Press "Enter" to skip to content

tensorflow分布式训练 — tensorflow on spark使用方法

tensorflow(2.x版本)生产训练需要在大规模训练样本下完成,单机已经无法满足训练速度。

 

tensorflow on spark是yahoo开源的基于spark进行分布式tensorflow训练的开发框架,本文要求读者熟练tensorflow单机使用,最好读一下前一篇博客: 《tensorflow2.0 – 端到端的wide&deep模型训练》

 

tensorflow自带分布式训练框架

 

tensorflow官方自带了分布式训练API,但是有一些小缺点:

 

1、要求我们 手动部署训练代码到多台服务器 ,并且为每个代码配置环境变量:

 

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"]
    },
   "task": {"type": "worker", "index": 1}
})

cluster告知tensorflow进程集群中有哪些节点
task告知自己是上述哪个节点,身份是什幺?(worker还是cheif)

2、 整个训练样本需要手动分发到所有服务器上 ,tensorflow会自行协调令每个计算节点只训练属于自己的那份数据(虽然他们都有完整的训练集),从而让集群快速训练掉所有的样本。

 

3、 训练过程中如果有某个节点宕机,则训练失败。

 

tensorflow分布式训练的默认策略MultiWorkerMirroredStrategy是:

所有worker的模型参数都是各自随机初始化的。
所有worker的模型基于各自不重叠的部分训练集分别训练,这叫做”数据并行”。
每一轮所有worker各自训练1个batch,基于各自loss求得各自的梯度,经过互相通讯获知所有worker的本轮梯度,求平均梯度用作梯度下降。
重复训练,直到所有worker将整个训练集耗尽。

总结一下,与单机版的不同之处在于:

数据集被所有worker”瓜分”训练。
所有worker是统一行动的,必须等1个batch完成后,大家再一起进入下一个batch。

那幺,到底哪个worker训练的模型是最终的模型呢?

 

道理很简单,任意一个worker都可以!

 

因为每一个batch训练后,每个worker都会基于集群所有worker的梯度来调整自己本地的模型参数,所以理论上充分训练后每个worker的模型都是最优的。

 

所以,我们手动指定某个节点为cheif身份(也就是master),只取cheif节点导出的model文件即可,包括tensorboard采样数据我们也只需要看cheif节点的即可。

 

关于上述分布式训练原理,大家可以读一下官方文档: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#top_of_page

 

简单看了一下,每一轮batch训练完成后,tensorflow采用了一种环形广播的机制来让每个worker知道其他worker的本轮梯度,所以不存在单点性能瓶颈:

 

 

tensorflow on spark

 

因为官方方案的上述缺点,yahoo设计了tensorflow on spark框架,下面简称tfos。

 

原理

 

它利用spark编程API的方式拉起多个worker到yarn集群中运行,以此解决了计算资源分配问题。

 

对于每个worker容器,tfos会准备好TF_CONFIG环境变量并唤起我们编写的tensorflow训练函数(python),此后的工作其实还是tensorflow官方分布式训练框架的逻辑,与tfos就没什幺关系了。

 

tfos提供了python库,我们只需要把现有的训练代码封装到一个函数里,然后交给tfos来分发到spark集群中进行部署即可开始训练。

 

准备工作

 

首先我们得了解pyspark,因为我们的tensorflow代码和tfos代码都是python的,所以spark程序必须用pyspark开发而不是用java开发。

 

其次,我们必须提前将训练样本按tfrecords格式写入到HDFS中,这样后续训练时worker可以直接加载HDFS中的训练文件进行训练。tensorflow分布式训练框架能够智能协调”数据并行策略”,如果我们将样本打散成N个HDFS文件,并且指定拉起M个worker,那幺:

如果N>=M,那幺每个文件将被某个worker独占,这就是”文件级”的并行。
如果N<M,那幺就会有多个worker共享同1个文件,但是消费不同的offset,这就是”记录级”的并行。

从生产环境来讲,我们应该是开发(py)spark程序直接读hive表,将其map转化为tfrecords格式写入到hdfs中,不过就本篇博客来说我们将把CSV训练文件加载到spark中而不是从hive加载,区别仅限于此。

 

代码

 

首先是如何正确使用pyspark,我准备了一个说明项目: https://github.com/owenliang/pyspark-demo ,通过它你可以学会:

为不同的项目隔离python环境
使用pyspark编程并且提交到yarn集群

其次是tensorflow on spark的说明项目: https://github.com/owenliang/tf2-onspark ,你将学会:

如何用spark把训练样本转化为tfrecords格式并写入HDFS。
如何用tfos完成分布式训练,最终得到model和tensorboard。

步骤1:准备python环境

 

下面演示整个过程,所有操作发生在hadoop客户机。

 

安装miniconda

 

主流python多环境管理工具,自行安装: https://docs.conda.io/projects/conda/en/latest/user-guide/install/

 

我们用conda生成独立的python环境,压缩上传到HDFS,后续pyspark提交任务时可以指定spark executor从HDFS下载Python环境使用,这样就可以实现多项目python环境隔离。

 

创建tensorflow环境

 

1,创建名为tf的python3.8环境
conda create -n tf python=3.8
2,切换到该python环境
conda activate tf

 

安装pip依赖

 

正常来说是把依赖写在项目下的requirements.txt里,这里直接pip install -r 即可,就像我提供的项目一样: https://github.com/owenliang/tf2-onspark/blob/main/requirements.txt ,为了演示我手动安装一遍。

 

因为程序基于tensorflow、pandas(为了加载csv)、pyspark、tensorflow on spark开发,所以安装一下它们:

 

pip install tensorflow -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pandas -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install tensorflowonspark -i https://pypi.tuna.tsinghua.edu.cn/simple

 

打包python到HDFS

 

先找到该python的目录:

 

(tf) [hadoop@10 liangdong]$ which python
~/workspace/service/anaconda3/envs/tf/bin/python
(tf) [hadoop@10 liangdong]$ cd ~/workspace/service/anaconda3/envs/tf
(tf) [hadoop@10 tf]$ ll
total 20
drwxrwxr-x  3 hadoop hadoop 4096 Jan  5 16:11 bin
drwxrwxr-x  2 hadoop hadoop   30 Jan  5 16:05 compiler_compat
drwxrwxr-x  2 hadoop hadoop 4096 Jan  5 16:05 conda-meta
drwxrwxr-x  9 hadoop hadoop 4096 Jan  5 16:05 include
drwxrwxr-x  3 hadoop hadoop   25 Jan  5 16:05 info
drwxrwxr-x 15 hadoop hadoop 4096 Jan  5 16:05 lib
drwxrwxr-x 11 hadoop hadoop  128 Jan  5 16:11 share
drwxrwxr-x  3 hadoop hadoop  146 Jan  5 16:05 ssl
drwxrwxr-x  3 hadoop hadoop   21 Jan  5 16:05 x86_64-conda_cos6-linux-gnu

 

将所有文件打包:

 

zip -r Python.zip *

 

将Python.zip上传 HDFS待用:

 

hdfs dfs -mkdir /tf2-onspark
hdfs dfs -put Python.zip /tf2-onspark

 

步骤2:准备tfrecords

 

克隆我的示例项目:

 

git clone https://github.com/owenliang/tf2-onspark

 

观察csv训练集

 

data下面是csv格式的泰坦尼克数据集:

 

(tf) [hadoop@10 tf2-onspark]$ ll data/  total 88  -rw-rw-r– 1 hadoop hadoop 28629 Jan 5 16:23 test.csv  -rw-rw-r– 1 hadoop hadoop 60302 Jan 5 16:23 train.csv

 

长相如下:

 

 

我们的目标是把这个csv文件转成tfrecords格式写到HDFS上,用作训练。

 

观察spark任务提交命令

 

我们要编写一段pyspark程序,让它在AM(spark的application master)上把CSV用pandas加载到内存里,将其按行记录拆分后转化成 RDD,然后对该RDD中的每一行执行map序列化为tfrecord格式的记录,最后利用tensorflow官方提供的org.tensorflow.hadoop.io.TFRecordFileOutputFormat输出格式写入到HDFS文件中。

 

提交命令见submit_gen.sh:

 

#!/bin/bash
 
cd src && zip -r src.zip * && mv src.zip .. && cd -
cd data && zip -r data.zip * && mv data.zip .. && cd -
 
TRAIN_NUM_PARTITIONS=5
TEST_NUM_PARTITIONS=5
TRAIN_OUTPUT=/tf2-onspark/train
TEST_OUTPUT=/tf2-onspark/test
 
# 删除旧输出
hdfs dfs -rm -r ${TRAIN_OUTPUT}
hdfs dfs -rm -r ${TEST_OUTPUT}
 
# --master yarn :运行到yarn集群,固定写法
# --deploy-mode cluster:AM运行到yarn中,如果改成client则需要确保本地目录有./Python/bin/python3
# --num-executors 1:一个executor容器
# --archives hdfs:///Python.zip#Python:从hdfs集群下载/Python.zip到executor工作目录,并解压到Python目录
# --py-files ./src.zip:项目python源代码,会解压到executor的某目录下并令PYTHONPATH指向该目录
# --conf spark.pyspark.python=./Python/bin/python3:指定使用自行上传的Python
spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 1 \
  --executor-memory 1G \
  --archives hdfs:///tf2-onspark/Python.zip#Python,data.zip#data \
  --py-files ./src.zip \
  --conf spark.pyspark.python=./Python/bin/python3 \
  --jars hdfs:///tf2-onspark/tensorflow-hadoop-1.10.0.jar \
  src/gen_tfrecords.py \
  --train_csv ./data/train.csv  \
  --test_csv ./data/test.csv \
  --train_num_partitions ${TRAIN_NUM_PARTITIONS} \
  --test_num_partitions ${TEST_NUM_PARTITIONS} \
  --train_output ${TRAIN_OUTPUT} \
  --test_output ${TEST_OUTPUT}

 

我们实际提交任务的入口文件是src/gen_tfrecords.py,它支持–train_csv等一系列传参,这属于应用层。

 

更关键的是理解spark和pyspark,下面说一下。

 

–archives指定了要通过HDFS分发到计算节点的文件包括:

Python.zip:令spark executor直接从HDFS下载,解压到yarn容器内的当前工作目录的Python文件夹,后面–conf spark.pyspark.python可以控制pyspark使用该Python执行代码。
data.zip:在客户端本地把train.csv和test.csv打包成zip,上传到HDFS临时目录,进而被executor下载并解压到容器内工作目录的data文件夹,我们代码会加载它们用于生成tfrecords。

–jars指定要分发哪些jar包到spark executor内加入到CLASSPATH中:

hdfs:///tf2-onspark/tensorflow-hadoop-1.10.0.jar:我们最终将tfrecords写入HDFS需要用到这个jar包内的org.tensorflow.hadoop.io.TFRecordFileOutputFormat类。(PS:下面会说明如何编译这个Jar包)

–py-files:

进入src目录,把所有源代码打包zip,传给–py-files分发到所有executor,这样pyspark会自动将zip解压到某个目录然后把它加入PYTHONPATH,这样我们的spark程序才能顺利找到import的python模块。

我们写pyspark程序时,可能对某个RDD做一个map操作,传入一个function,其实pyspark框架会将function以及其依赖的模块关系(注意不包括依赖的模块代码)全部序列化成字节码(pyspark用的 cloudpickle 项目完成序列化),通过网络传输到executor再反序列化,此时会去动态import依赖的module,所以py-files就是要保证代码在executor可以查找到的地方,这和java把jar包分发出去是一个道理。

 

编译tensorflow-hadoop-1.10.0.jar

 

根据tensorflow on spark安装指导 ,我们需要从tensorflow官方子项目编译得到这个jar包,用到的项目是: https://github.com/tensorflow/ecosystem/tree/master/hadoop

 

步骤是:

 

git clone https://github.com/tensorflow/ecosystem.git
cd ecosystem/hadoop
 
# 编译jar(最好配置一下settings.xml走镜像加速,否则太慢)
mvn clean package
# 上传
hdfs dfs -put target/tensorflow-hadoop-1.10.0.jar /tf2-onspark/

 

分析src/gen_tfrecords.py代码

 

首先是入口文件:

 

from pyspark.sql import SparkSession
from dataset import dataframe_to_tfrecords
import pandas as pd
import argparse
 
parser = argparse.ArgumentParser()
parser.add_argument('--train_csv', help='train csv文件名')
parser.add_argument('--test_csv', help='test csv文件名')
parser.add_argument('--train_num_partitions', help='train tfrecords文件分片个数', type=int)
parser.add_argument('--test_num_partitions', help='test tfrecords文件分片个数', type=int)
parser.add_argument('--train_output', help='保存train tfrecords的目录')
parser.add_argument('--test_output', help='保存test tfrecords的目录')
args = parser.parse_args()
 
sess = SparkSession.builder.appName('gen_tfrecords').enableHiveSupport().getOrCreate()
 
dataframe_to_tfrecords(sess, pd.read_csv(args.train_csv), args.train_num_partitions, args.train_output, include_outputs=True)
dataframe_to_tfrecords(sess, pd.read_csv(args.test_csv), args.test_num_partitions, args.test_output, include_outputs=False)

 

ArgumentParser支持了一些传参,主要是指定tfrecords文件写到HDFS什幺路径下,以及切分成几份(训练”数据并行”目的)。

 

创建spark session,用pandas把csv加载到内存,传给自己写的dataframe_to_tfrecords函数进行处理。

 

处理数据的逻辑就是逐行的把样本转成tf.train.Example对象,然后序列化成二进制,最终利用jar包写入到HDFS中:

 

def dataframe_to_tfrecords(sess, df, num_partitions, output_dir, include_outputs=False):
    all_feature_spec = deepcopy(model_spec['inputs'])
    if include_outputs:
        all_feature_spec.update(model_spec['outputs'])
 
    rows = []
    for _, row in df.iterrows():
        rows.append(row)
 
    def to_example(row):
        feature_dict = {}
        for feature_name, feature_spec in all_feature_spec.items():
            feature_value = row[feature_name]
            if feature_value is None or pd.isna(feature_value):
                feature_value = feature_spec['default']
            if feature_spec['type'] == 'int':
                feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[int(feature_value)]))
            elif feature_spec['type'] == 'float':
                feature = tf.train.Feature(float_list=tf.train.FloatList(value=[float(feature_value)]))
            elif feature_spec['type'] == 'str':
                feature = tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[str(feature_value).encode('utf-8')]))
            feature_dict[feature_name] = feature
        example = tf.train.Example(features=tf.train.Features(feature=feature_dict))
        return example.SerializeToString(), None
 
    train_rdd = sess.sparkContext.parallelize(rows, num_partitions).map(to_example)
    train_rdd.saveAsNewAPIHadoopFile(output_dir, 'org.tensorflow.hadoop.io.TFRecordFileOutputFormat',
                                     keyClass="org.apache.hadoop.io.BytesWritable",
                                     valueClass="org.apache.hadoop.io.NullWritable")

 

to_example函数可以将一行样本将变为一个Example对象并序列化,不了解需要看一下文章顶部的前一篇博客。

 

我们将pandas的行数组丢给spark的parallelize变为RDD,同时指定分片num_partitions个,每个分片最终会落地为一个HDFS文件,就达到了训练集拆分文件的目的。

 

RDD经过to_example分布式计算转换后就可以保存到HDFS目录中去了。

 

正式执行submit_gen.sh

 

注意submit_gen.sh中,我们采用了:

 

spark-submit \ –master yarn \ –deploy-mode client \

 

为了方便调试我们让AM运行在client本地(–deploy-mode client),那幺因为–conf spark.pyspark.python=./Python/bin/python3 的影响,它就会在本地找这个python,然而我们本意是让任务跑到yarn容器里并从HDFS拉python环境下来。

 

为了能够使用client模式,我们把conda Python软链到当前目录下即可:

 

(tf) [hadoop@10 tf2-onspark]$ ln -s ~/workspace/service/anaconda3/envs/tf ./Python
(tf) [hadoop@10 tf2-onspark]$ ll
total 60
drwxrwxr-x 2 hadoop hadoop    39 Jan  5 17:25 data
-rw-rw-r-- 1 hadoop hadoop 33204 Jan  5 17:25 data.zip
lrwxrwxrwx 1 hadoop hadoop    48 Jan  5 17:29 Python -> /home/hadoop/workspace/service/anaconda3/envs/tf
-rw-rw-r-- 1 hadoop hadoop    55 Jan  5 16:23 README.md
-rw-rw-r-- 1 hadoop hadoop   751 Jan  5 16:23 requirements.txt
drwxrwxr-x 2 hadoop hadoop    97 Jan  5 17:25 src
-rw-rw-r-- 1 hadoop hadoop  4437 Jan  5 17:25 src.zip
-rw-rw-r-- 1 hadoop hadoop  1414 Jan  5 16:23 submit_gen.sh
-rw-rw-r-- 1 hadoop hadoop  1500 Jan  5 16:23 submit_train.sh

 

然后运行命令:

 

sh submit_gen.sh

 

查看HDFS中的tfrecords文件:

 

 

train和test两个csv各自被转成了tfrecords格式,并且都切成了5份。

 

步骤3:分布式训练

 

现在就需要用到tfos这个python包来实现spark分布式训练了。

 

观察submit_train.sh

 

该脚本提交训练任务:

 

#!/bin/bash
 
cd src && zip -r src.zip * && mv src.zip .. && cd -
 
LIB_JVM=/usr/local/jdk/jre/lib/amd64/server/
 
# --master yarn :运行到yarn集群,固定写法
# --deploy-mode cluster:AM运行到yarn中,如果改成client则需要确保本地目录有./Python/bin/python3
# --num-executors 1:一个executor容器
# --archives hdfs:///Python.zip#Python:从hdfs集群下载/Python.zip到executor工作目录,并解压到Python目录
# --py-files ./src.zip:项目python源代码,会解压到executor的某目录下并令PYTHONPATH指向该目录
# --conf spark.pyspark.python=./Python/bin/python3:指定使用自行上传的Python
#  --conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM}:依赖libjvm.so
#  --conf spark.dynamicAllocation.enabled=false  禁止spark自动扩容executor数量
#  --conf spark.yarn.maxAppAttempts=1 失败重试1次
spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 5 \
  --executor-cores 2 \
  --conf spark.task.cpus=2 \
  --executor-memory 8G \
  --archives hdfs:///tf2-onspark/Python.zip#Python \
  --py-files ./src.zip \
  --conf spark.pyspark.python=./Python/bin/python3 \
  --conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM} \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.yarn.maxAppAttempts=1 \
  src/train.py \
  --batch_size 32 \
  --shuffle_size 32 \
  --worker_size 5 \
  --epochs 1000 \
  --train_dir hdfs:///tf2-onspark/train \
  --model_dir hdfs:///tf2-onspark/model \
  --tensorboard_dir hdfs:///tf2-onspark/tensorboard

 

Python.zip和src.zip仍旧要分发,spark.pyspark.python配置当然也不变。

 

多了几个–conf非常关键:

–conf spark.executorEnv.LD_LIBRARY_PATH=${LIB_JVM}:是tensorflowonspark要求指定libjvm.so,根据自己的jvm安装目录做一下指向即可,没有它无法运行成功。
–conf spark.dynamicAllocation.enabled=false:禁止动态分配executor数量,默认spark有一个行为就是如果集群有闲置资源的话可能会创建比–num-executors更多的容器,这个对我们来说不可接受,因为tensorflow分布式训练的节点数量必须固定(还记得TF_CONFIG吗)。
–conf spark.yarn.maxAppAttempts=1:限制该任务只能尝试submit一次,应该没啥用。
–executor-cores 2并且–conf spark.task.cpus=2:tfos要求一个executor中只能跑1个task,我们可以令executor-core为2核同时指定单个task也用2核,这样就可以让1个executor中只跑1个task(spark的executor与task关系参考: https://blog.csdn.net/xuejianbest/article/details/85994504

后续就是程序主入口src/train.py,以及它需要的各种应用层参数,都是我们自定义的东西,这里一定要注意worker_size和spark的–num-executors要一致,启动几个executor就对应几个tensorflow worker节点,还记得tensorflowonspark原理吗?忘记请回看上面内容。

 

另外注意:

 

–model_dir hdfs:///tf2-onspark/model \  –tensorboard_dir hdfs:///tf2-onspark/tensorboard

 

这2个hdfs输出目录只有cheif节点会写入(tensorflow on spark会指定其中1个executor作为cheif身份),而其他worker节点则只会将model和tensorflow保存到yarn容器本地盘,随着任务结束就释放了,还记得tensorflowonspark原理吗?忘记请回看上面内容。

 

分析src/train.py

 

该文件为训练主入口代码:

 

import argparse
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from tensorflowonspark import TFCluster
import tensorflow as tf
from model import build_model
from dataset import dataset_from_tfrecords
import time
 
MODEL_VERSION = 0
 
def main_fun(args, ctx):
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
    with strategy.scope():
        wide_deep_model = build_model()
 
    dataset = dataset_from_tfrecords(args.train_dir + '/part*', include_outputs=True)
    dataset = dataset.batch(args.batch_size * args.worker_size).shuffle(args.shuffle_size)
 
    tensorboard_dir = args.tensorboard_dir if ctx.job_name == 'chief' else './tensorboard'
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='{}/{}'.format(tensorboard_dir, MODEL_VERSION), histogram_freq=1)
    wide_deep_model.fit(dataset, epochs=args.epochs, callbacks=[tensorboard_callback])
 
    model_dir = args.model_dir if ctx.job_name == 'chief' else './model'
    wide_deep_model.save('{}/{}'.format(model_dir, MODEL_VERSION), save_format='tf', include_optimizer=False)
 
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of records per batch", type=int)
parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int)
parser.add_argument("--worker_size", help="number of nodes in the cluster", type=int)
parser.add_argument("--epochs", help="number of epochs", type=int)
parser.add_argument("--train_dir", help="HDFS path to training tfrecords files in parallelized format")
parser.add_argument("--model_dir", help="hdfs path to save model")
parser.add_argument("--tensorboard_dir", help="hdfs path to tensorboard logs")
 
args = parser.parse_args()
print("args:", args)
 
MODEL_VERSION = int(time.time())
 
conf = SparkConf().setAppName("tf2-onspark-training")
sc = SparkContext(conf=conf)
cluster = TFCluster.run(sc, main_fun, args, args.worker_size, num_ps=0, tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief')
cluster.shutdown()
 
print('model version={}'.format(MODEL_VERSION))

 

和直接使用tensorflow分布式训练框架的区别在于,我们是运行在spark环境下的。

 

准备好SparkContext后,把它交给tfos的TFCluster.run函数,传入我们的训练函数main_fun,该函数会被pyspark AM序列化后传到所有executor上执行。

 

args参数其实可以传任意东西,会被回调给main_fun,我们把命令行参数传入使用。

 

我们需要告诉tfos我们要启动几个worker的训练集群,所以args.worker_size是我们的第4个传参。

 

num_ps永远传0,因为我们不是采用的PS分布式训练架构,而是tensorflow的同步训练策略MultiWorkerMirroredStrategy(本文开始介绍过它的工作原理)。

 

tensorboard控制tfos是否给我们开启tensorboard的web界面,我们在集群里训练不需要它帮我们开启,后续我们直接用tensorboard命令行打开hdfs上的hdfs:///tf2-onspark/tensorboard即可显示界面。

 

input_mode=TFCluster.InputMode.TENSORFLOW是说我们自己给模型喂HDFS上的tfreocrds训练数据,其实tfos支持另外一种直接将RDD喂给tensorflow模型训练的方式,这种方式不太offical,所以我们压根不用去了解。

 

最后master_node=’cheif’是让tfos给某一个executor下发 TF_CONFIG时候指定其task type叫做cheif,而其余executor则叫做worker,这样我们的main_fun就可以通过第二个参数ctx的ctx.job_name获取到这个名字,决定当前训练节点是将model和tensorboard写入到HDFS还是写入到容器的临时目录。

 

根据对上述参数的了解,你会发现main_fun里根据是否为cheif身份,对tensorboard和save目录做了区分,cheif写入HDFS而非cheif写入本地被丢弃即可。

 

tensorflow分布式训练框架用法不变,在分布式策略的scope下进行keras模型的网络结构建设并compile,最后fit并且save模型即可,这里build_model和单机一模一样,大家简单一看即可:

 

def build_model():
    linear_features, dnn_features = _build_feature_columns()
 
    # wide
    linear_input_layer = _build_input_layer()
    linear_feature_layer = tf.keras.layers.DenseFeatures(linear_features)
    linear_dense_layer1 = tf.keras.layers.Dense(units=1)
 
    output = linear_feature_layer(linear_input_layer)
    output = linear_dense_layer1(output)
 
    linear_model = tf.keras.Model(inputs=list(linear_input_layer.values()), outputs=[output])
    linear_optimizer = tf.keras.optimizers.Ftrl(l1_regularization_strength=0.5)
 
    # deep
    dnn_feature_layer = tf.keras.layers.DenseFeatures(dnn_features)
    dnn_norm_layer = tf.keras.layers.BatchNormalization()   # important for deep
    dnn_dense_layer1 = tf.keras.layers.Dense(units=128, activation='relu')
    dnn_dense_layer2 =  tf.keras.layers.Dense(units=1)
 
    dnn_input_layer = _build_input_layer()
    output = dnn_feature_layer(dnn_input_layer)
    output = dnn_norm_layer(output) # this will break the tensorboard graph because of unfixed bug
    output = dnn_dense_layer1(output)
    output = dnn_dense_layer2(output)
 
    dnn_model =  tf.keras.Model(inputs=list(dnn_input_layer.values()), outputs=[output])
    dnn_optimizer = tf.keras.optimizers.Adagrad()
 
    # wide&deep
    wide_deep_model = tf.keras.experimental.WideDeepModel(linear_model, dnn_model, activation='sigmoid')
    wide_deep_model.compile(optimizer=[linear_optimizer,dnn_optimizer],
                            loss=tf.keras.losses.BinaryCrossentropy(),
                            metrics=tf.keras.metrics.BinaryAccuracy())
    return wide_deep_model

 

这里需要注意一个关键:

 

dataset = dataset.batch(args.batch_size * args.worker_size).shuffle(args.shuffle_size)

 

分布式训练要求:如果我们希望单个worker采用batch_size训练,那幺就得让dataset采用batch_size*worker_size的大小,其中worker_size是计算节点个数。

 

这个牵扯到”数据并行”的底层实现,我们只需要按tensorflow要求做即可。

 

至于从HDFS上加载若干tfrecords文件得到Dataset对象,这个和单机版没有任何区别,只是文件schema从file://变成了HDFS,因为tensorflow默认支持HDFS访问:

 

def dataset_from_tfrecords(tfrecords_pattern, include_outputs=False):
    all_feature_spec = deepcopy(model_spec['inputs'])
    if include_outputs:
        all_feature_spec.update(model_spec['outputs'])
 
    feature_dict = {}
    for feature_name, feature_spec in all_feature_spec.items():
        if feature_spec['type'] == 'int':
            feature = tf.io.FixedLenFeature((), tf.int64)
        elif feature_spec['type'] == 'float':
            feature = tf.io.FixedLenFeature((), tf.float32)
        elif feature_spec['type'] == 'str':
            feature = tf.io.FixedLenFeature((), tf.string)
        feature_dict[feature_name] = feature
 
    def parse_func(s):
        inputs = tf.io.parse_single_example(s, feature_dict)
        outputs = []
        if include_outputs:
            for output_name in model_spec['outputs']:
                outputs.append(inputs[output_name])
                inputs.pop(output_name)
        if include_outputs:
            return inputs, outputs
        return inputs
 
    dataset = tf.data.Dataset.list_files(tfrecords_pattern).interleave(
        lambda filename: tf.data.TFRecordDataset(filename).map(parse_func),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    return dataset

 

我们传入的实际是一个通配符字符串:hdfs:///tf2-onspark/train/part-*,那幺经过tf Dataset的list_files可以从HDFS上枚举出所有训练文件,经过interleave函数可以将每个文件名转换成一个子Dataset,每个子Dataset需要经过parse_func反序列化tfrecord记录为Example对象,并返回样本元祖(x, y),其中x是特征,y是标签。

 

interleave通过num_parallel_calls传参可以开启多线程数据预加载等特性,可以具体看一下文档。

 

总之,这样的父子结构dataset交给分布式scope下compile的模型后,在fit训练中会自动在多个worker之间均衡数据(文章开始讲过”数据划分”策略),这样就算有很大的数据集也可以通过增加更多的worker来加快训练速度。

 

正式执行submit_gen.sh

 

现在我们开始进行分布式训练:

 

sh submit_train.sh

 

然后我遇到了一个错误:

 

Traceback (most recent call last):  File “/data/workspace/users/liangdong/tf2-onspark/src/train.py”, line 4, in <module>  from tensorflowonspark import TFCluster  File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFCluster.py”, line 35, in <module>  from . import TFSparkNode  File “/data/workspace/users/liangdong/tf2-onspark/Python/lib/python3.8/site-packages/tensorflowonspark/TFSparkNode.py”, line 23, in <module>  from packaging import version  ModuleNotFoundError: No module named ‘packaging’

 

看样是tensorflowonspark依赖了一个packaing模块,可能它做pip包的时候没有写好依赖,导致我们必须手动安装一下了。

 

我们必须回头给conda tf环境安装一下这个包:

 

pip install packaging -i https://pypi.tuna.tsinghua.edu.cn/simple

 

然后重新把python打包上传一下HDFS,在此不做演示。

 

启动后可以看出tfos的原理:

 

2021-01-05 18:44:44,232 INFO (MainThread-296579) All TFSparkNodes started
2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 1, 'host': '10.45.6.105', 'job_name': 'worker', 'task_index': 0, 'port': 34737, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-8hh75zfg/listener-8s6tckcf', 'authkey': b'\x0b\x9a`\x9c\xaauL|\xaf\xdf\x99\xa9\xf6\xb62d'}
2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 2, 'host': '10.45.8.219', 'job_name': 'worker', 'task_index': 1, 'port': 37954, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-8u4sx686/listener-iokneywe', 'authkey': b'W\xdb\x9e\xbe\xa4\xf3O\xf1\x98\x91u"\xd2\x80\xb1b'}
2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 0, 'host': '10.45.7.163', 'job_name': 'chief', 'task_index': 0, 'port': 41245, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-rrj7a0ix/listener-zd7o8yhg', 'authkey': b'\x7f\xde\xa3l\x89\x93O\x1b\xa8\x92.z!\xddd\xe6'}
2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 3, 'host': '10.45.7.163', 'job_name': 'worker', 'task_index': 2, 'port': 42065, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-egfzxbbk/listener-ofybltd4', 'authkey': b'}\xf6\xd53r\x19M-\xa5\r\x92B\xc7\xfdi\x06'}
2021-01-05 18:44:44,232 INFO (MainThread-296579) {'executor_id': 4, 'host': '10.45.8.219', 'job_name': 'worker', 'task_index': 3, 'port': 41258, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-tq93wnd3/listener-si44r__v', 'authkey': b'\xd7^\x82q8KA/\x83\x89\n\xd7g[N\x18'}
2021-01-05 18:44:44,232 INFO (MainThread-296579) Waiting for TensorFlow nodes to complete...
2021-01-05 18:48:44,690 INFO (MainThread-296579) Shutting down cluster
model version=1609843466

 

训练完成后,HDFS上的model和tensorboard文件:

 

(tf) [hadoop@10 tf2-onspark]$ hdfs dfs -ls /tf2-onspark/model/1609843466
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2021-01-05 18:48 /tf2-onspark/model/1609843466/assets
-rw-r--r--   3 hadoop supergroup    2849331 2021-01-05 18:48 /tf2-onspark/model/1609843466/saved_model.pb
drwxr-xr-x   - hadoop supergroup          0 2021-01-05 18:48 /tf2-onspark/model/1609843466/variables
 
(tf) [hadoop@10 tf2-onspark]$ hdfs dfs -ls hdfs dfs -ls /tf2-onspark/tensorboard/1609843466
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2021-01-05 18:44 /tf2-onspark/tensorboard/1609843466/train

 

这些都是cheif身份的节点写入的,cheif是worker中的幸运者,被我们选中采用它的训练结果和训练过程,其实它和普通worker没啥区别,我们选哪个worker作为cheif得到的结果都差不多,实际我们也不需要操心到底cheif是哪个,让tfos随便给我们指定好即可,我们根据ctx.job_name做不同动作即可。

 

如果训练中想实时观察tensorboard,那幺直接命令行启动tensorboard指向hdfs即可:

 

(tf) [hadoop@10 tf2-onspark]$ tensorboard –logdir=hdfs:///tf2-onspark/model/

 

最后:分布式predict(inference)

 

得到模型之后,如果你有需求对大规模数据进行预测,那幺把model加载到tf-serving然后通过网络调用来预测不是一个好选择。

 

更好的方法是直接用tfos启动spark,用多个worker加载model,对HDFS数据集进行sharding后直接全内存运算打分,最后结果写回HDFS即可。

 

这个过程非常简单,大家自己看看tfos的demo即可: https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/keras/mnist_inference.py

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注