设为首页 收藏本站
查看: 678|回复: 0

[经验分享] JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制)

[复制链接]

尚未签到

发表于 2018-11-3 08:08:38 | 显示全部楼层 |阅读模式
  MySQL到Redis数据复制方案
  无论MySQL还是Redis,自身都带有数据同步的机制,像比较常用的 MySQL的Master/Slave模式 ,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。
  那么理论上我们也可以用同样方式,分析MySQL的binlog文件并将数据插入Redis。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于 binlog存在Statement/Row/Mixedlevel多种形式 ,分析binlog实现同步的工作量是非常大的。
  因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的MySQL UDF,将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP Gearman Worker,将数据同步到Redis。比分析binlog的方式增加了不少流程,但是实现成本更低,更容易操作。
  Gearman的安装与使用
  Gearman 是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的Gearman应用包括以下这些部分:
DSC0000.jpg

  Gearman Job Server:Gearman核心程序,需要编译安装并以守护进程形式运行在后台
  Gearman Client:可以理解为任务的收件员,比如我要在后台执行一个发送邮件的任务,可以在程序中调用一个Gearman Client并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。
  Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker接收到Gearman Client传递的任务内容后,会按顺序处理。
  以前曾经介绍过类似的 后台任务处理项目Resque 。两者的设计其实非常接近,简单可以类比为:
  Gearman Job Server:对应Resque的Redis部分
  Gearman Client:对应Resque的Queue操作
  Gearman Worker:对应Resque的Worker和Job
  这里之所以选择Gearman而不是Resque是因为Gearman提供了比较好用的MySQL UDF,工作量更小。
  1、安装依赖
  yum install -y boost-devel gperf libevent-devel libuuid-devel
  yum install mysql-devel -y
  2、下载gearman
  wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz
  3、编译安装,指定mysqlclient的链接路径
  tar -zxvf gearmand-1.1.12.tar.gz
  cd gearmand-1.1.12
  ./configure
  make && make install
  4、启动gearmand服务端 (启动之时,在/var/log/下创建gearmand.log日志文件。-l 指定日志文件  -d后台运行 -L 0.0.0.0 绑定到IPV4
  gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d
  5、查看是否启动成功
  ps -ef | grep gearman
  6、查看是否安装成功,查看gearman版本信息
  gearmand -V
  7、MySQL UDF + Trigger同步数据到Gearman (https://github.com/mysqludf)
  安装lib_mysqludf_json(lib_mysqludf_json可以把MySQL表的数据以json数据格式输出)
  wget https://github.com/mysqludf/lib_mysqludf_json/archive/master.zip
  unzip master.zip
  cd lib_mysqludf_json-master/
  rm -rf lib_mysqludf_json.so
  8、编译 mysql_config 这是mysql的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置
  gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c
  9、拷贝lib_mysqludf_json.so到MySQL的plugin目录
  (可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置)
  cp lib_mysqludf_json.so /usr/local/mysql/lib/plugin/
  演示lib_mysqludf_json功能
  登录mysql
  mysql -uroot -h127.0.0.1 -p
  注册UDF函数
  CREATE FUNCTION json_object RETURNS STRING SONAME "lib_mysqludf_json.so";
  CREATE FUNCTION json_array RETURNS STRING SONAME "lib_mysqludf_json.so";
  CREATE FUNCTION json_members RETURNS STRING SONAME "lib_mysqludf_json.so";
  CREATE FUNCTION json_values RETURNS STRING SONAME "lib_mysqludf_json.so";
  //json_array|json_members|json_values函数注册方式与json_object一样.
  select json_object(id,file_save_type,base_dir) as sys_file_save_config from sys_file_save_config;
  ERROR 1123 (HY000): Can't initialize function 'json_object'; Invalid json member name - name cannot be empty
  以上错误这样解决,给每个成员名称使用别名即可:

  select json_object(id as>  10、安装gearman-mysql-udf (https://launchpad.net/gearman-mysql-udf)
  wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz
  tar zxvf gearman-mysql-udf-0.6.tar.gz
  cd gearman-mysql-udf-0.6
  11、安装libgearman-devel
  yum install libgearman-devel -y
  如果没有yum源,添加epel.repo yum源
  [epel]
  name=Extra Packages for Enterprise Linux 6 - $basearch
  #baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch
  mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-6&arch=$basearch
  failovermethod=priority
  enabled=1
  gpgcheck=1
  gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
  

        [epel-debuginfo]  name=Extra Packages for Enterprise Linux 6 - $basearch - Debug
  #baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch/debug
  mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-debug-6&arch=$basearch
  failovermethod=priority
  enabled=0
  gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
  gpgcheck=1
  

  [epel-source]
  name=Extra Packages for Enterprise Linux 6 - $basearch - Source
  #baseurl=http://download.fedoraproject.org/pub/epel/6/SRPMS
  mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-source-6&arch=$basearch
  failovermethod=priority
  enabled=0
  gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
  gpgcheck=1
  

  12、编译安装
  (可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置, mysql_config的配置文件,以及插件库所在路径,编译之后会在此路径生成.so文件)
  ./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/
  make && make install
  演示gearman-mysql-udf功能
  mysql -uroot -p
  CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_do RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_do_high RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_do_low RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_do_high_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_do_low_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
  CREATE FUNCTION gman_sum RETURNS STRING SONAME "libgearman_mysql_udf.so";
  //函数gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum注册方式类似,请参考gearman-mysql-udf-0.6/README
  //指定gearman job server地址
  SELECT gman_servers_set('127.0.0.1:4730');
  如果出现异常信息:
  ERROR 1126 (HY000): Can't open shared library 'libgearman_mysql_udf.so' (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)
  表示系统找不到 libgearman.so 文件,一般so都在/usr/local/lib目录下,修改配置文件/etc/ld.so.conf,将/usr/local/lib目录加入进去即可:
  $ cat /etc/ld.so.conf
  include ld.so.conf.d/.conf
  /usr/local/lib
  $ /sbin/ldconfig -v | grep gearman
  13、MySQL Trigger调用Gearman UDF实现同步
  创建触发器
  DELIMITER $$
  CREATE TRIGGER test_data_to_redis AFTER UPDATE ON test FOR EACH ROW BEGIN
  SET@ret=gman_do_background('syncToRedis', json_object(NEW.id AS id, NEW.phone ASphone));
  END$$;
  

    DELIMITER $$  
CREATE TRIGGER test_data_to_redis2 AFTER INSERT ON test
  FOR EACH ROW BEGIN
  SET @ret=gman_do_background('syncToRedis2', json_object(NEW.id AS `id`, NEW.phone AS`phone`));
  END$$
  
DELIMITER ;
  

  
DELIMITER $$
  
CREATE TRIGGER test_data_to_redis3 BEFORE DELETE ON test
  FOR EACH ROW BEGIN
  SET @ret=gman_do_background('syncToRedis3', json_object(OLD.id AS `id`, OLD.phone AS`phone`));
  END$$
  
DELIMITER ;
  

  说明以及问题:此类采用了gearman官网的java-gearman-service(地址:https://launchpad.net/gearman-java),目前release版本是0.6.6。java-gearman-servic.jar包中,即包括gearman server,还包括client和work客户端API。
  问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server。
  jar包需要添加到本地jar仓库:
  mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar
  

        import java.util.concurrent.TimeUnit;  

  import org.gearman.Gearman;
  import org.gearman.GearmanFunction;
  import org.gearman.GearmanFunctionCallback;
  import org.gearman.GearmanServer;
  import org.gearman.GearmanWorker;
  

  /**
  * *ECHO_HOST = "192.168.125.131"为安装了Gearman并开启geramand服务的主机地址
  *int ECHO_PORT = 4730默认端口为4730
  *
  * @author Administrator
  *
  */

  public>  

  // function name
  public static final String ECHO_FUNCTION_NAME = "syncToRedis";
  

  // job server地址
  public static final String ECHO_HOST = "192.168.1.245";
  

  // job server监听的端口
  public static final int ECHO_PORT = 4730;
  

  public static void main(String[] args) {
  // 创建一个Gearman实例
  Gearman gearman = Gearman.createGearman();
  /*
  * 创建一个jobserver
  *
  * Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
  *
  * job server收到client的job,并将其分发给注册worker
  *
  */
  GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
  // 创建一个Gearman的worker
  GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建work节点。
  worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间
  worker.setMaximumConcurrency(5); // 最大并发数
  // 告诉工人如何执行工作(主要实现了GearmanFunction接口)
  worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
  // worker连接服务器
  worker.addServer(server);
  
}
  

  
@Override
  
public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {
  // work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写
  if (data != null) {
  String str = new String(data);
  System.out.println(str);
  StringBuffer sb = new StringBuffer(str);
  return sb.reverse().toString().getBytes();
  } else {
  return "未接收到data".getBytes();
  }
  }
  }
  import org.gearman.Gearman;
  import org.gearman.GearmanClient;
  import org.gearman.GearmanJobEvent;
  import org.gearman.GearmanJobReturn;
  import org.gearman.GearmanServer;
  


  
public>  
public static void main(String... args) throws InterruptedException {
  //创建一个Gearman实例
  Gearman gearman = Gearman.createGearman();
  //创建一个Gearman client
  GearmanClient client = gearman.createGearmanClient();
  /*
  * 创建一个jobserver
  *
  * Parameter 1: job server的IP地址
  * Parameter 2: job server监听的端口
  *
  *job server收到client的job,并将其分发给注册worker
  *
  */
  GearmanServer server = gearman.createGearmanServer(
  EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
  // 告诉客户端,提交工作时它可以连接到该服务器
  client.addServer(server);
  /*
  * 向job server提交工作
  *
  * Parameter 1: gearman function名字
  * Parameter 2: 传送给job server和worker的数据
  *
  * GearmanJobReturn返回job发热结果
  */
  GearmanJobReturn jobReturn = client.submitJob(
  EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());
  //遍历作业事件,直到我们打到最后文件
  while (!jobReturn.isEOF()) {
  

  //下一个作业事件
  GearmanJobEvent event = jobReturn.poll();
  

  switch (event.getEventType()) {
  

  case GEARMAN_JOB_SUCCESS:     //job执行成功
  System.out.println(new String(event.getData()));
  break;
  case GEARMAN_SUBMIT_FAIL:     //job提交失败
  

  case GEARMAN_JOB_FAIL:        //job执行失败
  System.err.println(event.getEventType() + ": "
  + new String(event.getData()));
  default:
  }
  }
  //关闭
  gearman.shutdown();
  
}
  

  }
  http://gearman.org/download/
  php方案:https://www.tuicool.com/articles/B7Jjaa




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-630035-1-1.html 上篇帖子: redis单节点安装 下篇帖子: 将redis加入到elk日志系统里
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表