Press "Enter" to skip to content

搭建自己的以图搜图系统(二):深入优化搭建生产级别的图搜系统

概述

 

本文是“搭建自己的以图搜图系统”系列的第二篇,在 第一篇 内容中我们了解了如何利用“机器学习框架 Towhee ¹”和“向量数据库 Milvus ²”快速搭建一个以图搜图的服务原型。那幺,如何搭建一个生产级别的服务呢?在真实的业务场景中,我们常常碰到这些技术难题:

海量数据的情况下系统延迟高,硬件资源成为瓶颈。
系统在一种数据集下召回效果不错,但在另一种数据集下召回效果却很差,想要尝试多种模型或者自己训练,但上手成本都很高。
系统容错率低,当我们批量处理图片时,如果存在坏数据系统很容易崩溃。

接下来本文会从性能、模型和业务流程方面讨论如何解决这些痛点,从而优化我们的以图搜图系统,最后会介绍如何使用 FastAPI 实现简单高效的 Web 服务。

 

本文中的相关代码已上传到 GitHub,欢迎大家参考和使用: https://github.com/towhee-io/…

 

性能优化

 

要想提升性能,“堆机器”无疑是最便捷的方式,但在有限的资源下,我们如何充分发挥算力优势呢?一般情况下,我们会采取下面几种方案: 并行处理 ,充分发挥资源性能; 数据降维 ,降低计算复杂度; 向量索引 ,使用近邻搜索的算法加速向量检索。

 

下文中的代码基本上都来自于以图搜图系列的第一篇内容,如果你想了解更详细的内容,可以移步: 《搭建自己的以图搜图系统(一):10 行代码搞定以图搜图》

 

并行处理

 

基于 Towhee 的以图搜图 AI 流水线支持使用并行执行的方式,来提升性能,在下面的代码中,我们只需要简单地调用 set_parallel 方法,就可以并发处理数据。下面的例子演示了如何并行处理图片数据:

 

import towhee
dc = ( 
    towhee.read_csv('reverse_image_search.csv')
     .runas_op['id', 'id'](func=lambda x: int(x))
     .set_parallel(3)  #3并发处理数据
     .image_decode['path', 'img']()
     .image_embedding.timm['img', 'vec'](model_name='resnet50')
     .tensor_normalize['vec', 'vec']()
     .to_milvus['id', 'vec'](collection=collection, batch=100)
)

 

在上面的例子中,我们设置了 3 个并发来处理数据,我们将会得到 2~3 倍的性能提升。

 

数据降维

 

计算高维向量的硬件成本很高,举个例子, ResNet50 模型生成的 Embedding 向量维度是 2048,如果图像数据规模为一亿,那幺内存占用大约是 768 GB(4 Bytes 2048 100000000),我们也遇到了不少企业,他们的数据量级都在百亿级别,那幺就需要约 76800 GB(75 TB) 内存,那幺针对向量数据进行降维就很有必要了,因为数据的计算量降低,性能会有提升。降维的方法有很多种,如 PCA、SVD 和 UMAP 等,我们以最简单的随机投影为例,在图片入库之前将向量维度从 2048 维降低到 512 维:

 

import numpy as np
projection_matrix = np.random.normal(scale=1.0, size=(2048, 512))
def dim_reduce(vec):
    return np.dot(vec, projection_matrix)
dc = ( 
    towhee.read_csv('reverse_image_search.csv')
        .runas_op['id', 'id'](func=lambda x: int(x))
        .image_decode['path', 'img']()
        .image_embedding.timm['img', 'vec'](model_name='resnet50')
        .runas_op['vec','vec'](func=dim_reduce)  #向量数据降维
        .tensor_normalize['vec', 'vec']()
        .to_milvus['id', 'vec'](collection=collection, batch=100)
)

 

随机投影是欧几里得空间中向量的降维方法,这种方法速度快且无需训练,示例中 dim_reduce 函数实现了该方法,并通过 runas_op 算子将其应用到 ResNet50 模型提取的向量数据中。

向量索引
在图片检索阶段,我们可以使用面向 AI 的向量数据库 Milvus 对大规模的向量数据进行相似度检索,Milvus 支持多种 ANN 索引³用于加速,包括基于量化的索引, 基于图的索引和基于树的索引等。我们可以创建合适的索引用于近邻搜索,值得一提的是 IVF_SQ8 索引,它不光通过聚类支持快速查找,还可以压缩数据,减少 70-75% 的内存。
使用 GPU 加速
不得不说模型加速最好的方法就是指定 GPU(前提是要有),当我们使用 Towhee 算子中的 AI 模型进行特征向量提取,Towhee 框架将会自动根据 GPU 是否可用,来启用 GPU 进行数据处理,也就是说当 cuda.is_aviliable=True 就会使用 GPU 。

