设为首页 收藏本站
查看: 600|回复: 0

[经验分享] 使用canal进行mysql数据同步到Redis

[复制链接]

尚未签到

发表于 2018-10-1 06:29:13 | 显示全部楼层 |阅读模式
import java.net.InetSocketAddress;  
import java.util.List;
  

  
import com.alibaba.fastjson.JSONObject;
  
import com.alibaba.otter.canal.client.CanalConnector;
  
import com.alibaba.otter.canal.common.utils.AddressUtils;
  
import com.alibaba.otter.canal.protocol.Message;
  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
  
import com.alibaba.otter.canal.client.*;
  

  
public class CanalClient{
  
   public static void main(String args[]) {
  
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
  
               11111), "example", "", "");
  
       int batchSize = 1000;
  
       try {
  
           connector.connect();
  
           connector.subscribe(".*\\..*");
  
           connector.rollback();
  
           while (true) {
  
               Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  
               long batchId = message.getId();
  
               int size = message.getEntries().size();
  
               if (batchId == -1 || size == 0) {
  
                   try {
  
                       Thread.sleep(1000);
  
                   } catch (InterruptedException e) {
  
                       e.printStackTrace();
  
                   }
  
               } else {
  
                   printEntry(message.getEntries());
  
               }
  
               connector.ack(batchId); // 提交确认
  
               // connector.rollback(batchId); // 处理失败, 回滚数据
  
           }
  
       } finally {
  
           connector.disconnect();
  
       }
  
   }
  

  
   private static void printEntry( List entrys) {
  
       for (Entry entry : entrys) {
  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  
               continue;
  
           }
  
           RowChange rowChage = null;
  
           try {
  
               rowChage = RowChange.parseFrom(entry.getStoreValue());
  
           } catch (Exception e) {
  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  
                       e);
  
           }
  
           EventType eventType = rowChage.getEventType();
  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  
                   eventType));
  

  
           for (RowData rowData : rowChage.getRowDatasList()) {
  
               if (eventType == EventType.DELETE) {
  
                   redisDelete(rowData.getBeforeColumnsList());
  
               } else if (eventType == EventType.INSERT) {
  
                   redisInsert(rowData.getAfterColumnsList());
  
               } else {
  
                   System.out.println("-------> before");
  
                   printColumn(rowData.getBeforeColumnsList());
  
                   System.out.println("-------> after");
  
                   redisUpdate(rowData.getAfterColumnsList());
  
               }
  
           }
  
       }
  
   }
  

  
   private static void printColumn( List columns) {
  
       for (Column column : columns) {
  
           System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
  
       }
  
   }
  

  
  private static void redisInsert( List columns){
  
      JSONObject json=new JSONObject();
  
      for (Column column : columns) {
  
          json.put(column.getName(), column.getValue());
  
       }
  
      if(columns.size()>0){
  
          RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
  
      }
  
   }
  

  
  private static  void redisUpdate( List columns){
  
      JSONObject json=new JSONObject();
  
      for (Column column : columns) {
  
          json.put(column.getName(), column.getValue());
  
       }
  
      if(columns.size()>0){
  
          RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
  
      }
  
  }
  

  
   private static  void redisDelete( List columns){
  
       JSONObject json=new JSONObject();
  
          for (Column column : columns) {
  
              json.put(column.getName(), column.getValue());
  
           }
  
          if(columns.size()>0){
  
              RedisUtil.delKey("user:"+ columns.get(0).getValue());
  
          }
  
   }
  
}
  

  
RedisUtil 工具类
  

  
import redis.clients.jedis.Jedis;
  
import redis.clients.jedis.JedisPool;
  
import redis.clients.jedis.JedisPoolConfig;
  

  
public class RedisUtil {
  

  
    // Redis服务器IP
  
    private static String ADDR = "0.0.0.0";
  

  
    // Redis的端口号
  
    private static int PORT = 6379;
  

  
    // 访问密码
  
    //private static String AUTH = "admin";
  

  
    // 可用连接实例的最大数目,默认值为8;
  
    // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
  
    private static int MAX_ACTIVE = 1024;
  

  
    // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
  
    private static int MAX_IDLE = 200;
  

  
    // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
  
    private static int MAX_WAIT = 10000;
  

  
    // 过期时间
  
    protected static int  expireTime = 60 * 60 *24;
  

  
    // 连接池
  
