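ZooKeeper is commonly used for centralized configuration, unified naming services, cluster management, distributed locks, and distributed queues.
Today's experiment covers the HA (high availability) side of cluster management.
The setup is as follows: a client calls an agent (192.168.x.3) over XML-RPC, and the agent asks ZooKeeper (running on 192.168.x.5) which server is currently the master and forwards the call to Server A (192.168.x.4) or Server B (192.168.x.5).
Server[A/B] code:
This requires a third-party module,
zkclient.py (https://github.com/phunt/zk-smoketest/blob/master/zkclient.py)
It needs a few small changes, because my ACL uses the digest scheme rather than world.
Import a couple of modules: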
import base64
import hashlib
Define some variables:
auth="badboy:test"
user="badboy"
user_auth = "%s:%s" % (user, base64.b64encode(hashlib.new("sha1", auth).digest()))
#ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
ZOO_DIGEST_ACL_SAFE = {"perms":0x1f ,"scheme":"digest", "id" : user_auth}
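Then replace every use of ZOO_OPEN_ACL_UNSAFE in the code with ZOO_DIGEST_ACL_SAFE.
Finally comes authentication: add zookeeper.add_auth(self.handle, "digest", auth, None) as the last line of __init__ in the ZKClient class.
With that, the connection handle we get is authenticated.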
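The id of a digest ACL has the form user:base64(sha1("user:password")), which is what the user_auth line above builds. Below is a minimal sketch of the same computation written so that it also runs on Python 3, where the hash functions need bytes. The helper name digest_acl_id is hypothetical and is not part of zkclient.py.

```python
import base64
import hashlib


def digest_acl_id(user, password):
    """Return 'user:base64(sha1("user:password"))', the id of a digest ACL."""
    credential = "%s:%s" % (user, password)
    # Python 3 hash functions need bytes, so encode the credential first.
    digest = hashlib.sha1(credential.encode("utf-8")).digest()
    return "%s:%s" % (user, base64.b64encode(digest).decode("ascii"))


# Same user and password as in the variables above.
acl_id = digest_acl_id("badboy", "test")
ZOO_DIGEST_ACL_SAFE = {"perms": 0x1f, "scheme": "digest", "id": acl_id}
```

The ACL stores only the hashed form, while add_auth is given the plain "badboy:test" string.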
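Note that Server A and Server B differ only in the IP address. The listing below is Server A's; on Server B the service function returns 192.168.x.5 instead (the registered node data and the listen address presumably change in the same way, since the agent later reports 192.168.x.5 as the master IP).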
#!/usr/bin/env python
import logging
from os.path import basename, join, dirname
from zkclient import ZKClient, zookeeper
from SimpleXMLRPCServer import SimpleXMLRPCServer

DEBUG = True
BASE_DIR = dirname(__file__)
if DEBUG:
    # An empty filename makes basicConfig log to the console.
    logname = ""
    file_mode = ""
else:
    logname = join(BASE_DIR, "app.log")
    file_mode = "a"

logging.basicConfig(
    level = logging.DEBUG,
    format = "[%(asctime)s] %(levelname)-8s %(message)s",
    datefmt = "%Y-%m-%d %H:%M:%S",
    filename = logname,
    filemode = file_mode,
)
log = logging


class TCZookeeper(object):
    ZK_HOST = "192.168.x.5:2181"
    ROOT = "/tc"
    # os.path.join yields "/tc/nodes" on Unix-like systems.
    WORKERS_PATH = join(ROOT, "nodes")
    MASTERS_NUM = 1
    TIMEOUT = 1000

    def __init__(self, verbose = True):
        self.VERBOSE = verbose
        self.masters = []
        self.path = None
        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
        self.say("login ok!")
        self.__init_zk()
        self.register()
        # Blocks forever serving XML-RPC requests.
        self.start_service()

    def __init_zk(self):
        # Create the parent nodes if they do not exist yet.
        nodes = (self.ROOT, self.WORKERS_PATH)
        for node in nodes:
            if not self.zk.exists(node):
                try:
                    self.zk.create(node, "")
                except zookeeper.NodeExistsException:
                    # Another server created it in the meantime.
                    pass

    def register(self):
        # EPHEMERAL: the node disappears when this server's session ends.
        # SEQUENCE: ZooKeeper appends an increasing counter to the name.
        self.path = self.zk.create(self.WORKERS_PATH + "/node", "192.168.x.4", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.paths = self.path
        self.path = basename(self.path)
        self.say("register ok! I'm %s" % self.path)

    def service(self):
        '''
        do something
        '''
        return "My IP is:192.168.x.4"

    def start_service(self):
        server = SimpleXMLRPCServer(("192.168.x.4", 8000))
        self.say("Listening on port 8000...")
        server.register_function(self.service, "service")
        server.serve_forever()

    def say(self, msg):
        if self.VERBOSE:
            log.info(msg)


if __name__ == "__main__":
    tczk = TCZookeeper()

Starting Server[A|B] prints the following:
Machine A (192.168.x.4):
[root@web02 scripts]# python server.py
[2014-12-24 14:40:37] INFO login ok!
[2014-12-24 14:40:37] INFO register ok! I'm node0000000034
[2014-12-24 14:40:37] INFO Listening on port 8000...
Machine B (192.168.x.5, which also hosts ZooKeeper):
[root@web02 scripts]# python server.py
[2014-12-24 14:40:37] INFO login ok!
[2014-12-24 14:40:37] INFO register ok! I'm node0000000035
[2014-12-24 14:40:37] INFO Listening on port 8000...

Now let's look at the agent code:
#!/usr/bin/env python
import xmlrpclib
import logging
from os.path import basename, join, dirname
from zkclient import ZKClient, watchmethod
from SimpleXMLRPCServer import SimpleXMLRPCServer

DEBUG = True
BASE_DIR = dirname(__file__)
if DEBUG:
    # An empty filename makes basicConfig log to the console.
    logname = ""
    file_mode = ""
else:
    logname = join(BASE_DIR, "app.log")
    file_mode = "a"

logging.basicConfig(
    level = logging.DEBUG,
    format = "[%(asctime)s] %(levelname)-8s %(message)s",
    datefmt = "%Y-%m-%d %H:%M:%S",
    filename = logname,
    filemode = file_mode,
)
log = logging


class TCZookeeper(object):
    ZK_HOST = "192.168.x.5:2181"
    ROOT = "/tc"
    NODES_PATH = join(ROOT, "nodes")
    MASTERS_NUM = 1
    TIMEOUT = 1000

    def __init__(self, verbose = True):
        self.VERBOSE = verbose
        self.masters = []
        self.path = None
        self.serverip = None
        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
        self.say("login ok!")
        self.get_master()

    def get_master(self):
        """
        get children, and check who is the smallest child
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get master again.")
            self.get_master()

        # ZooKeeper watches fire only once, so every call registers a new one.
        children = self.zk.get_children(self.NODES_PATH, watcher)
        children.sort()
        self.say("%s's children: %s" % (self.NODES_PATH, children))
        self.masters = children[:self.MASTERS_NUM]
        if not self.masters:
            # No server is registered; wait for the watcher to fire again.
            self.serverip = None
            return
        self.path = self.NODES_PATH + "/" + self.masters[0]
        # zk.get returns (data, stat); the data is the master's IP.
        self.serverip = self.zk.get(self.path)
        self.say("MasterIP:%s" % self.serverip[0])

    def rpc(self):
        # Forward the call to the current master.
        try:
            proxy = xmlrpclib.ServerProxy("http://%s:8000/" % self.serverip[0])
            return "content: %s" % str(proxy.service())
        except Exception as exc:
            # Log the failure; the caller receives an XML-RPC fault.
            log.error("rpc to master failed: %s" % exc)
            raise

    def start_service(self):
        server = SimpleXMLRPCServer(("192.168.x.3", 8000))
        self.say("Listening on port 8000...")
        server.register_function(self.rpc, "rpc")
        server.serve_forever()

    def say(self, msg):
        if self.VERBOSE:
            log.info(msg)


if __name__ == "__main__":
    tczk = TCZookeeper()
    tczk.start_service()
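The election rule inside get_master can be looked at on its own: sort the child names and take the first MASTERS_NUM of them. ZooKeeper pads the sequence counter to ten digits, so a plain string sort puts the oldest registration first. A small sketch of just that rule follows; pick_masters is a hypothetical helper name, not something zkclient.py provides.

```python
def pick_masters(children, masters_num=1):
    """Return the masters_num children with the lowest sequence numbers."""
    # The sequence suffix is zero-padded to a fixed width, so sorting the
    # names as strings also sorts them by sequence number.
    return sorted(children)[:masters_num]


# Child names as they appear in the logs of this experiment.
children = ["node0000000035", "node0000000034"]
masters = pick_masters(children)
print(masters)  # ['node0000000034']
```

With an empty child list the function returns an empty list, which is the case the agent has to handle when no server is registered.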
Starting the agent prints the following:
[root@web01 scripts]# python agent.py
[2014-12-24 14:43:29] INFO login ok!
[2014-12-24 14:43:29] INFO /tc/nodes's children: ['node0000000034', 'node0000000035']
[2014-12-24 14:43:29] INFO MasterIP:192.168.x.4
[2014-12-24 14:43:29] INFO Listening on port 8000...

Finally, let's look at the client:
#!/usr/bin/env python
import xmlrpclib
import time

while True:
    try:
        proxy = xmlrpclib.ServerProxy("http://192.168.x.3:8000/")
        print proxy.rpc()
    except Exception:
        # Ignore failed calls (for example during failover) and retry.
        # Catching Exception rather than everything keeps Ctrl-C working.
        pass
    time.sleep(1)

Output:
root@saltstack:/scripts# python client.py
content: My IP is:192.168.x.4
content: My IP is:192.168.x.4
content: My IP is:192.168.x.4
Now we shut down the service on 192.168.x.4.
Very soon the printed content changes to:
content: My IP is:192.168.x.5
content: My IP is:192.168.x.5
The agent output now shows:
[2014-12-24 14:47:34] INFO child changed, try to get master again.
[2014-12-24 14:47:34] INFO /tc/nodes's children: ['node0000000035']
[2014-12-24 14:47:34] INFO MasterIP:192.168.x.5
As you can see, the master IP was 192.168.x.4 and is now 192.168.x.5.
That completes the HA experiment.
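The failover seen above can be replayed without a ZooKeeper server. The sketch below uses an in-memory stand-in for /tc/nodes, with assumed behavior modeled on ephemeral sequential nodes and one-shot child watches. FakeRegistry and FakeAgent are hypothetical names made up for this illustration, not ZooKeeper or zkclient APIs.

```python
class FakeRegistry(object):
    """In-memory stand-in for the /tc/nodes znode (not a ZooKeeper API)."""

    def __init__(self):
        self.nodes = {}      # node name -> data (the server IP)
        self.watchers = []   # pending one-shot child watchers
        self.next_seq = 34   # arbitrary start, chosen to match the logs

    def register(self, ip):
        # Mimics EPHEMERAL | SEQUENCE: an increasing, zero-padded name.
        name = "node%010d" % self.next_seq
        self.next_seq += 1
        self.nodes[name] = ip
        self._fire()
        return name

    def drop(self, name):
        # Mimics an ephemeral node vanishing when its owner's session ends.
        del self.nodes[name]
        self._fire()

    def get_children(self, watcher):
        self.watchers.append(watcher)
        return sorted(self.nodes)

    def _fire(self):
        # Watches are one-shot: clear the list before calling them.
        pending, self.watchers = self.watchers, []
        for watcher in pending:
            watcher()


class FakeAgent(object):
    def __init__(self, registry):
        self.registry = registry
        self.master_ip = None
        self.refresh()

    def refresh(self):
        # Reading the children again also re-arms the watch.
        children = self.registry.get_children(self.refresh)
        self.master_ip = self.registry.nodes[children[0]] if children else None


registry = FakeRegistry()
node_a = registry.register("192.168.x.4")
node_b = registry.register("192.168.x.5")
agent = FakeAgent(registry)
before = agent.master_ip
registry.drop(node_a)    # server A goes away
after = agent.master_ip
print(before, "->", after)  # 192.168.x.4 -> 192.168.x.5
```

As in the real agent, the watcher calls the same method that registered it, so the watch is armed again after every change.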
Reference:
http://blog.csdn.net/lengzijian/article/details/9233327