设为首页 收藏本站
查看: 826|回复: 0

[经验分享] 基于zookeeper的MySQL主主负载均衡的简单实现

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-9-6 09:15:17 | 显示全部楼层 |阅读模式
  1.先上原理图
DSC0000.png
  2.说明
  两个mysql采用主主同步的方式进行部署。
  在安装mysql的服务器上安装客户端(目前是这么做,以后想在zookeeper扩展集成),客户端实时监控mysql应用的可用性,可用时想zookeepercreateNode,当网络不可用或者mysql应用不可用时,建立的znode消失。
  在客户端,通过改造proxool数据库连接池的方式,在建立连接之前,从zookeeper中去取真实的数据库URL,如果有多个URL,即有多个服务时,采用随机算法去拿连接(以后准备扩展权重)。当连接不可用时,数据库连接池将重建连接,这时候又回去zookeeper拿连接,因为agent建立的临时znode消失了,就不能拿到已经失效的url了。
  这个方案只是初步的实验和实现了,还有很多后续的问题,主要为了解决lvs+keepalived只能在同一个区域内的问题。
  3.部分实现
  1).agent
  



/**
* 数据库可用性检测
* @author tomsnail
* @date 2015年4月3日 上午10:11:51
*/
public class TestMySQL {
public static boolean test(String url){
Connection conn = null;
Statement stmt = null;
ResultSet rs  = null;
String sql = ConfigHelp.getLocalConifg("jdbc_inventory.house-keeping-test-sql", "select 0");
try {
Class.forName(ConfigHelp.getLocalConifg("jdbc_inventory.driver-class", "com.mysql.jdbc.Driver"));// 动态加载mysql驱动
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
}
return true;
} catch (SQLException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(rs!=null){
rs.close();
}
if(stmt!=null){
stmt.close();
}
if(conn!=null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return false;
}
}


/**
* zookeeper客户端
* @author tomsnail
* @date 2015年4月3日 上午10:11:51
*/
public class TestServer {
private static final Logger logger = LoggerFactory
.getLogger(TestServer.class);
private static ZooKeeper zk;
private String path;
//同步锁
private Lock _lock = new ReentrantLock();
// 用于等待 SyncConnected 事件触发后继续执行当前线程
private CountDownLatch latch = new CountDownLatch(1);

public TestServer() {
zk = connectServer();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//logger.info("check zk...");
                    _lock.lock();
if (zk != null) {
if (zk.getState().isAlive()
&& zk.getState().isConnected()) {
//logger.info("zk is ok");
                            _lock.unlock();
continue;
}
}
close();
logger.info("reConnectServer ...");
zk = connectServer();
logger.info("reConnectServer ok");
_lock.unlock();
}
}
private void close() {
if(zk!=null){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
zk = null;
}
}
}).start();
}

// 连接 ZooKeeper 服务器
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(ConfigHelp.ZK_CONNECTION_STRING,
ConfigHelp.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown(); // 唤醒当前正在执行的线程
                            }
}
});
latch.await(); // 使当前线程处于等待状态
} catch (Exception e) {
logger.error("", e);
}
if (zk != null) {
try {
Stat stat = zk.exists(ConfigHelp.ZK_ROOT_PATH, false);
if (stat == null) {
String path = zk.create(ConfigHelp.ZK_ROOT_PATH,
"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
logger.info("create zookeeper node ({})", path);
}
stat = zk.exists(ConfigHelp.ZK_RMI_PATH, false);
if (stat == null) {
String path = zk.create(ConfigHelp.ZK_RMI_PATH,
"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
logger.info("create zookeeper node ({})", path);
}
} catch (Exception e) {
e.printStackTrace();
}
}
return zk;
}
// 创建 ZNode
public void createNode(String url) {
_lock.lock();
try {
byte[] data = url.getBytes();
path = zk.create(ConfigHelp.ZK_RMI_PATH + "/", data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL); // 创建一个临时性且有序的 ZNode
logger.info("create zookeeper node ({} => {})", path, url);
} catch (Exception e) {
logger.error("", e);
e.printStackTrace();
}
_lock.unlock();
}
public void deleteNode(String url){
_lock.lock();
try {
Stat stat = zk.exists(path, false);
if(stat!=null){
zk.delete(url, stat.getVersion());
}
} catch (Exception e) {
e.printStackTrace();
}
_lock.unlock();
}
}


/**
* 数据库检测测试主类
* @author tomsnail
* @date 2015年4月3日 上午10:11:51
*/
public class TestMain {
private static TestServer testServer = new TestServer();
public static void main(String[] args) {
String url = ConfigHelp.getLocalConifg("jdbc_inventory.driver-url", "select 0");
boolean isOK = false;
while(true){
if(TestMySQL.test(url)){
if(isOK){
}else{
testServer.createNode(url);//建立znode
                }
isOK = true;
}else{
isOK = false;
testServer.deleteNode(url);//删除znode
            }
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
  
  2).proxool



/**
* zookeeper信息定义类
* @author tomsnail
* @date 2015年4月2日 下午6:49:13
*/
public class ZkInfoDefinition {
public static final String PREFIX_ZK = "zookeeper";
public static final String ZK_URL = "zkUrl";
public static final String ZK_SESSION_TIMEOUT = "sessionTimeout";
public static final String ZK_PATH = "zkPath";
public static final String ZK_ENABLE = "zkEnable";
public static String zkUrl="192.168.102.1:31315";
public static int sessionTimeout = 5000;
public static boolean isEnable = false;
public static String zkPath = "/root/db";
public String getZkUrl() {
return zkUrl;
}
public void setZkUrl(String zkUrl) {
this.zkUrl = zkUrl;
}
public int getSessionTimeout() {
return sessionTimeout;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
public String getZkPath() {
return zkPath;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public ZkInfoDefinition(String zkUrl, int sessionTimeout, String zkPath) {
super();
this.zkUrl = zkUrl;
this.sessionTimeout = sessionTimeout;
this.zkPath = zkPath;
}
public ZkInfoDefinition(){
}
}

DSC0001.gif DSC0002.gif


/**
* zookeeper客户端
* @author tomsnail
* @date 2015年4月3日 上午10:15:11
*/
public class ZkClient {
private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);
// 用于等待 SyncConnected 事件触发后继续执行当前线程
private CountDownLatch latch = new CountDownLatch(1);
// 定义一个 volatile 成员变量,用于保存最新的 RMI 地址(考虑到该变量或许会被其它线程所修改,一旦修改后,该变量的值会影响到所有线程)
private volatile List<String> dataList = new ArrayList<String>();
private Lock _lock = new ReentrantLock();
private static  ZooKeeper zk;
private LBUrl lbUrl;

public ZkClient(){
this(new BasicLBUrl());
}
// 构造器
public ZkClient(LBUrl lbUrl) {
this.lbUrl = lbUrl;
zk = connectServer(); // 连接 ZooKeeper 服务器并获取 ZooKeeper 对象
            watchNode();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
_lock.lock();
if (zk != null) {
if (zk.getState().isAlive()
&& zk.getState().isConnected()) {
_lock.unlock();
continue;
}
}
if(zk!=null){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
zk = null;
}
zk = connectServer();
_lock.unlock();
}
}
}).start();
}
// 查找 URL 服务
public String getUrl() {
if (dataList!=null&&dataList.size()>0) {
return this.lbUrl.getUrl(dataList);
}
return null;
}
public List<String> getUrls(){
return dataList;
}
// 连接 ZooKeeper 服务器
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(ZkInfoDefinition.zkUrl, ZkInfoDefinition.sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown(); // 唤醒当前正在执行的线程
                        }
}
});
latch.await(); // 使当前线程处于等待状态
} catch (Exception e) {
logger.error("", e);
}
return zk;
}
// 观察 /registry 节点下所有子节点是否有变化
private void watchNode() {
_lock.lock();
if(zk!=null&&zk.getState().isAlive()&&zk.getState().isConnected()){
}else{
if(zk!=null){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
zk = null;
}
zk = connectServer();
}
try {
List<String> nodeList = zk.getChildren(ZkInfoDefinition.zkPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(); // 若子节点有变化,则重新调用该方法(为了获取最新子节点中的数据)
                        }
}
});
List<String> dataList = new ArrayList<String>(); // 用于存放 /registry 所有子节点中的数据
for (String node : nodeList) {
byte[] data = zk.getData(ZkInfoDefinition.zkPath + "/" + node, false, null); // 获取 /registry 的子节点中的数据
dataList.add(new String(data));
}
logger.debug("node data: {}", dataList);
this.dataList = dataList;
} catch (Exception e) {
logger.error("", e);
}
_lock.unlock();
}
public static void main(String[] args) {
ZkClient client = new ZkClient();
System.out.println(client.getUrl());
}
}
View Code




/**
* 从zookeeper获得URL连接操作类
* @author tomsnail
* @date 2015年4月2日 下午6:56:06
*/
public class ZkUrlOperation {
private static final ZkUrlOperation instance = new ZkUrlOperation();
private static ZkInfoDefinition zkInfoDefinition;
private static ZkClient zkClient;
private static final byte[] _lock = new byte[0];
private  ZkUrlOperation(){
}
public static ZkUrlOperation getInstance(){
return instance;
}
public  void addZkInfoDefinition(ZkInfoDefinition zkInfoDefinition){
ZkUrlOperation.zkInfoDefinition = zkInfoDefinition;
}
public  void addZkInfoDefinition(String key,String value){
if(ZkUrlOperation.zkInfoDefinition==null){
ZkUrlOperation.zkInfoDefinition = new ZkInfoDefinition();
}
if(key.contains(ZkInfoDefinition.ZK_PATH)){
ZkUrlOperation.zkInfoDefinition.setZkPath(value);
}
if(key.contains(ZkInfoDefinition.ZK_SESSION_TIMEOUT)){
ZkUrlOperation.zkInfoDefinition.setSessionTimeout(Integer.valueOf(value));;
}
if(key.contains(ZkInfoDefinition.ZK_URL)){
ZkUrlOperation.zkInfoDefinition.setZkUrl(value);;
}
if(key.contains(ZkInfoDefinition.ZK_ENABLE)){
ZkUrlOperation.zkInfoDefinition.isEnable = Boolean.valueOf(value);
}
}

public String getUrl(){
synchronized (_lock) {
if(zkInfoDefinition.isEnable){
if(zkClient==null){
zkClient = new ZkClient();
}
String url = zkClient.getUrl();
return url;
}else{
return "";
}
}

}
public boolean isAvailUrl(String url){
synchronized (_lock) {
if(zkInfoDefinition.isEnable){
if(zkClient==null){
zkClient = new ZkClient();
}
List<String> urls = zkClient.getUrls();
for(int i=0;i<urls.size();i++){
if(url.equals(urls.get(i))){
return true;
}
}
return false;
}
return false;
}
}

}
View Code  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-109969-1-1.html 上篇帖子: 我读《从Paxos到zookeeper分布式一致性原理与实践》 下篇帖子: Paxos算法之旅(四)zookeeper代码解析--转载
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表