设为首页 收藏本站
查看: 869|回复: 0

[经验分享] MemCache客户端实现

[复制链接]

尚未签到

发表于 2017-4-15 13:47:55 | 显示全部楼层 |阅读模式
  不记得什么时候从网上下的,看着还不错,分享一下...
  1~Cache接口:

package com.yx.cache;
/**
* @功能: 缓存接口 ,提供对缓存的增删改查 四类操作
* @作者: smile
* @时间: 2013-5-2
*/
public interface Cache<T> {
/**
* 获取缓存中的数据
*
* @param key
* @return
*/
T get(String key);
/**
* 把数据放入缓存 如果存在与key对应的值,则返回失败
*
* @param key
* @param value
* @return
*/
boolean add(String key, T value);
/**
* 把数据放入缓存 如果存在与key对应的值,则覆盖原有的值
*
* @param key
* @param value
* @return
*/
boolean set(String key, T value);
/**
* 缓存更新 如果不存在与key对应的缓存值,则不更新
*
* @param key
* @param value
* @return
*/
boolean update(String key, T value);
/**
* 删除缓存
*
* @param key
* @return
*/
boolean delete(String key);
}
  2~Cache工厂,创建不同缓存策略的Cache对象:

package com.yx.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.yx.cache.util.ConfigUtil;
/**
* @功能: 缓存工厂
* @作者: smile
* @时间: 2013-5-2
*/
public class CacheFactory {
//MemCache客户端对象
private static MemCachedClient memCachedClient = null;
@SuppressWarnings("rawtypes")
//每个类对应的Cache对象
private static final Map<String, Cache> map = new ConcurrentHashMap<String, Cache>();
static {
// 获取服务器ip地址
String serverStr = ConfigUtil.getConfigValue("servers", "");
List<String> servers = new ArrayList<String>();
for (String s : serverStr.split(",")) {
s = s.trim();
if (!"".equals(s)) {
servers.add(s);
}
}
if (servers.size() < 1) {
throw new RuntimeException("cache 初始化失败!");
}
SockIOPool pool = SockIOPool.getInstance();
pool.setServers(servers.toArray(new String[] {}));
pool.setFailover(Boolean.valueOf(ConfigUtil.getConfigValue("failover", "true")));
pool.setInitConn(Integer.valueOf(ConfigUtil.getConfigValue("initConn", "100")));
pool.setMinConn(Integer.valueOf(ConfigUtil.getConfigValue("minConn", "25")));
pool.setMaxConn(Integer.valueOf(ConfigUtil.getConfigValue("maxConn", "250")));
pool.setMaintSleep(Integer.valueOf(ConfigUtil.getConfigValue("maintSleep", "30")));
pool.setNagle(Boolean.valueOf(ConfigUtil.getConfigValue("nagle", "false")));// 关闭nagle算法
pool.setSocketTO(Integer.valueOf(ConfigUtil.getConfigValue("socketTO", "3000")));
pool.setAliveCheck(Boolean.valueOf(ConfigUtil.getConfigValue("aliveCheck", "true")));
pool.setHashingAlg(Integer.valueOf(ConfigUtil.getConfigValue("hashingAlg", "0")));
pool.setSocketConnectTO(Integer.valueOf(ConfigUtil.getConfigValue("socketConnectTO", "3000")));
//服务器权重
String wStr = ConfigUtil.getConfigValue("weights", "");
List<Integer> weights = new ArrayList<Integer>();
for (String s : wStr.split(",")) {
s = s.trim();
if (!"".equals(s)) {
weights.add(Integer.valueOf(s));
}
}
if (weights.size() == servers.size()) {
pool.setWeights(weights.toArray(new Integer[] {}));
}
pool.initialize();
memCachedClient = new MemCachedClient();
}
/**
* @功能:集中式缓存
*/
public static <T> Cache<T> getCommonCache(Class<T> t) {
Cache<T> cache = map.get(t.getName());
if (cache == null) {
cache = createCommonCache(t);
}
return cache;
}
/**
* @功能:分布式缓存
*/
public static <T> Cache<T> getClusterCache(Class<T> t) {
//key值是以 i- 开头的
Cache<T> cache = map.get("i-" + t.getName());
if (cache == null) {
cache = createClusterCache(t);
}
return cache;
}
/**
* @功能:集中式缓存  保证并发条件下线程安全
*/
private static synchronized <T> Cache<T> createCommonCache(Class<T> t) {
//key值直接使用类名
Cache<T> cache = map.get(t.getName());
if (cache == null) {
cache = new CommonCache<T>(t, memCachedClient);
map.put(t.getName(), cache);
}
return cache;
}
/**
* @功能:分布式缓存  保证并发条件下线程安全
*/
private static synchronized <T> Cache<T> createClusterCache(Class<T> t) {
Cache<T> cache = map.get(t.getName());
if (cache == null) {
cache = new ClusterCache<T>(t, memCachedClient);
map.put(t.getName(), cache);
}
return cache;
}
}
  3~集中式策略缓存:

package com.yx.cache;
import com.danga.MemCached.MemCachedClient;
/**
* @功能: 集中式缓存
* @作者: smile
* @时间: 2013-5-2
*/
public class CommonCache<T> implements Cache<T> {
private static MemCachedClient memCachedClient = null;
// key值拼接头字符
private String base = null;
CommonCache(Class<T> t, MemCachedClient client) {
memCachedClient = client;
base = t.getSimpleName() + "-";
}
public T get(String key) {
return (T) memCachedClient.get(base + key);
}
public boolean set(String key, T value) {
return memCachedClient.set(base + key, value);
}
@Override
public boolean update(String key, T value) {
return memCachedClient.replace(base + key, value);
}
@Override
public boolean delete(String key) {
return memCachedClient.delete(base + key);
}
@Override
public boolean add(String key, T value) {
return memCachedClient.add(base + key, value);
}
}
  4~分布式策略缓存:

package com.yx.cache;
import com.danga.MemCached.MemCachedClient;
import com.schooner.MemCached.SchoonerSockIOPool;
import com.yx.cache.util.HashCodeUtil;
import com.yx.task.ThreadPoolManager;
/**
* @功能: 分布式缓存
* @作者: smile
* @时间: 2013-5-2
*/
public class ClusterCache<T> implements Cache<T> {
private static MemCachedClient memCachedClient = null;
// key值拼接头字符
private String base = null;
// 管理缓存的线程池
private static ThreadPoolManager taskManager = ThreadPoolManager.getInstance("cache");
private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance();
ClusterCache(Class<T> t, MemCachedClient client) {
memCachedClient = client;
base = "i-" + t.getSimpleName() + "-";
}
@Override
public T get(String key) {
T value = null;
if (key == null) {
return null;
}
key = base + key;
if (pool.getServers().length < 2) {
value = (T) memCachedClient.get(key);
} else {
int hashCode = HashCodeUtil.getHash(key);
value = (T) memCachedClient.get(key, hashCode);
if (value == null) {//第一台不存在 则继续去另一台上找
hashCode = this.getRehashCode(key, hashCode);
value = (T) memCachedClient.get(key, hashCode);
if (value != null) {// 如果在另外一台服务器上取到了缓存,则把缓存复制到第一台上去
UpdateTask task = new UpdateTask(key, value);
taskManager.submit(task);
}
}
}
return value;
}
@Override
public boolean set(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length < 2) {
result = memCachedClient.set(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);
result = memCachedClient.set(key, value, hashCode);
// if (result) {
hashCode = getRehashCode(key, hashCode);
memCachedClient.set(key, value, hashCode);
// }
}
return result;
}
private int getRehashCode(String key, int oldHashcode) {
String host = pool.getHost(key, oldHashcode);
int rehashTries = 0;
// if (result) {
int hashCode = HashCodeUtil.getHash(rehashTries + key);
while (host.equals(pool.getHost(key, hashCode))) {
rehashTries++;
hashCode = HashCodeUtil.getHash(rehashTries + key);
}
return hashCode;
}
@Override
public boolean update(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length < 2) {
result = memCachedClient.replace(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);
result = memCachedClient.replace(key, value, hashCode);//更新缓存
// if (result) {
hashCode = getRehashCode(key, hashCode);
memCachedClient.replace(key, value, hashCode);//更新备用机上缓存
// }
}
return result;
}
@Override
public boolean delete(String key) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length < 2) {
result = memCachedClient.delete(key);
} else {
int hashCode = HashCodeUtil.getHash(key);
result = memCachedClient.delete(key, hashCode, null);//删除对应机子上缓存
// if (result) {
hashCode = this.getRehashCode(key, hashCode);
memCachedClient.delete(key, hashCode, null);//删除备用机子上缓存
// }
}
return result;
}
@Override
public boolean add(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length < 2) {
result = memCachedClient.add(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);
result = memCachedClient.add(key, value, hashCode);
// if (result) {
hashCode = getRehashCode(key, hashCode);//根据原始哈希值再进行哈希运算
memCachedClient.add(key, value, hashCode);//再次保存  备份之用
// }
}
return result;
}

/**
* @功能: 缓存丢失的情况下 恢复缓存
* @作者: smile
* @时间: 2013-5-2
*/
static class UpdateTask implements Runnable {
private String key;
private Object value;
UpdateTask(String key, Object value) {
this.key = key;
this.value = value;
}
@Override
public void run() {
memCachedClient.set(key, value, HashCodeUtil.getHash(key));
}
}
}
  5~线程池工具类:

package com.yx.task;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @功能: 线程池管理  不同key值对应不同线程池
* @作者: smile
* @时间: 2013-5-2
*/
public class ThreadPoolManager {
private static final Map<String, ThreadPoolManager> map = new HashMap<String, ThreadPoolManager>();
final int CORE_SIZE = 5;
//线程池 初始线程为5
private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(CORE_SIZE);
//提交任务执行
public void submit(Runnable task) {
executor.submit(task);
}
public boolean finished() {
return executor.getCompletedTaskCount() == executor.getTaskCount();
}
private ThreadPoolManager() {
}
//每个key值返回一个单例对象
public static synchronized ThreadPoolManager getInstance(String key) {
ThreadPoolManager t = map.get(key);
if (t == null) {
t = new ThreadPoolManager();
map.put(key, t);
}
return t;
}
}
  6~配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<config>
