flink流处理和批处理,flink数据采集

  flink流处理和批处理,flink数据采集

  00-1010I、CDC II、常用CDC III、Flink CDC IV的对比、Flink CDC V支持的数据库、Ali依赖实现的Flink CDC的使用示例介绍基于sql总结基于表格

  

目录

CDC(变更数据捕获),广义来说,任何能够捕获数据变更的技术都可以称为CDC。但是我们通常所说的CDC技术主要是针对数据库(包括mysql、Oracle、MongoDB等)的变化。),它是一种用于捕获数据库中数据变化的技术。

 

  00-1010常见产品主要有Flink CDC、Datax、Canal、SQOOP、Kettle、Oracle金门、Debezium等。

  DataX、Sqoop、kettle的CDC实现技术主要基于查询,通过离线调度查询作业实现批量请求。这种操作不能保证数据的一致性,实时性差。Flink CDC、Canal、Debezium和Oracle Goldengate都是基于日志的CDC技术。该技术采用流处理的方式,实时处理日志数据,保证数据的一致性,并为其他服务提供实时数据。

  00-1010年2020年,Flink cdc首次在Flink forward大会上正式公布,由两位大佬Jark吴庆生任提出。

  Flink CDC连接器可以捕获一个或多个表中发生的所有更改。这种模式通常有一个前记录和一个后记录。Flink CDC连接器可以在Flink中以无约束模式(流)直接使用,不需要使用kafka这样的中间件来传输数据。

  

一、CDC

 

  PS:

  Flink CDC 2.2刚刚增加了四个数据源:OceanBase、PolarDB-X、SqlServer和TiDB,都支持完全和增量集成同步。

  到目前为止,FlinkCDC已经支持12个数据源。

  

二、常见CDC的比较

 

  

三、Flink CDC

!- flink表支持-依赖关系groupIdorg.apache.flink/groupId ArtifactidFlink-Table-API-Java/ArtifactidVersion $ { flink . version }/版本/依赖关系groupIdorg.apache.flink/groupId ArtifactidFlink-Table-API-Java-bridge _ $ { Scala . binary . version }/artifactId version $ { flink . version }/版本/依赖关系groupIdorg.apache.flink/groupId ArtifactidFlink-Table-planner-blink _ $ { Scala . binary . version }/artifactId version $ { flink . version }/依赖关系!-阿里实现的Flink MySQL CDC依赖groupIdcom.alibaba.ververica/groupId ArtifactidFlink-Connector-MySQL-CDC/artifactid Version 1 . 4 . 0/Version/Dependency依赖GroupID MySQL/GroupID Artifact

 

  Id>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.80</version> </dependency> <!-- jackson报错解决 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-parameter-names</artifactId> <version>${jackson.version}</version> </dependency>

 

  

基于table

package spendreport.cdc;import com.alibaba.fastjson.JSONObject;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import io.debezium.data.Envelope;import java.util.List;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;;/** * @author zhengwen **/public class TestMySqlFlinkCDC { public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传, 需要从 Checkpoint 或者 Savepoint 启动程序 //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK env.enableCheckpointing(5000L); //2.2 指定 CK 的一致性语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //2.3 设置任务关闭的时候保留最后一次 CK 数据 env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //2.4 指定从 CK 自动重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("127.0.0.1") .serverTimeZone("GMT+8") //时区报错增加这个设置 .port(3306) .username("root") .password("123456") .databaseList("wz") .tableList("wz.user_info") //注意表一定要写库名.表名这种,多个,隔开 .startupOptions(StartupOptions.initial()) //自定义转json格式化 .deserializer(new MyJsonDebeziumDeserializationSchema()) //自带string格式序列化 //.deserializer(new StringDebeziumDeserializationSchema()) .build(); DataStreamSource<String> streamSource = env.addSource(sourceFunction); //TODO 可以keyBy,比如根据table或type,然后开窗处理 //3.打印数据 streamSource.print(); //streamSource.addSink(); 输出 //4.执行任务 env.execute("flinkTableCDC"); } private static class MyJsonDebeziumDeserializationSchema implements com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> { @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { Struct value = (Struct) sourceRecord.value(); Struct source = value.getStruct("source"); //获取数据库名称 String db = source.getString("db"); String table = source.getString("table"); //获取数据类型 String type = Envelope.operationFor(sourceRecord).toString().toLowerCase(); if (type.equals("create")) { type = "insert"; } JSONObject jsonObject = new JSONObject(); jsonObject.put("database", db); jsonObject.put("table", table); jsonObject.put("type", type); //获取数据data Struct after = value.getStruct("after"); JSONObject dataJson = new JSONObject(); List<Field> fields = after.schema().fields(); for (Field field : fields) { String field_name = field.name(); Object fieldValue = after.get(field); dataJson.put(field_name, fieldValue); } jsonObject.put("data", dataJson); collector.collect(JSONObject.toJSONString(jsonObject)); } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }}

运行效果

 

  

 

  PS:

  操作数据库的增删改就会立马触发这里是自定义的序列化转json格式字符串,自带的字符串序列化也是可以的(可以自己试试打印的内容)

 

  

基于sql

package spendreport.cdc;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @author zhengwen **/public class TestMySqlFlinkCDC2 { public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //2.创建 Flink-MySQL-CDC 的 Source String connectorName = "mysql-cdc"; String dbHostName = "127.0.0.1"; String dbPort = "3306"; String dbUsername = "root"; String dbPassword = "123456"; String dbDatabaseName = "wz"; String dbTableName = "user_info"; String tableSql = "CREATE TABLE t_user_info (" + "id int,mobile varchar(20)," + "user_name varchar(30)," + "real_name varchar(60)," + "id_card varchar(20)," + "org_name varchar(100)," + "user_stars int," + "create_by int," // + "create_time datetime," + "update_by int," // + "update_time datetime," + "is_deleted int) " + " WITH (" + " connector = " + connectorName + "," + " hostname = " + dbHostName + "," + " port = " + dbPort + "," + " username = " + dbUsername + "," + " password = " + dbPassword + "," + " database-name = " + dbDatabaseName + "," + " table-name = " + dbTableName + "" + ")"; tableEnv.executeSql(tableSql); tableEnv.executeSql("select * from t_user_info").print(); env.execute(); }}

运行效果:

 

  

 

  

 

  

总结

既然是基于日志,那么数据库的配置文件肯定要开启日志功能,这里mysql需要开启内容

 

  

 

  

server-id=1log_bin=mysql-binbinlog_format=ROW #目前还只能支持行expire_logs_days=30binlog_do_db=wz #这里binlog的库如果有多个就再写一行,千万不要写成用,隔开

 

  

实时性确实高,比那些自动任务定时取体验号百倍流示的确实丝滑最后肯定证明这种方式同步数据可行,而且实时性特高,但是就是不知道我们的目标数据库是否可以开启这些日志配置。UP!

 

  到此这篇关于Flink流处理引擎零基础速通之数据的抽取篇的文章就介绍到这了,更多相关Flink数据的抽取内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: