采用OpenReplicator解析MySQL binlog_朱小厮的博客-CSDN博客


本站和网页 https://blog.csdn.net/u013256816/article/details/53072560 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

采用OpenReplicator解析MySQL binlog_朱小厮的博客-CSDN博客
采用OpenReplicator解析MySQL binlog
朱小厮
于 2016-11-07 21:51:28 发布
12229
收藏
分类专栏:
java
服务器搭建
网站架构相关技术
文章标签:
mysql
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013256816/article/details/53072560
版权
java
同时被 3 个专栏收录
134 篇文章
12 订阅
订阅专栏
服务器搭建
4 篇文章
1 订阅
订阅专栏
网站架构相关技术
31 篇文章
40 订阅
订阅专栏
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。 Open Replicator项目地址:https://github.com/whitesock/open-replicator
binlog事件分析结构图
在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。
这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:
DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):
"eventId": 1,
"databaseName": "canal_test",
"tableName": "`company`",
"eventType": 2,
"timestamp": 1477033198000,
"timestampReceipt": 1477033248780,
"binlogName": "mysql-bin.000006",
"position": 353,
"nextPostion": 468,
"serverId": 2,
"before": null,
"after": null,
"isDdl": true,
"sql": "DROP TABLE `company` /* generated by server */"
DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):
"eventId": 0,
"databaseName": "canal_test",
"tableName": "person",
"eventType": 24,
"timestamp": 1477030734000,
"timestampReceipt": 1477032161988,
"binlogName": "mysql-bin.000006",
"position": 242,
"nextPostion": 326,
"serverId": 2,
"before": {
"id": "3",
"sex": "f",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"after": {
"id": "3",
"sex": "m",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"isDdl": false,
"sql": null
相关的类文件如下: CDCEvent.java
package or;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
public class CDCEvent {
private long eventId = 0;//事件唯一标识
private String databaseName = null;
private String tableName = null;
private int eventType = 0;//事件类型
private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
private String binlogName = null;// binlog file name
private long position = 0;
private long nextPostion = 0;
private long serverId = 0;
private Map<String,String> before = null;
private Map<String,String> after = null;
private Boolean isDdl= null;
private String sql = null;
private static AtomicLong uuid = new AtomicLong(0);
public CDCEvent(){}
public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){
this.init(are);
this.databaseName = databaseName;
this.tableName = tableName;
private void init(final BinlogEventV4 be){
this.eventId = uuid.getAndAdd(1);
BinlogEventV4Header header = be.getHeader();
this.timestamp = header.getTimestamp();
this.eventType = header.getEventType();
this.serverId = header.getServerId();
this.timestampReceipt = header.getTimestampOfReceipt();
this.position = header.getPosition();
this.nextPostion = header.getNextPosition();
this.binlogName = header.getBinlogFileName();
@Override
public String toString(){
StringBuilder builder = new StringBuilder();
builder.append("{ eventId:").append(eventId);
builder.append(",databaseName:").append(databaseName);
builder.append(",tableName:").append(tableName);
builder.append(",eventType:").append(eventType);
builder.append(",timestamp:").append(timestamp);
builder.append(",timestampReceipt:").append(timestampReceipt);
builder.append(",binlogName:").append(binlogName);
builder.append(",position:").append(position);
builder.append(",nextPostion:").append(nextPostion);
builder.append(",serverId:").append(serverId);
builder.append(",isDdl:").append(isDdl);
builder.append(",sql:").append(sql);
builder.append(",before:").append(before);
builder.append(",after:").append(after).append("}");
return builder.toString();
// 省略Getter和Setter方法
open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面: InstanceListener.java
package or;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import or.keeper.TableInfoKeeper;
import or.manager.CDCEventManager;
import or.model.ColumnInfo;
import or.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.util.MySQLConstants;
public class InstanceListener implements BinlogEventListener{
private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);
@Override
public void onEvents(BinlogEventV4 be) {
if(be == null){
logger.error("binlog event is null");
return;
int eventType = be.getHeader().getEventType();
switch(eventType){
case MySQLConstants.FORMAT_DESCRIPTION_EVENT:
logger.trace("FORMAT_DESCRIPTION_EVENT");
break;
case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId
TableMapEvent tme = (TableMapEvent)be;
TableInfoKeeper.saveTableIdMap(tme);
logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());
break;
case MySQLConstants.DELETE_ROWS_EVENT:
DeleteRowsEvent dre = (DeleteRowsEvent) be;
long tableId = dre.getTableId();
logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = dre.getRows();
for(Row row:rows){
List<Column> before = row.getColumns();
Map<String,String> beforeMap = getMap(before,databaseName,tableName);
if(beforeMap !=null && beforeMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
case MySQLConstants.UPDATE_ROWS_EVENT:
UpdateRowsEvent upe = (UpdateRowsEvent)be;
long tableId = upe.getTableId();
logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Pair<Row>> rows = upe.getRows();
for(Pair<Row> p:rows){
List<Column> colsBefore = p.getBefore().getColumns();
List<Column> colsAfter = p.getAfter().getColumns();
Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
case MySQLConstants.WRITE_ROWS_EVENT:
WriteRowsEvent wre = (WriteRowsEvent)be;
long tableId = wre.getTableId();
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = wre.getRows();
for(Row row: rows){
List<Column> after = row.getColumns();
Map<String,String> afterMap = getMap(after,databaseName,tableName);
if(afterMap!=null && afterMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
case MySQLConstants.QUERY_EVENT:
QueryEvent qe = (QueryEvent)be;
TableInfo tableInfo = createTableInfo(qe);
if(tableInfo == null)
break;
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);
CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);
cdcEvent.setIsDdl(true);
cdcEvent.setSql(qe.getSql().toString());
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
case MySQLConstants.XID_EVENT:{
XidEvent xe = (XidEvent)be;
logger.trace("XID_EVENT: xid:{}",xe.getXid());
break;
default:
logger.trace("DEFAULT:{}",eventType);
break;
/**
* ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
* 然后跟取回的List<Column>进行映射。
* @param cols
* @param databaseName
* @param tableName
* @return
*/
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
Map<String,String> map = new HashMap<>();
if(cols == null || cols.size()==0){
return null;
String fullName = databaseName+"."+tableName;
List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
if(columnInfoList == null)
return null;
if(columnInfoList.size() != cols.size()){
TableInfoKeeper.refreshColumnsMap();
if(columnInfoList.size() != cols.size())
logger.warn("columnInfoList.size is not equal to cols.");
return null;
for(int i=0;i<columnInfoList.size(); i++){
if(cols.get(i).getValue()==null)
map.put(columnInfoList.get(i).getName(),"");
else
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
return map;
/**
* 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
* @param qe
* @return
*/
private TableInfo createTableInfo(QueryEvent qe){
String sql = qe.getSql().toString().toLowerCase();
TableInfo ti = new TableInfo();
String databaseName = qe.getDatabaseName().toString();
String tableName = null;
if(checkFlag(sql,"table")){
tableName = getTableName(sql,"table");
} else if(checkFlag(sql,"truncate")){
tableName = getTableName(sql,"truncate");
} else{
logger.warn("can not find table name from sql:{}",sql);
return null;
ti.setDatabaseName(databaseName);
ti.setTableName(tableName);
ti.setFullName(databaseName+"."+tableName);
return ti;
private boolean checkFlag(String sql, String flag){
String[] ss = sql.split(" ");
for(String s:ss){
if(s.equals(flag)){
return true;
return false;
private String getTableName(String sql, String flag){
String[] ss = sql.split("\\.");
String tName = null;
if (ss.length > 1) {
String[] strs = ss[1].split(" ");
tName = strs[0];
} else {
String[] strs = sql.split(" ");
boolean start = false;
for (String s : strs) {
if (s.indexOf(flag) >= 0) {
start = true;
continue;
if (start && !s.isEmpty()) {
tName = s;
break;
tName.replaceAll("`", "").replaceAll(";", "");
//del "("[create table person(....]
int index = tName.indexOf('(');
if(index>0){
tName = tName.substring(0, index);
return tName;
上面所涉及到的TableInfo .java如下:
package or.model;
public class TableInfo {
private String databaseName;
private String tableName;
private String fullName;
// 省略Getter和Setter
@Override
public boolean equals(Object o){
if(this == o)
return true;
if(o == null || this.getClass()!=o.getClass())
return false;
TableInfo tableInfo = (TableInfo)o;
if(!this.databaseName.equals(tableInfo.getDatabaseName()))
return false;
if(!this.tableName.equals(tableInfo.getTableName()))
return false;
if(!this.fullName.equals(tableInfo.getFullName()))
return false;
return true;
@Override
public int hashCode(){
int result = this.tableName.hashCode();
result = 31*result+this.databaseName.hashCode();
result = 31*result+this.fullName.hashCode();
return result;
接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java
package or.keeper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.binlog.impl.event.TableMapEvent;
public class TableInfoKeeper {
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();
static{
columnsMap = MysqlConnection.getColumns();
public static void saveTableIdMap(TableMapEvent tme){
long tableId = tme.getTableId();
tabledIdMap.remove(tableId);
TableInfo table = new TableInfo();
table.setDatabaseName(tme.getDatabaseName().toString());
table.setTableName(tme.getTableName().toString());
table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());
tabledIdMap.put(tableId, table);
public static synchronized void refreshColumnsMap(){
Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
if(map.size()>0){
//logger.warn("refresh and clear cols.");
columnsMap = map;
//logger.warn("refresh and switch cols:{}",map);
else
logger.error("refresh columnsMap error.");
public static TableInfo getTableInfo(long tableId){
return tabledIdMap.get(tableId);
public static List<ColumnInfo> getColumns(String fullName){
return columnsMap.get(fullName);
正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:
package or;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MysqlConnection {
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
private static Connection conn;
private static String host;
private static int port;
private static String user;
private static String password;
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){
try {
if(conn == null || conn.isClosed()){
Class.forName("com.mysql.jdbc.Driver");
host = hostArg;
port = portArg;
user = userArg;
password = passwordArg;
conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
logger.info("connected to mysql:{} : {}",user,password);
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(),e);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
public static Connection getConnection(){
try {
if(conn == null || conn.isClosed()){
setConnection(host,port,user,password);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
return conn;
/**
* 获取Column信息
* @return
*/
public static Map<String,List<ColumnInfo>> getColumns(){
Map<String,List<ColumnInfo>> cols = new HashMap<>();
Connection conn = getConnection();
try {
DatabaseMetaData metaData = conn.getMetaData();
ResultSet r = metaData.getCatalogs();
String tableType[] = {"TABLE"};
while(r.next()){
String databaseName = r.getString("TABLE_CAT");
ResultSet result = metaData.getTables(databaseName, null, null, tableType);
while(result.next()){
String tableName = result.getString("TABLE_NAME");
//System.out.println(result.getInt("TABLE_ID"));
String key = databaseName +"."+tableName;
ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
cols.put(key, new ArrayList<ColumnInfo>());
while(colSet.next()){
ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
cols.get(key).add(columnInfo);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
return cols;
/**
* 参考
* mysql> show binary logs
* +------------------+-----------+
*| Log_name | File_size |
*+------------------+-----------+
*| mysql-bin.000001 | 126 |
*| mysql-bin.000002 | 126 |
*| mysql-bin.000003 | 6819 |
*| mysql-bin.000004 | 1868 |
*+------------------+-----------+
*/
public static List<BinlogInfo> getBinlogInfo(){
List<BinlogInfo> binlogList = new ArrayList<>();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show binary logs");
while(resultSet.next()){
BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
binlogList.add(binlogInfo);
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
return binlogList;
/**
* 参考:
* mysql> show master status;
* +------------------+----------+--------------+------------------+
* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |
* +------------------+----------+--------------+------------------+
* | mysql-bin.000004 | 1868 | | |
* +------------------+----------+--------------+------------------+
* @return
*/
public static BinlogMasterStatus getBinlogMasterStatus(){
BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show master status");
while(resultSet.next()){
binlogMasterStatus.setBinlogName(resultSet.getString("File"));
binlogMasterStatus.setPosition(resultSet.getLong("Position"));
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
return binlogMasterStatus;
/**
* 获取open-replicator所连接的mysql服务器的serverid信息
* @return
*/
public static int getServerId(){
int serverId=6789;
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show variables like 'server_id'");
while(resultSet.next()){
serverId = resultSet.getInt("Value");
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
return serverId;
上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)
package or.model;
public class BinlogInfo {
private String binlogName;
private Long fileSize;
// 省略Getter和Setter
package or.model;
public class BinlogMasterStatus {
private String binlogName;
private long position;
// 省略Getter和Setter
package or.model;
public class ColumnInfo {
private String name;
private String type;
// 省略Getter和Setter
最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)
package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager {
public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
所有的准备工作都完成了,下面可以解析binlog日志了:
package or.test;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;
import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
public class OpenReplicatorTest {
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
private static final String host = "xx.xx.xx.60";
private static final int port = 3306;
private static final String user = "****";
private static final String password = "****";
public static void main(String[] args){
OpenReplicator or = new OpenReplicator ();
or.setUser(user);
or.setPassword(password);
or.setHost(host);
or.setPort(port);
MysqlConnection.setConnection(host, port, user, password);
//or.setServerId(MysqlConnection.getServerId());
//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
or.setBinlogFileName(bms.getBinlogName());
//or.setBinlogFileName("mysql-bin.000004");
or.setBinlogPosition(4);
or.setBinlogEventListener(new InstanceListener());
try {
or.start();
} catch (Exception e) {
logger.error(e.getMessage(),e);
Thread thread = new Thread(new PrintCDCEvent());
thread.start();
public static class PrintCDCEvent implements Runnable{
@Override
public void run() {
while(true){
if(CDCEventManager.queue.isEmpty() == false)
CDCEvent ce = CDCEventManager.queue.pollFirst();
Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
String prettyStr1 = gson.toJson(ce);
System.out.println(prettyStr1);
else{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
时间运行旧了会遇到这样一个问题:
16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306
初步解决方案(extends OpenReplicator然后添加重试机制):
package or;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;
public class OpenReplicatorPlus extends OpenReplicator{
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
private volatile boolean autoRestart = true;
@Override
public void stopQuietly(long timeout, TimeUnit unit){
super.stopQuietly(timeout, unit);
if(autoRestart){
try {
TimeUnit.SECONDS.sleep(10);
logger.error("Restart OpenReplicator");
} catch (InterruptedException e) {
e.printStackTrace();
最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。
大功告成~~
参考资料
谈谈对Canal(增量数据订阅与消费)的理解MySQL主备复制原理、实现及异常处理https://github.com/whitesock/open-replicator
欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
朱小厮
关注
关注
点赞
收藏
打赏
16
评论
采用OpenReplicator解析MySQL binlog
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景...
复制链接
扫一扫
专栏目录
OpenLogReplicator:完全用C ++编写的开源Oracle数据库CDC。直接从数据库重做日志文件和JSON或Protobuf格式的流中读取事务
03-22
OpenLogReplicator
完全用C ++编写的开源Oracle数据库CDC。直接从数据库重做日志文件和JSON或Protobuf格式的流中读取事务到:
卡夫卡
平面文件
网络流(计划TCP / IP或ZeroMQ)
请注意,该代码有2个分支:
主-具有稳定代码的分支-每月更新
每晚-当前分支不稳定,每天更新代码
更新Protobuf代码:
光盘原型
导出PATH = / opt / protobuf / bin:$ PATH
协议OraProtoBuf.proto --cpp_out =。
mv OraProtoBuf.pb.cc ../src/OraProtoBuf.pb.cpp
mv OraProtoBuf.pb.h ../src/OraProtoBuf.pb.h
调试编译:
git clone
cd OpenLogReplicator
autoreconf -f
mysqlbinlog工具详解
10-01
详细讲解了mysqlbinlog数据库库回复的详细过程,减少了参数的冗余性造成的命令看不懂得现象
评论 16
您还未登录,请先
登录
后发表或查看评论
【网管日记】MySQL主从复制
最新发布
见见大魔王
12-15
176
MySQL 主从复制是一个异步的复制过程,底层是基于 Mysql 数据库自带的功能。一台或多台 MySQL 数据库(slave,即)从另一台 MySQL 数据库( master,即)进行日志的复制,然后再解析日志并应用到自身,最终实现的数据和的数据保持一致。二进制日志(BINLOG)记录了所有的 DDL(数据定义语言)语句和 DML(数据操纵语言)语句,但是不包括数据查询语句。此日志对于灾难时的数据恢复起着极其重要的作用,MySQL的主从复制,就是通过该 binlog 实现的。
开源框架open-replicator原理分析
凌风郎少
11-07
5163
databus组件open-replicator的实现原理分析,包括IO流的实现分析。
mysqlbinlog 解析binlog文件为可读的sql语句
qq_45458749的博客
11-18
176
解析mysql日志文件,解析binlog内容为sql语句
点击事件及按钮事件。
weixin_45617334的博客
09-14
2249
事件源
事件名
事件注册
事件处理
点击事件
(1)单击事件
在这个案例中,事件源是id为“p1”的元素,事件名是单击事件注册οnclick=“fun()”,
也就是说,当单击id为“p1”的元素时,交给fun函数来处理。
(2)双击事件
(3)鼠标按下/抬起。(on mouse down / on mouse up)
(4)鼠标移入移出(on mouse enter / on mouse leave)
(5)事件综合
<!DOCTYPE html>
<html>
linux 利用canal充当中间件同步mysql数据到redis
zxljsbk的博客
04-24
1924
一、原理简介
1.简介:
canal为阿里巴巴产品,它主要模拟了mysql的Slave向Master发送请求,当mysql有增删改查时则会出发请求将数据发送到canal服务中,canal将数据存放到内存,直到客户端程序(canal服务端和客户端程序都是由java编写,且客户端逻辑由我们借助com.alibaba.otter.canal工具包下的类完成开发)通过发布-订阅...
23 open-replicator 解析binlog失败 available: 4, event type: 19
970655147的专栏
07-10
1183
问题出现
使用 open-replicator 来解析 binLog 的时候出现了这个问题, 这个包 似乎是14年之后 就没有杂更新了
open-replicator 的版本是现在的最新的版本1.0.7, 详情请见 参考的ref
15:55:33.435 [binlog-parser-1] ERROR c.g.c.o.b.impl.AbstractBinlogParser - faile
MySQL binlog 日志解析
hzp666的博客
07-08
3598
一 、binlog 简介binlog 是 MySQL Server 层记录的二进制日志文件,用于记录 MySQL 的数据更新或者潜在更新(比如 DELETE 语句执行删除而实际并没有符合条件的数据),select 或 show 等不会修改数据的操作则不会记录在 binlog 中。通常在 binlog_format = ROW 的环境下,我们可以通过 binlog 获取历史的 SQL 执行记录,前提是必须开启 binlog_rows_query_log_events 参数(默认关闭,建议开启),该参数可以通过
binlog解析工具—my2sql
雪辉博客
03-15
1734
文章目录一、工具介绍1.1 工具对比1.2 用途1.3 限制1.4 性能对比二、工具使用2.1 参数说明2.2 使用案例
一、工具介绍
  my2sql是go版MySQL binlog解析工具,通过解析MySQL binlog ,可以生成原始SQL、回滚SQL、去除主键的INSERT SQL等,也可以生成DML统计信息。类似工具有binlog2sql、MyFlash、my2fback等,本工具基于my2fback、binlog_rollback工具二次开发而来。
1.1 工具对比
binlog2sql:P
mysql操作日志查看工具_MySQL 的 Binlog 日志处理工具对比
weixin_42520277的博客
01-30
373
来源 |blog.csdn.net/weixin_38071106/article/details/88547660CanalMaxwellDatabus阿里云的数据传输服务DTSCanal定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。原理:canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql mast...
mysqlbinlog解析工具
oceanbase,xml上传漏洞
11-17
88
mysql怎样通过binlog恢复被恶意删除的数据
1.以前我错误的认为mysql的日志可以恢复到任何时间的状态,其实并不是这样,这个恢复是有前提的,就是你至少得有一个从日志记录开始后的数据库备份,通过日志恢复数据库实际上只是一个对以前操作的回放过程而已,不用想得太复杂,既然是回放你就得注意了,如果你执行了两次恢复那么就...
MySQL如何解析binlog_带你解析MySQL binlog
weixin_39703982的博客
01-19
1208
前言:我们都知道,binlog可以说是MySQL中比较重要的日志了,在日常学习及运维过程中,也经常会遇到。不清楚你对binlog了解多少呢?本篇文章将从binlog作用、binlog相关参数、解析binlog内容三个方面带你了解binlog。1.binlog简介binlog即binary log,二进制日志文件。它记录了数据库所有执行的DDL和DML语句(除了数据查询语句select、show等)...
MySQL binlog分析程序:Open Replicator
menergy
12-26
8381
第0章:简介
(1)下面是http://code.google.com中的binlog事件分析结构图:
(2)获取开源包的maven坐标
com.google.code
open-replicator
1.0.5
Introducing Maxwell, a mysql-to-kafka binlog processor
wulantian的专栏
03-08
2150
Introducing Maxwell, a mysql-to-kafka binlog processor
时间 2015-08-21 01:43:28 Planet
MySQL
原文 http://developer.zendesk.com/blog/introducing-maxwell-a-mysql-to-kafka-binlog-processor
主题 MySQL
MySQL数据库Binlog解析工具--binlog2sql
淡定波的博客
07-17
3189
概述
作为DBA,binlog2sql是一项必须掌握的工具。binlog2sql是一个开源的Python开发的MySQL Binlog解析工具,能够将Binlog解析为原始的SQL,也支持将Binlog解析为回滚的SQL,以便做数据恢复。
gihub:https://github.com/danfengcao/binlog2sql
一、闪回原理简析
开始之前,先说说闪回。我们都知道 MySQL binlog 以 event 为单位,记录数据库的变更信息,这些信息能够帮助我们重现这之间的所有变化,也就是所谓
Open Replicator
weixin_34360651的博客
03-02
126
Open Replicator ( http://code.google.com/p/open-replicator/ ) 开源了。Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通...
Debezium报错处理系列之二十五:Client requested master to start replication from impossible position
zhengzaifeidelushang的博客
09-25
150
Debezium报错处理系列之二十五:Client requested master to start replication from impossible position
Flink cdc 2.1.1 bug问题
qq_31866793的博客
05-16
1628
cdc数据越界,cdc 2.1.1 bug
MySQL解析binlog日志文件
Sebastien23的博客
08-27
593
上面Binlog的格式为。可以看到,解析出的DML语句被加密了。
MySQL Binlog 解析工具 Maxwell 详解
帅成一匹马
08-17
1434
maxwell 简介
Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
Maxwell主要提供了下列功能:
支持SELECT * FROM table的方式进行全量数据初始化
支持在主库发生f..
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:编程工作室
设计师:CSDN官方博客
返回首页
朱小厮
CSDN认证博客专家
CSDN认证企业博客
码龄9年
暂无认证
306
原创
6748
周排名
109万+
总排名
495万+
访问
等级
3万+
积分
1万+
粉丝
2882
获赞
1998
评论
7232
收藏
私信
关注
热门文章
RabbitMQ之消息确认机制(事务+Confirm)
127917
RabbitMQ之消息持久化
99910
从零开始玩转JMX(一)——简介和Standard MBean
63057
Kafka解析之topic创建(1)
60644
Linux下Git安装及配置
57960
分类专栏
消息中间件
117篇
技术杂谈
16篇
JAVA相关技术
55篇
网站架构相关技术
31篇
设计模式相关技术
25篇
Java集合容器相关技术
13篇
系统架构
35篇
java
134篇
设计模式
26篇
并发
10篇
算法
3篇
计算机网络
2篇
服务器搭建
4篇
linux
6篇
数据库
5篇
scala
1篇
kafka
65篇
rabbitmq
54篇
消息中间件
58篇
spark
4篇
Go
最新评论
kafka数据可靠性深度解读
weixin_64295471:
大数据2113吴洋到此一游。
最全 Prometheus 踩坑集锦
金金金金金大叔:
求问,我的prometheus在使用的时候,每次评估都会重置“Active Since”,查看数据是一直连续的,请问有这方面的思路吗?
JAVA多线程之UncaughtExceptionHandler——处理非正常的线程中止
wo41chuan_luan_ma:
记录下,程序中用一个线程发送心跳,突然心跳异常,报错Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HeartBeatThread",看了博主的博客懂了,因为try...catch没法捕获异常导致不在定义的日志文件中打印日志,而在output文件中打印日志,接下来想想如何解决这个问题,我这个应该解决OOM就可以了,就算捕获异常,OOM还是要解决
RabbitMQ之消息确认机制(事务+Confirm)
Stephen·You:
写得挺好的,不过看见了一些瑕疵:
1、异步confirm模式的代码那里创建的集合应该命名为unConfirmSet,因为最终该集合留下的是confirm失败的消息id。
2、rabbitmq丢失了消息,在handleNack函数中不需要再执行删除消息id的操作了。
3、消息id和消息内容都需要存起来,当最后发现有消息confirm失败时才能重新发送消息内容,所以建议将消息内容用hash数据类型存到redis中,或者用TreeMap代替TreeSet,同时也能防止消息重复消费的问题
RabbitMQ之死信队列
小哥骑单车:
个人感觉,其实死信队列来自哪种方式不重要,重要的是都没被消费过,都可以当未被消费的消息处理。
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
是什么让Redis“气急败坏”回击:13年来,总有人想替Redis换套新架构
打破原则引入SQL,MongoDB到底想要干啥???
面试官:大量请求 Redis 不存在的数据,从而影响数据库,该如何解决?
2022
08月
1篇
07月
1篇
06月
4篇
05月
5篇
04月
4篇
03月
11篇
02月
11篇
01月
12篇
2021年272篇
2020年383篇
2019年294篇
2018年43篇
2017年64篇
2016年119篇
2015年38篇
2014年1篇
目录
目录
分类专栏
消息中间件
117篇
技术杂谈
16篇
JAVA相关技术
55篇
网站架构相关技术
31篇
设计模式相关技术
25篇
Java集合容器相关技术
13篇
系统架构
35篇
java
134篇
设计模式
26篇
并发
10篇
算法
3篇
计算机网络
2篇
服务器搭建
4篇
linux
6篇
数据库
5篇
scala
1篇
kafka
65篇
rabbitmq
54篇
消息中间件
58篇
spark
4篇
Go
目录
评论 16
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
朱小厮
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值