Press "Enter" to skip to content

TensorFlow 篇 | TensorFlow 数据输入的最佳实践

 

「 导语 」在训练 TensorFlow 模型时,为了达到最佳的训练性能,需要一个高效的数据输入流程,该流程可以在当前训练步骤完成之前为下一步训练准备好数据。 tf.data API 可以帮助我们构建这个灵活且高效的输入流程,它包含有一系列的数据转换操作,可以很轻松地对输入数据进行各种并行化处理,本文将会对这些转换操作进行详细地介绍。

 

未优化的方法

 

一个训练流程通常包括以下几个步骤:

 

 

打开数据输入文件的句柄。

 

从文件中取出一定数量 ( batch ) 的数据。

 

使用取出的数据进行模型训练。

 

重复步骤 2-3 直至训练完成。

 

 

在未对数据输入流程进行优化时,训练流程中各部分的时间开销如下图所示:

 

 

可以看到在这种同步的实现方式下,当从文件获取数据时,模型训练是处于空闲状态的,而当对模型进行训练时,数据输入流程又处于空闲状态。训练的总体时间为各个部分所消耗的时间累加之和,这严重影响了训练的效率,因此我们需要对数据输入流程进行优化。

 

数据预取

 

数据预取操作是指模型在执行第 s 步训练时,输入流程同时从文件中读取第 s+1 步所需训练数据的并行化处理过程。与未优化的方法相比,数据预取可以将训练步骤 2-3 的累加时间开销减少到二者之间的最大值。

 

tf.data API 提供了 prefetch 转换来完成数据预取操作,它可以将数据生成的时间与数据消耗的时间解耦。该转换操作使用后台线程和一个内部的缓冲区在数据请求到来前从输入数据集中预取元素,但不保证一定预取完成。

 

预取的数据元素数量应该等于(或大于)单个训练步骤所需的 batch 大小。该参数是可调的,既可以手动指定,也可以将其设置为 tf.data.experimental.AUTOTUNE ,这时预取元素的数量将由 tf.data 运行时动态地调整。

 

使用 prefetch 转换后的训练流程其时间开销如下图所示:

 

 

可以看到数据读取和训练的时间出现了重合,从而减少了整体的时间开销。

 

并行数据提取

 

在真实的训练环境中,输入数据可能会存储在远程的文件系统如 HDFS 中。由于本地存储和远程存储之间存在一些差异,在本地可以正常运行的数据输入流程可能会在远程读取数据时无法如预期一样正常工作,具体差异如下:

 

 

首字节读取时间:从远程存储读取文件中的第一个字节可能比从本地存储读取文件首字节的时间要长好几个数量级,时间花销较大。

 

数据读取吞吐量:虽然远程存储通常提供较大的聚合带宽,但是按顺序读取单个文件可能仅会利用此带宽的一小部分,吞吐量并不高。

 

 

另外,当原始的数据加载到内存中后,还可能需要对数据进行反序列化或解密(如 protobuf 格式的数据),这还会需要额外的计算资源。诚然,无论数据存储在本地或是远程,都会存在此开销,但如果数据没有被有效地预取,远程读取数据会使得该开销变得更大。

 

为了减轻各种数据提取开销的影响, interleave 转换可以被用于并行化数据加载,以交错读取多个数据集文件的内容(如读取 TextLineDataset 等数据集)。其中,并行读取的文件数量可以通过 cycle_length 参数来控制,表示从 cycle_length 个文件交错读取数据,而从每个文件中连续读取的样本个数由 block_length 参数控制,也就是说从一个文件读取了 block_length 个连续样本后开始交错从其它文件继续读取,而文件读取的并行度由 num_parallel_calls 参数控制,与 prefetch 转换一样, interleave 转换也支持 tf.data.experimental.AUTOTUNE 设置,从而把有关使用什幺级别的并行度委派给 tf.data 运行时动态决定。

 

interleave 转换的默认参数使得它依次对多个数据文件中的单个样本进行交错读取,其时间开销如下图所示:

 

 

可以看到 interleave 从两个数据集交错获取数据样本,但是其性能并没有得到提升。而通过设置 num_parallel_calls 参数后,其时间开销如下图所示:

 

 

因为可以并行加载多个数据集文件,从而减少了依次打开数据文件所需的等待时间,继而减少了全局训练的时间开销。

 

