Press "Enter" to skip to content

使用Pytorch快速的入手训练模型,包含整个训练步骤,包括序列化和反序列化

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

写在最前面:
本次博客不涉及模型原理的解释,可以看作是一个纯工程性的一次实验。之前看了很多论文模型中的代码,我只是不求甚解,把大概的流程理解了就放下了。本次实验就是为了仔细的体会其中的细节。

 

大家都知道,pytorch已经将底层的代码封装的很好的,我们只需要写很少的代码就能跑一个模型。所以本次实验还有一个目的,让写的代码尽量能够复用。

 

1. SVHN数据集

 

在实验开始之前的第一步,就是选取数据集。我之前看到顶会论文中很多使用的是这个数据集,在这里我们也跟风一下,想要下载的小伙伴可以点击 这里 。这个数据集是一个关于数字彩色图像设别的数据集,可以理解为更加复杂的 Mnist 数据集。给大家展示一下它的复杂度。有些样本我都看不清楚,真不知道大佬些是怎幺干到90+的,可怕!

 

2. Dataset与DataLoader

 

这两个类是将数据集加载过程与预处理过程封装,让上层忽略底层实现细节。

 

Dataset:

 

import scipy.io as sio
from torch.utils.data import Dataset
from torch.utils.data.dataset import T_co
class SVHN(Dataset):
    def __init__(self, file_path) -> None:
        super().__init__()
        self.file_path = file_path
        data_mat = sio.loadmat(self.file_path)
        self.X = data_mat["X"]
        self.y = data_mat["y"]
    def __getitem__(self, index) -> T_co:
        return self.X[:, :, :, index], self.y[index]
    def __len__(self):
        return self.y.shape[0]

 

值得注意的是,我们需要重写父类 Dataset 的两个方法, __getitem____len____getitem__ 方法就是返回一个训练样本与标签, __len__ 方法是返回数据集的长度。

 

DataLoader:

 

dataLoader = DataLoader(dataset, batch_size=batchSize, shuffle=True)

 

有的同学看到这儿就会问了,Dataset不是已经有返回数据的接口了吗?为什幺还有包一层 DataLoader 呢?原因就是在网络训练的过程中,样本不是一个一个输入的,而是一个Batch一个Batch的输入。这里的Batch可以理解为是一个训练样本的集合(多个样本打包在一起)。DataLoader还有很多可选的参数,在这里就不详细介绍了,感兴趣的同学可以去查阅pytoch的 API文档

 

3. ResNet Model

 

在这里就不自己写模型结构了,pytorch有官方的实现,我们这里偷一下懒。

 

from torchvision import models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet18 = models.resnet18()
# 修改全连接层的输出
num_ftrs = resnet18.fc.in_features
# 十分类,将输出层修改成10
resnet18.fc = nn.Linear(num_ftrs, 10)
# 模型参数放大GPU上,加快训练速度
resnet18 = resnet18.to(device)

 

4. 训练

 

这部分其实才是本次主要的工作量。这其中充斥着大量的模板代码,几乎每个模型都会用上。这部分主要是计算损失,反向传播,优化器。其中优化器就优化反向传播的。比较无奈的是,这部分也已经有实现了,直接用就是了,非常的方便。

 

def train(model, dataLoader, optimizer, lossFunc, n_epoch):
    start_time = time.time()
    test_best_loss = float('inf')
    last_improve = 0  # 记录上次验证集loss下降的batch数
    flag = False  # 记录是否很久没有效果提升
    total_batch = 0  # 记录进行到多少batch
    writer = SummaryWriter(log_dir=log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(n_epoch):
        print('Epoch [{}/{}]'.format(epoch + 1, n_epoch))
        model.train()
        sum_loss = 0.0
        correct = 0.0
        total = 0.0
        for batch_idx, dataset in enumerate(dataLoader):
            length = len(dataLoader)
            optimizer.zero_grad()
            data, labelOrg = dataset
            data = data.to(device)
            label = F.one_hot(labelOrg.to(torch.long), 10).to(torch.float).to(device)
            predict = model(data)
            loss = lossFunc(predict, label)
            loss.backward()
            optimizer.step()
            # Tensor.item() 类型转换,返回一个数
            sum_loss += loss.item()
            # maxIdx, maxVal = torch.max
            _, predicted = torch.max(predict.data, dim=1)
            total += label.size(0)
            correct += predicted.cpu().eq(labelOrg.data).sum()
            # 注意这里是以一个batch为一个单位
            print("[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% "
                  % (epoch + 1, (batch_idx + 1 + epoch * length), sum_loss / (batch_idx + 1), 100. * correct / total))
            # 每一百个batch计算模型再测试集或者验证集的正确率
            if total_batch % 100 == 0:
                testDataLoss, testDataAcc = evalTestAcc(model)
                time_dif = get_time_dif(start_time)
                if testDataLoss < test_best_loss:
                    test_best_loss = testDataLoss
                    torch.save(model.state_dict(), save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                msg = 'Iter: {0:>6},  Train Loss: {1:>5.2},  Train Acc: {2:>6.2%},  Test Loss: {3:>5.2},  Test Acc: {4:>6.2%},  Time: {5} {6}'
                print(msg.format(total_batch, sum_loss / (batch_idx + 1), correct / total, testDataLoss, testDataAcc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", testDataLoss, total_batch)
                writer.add_scalar("acc/train", correct / total, total_batch)
                writer.add_scalar("acc/dev", testDataAcc, total_batch)
            # 提供训练程序的两个出口: n_epoch, require_improvement个batch没有提升
            total_batch += 1
            model.train()
            if total_batch - last_improve > require_improvement:
                # 验证集loss超过1000batch没下降,结束训练
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()

 

def evalTestAcc(net):
    net.eval()
    totalAcc = 0.0
    sumLoss = 0.0
    total = 0.0
    with torch.no_grad():
        for idx, dataset in enumerate(testDataLoader):
            data, labelOrg = dataset
            predict = net(data.to(device))
            _, predicted = torch.max(predict.data, dim=1)
            totalAcc += predicted.cpu().eq(labelOrg).sum()
            label = F.one_hot(labelOrg.to(torch.long), 10).to(torch.float).to(device)
            sumLoss += lossFunc(predict, label).item()
            total += label.size(0)
    return sumLoss / len(testDataLoader), totalAcc / total

 

看了一下,感觉没什幺讲的,几乎都是模板代码,放在任何一个模型中都可以使用。值得注意的是,在本次实验中没有区分测试集与验证集,可以理解为没有测试集,实验中的testDataset被用作是验证集,调整训练参数了。

 

5. 调用

 

if __name__ == '__main__':
    # filePath = r"E:\dataset\SVHN\train_32x32.mat"
    save_path = r"model_save/net.pt"
    log_path = r"logs"
    require_improvement = 1000
    batchSize = 256
    n_epoch = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    resnet18 = models.resnet18()
    # 修改全连接层的输出
    num_ftrs = resnet18.fc.in_features
    resnet18.fc = nn.Linear(num_ftrs, 10)
    resnet18 = resnet18.to(device)
    # SVHNTrainData = SVHN(filePath)
    train_dataset = torchvision.datasets.SVHN(
        root=r'E:\dataset\SVHN',
        split='train',
        download=False,
        transform=torchvision.transforms.ToTensor()
    )
    test_dataset = torchvision.datasets.SVHN(
        root=r'E:\dataset\SVHN',
        split='test',
        download=False,
        transform=torchvision.transforms.ToTensor()
    )
    dataLoader = DataLoader(train_dataset, batch_size=batchSize, shuffle=True)
    testDataLoader = DataLoader(test_dataset, batch_size=batchSize, shuffle=True)
    optimizer = optim.SGD(resnet18.parameters(), lr=0.01, momentum=0.9)
    lossFunc = nn.CrossEntropyLoss()
    train(resnet18, dataLoader, optimizer, lossFunc, n_epoch)

 

这里把所有的内容串起来了。在运行完成后,在当前目录会产生于一个logs文件夹, 大家可以运行 tensorboard --logdir 文件夹地址 ,就可以看到如下图所示,记录训练过程中,损失与准确率在测试集与验证集上的变化曲线。

 

6. 序列化与反序列化

 

序列化与反序列化,我们可以理解为保存于加载。我们的模型训练好之后,就可以直接进行预测任务,这时候就不会在反向传播更新模型参数了。

 

参考,这篇博客讲得太清楚了,几乎包括了所有的内容,我都不想在讲了。我这里就记录一下我的反序列化过程吧。

 

import random
import numpy as np
import torch
import torchvision
from matplotlib import pyplot as plt
from torch import nn
from torch.utils.data import DataLoader
from torchvision import models
if __name__ == '__main__':
    path = r"model_save/net.pt"
    batchSize = 256
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    resnet18 = models.resnet18()
    # 修改全连接层的输出
    num_ftrs = resnet18.fc.in_features
    resnet18.fc = nn.Linear(num_ftrs, 10)
    # resnet18 = resnet18.to(device)
    resnet18.load_state_dict(torch.load(path, map_location=torch.device("cpu")))
    resnet18.eval()
    test_dataset = torchvision.datasets.SVHN(
        root=r'E:\dataset\SVHN',
        split='test',
        download=False,
        transform=torchvision.transforms.ToTensor()
    )
    testDataLoader = DataLoader(test_dataset, batch_size=batchSize, shuffle=True)
    trains, labels = iter(testDataLoader).__next__()
    predicts = resnet18(trains)
    # 其实可以只用预测一个样本,而不是一个batch
    # resnet18(trains[0].unsqueeze(0))
    _, predictLabels = torch.max(predicts, dim=1)
    fig, axs = plt.subplots(1, 5, figsize=(10, 10))  # 建立子图
    print("predictLabels: {}".format(predictLabels))
    print("labels: {}".format(labels))
    print("Acc: {:.2f}".format(predictLabels.data.eq(labels).sum() / labels.shape[0]))
    for i in range(5):
        num = random.randint(0, batchSize)  # 首先选取随机数,随机选取五次
        npimg, nplabel = trains[num], labels[num]
        axs[i].imshow(np.transpose(npimg, (1, 2, 0)))
        axs[i].set_title("GroundTruth: {}, Predict: {}".format(nplabel, predictLabels[num]))  # 给每个子图加上标签
        axs[i].axis("off")  # 消除每个子图的坐标轴
    plt.show()

 

Be First to Comment

发表评论

您的电子邮箱地址不会被公开。