设为首页 收藏本站
查看: 937|回复: 0

[经验分享] Zookeeper场景实践:(8) 分布式队列

[复制链接]

尚未签到

发表于 2015-11-22 07:13:58 | 显示全部楼层 |阅读模式

1.基本介绍



按照ZooKeeper典型应用场景一览里的说法,分布式队列有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。

第二种队列可以先建立一个/queue,赋值为n,表达队列的大小。然后每个队列成员加入时,就判断是否达到队列要求的大小,如果是可以进行下一步动作,否则继续等待队列成员的加入。比较典型的情况是,当一个大的任务可能需要很多的子任务完成才能开始进行。



比如汇总账单的时候,就必须先将用户的消费数据,积分数据等都统计完成后才能开始。汇总账单的程序建立一个队列/Queue,赋值为2,然后分别统计消费数据和积分数据的程序当完成任务时就往/Queue下创建一个临时节点。而汇总账单程序监测到/Queue的子节点个数为2时,就可以开始执行任务了。




实际上,我们也可以先建立一个数目为2的子节点。当一个子任务完成的时候,就删除一个子节点,当所有子节点都被删除的时候,主任务就可以开始执行了。这个过程可以形象的理解为拆除屏障。因此这种队列还有一个专门的词语描述,叫做屏障(barrier)。


2.场景分析



讲了那么多的关于屏障的认识,但是并不打算就去实现它,并且Zookeeper的官方文档也有相关的知识。这次的主要目标是常规的FIFO队列。我将实现队列的两个主要操作:push和pop。

1).
int push(zhandle_t *zkhandle,const char *path,char *element)


  • zkhandle为zookeeper_init初始化后的句柄
  • path为队列的路径
  • element为要压入队列的内容


2).
int pop(zhandle_t *zkhandle,const char *path,char *element_buffer,int *buffer_len)


  • zkhandle为zookeeper_init初始化后的句柄
  • path为队列的路径
  • element_buffer为要弹出的缓冲区
  • buffer_len为指向缓冲区的大小的指针


简单来说,假设队列的路径为/Queue,push就是就是创建一个临时有序的/Queue/queue-节点。pop就是取出/Queue/下序列号最小的节点。

我们知道在C++中stl里有一个queue的类,实现了push,pop等操作,然而它是非线程安全的,即多个线程同时push/pop的时候可能会出现错误。而由于ZooKeeper保证了创建节点和删除节点的一致性,因此可以说利用Zookeeper实现的队列是进程安全的。


3. 场景实践



来看push和pop的具体实现。push的实现很简单,就是在{path}下创建一个有序的{path}/queue-子节点.



int push(zhandle_t *zkhandle,const char *path,char *element)
{
char child_path[512] = {0};
char path_buffer[512] = {0};
int bufferlen = sizeof(path_buffer);
sprintf(child_path,"%s/queue-",path);
int ret = zoo_create(zkhandle,child_path,element,strlen(element),  
&ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE,  
path_buffer,bufferlen);  
if(ret != ZOK){
fprintf(stderr,"failed to create the path %s!\n",path);
}else{
printf("create path %s successfully!\n",path);
}
return ret;
}


  

pop的功能则是取出{path}下序号最小的子节点,如果没有子节点,则返回-1.



int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)
{
int i = 0;
struct String_vector children;
int ret = zoo_get_children(zkhandle,path,0,&children);

if(ret != ZOK){
fprintf(stderr,"failed to create the path %s!\n",path);
}else if (children.count == 0){
strcpy(element,"");
*len = 0;
ret = -1;
}else{
char *min = children.data[0];
for(i = 0; i < children.count; ++i){
printf(&quot;%s:%s\n&quot;,min,children.data);
if(strcmp(min,children.data) > 0){
min = children.data;
}
}
if(min != NULL){
char child_path[512]={0};
sprintf(child_path,&quot;%s/%s&quot;,path,min);
ret = zoo_get(zkhandle,child_path,0,element,len,NULL);
if(ret != ZOK){
fprintf(stderr,&quot;failed to get data of the path %s!\n&quot;,child_path);
}else{
ret = zoo_delete(zkhandle,child_path, -1);
if(ret != ZOK){
fprintf(stderr,&quot;failed to delete the path %s!\n&quot;,child_path);
}
}
}
}
for(i = 0; i < children.count; ++i){
free(children.data);
children.data = NULL;
}

return ret;
}


  

