Press "Enter" to skip to content

大数据物流项目:实时增量ETL存储Kudu(八)

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第12天,点击查看活动详情

 

Logistics_Day08:实时增量ETL存储Kudu

 

01-[复习]-上次课程内容回顾

 

主要讲解2个方面内容:搭建物流项目环境(Maven Project)和结构化流程序(测试)

 

1、搭建物流项目环境
- Windows系统开发环境初始化
设置HADOOP_HOME:指向在windows下编译HADOOP,bin目录winutils.exe和hadoop.dll
设置hadoop.dll 文件:放置C:\Windows/System32
- 创建MavenProject工程
1个父工程,4个子模块(common、etl、generate、offline)
创建工程、创建模块、加入POM依赖
导入genereate数据生成器模块
进行初始化操作
创建基础包、导入工具类
属性文件:config.properties,连接服务信息参数
2、测试结构化流程序
编写结构化流应用程式,实时从Kafka消费数据(2个Topic,对应2个业务系统数据),将其打印控制台
- 启动数据库和采集框架,对表的数据进行更新和删除,流式是否消费到数据
- 运行数据模拟生成器,实时产生业务数据,插入到数据库表中,流式是否消费到数据

 

02-[了解]-第8天:课程内容提纲

 

主要物流项目业务数据实时ETL转换操作,流程如下图中:process方法
功能

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzpbYYdC-1652014600908)(image-20210526072519874.)]

 

在流式应用程序中,通常都是从Kafka消费数据,基本上形成固定代码结构
spark.readStream.format("kafka").option("bootstrop.servers").option("topic", "").load()
最重要核心代码逻辑:
对消费到Kafka业务数据(JSON字符串)进行ETL转换
1. JSON 字符串 ->  JavaBean对象中
2. JavaBean 对象 -> 提取字段,封装到具体表对应POJO对象,方便存储业务数据

 

​由于物流项目中,需要编写多个流式计算程序,实时消费Kafka数据,进行ETL转换,存储到不同引擎,封装流式计算程序公共接口,定义程序执行业务流程步骤。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eIQ7RaET-1652014600910)(image-20210526084023859.)]

 

03-[掌握]-实时ETL开发之封装流计算公共接口

 

为什幺封装接口:物流项目来说,需要编写3个流式应用程序,消费业务数据,ETL转换后存储到不同引擎(Kudu、Es和CK),步骤基本类似:

1)、第一步、创建SparkSession实例对象,基本相同
封装到工具类中,专门创建SparkSession实例对象
2)、第二步、从Kafka消费数据,基本相同
加载数据:load
方法
3)、第三步、对消费JSON数据进行ETL转换,有所变化,不同ETL业务不同,具体实现
处理数据:process
方法
4)、第四步、将转换后数据保存至外部存储,不一样,具体实现
保存数据:save
方法

==在etl模块【logistics-etl
】的realtime
包下创建BasicStreamApp
特质Trait,定义方法==

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEW3MnrA-1652014600911)(1616040734866.)]

 

load
process
save

 

package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 所有ETL流式处理的基类,实时增量ETL至:Kudu、Elasticsearch和ClickHouse都要实现此基类,定义三个方法
 * - 1. 加载数据:load
 * - 2. 处理数据:process
 * - 3. 保存数据:save
 */trait BasicStreamApp {
/**
 * 读取数据的方法
 *
 * @param spark SparkSession
 * @param topic 指定消费的主题
 * @param selectExpr 默认值:CAST(value AS STRING)
 */def load(spark: SparkSession, topic: String, selectExpr: String = "CAST(value AS STRING)"): DataFrame = {
spark.readStream
.format(Configuration.SPARK_KAFKA_FORMAT)
.option("kafka.bootstrap.servers", Configuration.KAFKA_ADDRESS)
.option("subscribe", topic)
.option("maxOffsetsPerTrigger", "100000")
.load()
.selectExpr(selectExpr)
}
/**
 * 数据的处理
 *
 * @param streamDF 流式数据集StreamingDataFrame
 * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
 * @return 流式数据集StreamingDataFrame
 */def process(streamDF: DataFrame, category: String): DataFrame
/**
 * 数据的保存
 *
 * @param streamDF 保存数据集DataFrame
 * @param tableName 保存表的名称
 * @param isAutoCreateTable 是否自动创建表,默认创建表
 */def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit
}

 

​当公共接口完成以后,某个实时ETL应用,创建对象object时,继承公共接口,实现其中:process
save
方法即可。

 

04-[掌握]-实时ETL开发之SparkUtils工具类

 

任务:==编写工具类SparkUtils
,创建SparkSession实例对象,并且可以对应用进行设置。==

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cx4tJuDH-1652014600911)(1616049306846.)]

 

创建对象SparkUtils
,按照上述结构,实现具体方法,代码如下所示:

 

package cn.itcast.logistics.common
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * Spark 操作的工具类
 */object SparkUtils {
// 定义变量,类型匿名函数类型,返回为SparkConf对象
lazy val sparkConf = () => {
new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.sql.session.timeZone", "Asia/Shanghai")
.set("spark.sql.files.maxPartitionBytes", "134217728")
.set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.shuffle.partitions", "3")
.set("spark.sql.autoBroadcastJoinThreshold", "67108864")
}
// 依据应用运行操作系统,设置运行模式:local本地、yarn集群
lazy val autoSettingEnv = (sparkConf: SparkConf) => {
if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
//本地环境LOCAL_HADOOP_HOME
System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
//设置运行环境和checkpoint路径
sparkConf
.set("spark.master", "local[3]")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
} else {
//生产环境
sparkConf
.set("spark.master", "yarn")
.set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
}
// 返回设置以后SparkConf对象
sparkConf
}
/**
 * 创建sparkSession对象
 * @param sparkConf SparkConf实例,设置应用惨啊户数
 * @param clazz 每个应用Class实例对象
 */def createSparkSession(sparkConf: SparkConf, clazz: Class[_]): SparkSession = {
SparkSession.builder()
    .appName(clazz.getSimpleName.stripSuffix("$"))
.config(sparkConf)
.getOrCreate()
}
}

 

编写main方法,创建SparkSession对象,查看4040界面页面(线程休眠即可)

 

// 测试
def main(args: Array[String]): Unit = {
val spark: SparkSession = createSparkSession(
autoSettingEnv(sparkConf()), this.getClass
)
println(spark)
Thread.sleep(10000000)
spark.stop()
}

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-irIwFlIe-1652014600912)(1616049966978.)]

 

05-[理解]-实时ETL开发之KuduStreamApp程序

 

任务:编写流式程序,实时消费Kafka数据,进行ETL转换,最终存储到Kudu表中,
继承公共接口:BasicStreamApp
,实现其中方法。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ijwk5iEE-1652014600912)(gitee.com/the_efforts…
.)]

 

具体开发步骤如下所示:

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kaLMr39B-1652014600912)(1616050178069.)]

 

​实时Kudu ETL应用程序入口,数据处理逻辑步骤:

 

step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束

 

package cn.itcast.logistics.etl.realtime
 
import cn.itcast.logistics.common.SparkUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Kudu数据管道应用:实现Kudu数据库的实时ETL操作
 */object KuduStreamApp extends BasicStreamApp {
/**
 * 数据的处理
 *
 * @param streamDF 流式数据集StreamingDataFrame
 * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
 * @return 流式数据集StreamingDataFrame
 */override def process(streamDF: DataFrame, category: String): DataFrame = ???
/**
 * 数据的保存
 *
 * @param streamDF          保存数据集DataFrame
 * @param tableName         保存表的名称
 * @param isAutoCreateTable 是否自动创建表,默认创建表
 */override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = ???
/*
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
*/def main(args: Array[String]): Unit = {
// step1. 创建SparkSession实例对象,传递SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
import spark.implicits._
// step2. 从Kafka数据源实时消费数据
// 物流系统Topic数据
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. 对获取Json数据进行ETL转换
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(logisticsDF, "crm")
// step4. 保存转换后数据到外部存储
save(etlLogisticsDF, "logistics-console")
save(etlCrmDF, "crm-console")
// step5. 应用启动以后,等待终止结束
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}

 

上述代码,已经实现MAIN方法,接下来只要实现其中:process【ETL转换】和save【保存数据】方法即可。

 

先不考虑ETL业务逻辑和具体save保存,数据直接进行ETL转换,将数据打印到控制台即可。

实现方法:process
,没有任何逻辑

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jFM2zN69-1652014600913)(1616051030658.)]

实现方法:save
,将数据打印控制台

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pmeOkIof-1652014600913)(1616051118704.)]

 

package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.SparkUtils
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Kudu数据管道应用:实现Kudu数据库的实时ETL操作
 */object KuduStreamApp extends BasicStreamApp {
/**
 * 数据的处理,此时不进行任何业务逻辑转换
 *
 * @param streamDF 流式数据集StreamingDataFrame
 * @param category 业务数据类型,比如物流系统业务数据,CRM系统业务数据等
 * @return 流式数据集StreamingDataFrame
 */override def process(streamDF: DataFrame, category: String): DataFrame = {
val etlStreamDF: DataFrame = category match {
// TODO: 物流系统业务数据
case "logistics" =>
streamDF
// TODO: CRM系统业务数据
case "crm" =>
streamDF
// TODO: 其他业务系统数据
case _ => streamDF
}
// 返回ETL转换后的数据
etlStreamDF
}
/**
 * 数据的保存,此时仅仅将数据打印控制台
 *
 * @param streamDF          保存数据集DataFrame
 * @param tableName         保存表的名称
 * @param isAutoCreateTable 是否自动创建表,默认创建表
 */override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
streamDF.writeStream
.queryName(s"query-${tableName}")
.outputMode(OutputMode.Append())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.start()
}
/*
实时Kudu ETL应用程序入口,数据处理逻辑步骤:
step1. 创建SparkSession实例对象,传递SparkConf
step2. 从Kafka数据源实时消费数据
step3. 对获取Json数据进行ETL转换
step4. 保存转换后数据到外部存储
step5. 应用启动以后,等待终止结束
*/def main(args: Array[String]): Unit = {
// step1. 创建SparkSession实例对象,传递SparkConf
val spark: SparkSession = SparkUtils.createSparkSession(
SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
)
import spark.implicits._
// step2. 从Kafka数据源实时消费数据
// 物流系统Topic数据
val logisticsDF: DataFrame = load(spark, "logistics")
val crmDF: DataFrame = load(spark, "crm")
// step3. 对获取Json数据进行ETL转换
val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
val etlCrmDF: DataFrame = process(logisticsDF, "crm")
// step4. 保存转换后数据到外部存储
save(etlLogisticsDF, "logistics-console")
save(etlCrmDF, "crm-console")
// step5. 应用启动以后,等待终止结束
spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
spark.streams.awaitAnyTermination()
}
}

 

