Press "Enter" to skip to content

TensorFlow 篇 | TensorFlow 数据输入格式之 TFRecord

「导语」 TFRecord 是 TensorFlow 生态中的一个重要组件,它是一种二进制序列的存储格式,使用该格式可以使输入数据的读取和处理更为高效,从而提升整体训练流程的速度,另外,它还具有极高的灵活性,可以为复杂特征数据的构建与解析提供便利。本文将对 TFRecord 数据文件的生成与读取流程进行详细地介绍,并提供相应的示例代码作为参考。

 

TFRecord 格式简介
TFRecord 是 TensorFlow 生态中一个重要的组件,其本质上是一种文件格式,用于存储二进制序列的内容。 TFRecord 文件由序列化后的 protobuf 数据构成,可以直接供 TensorFlow 程序读取并用于模型训练。
对于文本格式的数据文件而言,其存储和读取的开销都比较大,相比之下, TFRecord 格式的数据文件占用的磁盘空间会更少,而且可以更高效地进行数据读取,因此将数据使用 TFRecord 格式存储能够在一定程度上提升数据处理的效率。
另外,文本格式的数据更适合处理定长且维度单一的特征数据,对于变长以及多维度的特征数据的处理会比较麻烦,而 TFRecord 格式的数据则没有这一限制,这给数据读取与处理带来了极大的灵活性。
TFRecord ProtoBuf
在生成或者读取 TFRecord 文件之前,我们需要对与 TFRecord 格式相关的 protobuf 协议有一定的了解,它们是构成 TFRecord 文件的关键部分。
Example
Example 指一个数据输入样本,它由一系列的特征组成。其 protobuf 格式如下所示:
message Example {
Features features = 1;
}

上面的 Features 即指特征组合,其 protobuf 格式如下所示:
message Features {
// Map from feature name to feature.
map<string, Feature> feature = 1;
}

这里 Features 使用 map 结构来进行存储,其中 map 的 key 表示特征的名称,为 string 类型, value 表示具体的特征值,为 Feature 类型。
另外,可以看到 Example 只是封装了一下 Features 的结构,其实质与 Features 是等同的。
Feature
Feature 指具体的特征值,其 protobuf 格式如下所示:
message Feature {
// Each feature can be exactly one kind.
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
}

从 Feature 的定义可以看出,它可以接收三种格式的数据,分别为:

 

BytesList 格式,可以表示 string 或 byte 类型的数据。

 

FloatList 格式,可以表示 float 和 double 类型的数据。

 

Int64List 格式,可以表示 bool 类型、 enum 类型、 int32 类型、 uint32 类型、 int64 类型以及 uint64 等多种类型的数据。

 

以上三种格式,基本囊括了所有常见的数据输入类型,在生成和解析 TFRecord 数据时,可以根据具体的数据类型来使用相应的 Feature 结构。
Python 实现
在将上述 protobuf 文件转为语言相关的数据结构后,即可使用这些结构来构建特征数据并进行序列化了。下面以 python 语言为例,来介绍这些数据结构的具体使用方法。(注:在 TensorFlow 的 python 安装包里已经包含了 Example 以及 Feature 等相关的数据结构,我们可以直接使用而无需再用 protoc 工具生成了。)
首先,我们需要构建 Feature 对象。因为 Feature 结构支持 3 种格式的数据,所以这里使用 3 个函数来分别生成不同类型的 Feature 对象。示例代码如下所示:
import tensorflow as tf
import numpy as np

 

def _bytes_feature(value):
“””Returns a bytes_list from a string / byte.”””
if isinstance(value, type(tf.constant(0))):
# BytesList won’t unpack a string from an EagerTensor.
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

 

def _float_feature(value):
“””Returns a float_list from a float / double.”””
return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

 

def _int64_feature(value):
“””Returns an int64_list from a bool / enum / int / uint.”””
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

 

Print BytesList.

 

print(_bytes_feature(b’test_string’))
print(_bytes_feature(u’test_bytes’.encode(‘utf-8’)))

 

Print FloatList.

 

print(_float_feature(np.exp(1)))

 

Print Int64List.

 

print(_int64_feature(True))
print(_int64_feature(1))

 

Serialize and Deserialize Feature.

 

serialized_feature = _float_feature(np.exp(1)).SerializeToString()
print(serialized_feature)
feature_proto = tf.train.Feature.FromString(serialized_feature)
print(feature_proto)

其中 _bytes_feature 、 _float_feature 和 _int64_feature 函数分别用来生成 BytesList 、 FloatList 和 Int64List 格式的 Feature 对象,这里假设这 3 个函数的参数 value 均为单个值。
由于 tf.train.*List 函数接收的参数均为列表 (list) 或数组 (array),所以上面的代码中才会加上 [] 符号来表示列表,如果输入的参数 value 已经为列表或数组则无需此操作。另外还需注意传递给 3 个函数的参数 value 的基础类型要与具体的 *List 相匹配,否则会报错。
在生成了 Feature 对象后,可以调用其 SerializeToString 方法对其进行序列化,从而生成序列化后的字符串。另外还可以使用 tf.train.Feature.FromString 方法来将序列化后的数据还原为 Feature 对象。
接着,我们就可以构建 Example 对象并对其进行序列化了。假设我们有 4 种 Feature ,它们分别为 boolean 类型的特征、 integer 类型的特征、 string 类型的特征和 float 类型的特征,我们首先将这 4 种特征通过上面 3 个函数编码后返回相应的 Feature 对象,然后构建一个 Feature Map 字典并生成 Features 对象,最后使用 Features 对象生成 Example 对象并进行序列化。将上述流程统一到一个函数 serialize_example 中,其示例代码如下所示:
def serialize_example(feature0, feature1, feature2, feature3):
“””
Creates a tf.train.Example message ready to be written to a file.
“””
# Create a dictionary mapping the feature name to the tf.train.Example-compatible
feature = {
‘feature0’: _int64_feature(feature0),
‘feature1’: _int64_feature(feature1),
‘feature2’: _bytes_feature(feature2),
‘feature3’: _float_feature(feature3),
}
# Create a Features message using tf.train.Example.
example_proto = tf.train.Example(features=tf.train.Features(
feature=feature))
return example_proto.SerializeToString()

 

Serialize and Deserialize Example.

 

serialized_example = serialize_example(False, 4, b’goat’, 0.9876)
print(serialized_example)
example_proto = tf.train.Example.FromString(serialized_example)
print(example_proto)

同样地,可以调用 Example 对象的 SerializeToString 来将其序列化为字符串,调用 tf.train.Example.FromString 方法来将序列化后的 Example 对象还原。
TFRecord 文件生成
假设我们有 4 种类型的 Feature ,如上一节所述,并假设它们的原始数据 (numpy) 生成方式如下述代码所示:

 

The number of observations in the dataset.

 

n_observations = int(1e4)

 

Boolean feature, encoded as False or True.

 

feature0 = np.random.choice([False, True], n_observations)

 

Integer feature, random from 0 to 4.

 

feature1 = np.random.randint(0, 5, n_observations)

 

String feature.

 

strings = np.array([b’cat’, b’dog’, b’chicken’, b’horse’, b’goat’])
feature2 = strings[feature1]

 

Float feature, from a standard normal distribution.

 

feature3 = np.random.randn(n_observations)
print(feature0, feature1, feature2, feature3)

现在我们要使用这 4 种 Feature 来生成一个包含 10,000 个数据样本的 TFRecord 文件,可以使用以下几种方式进行生成。
使用 tf.data 生成
首先使用 tf.data.Dataset.from_tensor_slices 函数来生成一个包含原始数据类型的 dateset ,代码如下所示:
features_dataset = tf.data.Dataset.from_tensor_slices(
(feature0, feature1, feature2, feature3))

 

Print dataset.

 

print(features_dataset)

 

Print one element in dataset.

 

for f0, f1, f2, f3 in features_dataset.take(1):
print(f0, f1, f2, f3)

接着我们使用上一节定义的 serialize_example 函数来生成一个包含序列化字符串类型的 dataset ,代码如下所示:
def generator():
for features in features_dataset:
yield serialize_example(*features)

 

serialized_features_dataset = tf.data.Dataset.from_generator(
generator,
output_types=tf.string,
output_shapes=(),
)

 

Print serialized dataset.

 

print(serialized_features_dataset)

 

Print one element in serialized_features_dataset.

 

for s in serialized_features_dataset.take(1):
print(s)

最后我们将序列化后的 dataset 写入 TFRecord 文件中,代码如下所示:
filename = ‘train.tfrecord’
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(serialized_features_dataset)

