设为首页 收藏本站
查看: 615|回复: 0

[经验分享] python-gearman之多个GearmanServer问题

[复制链接]

尚未签到

发表于 2017-5-1 14:55:17 | 显示全部楼层 |阅读模式
============================================================================
原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。

请注明转自:http://yunjianfei.iteye.com/blog/

============================================================================
 

问题描述
  1.开启多个GearmanServer,端口号分别为4730、4731
  2.使用python-gearman开发client和worker,开启多个worker连接4730和4731两个Server
  3.client发送大量任务到GearmanServer
  4.多个worker中的一个或者多个在接收一个或者若干个任务后,再也不接收任务了

代码实现
  注意:
  在编写代码前,应该先安装python-gearman,官网链接如下:
  https://pypi.python.org/pypi/gearman/
   
  1.文件名:test.bash

# test.bash
for i in {0..5}; do
python gearman_client.py
sleep 2
done
  2.文件名:gearman_client.py

# gearman_client.py
import multiprocessing
import gearman
import traceback
def start_gearman_client(process_id):
gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
try:
requests = []
for gm_job_id in range(500):
request = gm_client.submit_job(
task='do_task',
data='%d_%03d' % (process_id,gm_job_id),
unique='%d_%03d' % (process_id,gm_job_id),
background=False,wait_until_complete=False)
requests.append(request)
gm_client.wait_until_jobs_completed(requests)
except:
print traceback.format_exc()
return 0
def main():
child_processes = []
for process_id in range(2):
p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
child_processes.append((process_id,p))
p.start()
for (pid,child) in child_processes:
print 'Confirming that child number %d had died' % pid
child.join()
if __name__ == '__main__':
main()
  3.文件名:gearman_worker.py

# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial
def do_work(gearman_worker,gearman_job,worker_id):
try:
print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
time.sleep(0.001)
except:
print traceback.format_exc()
return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)
def start_gearman_worker(worker_id):
gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
print 'Worker %d start working' % worker_id
gm_worker.work()
if __name__ == '__main__':
workers = []
for pid in range(8):
worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()

问题复现
  1.开启多个GearmanServer,端口号分别为4730、4731
  2.运行命令 python gearman_worker.py
  3.运行脚本 /bin/bash test.bash
  4.查看worker打印输出的log,可以发现log末尾只有部分worker在接收任务并执行,有几个worker是只执行了一次任务就再也不执行了。(注意:这种状况如果没发生,多执行几次test.bash,然后观察)
  这里我贴出其中一次测试结果:

测试结果

Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499
   可以看到,后面只有worker2、4、5在执行任务,其他的worker都不工作了。

问题分析与解决
  在开始问题分析之前,先了解一下GearmanServer与client、worker之间大概的工作流程。
  
DSC0000.png
1.client提交一个Job到GearmanServer

  2.GearmanServer找出所有Sleeping的worker
  3.GearmanServer向这些Sleeping的worker发送'noop'命令,唤醒worker
  4.worker向GearmanServer发送Grab_Job的命令获取Job
   
  根据上面的流程,我们可以知道,worker要获取Job,首先必须受到GearmanServer发送的“NOOP”命令。那么,worker不执行任务了,不外乎以下几种情况:
  1. worker同GearmanServer断开了连接
  2. GearmanServer之中因为某些原因,没有向所有worker发送“NOOP”
  这两点也是可以通过代码来验证的,我当时在python-gearman中加了一些打印输出,可以确定worker同GearmanServer的连接并没有断开。那么,原因只能是GearmanServer没有向所有的worker发送NOOP。
  这样的话,有以下两种方案来解决:
  1.查阅GearmanServer(C实现),修改源码,对所有worker发送NOOP
  2.修改python-gearman的代码,使得所有worker能够主动从GearmanServer获取Job
  对于第一种方案,显然成本很高,而且风险极大,可能会引发更大的BUG,显然不可取。所以我们选择第二种方案,让worker主动去GearmanServer拉取Job,而不是死等“NOOP”。
  第二种方案怎么实现呢?这里我就不详细介绍了,一切尽在代码中,以下是我在github发布的源码,是基于python-gearman的官方版本修改出来的,链接如下:
  https://github.com/yunjianfei/python-gearman
  主要特性有:
  1.加入了TCP keepalive特性
  2.采用epoll,效率更高
  3.worker加入主动抓取任务的功能,保证能从多个server获取任务
  希望大家踊跃使用,如果发现bug,请联系我,谢谢

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-371733-1-1.html 上篇帖子: python中string的操作函数 下篇帖子: (Python编程)一个简单的C扩展模块
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表