Canal ClientExample-阿里云开发者社区


本站和网页 https://developer.aliyun.com/article/14570 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Canal ClientExample-阿里云开发者社区
开发者社区>
愤怒的苹果>
正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开
Canal ClientExample
2016-03-30
6675
简介:
+关注继续查看
Canal介绍
      基于mysql数据库binlog的增量订阅&消费
ClientExample
依赖配置:(目前暂未正式发布到mvn仓库,所以需要各位下载canal源码后手工执行下mvn clean install -Dmaven.test.skip)
1.<dependency>
2. <groupId>com.alibaba.otter</groupId>
3. <artifactId>canal.client</artifactId>
4. <version>1.0.1-SNAPSHOT</version>
5.</dependency>
1. 创建mvn标准工程:
1.mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
2.  修改pom.xml,添加依赖
3.  ClientSample代码
1.package com.alibaba.otter.canal.sample;
2.
3.import java.net.InetSocketAddress;
4.import java.util.List;
5.
6.import com.alibaba.otter.canal.common.utils.AddressUtils;
7.import com.alibaba.otter.canal.protocol.Message;
8.import com.alibaba.otter.canal.protocol.CanalEntry.Column;
9.import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
10.import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
11.import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
12.import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
13.import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
14.
15.public class SimpleCanalClientExample {
16.
17. public static void main(String args[]) {
18. // 创建链接
19. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
20. 11111), "example", "", "");
21. int batchSize = 1000;
22. int emptyCount = 0;
23. try {
24. connector.connect();
25. connector.subscribe(".*\\..*");
26. connector.rollback();
27. int totalEmtryCount = 120;
28. while (emptyCount < totalEmtryCount) {
29. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
30. long batchId = message.getId();
31. int size = message.getEntries().size();
32. if (batchId == -1 || size == 0) {
33. emptyCount++;
34. System.out.println("empty count : " + emptyCount);
35. try {
36. Thread.sleep(1000);
37. } catch (InterruptedException e) {
38. }
39. } else {
40. emptyCount = 0;
41. // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
42. printEntry(message.getEntries());
43. }
44.
45. connector.ack(batchId); // 提交确认
46. // connector.rollback(batchId); // 处理失败, 回滚数据
47. }
48.
49. System.out.println("empty too many times, exit");
50. } finally {
51. connector.disconnect();
52. }
53. }
54.
55. private static void printEntry(List<Entry> entrys) {
56. for (Entry entry : entrys) {
57. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
58. continue;
59. }
60.
61. RowChange rowChage = null;
62. try {
63. rowChage = RowChange.parseFrom(entry.getStoreValue());
64. } catch (Exception e) {
65. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
66. e);
67. }
68.
69. EventType eventType = rowChage.getEventType();
70. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
71. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
72. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
73. eventType));
74.
75. for (RowData rowData : rowChage.getRowDatasList()) {
76. if (eventType == EventType.DELETE) {
77. printColumn(rowData.getBeforeColumnsList());
78. } else if (eventType == EventType.INSERT) {
79. printColumn(rowData.getAfterColumnsList());
80. } else {
81. System.out.println("-------> before");
82. printColumn(rowData.getBeforeColumnsList());
83. System.out.println("-------> after");
84. printColumn(rowData.getAfterColumnsList());
85. }
86. }
87. }
88. }
89.
90. private static void printColumn(List<Column> columns) {
91. for (Column column : columns) {
92. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
93. }
94. }
95.}
4. 运行Client
首先启动Canal Server,可参加QuickStart : http://agapple.iteye.com/blogs/1796070
启动Canal Client后,可以从控制台从看到类似消息:
1.empty count : 1
2.empty count : 2
3.empty count : 3
4.empty count : 4
此时代表当前数据库无变更数据
5.  触发数据库变更
1.mysql> use test;
2.Database changed
3.mysql> CREATE TABLE `xdual` (
4. -> `ID` int(11) NOT NULL AUTO_INCREMENT,
5. -> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
6. -> PRIMARY KEY (`ID`)
7. -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
8.Query OK, 0 rows affected (0.06 sec)
9.
10.mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
可以从控制台中看到:
1.empty count : 1
2.empty count : 2
3.empty count : 3
4.empty count : 4
5.================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
6.ID : 4 update=true
7.X : 2013-02-05 23:29:46 update=true
最后:
  整个代码在附件中可以下载,如有问题可及时联系。 
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
canal
关系型数据库
数据库
MySQL
jfinal操作获取
jfinal参数
jfinal加载
afinal框架
jfinal框架加载
开发者社区 >
数据库
文章
作者高分内容
更多
Ubuntu 安装完后的配置
4484
maven2完全使用手册
2808
maven 介绍
2696
Class文件
1637
jboss classloader加载机制
3280
相关文章
Ruan
Canal使用
使用canal监听MySQL中binlog,搭配RabbitMQ,做到记录数据库变化
fly9
Canal 学习笔记
Canal 学习笔记
baphsqca3imha
Canal 数据同步(canal 安装) | 学习笔记
快速学习 Canal 数据同步(canal 安装)
IT咸鱼干
Canal服务搭建
Canal服务搭建
ta7lhdiw3omdo
基于Canal和Kafka实现MySQL的Binlog近实时同步
近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理的便于后续业务数据统计、标签系统构建等扩展功能的数据模型。基于当前团队的资源和能力,优先调研了Alibaba开源中间件Canal的使用。这篇文章简单介绍一下如何快速地搭建一套Canal相关的组件。
晴天哥
canal 启动介绍(2)
概述    
    本篇主要是为了讲清楚canal是如何启动的,从文章内容结构来说主要分为流程图、时序图、核心源码三大块。理解一个东西宏观上一定要理清楚层次,然后细节再去追求融会贯通。
1086
晴天哥
canal 高可用介绍(4)
概述
    这篇文章的目的是为了讲清楚canal的HA机制,至于什么是HA机制直接套用canal官网原话,因为我自认为没法描述的更好。而我直接从代码的角度去分析如何实现HA的,其实也就是zookeeper的分布式锁的使用方法。
1574
java技术爱好者
详细讲解!Canal+Kafka实现MySQL与Redis数据同步!
canal实战
+关注
愤怒的苹果
TDDL/DRDS【分布式数据库】 , canal/otter【分布式数据库同步】 yugong【去Oracle数据迁移同步工具】
文章
问答
作者高分内容
更多
Ubuntu 安装完后的配置
4484
maven2完全使用手册
2808
maven 介绍
2696
Class文件
1637
jboss classloader加载机制
3280
文章排行榜
最热
最新
重磅嘉宾畅聊大数据&AI开源话题,零距离感受激荡开源江湖
193586
拥抱开源,云原生时代下的开源牧码人的初心与坚守
98021
Redis客户端Lettuce深度分析介绍(上)
48565
干货!6个方面,32条总结教你提升职场经验
38876
从JDK8飞升到JDK17,再到未来的JDK21
26104
RDS SQL Server 自带证书开启TDE的解决方案
26073
Java8 Lambda实现源码解析
25472
Python3,区区5行代码,让黑白老照片变成华丽的彩色照,被吸粉了。
15605
Python3,早知道3行代码就能提取音频,我把这10块钱买包子吃不香吗?
13896
10
测试开发之:Jenkins持续集成(下),构建与运行(二)
10348
11
Python3,为了给女神暗送秋波,我默默的写了一个图片字符画生成器,真香。
16326
12
测试开发之:Jenkins持续集成(上),安装与配置
9802
13
Python3,5行代码,让你拥有无限量壁纸美图,终于告别手动下载了。
12010
14
深聊性能测试,从入门到放弃之:Locust性能自动化(一)初识Locust
16087
15
Python3,10h行代码,制作艺术签名,从此走上人生巅峰。
13911
16
Python3,5行代码,制作Gif动图,太简单了。
13055
17
Python3,多种方法,同时执行多条SQL语句,并把查询结果分别写入不同Sheet页,妥妥的学到了。
12972
18
Python3,仅仅2段代码,就实现项目代码自动上传及部署,再也不需要Jenkins了。
13143
19
Python3,自动识别图片文字,这个库,我爱了。
12949
20
Python3,1行代码就输出日志文件,从此跟logging模块说拜拜~ ~【赶紧收藏】(一)
8919
性能最高提升36%!基于阿里云倚天实例的Redis性能测试验证
54
性能世界第二的半定规划SDP是什么?怎么用?此文全面解答
107
万字长文详解Java lambda表达式
67
小样本学习总结
73
一文详解|如何写出优雅的代码
51
Linux服务器中了病毒后的清理方法
137
Flink 在米哈游的应用实践
226
RDS怎么查看存储空间的详细使用情况?
106
Python3,区区9行代码批量提取PDF文件的指定内容,我被震惊了....
1012
10
2022阿里云研发效能峰会
57
11
使用阿里云负载均衡时获取客户端真实IP的方法
193
12
http 和www HTTP与HTTPS的区别
73
13
关于K8s集群环境工作组隔离配置多集群切换的一些笔记
83
14
足球黑科技之AI与足球智能分析
151
15
2022 阿里灵杰AI开发者峰会内容抢先看!
120
16
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
60
17
优化 20% 资源成本,新东方的 Serverless 实践之路
59
18
性能最大提升60%,阿里云发布基于第四代英特尔至强的第八代ECS实例
183
19
博途软件的基本操作,快捷操作有哪些?什么是博途视图和项目视图?
78
20
Java中的如何检测字符串是否相等
126
推荐文章
参与DSW Gallery模型案例全集评测,赢神秘好礼
参与应用引擎 SAE评测,赢取CHERRY机械键盘等好礼
“第益课”大学生技术公益
乘风者计划邀您入驻社区,精彩权益即刻享
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载