Press "Enter" to skip to content

tensorflow2.0 – 端到端的wide&deep模型训练

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

本文将基于泰坦尼克数据集,展现tensorflow2.0实现wide&deep模型训练的整个工程化流程,内容包括:

生成tfrecords格式的训练样本文件
使用keras构建wide&deep模型结构
利用dataset API加载tfrecords训练样本
训练并保存wide&deep模型文件

本文采用单机模式完成训练,在下一篇博客我将演示如何利用spark实现海量训练样本场景下的分布式训练。

 

本文代码: https://github.com/owenliang/tf2-widedeep ,你应该对tensorflow有一定了解,我不会对基础知识做过多解释。

 

csv原始数据

 

在data目录下,我下载了泰坦尼克的数据集:

train.csv:训练文件,Survived列是训练目标(0死亡,1生还)
test.csv:测试文件,没有Survived列,模型预测结果只能提交到kaggle来进行准确率判定。

train.csv格式:

 

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S

 

原始特征可以从2个视角划分:

数字、字符串。
类别、连续值。

train流程

 

先从整体视角,看一下train.py是如何完成模型训练的。

 

import pandas as pd
import tensorflow as tf
from dataset import dataframe_to_tfrecords, dataset_from_tfrecords
from model import build_model
import time
 
def train_model(model, dataset):
    version = int(time.time())
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='./tensorboard/{}'.format(version), histogram_freq=1)    # tensorboard --logdir=./tensorboard
    model.fit(dataset.batch(100).shuffle(100), epochs=2000, callbacks=[tensorboard_callback])
    model.save('model/{}'.format(version), save_format='tf', include_optimizer=False) # two optimizer in wide&deep can not be serialized, excluding optimizer is ok for prediction
 
# csv转tfrecords文件
dataframe_to_tfrecords(pd.read_csv('../data/train.csv'), 'train.tfrecords', include_outputs=True)
# 加载tfrecords文件为dataset
dataset = dataset_from_tfrecords('train.tfrecords', include_outputs=True)
# 创建wide&deep model
wide_deep_model = build_model()
# 训练模型
train_model(wide_deep_model, dataset)

 

为了训练得到模型文件,整体分为4步:

预处理csv文件,生成tfrecords格式的训练文件。
将磁盘上的tfrecords文件加载为dataset对象。
构建wide&deep模型对象。
训练模型,保存模型文件。

仅看本文件的话,train_model负责训练wide&deep模型:

生成模型版本号。
创建TensorBoard的callback,这样训练过程将被trace,后续可以打开tensorboard可视化的训练过程和模型结构。
传入dataset完成fit,合理调整dataset的batch size,传入训练轮数epoch。
将keras模型保存为saved_model文件格式,后续可以从文件恢复出模型或者直接用tensorflow-serving加载提供GRPC服务。

save模型时include_optimizer=False表示不保存优化器,优化器是训练时用来梯度下降的,训练好的模型后续用作线上推断时是不需要使用优化器的。但这并不是设为False的理由,真正理由是我们的wide&deep模型为wide和deep两部分采用了各自的优化器,现有tensorflow还没有兼容1个model包含多个optimizer的导出,索性就不导出。

 

定义feature

 

打开config.py,它定义了wide&deep模型的原始输入与特征工程:

 

from collections import OrderedDict
 
model_spec = {
    'inputs': {
        'Pclass': {'default': -1, 'type': 'int'},
        'Sex': {'default': '', 'type': 'str'},
        'Age': {'default': -1, 'type': 'int'},
        'SibSp': {'default': -1, 'type': 'int'},
        'Parch': {'default': -1, 'type': 'int'},
        'Fare': {'default': -1, 'type': 'float'},
        'Embarked': {'default': '', 'type': 'str'},
        'Ticket': {'default': '', 'type': 'str'}
    },
    'outputs': {
        'Survived': {'default': -1, 'type': 'int'},
    },
    'feature_columns': {
        # 连续值, 只进入deep
        'num': [
            {'feature': 'Age'},
            {'feature': 'SibSp'},
            {'feature': 'Parch'},
            {'feature': 'Fare'}
        ],
        # 类别,onehot进入wide完成记忆,embedding进入deep完成扩展
        'cate': [
            {'feature': 'Sex', 'vocab': ['male', 'female'], 'embedding': 10},
            {'feature': 'Pclass', 'vocab': [1, 2, 3], 'embedding': 10},
            {'feature': 'Embarked', 'vocab': ['S', 'C', 'Q'], 'embedding': 10},
        ],
        'hash': [
            {'feature': 'Ticket', 'bucket': 10, 'embedding': 10}
        ],
        'bucket': [
            {'feature': 'Age', 'boundaries': [10, 20, 30, 40, 50, 60, 70, 80, 90], 'embedding': 10}
        ],
        # 人工交叉:进入wide
        'cross': [
            {'feature': ['Age#bucket', 'Sex#cate'], 'bucket': 10}
        ]
    }
}

 

