[Introduction] Training and evaluating models is the core of the machine learning workflow. Only by mastering the right training and evaluation methods, and applying them flexibly, can we run experiments, analyses, and validation more quickly, and thereby gain a deeper understanding of our models.

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

## Train and test data from NumPy arrays.

x_train, y_train = (
    np.random.random((60000, 784)),
    np.random.randint(10, size=(60000, 1)),
)
x_test, y_test = (
    np.random.random((10000, 784)),
    np.random.randint(10, size=(10000, 1)),
)

## Reserve 10,000 samples for validation.

x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

## Model Creation.

inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

## Model Compile.

model.compile(
    # Optimizer
    optimizer=keras.optimizers.RMSprop(),
    # Loss function to minimize
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    # List of metrics to monitor
    metrics=['sparse_categorical_accuracy'],
)

## Model Training.

print('# Fit model on training data')
history = model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=3,
    # We pass some validation data for monitoring validation loss and metrics
    # at the end of each epoch.
    validation_data=(x_val, y_val),
)
print('\nhistory dict:', history.history)

## Model Evaluate.

print('\n# Evaluate on test data')
results = model.evaluate(x_test, y_test, batch_size=128)
print('test loss, test acc:', results)

## Model Predict.

print('\n# Generate predictions for 3 samples')
predictions = model.predict(x_test[:3])
print('predictions shape:', predictions.shape)

def basic_loss_function(y_true, y_pred):
    return tf.math.reduce_mean(tf.abs(y_true - y_pred))

To customize a loss with its own configuration, subclass keras.losses.Loss and implement the __init__(self) and call(self, y_true, y_pred) methods, an approach quite similar to subclassing layers and models. For example, a weighted binary cross-entropy loss can be implemented as follows:
class WeightedBinaryCrossEntropy(keras.losses.Loss):
    """
    Args:
        pos_weight: Scalar to affect the positive labels of the loss function.
        weight: Scalar to affect the entirety of the loss function.
        from_logits: Whether to compute loss from logits or the probability.
        reduction: Type of tf.keras.losses.Reduction to apply to loss.
        name: Name of the loss function.
    """
    def __init__(self,
                 pos_weight,
                 weight,
                 from_logits=False,
                 reduction=keras.losses.Reduction.AUTO,
                 name='weighted_binary_crossentropy'):
        super().__init__(reduction=reduction, name=name)
        self.pos_weight = pos_weight
        self.weight = weight
        self.from_logits = from_logits

    def call(self, y_true, y_pred):
        ce = tf.losses.binary_crossentropy(
            y_true,
            y_pred,
            from_logits=self.from_logits,
        )[:, None]
        ce = self.weight * (ce * (1 - y_true) + self.pos_weight * ce * y_true)
        return ce

model.compile(
    loss=WeightedBinaryCrossEntropy(
        pos_weight=0.5,
        weight=2,
        from_logits=True,
    ),
)

To implement a custom metric, subclass keras.metrics.Metric and implement four methods: __init__(self) to create the state variables, update_state(self, y_true, y_pred, sample_weight=None) to update them, result(self) to return their final value, and reset_states(self) to reinitialize them. For example, a metric that counts the number of true positives in a multi-class classification task can be implemented as follows:
class CategoricalTruePositives(keras.metrics.Metric):
    def __init__(self, name='categorical_true_positives', **kwargs):
        super().__init__(name=name, **kwargs)
        # State variable that accumulates the true-positive count.
        self.true_positives = self.add_weight(name='tp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.reshape(tf.argmax(y_pred, axis=1), shape=(-1, 1))
        values = tf.cast(y_true, 'int32') == tf.cast(y_pred, 'int32')
        values = tf.cast(values, 'float32')
        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, 'float32')
            values = tf.multiply(values, sample_weight)
        self.true_positives.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.true_positives

    def reset_states(self):
        # The state of the metric will be reset at the start of each epoch.
        self.true_positives.assign(0.)

model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[CategoricalTruePositives()],
)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

inputs = keras.Input(shape=(784,), name='digits')
x1 = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x2 = layers.Dense(64, activation='relu', name='dense_2')(x1)
outputs = layers.Dense(10, name='predictions')(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

model.add_metric(
    keras.backend.std(x1),
    name='std_of_activation',
    aggregation='mean',
)

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)
model.fit(x_train, y_train, batch_size=64, epochs=1)

The fit method also provides a validation_split argument that automatically holds out a fraction of the training data for validation. Its value lies between 0 and 1; for example, 0.2 means 20% of the training set is used for validation, and by default fit reserves the last 20% of the samples in the NumPy arrays as the validation set.
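As a minimal, self-contained sketch of this behavior (the tiny stand-in data and throwaway model below are assumptions of the example, not part of the text above), the last 20% of the samples become the validation set:

```python
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

# Tiny stand-in data so the example runs quickly.
x = np.random.random((100, 8)).astype('float32')
y = np.random.randint(2, size=(100,))

model = keras.Sequential([
    layers.Dense(4, activation='relu', input_shape=(8,)),
    layers.Dense(1, activation='sigmoid'),
])
model.compile(optimizer='rmsprop', loss='binary_crossentropy')

# validation_split=0.2: the last 20 of the 100 samples (taken before any
# shuffling) are held out and evaluated at the end of each epoch.
history = model.fit(x, y, batch_size=16, epochs=1,
                    validation_split=0.2, verbose=0)
print(sorted(history.history.keys()))
```

Because validation data was held out, `history.history` now contains a `val_loss` entry alongside `loss`.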

Since TensorFlow 2.0, the recommended input for training and validation is a dataset from the tf.data module, which can load and preprocess data in a faster and more scalable way.

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

## Shuffle and slice the dataset.

train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

## Prepare the validation dataset

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

## Now we get a test dataset.

test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.batch(64)

## Since the dataset already takes care of batching, we don't pass a `batch_size` argument.

model.fit(train_dataset, epochs=3, validation_data=val_dataset)
result = model.evaluate(test_dataset)

A dataset element is generally a 2-tuple: the first item is the model's input features (for a multi-input model, a dict or tuple of features), and the second item is the true data label, i.e. the ground truth.

A dataset's built-in methods can be called to preprocess the data in advance, for example shuffle, batch, and repeat. The shuffle operation reduces the risk of overfitting; it shuffles only locally, with the help of a buffer: the buffer is first filled with data, then on each training step batch_size samples are drawn at random from the buffer and the vacancies are refilled with subsequent data, producing a locally shuffled order. The batch operation splits the data into batches and is commonly used to control and tune training speed and quality; since the dataset has already been batched, the batch_size argument to fit no longer needs to be provided. The repeat operation duplicates the data to compensate for an insufficient amount of it; if its count argument is given, the whole dataset is repeated count times, otherwise it is repeated indefinitely, in which case the steps_per_epoch argument must be set or training will never terminate.
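A small illustration of how these three methods compose (the toy integer data here is an assumption of the example): with a finite repeat count, the dataset size is simply multiplied, so the number of batches is predictable.

```python
import numpy as np
import tensorflow as tf

x = np.arange(10).astype('float32')
dataset = tf.data.Dataset.from_tensor_slices(x)

# shuffle fills a 4-element buffer and draws from it, so the order is only
# locally shuffled. repeat(2) yields the (reshuffled) data twice; an unbounded
# repeat() would require steps_per_epoch in fit, or training never terminates.
dataset = dataset.shuffle(buffer_size=4).repeat(2).batch(5)

for batch in dataset:
    print(batch.numpy())
```

With 10 samples repeated twice and batched by 5, the loop prints 4 batches of 5 elements each.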

Two other useful dataset methods are map and prefetch. The map method takes a function as its argument, applies it to every element of the dataset, and returns a new dataset; for example, after reading a text file with TextLineDataset, you would use map to extract some columns of the input as features and other columns as labels. The prefetch method prepares the data needed for the next training step ahead of time and keeps it in memory, reducing the idle wait between training steps.
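A sketch of both methods together (the comma-separated strings below stand in for lines that would come from a TextLineDataset; the column layout is an assumption of this example):

```python
import tensorflow as tf

# Stand-in for lines read from a text file: "feature1,feature2,label"
lines = tf.data.Dataset.from_tensor_slices(
    ['0.1,0.2,1', '0.3,0.4,0', '0.5,0.6,1'])

def parse_line(line):
    # Split the line and separate the feature columns from the label column.
    parts = tf.strings.split(line, ',')
    features = tf.strings.to_number(parts[:2])
    label = tf.strings.to_number(parts[2], out_type=tf.int32)
    return features, label

dataset = (lines
           .map(parse_line)              # transform every element
           .batch(2)
           .prefetch(tf.data.AUTOTUNE))  # prepare the next batch in advance

for features, labels in dataset:
    print(features.shape, labels.numpy())
```

Letting tf.data.AUTOTUNE pick the prefetch buffer size overlaps preprocessing with training without manual tuning.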

import numpy as np

## Here's the same example using `class_weight`

class_weight = {0: 1., 1: 1., 2: 1., 3: 1., 4: 1.,
                # Set weight "2" for class "5",
                # making this class 2x more important
                5: 2.,
                6: 1., 7: 1., 8: 1., 9: 1.}
print('Fit with class weight')
model.fit(x_train, y_train, class_weight=class_weight, batch_size=64, epochs=4)

## Here's the same example using `sample_weight` instead:

sample_weight = np.ones(shape=(len(y_train),))
# y_train has shape (N, 1), so take its single column to get a 1-D mask.
sample_weight[y_train[:, 0] == 5] = 2.
print('\nFit with sample weight')

model.fit(
    x_train,
    y_train,
    sample_weight=sample_weight,
    batch_size=64,
    epochs=4,
)

sample_weight = np.ones(shape=(len(y_train),))
# y_train has shape (N, 1), so take its single column to get a 1-D mask.
sample_weight[y_train[:, 0] == 5] = 2.

## Create a Dataset that includes sample weights (3rd element in the return tuple).

train_dataset = tf.data.Dataset.from_tensor_slices((
    x_train,
    y_train,
    sample_weight,
))

## Shuffle and slice the dataset.

train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

model.fit(train_dataset, epochs=3)

callbacks = [
    keras.callbacks.EarlyStopping(
        # Stop training when `val_loss` is no longer improving
        monitor='val_loss',
        # "no longer improving" being defined as "no better than 1e-2 less"
        min_delta=1e-2,
        # "no longer improving" being further defined as "for at least 2 epochs"
        patience=2,
        verbose=1,
    )
]

model.fit(
    x_train,
    y_train,
    epochs=20,
    batch_size=64,
    callbacks=callbacks,
    validation_split=0.2,
)

class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs):
        self.losses = []

    def on_batch_end(self, batch, logs):
        self.losses.append(logs.get('loss'))

from tensorflow import keras
from tensorflow.keras import layers

image_input = keras.Input(shape=(32, 32, 3), name='img_input')
timeseries_input = keras.Input(shape=(None, 10), name='ts_input')

x1 = layers.Conv2D(3, 3)(image_input)
x1 = layers.GlobalMaxPooling2D()(x1)

x2 = layers.Conv1D(3, 3)(timeseries_input)
x2 = layers.GlobalMaxPooling1D()(x2)

x = layers.concatenate([x1, x2])

score_output = layers.Dense(1, name='score_output')(x)
class_output = layers.Dense(5, name='class_output')(x)

model = keras.Model(
    inputs=[image_input, timeseries_input],
    outputs=[score_output, class_output],
)

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=[
        keras.losses.MeanSquaredError(),
        keras.losses.CategoricalCrossentropy(from_logits=True),
    ],
    loss_weights=[1, 1],
)

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=[
        keras.losses.MeanSquaredError(),
        keras.losses.CategoricalCrossentropy(from_logits=True),
    ],
    metrics=[
        [
            keras.metrics.MeanAbsolutePercentageError(),
            keras.metrics.MeanAbsoluteError(),
        ],
        [keras.metrics.CategoricalAccuracy()],
    ],
)

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss={
        'score_output': keras.losses.MeanSquaredError(),
        'class_output': keras.losses.CategoricalCrossentropy(from_logits=True),
    },
    metrics={
        'score_output': [
            keras.metrics.MeanAbsolutePercentageError(),
            keras.metrics.MeanAbsoluteError(),
        ],
        'class_output': [
            keras.metrics.CategoricalAccuracy(),
        ],
    },
)

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=[
        None,
        keras.losses.CategoricalCrossentropy(from_logits=True),
    ],
)

## Or dict loss version

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss={
        'class_output': keras.losses.CategoricalCrossentropy(from_logits=True),
    },
)

Example code using numpy data:

## Generate dummy Numpy data

img_data = np.random.random_sample(size=(100, 32, 32, 3))
ts_data = np.random.random_sample(size=(100, 20, 10))
score_targets = np.random.random_sample(size=(100, 1))
class_targets = np.random.random_sample(size=(100, 5))

## Fit on lists

model.fit(
    x=[img_data, ts_data],
    y=[score_targets, class_targets],
    batch_size=32,
    epochs=3,
)

## Alternatively, fit on dicts

model.fit(
    x={
        'img_input': img_data,
        'ts_input': ts_data,
    },
    y={
        'score_output': score_targets,
        'class_output': class_targets,
    },
    batch_size=32,
    epochs=3,
)

Example code using dataset data:

## Generate dummy dataset data from numpy

train_dataset = tf.data.Dataset.from_tensor_slices((
    (img_data, ts_data),
    (score_targets, class_targets),
))

## Alternatively generate with dict

train_dataset = tf.data.Dataset.from_tensor_slices((
    {
        'img_input': img_data,
        'ts_input': ts_data,
    },
    {
        'score_output': score_targets,
        'class_output': class_targets,
    },
))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

model.fit(train_dataset, epochs=3)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

## Get the model.

inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

## Instantiate an optimizer.

optimizer = keras.optimizers.SGD(learning_rate=1e-3)

## Instantiate a loss function.

loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

## Prepare the metrics.

train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

## Prepare the training dataset.

batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

## Prepare the validation dataset.

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

epochs = 3
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        # Open a GradientTape to record the operations run
        # during the forward pass, which enables autodifferentiation.
        with tf.GradientTape() as tape:
            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded.
            logits = model(x_batch_train,
                           training=True)  # Logits for this minibatch
            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)
        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)
        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        # Update training metric.
        train_acc_metric(y_batch_train, logits)
        # Log every 200 batches.
        if step % 200 == 0:
            print('Training loss (for one batch) at step %s: %s' %
                  (step, float(loss_value)))
            print('Seen so far: %s samples' % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print('Training acc over epoch: %s' % (float(train_acc),))
    # Reset training metrics at the end of each epoch.
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val)
        # Update val metrics.
        val_acc_metric(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print('Validation acc: %s' % (float(val_acc),))

logits = model(x_batch_train)
loss_value = loss_fn(y_batch_train, logits)
# Add extra losses created during this forward pass:
loss_value += sum(model.losses)