commons-pool和commons-pool2是用来建立对象池的框架,提供了一些将对象池化必须要实现的接口和一些默认动作。对象池化之后可以通过pool的概念去管理其生命周期,例如对象的创建,使用,销毁等。例如我们通常使用的连接池,连接池可以有效管理连接的数量和状态,保证连接资源的情况而且避免并发场景下连接的频繁建立和释放。
我们这里来讲述如何使用commons-pool2来池化对象。我们以池化hadoop连接为例。
1、先解决依赖
<dependency> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.3</version>
</dependency>
2、如何使用连接池
我们是在spingboot框架中池化hadoop集群连接,先看一下池化之后的效果。
下面是我们池化之后的hadoop集群客户端。可以看到我们可以通过连接池的方式管理hadoo集群的链接。
1)配置连接池
最大连接数maxTotal
最大空闲连接数maxIdle
最小空闲连接数minIdle
获取连接的最大等待时间maxWait
可以看到传入这些配置的时候我们使用了一个config对象JHadoopPoolConfig,后面我们将说明这个config对象如何实现。
2)管理连接池
我们以三个函数说明了如何去连接池中申请连接,使用连接和释放链接资源。
申请资源pool.getResource()
释放资源pool.returnBrokenResource()和pool.returnResource()
这里要注意的是,一定要在catch和finally中成功释放资源,不然会导致could not get a Resource from the Pool的异常
package com.xiaoju.dqa.jazz.hadoop.client;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public
class JHadoopClient { protected final Logger logger
= LoggerFactory.getLogger(this.getClass());
private JHadoopPool jHadoopPool;
private String coreResource;
private String hdfsResource;
private int maxTotal;
private int maxIdle;
private int minIdle;
private int maxWaitMillis;
public String getCoreResource() {
return coreResource; }
public void setCoreResource(String coreResource) {
this.coreResource
= coreResource; }
public String getHdfsResource() {
return hdfsResource; }
public void setHdfsResource(String hdfsResource) {
this.hdfsResource
= hdfsResource; }
public int getMaxTotal() {
return maxTotal; }
public void setMaxTotal(int maxTotal) {
this.maxTotal
= maxTotal; }
public int getMaxIdle() {
return maxIdle; }
public void setMaxIdle(int maxIdle) {
this.maxIdle
= maxIdle; }
public int getMaxWaitMillis() {
return maxWaitMillis; }
public void setMaxWaitMillis(int maxWaitMillis) {
this.maxWaitMillis
= maxWaitMillis; }
public int getMinIdle() {
return minIdle; }
public void setMinIdle(int minIdle) {
this.minIdle
= minIdle; }
public void init() {
try { JHadoopPoolConfig conf
= new JHadoopPoolConfig(); conf.setMaxTotal(maxTotal);
conf.setMaxIdle(maxIdle);
conf.setMinIdle(minIdle);
conf.setMaxWaitMillis(maxWaitMillis);
jHadoopPool
= new JHadoopPool(conf, coreResource, hdfsResource); logger.info(
"[HDFS]初始化JHadoopClient成功"); } catch (Exception ex) {
logger.error(
"[HDFS]初始化JHadoopClient失败", ex); }
}
public void stop() {
try { jHadoopPool.destroy();
} catch(Exception e) {
}
}
public boolean exists(String path) throws Exception {
JHadoop jHadoop
= null; boolean broken
= false;try { jHadoop
= jHadoopPool.getResource();return jHadoop.exists(path); } catch (Exception e) {
broken
= true; jHadoopPool.returnBrokenResource(jHadoop);
logger.error(
"[HDFS]判断文件是否存在失败", e); throw e;
}
finally {if (null != jHadoop && !broken) { jHadoopPool.returnResource(jHadoop);
}
}
}
public String getModificationTime(String path) throws Exception {
JHadoop jHadoop
= null; boolean broken
= false;try { jHadoop
= jHadoopPool.getResource(); FileStatus fileStatus
= jHadoop.getFileStatus(path); long modifyTimestamp
= fileStatus.getModificationTime(); SimpleDateFormat simpleDateFormat
= new SimpleDateFormat("yyyyMMddHHmmss"); Date date
= new Date(modifyTimestamp);return simpleDateFormat.format(date); } catch (Exception e) {
broken
= true; jHadoopPool.returnBrokenResource(jHadoop);
logger.error(
"[HDFS]获取最近修改时间失败", e); throw e;
}
finally {if (null != jHadoop && !broken) { jHadoopPool.returnResource(jHadoop);
}
}
}
public long getPathSize(String path) throws Exception {
JHadoop jHadoop
= null; boolean broken
= false;try { jHadoop
= jHadoopPool.getResource();return jHadoop.getContentSummary(path).getLength(); } catch (Exception e) {
broken
= true; jHadoopPool.returnBrokenResource(jHadoop);
logger.error(
"[HDFS]获取路径大小失败", e); throw e;
}
finally {if (null != jHadoop && !broken) { jHadoopPool.returnResource(jHadoop);
}
}
}
}
3)注册成bean
通过配置文件传入链接池相应的配置。
package com.xiaoju.dqa.jazz.hadoop.configuration;
import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public
class HadoopConfig {
@Value(
"${hadoop.core.resource}") private String coreResource;
@Value(
"${hadoop.hdfs.resource}") private String hdfsResource;
@Value(
"${hadoop.pool.maxTotal}") private int maxTotal;
@Value(
"${hadoop.pool.maxIdle}") private int maxIdle;
@Value(
"${hadoop.pool.minIdle}") private int minIdle;
@Value(
"${hadoop.pool.maxWaitMillis}") private int maxWaitMillis;
@Bean(initMethod
= "init", destroyMethod = "stop") public JHadoopClient jHadoopClient() {
JHadoopClient jHadoopClient
= new JHadoopClient(); jHadoopClient.setMaxTotal(maxTotal);
jHadoopClient.setMaxIdle(maxIdle);
jHadoopClient.setMinIdle(minIdle);
jHadoopClient.setMaxWaitMillis(maxWaitMillis);
jHadoopClient.setCoreResource(coreResource);
jHadoopClient.setHdfsResource(hdfsResource);
return jHadoopClient; }
}
4)config对象如何实现
我们这里要说明一下下面这些参数的含义:
1)setTestWhileConfig - 在空闲时检查有效性, 默认false
2)setMinEvictableIdleTimeMillis - 逐出连接的最小空闲时间
3)setTimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1
4)setNumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目
package com.xiaoju.dqa.jazz.hadoop.client;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public
class JHadoopPoolConfig extends GenericObjectPoolConfig { public JHadoopPoolConfig() {
this.setTestWhileIdle(true);
this.setMinEvictableIdleTimeMillis(
60000L); this.setTimeBetweenEvictionRunsMillis(
30000L); this.setNumTestsPerEvictionRun(
-1); }
}
3、连接池JHadoopPool
这个类继承了Pool<JHadoop>,用来初始化连接池对象。而JHadoop是Pool要管理的连接对象。
可以看到JHadoopPool在初始化的时候传入了一个JHadoopFactory的实例。这个实例将会以工厂模式来创建实际的JHadoop
JHadoopPool代码
package com.xiaoju.dqa.jazz.hadoop.client;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public
class JHadoopPool extends Pool<JHadoop> { public JHadoopPool(GenericObjectPoolConfig poolConfig, String coreResource, String hdfsResource) {
super(poolConfig, new JHadoopFactory(coreResource, hdfsResource));
}
public JHadoopPool(GenericObjectPoolConfig poolConfig) {
super(poolConfig, new JHadoopFactory());
}
}
JHadoop代码
JHadoop实现了hadoop.fs中的方法调用。
我这里只给出了几个函数的简单封装,你可以根据具体的需要进行增加。
package com.xiaoju.dqa.jazz.hadoop.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public
class JHadoop { protected final Logger logger
= LoggerFactory.getLogger(this.getClass()); private FileSystem fs;
private String coreResource;
private String hdfsResource;
public JHadoop(String coreResource, String hdfsResource) {
this.coreResource
= coreResource; this.hdfsResource
= hdfsResource; }
public String getCoreResource() {
return coreResource; }
public void setCoreResource(String coreResource) {
this.coreResource
= coreResource; }
public String getHdfsResource() {
return hdfsResource; }
public void setHdfsResource(String hdfsResource) {
this.hdfsResource
= hdfsResource; }
public void open() {
try { Configuration conf
= new Configuration(); conf.addResource(coreResource);
conf.addResource(hdfsResource);
fs
= FileSystem.get(conf); logger.info(
"[JHadoop]创建实例成功"); } catch (Exception e) {
logger.error(
"[JHadoop]创建实例失败", e); }
}
public void close() {
try {if (null != fs) { fs.close();
logger.info(
"[JHadoop]关闭实例成功"); }
} catch(Exception e) {
logger.error(
"[JHadoop]关闭实例失败", e); }
}
public boolean isConnected() throws IOException {
fs.exists(new Path(
"/forTest"));return true; }
public boolean exists(String path) throws IOException {
Path hdfsPath
= new Path(path);return fs.exists(hdfsPath); }
public FileStatus getFileStatus(String path) throws IOException {
Path hdfsPath
= new Path(path);return fs.getFileStatus(hdfsPath); }
public ContentSummary getContentSummary(String path) throws IOException {
ContentSummary contentSummary
= null; Path hdfsPath
= new Path(path);if (fs.exists(hdfsPath)) { contentSummary
= fs.getContentSummary(hdfsPath); }
return contentSummary; }
}
4、连接工厂类JHadoopFactory
JHadoopFactory这个类管理着连接对象的创建,销毁,验证等动作
package com.xiaoju.dqa.jazz.hadoop.client;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
public
class JHadoopFactory implements PooledObjectFactory<JHadoop> { private final String coreResource;
private final String hdfsResource;
public JHadoopFactory() {
this.coreResource
= "core-site.xml"; this.hdfsResource
= "hdfs-site.xml"; }
public JHadoopFactory(String coreResource, String hdfsResource) {
this.coreResource
= coreResource; this.hdfsResource
= hdfsResource; }
@Override
public PooledObject
<JHadoop> makeObject() throws Exception { JHadoop jHadoop
= new JHadoop(coreResource, hdfsResource); jHadoop.open();
return new DefaultPooledObject<JHadoop>(jHadoop); }
@Override
public void destroyObject(PooledObject
<JHadoop> pooledJHadoop) throws Exception { JHadoop jHadoop
= pooledJHadoop.getObject(); jHadoop.close();
}
@Override
public boolean validateObject(PooledObject
<JHadoop> pooledJHadoop) { JHadoop jHadoop
= pooledJHadoop.getObject();try {return jHadoop.isConnected(); } catch (Exception e) {
return false; }
}
@Override
public void activateObject(PooledObject
<JHadoop> pooledObject) throws Exception {
}
@Override
public void passivateObject(PooledObject
<JHadoop> pooledObject) throws Exception {
}
}
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com