它围绕wide&deep的输入输出进行定义,分为3部分,对于后续构建模型与训练数据生成都至关重要:

inputs:模型原始输入,需要定义它们的name,type,default value,对生成tfrecords文件非常重要。
outputs:模型输出。
feature_columns:模型中的”特征工程layer”,它对inputs进行各种特征工程,最终输入到model:

num:连续值,例如Fare票价,只会喂入deep部分,不需要做预处理(也可以标准化)。
cate:离散值(类别,但是枚举有限),例如Sex性别可选值为male和female,它会被onehot后送入wide部分,embedding后送入deep部分。
hash:离散值(类别,但是枚举无限),例如Ticket票据是无穷枚举的字符串,可以通过hash算法变为有限大小的整数类别,同样onehot送入wide,embedding送入deep。
bucket:连续值转离散值,例如Age进行分桶得到桶编号,然后onehot送入wide,embedding送入deep。
cross:将多个类别特征交叉相乘再哈希得到新离散值,然后onehot送入wide,这属于人工经验特征交叉,依靠wide强化模型记忆。

关于wide&deep的原理,你必须读一下这篇知乎: https://www.zhihu.com/question/23194692/answer/1629971674?utm_source=hot_content_share&utm_medium=all

 

根据wide&deep理念,num连续值只会喂入deep层,cate/hash/bucket生成的类别可以同时喂入wide与deep,只不过wide采用one-hot侧重记忆性,deep采用embedding侧重扩展性,而cross属于人工特征交叉更侧重wide的记忆性,所以cross只onehot输入到wide部分即可。

 

生成tfrecords文件

 

打开dataset.py,它的第一个函数负责将csv文件转换成tfrecords文件:

 

import pandas as pd
import tensorflow as tf
from config import model_spec
from copy import deepcopy
 
def dataframe_to_tfrecords(df, tfrecords_filename, include_outputs=False):
    all_feature_spec = deepcopy(model_spec['inputs'])
    if include_outputs:
        all_feature_spec.update(model_spec['outputs'])
 
    with tf.io.TFRecordWriter(tfrecords_filename, 'GZIP') as tfrecords_writer:
        for _, row in df.iterrows():
            feature_dict = {}
            for feature_name, feature_spec in all_feature_spec.items():
                feature_value = row[feature_name]
                if feature_value is None or pd.isna(feature_value):
                    feature_value = feature_spec['default']
                if feature_spec['type'] == 'int':
                    feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[int(feature_value)]))
                elif feature_spec['type'] == 'float':
                    feature = tf.train.Feature(float_list=tf.train.FloatList(value=[float(feature_value)]))
                elif feature_spec['type'] == 'str':
                    feature = tf.train.Feature(
                        bytes_list=tf.train.BytesList(value=[str(feature_value).encode('utf-8')]))
                feature_dict[feature_name] = feature
            example = tf.train.Example(features=tf.train.Features(feature=feature_dict))
            tfrecords_writer.write(example.SerializeToString())

 

首先引入config.py中的model_spec定义,如果是训练数据则include_outputs=True,即把目标值Survived也写入tfrecords文件,因为训练时需要用到目标值。

 

打开TFRecordWriter文件(GZIP压缩),逐行迭代pandas dataframe的每一行样本,将inputs+outputs的各个列作为若干feature对象,如果dataframe对应值缺失则用default值填充,这样每一行就会生成一个tf.train.Example样本对象,其本质是一个dict,其中key是特征名,value是特征值,被序列化写入到tfrecords文件中。

 