    protected static JedisPool pool;
  

  
    /**
  
     * 静态代码,只在初次调用一次
  
     */
  
    static {
  
        JedisPoolConfig config = new JedisPoolConfig();
  
        //最大连接数
  
        config.setMaxTotal(MAX_ACTIVE);
  
        //最多空闲实例
  
        config.setMaxIdle(MAX_IDLE);
  
        //超时时间
  
        config.setMaxWaitMillis(MAX_WAIT);
  
        //
  
        config.setTestOnBorrow(false);
  
        pool = new JedisPool(config, ADDR, PORT, 1000);
  
    }
  

  
    /**
  
     * 获取jedis实例
  
     */
  
    protected static synchronized Jedis getJedis() {
  
        Jedis jedis = null;
  
        try {
  
            jedis = pool.getResource();
  
        } catch (Exception e) {
  
            e.printStackTrace();
  
            if (jedis != null) {
  
                pool.returnBrokenResource(jedis);
  
            }
  
        }
  
        return jedis;
  
    }
  

  
    /**
  
     * 释放jedis资源
  
     * @param jedis
  
     * @param isBroken
  
     */
  
    protected static void closeResource(Jedis jedis, boolean isBroken) {
  
        try {
  
            if (isBroken) {
  
                pool.returnBrokenResource(jedis);
  
            } else {
  
                pool.returnResource(jedis);
  
            }
  
        } catch (Exception e) {
  

  
        }
  
    }
  

  
    /**
  
     * 是否存在key
  
     * @param key
  
     */
  
    public static boolean existKey(String key) {
  
        Jedis jedis = null;
  
        boolean isBroken = false;
  
        try {
  
            jedis = getJedis();
  
            jedis.select(0);
  
            return jedis.exists(key);
  
        } catch (Exception e) {
  
            isBroken = true;
  
        } finally {
  
            closeResource(jedis, isBroken);
  
        }
  
        return false;
  
    }
  

  
    /**
  
     * 删除key
  
     * @param key
  
     */
  
    public static void delKey(String key) {
  
        Jedis jedis = null;
  
        boolean isBroken = false;
  
        try {
  
            jedis = getJedis();
  
            jedis.select(0);
  
            jedis.del(key);
  
        } catch (Exception e) {
  
            isBroken = true;
  
        } finally {
  
            closeResource(jedis, isBroken);
  
        }
  
    }
  

  
    /**
  
     * 取得key的值
  
     * @param key
  
     */
  
    public static String stringGet(String key) {
  
        Jedis jedis = null;
  
        boolean isBroken = false;
  
        String lastVal = null;
  
        try {
  
            jedis = getJedis();
  
            jedis.select(0);
  
            lastVal = jedis.get(key);
  
            jedis.expire(key, expireTime);
  
        } catch (Exception e) {
  
            isBroken = true;
  
        } finally {
  
            closeResource(jedis, isBroken);
  
        }
  
        return lastVal;
  
    }
  

  
    /**
  
     * 添加string数据
  
     * @param key
  
     * @param value
  
     */
  
    public static String stringSet(String key, String value) {
  
        Jedis jedis = null;
  
        boolean isBroken = false;
  
        String lastVal = null;
  
        try {
  
            jedis = getJedis();
  
            jedis.select(0);
  
            lastVal = jedis.set(key, value);
  
            jedis.expire(key, expireTime);
  
        } catch (Exception e) {
  
            e.printStackTrace();
  
            isBroken = true;
  
        } finally {
  
            closeResource(jedis, isBroken);
  
        }
  
        return lastVal;
  
    }
  

  
    /**
  
     *  添加hash数据
  
     * @param key
  
     * @param field
  
     * @param value
  
     */
  
    public static void hashSet(String key, String field, String value) {
  
        boolean isBroken = false;
  
        Jedis jedis = null;
  
        try {
  
            jedis = getJedis();
  
            if (jedis != null) {
  
                jedis.select(0);
  
                jedis.hset(key, field, value);
  
                jedis.expire(key, expireTime);
  
            }
  
        } catch (Exception e) {
  
            isBroken = true;
  
        } finally {
  
            closeResource(jedis, isBroken);
  
        }
  
    }
  
}



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-606855-1-1.html 上篇帖子: MYSQL数据库基本操作命令 下篇帖子: MySQL MRR介绍
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表