日期:2014-05-16  浏览次数:20773 次

基于tungsten监测mysql数据修改系统介绍
一.项目背景
项目中有个全文检索系统,此系统中需要知道应用中数据的实时更新情况,如增加,删除,这些对于索引与数据库数据的同步都很重要。针对于此处理的方式一般有两种,一种是应用层修改,直接发送消息给检索系统,此对系统的消耗是比较大的,一般不可取;另一种是应用系统跟检索系统之间有一种同步机制,可以通过计算ID checksum值的方式来保证数据的同步,此方式在公司相关的系统中都有应用,也是一种可行的方法,不过相关逻辑还是比较复杂。考虑到我们新项目中使用到的是mysql 数据库,同时通过Binlog来监测数据的更新情况在业界也有相关的应用,比如通过Binlog来更新缓存(这也是我现在手上正在做的一个项目,正在开发过程中),对新浪更新redis缓存也是基于此,相关很多公司都有应用,以上为基于Mysql Binlog来监测数据库数据更新情况的背景说明。

二.开源项目选择
Tungsten是一个开源的数据库同步工具,详细可参考官网(http://en.wikipedia.org/wiki/Tungsten)
其提供的tungsten API可供下载,此API实现了mysql slave同步流程的相关功能。

三.基本流程
基于Mysql Binlog完成数据监控主要有三个流程:(对于tungsten使用的API包主要是:com.continuent.tungsten.replicator.extractor.mysql)
1.把Binlog从指定的mysql master日志目录同步过来,通过以上包中的RelayClient类即可,可指定mysql主机地址,相关用户名密码,即JDBC连接的方式参数指定,即可完成Binlog日志的同步。

2.把同步过来的Binlog日志解析成我们能看的懂的SQL语句。对于此解析,由于日志格式的不同,解析也有不同,是通过MysqlExtractor类,指定Binlog的本地目录,即可完成解析。(对于mysql binlog的日志说明可以参考:Mysql Binlog三种格式介绍及分析

3.在以上解析后会得到对应的Sql语句,对于如果选用row日志格式,会知道具体的数据变化情况,MysqlExtractor类数据可转换化RowChangeData对象,即可完成修改数据的读取;如果是基于Statement的日志解析,得到的对应的SQL语句,如果需要提取对应数据修改,需要引入SQL解析,我们项目中采用了MIXED的日志格式,SQL解析采用了JSqlParser,此开源项目对SQL的解析有限,它是通过javacc来完成SQL语义的解析,我们项目中对javacc定义文件的做了修改,使其支撑更复杂的SQL解析。

通过以上三个流程,即可完成数据库对应修改数据的提取。对应的mysql产生的binlog会有很多,在实际应用中解析时可以跟据自己监测的特定SQL,这样很大提高解析速度。(PS:对于增加特定SQL解析所用的时间是几百甚至几十纳表即可完成,但是对于一次解析则需要MS级别的解析。因此如何选择供借鉴)以上是整体流程,一般是多线程异步执行(mysql slave同步流程也是如此)。一个线程执行RelayClient完成日志的同步流程,一个线程完成MysqlExtractor的日志解析流程。一般来说,日志同步的速度远远大于日
志解析的速度,因此对于流程的控制是个需要考虑的问题。(我们是修改tungsten API源码,通过java Semaphore信号量来控制日志的同步与解析速度)日志同步如果没有新的新的日志产生会出现等待新日志更新,如果没有新的日志更新,日志解析线程也会等待,此流程跟Mysql的slave同步线程的流程是一样的。实际线上使用时,mysql日志的产生速度,以及mysql的解析速度可测试,实时性是否满足需求,我们的流程在测试阶段还算比较不错,实时性在MS级别上。


四:说明
上面是基于tungsten API完成mysql数据更新监控项目的简单介绍。在此需要注意是不同的日志格式,解析相关比较大,因此在考虑自己应用的基础上,通过部分的IO开销,来使解析变得容易是可取的。而且statement日志格式对应的解析有时是做不了的。(个人推荐mysql 使用row日志格式)

原先我们系统在使用redis时,是基于应用层更新的,完成的分布式方案,现老大在让我做二期的缓存方案,已经接近尾声。应用层数据更新将通过binlog更新的方式,来完成缓存数据的更新。应用层对缓存的更新,只是使用。(我们的特殊业务需求,一些原始数据需要从DB中加载,考虑到不增加DB额外负载的情况,会考虑还是保留在应用层flush)通过此方式,能大大解耦应用层跟缓存模块的耦合,系统变得更模块化。


相关源码,可参考其tungsten API中对应的单元测试,其单元测试还比较完整,相应的例子都有。如有不对或者交流的地方可联系我。(个人微博:纯生活)