设为首页 收藏本站
查看: 3431|回复: 0

[经验分享] Apache curator-client详解

[复制链接]

尚未签到

发表于 2017-1-1 08:35:43 | 显示全部楼层 |阅读模式
    Apache curator框架中curator-client组件可以作为zookeeper client来使用,它提供了zk实例创建/重连机制等,简单便捷.不过直接使用curator-client并不能减少太多的开发量,因为它相对比较底层,稍后我们继续了解curator-framework组件提供的更多的便捷特性.
 
一.核心API
    1. CuratorZookeeperClient: zookeeper客户端,根据指定的配置信息创建zookeeper实例.
    2. RetryPolicy接口: 重连策略,当zookeeper失去链接时使用的"重连策略": 
        <> RetryOneTime: 只重连一次.
        <> RetryNTime: 指定重连的次数N.
        <> RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功.
        <> ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的.
 

时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))).
 
        <> BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重试次数的控制.
    3. RetryLoop: 操作重试,如果在执行一个操作时,遇到了zk链接异常,怎么办?RetryLoop可以不断重试,直到网络正常且操作执行成功为止.SessionFailRetryLoop类是一个特列,可以兼容当session失效时,如何进行操作重试.
    4. EnsembleProvider: 配置提供者,创建zk客户端时,需要指定connectionString(例如:127.0.0.1:2181),在zookeeper API中只能显式的指定,curator在这个方面提供了更加灵活性的方式,你可以通过任何方式获取或者构建connectionString.
        <> FixedEnsembleProvider: 使用固定的字符串作为connectionString.
        <> ExhibitorEnsembleProvider: 动态的获取connectionString,可以指定一个URL用来提供connectionString的输出服务.此后此Provider将会间歇性的获取最新的connectionString字符串,并保存.事实上,ExhibitorEnsembleProvider只是一个样例,展示了一种动态获取connectionString的方式,如果在真正的开发中,你可能需要参考它,来定制自己的Provider.
 
二. 通用客户端代码示例

public class ZooKeeperClient extends Thread{
protected final CuratorZookeeperClient zkClient;
protected String parent;
public static final Charset charset = Charset.forName("utf-8");
private ZNodeWatcher zNodeWatcher = new ZNodeWatcher();//自定义watcher
public ZooKeeperClient(String connectString, int sessionTimeout, String parent) throws Exception {
this.parent = parent;
zkClient = new CuratorZookeeperClient(connectString, sessionTimeout, sessionTimeout, zNodeWatcher, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
zkClient.start();//must,but anytime before zookeeper operation
zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful
}

public boolean exist(String path,boolean watched) throws Exception{
return zkClient.getZooKeeper().exists(path,watched) == null ? false : true;
}
/**
* 此path必须存在,如果不存在则立即创建
* @param path
* @return
*/
public boolean ensurePath(final String path) throws Exception{
PathUtils.validatePath(path);
return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
EnsurePath ensure = new EnsurePath(path);
ensure.ensure(zkClient);
return true;
}
});
}
/**
*
* @param path
* @param data
* @return   如果path已经存在或者创建成功,则返回true,否则返回false。
* @throws Exception
*/
public boolean create(final String path, final String data) throws Exception {
PathUtils.validatePath(path);//if bad format,here will throw some Exception;
return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int _current = 0;
while (_current < 3) {
_current++;
try {
//zkClient.blockUntilConnectedOrTimedOut();
//确保父节点存在
EnsurePath ensure = new EnsurePath(path).excludingLast();
//parent path should be existed.
//EnsurePath: retry + block
ensure.ensure(zkClient); //ugly API
zkClient.getZooKeeper().create(path, data.getBytes(charset), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
} catch (KeeperException.NodeExistsException e) {
return true;
}
//retry only for KeeperException,not for other runtimeException。
//other exception will be thrown,and stop retry!!
//if no Exception thrown,retry will be stopped and return successfully.
}
return false;
}
}) ;
}