并行数据转换

 

在准备输入数据时,可能需要对原始的数据输入进行预处理。为此 tf.data 提供了 map 转换进行数据预处理操作,该转换可以将用户自定义的函数应用于输入数据集中的每个元素。由于输入元素彼此独立,因此该预处理操作可以在多个 CPU 内核间并行执行。

 

prefetchinterleave 转换类似, map 转换也提供了 num_parallel_calls 参数来指定并行度,可以自行设置该参数的值,同时它也支持 tf.data.experimental.AUTOTUNE 设置,从而把有关使用什幺级别的并行度委派给 tf.data 运行时动态决定。

 

对于未并行化的 map 转换,其时间开销如下图所示:

 

 

此时训练流程的整体时间开销为其它各个部分的时间开销和预处理时间开销累加之和。而并行化 map 转换后,其时间开销如下图所示:

 

 

可以看到 map 部分时间开销出现了重合,从而在整体上减少了全局训练的时间开销。

 

数据缓存

 

cache 转换可以在内存或者本地存储中缓存处理后的数据集,这样可以避免在每个 epoch 都执行相同的操作(如文件打开和数据读取)。一个基本的 cache 转换的时间开销如下图所示:

 

 

可以看到在第 2epoch 时,由于缓存了数据集,因此文件打开,数据读取以及预处理的时间开销都被节省了。这是因为在 cache 转换之前的所有对数据集的操作只会在第 1epoch 被执行,接下来的 epoch 将会直接使用 cache 转换所缓存的数据。

 

如果 map 转换使用的用户自定义函数是比较耗时的,只要预处理后的结果数据集能够放进内存或本地存储,就应该在 map 转换后应用 cache 转换,以减少后续每个 epoch 都进行 map 转换的时间开销。如果用户自定义的函数增加了存储数据集所需的空间(超出缓存容量),那幺可以在缓存转换后应用 map 转换,或考虑在训练之前对数据进行预处理以减少资源使用。

 

向量化 map 转换

 

map 转换中调用用户自定义函数会产生额外的开销,因此最好对用户自定义函数进行向量化处理(即让它一次处理一批数据输入),然后在 map 转化前应用 batch 转化,这样 map 转换就会应用于每个 batch 的数据而非单一数据。

 

对于在 map 转换后应用 batch 转换的数据集,其时间开销如下图所示:

 

 

可以看到 map 函数被应用于每一个数据样本,尽管其执行时间很快,也会对整体的时间性能造成影响。而在 map 转化前应用 batch 转化的数据集,其时间开销如下图所示:

 

 

可以看到 map 函数仅被执行一次并被应用于 1batch 的样本,尽管其执行时间相比而言会更长,但额外的时间开销仅出现一次,从而改善了整体的时间性能。

 

总结

 

TensorFlow 数据输入的最佳实践方法包括以下几个部分:

 

 

使用 prefetch 转换来使得数据生产和消耗的时间开销重叠。

 

使用 interleave 转换来并行化读取数据集。

 

通过设置 num_parallel_calls 参数来并行化 map 转换。

 

使用 cache 转换在第 1 轮训练时将数据缓存在内存或本地存储中。

 

map 转换的用户自定义函数向量化。

 

 

代码实现

 

根据上面的最佳实践方法,本节使用 tf.data API 来完成 TensorFlow 输入数据的构建。具体实现代码如下所示:

 

def make_dataset(input_pattern, shuffle_size, batch_size):
    # map 解析函数,注意这里的向量化操作
    def labeler(record):
        fields = tf.io.decode_csv(
            record,
            record_defaults=['0'] * 32,
            field_delim='\t',
        )
        data = tf.strings.to_number(fields[1:32], out_type=tf.int32)
        label = tf.strings.to_number(fields[:1], out_type=tf.int32)
        data = tf.transpose(data)
        label = tf.transpose(label)
        return data, label
    filenames = tf.data.Dataset.list_files(input_pattern)
    dataset = filenames.interleave(
        lambda filename: tf.data.TextLineDataset(filename),
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    dataset = dataset.repeat().shuffle(shuffle_size).batch(batch_size)
    dataset = dataset.map(
        lambda ex: labeler(ex),
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    ).cache()
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset

 

注意事项

 

tf.data
map

 

参考资料

 

Data Performance

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注