所以tfrecords文件就是一行一行的样本,每一行样本又是KV结构的若干特征。

 

加载tfrecords为dataset

 

tfrecords是一种文件格式,用来存储训练或者预测样本。

 

dataset是一种数据集加载API,可以支持从tfrecords磁盘文件加载数据集,而不必全内存加载数据集。

 

为了训练,我们用dataset API加载tfrecords文件(dataset.py文件):

 

def dataset_from_tfrecords(tfrecords_pattern, include_outputs=False):
    all_feature_spec = deepcopy(model_spec['inputs'])
    if include_outputs:
        all_feature_spec.update(model_spec['outputs'])
 
    feature_dict = {}
    for feature_name, feature_spec in all_feature_spec.items():
        if feature_spec['type'] == 'int':
            feature = tf.io.FixedLenFeature((), tf.int64)
        elif feature_spec['type'] == 'float':
            feature = tf.io.FixedLenFeature((), tf.float32)
        elif feature_spec['type'] == 'str':
            feature = tf.io.FixedLenFeature((), tf.string)
        feature_dict[feature_name] = feature
 
    def parse_func(s):
        inputs = tf.io.parse_single_example(s, feature_dict)
        outputs = []
        if include_outputs:
            for output_name in model_spec['outputs']:
                outputs.append(inputs[output_name])
                inputs.pop(output_name)
        if include_outputs:
            return inputs, outputs
        return inputs
 
    dataset = tf.data.Dataset.list_files(tfrecords_pattern).interleave(
        lambda filename: tf.data.TFRecordDataset(filename, compression_type='GZIP').map(parse_func)
    )
    return dataset

 

最终return的dataset是这样加工而来的:

利用tf.data.Dataset.list_files扫描目录下的若干tfrecords文件名。
利用interleave将每个tfrecord文件名转换成一个dataset数据集对象。
利用map逐行反序列化example样本对象。

如果tfrecords_pattern通配到N个tfrecords文件,那幺return的dataset对象实际内含N个dataset子对象分别对应N个文件,它会自动在这些文件之间轮转加载数据。

 

在map回调函数parse_func中,为了解析tfrecords中每一行序列化的example,我们必须根据model_spec定义的name和type来定义一个feature_dict描述字典,然后调用parse_single_example完成反序列化。如果是训练数据集,我们还需要将Survived列挑出来,最终返回元组(inputs特征KV字典, output目标值列表),这才是模型训练所需要的内存数据格式。

 

构建模型

 

打开model.py,它负责构建模型结构:

 

def build_model():
    linear_features, dnn_features = _build_feature_columns()
 
    # wide
    linear_input_layer = build_input_layer()
    linear_feature_layer = tf.keras.layers.DenseFeatures(linear_features)
    linear_dense_layer1 = tf.keras.layers.Dense(units=1)
 
    output = linear_feature_layer(linear_input_layer)
    output = linear_dense_layer1(output)
 
    linear_model = tf.keras.Model(inputs=list(linear_input_layer.values()), outputs=[output])
    linear_optimizer = tf.keras.optimizers.Ftrl(l1_regularization_strength=0.5)
 
    # deep
    dnn_feature_layer = tf.keras.layers.DenseFeatures(dnn_features)
    dnn_norm_layer = tf.keras.layers.BatchNormalization()   # important for deep
    dnn_dense_layer1 = tf.keras.layers.Dense(units=128, activation='relu')
    dnn_dense_layer2 =  tf.keras.layers.Dense(units=1)
 
    dnn_input_layer = build_input_layer()
    output = dnn_feature_layer(dnn_input_layer)
    output = dnn_norm_layer(output) # this will break the tensorboard graph because of unfixed bug
    output = dnn_dense_layer1(output)
    output = dnn_dense_layer2(output)
 
    dnn_model =  tf.keras.Model(inputs=list(dnn_input_layer.values()), outputs=[output])
    dnn_optimizer = tf.keras.optimizers.Adagrad()
 
    # wide&deep
    wide_deep_model = tf.keras.experimental.WideDeepModel(linear_model, dnn_model, activation='sigmoid')
    wide_deep_model.compile(optimizer=[linear_optimizer,dnn_optimizer],
                            loss=tf.keras.losses.BinaryCrossentropy(),
                            metrics=tf.keras.metrics.BinaryAccuracy())
    return wide_deep_model

 