public  class ZNodeWatcher implements Watcher{
@Override
public void process(WatchedEvent event) {
Event.EventType eventType = event.getType();
Event.KeeperState keeperState =  event.getState();
String path = event.getPath();
switch(event.getType()) {
case None:
//connection Error:会自动重连
logger.info("[Watcher],Connecting...");
if(keeperState == Event.KeeperState.SyncConnected){
logger.info("[Watcher],Connected...");
//检测临时节点是否失效等。
}
break;
case NodeCreated:
logger.info("[Watcher],NodeCreated:" + path);
break;
case NodeDeleted:
logger.info("[Watcher],NodeDeleted:" + path);
break;
default:
//
}
}
}
}
 
 
三. Provider代码实例
      本实例展示了如何使用curator-client开发简单的API,展示了RetryPolicy,RetryLoop的使用方式;实例中使用Curator自带的ExhibitorEnsembleProvider动态获取zookeeper服务器列表信息.
    1. pom.xml

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>

   2. curator-config.propeties 

host.rest.servers=127.0.0.1,localhost
host.rest.port=8080
host.backup=127.0.0.1:2181
host.rest.path=/servers/zk
host.rest.period=180000
    3. IZkClient.java  

package com.test.demo.curator;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.RetryPolicy;
import org.apache.curator.TimeTrace;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.exhibitor.ExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.Exhibitors;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

public class IZkClient {
private final CuratorZookeeperClient zkClient;
public IZkClient(String configLocation) throws Exception {
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream(configLocation));
EnsembleProvider provider = buildProvider(properties);
String pTimeout = properties.getProperty("zk.timeout");
Integer timeout = 30000;
if (pTimeout != null) {
timeout = Integer.valueOf(pTimeout);
}
zkClient = new CuratorZookeeperClient(provider, timeout, timeout, null, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
zkClient.setTracerDriver(new PrintTraceDrive());
zkClient.start();//must,but anytime before zookeeper operation
zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful
}
/**
* build provider,all of params from config-file
* @param properties
* @return
*/
private EnsembleProvider buildProvider(Properties properties) {
String servers = properties.getProperty("host.rest.servers");   //hosts.servers = 127.0.0.1,127.0.0.2
if (servers == null || servers.isEmpty()) {
throw new IllegalArgumentException("host.servers cant be empty");
}
List<String> hostnames = Arrays.asList(servers.split(","));
String port = properties.getProperty("host.rest.port");
Integer restPort = 80;   //default
if (port != null) {
restPort = Integer.valueOf(port);
}
final String backupAddress = properties.getProperty("host.backup");//127.0.0.1:2181
//if network is error,you should sepcify a backup zk-connectString
Exhibitors exhibitors = new Exhibitors(hostnames, restPort, new Exhibitors.BackupConnectionStringProvider() {
@Override
public String getBackupConnectionString() throws Exception {
return backupAddress;
}
});
//rest,as meaning of getting fresh zk-connectString list.
ExhibitorRestClient restClient = new DefaultExhibitorRestClient();
String restUriPath = properties.getProperty("host.rest.path");
String period = properties.getProperty("host.rest.period");
Integer pollingMs = 180000; //3 min
if (period != null) {
pollingMs = Integer.valueOf(period);
}
return new ExhibitorEnsembleProvider(exhibitors, restClient, restUriPath, pollingMs, new RetryNTimes(10, 1000));
}
public CuratorZookeeperClient getZkClient() {
return zkClient;
}
/**
* how to use RtryLoop ,another style
* if Znode has been existed,will delete it,and create it again.
*
*/
public boolean replace(final String path,final byte[] value){
PathUtils.validatePath(path);
boolean result = false;
try{
result = RetryLoop.callWithRetry(zkClient,new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int _current = 0;
while(_current < 3){
_current++;
try{
zkClient.blockUntilConnectedOrTimedOut();
Transaction tx = zkClient.getZooKeeper().transaction();
tx.delete(path, -1);
tx.create(path,value,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
tx.commit();
return true;
} catch (KeeperException.NoNodeException e){
//
} catch (KeeperException.NodeExistsException e){
//
}
}
return false;  //To change body of implemented methods use File | Settings | File Templates.
}
}) ;
}catch (Exception e){
e.printStackTrace();
}
return result;
}
//API : on for test
public String createPath(String path, byte[] value) throws Exception {
PathUtils.validatePath(path);//if bad format,here will throw some Exception;
EnsurePath ensure = new EnsurePath(path).excludingLast();
//parent path should be existed.
//EnsurePath: retry + block
ensure.ensure(zkClient); //ugly API
return zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//API: on for test
public boolean createPath(String path, byte[] value,int blockTimes){
if (!zkClient.isConnected() && blockTimes == 0) {
return false;
}
TimeTrace trace = zkClient.startTracer("createPath:" + path);//log message
try{
EnsurePath ensure = new EnsurePath(path).excludingLast();
ensure.ensure(zkClient);//only for persistent node
RetryLoop loop = zkClient.newRetryLoop();
int _current = 0;
while(loop.shouldContinue()){
try{
if(_current >= blockTimes){
loop.markComplete(); //stop here.
continue;
}
//blocking
boolean isConnected = zkClient.blockUntilConnectedOrTimedOut();
if(!isConnected){
_current++;
continue;
}
zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
loop.markComplete();
} catch (KeeperException.NodeExistsException e){
loop.markComplete();//exist ,stop here
} catch (Exception e){
loop.takeException(e);
}
}
} catch (Exception e){
e.printStackTrace();
return false;  //cant create path
} finally{
trace.commit();
}
return true;
}
public byte[] getData(String path) throws Exception{
PathUtils.validatePath(path);
return zkClient.getZooKeeper().getData(path,false,null);
}
public void close(){
zkClient.close();
}
class PrintTraceDrive implements TracerDriver {
@Override
public void addTrace(String name, long time, TimeUnit unit) {
System.out.println("<Trace>" + name + ";time:" + TimeUnit.MILLISECONDS.convert(time, unit) + " ms");
}
@Override
public void addCount(String name, int increment) {
}
}
}

    4. IZkClientMain.java(for testing) 