<!-- server列表,(eg:192.168.1.111:18123,192.168.1.111:18124) -->
<servers>192.168.1.119:18123,192.168.1.119:18124</servers>
<!-- 权重,权重数量和servers数量相同,总和等于10.(eg:5,5)如果为空则代表平均分配 -->
<weights></weights>
<failover>true</failover><!-- 故障转移 -->
<initConn>25</initConn><!-- 初始化连接数 -->
<minConn>10</minConn><!-- 最小连接数 -->
<maxConn>50</maxConn><!-- 最大连接数 -->
<maintSleep>3000</maintSleep><!-- 守护线程的启动时间,如果要关闭守护线程设置未0 -->
<nagle>false</nagle><!-- 是否使用nagle算法 -->
<socketTO>3000</socketTO><!-- socket读超时时间 -->
<aliveCheck>true</aliveCheck><!-- 对server的可用性进行检测 -->
<hashingAlg>0</hashingAlg><!-- hash算法,0代表使用jdk的hash算法,如果要支持不同jdk请设置为 2(自带的hash算法),如果有多台cache服务器,而且考虑到动态添加cache服务器时,请设置 3 -->
<socketConnectTO>100</socketConnectTO><!-- socket链接超时时间 -->
</config>
  7~读取配置文件工具类:

package com.yx.cache.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
/**
* @功能: 读取配置参数
* @作者: smile
* @时间: 2013-5-2
*/
public class ConfigUtil {
private static final String CONFILE = "cacheConfig.xml";
private static final Map<String, String> map = new HashMap<String, String>();
// 类加载的时候初始化以下静态代码块
// 主要功能就是读取配置文件参数 存放在一个Map中
static {
SAXReader saxReader = new SAXReader();
InputStream ins = ConfigUtil.class.getClassLoader().getResourceAsStream(CONFILE);
try {
if (ins != null) {
Document doc = saxReader.read(ins);
Element root = doc.getRootElement();
Iterator<Element> iter = root.elementIterator();
while (iter.hasNext()) {
Element e = iter.next();
map.put(e.getName(), e.getTextTrim());
}
}
} catch (DocumentException e) {
e.printStackTrace();
throw new RuntimeException("找不到配置文件:" + CONFILE);
} finally {
try {
if (ins != null) {
ins.close();
} else {
throw new RuntimeException("找不到配置文件:" + CONFILE);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ins = null;
}
}
}
public static String getConfigValue(String key, String defaultValue) {
String tmp = map.get(key);
return isEmpty(tmp) ? defaultValue : tmp;
}

private static boolean isEmpty(String str) {
if (str == null || "".equals(str)) {
return true;
}
return false;
}
public static void main(String[] args) {
System.out.println(map);
}
}
  8~哈希工具类:

package com.yx.cache.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;
import com.schooner.MemCached.SchoonerSockIOPool;
public class HashCodeUtil {
public static final int NATIVE_HASH = 0; // native String.hashCode();
public static final int OLD_COMPAT_HASH = 1; // original compatibility
public static final int NEW_COMPAT_HASH = 2; // new CRC32 based
public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops
private static int hashingAlg = SchoonerSockIOPool.getInstance().getHashingAlg();
/**
* Returns a bucket to check for a given key.
*
* @param key
*            String key cache is stored under
* @return int bucket
*/
public static final int getHash(String key) {
switch (hashingAlg) {
case NATIVE_HASH:
return key.hashCode();
case OLD_COMPAT_HASH:
return origCompatHashingAlg(key);
case NEW_COMPAT_HASH:
return newCompatHashingAlg(key);
case CONSISTENT_HASH:
return md5HashingAlg(key);
default:
// use the native hash as a default
hashingAlg = NATIVE_HASH;
return key.hashCode();
}
}
private static int origCompatHashingAlg(String key) {
int hash = 0;
char[] cArr = key.toCharArray();
for (int i = 0; i < cArr.length; ++i) {
hash = (hash * 33) + cArr;
}
return hash;
}
private static int newCompatHashingAlg(String key) {
CRC32 checksum = new CRC32();
checksum.update(key.getBytes());
int crc = (int) checksum.getValue();
return (crc >> 16) & 0x7fff;
}
private static int md5HashingAlg(String key) {
MessageDigest md5 = MD5.get();
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
int res = ((bKey[3] & 0xFF) << 24) | ((bKey[2] & 0xFF) << 16) | ((bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
return res;
}
private static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() {
@Override
protected final MessageDigest initialValue() {
try {
return MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(" no md5 algorythm found");
}
}
};
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-365116-1-1.html 上篇帖子: memcache分布式实现、memcache分布式的数据同步、memcache保存session数据的实现 下篇帖子: memcache java 的简单操作(一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表