设为首页 收藏本站
查看: 1530|回复: 0

[经验分享] python socketpool:通用连接池

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-30 13:47:28 | 显示全部楼层 |阅读模式
简介
  在软件开发中经常要管理各种“连接”资源,通常我们会使用对应的连接池来管理,比如mysql数据库连接可以用sqlalchemy中的池来管理,thrift连接可以通过thriftpool管理,redis-py中的StrictRedis实现本身就是基于连接池的,等等。 而今天介绍的socketpool是一个通用的python连接池库,通过它可以实现任意类型连接的管理,虽然不是很完美,但在一些找不到合适连接池实现、而又不想自己造轮子的时候使用起来会节省很多精力。

内部实现要点


  • 这个类库的代码其实并不是特别的漂亮,但结构设计的不错,关键留下了对拓展开放的钩子,能让使用者根据自己的需要定制自己的连接池
  • 内部主要的组件有ConnectionPool,Connector和backend_mod三个

    • ConnectionPool实现了一个连接池的通用逻辑,用一个优先级队列管理所有连接,另外支持connection的生命周期定制,有一个reap机制(可选),基本思想是每个conn有一个最大生命周期,比如600秒,过了这个时间,就必须回收掉,reap线程(也有可能是greenlet或eventlet)定期检查过期的conn并进行回收
    • Connector是一个接口,它可以看做是一个制造conn的工厂,ConnectionPool在需要新建conn的时候,会通过这个工厂来生成conn。所以我们只要实现Connector的接口方法就可以定制一个自己的连接工厂
    • backend_mod是为了支持不同的线程模型(比如python原生线程,gevent或者eventlet)抽象出来的后端模块,它统一封装了Socket, PriorityQueue, Semaphore等和并发模型相关的组件,在创造ConnectionPool对象时可以通过参数控制选用哪种backend


部分代码阅读
  ConnectionPool的初始化函数



     def __init__(self, factory,
retry_max=3, retry_delay=.1,
timeout=-1, max_lifetime=600.,
max_size=10, options=None,
reap_connections=True, reap_delay=1,
backend="thread"):
if isinstance(backend, str):
self.backend_mod = load_backend(backend)
self.backend = backend
else:
self.backend_mod = backend
self.backend = str(getattr(backend, '__name__', backend))
self.max_size = max_size
self.pool = getattr(self.backend_mod, 'PriorityQueue')()
self._free_conns = 0
self.factory = factory
self.retry_max = retry_max
self.retry_delay = retry_delay
self.timeout = timeout
self.max_lifetime = max_lifetime
if options is None:
self.options = {"backend_mod": self.backend_mod,
"pool": self}
else:
self.options = options
self.options["backend_mod"] = self.backend_mod
self.options["pool"] = self
# bounded semaphore to make self._alive 'safe'
self._sem = self.backend_mod.Semaphore(1)
self._reaper = None
if reap_connections:
self.reap_delay = reap_delay
self.start_reaper()

  这里几个参数的意义:


  • factory是类对象,需要实现Connector接口,用来生成conn,options是调用factory时传入的参数
  • retry_max是获取conn时如果出错最多重试几次
  • max_lifetime是规定每个conn最大生命时间,见上面说的reap机制
  • max_size是这个pool的大小上限
  • backend是线程模型
  • reap_connections控制是否启用reap机制
  被启动的reap就是一个单独的线程,定时调用下面的方法把过期的conn回收掉:



     def murder_connections(self):
current_pool_size = self.pool.qsize()
if current_pool_size > 0:
for priority, candidate in self.pool:
current_pool_size -= 1
if not self.too_old(candidate):
self.pool.put((priority, candidate))
else:
self._reap_connection(candidate)
if current_pool_size <= 0:
break
  
  _reap_connection最终会回调conn对象的invalidate方法(Connector的接口)进行销毁。每次使用完conn后会调用release_connection, 它的逻辑是



     def release_connection(self, conn):