public class IZkClientMain {

public static void main(String[] args) throws Exception {
String configLocation = "curator-config.properties";
IZkClient iZkClient = new IZkClient(configLocation);
String value = "curator-demo";
String path = "/curator/child/0";
iZkClient.replace(path, value.getBytes("utf-8"));
//simple method;
String nodeName = ZKPaths.getNodeFromPath(path);
System.out.print(nodeName);
//value
byte[] bytes = iZkClient.getData(path);
System.out.println(new String(bytes, "utf-8"));
Thread.sleep(180000 * 2);
iZkClient.close();
}
}

    5. ExhibitorEnsembleProvider需要使用远端的一个REST风格的Url来提供zookeeper服务器列表,如下为Spring方式:

@Controller
@RequestMapping("/servers")
public class ServersController {
@RequestMapping(value = "/zk",headers="Accept=application/x-www-form-urlencoded")
public void zk(HttpServletResponse response) throws  Exception{
FormHttpMessageConverter converter = new FormHttpMessageConverter();
converter.setCharset(Charset.forName("utf-8"));
HttpOutputMessage output = new ServletServerHttpResponse(response);
converter.write(buildServers(), MediaType.APPLICATION_FORM_URLENCODED,output);
//String servers = "count=2&port=2181&server0=127.0.0.1&server1=localhost";
}

private MultiValueMap<String,Object> buildServers(){
MultiValueMap<String,Object> map = new LinkedMultiValueMap<String, Object>();
map.add("count","2");
map.add("port","2181");
map.add("server0","127.0.0.1");
map.add("server1","localhost");
return map;
}
}
  
   备注:Curator-client可以帮助我们进行链接重连操作,重连过程中,我们不需要关注太多,不过你仍然可以通过注册Watcher的手段来活的通知.如果在操作过程中,zk的链接有异常,你可以通过RetryLoop的方式实现阻塞. 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-322205-1-1.html 上篇帖子: Apache & Tomcat集群配置 下篇帖子: [转]Apache POI简单说明
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表