注意这里的 writer 使用的是 tf.data.experimental.TFRecordWriter 对象,它专用于将序列化的 dataset 对象写入到 TFRecord 文件中,要与后面介绍的 tf.io.TFRecordWriter 对象区分开来。
使用 tf.io 生成
首先将每个数据样本都转为 tf.train.Example 对象并序列化,然后再将其写入 TFRecord 文件中,这里同样使用上面介绍过的 serialize_example 函数来进行序列化,代码如下所示:

 

Write the tf.train.Example
observations to the file.

 

with tf.io.TFRecordWriter(filename) as writer:
for i in range(n_observations):
example = serialize_example(
feature0[i],
feature1[i],
feature2[i],
feature3[i],
)
writer.write(example)

这里的 writer 使用的是 tf.io.TFRecordWriter 对象,它直接将序列化的字符串写入到 TFRecord 文件中。
一般情况下,这种生成 TFRecord 文件的方式在 python 中是最常使用的,在实际使用中可以根据具体情况进行选择。
使用 MapReduce 生成
在数据处理环节,我们可能会使用 MapReduce 进行一些预处理操作,同时我们也希望可以直接借助 MapReduce 任务来生成多个 TFRecord 数据文件以供分布式训练使用,为了满足这一需求, TensorFlow 生态提供了一个扩展库 tensorflow-hadoop ,它包含了 TFRecord 格式的 MapReduce InputFormat 和 OutputFormat 实现。利用这个扩展库,我们就可以直接使用 MapReduce 任务来生成和读取 TFRecord 文件了。部分示例代码如下所示:
// Main function.
import org.tensorflow.hadoop.io.TFRecordFileOutputFormat;
Job job = Job.getInstance(config, “TFRecord”);
job.setOutputFormatClass(TFRecordFileOutputFormat.class);

 

// Mapper or Reducer.
import java.util.Arrays;
import com.google.protobuf.ByteString;
import org.tensorflow.example.BytesList;
import org.tensorflow.example.Example;
import org.tensorflow.example.Feature;
import org.tensorflow.example.Features;
import org.tensorflow.example.FloatList;
import org.tensorflow.example.Int64List;
// map or reduce function
// *List value.
Int64List value0 = Int64List.newBuilder().addAllValue(Arrays.asList(0L)).build();
Int64List value1 = Int64List.newBuilder().addAllValue(Arrays.asList(4L)).build();
BytesList value2 = BytesList.newBuilder()
.addAllValue(Arrays.asList(ByteString.copyFrom(“goat”.getBytes()))).build();
FloatList value3 = FloatList.newBuilder().addAllValue(Arrays.asList(0.9876f)).build();
// All features.
Feature feature0 = Feature.newBuilder().setInt64List(value0).build();
Feature feature1 = Feature.newBuilder().setInt64List(value1).build();
Feature feature2 = Feature.newBuilder().setBytesList(value2).build();
Feature feature3 = Feature.newBuilder().setFloatList(value3).build();
// Feature map.
Features feature = Features.newBuilder().putFeature(“feature0”, feature0)
.putFeature(“feature1”, feature1).putFeature(“feature2”, feature2)
.putFeature(“feature3”, feature3).build();
// Example.
Example example = Example.newBuilder().setFeatures(feature).build();
// Write to TFRecord file.
context.write(new BytesWritable(example.toByteArray()), NullWritable.get());

需要注意的是,为了匹配正在使用的 hadoop 版本,你可能需要修改 tensorflow-hadoop 源码中的 pom.xml 文件,将 hadoop.version 设置为你正在使用的 hadoop 版本并使用 maven 工具重新编译该项目,然后将生成的 jar 包引入到 MapReduce 项目中,避免因版本不匹配而报错。
另外,为了使 MapReduce 项目能正常编译,你还需引入 org.tensorflow:proto 库以及 com.google.protobuf:protobuf-java 库,可以从 maven 官方仓库搜索这 2 个库的最新版本并加入到 gradle 或 maven 项目的配置文件中,然后再进行项目编译即可。
使用 TFRecorder 生成
由于在生成 TFRecord 文件时往往需要编写大量的复杂代码,为了优化代码的复杂度, TensorFlow 官方开源了 TensorFlow Recorder 项目(即 TFRecorder)来更为便捷地生成 TFRecord 文件。
TFRecorder 允许用户从 Pandas dataframe 或 CSV 直接生成 TFRecords 文件,而无需编写任何复杂的代码。其对于图像数据的处理尤其方便,在 TFRecorder 之前,要大规模生成 TFRecord 格式的图像数据,必须编写一个数据流水线来从存储中加载图像并将结果序列化为 TFRecord 格式,而现在只需几行代码即可生成基于图像的 TFRecord 文件。示例代码如下所示:
import pandas as pd
import tfrecorder

 

From Pandas DataFrame

 

df = pd.read_csv(‘/path/to/data.csv’)
df.tensorflow.to_tfr(output_dir=’/my/output/path’)

 

From CSV

 

tfrecorder.create_tfrecords(
source=’/path/to/data.csv’,
output_dir=’/my/output/path’,
)

 

From an image directory

 

tfrecorder.create_tfrecords(
source=’/path/to/image_dir’,
output_dir=’/my/output/path’,
)

更多 TFRecorder 的用法请参考其官方文档。
TFRecord 文件读取
TensorFlow 提供了专用于读取 TFRecord 文件的 API 接口 tf.data.TFRecordDataset ,该接口可以将 TFRecord 文件中的内容读取到 dataset 中。代码如下所示:

 

Read TFRecord file to dataset.

 

raw_dataset = tf.data.TFRecordDataset(filename)
print(raw_dataset)

此时 dataset 中存储的是序列化格式的字符串,如果要将其解析为真实的值,还需要进一步操作。
还原为 Example
我们可以将 raw_dataset 中的每个元素都还原为 tf.train.Example ,一般在小范围对 TFRecord 数据进行检查时使用,示例代码如下所示:
for raw_record in raw_dataset.take(1):
example = tf.train.Example()
example.ParseFromString(raw_record.numpy())
print(example)
# or
example_proto = tf.train.Example.FromString(raw_record.numpy())
print(example_proto)

模型训练使用
为了在模型训练时使用该 dataset ,我们需要将 raw_dataset 中的每个元素都解析为 FeatureMap ,以匹配 Keras 模型的输入与输出。代码如下所示:
def _parse_function(example_proto):
# Create a description of the features.
feature_description = {
‘feature0’: tf.io.FixedLenFeature([], tf.int64, default_value=0),
‘feature1’: tf.io.FixedLenFeature([], tf.int64, default_value=0),
‘feature2’: tf.io.FixedLenFeature([], tf.string, default_value=”),
‘feature3’: tf.io.FixedLenFeature([], tf.float32, default_value=0.0),
}
# Parse the single input tf.train.Example
proto using the dictionary above.
# return tf.io.parse_single_example(example_proto, feature_description)
# Parse the batch input tf.train.Example protos using the dictionary above.
return tf.io.parse_example(example_proto, feature_description)

 

Print parsed dataset.

 

parsed_dataset = raw_dataset.map(_parse_function)
print(parsed_dataset)

 

Print one element in parsed_dataset.

 

for parsed_record in parsed_dataset.take(1):
print(parsed_record)

这里使用了 tf.io.parse_example 函数来将序列化后字符串解析为指定的数据类型,我们需要提前准备好 feature_description 字典, 该字典定义了 feature 的名称、长度(定长/变长)、数据类型以及默认值,以供在解析时使用。最终我们通过调用 raw_dataset 的 map 方法来将该解析函数应用到 dataset 中的每个元素。
另外,我们也可以使用 tf.io.parse_single_example 函数来进行解析,但要注意它与 tf.io.parse_example 的区别,前者适合解析单个序列化的元素,而后者适用于一个 batch 的解析。在TensorFlow 数据输入的最佳实践一文中曾介绍过 dataset 的向量化 map 操作,即对 dataset 先应用 batch 转换然后再应用 map 转换以提升效率,因此这里推荐使用后者作为序列化数据的解析函数。
最终我们可以将 parsed_dataset 应用于模型的训练中。示例代码如下所示:
model.fit(parsed_dataset)

参考资料

 

TFRecord and tf.train.Example
TFRecord 相关 ProtoBuf 文件地址
创建 TFRecords 的救星 — TensorFlow Recorder 现已开源!
TFRecorder Github 地址
Hadoop MapReduce InputFormat/OutputFormat for TFRecords

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注