ETL with Flink CDC 2.2.1
This article shows how to quickly build streaming ETL for MySQL and Oracle on top of Flink CDC 2.2.1. The demo is written in Java and built with Maven.
1. Maven Dependencies
<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.sql.connector.cdc.version>2.2.1</flink.sql.connector.cdc.version>
    <flink.version>1.13.3</flink.version>
    <scala.version>2.12</scala.version>
    <oracle.jdbc.version>12.2.0.1</oracle.jdbc.version>
    <mysql.jdbc.version>5.1.49</mysql.jdbc.version>
</properties>

<dependencies>
    <!-- jdbc -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <scope>runtime</scope>
        <version>${oracle.jdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
        <version>${mysql.jdbc.version}</version>
    </dependency>
    <!-- end jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- flink connector cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-oracle-cdc</artifactId>
        <version>${flink.sql.connector.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.sql.connector.cdc.version}</version>
    </dependency>
    <!-- end flink connector cdc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
2. MySQL CDC
2.1 What's New in MySQL CDC 2.0
The connector works by listening to MySQL binlog changes. Building on Flink CDC 1.0, the 2.x MySQL CDC connector adds advanced features such as a lock-free snapshot algorithm, parallel reading, and resumable (checkpoint-based) snapshots, as sketched below.
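These features come from the incremental snapshot framework and are enabled by default in the 2.x connector. The following is a rough sketch of how they can be tuned on the source table DDL; the option names follow the mysql-cdc 2.x documentation, and the host and credentials are the ones used later in this article:

-- Sketch only: incremental snapshot tuning options of the mysql-cdc 2.x connector
CREATE TABLE products_mys_cdc (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.123.58',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products',
  'scan.incremental.snapshot.enabled' = 'true',    -- lock-free, parallel, resumable snapshot
  'scan.incremental.snapshot.chunk.size' = '8096', -- rows per snapshot chunk
  'server-id' = '5400-5404'                        -- one id per parallel reader
);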
2.2 Using MySQL CDC
2.2.1 Preparing the MySQL Environment
MySQL versions currently supported by Flink CDC: 5.7.x and 8.0.x.
2.2.1.1 Enable binlog
MySQL must already be installed before enabling binlog; installation is not covered here.
2.2.1.1.1 Edit the MySQL configuration file
Add the following to /etc/my.cnf (CentOS 7 is used as the base environment here):
server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=full
expire_logs_days=10
binlog_do_db=mydb
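After saving the file, restart MySQL so the new settings take effect. On CentOS 7 the service is usually named mysqld; adjust the command to your installation:

systemctl restart mysqld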
Notes:
server_id: required when enabling binlog on MySQL 5.7 and later. In a MySQL cluster, each node must use a different server_id.
log_bin: the binlog file name prefix and storage location. If no path is given, the default location is /var/lib/mysql/.
binlog_format: the binlog format. Three values are available:
ROW: records which rows were changed and the row data after the change; produces a large volume of logs.
STATEMENT: records the SQL statements that change data; produces less log data.
MIXED: a mix of the two modes above. CDC requires ROW.
binlog_row_image: accepts three valid values:
full: regardless of whether the table has a primary key or unique constraint, the binlog records complete before and after images.
minimal: if the table has a primary key or unique index, the before image keeps only the key columns and the after image keeps only the modified columns; if the table has neither, the before image keeps all columns and the after image keeps only the modified columns.
noblob:
if the table has a primary key or unique index and the modified column is a text/blob column, the before image omits text/blob columns and the after image contains the modified text/blob columns;
if the table has a primary key or unique index and the modified column is not text/blob, both images omit text/blob columns. If the table has no primary key or unique index and the modified column is a text/blob column, both images keep all columns;
if the table has no primary key or unique index and the modified column is not text/blob, the before image keeps all columns and the after image omits text/blob columns.
expire_logs_days: binlog retention in days; logs older than this are deleted automatically.
binlog_do_db: which databases the binlog records. To include multiple databases, repeat the option once per database, as in the example right after these notes; never use a comma-separated list.
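For example, recording two databases in my.cnf looks like this (mydb2 is a hypothetical second database used only for illustration):

binlog_do_db=mydb
binlog_do_db=mydb2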
2.2.1.1.2 Check that binlog is enabled
Run the following SQL to check:
show variables like 'log_bin';
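It is also worth confirming that the binlog format is ROW, using the same standard MySQL command:

show variables like 'binlog_format';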
2.2.1.2 Create the database and tables, and insert data
-- MySQL
CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

CREATE TABLE products_sink (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"spare tire","24 inch spare tire");
2.2.2 Code: capturing MySQL table changes
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from a MySQL table.
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcMySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a CDC source table backed by the MySQL binlog.
        String strSql = " CREATE TABLE products_mys_cdc ( "
                + " id INT, "
                + " name STRING, "
                + " description STRING, "
                + " PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'mysql-cdc', "
                + " 'hostname' = '192.168.123.58', "
                + " 'port' = '3306', "
                + " 'username' = 'root', "
                + " 'password' = '123456', "
                + " 'database-name' = 'mydb', "
                + " 'table-name' = 'products', "
                + " 'scan.startup.mode' = 'initial')";
        tableEnv.executeSql(strSql);

        // Continuously print the initial snapshot plus all subsequent changes.
        TableResult tableResult = tableEnv.executeSql("select * from products_mys_cdc");
        tableResult.print();
    }
}
The program output after startup is shown below.
Inserts, updates, and deletes against the products table all show up in products_mys_cdc:
2022-06-01 10:30:11.379 INFO [Threads.java: 287] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                    description |
+----+-------------+--------------------------------+--------------------------------+
| +I |         105 |                     spare tire |             24 inch spare tire |
| +I |         104 |                          rocks |          box of assorted rocks |
| +I |         101 |                        scooter |          Small 2-wheel scooter |
| +I |         103 |                         hammer |        12oz carpenter's hammer |
| +I |         102 |                    car battery |                12V car battery |
2022-06-01 10:30:11.480 INFO [MySqlStreamingChangeEventSource.java: 916] - Keepalive thread is running
| -D |         102 |                    car battery |                12V car battery |
| -D |         104 |                          rocks |          box of assorted rocks |
| -U |         105 |                     spare tire |             24 inch spare tire |
| +U |         105 |                           leon |             24 inch spare tire |
| +I |         106 |                      bruce lee |                         gongfu |
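Besides the Table/SQL API used above, the CDC connector also ships a DataStream API source. The following is a minimal sketch, assuming the same host and credentials as above and the mysql-cdc 2.2.1 dependency already on the classpath; it prints every change event as a Debezium-style JSON string:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CdcMySQLDataStream {

    public static void main(String[] args) throws Exception {
        // Build a MySQL CDC source that emits each change as a Debezium JSON string.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.123.58")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.products")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the incremental snapshot resume where it stopped.
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(1)
                .print();
        env.execute("mysql-cdc-datastream");
    }
}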
2.2.3 Code: capturing MySQL table changes and writing them to MySQL
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from a MySQL table and write them to MySQL.
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcMySQL2MySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // CDC source table: snapshot the existing rows, then follow the binlog.
        String strSourceSql = "CREATE TABLE IF NOT EXISTS products_mys_cdc ( "
                + " id INT, "
                + " name STRING, "
                + " description STRING, "
                + " PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'mysql-cdc', "
                + " 'hostname' = '192.168.123.58', "
                + " 'port' = '3306', "
                + " 'username' = 'root', "
                + " 'password' = '123456', "
                + " 'database-name' = 'mydb', "
                + " 'table-name' = 'products', "
                + " 'scan.startup.mode' = 'initial')";

        // JDBC sink table pointing at the products_sink table created earlier.
        String strSinkSql = " CREATE TABLE IF NOT EXISTS products_mys_sink ("
                + " id INT, "
                + " name STRING, "
                + " description STRING, "
                + " PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'jdbc',"
                + " 'url' = 'jdbc:mysql://192.168.123.58:3306/mydb',"
                + " 'table-name' = 'products_sink',"
                + " 'username' = 'root',"
                + " 'password' = '123456' "
                + " )";

        tableEnv.executeSql(strSourceSql);
        tableEnv.executeSql(strSinkSql);

        // executeSql submits the INSERT job asynchronously; await() keeps the
        // local program alive so the streaming job keeps running.
        tableEnv.executeSql("insert into products_mys_sink select * from products_mys_cdc ").await();
    }
}
The program output after startup is shown below.
Changes to the products_sink table can be monitored through the Flink SQL CLI; setting up the Flink environment itself is not covered here.
2.2.3.1 Start the Flink cluster and the Flink SQL CLI
Change into the Flink directory:
cd flink
Start the Flink cluster:
./bin/start-cluster.sh
Start the Flink SQL CLI:
./bin/sql-client.sh
2.2.3.2 Create a table in the Flink SQL CLI with Flink DDL
First, enable checkpointing with an interval of 3 seconds:
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
Then, for the products_sink table in the database, create the corresponding table in the Flink SQL CLI:
-- Flink SQL
Flink SQL> CREATE TABLE products_sink (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products_sink'
  );
2.2.3.3 Query the table in the Flink SQL CLI with Flink DML
To follow the data in products_sink, run:
-- Flink SQL
Flink SQL> select * from products_sink;
The Flink SQL CLI query result shows:
Run the following UPDATE statement in a MySQL client:
-- MySQL
UPDATE products SET description='First Rank' WHERE id=103;
The MySQL client reports that the update succeeded:
The Flink SQL CLI query result shows:
Other inserts and deletes on the products table are likewise reflected promptly in the corresponding tables; they are not shown here.
3. Oracle CDC
3.1 How Oracle change capture works
The Oracle CDC connector captures and records row-level changes that occur on an Oracle database server. Under the hood it obtains the change data from Oracle either through the LogMiner tool that Oracle provides or through the native XStream API.
LogMiner is an analysis tool shipped with the Oracle database. It parses Oracle redo log files and turns the database's change logs into change events. When the LogMiner approach is used, the Oracle server places strict resource limits on the log-parsing process, so parsing can be slow for very large tables; the advantage is that LogMiner is free.
The XStream API is an internal interface that the Oracle database provides for Oracle GoldenGate (OGG). Clients can fetch change events efficiently through the XStream API: the change data is read directly from a memory area inside the Oracle server rather than from the redo log files, which avoids the overhead of writing the data to log files and parsing them, so it is more efficient. However, it requires an Oracle GoldenGate (OGG) license.
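In the Flink SQL connector the choice between the two mechanisms is delegated to the underlying Debezium Oracle connector. As an assumption based on Debezium's database.connection.adapter property (Debezium options are passed through Flink CDC with the debezium. prefix), switching to XStream would look roughly like this; LogMiner is the default and needs no extra option:

-- Sketch only: assumed Debezium adapter switch passed through the oracle-cdc connector
CREATE TABLE products_ora_cdc (
  ID INT,
  NAME STRING,
  DESCRIPTION STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  -- host, port, credentials, database/schema/table as in section 3.2.2, plus:
  'debezium.database.connection.adapter' = 'xstream'  -- default is 'logminer'
);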
3.2 Using Oracle CDC
3.2.1 Preparing the Oracle Environment
Oracle versions currently supported by Flink CDC: 11, 12, and 19.
3.2.1.1 Enable LogMiner
Oracle must already be installed before enabling LogMiner; installation is not covered here.
3.2.1.1.1 Enable archive logging
3.2.1.1.1.1 Connect to the database as DBA
sqlplus / AS SYSDBA
3.2.1.1.1.2 Turn on archive logging
-- Set the archive log size and destination
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/oradata/dg01/recovery_area' scope=spfile;
alter system set db_recovery_file_dest_size=41820M scope=spfile;

-- Restart the database instance and turn on archive logging
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

-- Check the archive log status
archive log list;
3.2.1.1.1.3 Enable supplemental logging
-- Enable supplemental logging for a single table
ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable supplemental logging for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Supplemental logging for all columns
-- Enable (ALL) COLUMNS supplemental logging (recommended)
alter database add supplemental log data (all) columns;
-- Check whether it is enabled
select supplemental_log_data_all from v$database;
-- Drop (ALL) COLUMNS supplemental logging
alter database drop supplemental log data (all) columns;
3.2.1.1.2 Create an Oracle user and grant privileges
3.2.1.1.2.1 Create a tablespace
CREATE TABLESPACE logminer_tbs
  DATAFILE '/oradata/dg01/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
3.2.1.1.2.2 Create the user and grant privileges
CREATE USER flink IDENTIFIED BY flink
  DEFAULT TABLESPACE LOGMINER_TBS
  QUOTA UNLIMITED ON LOGMINER_TBS;

GRANT CREATE SESSION TO flink;
GRANT SET CONTAINER TO flink;
GRANT SELECT ON V_$DATABASE TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT LOGMINING TO flink;
GRANT CREATE TABLE TO flink;
GRANT LOCK ANY TABLE TO flink;
GRANT ALTER ANY TABLE TO flink;
GRANT CREATE SEQUENCE TO flink;
GRANT EXECUTE ON DBMS_LOGMNR TO flink;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink;
GRANT SELECT ON V_$LOG TO flink;
GRANT SELECT ON V_$LOG_HISTORY TO flink;
GRANT SELECT ON V_$LOGMNR_LOGS TO flink;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flink;
GRANT SELECT ON V_$LOGFILE TO flink;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink;
3.2.1.2 Create the tables and insert data
-- Oracle
CREATE TABLE products (
  id          number(10) constraint pk_products_id primary key,
  name        varchar2(255),
  description varchar2(512)
);

-- Enable supplemental logging for the PRODUCTS table (create the table under the
-- target schema first, then run this statement)
ALTER TABLE FAMILY.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

CREATE TABLE products_sink (
  id          number(10) constraint pk_products_sink_id primary key,
  name        varchar2(255),
  description varchar2(512)
);

-- Oracle accepts neither multi-row VALUES lists nor double-quoted string literals,
-- so each row is inserted with its own statement.
INSERT INTO products VALUES (101, 'scooter', 'Small 2-wheel scooter');
INSERT INTO products VALUES (102, 'car battery', '12V car battery');
INSERT INTO products VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3');
INSERT INTO products VALUES (104, 'hammer', '12oz carpenter''s hammer');
INSERT INTO products VALUES (105, 'hammer', '14oz carpenter''s hammer');
INSERT INTO products VALUES (106, 'hammer', '16oz carpenter''s hammer');
INSERT INTO products VALUES (107, 'rocks', 'box of assorted rocks');
INSERT INTO products VALUES (108, 'jacket', 'water resistent black wind breaker');
INSERT INTO products VALUES (109, 'spare tire', '24 inch spare tire');
COMMIT;
3.2.2 Code: capturing Oracle table changes
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from an Oracle table.
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcMyOracle {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a CDC source table backed by Oracle LogMiner.
        String strSql = " CREATE TABLE products_ora_cdc ( "
                + " ID INT, "
                + " NAME STRING, "
                + " DESCRIPTION STRING, "
                + " PRIMARY KEY (ID) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'oracle-cdc', "
                + " 'hostname' = '192.168.123.58', "
                + " 'port' = '1521', "
                + " 'username' = 'flinkuser', "
                + " 'password' = 'flinkpw', "
                + " 'database-name' = 'XE', "
                + " 'schema-name' = 'flinkuser', "
                + " 'table-name' = 'products', "
                + " 'debezium.log.mining.continuous.mine'='true', "
                + " 'debezium.log.mining.strategy'='online_catalog', "
                + " 'debezium.database.tablename.case.insensitive'='false', "
                + " 'scan.startup.mode' = 'initial')";
        tableEnv.executeSql(strSql);

        // Continuously print the initial snapshot plus all subsequent changes.
        TableResult tableResult = tableEnv.executeSql("select * from products_ora_cdc");
        tableResult.print();
    }
}
The program output after startup is shown below.
3.2.3 Code: capturing Oracle table changes and writing them to MySQL
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC 2.2.1: capture changes from an Oracle table and write them to MySQL.
 *
 * @author 闻武
 * @since 2020-05-31
 */
public class CdcOracle2MySQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Oracle CDC source table (LogMiner based).
        String strSourceSql = "CREATE TABLE IF NOT EXISTS products_ora_cdc ( "
                + " ID INT, "
                + " NAME STRING, "
                + " DESCRIPTION STRING, "
                + " PRIMARY KEY (ID) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'oracle-cdc', "
                + " 'hostname' = '192.168.123.58', "
                + " 'port' = '1521', "
                + " 'username' = 'flinkuser', "
                + " 'password' = 'flinkpw', "
                + " 'database-name' = 'XE', "
                + " 'schema-name' = 'flinkuser', "
                + " 'table-name' = 'products', "
                + " 'debezium.log.mining.continuous.mine'='true', "
                + " 'debezium.log.mining.strategy'='online_catalog', "
                + " 'debezium.database.tablename.case.insensitive'='false', "
                + " 'scan.startup.mode' = 'initial')";

        // JDBC sink table in MySQL.
        String strSinkSql = " CREATE TABLE IF NOT EXISTS products_mys_sink ("
                + " id INT, "
                + " name STRING, "
                + " description STRING, "
                + " PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + " 'connector' = 'jdbc',"
                + " 'url' = 'jdbc:mysql://192.168.123.58:3306/mydb',"
                + " 'table-name' = 'products_sink',"
                + " 'username' = 'root',"
                + " 'password' = '123456' "
                + " )";

        tableEnv.executeSql(strSourceSql);
        tableEnv.executeSql(strSinkSql);

        // executeSql submits the INSERT job asynchronously; await() keeps the
        // local program alive so the streaming job keeps running.
        tableEnv.executeSql("insert into products_mys_sink(id, name, description) "
                + "select ID, NAME, DESCRIPTION from products_ora_cdc ").await();
    }
}
The remaining steps are similar to section 2.2.3 (capturing MySQL table changes and writing them to MySQL) and are not repeated here.
Related code download: