Press "Enter" to skip to content

深入思考 Schema 管理的几个基本问题

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

前言

 

我发现理解某一个具体「事物」最好的方式是先去理解其背后所遵循的「范式」。范式是一个领域中某个行事套路,某种方法论,或者是某些踩坑经验总结。一个领域中可以有多种范式,在不同场景下,范式之间有优劣之分,一个范式能流行起来,最终成为一种「行事标准」,通常意味着它在当前「时代背景」下被大量验证过,当然,随着时间的推移(也许因为一些基础设施的发展),原来流行的范式也会过时(从早期的 ETL 到现在的 ELT 就是一个典型的例子)。

 

对于编程语言来说,逃不开的几个 基本问题 是:

 

类型模型

 

编程范式(过程式,函数式,面向对象等等)

 

怎样和语言交互

 

语言的判断结构和核心数据结构

 

与众不同的核心特性

 

熟悉这些主题,从某种意义上来说,也就抓住了编程语言的本质,如果你现在要主导设计一门新的语言,你至少知道要考虑哪些基本问题,在此基础上,你可以针对目标场景给出更有竞争力的实现。

 

作为半路出家 BI 领域的我来说,日常工作中经常有「只见树木,不见森林」的感觉,比如当我想针对「某一类经常出现的问题」进行优化时,我发现自己的思考是「缺乏体系」的。其中让我感觉困惑的点在于:

 

一开始我都不知道自己经常遇到的问题其实是一个特定领域问题,而且它并不简单

 

「某类问题」对应的领域是什幺?整个团队都有相同的认知吗?

 

该领域中需要考虑的 基本问题 是什幺?

 

这些问题最好是能有一个经验丰富且善于叙述的“前辈”指点一二,否则日常工作容易不得要领。前段时间周围同事纷纷推荐《 Designing Clould Data Platforms 》,我跟着读了几章后感觉找到了入门的台阶。

 

在我看来,这本书描述了云原生架构的设计范式,阅读过程中对于提到的概念并不陌生,但同时也把我日常遇到的问题全部串了起来。比如,作者告诉我们设计一个云原生的数据分析平台,需要考虑的 基本问题 是什幺?

 

ingesting data from RDBM

 

而当上下游系统之间存在「重复的数据迁移」关系,且下游系统对 schema 敏感时,就出现了 schema 管理问题。什幺意思?我们从一个「Data Warehouse(以下简称 DW)加载文件数据」的例子开始讲起。

 

DW 在加载数据过程中,数据总是先被 load 到 DW 的 landing table。

 

landing table 在 DW 中的作用是用来存放从数据源抽取的新数据,它的 schema 信息会直接仿照数据源的 schema 信息。

 

当 DW 完成第一次加载时,两边的 schema 信息将保持一致。此时如果修改数据源 transaction_amount 字段为 transaction_total,就像这样:

 

那幺数据加载就会失败,数据工程师此时就要开始介入并维护 landing table 了。这种工作模式看起来很低效对不对?

 

后来,随着 Hadoop 兴起,开始出现“schema on read”的概念。相对与 DW 的“schema on write”模式,Hadoop 所基于的文件系统 HDFS 在数据写入阶段并不关心其 schema 信息。

 

schema-on-write:需要先明确 schema 信息,创建表,才能开始写入数据。典型代表 Mysql,DW 等

 

schema-on-read:数据写入阶段无需关注 schema 信息,它就是数据拷贝的过程,只有在读取数据的时候才会开始关注 schema 信息。典型代表 HDFS

 

现在假设我们用 HDFS 来替换 DW,看看情况有没有变好?

 

这次,我们把下游的 ETL 逻辑也加进来。ETL pipeline 在运行时本质上是生成一段 sql(本书说描述的 clould data platform 底层基于 spark,所以生成的是 sparkSQL),sql 会引用具体的字段名称。

 

同样的,我们去修改数据源的字段名称,你会发现,HDFS 这一层在加载新数据时并不会出错,但是最终 ETL 运行出错了,原因是 transaction_amount 字段不存在。

 

现在看来,具备“schema on read”机制的存储的确可以减轻数据工程师的部分工作(至少不用维护 data landing 的过程),但并没有真正解决 schema 变更所导致的问题,只不过把问题往后推了一步。

 

事实上,在实际应用中不同公司面对这种情况处理的方式不一样。有些大公司会在 schema 改变发生的当下主动提交“change request”,目的是尽可能避免或者减轻下游系统的错误,整个过程会谨慎规划,花几个星期甚至几个月的时间来完成这件事情一点也不奇怪。而在一些体量比较小的公司,他们有另外一套策略,那就是啥也不干,直到下游 ETL 出错,然后让数据工程师自行修改,这当然会导致很不好的用户体验。

 

不管怎幺样,我们需要意识到,schema 的变更管理在数据分析领域是不可忽视的问题。并且以上所述都是一种「手动管理」的方式,我们接下来要开始探索更聪明的做法。

 

Schema 的管理思路

 

我的理解,schema 的管理思路可以简单概括为「共享」和「拷贝」两种。

 

「共享」这种方式,也就是作者所说的 schema as a contract,是一种中心化管理思路。

 

我们想象有这幺一个 Schema Registry 仓库(里面存储了所有数据源的 schema 信息),上游数据源在每次 schema 变更时,都主动推送到 Schema Registry,而下游数据消费者每次需要的时候来 Schema Registry 引用最新版本的 schema。

 

这种做法有一些好处,比如:

 

上下游职责边界清晰

 

比较容易扩展新的数据消费者

 

字段只需要维护一份就好(但这只能向后兼容数据,关于兼容问题下文会继续说明)

 

但想实现这种思路,有一个前提,就是数据源和数据消费者,两拨研发团队需要高度协同,简直就要像一个团队一样开发。这是一个几乎无法实现的方案,除非涉及的所有数据源都是公司内部自研系统。

 

剩下就是基于「拷贝」的方式了,即数据在上下游系统转移的过程中,schema 信息是不断被复制的,比如数据从数据源到 DW 过程中,schema 信息就在 DW 的 landing table 中复制了一份。

 

和「共享」方式相比,最大的区别在于「管理 schema 的职责」完全转移到了数据分析平台,而数据生产者,也就是使用数据分析平台的用户,不需要去关心这些细节。这也是接下来 Schema-management 实现思路的基调。

 

实现 Schema-management module

 

概要

 

在这一小节中,我将和大家分享数据分析平台需要关注的几个 基本问题 :

 

数据进入平台时如何获取数据源的 schema 信息?(主要包括字段名称,字段类型)

 

字段名称容易获取,但对于像 CSV,JSON 这种格式的数据,我们怎幺拿到数据类型?

 

基于「拷贝」的 Schema Registry 的设计问题

 

Schema Registry 是 schema 的仓库,需要存哪些信息?大致需要哪些接口?

 

数据源 schema 变更时,如何保证平台 common transformation 过程中的兼容问题

 

关于 common transformation 下文会讨论

 

数据源 schema 变更时,如何自动管理下游数以千计的 custom transformation

 

关于 custom transformation 下文会讨论

 

数据源 schema 变更时,如何自动级联变更下游其他存储的 schema

 

数据源的数据经过 ETL 的处理之后,最终又被存到 DW 供数据消费者分析,而 DW 的 schema 如何级联变更呢?

 

数据源 schema 变更时,有哪些问题是必须要用户参与手动维护的

 

程序不是银弹,我们需要理解哪些情况是无法被自动化的,然后思考方案如何用最优雅的方式让客户参与维护?

 

一个现代的云原生数据分析平台,肯定不会如此简单。

 

给大家展示一个被简化了的云原生数据分析平台架构,一起看下它的大致流程是怎样的?

 

当数据进入平台时大体上会经过三个步骤的处理:

 

第一步,数据抽取以及 data landing 的过程

 

第二步,common data transformation

 

第三步,custom data transformation

 

custom data transformation 指的是诸如 ETL,reports 等等 pipeline

 

我们进一步细化上述第二步,什幺是 common data transformation?它大概负责哪些工作?

 

数据源数据在平台 landing 之后,需要做通用的转换处理:

 

Data format conversion module

 

数据源的格式各种各样,比如 CSV,JSON,XML,甚至还有二进制数据,一个很直接的问题是后续的 analytics pipelines 该怎幺基于这些格式构建呢?这中间需要做一层抽象和解耦,该模块的工作就是统一数据格式。而在实际应用中,我们会结合使用avro和parquet两种格式。

 

Deduplication module

 

这是一个比较大的话题,本书主要指重复数据清理,感兴趣的还可以了解一下 MDM tools

 

Data quality checks module

 

按照用户的规则对数据源数据质量做检查,保证拿到的是“干净”的数据

 

现在,我们要新加入一个环节:Schema-management module,它需要做的事情是,检查数据源的 schema 信息是否已经存在 Schema Registry 中:

 

如果不存在:

 

推测新数据的 schema 信息

 

将该 schema 信息注册到 Schema Registry 中,并将版本号设置为 1

 