模型优化

 

随着 AI 技术的不断发展,CV(Computer Vision) 领域出现了越来越多的算法模型,从 VGG 到 ResNet 再到 Transformer,模型不断升级,那幺哪一个模型才是最适合我们的呢?实践是检验真理的唯一标准,最简单的方式是在自己的数据集下试用各种预训练好的模型选最优,除此之外,我们也可以自己训练模型。

 

模型选型

 

Towhee 的 image-embedding ⁴算子涵盖了市面上主流的各种模型,通过修改算子参数,可以轻松调用任何模型,而无需额外的折腾。此外 Towhee 还提供关于 Recall、HR 和 mAP 等指标的计算和报告,我们可以基于自己的数据集来对比不同模型的指标结果,从而帮助选择最优的模型。

 

例如我们指定三个模型 (VGG16, resnet50 和efficientnet-b2) 进行测试并对比,先将图像数据集入库,然后搜索测试图片并返回搜索结果的准确率报告:

 

model_dim = {  #模型与生成向量维度的字典
    'vgg16': 4096,
    'resnet50': 2048,
    'tf_efficientnet_b2': 1408
}
for model in model_dim:
    collection = create_milvus_collection(model, model_dim[model])
    dc = (towhee.read_csv('reverse_image_search.csv')
            .runas_op['id', 'id'](func=lambda x: int(x))
            .image_decode['path', 'img']()
            .image_embedding.timm['img', 'vec'](model_name=model)
            .tensor_normalize['vec', 'vec']()
            .to_milvus['id', 'vec'](collection=collection, batch=100)
    )  #图像数据入库
    (towhee.glob['path']('./test/*/*.JPEG')
         .image_decode['path', 'img']()
         .image_embedding.timm['img', 'vec'](model_name=model)
         .tensor_normalize['vec', 'vec']()
         .milvus_search['vec', 'result'](collection=collection, limit=10)
         .runas_op['path', 'ground_truth'](func=ground_truth)                #获取测试数据的 ground truth
         .runas_op['result', 'result'](func=lambda res: [x.id for x in res]) #获取搜索结果的 id
         .with_metrics(['mean_hit_ratio', 'mean_average_precision'])         #指定 HR 和 mAP 两个指标
         .evaluate['ground_truth', 'result'](model)                          #将结果 id 和 ground truth 比较
         .report()
     )  #检索图像并返回指标报告

 

当然,也可以用使用自己的模型而不是 Towhee 的内置算子,下面以使用 Transformer 模型为例,首先定义 vit_embedding 函数用于提取特征向量,然后通过 runas_op 应用此函数,最后和上面的代码类似,用于计算指定的指标。

 

feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-large-patch32-384')
model = ViTModel.from_pretrained('google/vit-large-patch32-384')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
def vit_embedding(img):
    img = to_image_color(img, 'RGB')
    inputs = feature_extractor(img, return_tensors="pt")
    outputs = model(inputs['pixel_values'].to(device))
    return outputs.pooler_output.detach().cpu().numpy().flatten()
collection = create_milvus_collection('huggingface_vit', 1024)
dc = (towhee.read_csv('reverse_image_search.csv')
        .runas_op['id', 'id'](func=lambda x: int(x))
        .image_decode['path', 'img']()
        .image_embedding.timm['img', 'vec'](model_name=model)
        .tensor_normalize['vec', 'vec']()
        .to_milvus['id', 'vec'](collection=collection, batch=100)
)
(towhee.glob['path']('./test/*/*.JPEG')
     .image_decode['path', 'img']()
     .image_embedding.timm['img', 'vec'](model_name=model)
     .tensor_normalize['vec', 'vec']()
     .milvus_search['vec', 'result'](collection=collection, limit=10)
     .runas_op['path', 'ground_truth'](func=ground_truth)
     .runas_op['result', 'result'](func=lambda res: [x.id for x in res])
     .with_metrics(['mean_hit_ratio', 'mean_average_precision'])
     .evaluate['ground_truth', 'result'](model)
     .report()
 )

 

以上四个模型的准确率情况如下图所示,可见在本文数据集中 ViT-large 模型的准确度更高。

 

 

模型训练

 

除了使用这些预训练好的模型,我们还可以基于自己的数据集进行模型训练,Towhee 也提供了模型训练接口,你可以尝试训练任意的模型算子。

 

流程优化

 

在稳定性方面,我们希望系统既健壮又可靠,不至于碰到异常就崩溃;在业务方面,我们希望可以定制化流水线,不同的流水线用于处理不同的业务。

 

异常处理

 