我们使用keras的实验性API WideDeepModel,它要求传入wide和deep两个子模型,以及各自的optimizer,整个Wide&deep模型就一路输出目标值,采用Binary交叉熵计算loss,使用BinaryAcc评估准确率。

 

根据wide&deep实现原理,wide就是一层线性函数并且采用带L2正则的Ftrl优化器做梯度下降,这样可以令wide部分的权重系数更加稀疏易于保存,deep部分是普通的深层网络,最终只有1个神经元的输出与wide部分做add求和操作得到1个数字,再经过sigmoid激活后作为最终wide&deep模型的概率输出。

 

对于wide和deep来说,大部分的inputs都会使用,因此它们采用一样的input layer,但是wide和deep的特征工程不一样(wide输入onehot,deep输入embedding),所以它们有各自不同feature column层做特征预处理。

 

input layer负责接受模型的输入,因为我们的输入的样本是KV字典风格的Example,因此input layer也遵循相同格式生成:

 

def build_input_layer():
    input_layer = {}
    for input_name, input_spec in model_spec['inputs'].items():
        if input_spec['type'] == 'int':
            input_feature = tf.keras.Input((), name=input_name, dtype=tf.int64)
        elif input_spec['type'] == 'float':
            input_feature = tf.keras.Input((), name=input_name, dtype=tf.float32)
        elif input_spec['type'] == 'str':
            input_feature = tf.keras.Input((), name=input_name, dtype=tf.string)
        input_layer[input_name] = input_feature
    return input_layer

 

根据model_spec生成KV风格的Input()映射关系即可,实际生产中我们都是像这样按KV风格传入若干特征的。

 

最终就是feature column特征预处理层的逻辑:

 

def _build_feature_columns():
    num_feature_arr = []
    onehot_feature_arr = []
    embedding_feature_arr = []
 
    feature_column_spec = model_spec['feature_columns']
 
    base_cate_map = {}
    for num_feature in feature_column_spec['num']:
        num_feature_arr.append(tf.feature_column.numeric_column(num_feature['feature']))
    for cate_feature in feature_column_spec['cate']:
        base_cate_map[cate_feature['feature'] + '#cate'] = (tf.feature_column.categorical_column_with_vocabulary_list(cate_feature['feature'], cate_feature['vocab']), cate_feature)
    for hash_feature in feature_column_spec['hash']:
        base_cate_map[hash_feature['feature'] + '#hash'] = (tf.feature_column.categorical_column_with_hash_bucket(hash_feature['feature'], hash_feature['bucket']), hash_feature)
    for bucket_feature in feature_column_spec['bucket']:
        num_feature = tf.feature_column.numeric_column(bucket_feature['feature'])
        base_cate_map[bucket_feature['feature'] + '#bucket'] = (tf.feature_column.bucketized_column(num_feature, boundaries=bucket_feature['boundaries']), bucket_feature)
 
    cross_cate_map = {}
    for cross_feature in feature_column_spec['cross']:
        cols = []
        for col_name in cross_feature['feature']:
            column, spec = base_cate_map[col_name]
            cols.append(column)
        cross_cate_map['&'.join(cross_feature['feature']) + '#cross'] = tf.feature_column.crossed_column(cols, hash_bucket_size=cross_feature['bucket'])
 
    for cate_name in base_cate_map:
        column, spec = base_cate_map[cate_name]
        onehot_feature_arr.append(tf.feature_column.indicator_column(column))
        embedding_feature_arr.append(tf.feature_column.embedding_column(column, spec['embedding']))
    for cross_cate_name in cross_cate_map:
        cross_feature_col = cross_cate_map[cross_cate_name]
        onehot_feature_arr.append(tf.feature_column.indicator_column(cross_feature_col))
 
    return onehot_feature_arr, num_feature_arr + embedding_feature_arr

 