编程完成以后,运行流式计算程序:KuduStreamApp,启动MySQL数据库和Canal及Oracle数据库和OGG

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SlCVJaiw-1652014600914)(1616051529743.)]

 

06-[理解]-实时ETL开发之Kafka数据JSON格式

 

​实时从Kafka消费数据,无论是OGG采集还是Canal采集数据,都是以JSON字符串格式发送KafkaTopic,此时需要查看OGG采集数据格式字段和Canal采集数据格式字段。

1)、OGG 采集数据格式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Of6k9pCU-1652014600914)(1616051796175.)]

 

具体分析:插入数据Insert、更新数据update和删除数据delete

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hSS9jVRN-1652014600914)(1616052050412.)]

2)、Canal采集数据格式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6qUKxp1f-1652014600915)(1616052064308.)]

 

具体查看Canal采集数据,分析字段

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mqaOU4ec-1652014600915)(1616052381480.)]

 

07-[掌握]-实时ETL开发之定义数据Bean对象

 

​无论是OGG采集数据还是Canal采集数据,JSON数据各式字段,基本一致,所以定义JavaBean,分别解析封装数据到JavaBean对象

1)、OGG采集JSON数据:7个字段
2)、Canal采集JSON数据:12个字段
1)、定义 Bean 对象基类

​根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table
,因此将该属性作为公共属性进行提取,抽象成基类。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zmcLPflD-1652014600916)(1616053553019.)]

 

package cn.itcast.logistics.common.beans.parser;
import java.io.Serializable;
/**
 * 根据数据源定义抽象类,数据源:ogg 和 canal, 两者有共同的table属性
 */public abstract class MessageBean implements Serializable {
private static final long serialVersionUID = 373363837132138843L;
private String table;
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
@Override
public String toString() {
return table;
}
}

2)、定义 OGG 数据 Bean 对象

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YcbgHW28-1652014600917)(1616053885882.)]

 

package cn.itcast.logistics.common.beans.parser;
import java.util.Map;
/**
 * 定义消费OGG数据的JavaBean对象
 * {
 *     "table": "ITCAST.tbl_route",            //表名:库名.表名
 *     "op_type": "U",                         //操作类型:U表示修改
 *     "op_ts": "2020-10-08 09:10:54.000774",
 *     "current_ts": "2020-10-08T09:11:01.925000",
 *     "pos": "00000000200006645758",
 *     "before": {                            //操作前的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "蚌埠中转部",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *        },
 *     "after": {                         //操作后的字段集合
 *        "id": 104,
 *        "start_station": "东莞中心",
 *        "start_station_area_id": 441900,
 *        "start_warehouse_id": 1,
 *        "end_station": "TBD",
 *        "end_station_area_id": 340300,
 *        "end_warehouse_id": 107,
 *        "mileage_m": 1369046,
 *        "time_consumer_minute": 56172,
 *        "state": 1,
 *        "cdt": "2020-02-02 18:51:39",
 *        "udt": "2020-02-02 18:51:39",
 *        "remark": null
 *    }
 * }
 */public class OggMessageBean extends MessageBean {
private static final long serialVersionUID = -4763944161833712521L;
//定义操作类型
private String op_type;
@Override
public void setTable(String table) {
//如果表名不为空
if (table != null && !table.equals("")) {
table = table.replaceAll("[A-Z]+\\.", "");
}
super.setTable(table);
}
public String getOp_type() {
return op_type;
}
public void setOp_type(String op_type) {
this.op_type = op_type;
}
public String getOp_ts() {
return op_ts;
}
public void setOp_ts(String op_ts) {
this.op_ts = op_ts;
}
public String getCurrent_ts() {
return current_ts;
}
public void setCurrent_ts(String current_ts) {
this.current_ts = current_ts;
}
public String getPos() {
return pos;
}
public void setPos(String pos) {
this.pos = pos;
}
public Map<String, Object> getBefore() {
return before;
}
public void setBefore(Map<String, Object> before) {
this.before = before;
}
public Map<String, Object> getAfter() {
return after;
}
public void setAfter(Map<String, Object> after) {
this.after = after;
}
//操作时间
private String op_ts;
@Override
public String toString() {
return "OggMessageBean{" +
"table='" + super.getTable() + '\'' +
", op_type='" + op_type + '\'' +
", op_ts='" + op_ts + '\'' +
", current_ts='" + current_ts + '\'' +
", pos='" + pos + '\'' +
", before=" + before +
", after=" + after +
'}';
}
/**
 * 返回需要处理的列的集合
 * @return
 */public Map<String, Object> getValue() {
//如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
if (after == null) {
return before;
} else {
return after;
}
}
//同步时间
private String current_ts;
//偏移量
private String pos;
//操作之前的数据
private Map<String, Object> before;
//操作之后的数据
private Map<String, Object> after;
}

 

​当OGG采集数据时,需要获取关心操作的数据,定义方法:getValue
,删除数据时获取before数据,插入或更新获取after数据,代码如下:

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IapFgUjs-1652014600918)(1616054052067.)]

3)、定义 Canal 数据 Bean 对象

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZckDhdXi-1652014600918)(1616054087741.)]

 