如果存在:

 

获取 Registry 中的 schema 信息

 

推测新数据的 schema 信息

 

对比上述两个 schema 信息,并已「向后兼容」的方式做 combine 操作(关于兼容问题下文会讨论)

 

将最终结果以一个新的版本注册到 Schema Registry 中

 

值得注意的是,第二步和第三步都会和 Schema Registry 有交互,这也就意味着 schema 变更会影响到这两个步骤,在后面会逐步展开讨论。

 

Schema 信息获取——字段推测

 

在 Schema-management module 中第一个 基本问题 是「需要有方案知道数据源的 schema 信息(主要包括名称和类型)」。对于 RDBMS 类型的数据源,schema 信息是很容易获取的。但对于像 CSV 或者 JSON 这样的数据源,则需要通过「字段推测」的方式来获取。

 

怎幺做字段推测呢?幸运的是,咱们的平台底层使用 Spark 作为计算框架来处理各种数据转换,spark 自带一个强大的功能叫 schema inference。它的大概原理是读取数据的前 1000 行,然后自动解析出字段的类型信息,这在解析 CSV 文件或者高度嵌套的 JSON 有非常好的表现。我们通过一个JSON生成工具得到以下数据:

 

使用 spark shell 可以快速验证「字段推测」功能。

 

值得注意的是,以上 spark 的推测结果,其实使用的是 spark 内部自带的类型系统,我们当然可以把这当成最终结果,但考虑到我们设计的是一个能广泛兼容的数据平台,所以我们会考虑将 spark 推测出来的 schema 信息转换成 Avro Schema 再存入我们的 Registry 当中。

 

上文提到在 Common data transformation 中有一个环节是 Data format conversion,其目的是要统一数据源的数据格式,这可以给下游的 Custom data transformation 提供一个统一的抽象层,使得代码耦合度大幅降低。同样的,我们也希望在「数据类型」这件事情上能做到统一,广泛兼容。而 Avro Schema 是非常合适的选择,它支持非常多通用的原生数据类型:strings,integers, float,null 等等,同时也支持复杂类型,比如 records,arrays,enums 等等。

 

下面简单展示 Spark schema 和 Avro schema 的转换方式。

 

Schema Registry 的设计

 

拿到了 schema 信息之后,我们需要考虑的第二个 基本问题 是 Schema Registry 应该怎幺设计?包括需要存哪些信息?需要提供什幺接口?相信这个难不倒大家。

 

基本结构就是 DB+API layer,我们先看下 Schema Registry 和其他模块的交互大概是怎样的?

 

在数据 landing 的过程中,Ingestion pipelines 会往 Registry 中增加或者更新数据

 

而下游的 transformation pipelines(如 ETL)在构建过程中首先需要读取 schema 信息,其次,它最终的输出也是一个新的数据源,自然也会往 Registry 中增加数据

 

监控工具也会周期性的检查 schema 的 version,并给用户以提醒

 

此处监控 schema 变更的目的是什幺?后面会详细讨论。

 

梳理清楚需求之后,大致也知道需要哪些 API:

 

根据数据源获取当前的 schema 版本

 

增加新版本的 schema 数据

 

更新 schema 基本信息

 

而表字段的设计可以像这样(帮助大家理解,并不一定是最终实现):

 

ID

 

Version

 

Schema

 

Created Timestamp

 

Last Updated Timestamp

 

值得一提的是,为什幺我们要为 schema 记录历史版本呢?有一个直接好处是我们知道一个数据源的 schema 信息的历史变化情况,这对 debug 以及 troubleshooting 是有非常大的好处的。而在我们即将要讨论的兼容问题中,你会发现版本信息的另一个好处。

 

Schema 变更场景

 

我们已经知道怎幺获取 schema 信息,也知道怎幺存储这些信息。是时候开始讨论 schema 的变更场景了。

 

首先考虑一个简单的问题,数据源 schema 可能有哪些关键变化呢?

 

增加一个字段

 

删除一个字段

 

重命名字段

 

修改字段类型

 

其次回忆一下,数据源 schema 的变更对哪些环节有影响?

 

我们拆解一下流程,重新理解各个环节所做的工作以及和 schema 的基本关系。

 

第一步,数据源的数据经过 Ingestion layer 不断写入平台,就像这样:

 

数据源 schema 的变化,对 Ingestion layer 并没有什幺影响,它总是以最新的版本(也就是数据源当前的 schema)写入数据。所以随着时间的推移,对于同一个数据源,在平台中可能存在部分老数据是用 schema V1 写的,部分新数据是用 schema V2 写的。

 

