范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

canalKafka实现mysql与redis数据同步

  前言
  上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
  架构设计
  canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ。本文采用Kafka讲解,实现mysql与redis之间的数据同步。
  通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
  Kafka&Zookeeper搭建
  首先在 官网 下载Kafka:
  下载后解压文件夹,可以看到以下几个文件:
  Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
  通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:  # 命令常见一个canaltopic 队列 kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic
  Canal搭建
  canal搭建具体可以参考上文,这里只讲解具体的参数配置:
  找到/conf目录下的canal.properties配置文件:  # tcp, kafka, RocketMQ 这里选择kafka模式 canal.serverMode = kafka # 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况 canal.instance.parser.parallelThreadSize = 16 # 配置MQ的服务地址,这里配置的是kafka对应的地址和端口 canal.mq.servers = 127.0.0.1:9092 # 配置instance,在conf目录下要有example同名的目录,可以配置多个 canal.destinations = example
  然后配置instance,找到/conf/example/instance.properties配置文件:  ## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置) # canal.instance.mysql.slaveId=0   # position info canal.instance.master.address=127.0.0.1:3306 # 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog canal.instance.master.journal.name=mysql-bin.000006 canal.instance.master.position=4596 # 账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=Canal@**** canal.instance.connectionCharset = UTF-8 #MQ队列名称 canal.mq.topic=canaltopic #单队列模式的分区下标 canal.mq.partition=0
  经过上述配置后,就可以启动canal了。
  测试
  环境搭建完成后,就可以编写代码进行测试。
  1、引入pom依赖       org.springframework.kafka     spring-kafka       org.springframework.boot     spring-boot-starter-data-redis 
  2、封装Redis工具类
  在application.yml文件增加以下配置:  spring:     redis:     host: 127.0.0.1     port: 6379     database: 0     password: 123456
  封装一个操作Redis的工具类: @Component public class RedisClient {       /**      * 获取redis模版      */     @Resource     private StringRedisTemplate stringRedisTemplate;       /**      * 设置redis的key-value      */     public void setString(String key, String value) {         setString(key, value, null);     }       /**      * 设置redis的key-value,带过期时间      */     public void setString(String key, String value, Long timeOut) {         stringRedisTemplate.opsForValue().set(key, value);         if (timeOut != null) {             stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);         }     }       /**      * 获取redis中key对应的值      */     public String getString(String key) {         return stringRedisTemplate.opsForValue().get(key);     }       /**      * 删除redis中key对应的值      */     public Boolean deleteKey(String key) {         return stringRedisTemplate.delete(key);     } }
  3、创建MQ消费者进行同步  在application.yml配置文件加上kafka的配置信息: spring:   kafka:       # Kafka服务地址     bootstrap-servers: 127.0.0.1:9092     consumer:       # 指定一个默认的组名       group-id: consumer-group1       #序列化反序列化       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer     producer:       key-serializer: org.apache.kafka.common.serialization.StringDeserializer       value-serializer: org.apache.kafka.common.serialization.StringDeserializer       # 批量抓取       batch-size: 65536       # 缓存容量       buffer-memory: 524288
  创建一个CanalBean对象进行接收:  public class CanalBean {     //数据     private List data;     //数据库名称     private String database;     private long es;     //递增,从1开始     private int id;     //是否是DDL语句     private boolean isDdl;     //表结构的字段类型     private MysqlType mysqlType;     //UPDATE语句,旧数据     private String old;     //主键名称     private List pkNames;     //sql语句     private String sql;     private SqlType sqlType;     //表名     private String table;     private long ts;     //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等     private String type;     //getter、setter方法 }   public class MysqlType {     private String id;     private String commodity_name;     private String commodity_price;     private String number;     private String description;     //getter、setter方法 }   public class SqlType {     private int id;     private int commodity_name;     private int commodity_price;     private int number;     private int description; }
  最后就可以创建一个消费者CanalConsumer进行消费:  @Slf4j @Component public class CanalConsumer {       @Resource     private RedisClient redisClient;       @KafkaListener(topics = "canaltopic")     public void receive(ConsumerRecord<?, ?> consumer) {         String value = (String) consumer.value();         log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),                 consumer.partition(), consumer.offset(), value);         //转换为javaBean         CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);         //获取是否是DDL语句         boolean isDdl = canalBean.hasDdl();         //获取类型         String type = canalBean.getType();         //不是DDL语句         if (!isDdl) {             List tbCommodityInfos = canalBean.getData();             //过期时间             long TIME_OUT = 600L;             if ("INSERT".equals(type)) {                 //新增语句                 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                     String id = tbCommodityInfo.getId();                     log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));                     //新增到redis中,过期时间是10分钟                     redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                     log.info("从redis获取数据 result: {}", JSONObject.toJSONString(redisClient.getString(id)));                 }             } else if ("UPDATE".equals(type)) {                 //更新语句                 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                     String id = tbCommodityInfo.getId();                     log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));                     //更新到redis中,过期时间是10分钟                     redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                 }             } else {                 //删除语句                 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                     String id = tbCommodityInfo.getId();                     log.info("删除数据从redis, id: {}", id);                     //从redis中删除                     redisClient.deleteKey(id);                 }             }         }     }   }
  测试Mysql与Redis同步
  mysql对应的表结构如下:  CREATE TABLE `tb_commodity_info` (   `id` varchar(32) NOT NULL,   `commodity_name` varchar(512) DEFAULT NULL COMMENT "商品名称",   `commodity_price` varchar(36) DEFAULT "0" COMMENT "商品价格",   `number` int(10) DEFAULT "0" COMMENT "商品数量",   `description` varchar(2048) DEFAULT "" COMMENT "商品描述",   PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT="商品信息表";
  启动项目后,新增一条数据: INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ("3e71a81fd80711eaaed600163e046cc3", "叉烧包", "3.99", "3", "又大又香的叉烧包,老人小孩都喜欢");
  可以在控制台看到以下输出: 2022-01-02 18:12:51.317  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"} 2022-01-02 18:12:51.320  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 从redis获取数据 result: "{"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}"
  如果更新呢?试一下Update语句:  UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`="青菜包",`description`="很便宜的青菜包呀,不买也开看看了喂" WHERE `id`="3e71a81fd80711eaaed600163e046cc3";
  同样可以在控制台看到以下输出: 2022-01-02 18:14:44.613  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"} 2022-01-02 18:14:44.616  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
  经过测试完全么有问题。
  总结
  既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
  如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。

1500左右的手机预算,不玩游戏像素高一点电池大一点,有吗?题主需要续航和拍照的千元机,续航没有问题,千元机最擅长大续航,可以参考售价1699的RedmiNote11Pro,一亿像素5160mAh超大电池,还有67W充电,基本你的需求都囊括欣旺达400亿投建抢动力电池蛋糕前三季扣非4。51亿增逾两倍长江商报消息长江商报记者沈右荣消费电池龙头欣旺达(300207。SZ)频出重磅,发力动力电池,抢占市场蛋糕。12月14日午间,欣旺达发布公告,子公司欣旺达电动汽车电池有限公司(以下假如微信从明年开始收费,每月20元,你还会用吗?账号应该一个月2000收租,包括腾讯所有业务都要收租,游戏每月5000肯定会用,因为没有替代的相当产品,有其他的一些产品目前都是小众,并没有微信使用用户多,功能全。但是收费和不收费从骗补到盗脸小鹏汽车最近到底是怎么了?DoNews汽车12月15日消息(邵志超)今年国内新能源汽车市场最值得一提的关键词就是维权,从造车新势力到传统汽车厂商,从车展到各种门,各类投诉维权事件层出不穷,花样种类也很多,此功耗达11。1W!高通骁龙8,还有人抢?摩托罗拉截胡小米全球首发这个月初高通发布了最新一代旗舰处理器骁龙8Gen1,和往年一样高通旗舰处理器发布之后,国内各大厂家又开始抢起了首发。按照往年都是小米首发的传统再加上高通发布会上雷军受邀录视频,视频新能源汽车迎来专属保险!可保起火燃烧电池电机控制系统,还有4个专属附加险一直沿用传统燃油车保险条款的新能源汽车,迎来专属保险。12月14日,中国保险行业协会在官网正式发布新能源汽车商业保险专属条款(试行)(下称专属条款),供保险业参考使用。这是为充分发被阿里收购后,业绩不升反降的几个品牌,有的已官宣停服作为BAT中的一员,阿里巴巴对我国互联网和经济做出的贡献还是很大的,尤其是旗下的电商平台,是我国网购领域里的领头羊,开创的双十一购物节,也成为了我国全民性的购物狂欢节。2021年的助力生鲜行业线上破局美团闪购启动亿元品牌帮扶计划来源中国经济网长期以来,有品类无品牌最后一公里配送难数字化运营能力不足等等,一直是阻碍生鲜商家线上化实现数字化和全渠道化发展的重要因素。12月13日,美团闪购生鲜业务负责人在上海参2021年底各领域最佳手机,结合自身需求,买对不买贵距离新年还有最后的15天时间,不少朋友都赶在这最后一波年底促销来换新机,不过学长从评论中看到每位想换新机的朋友预算有高有低,所以从各个领域出发,对于有专门需求的用户而言,看准了不同8Gen1高通最强处理器首发价格仅为2999来了来了,高通的今年旗舰处理器,8gen1本来我以为今年的首发还会是出现在小米12上的,结果横空杀出了一个摩托罗拉,还记得当时雷总说过,首发是实力,更是承诺,真想现在看看雷总的表情深度揭秘OPPO自研影像专用NPUAI能效比暴涨40倍,秀四大黑科技芯东西(公众号aichip001)作者云鹏编辑心缘如今,自研芯片已经成为科技巨头们的兵家必争之地,通过芯片自研实现人无我有的产品特性,或许才能在内卷严重的市场中杀出一条路,对于手机
比Airpods更先进,SanagB6SPro闪电银鸥蓝牙降噪耳机体验评测Hi,我是数码小浩,这几天深圳气温骤降,上班路上看到好多人佩戴头戴式耳机,瞬间觉得我的AirPods不香了,不过我也有幸拿到Sanag的头戴式主动降噪耳机B6SPro,这几天上下班新能源汽车车电分离如何彻底解决电动车电池续航里程焦虑?一新能源汽车电池续航里程焦虑的原因一是如今电池技术限制。在插电混动和纯电的电池中,插混电车的电池续航里程一般为30Km以上,至今不超200Km,而纯电电车,一般在300Km以上,至华为mate30刚买的,不考虑5G,能用五年吗,毕竟四千买的?华为mate30是去年上市的最新款旗舰,如果不考虑5G并且使用得当,用五年还是有可能的。下面我们从mate30的技术日常使用和日常保护三个方面来分析使用五年的可能性,希望对大家有所北大元宇宙报告XR硬件出货量或将于2024年达1亿台近日,北京大学汇丰商学院联合安信证券发布元宇宙2022蓄积的力量。第一部分的研究思路为201620192021三大关键时间节点分析相关技术的成熟过程,以及对未来三年做了行业预判。第苏宁易购发布12月手机消费报告iPhone以旧换新持续火热近日,苏宁易购发布12月手机消费报告。报告显示,12月手机销量TOP品牌中,Apple小米荣耀iQOO华为成为消费者首选Top5品牌。相比11月,荣耀逆势上涨,一举跃进12月手机销服务体验or技术创新,iQOO8Pro和小米MIX4谁更值得选?虽然MIX4降价比iQOO8Pro狠,但就现在来说,我依然很坚定的选择iQOO8Pro,这手机除了芯片不是最新的8Gen1外,其余哪一点比2022年发布的新机差?至于MIX4,有这小米手机12会不会成为下一代神机呢?从小米历代产品线上来看,今年的12系列算是体验最完善的一款旗舰机,与小米6不分伯仲,成为5G时代的一代神机还是很有希望的。所以你也可以理解为,小米12的屏幕为6。28尺寸,跟四年前电饭煲和电压力锅有什么区别?电饭煲怎么选?电饭煲哪个牌子好?电饭煲可以说是生活中最为日常的小家电了,一碗香喷喷的米饭能带来说不尽的幸福感,一日三餐正是生活的意义所在,因此选择一款适合自己的电饭煲格外重要,这一篇就来和大家聊聊有关于电饭煲的那Facebook折戟之后,PayPal确定正在开发加密稳定币记者司林威1月7日,据彭博社,美国支付巨头PayPal确认正在开发自己的加密稳定币,以进一步迈入加密货币领域。一位程序开发人员在PayPal的应用中发现了隐藏在代码中的相关证据,代以画笔展现科技成就,为科技工作者立传来源人民网人民日报国家的脊梁(油画)董卓飞天揽月(中国画)毛珠明中国智造,走向世界复兴号CR400(中国画)徐亚慧近年来,我国科技事业蓬勃发展,科技成果不断涌现。其中,许多重要技术因新能源车补贴退坡小鹏汽车宣布全系上涨今日(1月11日),小鹏汽车公布了全系车型最新补贴后的价格。其中小鹏旗舰轿车P7全系上涨43005900元不等,最新补贴后售价为22。4240。99万元全新P5全系上涨480054