package cn.itcast.logistics.common.beans.parser;
import java.util.List;
import java.util.Map;
/**
 * 定义消费canal数据对应的JavaBean对象
 * {
 *     "data": [{
 *        "id": "1",
 *        "name": "北京",
 *        "tel": "222",
 *        "mobile": "1111",
 *        "detail_addr": "北京",
 *        "area_id": "1",
 *        "gis_addr": "1",
 *        "cdt": "2020-10-08 17:20:12",
 *        "udt": "2020-11-05 17:20:16",
 *        "remark": null
 *        }],
 *     "database": "crm",
 *     "es": 1602148867000,
 *     "id": 15,
 *     "isDdl": false,
 *     "mysqlType": {
 *        "id": "bigint(20)",
 *        "name": "varchar(50)",
 *        "tel": "varchar(20)",
 *        "mobile": "varchar(20)",
 *        "detail_addr": "varchar(100)",
 *        "area_id": "bigint(20)",
 *        "gis_addr": "varchar(20)",
 *        "cdt": "datetime",
 *        "udt": "datetime",
 *        "remark": "varchar(100)"
 *    },
 *     "old": [{
 *        "tel": "111"
 *    }],
 *     "sql": "",
 *     "sqlType": {
 *        "id": -5,
 *        "name": 12,
 *        "tel": 12,
 *        "mobile": 12,
 *        "detail_addr": 12,
 *        "area_id": -5,
 *        "gis_addr": 12,
 *        "cdt": 93,
 *        "udt": 93,
 *        "remark": 12
 *    },
 *     "table": "crm_address",
 *     "ts": 1602148867311,
 *     "type": "UPDATE"               //修改数据
 * }
 */public class CanalMessageBean extends MessageBean {
private static final long serialVersionUID = -3147101694588578078L;
//操作的数据集合
private List<Map<String, Object>> data;
public List<Map<String, Object>> getData() {
return data;
}
public void setData(List<Map<String, Object>> data) {
this.data = data;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public Long getEs() {
return es;
}
public void setEs(Long es) {
this.es = es;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public boolean isDdl() {
return isDdl;
}
public void setDdl(boolean ddl) {
isDdl = ddl;
}
public Map<String, Object> getMysqlType() {
return mysqlType;
}
public void setMysqlType(Map<String, Object> mysqlType) {
this.mysqlType = mysqlType;
}
public String getOld() {
return old;
}
public void setOld(String old) {
this.old = old;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Map<String, Object> getSqlType() {
return sqlType;
}
public void setSqlType(Map<String, Object> sqlType) {
this.sqlType = sqlType;
}
public Long getTs() {
return ts;
}
public void setTs(Long ts) {
this.ts = ts;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
//数据库名称
private String database;
private Long es;
private Long id;
private boolean isDdl;
private Map<String, Object> mysqlType;
private String old;
private String sql;
private Map<String, Object> sqlType;
private Long ts;
private String type;
/**
 * 重写父类的settable方法,将表名修改成统一的前缀
 * @param table
 */@Override
public void setTable(String table) {
if(table!=null && !table.equals("")){
if(table.startsWith("crm_")) {
table = table.replace("crm_", "tbl_");
}
}
super.setTable(table);
}
}

 

CRM系统中每个表的名称都有前缀【crm_
】,解析JSON字符串时,将其替换【tbl_

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP63yxtA-1652014600918)(1616054221345.)]

 

​至此,定义MessageBean实体类,接下来,需要编写代码解析JSON字符串为MessageBean对象,封装实例对象中,方便获取各个字段的值,进行相应的处理转换操作。

 


注意:定义MessageBean实体类使用Java
语言,没有使用Scala语言,后续使用fastJson
库解析。

 

08-[掌握]-实时ETL开发之数据转换Bean及测试

 

任务:==编写代码,解析JSON字符串为MessageBean对象,属于实时ETL转换第一步。==

 

1)、如何解析JSON字符串为JavaBean对象呢???

 


使用阿里巴巴JSON库:fastJson
,既能解析JSON为Bean对象,又能转换Bean对象为JSON字符串

 

为什幺使用fastJson解析??
fastJson解析Json字符串时,使用起来比较简单,此外库基于Java语言开发,对JavaBean对象支持非常的好,对Scala语言支持不好,所以MessageBean使用Java语言定义的,没有使用Scala语言。

转换JSON为Bean对象:JSON.parseObject(jsonStr, classOf[StuBean])

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DiTGRUf4-1652014600919)(1616054946045.)]

将Bean对象转换为JSON字符串:JSON.toJSONString(stuBean, true)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wAfJcjCm-1652014600920)(1616054978516.)]

 

package cn.itcast.logistics.test;
import java.util.Objects;
public class StuBean {
private Integer id ;
private String name ;
public StuBean() {
}
public StuBean(Integer id, String name) {
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StuBean stuBean = (StuBean) o;
return Objects.equals(id, stuBean.id) &&
Objects.equals(name, stuBean.name);
}
@Override
public int hashCode() {
return Objects.hash(id, name);
}
@Override
public String toString() {
return "StuBean{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
// ===============================================================
package cn.itcast.logistics.test
import com.alibaba.fastjson.JSON
object FastJsonTest {
def main(args: Array[String]): Unit = {
// 定义json字符串
val jsonStr: String =
"""
  |{
  |  "id": 10001,
  |  "name": "zhangsan"
  |}
  |""".stripMargin
// JSON -> JavaBean
val stuBean: StuBean = JSON.parseObject(jsonStr, classOf[StuBean])
println(stuBean)
// JavaBean转换为JSON字符串
val stuJson: String = JSON.toJSONString(stuBean, true)
println(stuJson)
}
}

 

2)、流式程序使用Scala语言编写,为什幺MessageBean使用Java语言??

 


由于使用FastJson
解析Json字符串,所以使用Java语言编写。由于Scala语言中CaseClass对FastJson支持不是很友好,有的时候,解析会出问题,无法完成解析操作。

 

FastJson库使用Java编写的,符合Java语言规范,但是不符合Scala与规范约束。

 

任务:编写代码,将从Kafka消费JSON字符串数据,解析为MessageBean对象即可

1)、首先对物流系统数据,使用OGG采集数数据进行解析处理,核心代码如下:

case "logistics" =>
val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
// 由于从Kafka消费数据,只获取value消息,将其转换DataSet
.as[String]
// 过滤数据
.filter(msg => null != msg && msg.trim.length > 0)
// 解析每条数据
.map{
msg => JSON.parseObject(msg, classOf[OggMessageBean])
}(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定编码器
// 返回数据
oggBeanStreamDS.toDF()

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R4yvOxD0-1652014600920)(1616055524533.)]

 

​当将DataFrame转换Dataset进行操作时,尤其调用转换函数(比如map、filter、flatMap)等等,需要指定编码器Encoder
(Dataset 强类型数据结构,指定类型编码)。

 

默认情况下,当Dataset数据类型为:元组类型、CaseClass或基本数据类型,都会提供默认编码器Encoder,除此之外数据类型,比如自定义Java语言Bean对象,必须指定编码器。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PdlhZQfN-1652014600921)(1616055716033.)]

 

运行测试程序,在Oracle数据库中操作数据,查看控制台打印结果

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VvKeLUp6-1652014600921)(1616055935505.)]

2)、将Canal采集JSON数据,转换MessageBean对象,核心代码

// TODO: CRM系统业务数据
case "crm" =>
implicit val canalEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean])
val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
// 过滤数据
.filter(row => !row.isNullAt(0))
// 解析数据,对分区数据操作
.mapPartitions { iter =>
iter.map { row =>
val jsonValue: String = row.getAs[String]("value")
// 解析JSON字符串
JSON.parseObject(jsonValue, classOf[CanalMessageBean])
}
}
// 返回转换后的数据
canalBeanStreamDS.toDF()

 

其中采用隐式参数方式,传递定义编码器Encoder

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EC1VljWg-1652014600921)(1616056559325.)]

 

再次运行流式计算程序:KuduStreamApp,在MySQL数据库中修改表的数据,查看控制台打印结果

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rg1sVo7i-1652014600923)(1616056711227.)]

 

09-[理解]-实时ETL开发之转换POJO【思路】

 

任务:分析从Kafka消费数据(JSON转换MessageBean对象),哪些字段是关系值。

OGG采集Oracle数据库数据:7个字段
Canal采集MySQL数据库数据:12个字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tc5InAk0-1652014600923)(image-20210526072519874.)]

1)、OGG 采集数据,核心数据字段

OGG采集Oracle数据库表的数据,发送到Kafka中JSON字符串,转换为MessageBean以后:

第一个字段:table
,对哪个表进行操作
第二个字段:op_type
,数据操作类型,三个值【插入I、更新U、删除D

第三个字段:数据字段,可能是after(插入和更新),可能是before(删除),提供方法getValue

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VOZkVqp6-1652014600923)(1616118817076-1621984868452.)]

2)、Canal采集数据,核心数据字段

Canal采集MySQL数据库数据时,业务中关心字段:与OGG采集数据关心字段基本一致

第一个字段:table
,表的名称,对哪个表进行操作
第二个字段:type
,数据操作类型,三个值【INSERT、UPDATE、DELETE

第三个字段:data
,真正操作数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4F8lXt9R-1652014600924)(1616119230880-1621984868452.)]

 

​经过前面分析可知,无论OGG采集海Canal采集数据,将JSON字符串封装为MessageBean以后,需要提取其中核心字段数据,进行转换操作。

 

将提取字段【type】类型和【data】数据,封装到具体表table的实体类【POJO】中,后面方便进行操作。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7WYVty8J-1652014600924)(1616119695869-1621984868453.)]

 

10-[掌握]-实时ETL开发之OggBean转换POJO编程

 

任务:以OGG采集数据为例,针对其中一个张表【tbl_areas
】进行转换操作。

 

1)、依据table字段判断数据:tbl_areas
2)、获取数据字段值:getValue方法,将其转换为POJO对象
3)、过滤掉转换为null数据

1)、定义表名称的隐射

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ff2HZHtE-1652014600924)(1616120220576-1621984868453.)]

 