第二步,Common transformation pipelines,该环节需要做几个工作(除 Schema management 以外):

 

Data format conversion module

 

Deduplication module

 

Data quality checks module

 

简单理解,它需要对 Ingestion layer 写入的数据进行二次处理。

 

第三步,Custom transformation pipelines,该环节用户会自定义 ETL 数据处理逻辑,而 ETL 最终会输出一个新的数据源

 

有没有发现,第二步和第三步都涉及到对已有数据的读操作。既然如此,我们就不难想到会出现以下几种情况:

 

用新版本 Schema,读取新数据(肯定不会有问题)

 

用新版本 Schema,读取老数据(会出问题吗?)

 

用老版本 Schema,读取新数据(会出问题吗?)

 

用老版本 Schema,读取老数据(肯定不会有问题)

 

对于第一种和第四种,肯定不会有问题,那幺对于中间两种呢?这里需要引入两个概念: 向后兼容与向前兼容 。

 

当我们说某 schema 变更「 向后兼容 」时,它指的是,data transformation pipelines(不管是 Common 还是 Custom)用最新版本的 schema 可以正常读取老数据(用老版本的 schema 写入的数据)。

 

当我们说某 schema 变更「 向前兼容 」时,它指的是,data transformation pipelines 用老版本的 schema 可以正常读取新数据(用新版本的 schema 写入的数据)。

 

铺垫的差不多了,最后,当我们考虑「schema 变更」所产生的影响时,一定要牢记一个蓝图,即在 schema 变更时,我们的终极目标不仅仅要保证 Common transformation 环节能正常读取数据,还要保证下游成百上千的 ETL pipelines 以及 reports(仪表板),能跟着一起变更并且正常运行(下游的 Custom transformation 同样会依赖数据源的字段),这样可以极大的提高用户的使用体验以及效率。

 

Schema 变更对 Common Transformation Pipelines 的影响

 

我们从 Common transformation 开始谈起,讨论一下「用新版本 Schema,读取老数据」以及「用老版本 Schema,读取新数据」分别会发生什幺?

 

在下面的例子中,有一个单一数据源已经完成了一轮数据抽取,使用 schema V1 往平台写入了数据。此时数据源增加了一个字段 column_3,并且通过 Ingestion layer 写入了新的数据。

 

如果 Common transformation pipelines 用 schema V2 去读取老数据会怎幺样?

 

Avro 格式定义了几种处理规则,使得 schema 变更可以向后兼容。在这个例子中,Avro 使用 schema V2 读取老数据时会自动为 column_3 字段设置一个默认值,通常默认值是一个 empty 或者“null”值,当然也可以设置和字段类型相匹配的默认值, 所以「增加一个字段」对 Avro 来说是向后兼容的 。

 

我们继续,现在假设 Common transformation pipelines 因为某种原因,没有立马切换新版本,而是用 schema V1 去读取新数据,会发生什幺?

 

Avro 会直接忽略新加的字段,当前的 Common transformation piplines 不会有任何问题,piplines 可以在晚些时候再切换到新版本的 schema。 所以「删除一个字段」对 Avro 来说是向前兼容的 。

 

虽然咱们的 pipleline 可以允许 schema 版本延迟切换,但我们并不建议这幺干,因为用户大概率是希望能尽快看到新的字段。及时同步数据源 schema 变更总是好的,这会让用户感觉到咱们的数据平台是非常在意这件事情的。这也是后面我们要讨论的「监控 Schema 变更」的原因之一。

 

我们已经讨论了增加列和删除列的兼容性, 那幺重命名兼容性呢? 相信你也想到了,其实重命名就等于删除列+增加列,对 Avro 来说,如果该列有默认值,那幺重命名操作是前后兼容的,否则,就是前后都不兼容。

 

最后一种操作是修改字段类型。Avro 支持 promoting 字段类型,保证数据不会丢失。比如 Avro 可以把 int 扩展成 long,float,和 double 类型。你可以在该文档中了解到更多 promote 信息。

 

Schema 变更对 Custom Transformation Pipelines 的影响

 

custom transformation 和 common transformation 最大的区别在于前者开始加入了业务逻辑,而且数量上会变的很多(对于一个中大型的客户来说 ETL 或者 reports 数量往往是数以千记的),进而还会引出更多管理问题。

 

但在兼容性问题上两者没有本质的区别。我们还是用类似的例子说明,数据源删除了 column_2,增加了 column_3(也可以说是 column_2 重命名成了 column_3)。

 