feature column将特征预处理纳入了模型结构中,因此线上推断时客户端不需要做特征预处理,比如Sex的male与female直接按字符串传入即可。

 

根据model_spec的feature_column定义,我们生成:

num_feature_arr:连续值,给deep用。
onehot_feature_arr:独热编码,给wide用,包含:枚举的类别、hash取模产生的类别、连续值经过bucket分桶产生的类别,或者将这几个类别cross交叉产生的新类别。
embedding_feature_arr:嵌入稠密向量,给deep用,包含:枚举的类别、hash取模产生的类别、连续值经过bucket分桶产生的类别。

indicator_column用于生成onthot向量,embedding_column用于生成embedding向量。

 

函数最后返回wide部分使用的特征工程,与deep部分使用的特征工程。

 

模型predict

 

查看predict.py,它加载keras model进行预测:

 

import pandas as pd
import numpy as np
import tensorflow as tf
from dataset import dataframe_to_tfrecords, dataset_from_tfrecords
 
# 预测
def model_predict(model, dataset, df ):
    dataset = dataset.batch(100)
    pred_y = []
    for x in dataset:
        y = wide_deep_model(x)
        y = np.where(y.numpy()[:, 0]>0.5, 1, 0)
        pred_y.extend(y)
    df['Survived'] = pred_y
 
# csv转tfrecords文件
df = pd.read_csv('../data/test.csv')
dataframe_to_tfrecords(df, 'test.tfrecords', include_outputs=False)
# 加载tfrecords文件为dataset
dataset = dataset_from_tfrecords('test.tfrecords', include_outputs=False)
# 加载模型
wide_deep_model = tf.keras.models.load_model('model/1608529479')
# 使用模型
model_predict(wide_deep_model, dataset, df)
# 保存预测结果
df[['PassengerId','Survived']].to_csv('./result.csv', index=False, header=True)

 

这里将test.csv转成tfrecords文件,然后再用dataset加载它,这时候里面不包含目标值Survived。

 

模型通过load_model加载对应版本,注意keras模型经过load后就不再具备predict方法了,只能当作函数调用。

 

模型输出值是对目标值的推断,因此我们判断>0.5就是Survived=1生还,否则Survied=0死亡,最后将passengerId和预测Survived保存到csv中。

 

查看tensorboard

 

我们每次训练都会在tensorboard目录下生成一个版本号的子目录,我们可以用tensorboard –logdir指向tensorboard目录(注意不是某个版本的子目录),从而可以选择任意模型版本查看其训练过程和网络结构。(注:tensorboard可以在模型训练期间打开,通过定时刷新实时观察模型进展)

 

tensorboard –logdir=./src/tensorboard

 

查看随着epoch增长,准确率和损失的变化。

 

 

查看模型结构(我这里wide&deep模型部分被折叠了,需要右键add to main graph):

 

 

然后双击放大wide&deep模型:

 

 

可以看到wide部分和deep部分,两路输出add后经过sigmoid:

 

 

双击展开左侧wide部分model,可以看到其内部是由dense_feature特征工程层+单神经元的dense层构成:

 

 

双击展开dense_feature层,我们理应看到feature column带来的各种one-hot向量:

 

 

可以看到输入到dense feature层经过各种Indicator做onehot后重新concat到了一个宽向量,再送入后续的dense层,这就是dense feature layer的原理了。

 

再回头看一下deep层:

 

 

可以看出原始inputs经过dense feature层特征工程后(都是embedding),经过batch norm标准化(为了连续值),再经过2层神经网络。

 

双击展开dense feature层:

 

 

最左侧是连续值Age经过分桶后得到离散类别,再embedding。

 

左2是枚举类别直接embedding,左3是连续值。

 

结束语

 

tensorflow2.x的keras模型不支持optional input,也就是说调用模型的时候必须保证传所有的特征,如果没有就传一个自定义的默认值(比如-1),训练数据也遵循同样的默认值即可保证正确。

 

目前来看,tfrecords+dataset+feature column+saved_model+tensorboard是tensorflow端到端工程化的必备工具链,希望本文对大家有所启发,有任何问题欢迎留言交流。

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注