最后,再来看看模拟队列操作的程序。和其他程序类&#20284;,它的选项有


  • -p:指定队列的路径
  • -m:指定操作是push还是pop
  • -v:只在push时有用,用与指定要push的元素的&#20540;
  • -s:指定Zookeeper的服务器的ip:port.


如:

向队列/Queue中压人一个元素,元素的&#20540;为&quot;Hello&quot;:

>myqueue -s 172.17.0.36:2181 -p /Queue -m push -v Hello

将队列/Queue弹出一个元素

>myqueue -s 172.17.0.36:2181 -p /Queue -m pop





最后附上完整的源代码:



#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include&quot;zookeeper.h&quot;  
#include&quot;zookeeper_log.h&quot;  
char g_host[512]= &quot;172.17.0.36:2181&quot;;  
char g_path[512]= &quot;/Queue&quot;;
char g_value[512]=&quot;msg&quot;;
enum MODE{PUSH_MODE,POP_MODE} g_mode;
void print_usage();
void get_option(int argc,const char* argv[]);
/**********unitl*********************/  
void print_usage()
{
printf(&quot;Usage : [myqueue] [-h] [-m mode] [-p path ] [-v value][-s ip:port] \n&quot;);
printf(&quot;        -h Show help\n&quot;);
printf(&quot;        -p Queue path\n&quot;);
printf(&quot;        -m mode:push or pop\n&quot;);
printf(&quot;        -v the value you want to push\n&quot;);
printf(&quot;        -s zookeeper server ip:port\n&quot;);
printf(&quot;For example:\n&quot;);
printf(&quot;    push the message \&quot;Hello\&quot; into the queue Queue:\n&quot;);
printf(&quot;        >myqueue -s172.17.0.36:2181 -p /Queue -m push -v Hello\n&quot;);
printf(&quot;    pop one message from the queue Queue:\n&quot;);
printf(&quot;        >myqueue -s172.17.0.36:2181 -p /Queue -m pop\n&quot;);
}
void get_option(int argc,const char* argv[])
{
extern char    *optarg;
int            optch;
int            dem = 1;
const char    optstring[] = &quot;hv:m:p:s:&quot;;

g_mode = PUSH_MODE;
while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
{
switch( optch )
{
case 'h':
print_usage();
exit(-1);
case '?':
print_usage();
printf(&quot;unknown parameter: %c\n&quot;, optopt);
exit(-1);
case ':':
print_usage();
printf(&quot;need parameter: %c\n&quot;, optopt);
exit(-1);
case 'm':
if(strcasecmp(optarg,&quot;push&quot;)==0){
g_mode = PUSH_MODE;
}else{
g_mode = POP_MODE;
}
break;
case 's':
strncpy(g_host,optarg,sizeof(g_host));
break;
case 'p':
strncpy(g_path,optarg,sizeof(g_path));
break;
case 'v':
strncpy(g_value,optarg,sizeof(g_value));
break;
default:
break;
}
}
}
int push(zhandle_t *zkhandle,const char *path,char *element)
{
char child_path[512] = {0};
char path_buffer[512] = {0};
int bufferlen = sizeof(path_buffer);
sprintf(child_path,&quot;%s/queue-&quot;,path);
int ret = zoo_create(zkhandle,child_path,element,strlen(element),  
&ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE,  
path_buffer,bufferlen);  
if(ret != ZOK){
fprintf(stderr,&quot;failed to create the path %s!\n&quot;,path);
}else{
printf(&quot;create path %s successfully!\n&quot;,path);
}
return ret;
}
int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)
{
int i = 0;
struct String_vector children;
int ret = zoo_get_children(zkhandle,path,0,&children);

if(ret != ZOK){
fprintf(stderr,&quot;failed to create the path %s!\n&quot;,path);
}else if (children.count == 0){
strcpy(element,&quot;&quot;);
*len = 0;
ret = -1;
}else{
char *min = children.data[0];
for(i = 0; i < children.count; ++i){
printf(&quot;%s:%s\n&quot;,min,children.data);
if(strcmp(min,children.data) > 0){
min = children.data;
}
}
if(min != NULL){
char child_path[512]={0};
sprintf(child_path,&quot;%s/%s&quot;,path,min);
ret = zoo_get(zkhandle,child_path,0,element,len,NULL);
if(ret != ZOK){
fprintf(stderr,&quot;failed to get data of the path %s!\n&quot;,child_path);
}else{
ret = zoo_delete(zkhandle,child_path, -1);
if(ret != ZOK){
fprintf(stderr,&quot;failed to delete the path %s!\n&quot;,child_path);
}
}
}
}
for(i = 0; i < children.count; ++i){
free(children.data);
children.data = NULL;
}

return ret;
}
int front(zhandle_t *zkhandle,char *path,char *element,int *len)
{
int i = 0;
struct String_vector children;
int ret = zoo_get_children(zkhandle,path,0,&children);
if(ret != ZOK){
fprintf(stderr,&quot;failed to create the path %s!\n&quot;,path);
}else if(children.count == 0){
strcpy(element,&quot;&quot;);
*len = 0;
ret = -1;
}else{
char *min = NULL;
for(i = 0; i < children.count; ++i){
if(strcmp(min,children.data) > 0){
min = children.data;
}
}
if(min != NULL){
char child_path[512]={0};
sprintf(child_path,&quot;%s/%s&quot;,path,min);
ret = zoo_get(zkhandle,child_path,0,element,len,NULL);
if(ret != ZOK){
fprintf(stderr,&quot;failed to get data of the path %s!\n&quot;,child_path);
}
}
}
for(i = 0; i < children.count; ++i){
free(children.data);
children.data = NULL;
}
return ret;
}