package cn.itcast.logistics.common
/**
 * 定义表名:根据表的名字定义属性
 */object TableMapping {
// Logistics 物流系统表名称
val AREAS: String = "tbl_areas"
val CHARGE_STANDARD: String = "tbl_charge_standard"
val CODES: String = "tbl_codes"
val COLLECT_PACKAGE: String = "tbl_collect_package"
val COMPANY: String = "tbl_company"
val COMPANY_DOT_MAP: String = "tbl_company_dot_map"
val COMPANY_TRANSPORT_ROUTE_MA: String = "tbl_company_transport_route_ma"
val COMPANY_WAREHOUSE_MAP: String = "tbl_company_warehouse_map"
val CONSUMER_SENDER_INFO: String = "tbl_consumer_sender_info"
val COURIER: String = "tbl_courier"
val DELIVER_PACKAGE: String = "tbl_deliver_package"
val DELIVER_REGION: String = "tbl_deliver_region"
val DELIVERY_RECORD: String = "tbl_delivery_record"
val DEPARTMENT: String = "tbl_department"
val DOT: String = "tbl_dot"
val DOT_TRANSPORT_TOOL: String = "tbl_dot_transport_tool"
val DRIVER: String = "tbl_driver"
val EMP: String = "tbl_emp"
val EMP_INFO_MAP: String = "tbl_emp_info_map"
val EXPRESS_BILL: String = "tbl_express_bill"
val EXPRESS_PACKAGE: String = "tbl_express_package"
val FIXED_AREA: String = "tbl_fixed_area"
val GOODS_RACK: String = "tbl_goods_rack"
val JOB: String = "tbl_job"
val OUT_WAREHOUSE: String = "tbl_out_warehouse"
val OUT_WAREHOUSE_DETAIL: String = "tbl_out_warehouse_detail"
val PKG: String = "tbl_pkg"
val POSTAL_STANDARD: String = "tbl_postal_standard"
val PUSH_WAREHOUSE: String = "tbl_push_warehouse"
val PUSH_WAREHOUSE_DETAIL: String = "tbl_push_warehouse_detail"
val ROUTE: String = "tbl_route"
val SERVICE_EVALUATION: String = "tbl_service_evaluation"
val STORE_GRID: String = "tbl_store_grid"
val TRANSPORT_RECORD: String = "tbl_transport_record"
val TRANSPORT_TOOL: String = "tbl_transport_tool"
val VEHICLE_MONITOR: String = "tbl_vehicle_monitor"
val WAREHOUSE: String = "tbl_warehouse"
val WAREHOUSE_EMP: String = "tbl_warehouse_emp"
val WAREHOUSE_RACK_MAP: String = "tbl_warehouse_rack_map"
val WAREHOUSE_RECEIPT: String = "tbl_warehouse_receipt"
val WAREHOUSE_RECEIPT_DETAIL: String = "tbl_warehouse_receipt_detail"
val WAREHOUSE_SEND_VEHICLE: String = "tbl_warehouse_send_vehicle"
val WAREHOUSE_TRANSPORT_TOOL: String = "tbl_warehouse_transport_tool"
val WAREHOUSE_VEHICLE_MAP: String = "tbl_warehouse_vehicle_map"
val WAY_BILL: String = "tbl_waybill"
val WAYBILL_LINE: String = "tbl_waybill_line"
val WAYBILL_STATE_RECORD: String = "tbl_waybill_state_record"
val WORK_TIME: String = "tbl_work_time"
// CRM 系统业务数据表名称
val ADDRESS: String = "tbl_address"
val CONSUMER_ADDRESS_MAP: String = "tbl_consumer_address_map"
val CUSTOMER: String = "tbl_customer"
}

2)、编写核心代码,从OggMessageBean中提取字段值,封装到具体POJO对象中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-clUR4VHN-1652014600926)(1616120552233-1621984868453.)]

 

11-[掌握]-实时ETL开发之转换POJO【数据解析器】

 

任务:编写数据解析器中方法【toAreaBean】实现,从MessageBean中提取字段值,封装到POJO实体类对象。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i2Q21jEU-1652014600927)(1616121581555-1621984868453.)]

当提取MessageBean中数据字段值,如何将其封装到POJO对象中呢??

首先将数据段值:Map数据类型转换 JSON字符串,再将JSON字符串转换为 POJO对象

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5HvECsad-1652014600927)(1616121249979-1621984868453.)]

 

最终实现代码如下所示:

 

package cn.itcast.logistics.etl.parse
import java.util
import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, MessageBean, OggMessageBean}
import com.alibaba.fastjson.JSON
object DataParser {
/**
 * 判断messageBean是否是OggMessageBean
 */private def getOggMessageBean(bean: MessageBean): OggMessageBean = {
bean match {
case ogg: OggMessageBean => ogg
}
}
/**
 * 判断messageBean是否是CanalMessageBean
 */private def getCanalMessageBean(bean: MessageBean): CanalMessageBean = {
bean match {
case canal: CanalMessageBean => canal
}
}
/**
 * 提取ogg(I、U、D)和canal(insert、update、delete)数据的optype属性,转换成统一的操作字符串
 *
 * @param opType 数据操作类型:insert、update、delete,任意一种
 */private def getOpType(opType: String): String = {
opType match {
case "I" => "insert"
case "U" => "update"
case "D" => "delete"
case "INSERT" => "insert"
case "UPDATE" => "update"
case "DELETE" => "delete"
case _ => "insert"
}
}
// ================== 物流Logistics系统业务数据解析 ==================
/*
从MessageBean提取数据字段值和数据操作类型,将其封装到具体POJO对象中
TODO: 将物流Logistics系统:tbl_areas表的字段信息转换成AreaBean对象
 */def toAreaBean(bean: MessageBean): AreasBean = {
// a. 转换MessageBean对象为OggMessageBean对象
val oggMessageBean: OggMessageBean = getOggMessageBean(bean)
// b. 获取数据操作类型
val opType: String = getOpType(oggMessageBean.getOp_type)
// c. 获取操作数据值
val dataValue: util.Map[String, AnyRef] = oggMessageBean.getValue
// d. 将数据封装到POJO对象
// 第一步、转换map为json字符串
val dataJson: String = JSON.toJSONString(dataValue, true)
println(dataJson)
// 第二步、json字符串为pojo
val areasBean = JSON.parseObject(dataJson, classOf[AreasBean])
// 第三步、判断解析不为null时,设置数据操作类型
if(null != areasBean){
areasBean.setOpType(opType)
}
// e. 返回封装对象
areasBean
}
}

 