当以图搜图系统处理大规模数据时,如果其中存在损坏的图像或格式错误的图像,这很可能导致程序中断,但我们又很难清理掉所有出现的坏数据,该怎幺办呢? Towhee 提供 exception_safe 接口能够确保出现异常时继续执行。

 

(towhee.glob['path']('./exception/*.JPEG')
    .exception_safe() #异常安全
    .image_decode['path', 'img']()
    .image_embedding.timm['img', 'vec'](model_name='resnet50')
    .tensor_normalize['vec', 'vec']()
    .milvus_search['vec', 'result'](collection=resnet_collection, limit=5)
    .runas_op['result', 'result_img'](func=read_images)
    .drop_empty()    #清除坏数据
    .select['img', 'result_img']()
    .show()
)

 

上面的例子中,我们使用了四张图片作为输入数据,其中包含一张损坏的图片,由于我们在流水线中加入了 exception_safe ,所以程序不会发生中断,仅仅是打印了错误信息。

 

 

增加目标检测

 

根据不同的业务场景我们可以定制不同的流水线,比如在商品推荐场景中,我们更关注图像包含的商品,那幺可以在流水线中加上 Towhee 的目标检测⁵算子,用于检测图像中的商品。代码如下所示,具体原理是先利用 get_object 返回图像所有目标中面积最大的物体(没有目标将返回图像本身),然后针对找到的目标物体进行特征提取。

 

def get_object(img, boxes):
    if len(boxes) == 0:
        return img
    max_area = 0
    for box in boxes:
        x1, y1, x2, y2 = box
        area = (x2-x1)*(y2-y1)
        if area > max_area:
            max_area = area
            max_img = img[y1:y2,x1:x2,:]
    return max_img
(towhee.glob['path']('./object/*.jpg')
        .image_decode['path', 'img']()
        .object_detection.yolov5['img', ('boxes', 'class', 'score')]() #目标检测算子
        .runas_op[('img', 'boxes'), 'object'](func=get_object)
        .image_embedding.timm['object', 'object_vec'](model_name='resnet50')
        .tensor_normalize['object_vec', 'object_vec']()
        .milvus_search['object_vec', 'object_result'](collection=yolo_collection, limit=3)
        .runas_op['object_result', 'object_result_img'](func=read_images)
        .select['img', 'result_img', 'object_result_img']()
        .show()
)

 

在进行有无目标检测的结果对比之后,我们可以发现一个有意思的事情:在某些情况下,目标检测后的结果更优。以下图中每一行的结果为例,从左至右是我们输入的检索图片,和三张图搜的结果、进行目标检测后搜索的结果。第一行在加上目标检测后才能找到车相关的图片,否则都是蜘蛛,我们可以推测 ResNet 把待检索图片中的树枝理解成了蜘蛛,但是在 Towhee 流水线中应用上目标检测就可以解决这个问题。

 

 

FastAPI 部署

 

上一篇文章中我们介绍了利用 Gradio 部署图像搜索的服务,这次将展示如何利用 FastAPI ⁶来提供 Web 服务,FastAPI 是目前主流的基于 Python 的 Web 框架之一。我们先创建一个 FastAPI 实例 app ,然后将这个实例 app 绑定到 Towhee 的流水线中, app_search 提供了基于 FastAPI 的图像检索服务,最后我们设置好服务端口和地址,就能够得到一个在线服务了。

 

from fastapi import FastAPI
import uvicorn
import nest_asyncio
app = FastAPI()
with towhee.api['file']() as api:
    app_search = (
        api.image_load['file', 'img']()
        .image_embedding.timm['img', 'vec'](model_name='resnet50')
        .tensor_normalize['vec', 'vec']()
        .milvus_search['vec', 'result'](collection=milvus_collection)
        .runas_op['result', 'res_file'](func=lambda res: str([id_img[x.id] for x in res]))
        .select['res_file']()
        .serve('/search', app) #绑定流水线到 app,接口为 /search
    )
nest_asyncio.apply()
uvicorn.run(app=app, host='0.0.0.0', port=8000)

 

类似的,参考 towhee-io/examples 我们可以完成 /load/count 两个接口的创建,在所有工作就绪之后,我们就能够在浏览器中打开 http://0.0.0.0:8000/docs 来体验拥有更高性能的以图搜图服务应用了。

 

总结

 

相信在跟随本文耐心实践之后,你一定可以得到一个性能颇高、生产可用的“以图搜图系统”。当然,如果你愿意的话,也可以结合本文的例子,对其他的非结构化数据和项目(音视频)进行分析优化,原理是相通的。欢迎留言讨论,或者给我们的项目提出改进建议( https://github.com/towhee-io/… )。

 

下一篇文章中,我们将分析如何“打包 AI 流水线”,通过使用 Docker 来完成系统的快速搭建和完整数据迁移。

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注