root 发表于 2017-4-30 06:52:46

python-memcached的线程安全问题

  答案是肯定的,前提你在使用Python 2.4+和python-memcached 1.36+

为什么我们需要线程安全的memcached
client,因为我们的实际应用一般是多线程的模型,例如cherrypy、twisted,如果python-memcached不是线程安全的话,
引起的问题不仅仅是并发修改共享变量这么简单,是外部socket链接的数据流的混乱

python-memcached怎么实现线程安全的呢?查看源代码看到


try:
# Only exists in Python 2.4+
from threading import local
except ImportError:
# TODO:add the pure-python local implementation
class local(object):
pass
  class Client(local):
很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定的。实现虽然不太优雅,但是很实在

但是别以为这样就可以随便在线程里面用python-memcached了,因为这种thread local的做法,你的应用必须要使用thread
pool的模式,而不能不停创建销毁thread,因为每一个新线程的创建,对于就会使用一个全新的Client,也就是一个全新的socket链接,如
果不停打开创建销毁thread的话,就会导致不停的创建销毁socket链接,导致性能大量下降。幸好,无论是cherrypy还是twisted,都
是使用了thread pool的模式。
  但是不幸的是gevent不是thread pool的模式,这导致不停的创建销毁socket链接。

gevent

# ThreadID: 47316030715200

=============================

File: "/data1/duitang/dist/app/test/7199/duitang/common/templatetags/myTags.py", line 525, in statichtml

return StaticHTML_Tag(static_name)

File: "/data1/duitang/dist/app/test/7199/duitang/common/templatetags/myTags.py", line 507, in __init__

sh = StaticHTML.objects.get(name=name)

File: "/data1/duitang/dist/app/test/7199/duitang/statichtml/manager.py", line 18, in get

model = key and cache.get(key)

File: "/data1/duitang/dist/app/test/7199/duitang/perf/memcached.py", line 25, in get

rp = super(MemcachedCache, self).get(key, deault, version)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/cache/backends/memcached.py", line 58, in get

val = self._cache.get(key)

File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 180, in __getattribute__

_init_locals(self)

File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 167, in _init_locals

cls.__init__(self, *args, **kw)

File: "/data1/duitang/dist/app/test/7199/duitang/memcache.py", line 168, in __init__

print cf1.stacktraces()

File: "/data1/duitang/dist/app/test/7199/duitang/cf1.py", line 8, in stacktraces

for filename, lineno, name, line in traceback.extract_stack(stack):


========================================================

# ThreadID: 47316030715200

File: "build/bdist.linux-x86_64/egg/gevent/greenlet.py", line 390, in run

  result = self._run(*self.args, **self.kwargs)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/async.py", line 44, in handle

  self.handle_request(req, client, addr)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py", line 88, in handle_request

  super(GeventWorker, self).handle_request(*args)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/async.py", line 78, in handle_request

  respiter = self.wsgi(environ, resp.start_response)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/handlers/wsgi.py", line 273, in __call__

  response = self.get_response(request)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response

  response = callback(request, *callback_args, **callback_kwargs)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/utils/decorators.py", line 93, in _wrapped_view

  response = view_func(request, *args, **kwargs)

File: "/data1/duitang/dist/app/test/7199/duitang/people/views_people.py", line 45, in peopleIndex

  people = AuthUser.objects.get(id=UserProfile.get_real_id(user_id))

File: "/data1/duitang/dist/app/test/7199/duitang/common/user.py", line 19, in get

  model = key and cache.get(key)

File: "/data1/duitang/dist/app/test/7199/duitang/perf/memcached.py", line 25, in get

  rp = super(MemcachedCache, self).get(key, deault, version)

File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/cache/backends/memcached.py", line 58, in get

  val = self._cache.get(key)

File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 180, in __getattribute__

  _init_locals(self)

File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 167, in _init_locals

  cls.__init__(self, *args, **kw)

File: "/data1/duitang/dist/app/test/7199/duitang/memcache.py", line 168, in __init__

  print cf1.stacktraces()

File: "/data1/duitang/dist/app/test/7199/duitang/cf1.py", line 8, in stacktraces

  for filename, lineno, name, line in traceback.extract_stack(stack):

   


  可以发现threading local最后委托给gevent/local.py实现,导致重复初始化Client。
  而uwsgi只有初始化时才调用,后续刷新页面不再触发Client.init()
  google了一下,老外的解决方案: https://groups.google.com/forum/?fromgroups=#!topic/gevent/ULKUPvbaQ7I
  python memcache客户端比较


对于memcached,redis,hessian,mongo等各种客户端都要考虑一个问题,如何高效安全地和server进行通信?有几种策略:


1. 每次调用都建立和关闭连接