12-[理解]-实时ETL开发之转换POJO【隐式转换】

 

​当对Dataset数据结构进行操作时(调用函数,转换函数,比如map、flatMap)等,数据返回类型如果不是元组、CaseClass及基本类型,需要指定编码器Encoder

 

将JSON字符串转换为MessageBean对象时,指定Encoder编码器。

方式一、方法后面紧跟圆括号,指定对应编码器

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9yz4kyl-1652014600927)(1616123417063-1621984868453.)]

方式二、隐式参数,指定编码器,程序自动传入

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TtT2n2Ek-1652014600928)(1616123450168-1621984868453.)]

 

​当从MessageBean中提取数据字段值以后,将其封装到对应POJO对象时,需要指定编码器,否则程序报错,具体如下图所示:

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KoWkGwAc-1652014600928)(1616123541413-1621984868453.)]

 


学习SparkSQL框架,如何导入元组类型、CaseClass类型和基本类型
编码器。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhaVBfT0-1652014600928)(1616123609397-1621984868453.)]

 

​查看implicits
父类:SQLImplicits
对象,包含很对定义隐式转换函数,返回类型都是各种编码器Encoder

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kXuGyKtt-1652014600929)(1616123726466-1621984868453.)]

 

参考SQLImplicits
对象中编码器定义,自己定义编码器:BeanImplicits
,实现隐式导入和转换操作。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kraEvGbD-1652014600929)(1616123830045-1621984868453.)]

 

