持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第12天,点击查看活动详情
Logistics_Day08:实时增量ETL存储Kudu
01-[复习]-上次课程内容回顾
主要讲解2个方面内容:搭建物流项目环境(Maven Project)和结构化流程序(测试)
1、搭建物流项目环境 - Windows系统开发环境初始化 设置HADOOP_HOME:指向在windows下编译HADOOP,bin目录winutils.exe和hadoop.dll 设置hadoop.dll 文件:放置C:\Windows/System32 - 创建MavenProject工程 1个父工程,4个子模块(common、etl、generate、offline) 创建工程、创建模块、加入POM依赖 导入genereate数据生成器模块 进行初始化操作 创建基础包、导入工具类 属性文件:config.properties,连接服务信息参数 2、测试结构化流程序 编写结构化流应用程式,实时从Kafka消费数据(2个Topic,对应2个业务系统数据),将其打印控制台 - 启动数据库和采集框架,对表的数据进行更新和删除,流式是否消费到数据 - 运行数据模拟生成器,实时产生业务数据,插入到数据库表中,流式是否消费到数据
02-[了解]-第8天:课程内容提纲
主要物流项目业务数据实时ETL转换操作,流程如下图中:process方法
功能
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzpbYYdC-1652014600908)(image-20210526072519874.)]
在流式应用程序中,通常都是从Kafka消费数据,基本上形成固定代码结构 spark.readStream.format("kafka").option("bootstrop.servers").option("topic", "").load() 最重要核心代码逻辑: 对消费到Kafka业务数据(JSON字符串)进行ETL转换 1. JSON 字符串 -> JavaBean对象中 2. JavaBean 对象 -> 提取字段,封装到具体表对应POJO对象,方便存储业务数据
由于物流项目中,需要编写多个流式计算程序,实时消费Kafka数据,进行ETL转换,存储到不同引擎,封装流式计算程序公共接口,定义程序执行业务流程步骤。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eIQ7RaET-1652014600910)(image-20210526084023859.)]
03-[掌握]-实时ETL开发之封装流计算公共接口
为什幺封装接口:物流项目来说,需要编写3个流式应用程序,消费业务数据,ETL转换后存储到不同引擎(Kudu、Es和CK),步骤基本类似:
1)、第一步、创建SparkSession实例对象,基本相同
封装到工具类中,专门创建SparkSession实例对象
2)、第二步、从Kafka消费数据,基本相同
加载数据:load
方法
3)、第三步、对消费JSON数据进行ETL转换,有所变化,不同ETL业务不同,具体实现
处理数据:process
方法
4)、第四步、将转换后数据保存至外部存储,不一样,具体实现
保存数据:save
方法
==在etl模块【logistics-etl
】的realtime
包下创建BasicStreamApp
特质Trait,定义方法==
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEW3MnrA-1652014600911)(1616040734866.)]
load process save
package cn.itcast.logistics.etl.realtime import cn.itcast.logistics.common.Configuration import org.apache.spark.sql.{DataFrame, SparkSession} /** * 所有ETL流式处理的基类,实时增量ETL至:Kudu、Elasticsearch和ClickHouse都要实现此基类,定义三个方法 * - 1. 加载数据:load * - 2. 处理数据:process * - 3. 保存数据:save */trait BasicStreamApp { /** * 读取数据的方法 * * @param spark SparkSession * @param topic 指定消费的主题 * @param selectExpr 默认值:CAST(value AS STRING) */def load(spark: SparkSession, topic: String, selectExpr: String = "CAST(value AS STRING)"): DataFrame = { spark.readStream .format(Configuration.SPARK_KAFKA_FORMAT) .option("kafka.bootstrap.servers", Configuration.KAFKA_ADDRESS) .option("subscribe", topic) .option("maxOffsetsPerTrigger", "100000") .load() .selectExpr(selectExpr) } /** * 数据的处理 * * @param streamDF 流式数据集StreamingDataFrame * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等 * @return 流式数据集StreamingDataFrame */def process(streamDF: DataFrame, category: String): DataFrame /** * 数据的保存 * * @param streamDF 保存数据集DataFrame * @param tableName 保存表的名称 * @param isAutoCreateTable 是否自动创建表,默认创建表 */def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit }
当公共接口完成以后,某个实时ETL应用,创建对象object时,继承公共接口,实现其中:process
和save
方法即可。
04-[掌握]-实时ETL开发之SparkUtils工具类
任务:==编写工具类SparkUtils
,创建SparkSession实例对象,并且可以对应用进行设置。==
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cx4tJuDH-1652014600911)(1616049306846.)]
创建对象SparkUtils
,按照上述结构,实现具体方法,代码如下所示:
package cn.itcast.logistics.common import org.apache.commons.lang3.SystemUtils import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession /** * Spark 操作的工具类 */object SparkUtils { // 定义变量,类型匿名函数类型,返回为SparkConf对象 lazy val sparkConf = () => { new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .set("spark.sql.session.timeZone", "Asia/Shanghai") .set("spark.sql.files.maxPartitionBytes", "134217728") .set("spark.sql.files.openCostInBytes", "134217728") .set("spark.sql.shuffle.partitions", "3") .set("spark.sql.autoBroadcastJoinThreshold", "67108864") } // 依据应用运行操作系统,设置运行模式:local本地、yarn集群 lazy val autoSettingEnv = (sparkConf: SparkConf) => { if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) { //本地环境LOCAL_HADOOP_HOME System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME) //设置运行环境和checkpoint路径 sparkConf .set("spark.master", "local[3]") .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR) } else { //生产环境 sparkConf .set("spark.master", "yarn") .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR) } // 返回设置以后SparkConf对象 sparkConf } /** * 创建sparkSession对象 * @param sparkConf SparkConf实例,设置应用惨啊户数 * @param clazz 每个应用Class实例对象 */def createSparkSession(sparkConf: SparkConf, clazz: Class[_]): SparkSession = { SparkSession.builder() .appName(clazz.getSimpleName.stripSuffix("$")) .config(sparkConf) .getOrCreate() } }
编写main方法,创建SparkSession对象,查看4040界面页面(线程休眠即可)
// 测试 def main(args: Array[String]): Unit = { val spark: SparkSession = createSparkSession( autoSettingEnv(sparkConf()), this.getClass ) println(spark) Thread.sleep(10000000) spark.stop() }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-irIwFlIe-1652014600912)(1616049966978.)]
05-[理解]-实时ETL开发之KuduStreamApp程序
任务:编写流式程序,实时消费Kafka数据,进行ETL转换,最终存储到Kudu表中,
继承公共接口:BasicStreamApp
,实现其中方法。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ijwk5iEE-1652014600912)(gitee.com/the_efforts…
.)]
具体开发步骤如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kaLMr39B-1652014600912)(1616050178069.)]
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf step2. 从Kafka数据源实时消费数据 step3. 对获取Json数据进行ETL转换 step4. 保存转换后数据到外部存储 step5. 应用启动以后,等待终止结束
package cn.itcast.logistics.etl.realtime import cn.itcast.logistics.common.SparkUtils import org.apache.spark.sql.{DataFrame, SparkSession} /** * Kudu数据管道应用:实现Kudu数据库的实时ETL操作 */object KuduStreamApp extends BasicStreamApp { /** * 数据的处理 * * @param streamDF 流式数据集StreamingDataFrame * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等 * @return 流式数据集StreamingDataFrame */override def process(streamDF: DataFrame, category: String): DataFrame = ??? /** * 数据的保存 * * @param streamDF 保存数据集DataFrame * @param tableName 保存表的名称 * @param isAutoCreateTable 是否自动创建表,默认创建表 */override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = ??? /* 实时Kudu ETL应用程序入口,数据处理逻辑步骤: step1. 创建SparkSession实例对象,传递SparkConf step2. 从Kafka数据源实时消费数据 step3. 对获取Json数据进行ETL转换 step4. 保存转换后数据到外部存储 step5. 应用启动以后,等待终止结束 */def main(args: Array[String]): Unit = { // step1. 创建SparkSession实例对象,传递SparkConf val spark: SparkSession = SparkUtils.createSparkSession( SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass ) import spark.implicits._ // step2. 从Kafka数据源实时消费数据 // 物流系统Topic数据 val logisticsDF: DataFrame = load(spark, "logistics") val crmDF: DataFrame = load(spark, "crm") // step3. 对获取Json数据进行ETL转换 val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics") val etlCrmDF: DataFrame = process(logisticsDF, "crm") // step4. 保存转换后数据到外部存储 save(etlLogisticsDF, "logistics-console") save(etlCrmDF, "crm-console") // step5. 应用启动以后,等待终止结束 spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......")) spark.streams.awaitAnyTermination() } }
上述代码,已经实现MAIN方法,接下来只要实现其中:process【ETL转换】和save【保存数据】方法即可。
先不考虑ETL业务逻辑和具体save保存,数据直接进行ETL转换,将数据打印到控制台即可。
实现方法:process
,没有任何逻辑
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jFM2zN69-1652014600913)(1616051030658.)]
实现方法:save
,将数据打印控制台
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pmeOkIof-1652014600913)(1616051118704.)]
package cn.itcast.logistics.etl.realtime import cn.itcast.logistics.common.SparkUtils import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SparkSession} /** * Kudu数据管道应用:实现Kudu数据库的实时ETL操作 */object KuduStreamApp extends BasicStreamApp { /** * 数据的处理,此时不进行任何业务逻辑转换 * * @param streamDF 流式数据集StreamingDataFrame * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等 * @return 流式数据集StreamingDataFrame */override def process(streamDF: DataFrame, category: String): DataFrame = { val etlStreamDF: DataFrame = category match { // TODO: 物流系统业务数据 case "logistics" => streamDF // TODO: CRM系统业务数据 case "crm" => streamDF // TODO: 其他业务系统数据 case _ => streamDF } // 返回ETL转换后的数据 etlStreamDF } /** * 数据的保存,此时仅仅将数据打印控制台 * * @param streamDF 保存数据集DataFrame * @param tableName 保存表的名称 * @param isAutoCreateTable 是否自动创建表,默认创建表 */override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = { streamDF.writeStream .queryName(s"query-${tableName}") .outputMode(OutputMode.Append()) .format("console") .option("numRows", "100") .option("truncate", "false") .start() } /* 实时Kudu ETL应用程序入口,数据处理逻辑步骤: step1. 创建SparkSession实例对象,传递SparkConf step2. 从Kafka数据源实时消费数据 step3. 对获取Json数据进行ETL转换 step4. 保存转换后数据到外部存储 step5. 应用启动以后,等待终止结束 */def main(args: Array[String]): Unit = { // step1. 创建SparkSession实例对象,传递SparkConf val spark: SparkSession = SparkUtils.createSparkSession( SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass ) import spark.implicits._ // step2. 从Kafka数据源实时消费数据 // 物流系统Topic数据 val logisticsDF: DataFrame = load(spark, "logistics") val crmDF: DataFrame = load(spark, "crm") // step3. 对获取Json数据进行ETL转换 val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics") val etlCrmDF: DataFrame = process(logisticsDF, "crm") // step4. 保存转换后数据到外部存储 save(etlLogisticsDF, "logistics-console") save(etlCrmDF, "crm-console") // step5. 应用启动以后,等待终止结束 spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......")) spark.streams.awaitAnyTermination() } }
编程完成以后,运行流式计算程序:KuduStreamApp,启动MySQL数据库和Canal及Oracle数据库和OGG
。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SlCVJaiw-1652014600914)(1616051529743.)]
06-[理解]-实时ETL开发之Kafka数据JSON格式
实时从Kafka消费数据,无论是OGG采集还是Canal采集数据,都是以JSON字符串格式发送KafkaTopic,此时需要查看OGG采集数据格式字段和Canal采集数据格式字段。
1)、OGG 采集数据格式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Of6k9pCU-1652014600914)(1616051796175.)]
具体分析:插入数据Insert、更新数据update和删除数据delete
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hSS9jVRN-1652014600914)(1616052050412.)]
2)、Canal采集数据格式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6qUKxp1f-1652014600915)(1616052064308.)]
具体查看Canal采集数据,分析字段
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mqaOU4ec-1652014600915)(1616052381480.)]
07-[掌握]-实时ETL开发之定义数据Bean对象
无论是OGG采集数据还是Canal采集数据,JSON数据各式字段,基本一致,所以定义JavaBean,分别解析封装数据到JavaBean对象
1)、OGG采集JSON数据:7个字段
2)、Canal采集JSON数据:12个字段
1)、定义 Bean 对象基类
根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table
,因此将该属性作为公共属性进行提取,抽象成基类。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zmcLPflD-1652014600916)(1616053553019.)]
package cn.itcast.logistics.common.beans.parser; import java.io.Serializable; /** * 根据数据源定义抽象类,数据源:ogg 和 canal, 两者有共同的table属性 */public abstract class MessageBean implements Serializable { private static final long serialVersionUID = 373363837132138843L; private String table; public String getTable() { return table; } public void setTable(String table) { this.table = table; } @Override public String toString() { return table; } }
2)、定义 OGG 数据 Bean 对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YcbgHW28-1652014600917)(1616053885882.)]
package cn.itcast.logistics.common.beans.parser; import java.util.Map; /** * 定义消费OGG数据的JavaBean对象 * { * "table": "ITCAST.tbl_route", //表名:库名.表名 * "op_type": "U", //操作类型:U表示修改 * "op_ts": "2020-10-08 09:10:54.000774", * "current_ts": "2020-10-08T09:11:01.925000", * "pos": "00000000200006645758", * "before": { //操作前的字段集合 * "id": 104, * "start_station": "东莞中心", * "start_station_area_id": 441900, * "start_warehouse_id": 1, * "end_station": "蚌埠中转部", * "end_station_area_id": 340300, * "end_warehouse_id": 107, * "mileage_m": 1369046, * "time_consumer_minute": 56172, * "state": 1, * "cdt": "2020-02-02 18:51:39", * "udt": "2020-02-02 18:51:39", * "remark": null * }, * "after": { //操作后的字段集合 * "id": 104, * "start_station": "东莞中心", * "start_station_area_id": 441900, * "start_warehouse_id": 1, * "end_station": "TBD", * "end_station_area_id": 340300, * "end_warehouse_id": 107, * "mileage_m": 1369046, * "time_consumer_minute": 56172, * "state": 1, * "cdt": "2020-02-02 18:51:39", * "udt": "2020-02-02 18:51:39", * "remark": null * } * } */public class OggMessageBean extends MessageBean { private static final long serialVersionUID = -4763944161833712521L; //定义操作类型 private String op_type; @Override public void setTable(String table) { //如果表名不为空 if (table != null && !table.equals("")) { table = table.replaceAll("[A-Z]+\\.", ""); } super.setTable(table); } public String getOp_type() { return op_type; } public void setOp_type(String op_type) { this.op_type = op_type; } public String getOp_ts() { return op_ts; } public void setOp_ts(String op_ts) { this.op_ts = op_ts; } public String getCurrent_ts() { return current_ts; } public void setCurrent_ts(String current_ts) { this.current_ts = current_ts; } public String getPos() { return pos; } public void setPos(String pos) { this.pos = pos; } public Map<String, Object> getBefore() { return before; } public void setBefore(Map<String, Object> before) { this.before = before; } public Map<String, Object> getAfter() { return after; } public void setAfter(Map<String, Object> after) { this.after = after; } //操作时间 private String op_ts; @Override public String toString() { return "OggMessageBean{" + "table='" + super.getTable() + '\'' + ", op_type='" + op_type + '\'' + ", op_ts='" + op_ts + '\'' + ", current_ts='" + current_ts + '\'' + ", pos='" + pos + '\'' + ", before=" + before + ", after=" + after + '}'; } /** * 返回需要处理的列的集合 * @return */public Map<String, Object> getValue() { //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合 if (after == null) { return before; } else { return after; } } //同步时间 private String current_ts; //偏移量 private String pos; //操作之前的数据 private Map<String, Object> before; //操作之后的数据 private Map<String, Object> after; }
当OGG采集数据时,需要获取关心操作的数据,定义方法:getValue
,删除数据时获取before数据,插入或更新获取after数据,代码如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IapFgUjs-1652014600918)(1616054052067.)]
3)、定义 Canal 数据 Bean 对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZckDhdXi-1652014600918)(1616054087741.)]
package cn.itcast.logistics.common.beans.parser; import java.util.List; import java.util.Map; /** * 定义消费canal数据对应的JavaBean对象 * { * "data": [{ * "id": "1", * "name": "北京", * "tel": "222", * "mobile": "1111", * "detail_addr": "北京", * "area_id": "1", * "gis_addr": "1", * "cdt": "2020-10-08 17:20:12", * "udt": "2020-11-05 17:20:16", * "remark": null * }], * "database": "crm", * "es": 1602148867000, * "id": 15, * "isDdl": false, * "mysqlType": { * "id": "bigint(20)", * "name": "varchar(50)", * "tel": "varchar(20)", * "mobile": "varchar(20)", * "detail_addr": "varchar(100)", * "area_id": "bigint(20)", * "gis_addr": "varchar(20)", * "cdt": "datetime", * "udt": "datetime", * "remark": "varchar(100)" * }, * "old": [{ * "tel": "111" * }], * "sql": "", * "sqlType": { * "id": -5, * "name": 12, * "tel": 12, * "mobile": 12, * "detail_addr": 12, * "area_id": -5, * "gis_addr": 12, * "cdt": 93, * "udt": 93, * "remark": 12 * }, * "table": "crm_address", * "ts": 1602148867311, * "type": "UPDATE" //修改数据 * } */public class CanalMessageBean extends MessageBean { private static final long serialVersionUID = -3147101694588578078L; //操作的数据集合 private List<Map<String, Object>> data; public List<Map<String, Object>> getData() { return data; } public void setData(List<Map<String, Object>> data) { this.data = data; } public String getDatabase() { return database; } public void setDatabase(String database) { this.database = database; } public Long getEs() { return es; } public void setEs(Long es) { this.es = es; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public boolean isDdl() { return isDdl; } public void setDdl(boolean ddl) { isDdl = ddl; } public Map<String, Object> getMysqlType() { return mysqlType; } public void setMysqlType(Map<String, Object> mysqlType) { this.mysqlType = mysqlType; } public String getOld() { return old; } public void setOld(String old) { this.old = old; } public String getSql() { return sql; } public void setSql(String sql) { this.sql = sql; } public Map<String, Object> getSqlType() { return sqlType; } public void setSqlType(Map<String, Object> sqlType) { this.sqlType = sqlType; } public Long getTs() { return ts; } public void setTs(Long ts) { this.ts = ts; } public String getType() { return type; } public void setType(String type) { this.type = type; } //数据库名称 private String database; private Long es; private Long id; private boolean isDdl; private Map<String, Object> mysqlType; private String old; private String sql; private Map<String, Object> sqlType; private Long ts; private String type; /** * 重写父类的settable方法,将表名修改成统一的前缀 * @param table */@Override public void setTable(String table) { if(table!=null && !table.equals("")){ if(table.startsWith("crm_")) { table = table.replace("crm_", "tbl_"); } } super.setTable(table); } }
CRM系统中每个表的名称都有前缀【crm_
】,解析JSON字符串时,将其替换【tbl_
】
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP63yxtA-1652014600918)(1616054221345.)]
至此,定义MessageBean实体类,接下来,需要编写代码解析JSON字符串为MessageBean对象,封装实例对象中,方便获取各个字段的值,进行相应的处理转换操作。
注意:定义MessageBean实体类使用Java
语言,没有使用Scala语言,后续使用fastJson
库解析。
08-[掌握]-实时ETL开发之数据转换Bean及测试
任务:==编写代码,解析JSON字符串为MessageBean对象,属于实时ETL转换第一步。==
1)、如何解析JSON字符串为JavaBean对象呢???
使用阿里巴巴JSON库:fastJson
,既能解析JSON为Bean对象,又能转换Bean对象为JSON字符串
为什幺使用fastJson解析?? fastJson解析Json字符串时,使用起来比较简单,此外库基于Java语言开发,对JavaBean对象支持非常的好,对Scala语言支持不好,所以MessageBean使用Java语言定义的,没有使用Scala语言。
转换JSON为Bean对象:JSON.parseObject(jsonStr, classOf[StuBean])
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DiTGRUf4-1652014600919)(1616054946045.)]
将Bean对象转换为JSON字符串:JSON.toJSONString(stuBean, true)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wAfJcjCm-1652014600920)(1616054978516.)]
package cn.itcast.logistics.test; import java.util.Objects; public class StuBean { private Integer id ; private String name ; public StuBean() { } public StuBean(Integer id, String name) { this.id = id; this.name = name; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; StuBean stuBean = (StuBean) o; return Objects.equals(id, stuBean.id) && Objects.equals(name, stuBean.name); } @Override public int hashCode() { return Objects.hash(id, name); } @Override public String toString() { return "StuBean{" + "id=" + id + ", name='" + name + '\'' + '}'; } } // =============================================================== package cn.itcast.logistics.test import com.alibaba.fastjson.JSON object FastJsonTest { def main(args: Array[String]): Unit = { // 定义json字符串 val jsonStr: String = """ |{ | "id": 10001, | "name": "zhangsan" |} |""".stripMargin // JSON -> JavaBean val stuBean: StuBean = JSON.parseObject(jsonStr, classOf[StuBean]) println(stuBean) // JavaBean转换为JSON字符串 val stuJson: String = JSON.toJSONString(stuBean, true) println(stuJson) } }
2)、流式程序使用Scala语言编写,为什幺MessageBean使用Java语言??
由于使用FastJson
解析Json字符串,所以使用Java语言编写。由于Scala语言中CaseClass对FastJson支持不是很友好,有的时候,解析会出问题,无法完成解析操作。
FastJson库使用Java编写的,符合Java语言规范,但是不符合Scala与规范约束。
任务:编写代码,将从Kafka消费JSON字符串数据,解析为MessageBean对象即可
1)、首先对物流系统数据,使用OGG采集数数据进行解析处理,核心代码如下:
case "logistics" => val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF // 由于从Kafka消费数据,只获取value消息,将其转换DataSet .as[String] // 过滤数据 .filter(msg => null != msg && msg.trim.length > 0) // 解析每条数据 .map{ msg => JSON.parseObject(msg, classOf[OggMessageBean]) }(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定编码器 // 返回数据 oggBeanStreamDS.toDF()
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R4yvOxD0-1652014600920)(1616055524533.)]
当将DataFrame转换Dataset进行操作时,尤其调用转换函数(比如map、filter、flatMap)等等,需要指定编码器Encoder
(Dataset 强类型数据结构,指定类型编码)。
默认情况下,当Dataset数据类型为:元组类型、CaseClass或基本数据类型,都会提供默认编码器Encoder,除此之外数据类型,比如自定义Java语言Bean对象,必须指定编码器。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PdlhZQfN-1652014600921)(1616055716033.)]
运行测试程序,在Oracle数据库中操作数据,查看控制台打印结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VvKeLUp6-1652014600921)(1616055935505.)]
2)、将Canal采集JSON数据,转换MessageBean对象,核心代码
// TODO: CRM系统业务数据 case "crm" => implicit val canalEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean]) val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF // 过滤数据 .filter(row => !row.isNullAt(0)) // 解析数据,对分区数据操作 .mapPartitions { iter => iter.map { row => val jsonValue: String = row.getAs[String]("value") // 解析JSON字符串 JSON.parseObject(jsonValue, classOf[CanalMessageBean]) } } // 返回转换后的数据 canalBeanStreamDS.toDF()
其中采用隐式参数方式,传递定义编码器Encoder
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EC1VljWg-1652014600921)(1616056559325.)]
再次运行流式计算程序:KuduStreamApp,在MySQL数据库中修改表的数据,查看控制台打印结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rg1sVo7i-1652014600923)(1616056711227.)]
09-[理解]-实时ETL开发之转换POJO【思路】
任务:分析从Kafka消费数据(JSON转换MessageBean对象),哪些字段是关系值。
OGG采集Oracle数据库数据:7个字段
Canal采集MySQL数据库数据:12个字段
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tc5InAk0-1652014600923)(image-20210526072519874.)]
1)、OGG 采集数据,核心数据字段
OGG采集Oracle数据库表的数据,发送到Kafka中JSON字符串,转换为MessageBean以后:
第一个字段:table
,对哪个表进行操作
第二个字段:op_type
,数据操作类型,三个值【插入I、更新U、删除D
】
第三个字段:数据字段,可能是after(插入和更新),可能是before(删除),提供方法getValue
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VOZkVqp6-1652014600923)(1616118817076-1621984868452.)]
2)、Canal采集数据,核心数据字段
Canal采集MySQL数据库数据时,业务中关心字段:与OGG采集数据关心字段基本一致
第一个字段:table
,表的名称,对哪个表进行操作
第二个字段:type
,数据操作类型,三个值【INSERT、UPDATE、DELETE
】
第三个字段:data
,真正操作数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4F8lXt9R-1652014600924)(1616119230880-1621984868452.)]
经过前面分析可知,无论OGG采集海Canal采集数据,将JSON字符串封装为MessageBean以后,需要提取其中核心字段数据,进行转换操作。
将提取字段【type】类型和【data】数据,封装到具体表table的实体类【POJO】中,后面方便进行操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7WYVty8J-1652014600924)(1616119695869-1621984868453.)]
10-[掌握]-实时ETL开发之OggBean转换POJO编程
任务:以OGG采集数据为例,针对其中一个张表【tbl_areas
】进行转换操作。
1)、依据table字段判断数据:tbl_areas 2)、获取数据字段值:getValue方法,将其转换为POJO对象 3)、过滤掉转换为null数据
1)、定义表名称的隐射
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ff2HZHtE-1652014600924)(1616120220576-1621984868453.)]
package cn.itcast.logistics.common /** * 定义表名:根据表的名字定义属性 */object TableMapping { // Logistics 物流系统表名称 val AREAS: String = "tbl_areas" val CHARGE_STANDARD: String = "tbl_charge_standard" val CODES: String = "tbl_codes" val COLLECT_PACKAGE: String = "tbl_collect_package" val COMPANY: String = "tbl_company" val COMPANY_DOT_MAP: String = "tbl_company_dot_map" val COMPANY_TRANSPORT_ROUTE_MA: String = "tbl_company_transport_route_ma" val COMPANY_WAREHOUSE_MAP: String = "tbl_company_warehouse_map" val CONSUMER_SENDER_INFO: String = "tbl_consumer_sender_info" val COURIER: String = "tbl_courier" val DELIVER_PACKAGE: String = "tbl_deliver_package" val DELIVER_REGION: String = "tbl_deliver_region" val DELIVERY_RECORD: String = "tbl_delivery_record" val DEPARTMENT: String = "tbl_department" val DOT: String = "tbl_dot" val DOT_TRANSPORT_TOOL: String = "tbl_dot_transport_tool" val DRIVER: String = "tbl_driver" val EMP: String = "tbl_emp" val EMP_INFO_MAP: String = "tbl_emp_info_map" val EXPRESS_BILL: String = "tbl_express_bill" val EXPRESS_PACKAGE: String = "tbl_express_package" val FIXED_AREA: String = "tbl_fixed_area" val GOODS_RACK: String = "tbl_goods_rack" val JOB: String = "tbl_job" val OUT_WAREHOUSE: String = "tbl_out_warehouse" val OUT_WAREHOUSE_DETAIL: String = "tbl_out_warehouse_detail" val PKG: String = "tbl_pkg" val POSTAL_STANDARD: String = "tbl_postal_standard" val PUSH_WAREHOUSE: String = "tbl_push_warehouse" val PUSH_WAREHOUSE_DETAIL: String = "tbl_push_warehouse_detail" val ROUTE: String = "tbl_route" val SERVICE_EVALUATION: String = "tbl_service_evaluation" val STORE_GRID: String = "tbl_store_grid" val TRANSPORT_RECORD: String = "tbl_transport_record" val TRANSPORT_TOOL: String = "tbl_transport_tool" val VEHICLE_MONITOR: String = "tbl_vehicle_monitor" val WAREHOUSE: String = "tbl_warehouse" val WAREHOUSE_EMP: String = "tbl_warehouse_emp" val WAREHOUSE_RACK_MAP: String = "tbl_warehouse_rack_map" val WAREHOUSE_RECEIPT: String = "tbl_warehouse_receipt" val WAREHOUSE_RECEIPT_DETAIL: String = "tbl_warehouse_receipt_detail" val WAREHOUSE_SEND_VEHICLE: String = "tbl_warehouse_send_vehicle" val WAREHOUSE_TRANSPORT_TOOL: String = "tbl_warehouse_transport_tool" val WAREHOUSE_VEHICLE_MAP: String = "tbl_warehouse_vehicle_map" val WAY_BILL: String = "tbl_waybill" val WAYBILL_LINE: String = "tbl_waybill_line" val WAYBILL_STATE_RECORD: String = "tbl_waybill_state_record" val WORK_TIME: String = "tbl_work_time" // CRM 系统业务数据表名称 val ADDRESS: String = "tbl_address" val CONSUMER_ADDRESS_MAP: String = "tbl_consumer_address_map" val CUSTOMER: String = "tbl_customer" }
2)、编写核心代码,从OggMessageBean中提取字段值,封装到具体POJO对象中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-clUR4VHN-1652014600926)(1616120552233-1621984868453.)]
11-[掌握]-实时ETL开发之转换POJO【数据解析器】
任务:编写数据解析器中方法【toAreaBean】实现,从MessageBean中提取字段值,封装到POJO实体类对象。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i2Q21jEU-1652014600927)(1616121581555-1621984868453.)]
当提取MessageBean中数据字段值,如何将其封装到POJO对象中呢??
首先将数据段值:Map数据类型转换 JSON字符串,再将JSON字符串转换为 POJO对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5HvECsad-1652014600927)(1616121249979-1621984868453.)]
最终实现代码如下所示:
package cn.itcast.logistics.etl.parse import java.util import cn.itcast.logistics.common.beans.logistics.AreasBean import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, MessageBean, OggMessageBean} import com.alibaba.fastjson.JSON object DataParser { /** * 判断messageBean是否是OggMessageBean */private def getOggMessageBean(bean: MessageBean): OggMessageBean = { bean match { case ogg: OggMessageBean => ogg } } /** * 判断messageBean是否是CanalMessageBean */private def getCanalMessageBean(bean: MessageBean): CanalMessageBean = { bean match { case canal: CanalMessageBean => canal } } /** * 提取ogg(I、U、D)和canal(insert、update、delete)数据的optype属性,转换成统一的操作字符串 * * @param opType 数据操作类型:insert、update、delete,任意一种 */private def getOpType(opType: String): String = { opType match { case "I" => "insert" case "U" => "update" case "D" => "delete" case "INSERT" => "insert" case "UPDATE" => "update" case "DELETE" => "delete" case _ => "insert" } } // ================== 物流Logistics系统业务数据解析 ================== /* 从MessageBean提取数据字段值和数据操作类型,将其封装到具体POJO对象中 TODO: 将物流Logistics系统:tbl_areas表的字段信息转换成AreaBean对象 */def toAreaBean(bean: MessageBean): AreasBean = { // a. 转换MessageBean对象为OggMessageBean对象 val oggMessageBean: OggMessageBean = getOggMessageBean(bean) // b. 获取数据操作类型 val opType: String = getOpType(oggMessageBean.getOp_type) // c. 获取操作数据值 val dataValue: util.Map[String, AnyRef] = oggMessageBean.getValue // d. 将数据封装到POJO对象 // 第一步、转换map为json字符串 val dataJson: String = JSON.toJSONString(dataValue, true) println(dataJson) // 第二步、json字符串为pojo val areasBean = JSON.parseObject(dataJson, classOf[AreasBean]) // 第三步、判断解析不为null时,设置数据操作类型 if(null != areasBean){ areasBean.setOpType(opType) } // e. 返回封装对象 areasBean } }
12-[理解]-实时ETL开发之转换POJO【隐式转换】
当对Dataset数据结构进行操作时(调用函数,转换函数,比如map、flatMap)等,数据返回类型如果不是元组、CaseClass及基本类型,需要指定编码器Encoder
。
将JSON字符串转换为MessageBean对象时,指定Encoder编码器。
方式一、方法后面紧跟圆括号,指定对应编码器
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9yz4kyl-1652014600927)(1616123417063-1621984868453.)]
方式二、隐式参数,指定编码器,程序自动传入
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TtT2n2Ek-1652014600928)(1616123450168-1621984868453.)]
当从MessageBean中提取数据字段值以后,将其封装到对应POJO对象时,需要指定编码器,否则程序报错,具体如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KoWkGwAc-1652014600928)(1616123541413-1621984868453.)]
学习SparkSQL框架,如何导入元组类型、CaseClass类型和基本类型
编码器。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhaVBfT0-1652014600928)(1616123609397-1621984868453.)]
查看implicits
父类:SQLImplicits
对象,包含很对定义隐式转换函数,返回类型都是各种编码器Encoder
。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kXuGyKtt-1652014600929)(1616123726466-1621984868453.)]
参考SQLImplicits
对象中编码器定义,自己定义编码器:BeanImplicits
,实现隐式导入和转换操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kraEvGbD-1652014600929)(1616123830045-1621984868453.)]
package cn.itcast.logistics.common import cn.itcast.logistics.common.beans.crm._ import cn.itcast.logistics.common.beans.logistics._ import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean} import org.apache.spark.sql.{Encoder, Encoders} /** * 扩展自定义POJO的隐式转换实现 */object BeanImplicits { // 定义MessageBean隐式参数Encoder值 implicit def newOggMessageBeanEncoder: Encoder[OggMessageBean] = Encoders.bean(classOf[OggMessageBean]) implicit def newCanalMessageBeanEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean]) // Logistics Bean implicit def newAreasBeanEncoder: Encoder[AreasBean] = Encoders.bean(classOf[AreasBean]) implicit def newChargeStandardBeanEncoder: Encoder[ChargeStandardBean] = Encoders.bean(classOf[ChargeStandardBean]) implicit def newCodesBeanEncoder: Encoder[CodesBean] = Encoders.bean(classOf[CodesBean]) implicit def newCollectPackageBeanEncoder: Encoder[CollectPackageBean] = Encoders.bean(classOf[CollectPackageBean]) implicit def newCompanyBeanEncoder: Encoder[CompanyBean] = Encoders.bean(classOf[CompanyBean]) implicit def newCompanyDotMapBeanEncoder: Encoder[CompanyDotMapBean] = Encoders.bean(classOf[CompanyDotMapBean]) implicit def newCompanyTransportRouteMaBeanEncoder: Encoder[CompanyTransportRouteMaBean] = Encoders.bean(classOf[CompanyTransportRouteMaBean]) implicit def newCompanyWarehouseMapBeanEncoder: Encoder[CompanyWarehouseMapBean] = Encoders.bean(classOf[CompanyWarehouseMapBean]) implicit def newConsumerSenderInfoBeanEncoder: Encoder[ConsumerSenderInfoBean] = Encoders.bean(classOf[ConsumerSenderInfoBean]) implicit def newCourierBeanEncoder: Encoder[CourierBean] = Encoders.bean(classOf[CourierBean]) implicit def newDeliverPackageBeanEncoder: Encoder[DeliverPackageBean] = Encoders.bean(classOf[DeliverPackageBean]) implicit def newDeliverRegionBeanEncoder: Encoder[DeliverRegionBean] = Encoders.bean(classOf[DeliverRegionBean]) implicit def newDeliveryRecordBeanEncoder: Encoder[DeliveryRecordBean] = Encoders.bean(classOf[DeliveryRecordBean]) implicit def newDepartmentBeanEncoder: Encoder[DepartmentBean] = Encoders.bean(classOf[DepartmentBean]) implicit def newDotBeanEncoder: Encoder[DotBean] = Encoders.bean(classOf[DotBean]) implicit def newDotTransportToolBeanEncoder: Encoder[DotTransportToolBean] = Encoders.bean(classOf[DotTransportToolBean]) implicit def newDriverBeanEncoder: Encoder[DriverBean] = Encoders.bean(classOf[DriverBean]) implicit def newEmpBeanEncoder: Encoder[EmpBean] = Encoders.bean(classOf[EmpBean]) implicit def newEmpInfoMapBeanEncoder: Encoder[EmpInfoMapBean] = Encoders.bean(classOf[EmpInfoMapBean]) implicit def newExpressBillBeanEncoder: Encoder[ExpressBillBean] = Encoders.bean(classOf[ExpressBillBean]) implicit def newExpressPackageBeanEncoder: Encoder[ExpressPackageBean] = Encoders.bean(classOf[ExpressPackageBean]) implicit def newFixedAreaBeanEncoder: Encoder[FixedAreaBean] = Encoders.bean(classOf[FixedAreaBean]) implicit def newGoodsRackBeanEncoder: Encoder[GoodsRackBean] = Encoders.bean(classOf[GoodsRackBean]) implicit def newJobBeanEncoder: Encoder[JobBean] = Encoders.bean(classOf[JobBean]) implicit def newOutWarehouseBeanEncoder: Encoder[OutWarehouseBean] = Encoders.bean(classOf[OutWarehouseBean]) implicit def newOutWarehouseDetailBeanEncoder: Encoder[OutWarehouseDetailBean] = Encoders.bean(classOf[OutWarehouseDetailBean]) implicit def newPkgBeanEncoder: Encoder[PkgBean] = Encoders.bean(classOf[PkgBean]) implicit def newPostalStandardBeanEncoder: Encoder[PostalStandardBean] = Encoders.bean(classOf[PostalStandardBean]) implicit def newPushWarehouseBeanEncoder: Encoder[PushWarehouseBean] = Encoders.bean(classOf[PushWarehouseBean]) implicit def newPushWarehouseDetailBeanEncoder: Encoder[PushWarehouseDetailBean] = Encoders.bean(classOf[PushWarehouseDetailBean]) implicit def newRouteBeanEncoder: Encoder[RouteBean] = Encoders.bean(classOf[RouteBean]) implicit def newServiceEvaluationBeanEncoder: Encoder[ServiceEvaluationBean] = Encoders.bean(classOf[ServiceEvaluationBean]) implicit def newStoreGridBeanEncoder: Encoder[StoreGridBean] = Encoders.bean(classOf[StoreGridBean]) implicit def newTransportToolBeanEncoder: Encoder[TransportToolBean] = Encoders.bean(classOf[TransportToolBean]) implicit def newVehicleMonitorBeanEncoder: Encoder[VehicleMonitorBean] = Encoders.bean(classOf[VehicleMonitorBean]) implicit def newWarehouseBeanEncoder: Encoder[WarehouseBean] = Encoders.bean(classOf[WarehouseBean]) implicit def newWarehouseEmpBeanEncoder: Encoder[WarehouseEmpBean] = Encoders.bean(classOf[WarehouseEmpBean]) implicit def newWarehouseRackMapBeanEncoder: Encoder[WarehouseRackMapBean] = Encoders.bean(classOf[WarehouseRackMapBean]) implicit def newWarehouseReceiptBeanEncoder: Encoder[WarehouseReceiptBean] = Encoders.bean(classOf[WarehouseReceiptBean]) implicit def newWarehouseReceiptDetailBeanEncoder: Encoder[WarehouseReceiptDetailBean] = Encoders.bean(classOf[WarehouseReceiptDetailBean]) implicit def newWarehouseSendVehicleBeanEncoder: Encoder[WarehouseSendVehicleBean] = Encoders.bean(classOf[WarehouseSendVehicleBean]) implicit def newWarehouseTransportToolBeanEncoder: Encoder[WarehouseTransportToolBean] = Encoders.bean(classOf[WarehouseTransportToolBean]) implicit def newWarehouseVehicleMapBeanEncoder: Encoder[WarehouseVehicleMapBean] = Encoders.bean(classOf[WarehouseVehicleMapBean]) implicit def newWaybillBeanEncoder: Encoder[WaybillBean] = Encoders.bean(classOf[WaybillBean]) implicit def newWaybillLineBeanEncoder: Encoder[WaybillLineBean] = Encoders.bean(classOf[WaybillLineBean]) implicit def newWaybillStateRecordBeanEncoder: Encoder[WaybillStateRecordBean] = Encoders.bean(classOf[WaybillStateRecordBean]) implicit def newWorkTimeBeanEncoder: Encoder[WorkTimeBean] = Encoders.bean(classOf[WorkTimeBean]) implicit def newTransportRecordBeanEncoder: Encoder[TransportRecordBean] = Encoders.bean(classOf[TransportRecordBean]) // CRM Bean implicit def newCustomerBeanEncoder: Encoder[CustomerBean] = Encoders.bean(classOf[CustomerBean]) implicit def newAddressBeanEncoder: Encoder[AddressBean] = Encoders.bean(classOf[AddressBean]) implicit def newConsumerAddressMapBeanEncoder: Encoder[ConsumerAddressMapBean] = Encoders.bean(classOf[ConsumerAddressMapBean]) }
在流式程序代码中,导入自定义隐式转换对象即可。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oNnoiPwH-1652014600929)(1616123959706-1621984868453.)]
13-[掌握]-实时ETL开发之OggBean转换POJO测试
任务:启动容器,进入容器中启动Oracle数据库和OGG,修改Oracle数据库表的数据测试应用
1)、启动Kafka消息队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iEpB4Ge7-1652014600930)(1616124159348-1621984868453.)]
2)、启动Oracle数据库和OGG
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BWH9eKqo-1652014600930)(1616124185566-1621984868453.)]
3)、运行流式应用程序
首先,如果检查点目录存在,最好将其删除;此外,注释掉消费CRM系统数据代码,此时仅仅测试物流系统数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t8nzhTJ0-1652014600930)(1616124259387-1621984868453.)]
启动流式应用程式,执行操作,查看控制台输出结果,可以看到将Map集合(存储数据)转换为JSON字符串
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7dGFJr0B-1652014600931)(1616124564721-1621984868453.)]
4)、使用DBeave对数据库表的数据进行更新和删除
更新2条数据,删除1条数据,看到如下界面:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ADiDT4K-1652014600931)(1616124523908-1621984868453.)]
Be First to Comment