[Experience Share] commons-pool and commons-pool2 Connection Pools (a Hadoop Connection Pool)

commons-pool and commons-pool2 are frameworks for building object pools. They define the interfaces an object must implement in order to be pooled, and they provide default implementations of the common pooling behavior. Once objects are pooled, their lifecycle (creation, use, destruction) is managed through the pool. The classic example is a connection pool: it controls the number and state of connections, conserves connection resources, and avoids repeatedly establishing and releasing connections under concurrent load.
This post shows how to pool objects with commons-pool2, using a pool of Hadoop connections as the example.
1. Add the dependency
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.3</version>
</dependency>
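
The client code below also uses the HDFS API (org.apache.hadoop.fs), so a Hadoop client dependency is needed as well. A sketch; the version here is an assumption, match it to your cluster:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>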
  

2. Using the connection pool
We pool Hadoop cluster connections inside a Spring Boot application. Before diving into the implementation, look at the end result: the pooled Hadoop cluster client below manages its connections to the cluster entirely through the pool.
1) Configuring the pool
maxTotal: maximum number of connections
maxIdle: maximum number of idle connections
minIdle: minimum number of idle connections
maxWait: maximum time to wait when borrowing a connection
These settings are passed in through a config object, JHadoopPoolConfig; its implementation is described later.
2) Managing pooled connections
Three methods in the client below show how to borrow a connection from the pool, use it, and release it:
Borrow a resource: pool.getResource()
Release a resource: pool.returnBrokenResource() and pool.returnResource()
Note that the resource must always be released in the catch and finally blocks; otherwise connections leak and the pool eventually fails with a "could not get a Resource from the Pool" error.
  

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;

public class JHadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private JHadoopPool jHadoopPool;
    private String coreResource;
    private String hdfsResource;
    private int maxTotal;
    private int maxIdle;
    private int minIdle;
    private int maxWaitMillis;

    public String getCoreResource() {
        return coreResource;
    }
    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }
    public String getHdfsResource() {
        return hdfsResource;
    }
    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }
    public int getMaxTotal() {
        return maxTotal;
    }
    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }
    public int getMaxIdle() {
        return maxIdle;
    }
    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }
    public int getMaxWaitMillis() {
        return maxWaitMillis;
    }
    public void setMaxWaitMillis(int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }
    public int getMinIdle() {
        return minIdle;
    }
    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    // Called by Spring as the bean's initMethod: builds the pool from the injected settings.
    public void init() {
        try {
            JHadoopPoolConfig conf = new JHadoopPoolConfig();
            conf.setMaxTotal(maxTotal);
            conf.setMaxIdle(maxIdle);
            conf.setMinIdle(minIdle);
            conf.setMaxWaitMillis(maxWaitMillis);
            jHadoopPool = new JHadoopPool(conf, coreResource, hdfsResource);
            logger.info("[HDFS] JHadoopClient initialized");
        } catch (Exception ex) {
            logger.error("[HDFS] JHadoopClient initialization failed", ex);
        }
    }

    // Called by Spring as the bean's destroyMethod: closes the pool and all connections.
    public void stop() {
        try {
            jHadoopPool.destroy();
        } catch (Exception e) {
            logger.error("[HDFS] failed to destroy the pool", e);
        }
    }

    public boolean exists(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            return jHadoop.exists(path);
        } catch (Exception e) {
            // Mark the connection broken so it is invalidated instead of reused.
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] exists() check failed", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }

    public String getModificationTime(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            FileStatus fileStatus = jHadoop.getFileStatus(path);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            return simpleDateFormat.format(date);
        } catch (Exception e) {
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] failed to get modification time", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }

    public long getPathSize(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            return jHadoop.getContentSummary(path).getLength();
        } catch (Exception e) {
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] failed to get path size", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }
}
  

3) Registering the client as a bean
The pool settings are injected from the application's configuration file.
  

package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfig {
    @Value("${hadoop.core.resource}")
    private String coreResource;
    @Value("${hadoop.hdfs.resource}")
    private String hdfsResource;
    @Value("${hadoop.pool.maxTotal}")
    private int maxTotal;
    @Value("${hadoop.pool.maxIdle}")
    private int maxIdle;
    @Value("${hadoop.pool.minIdle}")
    private int minIdle;
    @Value("${hadoop.pool.maxWaitMillis}")
    private int maxWaitMillis;

    // Spring invokes init() after construction and stop() on shutdown,
    // so the pool's lifecycle follows the application's.
    @Bean(initMethod = "init", destroyMethod = "stop")
    public JHadoopClient jHadoopClient() {
        JHadoopClient jHadoopClient = new JHadoopClient();
        jHadoopClient.setMaxTotal(maxTotal);
        jHadoopClient.setMaxIdle(maxIdle);
        jHadoopClient.setMinIdle(minIdle);
        jHadoopClient.setMaxWaitMillis(maxWaitMillis);
        jHadoopClient.setCoreResource(coreResource);
        jHadoopClient.setHdfsResource(hdfsResource);
        return jHadoopClient;
    }
}
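
For reference, a matching application.properties snippet; the keys come from the @Value annotations above, while the values are only illustrative assumptions:

hadoop.core.resource=core-site.xml
hadoop.hdfs.resource=hdfs-site.xml
hadoop.pool.maxTotal=10
hadoop.pool.maxIdle=5
hadoop.pool.minIdle=1
hadoop.pool.maxWaitMillis=10000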
  

4) Implementing the config object
The parameters set below mean the following:
1) setTestWhileIdle - validate connections while they sit idle; default false
2) setMinEvictableIdleTimeMillis - minimum time a connection must sit idle before it becomes eligible for eviction
3) setTimeBetweenEvictionRunsMillis - interval in milliseconds between eviction scans; a negative value disables the evictor thread, default -1
4) setNumTestsPerEvictionRun - maximum number of connections examined during each eviction run; -1 means examine all idle connections

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class JHadoopPoolConfig extends GenericObjectPoolConfig {
    public JHadoopPoolConfig() {
        // Validate idle connections so stale ones are evicted before reuse.
        this.setTestWhileIdle(true);
        // A connection must be idle at least 60s before it can be evicted.
        this.setMinEvictableIdleTimeMillis(60000L);
        // Run the evictor every 30s.
        this.setTimeBetweenEvictionRunsMillis(30000L);
        // -1: examine every idle connection on each eviction run.
        this.setNumTestsPerEvictionRun(-1);
    }
}
  

3. The connection pool: JHadoopPool
This class extends Pool<JHadoop> and initializes the pool object; JHadoop is the connection object the pool manages.
Note that JHadoopPool is constructed with a JHadoopFactory instance, which creates the actual JHadoop connections in factory style. The Pool<T> base class is sketched after the code below.
JHadoopPool code:
  

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class JHadoopPool extends Pool<JHadoop> {
    public JHadoopPool(GenericObjectPoolConfig poolConfig, String coreResource, String hdfsResource) {
        super(poolConfig, new JHadoopFactory(coreResource, hdfsResource));
    }

    public JHadoopPool(GenericObjectPoolConfig poolConfig) {
        super(poolConfig, new JHadoopFactory());
    }
}
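
The Pool<T> base class does not appear in the post. Below is a minimal sketch of what it might look like, assuming it is a thin wrapper over commons-pool2's GenericObjectPool; the method names (getResource, returnResource, returnBrokenResource, destroy) are taken from how JHadoopClient uses the pool above, but the real implementation may differ.

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public abstract class Pool<T> {
    protected final GenericObjectPool<T> internalPool;

    public Pool(GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
    }

    // Borrow an object from the pool; blocks up to maxWaitMillis before failing.
    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (Exception e) {
            throw new RuntimeException("Could not get a resource from the pool", e);
        }
    }

    // Hand a healthy object back to the pool for reuse.
    public void returnResource(T resource) {
        internalPool.returnObject(resource);
    }

    // Tell the pool the object is broken so it is destroyed, not reused.
    public void returnBrokenResource(T resource) {
        try {
            internalPool.invalidateObject(resource);
        } catch (Exception e) {
            throw new RuntimeException("Could not invalidate the broken resource", e);
        }
    }

    // Close the pool, destroying all pooled objects.
    public void destroy() {
        internalPool.close();
    }
}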
  

JHadoop code:
JHadoop wraps the method calls from hadoop.fs.
Only a few simple wrappers are given here; add more as your use case requires.
  

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class JHadoop {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private FileSystem fs;
    private String coreResource;
    private String hdfsResource;

    public JHadoop(String coreResource, String hdfsResource) {
        this.coreResource = coreResource;
        this.hdfsResource = hdfsResource;
    }

    public String getCoreResource() {
        return coreResource;
    }
    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }
    public String getHdfsResource() {
        return hdfsResource;
    }
    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }

    // Open the underlying FileSystem from the given site configuration files.
    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.addResource(coreResource);
            conf.addResource(hdfsResource);
            fs = FileSystem.get(conf);
            logger.info("[JHadoop] instance created");
        } catch (Exception e) {
            logger.error("[JHadoop] failed to create instance", e);
        }
    }

    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[JHadoop] instance closed");
            }
        } catch (Exception e) {
            logger.error("[JHadoop] failed to close instance", e);
        }
    }

    // Liveness probe used by the factory's validateObject(): the boolean result
    // of exists() is irrelevant; a dead connection throws IOException instead.
    public boolean isConnected() throws IOException {
        fs.exists(new Path("/forTest"));
        return true;
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }

    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    // Returns null when the path does not exist.
    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary;
    }
}
  

4. The connection factory: JHadoopFactory
JHadoopFactory manages the creation, destruction, and validation of the pooled connection objects.
  

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class JHadoopFactory implements PooledObjectFactory<JHadoop> {
    private final String coreResource;
    private final String hdfsResource;

    public JHadoopFactory() {
        this.coreResource = "core-site.xml";
        this.hdfsResource = "hdfs-site.xml";
    }

    public JHadoopFactory(String coreResource, String hdfsResource) {
        this.coreResource = coreResource;
        this.hdfsResource = hdfsResource;
    }

    // Called by the pool when it needs a new connection.
    @Override
    public PooledObject<JHadoop> makeObject() throws Exception {
        JHadoop jHadoop = new JHadoop(coreResource, hdfsResource);
        jHadoop.open();
        return new DefaultPooledObject<JHadoop>(jHadoop);
    }

    // Called when a connection is evicted or invalidated.
    @Override
    public void destroyObject(PooledObject<JHadoop> pooledJHadoop) throws Exception {
        JHadoop jHadoop = pooledJHadoop.getObject();
        jHadoop.close();
    }

    // Called during idle checks (testWhileIdle); a false result destroys the connection.
    @Override
    public boolean validateObject(PooledObject<JHadoop> pooledJHadoop) {
        JHadoop jHadoop = pooledJHadoop.getObject();
        try {
            return jHadoop.isConnected();
        } catch (Exception e) {
            return false;
        }
    }

    // No-op: nothing to do when a connection is borrowed.
    @Override
    public void activateObject(PooledObject<JHadoop> pooledObject) throws Exception {
    }

    // No-op: nothing to do when a connection is returned.
    @Override
    public void passivateObject(PooledObject<JHadoop> pooledObject) throws Exception {
    }
}
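
Putting it all together, here is a hypothetical caller (not part of the original post) showing how the pooled client might be used from any Spring-managed component; HdfsInfoService and its path handling are illustrative only:

package com.xiaoju.dqa.jazz.hadoop;

import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class HdfsInfoService {

    // Each JHadoopClient call borrows a JHadoop connection from the pool
    // and returns (or invalidates) it before the call completes.
    @Autowired
    private JHadoopClient jHadoopClient;

    public void printPathInfo(String path) throws Exception {
        if (jHadoopClient.exists(path)) {
            System.out.println("modified at: " + jHadoopClient.getModificationTime(path));
            System.out.println("size in bytes: " + jHadoopClient.getPathSize(path));
        }
    }
}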
  