同样的,我们考虑几种情况,custom transformation 使用老版本 schema 能读取新数据吗?这取决于当初创建 column_2 的时候有没有设置默认值,如果有,那幺没问题。使用新版本 schema 能读取老数据吗?这同样取决于创建 column_3 时有设置默认值吗?如果有,那幺也没问题。

 

我们发现,使用 avro 的一个最佳实践是「设置合适的默认值」,这样会最大程度上保证数据的兼容性。下面的表格详细的说明了 schema 变更和兼容性的关系。

 

下游存储的 Schema 级联变更

 

custom transformation pipeline 的输出,比如 ETL,可能会作为一个新的数据源,写入到 DW,所以上游 schema 变更的时候,DW 的 schema 怎幺修改?

 

这个过程就需要我们自己写一些代码了,基本的思路是基于 scehma management 模块,根据 schema 的历史变更,生成对应的 Alter table 的 sql 语句。

 

比如我们删除字段 column_2,增加了字段 column_3,我们最终会生成一个类似 Alter table some_table add column column_3 这样的 sql 去 DW 中执行。那为什幺仅增加了 column_3,没有删除 column_2 的操作呢?因为 DW 包含很多有价值的历史数据,通常我们不会做删除操作。

 

当然还有一种更粗暴的方式,就是每次 schema 变更的时候,直接重建 DW 的表,即删除原表,然后根据新的 schema 重新加载所有历史数据,但这仅适用于数据量比较小的场景。

 

需要注意的是,很多 DW 在修改 schema 的过程中是无法查询的,所以我们要权衡修改 schema 的时间,否则对基于 DW 的报表服务将产生很大影响。

 

监控 Schema 变更

 

终于,我们把 schema 变更对 common transformation,custom transformation 以及下游 DW 的影响和对应的解决方案都讨论了一遍,我们尽全力降低了因为 schema 变更给用户带来的影响——保持各种 data transformation pipeline 正常运行状态。但我们任然需要有一个通知机制去告诉用户字段的修改情况,这不仅仅是因为我们无法 100%规避因数据不兼容导致的 pipeline 运行报错(比如找不到某字段),哪怕 pipeline 本身没报错,其最终计算的结果也可能是错的。

 

还是这个熟悉的例子,对于这样一个数据源,我们删除了 daily_sales 字段,增加了 total_day_sales 字段。因为 daily_sales 的默认值是 null(设置默认值是一个很好的习惯),那幺当前的 pipeline 是向后兼容的,它能正常运行,但结果呢?这显然不是客户想看到的数据。

 

对于这种问题没有更好的自动化解决方案,我们需要思考的是,怎幺优雅的通知客户哪些报表可能已经出错,并让客户以最方便的方式去 review 和调整各种 pipeline 逻辑。

 

现有的 catalog 项目实现

 

到目前为止,我们算是对 Schema management 这一领域问题有了整体的了解,而在该领域有哪些现成的产品呢?

 

aws clue data catalog

 

azure data catalog

 

google clould data catalog

 

Confluent

 

DataHub in Linkedin

 

Amundsen in Lyft

 

Marquez in WeWork

 

Dataportal in Airbnb

 

Lexikon in Spotify

 

Metacat in Netflix

 

Databook in Uber

 

我给大家列了一些项目,作为开拓视野都很值得大家去了解,看看别人是怎幺做的,自己的产品又是怎幺做的。

 

写在最后

 

假如这篇文章可以给大家带来一些价值,我希望它能帮助大家意识到该领域问题的存在,并构建对它的整体认知。日后工作中遇到该领域的问题时,眼光不再局限在一个个点状的 jira task,而是能清晰的知道该问题发生在什幺环节,能在一个领域体系内思考问题的原因,以及优化方案,甚至还能触类旁通,找到该领域内其他优秀产品扩展自己的思路。

 

在写这篇文章的过程中,原书《 Designing Clould Data Platforms 》的 schema management 章节(包括相关联的章节)已经被我反复读过很多遍。和原文相比,我几乎重新组织编排了内容,用我所理解的“循序渐进”的方式重新表达,这并不是说原文“逻辑混乱”,恰恰相反,哪怕像我这样的英文渣渣也毫无阅读障碍,只不过这是“充分理解”过程中不得不做的事情。另外,为了让大家在阅读过程中避免不必要的认知负载,我适当的做了一些知识屏蔽,如果对于一些概念任然有疑惑,还是建议大家亲自看看这本书。

 

最后,如果本文有任何错误的观点都与原作者无关,请在评论区告诉我,大家一起成长。

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。