优点:实现简单,不存在线程安全问题。django的BaseMemcachedCache实现就是每次执行完都会close掉连接。

缺点:出现大量TIME_WAIT,花费在建立网络连接上的开销比较大,在有些情况下会导致性能瓶颈。


2.单一socket长线程

优点:不会出现大量TIME_WAIT,不会频繁开关socket,减轻服务器的压力。

缺点:远程调用一般是请求/问答式,为了保证线程安全必须加同步锁,如果远程服务器响应慢的话会导致大量线程竞争同一个socket资源,socket成为瓶颈。


3.单一socket长线程+连接池

为了解决方案1和方案2的问题,我们采用方案3:保持长连接不变,但采用连接池来保存长连接,这样系统内有一批socket供程序调用,避免大量线程竞争同一个socket。


那么python memcached client采用那种形式呢?


首先python memcached client有好几个,在memcached的官方网站提供了client列表(http://code.google.com/p/memcached/wiki/Clients):

a.libmemcached: 最受欢迎的memcached的C语言版本的客户端的,高性能,线程安全。很多语言都有对这个版本的wrapper。

b.pylibmc:是对libmemcached的wrapper。 http://sendapatch.se/projects/pylibmc/

c.python-memcached: 是100%纯python的版本,也是我们现在正在使用的版本。     http://www.tummy.com/Community/software/python-memcached/

d.Python libmemcached: 豆瓣提供的,对libmemcached的wrapper版本。http://code.google.com/p/python-libmemcached/

e.django cahce: django对memcached的支持,其实django只是提供了一个统一的cache接口,并没有具体实现,相关实现类在django.core.cache.backends.memcached.BaseMemcachedCache ,具体参考:https://docs.djangoproject.com/en/dev/topics/cache/ 不过django的问题很多。


python-memcached

python-memcached不会在每次get/set操作完成之后主动关闭连接,他是一种长连接,但他如果保证线程安全呢?一般我们是这样使用它:

import memcache

mc = memcache.Client(['127.0.0.1:11211'], debug=0)

答案在于他很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定。这样每个线程都只会看到本地的Client,变相的实现了连接池。

这个池的大小取决于系统有多少个线程。为了验证这个说法,我们写一个程序来测试。



import memcache

import threading

mc = memcache.Client(['127.0.0.1:11211'], debug=1)


class TestThread(threading.Thread): 

    def __init__(self,redis_cache): 

        threading.Thread.__init__(self) 

        self.redis_cache = redis_cache 

 

    def run(self): 

        while True: 

            obj = self.redis_cache.get("uid:1002")

            if not obj:

                self.redis_cache.set("uid:1002","test",1)

            print obj


for i in xrange(8): 

    t = TestThread(mc) 

    t.start()   




执行这段程序,然后通过netstat -an查看,刚好有一个8个ESTABLISHED,每个进程一个socket。

python的locale和java的ThreadLocl是一个意思,把值设置到线程中,每个线程只能看到自己线程保存的值,这里面的实现是这样的,由于Client继承于local,当访问Cient的任何一个方法或者属性都会进入到__getattribute__(),其内部实现是获取到当前线程并绑定 相关代码如下:

def _patch(self):
key = object.__getattribute__(self, '_local__key')
d = current_thread().__dict__.get(key)
if d is None:
d = {}
current_thread().__dict__ = d
object.__setattr__(self, '__dict__', d)
# we have a new instance dict, so call out __init__ if we have
# one
cls = type(self)
if cls.__init__ is not object.__init__:
args, kw = object.__getattribute__(self, '_local__args')
cls.__init__(self, *args, **kw)
else:
object.__setattr__(self, '__dict__', d)
  cls是当前对象的class,object是基类class,我打印了清单:
  new :  <class 'duitang.memcache.Client'> | <type 'object'>

--------------------

cached: <class 'gevent.local.local'> | <slot wrapper '__init__' of 'object' objects> | <slot wrapper '__init__' of 'object' objects>
  

yunpeng@yunpeng-duitang:~/test2$ netstat -an | grep 11211

tcp        0      0 0.0.0.0:11211           0.0.0.0:*               LISTEN    

tcp       31      0 127.0.0.1:46835         127.0.0.1:11211         ESTABLISHED

tcp        0      0 127.0.0.1:46832         127.0.0.1:11211         ESTABLISHED

tcp        0      0 127.0.0.1:46831         127.0.0.1:11211         ESTABLISHED

tcp        0      0 127.0.0.1:46829         127.0.0.1:11211         ESTABLISHED

tcp        0     14 127.0.0.1:46833         127.0.0.1:11211         ESTABLISHED

tcp       31      0 127.0.0.1:46828         127.0.0.1:11211         ESTABLISHED

tcp        0      0 127.0.0.1:46834         127.0.0.1:11211         ESTABLISHED

tcp       31      0 127.0.0.1:46830         127.0.0.1:11211         ESTABLISHED


当python进程退出socket会被自动关闭。

yunpeng@yunpeng-duitang:/duitang/dist/app/trunk/duitang$ netstat -an | grep 11211

tcp        0      0 0.0.0.0:11211           0.0.0.0:*               LISTEN    

tcp        0      0 127.0.0.1:44900         127.0.0.1:11211         TIME_WAIT 



但是很明显,采用threading locale这种方式来保证线程安全存在一些缺陷:


1.要求web server采用thread pool的方式,如果thread每次执行完之后就结束了,这会导致不停的创建销毁socket链接。

2.要求使用python thread locale的语义,但不幸的是python的thread语义很容易被改变,gevent就可以直接把python的一个thread转换成greenlet。



gevent的monkey提供的patch方法


patch_all() 调用所有的monkey patch

   

patch_os() os.fork()替换成gevent.fork



patch_select(aggressive=False) `select.select`替换成`gevent.select.select`


patch_socket(dns=True, aggressive=True)  标准的socket object 替换成 gevent's cooperative sockets.


patch_thread(threading=True, _threading_local=True) thread` module 替换成 gevent's thread


patch_time()  把标准的`time.sleep` 替换成`gevent.sleep`.



gunicorn如何使用gevent?

代码:

/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py

# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import os
import sys
from datetime import datetime
# workaround on osx, disable kqueue
if sys.platform == "darwin":
os.environ['EVENT_NOKQUEUE'] = "1"
try:
import gevent
except ImportError:
raise RuntimeError("You need gevent installed to use this worker.")
from gevent.pool import Pool
from gevent.server import StreamServer
from gevent import pywsgi
import gunicorn
from gunicorn.workers.async import AsyncWorker
VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)
BASE_WSGI_ENV = {
'GATEWAY_INTERFACE': 'CGI/1.1',
'SERVER_SOFTWARE': VERSION,
'SCRIPT_NAME': '',
'wsgi.version': (1, 0),
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False
}
class GeventWorker(AsyncWorker):
server_class = None
wsgi_handler = None
@classmethod
def setup(cls):
from gevent import monkey
monkey.noisy = False
monkey.patch_all()
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def run(self):
self.socket.setblocking(1)
pool = Pool(self.worker_connections)
if self.server_class is not None:
server = self.server_class(
self.socket, application=self.wsgi, spawn=pool, log=self.log,
handler_class=self.wsgi_handler)
else:
server = StreamServer(self.socket, handle=self.handle, spawn=pool)
server.start()
pid = os.getpid()
try:
while self.alive:
self.notify()
ifpid == os.getpid() and self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
break
gevent.sleep(1.0)
except KeyboardInterrupt:
pass
try:
# Try to stop connections until timeout
self.notify()
server.stop(timeout=self.cfg.graceful_timeout)
except:
pass
def handle_request(self, *args):
try:
super(GeventWorker, self).handle_request(*args)
except gevent.GreenletExit:
pass
if gevent.version_info == 0:
def init_process(self):
#gevent 0.13 and older doesn't reinitialize dns for us after forking
#here's the workaround
import gevent.core
gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init()
super(GeventWorker, self).init_process()

class GeventResponse(object):
status = None
headers = None
response_length = None

def __init__(self, status, headers, clength):
self.status = status
self.headers = headers
self.response_length = clength
class PyWSGIHandler(pywsgi.WSGIHandler):
def log_request(self):
start = datetime.fromtimestamp(self.time_start)
finish = datetime.fromtimestamp(self.time_finish)
response_time = finish - start
resp = GeventResponse(self.status, self.response_headers,
self.response_length)
req_headers =
self.server.log.access(resp, req_headers, self.environ, response_time)
def get_environ(self):
env = super(PyWSGIHandler, self).get_environ()
env['gunicorn.sock'] = self.socket
env['RAW_URI'] = self.path
return env
class PyWSGIServer(pywsgi.WSGIServer):
base_env = BASE_WSGI_ENV
class GeventPyWSGIWorker(GeventWorker):
"The Gevent StreamServer based workers."
server_class = PyWSGIServer
wsgi_handler = PyWSGIHandler
   测试:ab -n100 http://7199.t.duitang.com:7199/cache/
  $ netstat -an| grep 11211 |wc -l

306
  

总结:使用了gevent之后thread local有太多不可控.

gevent代码:/duitang/dist/sys/python/lib/python2.7/site-packages/gevent

gevent+django:http://www.slideshare.net/mahendram/scaling-django-with-gevent
页: [1]
查看完整版本: python-memcached的线程安全问题