Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性(docker安装mysql5.7)

  本篇文章为你整理了Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性(docker安装mysql5.7)的详细内容,包含有docker安装mysql8.0 docker安装mysql5.7 docker 安装mysql8 docker-compose安装mysql Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性,希望能帮助你了解 Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性。

  canal [kənæl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
 

  早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

  Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅 消费的中间件。
 

  目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得
 

  的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

  当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

  canal github地址
 

  二、MySQL 的 Binlog

  1. Binlog介绍

  MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除
 

  了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进 制日志是事务安全型的。
 

  一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

  
MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves
 

  来达到 Master-Slave 数据一致的目的,这就是我们常用的主从复制。

  
就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据,生产上要开启,不然真的要删库跑路了 。

  
2. Binlog 的分类

  MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配
 

  置 binlog_format= statementmixedrow。

  statement:语句级,binlog 会记录每次一执行写操作的语句。比如
 

  update user set create_date=now()
 

  优点:节省空间。
 

  缺点:有可能造成数据不一致。

  row:行级, binlog 会记录每次操作后每行记录的变化。
 

  优点:保持数据的绝对一致性
 

  缺点:占用较大空间

  mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement
 

  模式不一致问题,默认还是 statement,一些会产生不一致的情况还是会选择row。

  综合对比
 

  Canal 想做监控分析,选择 row 格式比较合适。

  三、工作原理

  1. MySQL主备复制原理

  MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
 

  2. canal 工作原理

  canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  canal 解析 binary log 对象(原始为 byte 流)

  总结:

  我们可以把canal理解为从机,拿到数据然后进行后续操作,可以同步到redis上,再也不需要进行延迟双删来保证mysql和redis的数据一致性了,而且还不会出现各种各样的问题!

  四、canal使用场景

  场景一: 阿里 Otter 中间件的一部分
 

  Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。

  otter github地址
 

  
 

  场景二:保证缓存和数据库一致性(我们今天要测试的)

  场景三:实时数据分析

  抓取业务表的新增变化数据,用于制作实时统计

  五、安装mysql、redis

  1. 安装mysql

  

sudo docker run -p 3306:3306 --name mysql \

 

  -v /mydata/mysql/log:/var/log/mysql \

  -v /mydata/mysql/data:/var/lib/mysql \

  -v /mydata/mysql/conf:/etc/mysql \

  -e MYSQL_ROOT_PASSWORD=root \

  -d mysql:5.7

  

 

  2. Docker配置MySQL

  

vim /mydata/mysql/conf/my.cnf # 创建并进入编辑

 

  

 

  添加如下配置:

  

[client]

 

  default-character-set=utf8

  [mysql]

  default-character-set=utf8

  [mysqld]

  init_connect=SET collation_connection = utf8_unicode_ci

  init_connect=SET NAMES utf8

  character-set-server=utf8

  collation-server=utf8_unicode_ci

  skip-character-set-client-handshake

  skip-name-resolve

  # 开启binlog日志:目录为docker里的目录

  log-bin=/var/lib/mysql/mysql-bin

  # server_id 需保证唯一,不能和 canal 的 slaveId 重复

  server-id=123456

  binlog_format=row

  # test数据库开启,不设置则所有库开启

  binlog-do-db=test

  

 

  3. 重新启动mysql

  

docker restart mysql

 

  

 

  4. 创建用户并赋权限

  查看mysql的 id:

  

docker ps

 

  

 

  进入docker容器:

  

docker exec -it 7d /bin/bash

 

  

 

  连接到mysql:

  

mysql -u root -p

 

  

 

  创建用户并赋予权限:

  

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@% IDENTIFIED BY canal ;

 

  

 

  刷新:

  

flush privileges;

 

  

 

  5. Win10连接mysql创建user表

  

CREATE TABLE `user` (

 

   `id` int(10) NOT NULL AUTO_INCREMENT,

   `name` varchar(25) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

   `sex` varchar(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

   PRIMARY KEY (`id`) USING BTREE

  ) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

  SET FOREIGN_KEY_CHECKS = 1;

  

 

  6. 创建redis

  

docker run -p 6379:6379 --name redis \

 

  -v /mydata/redis/data:/data \

  -v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \

  -d redis redis-server /etc/redis/redis.conf

  

 

  六、安装canal

  1. 启动容器

  

docker run -it --name canal -p 11111:11111 -d canal/canal-server:v1.1.5

 

  

 

  查看三个容器:

  

docker ps

 

  

 

  2. 配置canal

  进入容器:

  

docker exec -it 56 /bin/bash

 

  

 

  切换目录:

  

cd canal-server/conf/example

 

  

 

  修改两个地方:

  第一个是mysql的地址,第二个是我们创建数据库名字(可以使用默认带的,就是全部的库都进行收集binlog日志)

  

canal.instance.master.address=192.168.84.138:3306

 

  canal.instance.filter.regex=test\..*

  

 

  3. 查看日志

  我们查看一下canal的日志,看是否启动成功!
 

  首先进入容器:

  

docker exec -it 56 /bin/bash

 

  

 

  切换目录:

  

cd canal-server/logs/example/

 

  

 

  查看日志:

  

cat example.log

 

  

 

  无报错,刚刚新建的表这里也可以检测到!

  4. 查看canal.properties

  

cd /canal-server/conf

 

  

 

  

cat canal.properties

 

  

 

  我们可以看到有很多个模式,可以把canal收集到的binlog发送到三大MQ中,或者tcp。

  本次以tcp为准测试,如果大家有需求可以进行发送到MQ,往下滑都有对应的配置!

  七、简单测试

  1. 新建springboot项目,导入依赖

  

 dependency 

 

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter /artifactId

   /dependency

   dependency

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-test /artifactId

   scope test /scope

   exclusions

   exclusion

   groupId org.junit.vintage /groupId

   artifactId junit-vintage-engine /artifactId

   /exclusion

   /exclusions

   /dependency

   dependency

   groupId com.alibaba.otter /groupId

   artifactId canal.client /artifactId

   version 1.1.0 /version

   /dependency

   dependency

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter-data-redis /artifactId

   /dependency

   dependency

   groupId com.fasterxml.jackson.core /groupId

   artifactId jackson-core /artifactId

   version 2.8.6 /version

   /dependency

   dependency

   groupId com.fasterxml.jackson.module /groupId

   artifactId jackson-module-jaxb-annotations /artifactId

   version 2.8.6 /version

   /dependency

  

 

  2. 编写测试文件

  来自官方例子:

  我把statis关键字删除了,方便和redis进行整合

  例子地址

  

import com.alibaba.otter.canal.client.CanalConnector;

 

  import com.alibaba.otter.canal.protocol.Message;

  import com.alibaba.otter.canal.protocol.CanalEntry.Column;

  import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

  import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

  import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

  import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

  import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

  import com.alibaba.otter.canal.client.*;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

  
// private static String REDIS_DATABASE = "mall";

  // private static String REDIS_KEY_ADMIN = "ums:admin";

   @Bean

   public void canalSync() {

   // 创建链接,第一个参数是canal的ip,第二个参数是canal的端口号,

   // 第三个参数是canal虚拟的模块名称,canal是创建的数据库账号密码

   CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.84.138",

   11111), "example", "canal", "canal");

   int batchSize = 1000;

   int emptyCount = 0;

   try {

   connector.connect();

   // 对应上面的配置只对test库进行获取binlog文件

   connector.subscribe("test\\..*");

   connector.rollback();

   int totalEmptyCount = 120;

   while (emptyCount totalEmptyCount) {

   Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

   long batchId = message.getId();

   int size = message.getEntries().size();

   if (batchId == -1 size == 0) {

   emptyCount++;

   System.out.println("empty count : " + emptyCount);

   try {

   Thread.sleep(1000);

   } catch (InterruptedException e) {

   } else {

   emptyCount = 0;

   // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

   printEntry(message.getEntries());

   connector.ack(batchId); // 提交确认

   // connector.rollback(batchId); // 处理失败, 回滚数据

   System.out.println("empty too many times, exit");

   } finally {

   connector.disconnect();

   private void printEntry(List Entry entrys) {

   for (Entry entry : entrys) {

   if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN entry.getEntryType() == EntryType.TRANSACTIONEND) {

   continue;

   RowChange rowChage = null;

   try {

   rowChage = RowChange.parseFrom(entry.getStoreValue());

   } catch (Exception e) {

   throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

   EventType eventType = rowChage.getEventType();

   System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",

   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

   eventType));

   for (RowData rowData : rowChage.getRowDatasList()) {

   if (eventType == EventType.DELETE) {

   printColumn(rowData.getBeforeColumnsList());

   } else if (eventType == EventType.INSERT) {

   printColumn(rowData.getAfterColumnsList());

   } else {

   System.out.println("------- before");

   printColumn(rowData.getBeforeColumnsList());

   System.out.println("------- after");

   printColumn(rowData.getAfterColumnsList());

   private void printColumn(List Column columns) {

   for (Column column : columns) {

   System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

  

 

  3. 启动项目

  4. 插入一条数据

  

INSERT INTO user VALUES (1,小红,女);

 

  

 

  总结:
 

  我们测试是可以获取到binlog日志的,下面我们进入实战:实现redis缓存同步

  八、实战redis同步缓存

  1. 编写redis序列化配置类

  

import org.springframework.context.annotation.Bean;

 

  import org.springframework.context.annotation.Configuration;

  import org.springframework.data.redis.connection.RedisConnectionFactory;

  import org.springframework.data.redis.core.RedisTemplate;

  import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

  import org.springframework.data.redis.serializer.StringRedisSerializer;

   * @author wangzhenjun

   * @date 2022/6/30 9:24

  @Configuration

  public class RedisConfig {

   @Bean

   public RedisTemplate String, Object redisTemplate(RedisConnectionFactory connectionFactory) {

   RedisTemplate String, Object redisTemplate = new RedisTemplate ();

   redisTemplate.setConnectionFactory(connectionFactory);

   StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

   redisTemplate.setKeySerializer(stringRedisSerializer);

   redisTemplate.setHashKeySerializer(stringRedisSerializer);

   Jackson2JsonRedisSerializer ? jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer (Object.class);

   redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

   redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

   redisTemplate.afterPropertiesSet();

   return redisTemplate;

  

 

  2. 添加redis增删改方法

  主要添加了同步到redis的两个方法,这里是2分钟就会停止监听,大家可以按自己的来调整:

  

int totalEmptyCount = 120;

 

  

 

  

import java.net.InetSocketAddress;

 

  import java.util.List;

  import com.alibaba.fastjson.JSONObject;

  import com.alibaba.otter.canal.client.CanalConnector;

  import com.alibaba.otter.canal.protocol.Message;

  import com.alibaba.otter.canal.protocol.CanalEntry.Column;

  import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

  import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

  import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

  import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

  import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

  import com.alibaba.otter.canal.client.*;

  import org.springframework.beans.factory.annotation.Autowired;

  import org.springframework.context.annotation.Bean;

  import org.springframework.context.annotation.Configuration;

  import org.springframework.data.redis.core.RedisTemplate;

  
public void canalSync() {

   // 创建链接,第一个参数是canal的ip,第二个参数是canal的端口号,

   // 第三个参数是canal虚拟的模块名称,canal是创建的数据库账号密码

   CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.84.138",

   11111), "example", "canal", "canal");

   int batchSize = 1000;

   int emptyCount = 0;

   try {

   connector.connect();

   // 对应上面的配置只对test库进行获取binlog文件

   connector.subscribe("test\\..*");

   connector.rollback();

   int totalEmptyCount = 120;

   while (emptyCount totalEmptyCount) {

   Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

   long batchId = message.getId();

   int size = message.getEntries().size();

   if (batchId == -1 size == 0) {

   emptyCount++;

   System.out.println("empty count : " + emptyCount);

   try {

   Thread.sleep(1000);

   } catch (InterruptedException e) {

   } else {

   emptyCount = 0;

   // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

   printEntry(message.getEntries());

   connector.ack(batchId); // 提交确认

   // connector.rollback(batchId); // 处理失败, 回滚数据

   System.out.println("empty too many times, exit");

   } finally {

   connector.disconnect();

   private void printEntry(List Entry entrys) {

   for (Entry entry : entrys) {

   if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN entry.getEntryType() == EntryType.TRANSACTIONEND) {

   continue;

   RowChange rowChage = null;

   try {

   rowChage = RowChange.parseFrom(entry.getStoreValue());

   } catch (Exception e) {

   throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

   EventType eventType = rowChage.getEventType();

   System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",

   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

   eventType));

   for (RowData rowData : rowChage.getRowDatasList()) {

   if (eventType == EventType.DELETE) {

   printColumn(rowData.getBeforeColumnsList());

   // 同步到redis

   delete(rowData.getBeforeColumnsList());

   } else if (eventType == EventType.INSERT) {

   printColumn(rowData.getAfterColumnsList());

   // 同步到redis

   insertOrUpdate(rowData.getAfterColumnsList());

   } else {

   System.out.println("------- before");

   printColumn(rowData.getBeforeColumnsList());

   System.out.println("------- after");

   printColumn(rowData.getAfterColumnsList());

   // 同步到redis

   insertOrUpdate(rowData.getAfterColumnsList());

   private void printColumn(List Column columns) {

   for (Column column : columns) {

   System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

   * 更新或者添加触发同步到redis

   * @param columns

   private void insertOrUpdate (List Column columns) {

   if (columns.size() 0) {

   JSONObject json = new JSONObject();

   for (Column column : columns) {

   json.put(column.getName(), column.getValue());

   redisTemplate.opsForHash().put(KEY,columns.get(0).getValue(),json.toJSONString());

   * 删除触发同步到redis

   * @param columns

   private void delete (List Column columns) {

   if (columns.size() 0) {

   redisTemplate.opsForHash().delete(KEY, columns.get(0).getValue());

  

 

  3. 测试添加

  数据库插入一条:

  

insert into user values (1,我是测试添加,男);

 

  

 

  控制台捕捉到信息:

  
 

  我们看到redis已经有数据了,同步成功!

  4. 测试更新

  更细我们刚刚添加的那条数据:

  

update user set name = 修改了 where id = 1;

 

  

 

  控制台捕捉到了更新信息:

  
 

  redis也同步修改了!

  5. 测试删除

  我们先多添加几条哈:
 

  删除id为1的那条数据:

  

delete from user where id = 1;

 

  

 

  控制台捕捉到了删除信息:

  redis也同步删除了!
 

  这样就实现了一个canal的应用场景,当然也可以把binlog的数据发送到MQ来!

  小编整理了一天,看到这里给小编点个关注呗,谢谢大家的支持哦!!

  有缘人才可以看得到的哦!!!

  点击访问!小编自己的网站,里面也是有很多好的文章哦!

  以上就是Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性(docker安装mysql5.7)的详细内容,想要了解更多 Docker安装canal、mysql进行简单测试与实现redis和mysql缓存一致性的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: