Press "Enter" to skip to content

ETL 之初识 Datax

一、 背景

 

在没有接触Datax之前,公司绝大部分的数据都是使用kettle来进行ETL。 Kettle是一款采用纯JAVA实现的开源ETL工具,提供了一系列的组件来用于完成抽取、转换、加载的工作。kettle提供了2种脚本,job和t ransformation,job用来对整个任务进行控制, t ransformation用来执行一些数据的转换

 

这里简单罗列一下kettle的优点:

 

1.可视化界面。一个可视化界面足以成为选择kettle的首要原因。毕竟,可视化意味着更方便人的使用。

 

2.元数据库。元数据库用来保存kettle任务的元信息,方便管理任务,通常叫做资源库( repository )。

 

3.自带工作流并且支持增量抽取。

 

4.可以配置成一套逻辑。例如:抽取数据时,目标表不存在则插入,存在则更新,而目标表中存在并且数据源中不存在的,可以删除,just like this

 

 

当然,工作流可以配的更加复杂,比如kettle的job中可以嵌套job,job中嵌套t ransformation, t ransformation中嵌套job等,可以满足非常复杂的任务场景。

 

二、 痛点

 

然而,事情并不像我们想象的那幺美好,在kettle的实际的使用过程中,我们渐渐发现了一些kettle的槽点:

 

1.全量抽取较大数据量时,抽取时间长。

 

2.往hdfs导数据出现漏导的情况,造成数据不一致。

 

3.无法感知namenode的切换,当Hadoop集群重启时,一旦namenode发生切换,就可能造成kettle任务的失败,因为kettle的hdfs地址是在配置文件中配置的。

 

4.kettle往Greenplum中导数据,速度非常慢(40条每秒)。

 

当然,kettle提供了另一种方式:gploader方式来向GP中导数据,应该可以解决往GP导数据慢的问题(不过我们并没有尝试过这种方式,因为已经遇到了datax了)

 

自从我们开始搭建以Greenplum为核心的数据仓库之后,寻找一种高效的ETL工具来代替kettle,已经成为了一种共识

 

三、 Datax简介

 

首先提供Datax的官方git地址:https://github.com/alibaba/DataX

 

Datax是阿里开源的一款离线数据同步工具。起初服务于taobao平台。由于淘宝内部的数据源多种多样,在进行数据的导入导出过程中,就需要编写针对不同数据源和目标源之间的脚本。

 

写得多了,发现不同的脚本之间有相同之处,且脚本多了维护成本也增加了。于是便将不同源之间的数据同步做了抽象。以Datax本身作为框架,将数据的导入抽象成为reader插件,

 

将数据的导出抽象称为writer插件。

 

官方提供的datax框架图:

 

 

最终把不同数据源和目标源组成的网状结构,变成了星型结构:

 

 

四、部署和使用

 

部署:

 

环境要求:linux(windows也可以)、jdk1.8级以上、python2.x

 

部署时有如下两种方法:

 

1.使用官方编译好的工具包(datax.tar.gz),解压即用

 

2. 下载Datax源码,使用Maven进行编译。编译时间会有点长。

 

使用:

 

Datax使用一个json文件来描述一个job,通过配置一个json格式的文件,使用python命令就可以启动Datax执行任务。当然,要配置json,一开始最好按照官方的git中的配置文档来配

 

我们可以通过以下命令来获取一个json模版:

 

python  datax.py  -r {YOUR_READER}  -w {YOUR_WRITER}

 

举个例子:

 

可以得到以下的模版:

 

模版较长,这里截取了源码中的json结构来展示

 

 

当我们参考官方文档,配置了相关参数项之后,就可以运行job了,这里以官方提供的模版job来做示例:

 

 

这里主要是分享一个我们曾经踩过的坑:

 

当时的情况是我想测试一下从mysql抽取数据到sql server,于是我在自己本机上安装了sql server 2005。

 

这个job本身配置并不复杂,reader插件和writer插件基本就是配置连接数据库使用的 jdbcUrl、username、password、table、column。

 

然而,数据抽取过程中遇到了问题,导致部分数据插入失败,错误日志的大概意思是sql server 的 数据库日志文件满了 ,提示: 删除不需要的文件、删除文件组中的对象、将其他文件添加到文件组或为文件组中的现有文件启用自动增长,以便增加可用磁盘空间。

 

尴尬的是:当整个job跑完之后,居然显示任务成功了

 

 

最终,我将sql server 换成2008版本的就没有出现这个问题了,但已经暴露出了我们的job的配置其实是有问题的。 所以,这里强烈建议在setting中配置另外一个配置项:errorlimit,用于出现脏数据时的自定义监控和处理:

 

 

五、整合Datax遇到的挑战

 

在经过一连串的测试,已经确认了Datax抽取数据比kettle更高效。

 

这里给出一些实践数据:

 

从mysql抽取380万左右的数据到mysql时,Datax需要2分30秒,kettle需要21分钟;

 

而当数据量增加到1800万时,Datax需要24分钟,kettle需要2小时58分钟。

 

然而,事情往往没有这幺愉快。

 

(一)、速度为0

 

当使用Datax往Greenplum中导数据时,几乎没有速度

 

 

通过GP的监控页面,发现是通过GP 的 Master 一条一条insert进去的。而GP的最佳导入方式有2种(其它收费方式不讨论):

 

1.通过copy命令,数据需要经过Master节点分发到Segment节点

 

2.通过gpfdist外部表导入,数据不经过Master,由Primary Segment连接gpfdist拉取数据

 

很显然,Datax往GP中导入数据方式并不合适。这里我们最终选用了HashData开源的Datax,实际是在阿里的Datax基础上,开发了适用于往GP导入数据的gpdbwriter插件。这也是得益于Datax拥有良好的框架,只需要开发一种writer插件,就可以与不同的reader插件进行组合使用

 

这里给出HashData开源的Datax的git地址:https://github.com/HashDataInc/DataX

 

最终,经过实践,发现该版本的Datax往GP中导入数据非常快,在源码中可以看到内部使用的是copy命令来实现的

 

最终实现了1600万的数据2分钟导入

 

 

(二)、增量更新

 

Datax需要解决的另一个难题在于增量更新。

 

首先需要说明, Datax本身在大部分reader插件中提供了where配置项,用于做增量更新。例如mysqlerader:

 

 

在该配置项中,$bizdate会让人摸不着头脑,不知道这是什幺值。后来在某个博客中看到,$bizdate是阿里的CDP平台或者是DataWorks里定义的系统变量,表示对应的业务日期,业务日期默认为job运行日期的前一天

 

那幺可以想到,where配置项被设计为适合于基于时间的增量同步。

 

然而,目前我们的业务中,有不少业务是基于自增长id来做的数据同步,并且是没有create_time和update_time这样的标记字段。而在数据清洗过程中,我们通常需要去join一些维度表以便过滤脏数据。所以,我们更多时候会使用querySql配置项:

 

 

这个配置项适合我们自定义配置sql来实现数据读取。不过由于每次数据同步的增量条件是不同的,我们最终选择了自己编写代码来动态修改job任务,以便达到增量更新。

 

六、 总结

 

1.Datax作为一种ETL工具,还是比较简单的,不仅是配置简单,而且Datax的错误日志也很智能化,我们排错的时候也会比较方便。

 

2.Datax适合于直连方式的数据同步。对于数据文件方式的同步,需要我们自己去维护数据的字段信息,配置起来比较繁琐,没有kettle方便。

 

3.Datax目前也是不支持namenode切换的动态感知。

 

4.Datax的工作流需要依托于调度工具的流,本身并不具备工作流特性。

 

总体来说,从目前的情况来看,Datax比kettle更适合我们。

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注