if self._reaper is not None:
self._reaper.ensure_started()
with self._sem:
if self.pool.qsize() < self.max_size:
connected = conn.is_connected()
if connected and not self.too_old(conn):
self.pool.put((conn.get_lifetime(), conn))
else:
self._reap_connection(conn)
else:
self._reap_connection(conn)
  如果连接还没过期或断开,就会被重新放入优先级队列中,用户可以通过实现Connector接口的get_lifetime来控制这里放回的conn的优先级,priority最小的conn下次会被优先取出
  Connector定义了哪些接口呢?



class Connector(object):
def matches(self, **match_options):
raise NotImplementedError()
def is_connected(self):
raise NotImplementedError()
def handle_exception(self, exception):
raise NotImplementedError()
def get_lifetime(self):
raise NotImplementedError()
def invalidate(self):
raise NotImplementedError()
  matches方法主要用在pool取出一个conn时,除了优先选择priority最小的conn,还需要这个conn和get(**options)传入的参数match,这个match就是回调conn的matches方法。其他几个接口前面都涉及到了。

TcpConnector实现
  来看一下socketpool自带的TcpConnector的实现,实现tcp socket的工厂



class TcpConnector(Connector):
def __init__(self, host, port, backend_mod, pool=None):
self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
self._s.connect((host, port))
self.host = host
self.port = port
self.backend_mod = backend_mod
self._connected = True
# use a 'jiggle' value to make sure there is some
# randomization to expiry, to avoid many conns expiring very
# closely together.
self._life = time.time() - random.randint(0, 10)
self._pool = pool
def __del__(self):
self.release()
def matches(self, **match_options):
target_host = match_options.get('host')
target_port = match_options.get('port')
return target_host == self.host and target_port == self.port
def is_connected(self):
if self._connected:
return util.is_connected(self._s)
return False
def handle_exception(self, exception):
print('got an exception')
print(str(exception))
def get_lifetime(self):
return self._life
def invalidate(self):
self._s.close()
self._connected = False
self._life = -1
def release(self):
if self._pool is not None:
if self._connected:
self._pool.release_connection(self)
else:
self._pool = None
def send(self, data):
return self._s.send(data)
def recv(self, size=1024):
return self._s.recv(size)
  
  不需要太多额外解释。
  

拓展实现HiveConnector
  根据自身项目需要,我用pyhs2实现了一个hive连接池



class HiveConnector(Connector):
def __init__(self, host, port, backend_mod, pool=None, authMechanism='NOSASL',
**options):
self.host = host
self.port = port
self.backend_mod = backend_mod
self._pool = pool
self._connected = False
self._conn = pyhs2.connect(host=host,
port=port,
authMechanism=authMechanism
)
self._connected = True
# use a 'jiggle' value to make sure there is some
# randomization to expiry, to avoid many conns expiring very
# closely together.
self._life = time.time() - random.randint(0, 10)
def __del__(self):
self.release()
def matches(self, **match_options):
target_host = match_options.get('host')
target_port = match_options.get('port')
return target_host == self.host and target_port == self.port
def is_connected(self):
return self._connected
def handle_exception(self, exception):
logger.exception("error: %s" % str(exception))
def get_lifetime(self):
return self._life
def invalidate(self):
try:
self._conn.close()
except:
pass
finally:
self._connected = False
self._life = -1
def release(self):
if self._pool is not None:
if self._connected:
self._pool.release_connection(self)
else:
self._pool = None
def cursor(self):
return self._conn.cursor()
def execute(self, hql):
with self.curosr() as cur:
return cur.execute(hql)
hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)
  使用这个hive_pool去执行hql语句非常容易:



     with hive_pool.connection() as conn:
with conn.cursor() as cur:
print cur.getDatabases()
总结
  简绍了socketpool的内部实现,以及如何使用它构造自己的连接池。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-145420-1-1.html 上篇帖子: python之路第四篇(基础篇) 下篇帖子: Python 参数设置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表