package cn.itcast.logistics.common
import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import org.apache.spark.sql.{Encoder, Encoders}
/**
 * 扩展自定义POJO的隐式转换实现
 */object BeanImplicits {
// 定义MessageBean隐式参数Encoder值
implicit def newOggMessageBeanEncoder: Encoder[OggMessageBean] = Encoders.bean(classOf[OggMessageBean])
implicit def newCanalMessageBeanEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean])
// Logistics Bean
implicit def newAreasBeanEncoder: Encoder[AreasBean] = Encoders.bean(classOf[AreasBean])
implicit def newChargeStandardBeanEncoder: Encoder[ChargeStandardBean] = Encoders.bean(classOf[ChargeStandardBean])
implicit def newCodesBeanEncoder: Encoder[CodesBean] = Encoders.bean(classOf[CodesBean])
implicit def newCollectPackageBeanEncoder: Encoder[CollectPackageBean] = Encoders.bean(classOf[CollectPackageBean])
implicit def newCompanyBeanEncoder: Encoder[CompanyBean] = Encoders.bean(classOf[CompanyBean])
implicit def newCompanyDotMapBeanEncoder: Encoder[CompanyDotMapBean] = Encoders.bean(classOf[CompanyDotMapBean])
implicit def newCompanyTransportRouteMaBeanEncoder: Encoder[CompanyTransportRouteMaBean] = Encoders.bean(classOf[CompanyTransportRouteMaBean])
implicit def newCompanyWarehouseMapBeanEncoder: Encoder[CompanyWarehouseMapBean] = Encoders.bean(classOf[CompanyWarehouseMapBean])
implicit def newConsumerSenderInfoBeanEncoder: Encoder[ConsumerSenderInfoBean] = Encoders.bean(classOf[ConsumerSenderInfoBean])
implicit def newCourierBeanEncoder: Encoder[CourierBean] = Encoders.bean(classOf[CourierBean])
implicit def newDeliverPackageBeanEncoder: Encoder[DeliverPackageBean] = Encoders.bean(classOf[DeliverPackageBean])
implicit def newDeliverRegionBeanEncoder: Encoder[DeliverRegionBean] = Encoders.bean(classOf[DeliverRegionBean])
implicit def newDeliveryRecordBeanEncoder: Encoder[DeliveryRecordBean] = Encoders.bean(classOf[DeliveryRecordBean])
implicit def newDepartmentBeanEncoder: Encoder[DepartmentBean] = Encoders.bean(classOf[DepartmentBean])
implicit def newDotBeanEncoder: Encoder[DotBean] = Encoders.bean(classOf[DotBean])
implicit def newDotTransportToolBeanEncoder: Encoder[DotTransportToolBean] = Encoders.bean(classOf[DotTransportToolBean])
implicit def newDriverBeanEncoder: Encoder[DriverBean] = Encoders.bean(classOf[DriverBean])
implicit def newEmpBeanEncoder: Encoder[EmpBean] = Encoders.bean(classOf[EmpBean])
implicit def newEmpInfoMapBeanEncoder: Encoder[EmpInfoMapBean] = Encoders.bean(classOf[EmpInfoMapBean])
implicit def newExpressBillBeanEncoder: Encoder[ExpressBillBean] = Encoders.bean(classOf[ExpressBillBean])
implicit def newExpressPackageBeanEncoder: Encoder[ExpressPackageBean] = Encoders.bean(classOf[ExpressPackageBean])
implicit def newFixedAreaBeanEncoder: Encoder[FixedAreaBean] = Encoders.bean(classOf[FixedAreaBean])
implicit def newGoodsRackBeanEncoder: Encoder[GoodsRackBean] = Encoders.bean(classOf[GoodsRackBean])
implicit def newJobBeanEncoder: Encoder[JobBean] = Encoders.bean(classOf[JobBean])
implicit def newOutWarehouseBeanEncoder: Encoder[OutWarehouseBean] = Encoders.bean(classOf[OutWarehouseBean])
implicit def newOutWarehouseDetailBeanEncoder: Encoder[OutWarehouseDetailBean] = Encoders.bean(classOf[OutWarehouseDetailBean])
implicit def newPkgBeanEncoder: Encoder[PkgBean] = Encoders.bean(classOf[PkgBean])
implicit def newPostalStandardBeanEncoder: Encoder[PostalStandardBean] = Encoders.bean(classOf[PostalStandardBean])
implicit def newPushWarehouseBeanEncoder: Encoder[PushWarehouseBean] = Encoders.bean(classOf[PushWarehouseBean])
implicit def newPushWarehouseDetailBeanEncoder: Encoder[PushWarehouseDetailBean] = Encoders.bean(classOf[PushWarehouseDetailBean])
implicit def newRouteBeanEncoder: Encoder[RouteBean] = Encoders.bean(classOf[RouteBean])
implicit def newServiceEvaluationBeanEncoder: Encoder[ServiceEvaluationBean] = Encoders.bean(classOf[ServiceEvaluationBean])
implicit def newStoreGridBeanEncoder: Encoder[StoreGridBean] = Encoders.bean(classOf[StoreGridBean])
implicit def newTransportToolBeanEncoder: Encoder[TransportToolBean] = Encoders.bean(classOf[TransportToolBean])
implicit def newVehicleMonitorBeanEncoder: Encoder[VehicleMonitorBean] = Encoders.bean(classOf[VehicleMonitorBean])
implicit def newWarehouseBeanEncoder: Encoder[WarehouseBean] = Encoders.bean(classOf[WarehouseBean])
implicit def newWarehouseEmpBeanEncoder: Encoder[WarehouseEmpBean] = Encoders.bean(classOf[WarehouseEmpBean])
implicit def newWarehouseRackMapBeanEncoder: Encoder[WarehouseRackMapBean] = Encoders.bean(classOf[WarehouseRackMapBean])
implicit def newWarehouseReceiptBeanEncoder: Encoder[WarehouseReceiptBean] = Encoders.bean(classOf[WarehouseReceiptBean])
implicit def newWarehouseReceiptDetailBeanEncoder: Encoder[WarehouseReceiptDetailBean] = Encoders.bean(classOf[WarehouseReceiptDetailBean])
implicit def newWarehouseSendVehicleBeanEncoder: Encoder[WarehouseSendVehicleBean] = Encoders.bean(classOf[WarehouseSendVehicleBean])
implicit def newWarehouseTransportToolBeanEncoder: Encoder[WarehouseTransportToolBean] = Encoders.bean(classOf[WarehouseTransportToolBean])
implicit def newWarehouseVehicleMapBeanEncoder: Encoder[WarehouseVehicleMapBean] = Encoders.bean(classOf[WarehouseVehicleMapBean])
implicit def newWaybillBeanEncoder: Encoder[WaybillBean] = Encoders.bean(classOf[WaybillBean])
implicit def newWaybillLineBeanEncoder: Encoder[WaybillLineBean] = Encoders.bean(classOf[WaybillLineBean])
implicit def newWaybillStateRecordBeanEncoder: Encoder[WaybillStateRecordBean] = Encoders.bean(classOf[WaybillStateRecordBean])
implicit def newWorkTimeBeanEncoder: Encoder[WorkTimeBean] = Encoders.bean(classOf[WorkTimeBean])
implicit def newTransportRecordBeanEncoder: Encoder[TransportRecordBean] = Encoders.bean(classOf[TransportRecordBean])
// CRM Bean
implicit def newCustomerBeanEncoder: Encoder[CustomerBean] = Encoders.bean(classOf[CustomerBean])
implicit def newAddressBeanEncoder: Encoder[AddressBean] = Encoders.bean(classOf[AddressBean])
implicit def newConsumerAddressMapBeanEncoder: Encoder[ConsumerAddressMapBean] = Encoders.bean(classOf[ConsumerAddressMapBean])
}

 

在流式程序代码中,导入自定义隐式转换对象即可。

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oNnoiPwH-1652014600929)(1616123959706-1621984868453.)]

 

13-[掌握]-实时ETL开发之OggBean转换POJO测试

 

任务:启动容器,进入容器中启动Oracle数据库和OGG,修改Oracle数据库表的数据测试应用

1)、启动Kafka消息队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iEpB4Ge7-1652014600930)(1616124159348-1621984868453.)]

2)、启动Oracle数据库和OGG

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BWH9eKqo-1652014600930)(1616124185566-1621984868453.)]

3)、运行流式应用程序

首先,如果检查点目录存在,最好将其删除;此外,注释掉消费CRM系统数据代码,此时仅仅测试物流系统数据

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t8nzhTJ0-1652014600930)(1616124259387-1621984868453.)]

 

启动流式应用程式,执行操作,查看控制台输出结果,可以看到将Map集合(存储数据)转换为JSON字符串

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7dGFJr0B-1652014600931)(1616124564721-1621984868453.)]

4)、使用DBeave对数据库表的数据进行更新和删除

更新2条数据,删除1条数据,看到如下界面:

 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ADiDT4K-1652014600931)(1616124523908-1621984868453.)]

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注