int main(int argc, const char *argv[])  
{  
int timeout = 30000;  
char path_buffer[512];  
int bufferlen=sizeof(path_buffer);  
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  
get_option(argc,argv);
zhandle_t* zkhandle = zookeeper_init(g_host,NULL, timeout, 0, (char *)&quot;lock Test&quot;, 0);  
if (zkhandle ==NULL)  
{  
fprintf(stderr, &quot;Error when connecting to zookeeper servers...\n&quot;);  
exit(EXIT_FAILURE);  
}  
int ret = zoo_exists(zkhandle,g_path,0,NULL);
if(ret != ZOK){
ret = zoo_create(zkhandle,g_path,&quot;1.0&quot;,strlen(&quot;1.0&quot;),  
&ZOO_OPEN_ACL_UNSAFE,0,  
path_buffer,bufferlen);  
if(ret != ZOK){
fprintf(stderr,&quot;failed to create the path %s!\n&quot;,g_path);
}else{
printf(&quot;create path %s successfully!\n&quot;,g_path);
}
}
if(g_mode == PUSH_MODE){
push(zkhandle,g_path,g_value);
printf(&quot;push:%s\n&quot;,g_value);
}else{
int len = sizeof(g_value);
ret = pop(zkhandle,g_path,g_value,&len) ;
if(ret == ZOK){
printf(&quot;pop:%s\n&quot;,g_value);
}else if( ret == -1){
printf(&quot;queue is empty\n&quot;);
}
}

zookeeper_close(zkhandle);
return 0;
}


  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-141953-1-1.html 上篇帖子: 只有一个 ZooKeeper 服务器的例子 下篇帖子: Zookeeper C Client分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表