设为首页 收藏本站
查看: 1433|回复: 0

[经验分享] 代码中实际运用memcached——java

[复制链接]

尚未签到

发表于 2015-9-1 10:54:25 | 显示全部楼层 |阅读模式
  
  以下文章取自:http://jameswxx.iteye.com/blog/1168711
  
  memcached的java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种



Html代码   DSC0000.png

  • spymemcached

  •     * http://www.couchbase.org/code/couchbase/java
  •           o An improved Java API maintained by Matt Ingenthron and others at Couchbase.
  •           o Aggressively optimised, ability to run async, supports binary protocol, support Membase and Couchbase features, etc. See site for details.

  • Java memcached client

  •     * http://www.whalin.com/memcached
  •           o A Java API is maintained by Greg Whalin from Meetup.com.

  • More Java memcached clients

  •     * http://code.google.com/p/javamemcachedclient
  •     * http://code.google.com/p/memcache-client-forjava
  •     * http://code.google.com/p/xmemcached

  • Integrations

  •     * http://code.google.com/p/simple-spring-memcached
  •     * http://code.google.com/p/memcached-session-manager
  
  
  
我看的是第二个:Java memcached client源码,代码很简洁,一共只有9个类,最主要的有以下三个
MemcachedClient.java     客户端,负责提供外出程序接口,如get/set方法等等
  SockIOPool.java          一个自平衡的连接池
NativeHandler.java       负责部分数据类型的序列化
  
  它包含以下几个部分
1:key的服务端分布
2:数据序列化和压缩
3:连接池(连接方式和池的动态自动平衡)
4:failover和failback机制
5:和memcached服务器的通讯协议
关于这几个点,我从key的set/get说起,会贯穿上面列举的4个部分。这个文章写下来,本来是作为一个笔记,思维比较跳跃,可能不是很连贯,如有疑问,欢迎站内交流。这个client的代码
  很简洁明了,我也没有加过多注释,只是理了一个脉络。
  
  
  
从客户端自带的测试代码开始



Java代码  

  • package com.meetup.memcached.test;  
  • import com.meetup.memcached.*;  
  • import org.apache.log4j.*;  

  • public class TestMemcached  {   
  •     public static void main(String[] args) {  
  •         BasicConfigurator.configure();
  •         String[] servers = { "127.0.0.1:12000"};  
  •         SockIOPool pool = SockIOPool.getInstance();
  •         pool.setServers( servers );
  •         pool.setFailover( true );//故障转移  
  •      pool.setInitConn( 10 ); //初始化连接为10  
  •         pool.setMinConn( 5 );//最小连接为5  
  •         pool.setMaxConn( 250 );//最大连接为250  
  •         pool.setMaintSleep( 30 );//平衡线程休眠时间为30ms  
  •         pool.setNagle( false );//Nagle标志为false  
  •         pool.setSocketTO( 3000 );//响应超时时间为3000ms  
  •         pool.setAliveCheck( true );//需要可用状态检查  
  •      //初始化连接池,默认名称为"default"  
  •         pool.initialize();
  •         //新建一个memcached客户端,如果没有给名字  
  •      MemcachedClient mcc = new MemcachedClient();  

  •         // turn off most memcached client logging:  
  •         com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );  

  •         for ( int i = 0; i < 10; i++ ) {  
  •             boolean success = mcc.set( "" + i, "Hello!" );  
  •             String result = (String)mcc.get( "" + i );  
  •             System.out.println( String.format( "set( %d ): %s", i, success ) );  
  •             System.out.println( String.format( "get( %d ): %s", i, result ) );  
  •         }

  •         System.out.println( "\n\t -- sleeping --\n" );  
  •         try { Thread.sleep( 10000 ); } catch ( Exception ex ) { }  

  •         for ( int i = 0; i < 10; i++ ) {  
  •             boolean success = mcc.set( "" + i, "Hello!" );  
  •             String result = (String)mcc.get( "" + i );  
  •             System.out.println( String.format( "set( %d ): %s", i, success ) );  
  •             System.out.println( String.format( "get( %d ): %s", i, result ) );  
  •         }
  •     }
  • }
  
  

以上代码大概做了这几件事情:
初始化一个连接池
新建一个memcached客户端
set一个key/value
get一个key,并且打印出value
这是我们实际应用中很常见的场景。
  
连接池的创建和初始化
        连接池SockIOPool是非常重要的部分,它的好坏直接决定了客户端的性能。SockIOPool用一个HashMap持有多个连接池对象,连接池以名称作为标识,默认为"default"。看看
  SockIOPool的getInstance方法就知道了。



Java代码  

  • public static SockIOPool getInstance() {  
  •       return getInstance("default");  
  •   }

  •   public static synchronized SockIOPool getInstance(String poolName) {  
  •       if (pools.containsKey(poolName)) return pools.get(poolName);  

  •       SockIOPool pool = new SockIOPool();  
  •       pools.put(poolName, pool);

  •       return pool;  
  •   }
  
  
  
连接池实例化完成后,还需要初始化,看看pool.initialize()做了什么:



Java代码  



  • public void initialize() {  
  •    //这里以自身作为同步锁,防止被多次初始化  
  •    synchronized (this) {  
  •    // 如果已经被初始化了则终止初始化过程  
  •    if (initialized && (buckets != null || consistentBuckets != null) && (availPool != null)&& (busyPool != null)) {  
  •       log.error("++++ trying to initialize an already initialized pool");  
  •       return;  
  •    }
  •     <span style="color: #ff0000;">  // 可用连接集合</span>  
  •       availPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);  
  •      //工作连接集合  
  •      busyPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);   
  •      // 不可用连接集合           
  •      deadPool = new IdentityHashMap<SockIO, Integer>();  
  •     hostDeadDur = new HashMap<String, Long>();  
  •    hostDead = new HashMap<String, Date>();  
  •    maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier;
  •    if (log.isDebugEnabled()) {  
  •       log.debug("++++ initializing pool with following settings:");  
  •       log.debug("++++ initial size: " + initConn);  
  •       log.debug("++++ min spare   : " + minConn);  
  •       log.debug("++++ max spare   : " + maxConn);  
  •    }
  •    if (servers == null || servers.length <= 0) {  
  •       log.error("++++ trying to initialize with no servers");  
  •       throw new IllegalStateException("++++ trying to initialize with no servers");  
  •    }
  •    // initalize our internal hashing structures  
  •    if (this.hashingAlg == CONSISTENT_HASH) populateConsistentBuckets();  
  •    else populateBuckets();  
  •    // mark pool as initialized  
  •    this.initialized = true;  
  •    // start maint thread  
  •    if (this.maintSleep > 0) this.startMaintThread();  
  •   }
  • }
  

  
  
  连接池的关闭
  很简单,只是重置清空相关参数而已



Java代码  

  • public void shutDown() {  
  •         synchronized (this) {  
  •             if (log.isDebugEnabled()) log.debug("++++ SockIOPool shutting down...");  

  •             if (maintThread != null && maintThread.isRunning()) {  
  •                 // stop the main thread  
  •                 stopMaintThread();

  •                 // wait for the thread to finish  
  •                 while (maintThread.isRunning()) {  
  •                     if (log.isDebugEnabled()) log.debug("++++ waiting for main thread to finish run +++");  
  •                     try {  
  •                         Thread.sleep(500);  
  •                     } catch (Exception ex) {  
  •                     }
  •                 }
  •             }

  •             if (log.isDebugEnabled()) log.debug("++++ closing all internal pools.");  
  •             closePool(availPool);
  •             closePool(busyPool);
  •             availPool = null;  
  •             busyPool = null;  
  •             buckets = null;  
  •             consistentBuckets = null;  
  •             hostDeadDur = null;  
  •             hostDead = null;  
  •             maintThread = null;  
  •             initialized = false;  
  •             if (log.isDebugEnabled()) log.debug("++++ SockIOPool finished shutting down.");  
  •         }
  •     }
  
  
  
  
  连接池的自动平衡
SockIOPool的initialize()方法最后有这么一行代码
  // start maint thread
if (this.maintSleep > 0) this.startMaintThread();

  这是在初始化完成后,启动线程池平衡线程
  



Java代码  

  • protected void startMaintThread() {  
  •       if (maintThread != null) {  
  •           if (maintThread.isRunning()) {  
  •               log.error("main thread already running");  
  •           } else {  
  •               maintThread.start();
  •           }
  •       } else {  
  •           maintThread = new MaintThread(this);  
  •           maintThread.setInterval(this.maintSleep);  
  •           maintThread.start();
  •       }
  •   }
  
  
  MaintThread的run方法



Java代码  

  • public void run() {  
  •    this.running = true;  
  •    while (!this.stopThread) {  
  •        try {  
  •            Thread.sleep(interval);
  •            // if pool is initialized, then  
  •            // run the maintenance method on itself  
  •            if (pool.isInitialized()) pool.selfMaint();  
  •        } catch (Exception e) {  
  •            break;  
  •        }
  •    }
  •    this.running = false;  
  
其实最终的平衡方法是SockIOPool.selfMaint()
  



Java代码  

  • protected void selfMaint() {  
  •         if (log.isDebugEnabled()) log.debug("++++ Starting self maintenance....");  

  •         // go through avail sockets and create sockets  
  •         // as needed to maintain pool settings  
  •         Map<String, Integer> needSockets = new HashMap<String, Integer>();  

  •         synchronized (this) {  
  •             // 先统计每个服务器实例的可用连接是否小于最小可用连接数  
  •         for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {  
  •                 String host = i.next();
  •                 Map<SockIO, Long> sockets = availPool.get(host);

  •                 if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "  
  •                                                     + sockets.size());

  •                 // if pool is too small (n < minSpare)  
  •                 if (sockets.size() < minConn) {  
  •                     // need to create new sockets  
  •                     int need = minConn - sockets.size();  
  •                     needSockets.put(host, need);
  •                 }
  •             }
  •         }

  •         // 如果小于最小可用连接数,则要新建增加可用连接  
  •      Map<String, Set<SockIO>> newSockets = new HashMap<String, Set<SockIO>>();  

  •         for (String host : needSockets.keySet()) {  
  •             Integer need = needSockets.get(host);

  •             if (log.isDebugEnabled()) log.debug("++++ Need to create " + need + " new sockets for pool for host: "  
  •                                                 + host);

  •             Set<SockIO> newSock = new HashSet<SockIO>(need);  
  •             for (int j = 0; j < need; j++) {  
  •                 SockIO socket = createSocket(host);
  •                 if (socket == null) break;  
  •                 newSock.add(socket);
  •             }
  •             newSockets.put(host, newSock);
  •         }

  •         // synchronize to add and remove to/from avail pool  
  •         // as well as clean up the busy pool (no point in releasing  
  •         // lock here as should be quick to pool adjust and no  
  •         // blocking ops here)  
  •         synchronized (this) {  
  •             //将新建的连接添加到可用连接集合里  
  •        for (String host : newSockets.keySet()) {  
  •                 Set<SockIO> sockets = newSockets.get(host);
  •                 for (SockIO socket : sockets)  
  •                     addSocketToPool(availPool, host, socket);
  •             }

  •             for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {  
  •                 String host = i.next();
  •                 Map<SockIO, Long> sockets = availPool.get(host);
  •                 if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "  
  •                                                     + sockets.size());

  •                 //如果可用连接超过了最大连接数,则要关闭一些  
  •           if (sockets.size() > maxConn) {  
  •                     // need to close down some sockets  
  •                     int diff = sockets.size() - maxConn;  
  •                     int needToClose = (diff <= poolMultiplier) ? diff : (diff) / poolMultiplier;  

  •                     if (log.isDebugEnabled()) log.debug("++++ need to remove " + needToClose  
  •                                                         + " spare sockets for pool for host: " + host);  

  •                     for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {  
  •                         if (needToClose <= 0) break;  

  •                         // remove stale entries  
  •                         SockIO socket = j.next();
  •                         long expire = sockets.get(socket).longValue();  

  •                         // 这里回收可用连接池的闲置连接,连接设置到可用连接池里时,expire设置为当前时间。如果 (expire + maxIdle) < System.currentTimeMillis()为true,则表  
  • 明,该连接在可用连接池呆得太久了,需要回收
  •                if ((expire + maxIdle) < System.currentTimeMillis()) {  
  •                             if (log.isDebugEnabled()) log.debug("+++ removing stale entry from pool as it is past its idle timeout and pool is over max spare");  

  •                             // remove from the availPool  
  •                             deadPool.put(socket, ZERO);
  •                             j.remove();
  •                             needToClose--;
  •                         }
  •                     }
  •                 }
  •             }

  •             //清理正在工作的连接集合  
  •         for (Iterator<String> i = busyPool.keySet().iterator(); i.hasNext();) {  
  •                 String host = i.next();
  •                 Map<SockIO, Long> sockets = busyPool.get(host);
  •                 if (log.isDebugEnabled()) log.debug("++++ Size of busy pool for host (" + host + ")  = "  
  •                                                     + sockets.size());
  •                 // loop through all connections and check to see if we have any hung connections  
  •                 for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {  
  •                     // remove stale entries  
  •                     SockIO socket = j.next();
  •                     long hungTime = sockets.get(socket).longValue();  
  •                     //如果工作时间超过maxBusyTime,则也要回收掉,超过maxBusyTime,可能是服务器响应时间过长  
  •              if ((hungTime + maxBusyTime) < System.currentTimeMillis()) {  
  •                         log.error("+++ removing potentially hung connection from busy pool ... socket in pool for "  
  •                                   + (System.currentTimeMillis() - hungTime) + "ms");  

  •                         // remove from the busy pool  
  •                         deadPool.put(socket, ZERO);
  •                         j.remove();
  •                     }
  •                 }
  •             }
  •         }

  •         // 最后清理不可用连接集合  
  •      Set<SockIO> toClose;
  •         synchronized (deadPool) {  
  •             toClose = deadPool.keySet();
  •             deadPool = new IdentityHashMap<SockIO, Integer>();  
  •         }

  •         for (SockIO socket : toClose) {  
  •             try {  
  •                 socket.trueClose(false);  
  •             } catch (Exception ex) {  
  •                 log.error("++++ failed to close SockIO obj from deadPool");  
  •                 log.error(ex.getMessage(), ex);
  •             }

  •             socket = null;  
  •         }

  •         if (log.isDebugEnabled()) log.debug("+++ ending self maintenance.");  
  •     }
  

  
  
  key的服务器端分布
  初始化方法其实就是根据每个服务器的权重,建立一个服务器地址集合,如果选择了一致性哈希,则对服务器地址进行一致性哈希分布,一致性哈希算法比较简单,如果不了解的同学,可以
  自行google一下,initialize() 方法里有这段代码:
  //一致性哈希



Java代码  

  • if (this.hashingAlg == CONSISTENT_HASH){  
  •   populateConsistentBuckets();
  • }else populateBuckets();  
  
看看populateConsistentBuckets()方法
  // 用一致性哈希算法将服务器分布在一个2的32次方的环里,服务器的分布位置<=servers.length*40*4



Java代码  

  • private void populateConsistentBuckets() {  
  •     if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");  

  •     // store buckets in tree map  
  •     this.consistentBuckets = new TreeMap<Long, String>();  
  •     MessageDigest md5 = MD5.get();
  •     if (this.totalWeight <= 0 && this.weights != null) {  
  •         for (int i = 0; i < this.weights.length; i++)  
  •             this.totalWeight += (this.weights == null) ? 1 : this.weights;  
  •     } else if (this.weights == null) {  
  •         this.totalWeight = this.servers.length;  
  •     }

  •     for (int i = 0; i < servers.length; i++) {  
  •        int thisWeight = 1;  
  •        if (this.weights != null && this.weights != null) thisWeight = this.weights;  

  •       //这个值永远小于40 * this.servers.length,因为thisWeight/totalWeight永远小于1  



Java代码  

  • double factor = Math.floor(((double) (40 * this.servers.length * thisWeight)) / (double) this.totalWeight);  

  •   //服务器的分布位置为factor*4,factor<=40*this.servers.length,所以服务器的分布位置& lt;=40*this.servers.length*4。  
  • for (long j = 0; j < factor; j++) {  
  •       //md5值的二进制数组为16位  
  •    byte[] d = md5.digest((servers + "-" + j).getBytes());  
  •        //16位二进制数组每4位为一组,每组第4个值左移24位,第三个值左移16位,第二个值左移8位,第一个值不移位。进行或运算,得到一个小于2的32 次方的long值。  
  •     for (int h = 0; h < 4; h++) {  
  •             Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) | ((long) (d[2 + h * 4] & 0xFF) << 16)  
  •                          | ((long) (d[1 + h * 4] & 0xFF) << 8) | ((long) (d[0 + h * 4] & 0xFF));  
  •                 consistentBuckets.put(k, servers);
  •                 if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");  
  •          }
  •    }

  •    // create initial connections  
  •    if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "  
  •                                             + servers);

  •    //创建连接  
  • for (int j = 0; j < initConn; j++) {  
  •        SockIO socket = createSocket(servers);
  •        if (socket == null) {  
  •             log.error("++++ failed to create connection to: " + servers + " -- only " + j + " created.");  
  •              break;  
  •        }

  •        //添加到可用连接池  
  •    addSocketToPool(availPool, servers, socket);
  •        if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()  
  •                                                 + " for host " + servers);  
  •    }
  • }
  
  
  如果不是一致性哈希,则只是普通分布,很简单,只是根据权重将服务器地址放入buckets这个List里



Java代码  

  • private void populateBuckets() {  
  •         if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");  

  •         // store buckets in tree map  
  •         this.buckets = new ArrayList<String>();  

  •         for (int i = 0; i < servers.length; i++) {  
  •             if (this.weights != null && this.weights.length > i) {  
  •                 for (int k = 0; k < this.weights.intValue(); k++) {  
  •                     this.buckets.add(servers);  
  •                     if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");  
  •                 }
  •             } else {  
  •                 this.buckets.add(servers);  
  •                 if (log.isDebugEnabled()) log.debug("++++ added " + servers + " to server bucket");  
  •             }

  •             // create initial connections  
  •             if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "  
  •                                                 + servers);

  •             for (int j = 0; j < initConn; j++) {  
  •                 SockIO socket = createSocket(servers);
  •                 if (socket == null) {  
  •                     log.error("++++ failed to create connection to: " + servers + " -- only " + j + " created.");  
  •                     break;  
  •                 }

  •                 //新建连接后,加入到可用连接集合里  
  •            addSocketToPool(availPool, servers, socket);
  •                 if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()  
  •                                                     + " for host " + servers);  
  •             }
  •         }
  •     }
  

  
  
  如何创建socket连接
  在上面的private void populateBuckets()方法里,createSocket(servers)是创建到服务器的连接,看看这个方法



Java代码  

  • protected SockIO createSocket(String host) {  
  • SockIO socket = null;  
  • //hostDeadLock是一个可重入锁,它的变量声明为  


  • private final ReentrantLock             hostDeadLock    = new ReentrantLock();  
  • hostDeadLock.lock();
  • try {  
  •   //hostDead.containsKey(host)为true表示曾经连接过该服务器,但没有成功。  
  •   //hostDead是一个HashMap,key为服务器地址,value为当时连接不成功的时间  
  •   //hostDeadDur是一个HashMap,key为服务器地址,value为设置的重试间隔时间  

  •    if (failover && failback && hostDead.containsKey(host) && hostDeadDur.containsKey(host)) {  
  •        Date store = hostDead.get(host);
  •        long expire = hostDeadDur.get(host).longValue();  

  •       if ((store.getTime() + expire) > System.currentTimeMillis()) return null;  
  •    }
  • } finally {  
  •     hostDeadLock.unlock();
  •   }


  • try {  
  •      socket = new SockIO(this, host, this.socketTO, this.socketConnectTO, this.nagle);  
  •      if (!socket.isConnected()) {  
  •                log.error("++++ failed to get SockIO obj for: " + host + " -- new socket is not connected");  
  •                deadPool.put(socket, ZERO);
  •                socket = null;  
  •      }
  • } catch (Exception ex) {  
  •            log.error("++++ failed to get SockIO obj for: " + host);  
  •            log.error(ex.getMessage(), ex);
  •            socket = null;  
  • }

  •   // if we failed to get socket, then mark  
  •   // host dead for a duration which falls off  
  •   hostDeadLock.lock();
  •   try {  
  •          //到了这里,socket仍然为null,说明这个server悲剧了,无法和它创建连接,则要把该server丢到不可用的主机集合里  
  •       if (socket == null) {  
  •                Date now = new Date();  
  •                hostDead.put(host, now);

  •                //如果上次就不可用了,到期了仍然不可用,就要这次的不可用时间设为上次的2倍,否则初始时长为1000ms  
  •                long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur.get(host)).longValue() * 2) : 1000;  

  •                if (expire > MAX_RETRY_DELAY) expire = MAX_RETRY_DELAY;  

  •                hostDeadDur.put(host, new Long(expire));  
  •                if (log.isDebugEnabled()) log.debug("++++ ignoring dead host: " + host + " for " + expire + " ms");  

  •                // 既然这个host都不可用了,那与它的所有连接当然要从可用连接集合"availPool"里删除掉  
  •          clearHostFromPool(availPool, host);
  •            } else {  
  •                if (log.isDebugEnabled()) log.debug("++++ created socket (" + socket.toString() + ") for host: " + host);  
  •                //连接创建成功,如果上次不成功,那么这次要把该host从不可用主机集合里删除掉  
  •           if (hostDead.containsKey(host) || hostDeadDur.containsKey(host)) {  
  •                    hostDead.remove(host);
  •                    hostDeadDur.remove(host);
  •                }
  •            }
  •        } finally {  
  •            hostDeadLock.unlock();
  •        }

  •        return socket;  
  •    }
  
  
    SockIO构造函数



Java代码  

  • public SockIO(SockIOPool pool, String host, int timeout, int connectTimeout, boolean noDelay)  
  •                                                                                                throws IOException,  
  •                                                                                                UnknownHostException {
  •       this.pool = pool;  
  •       String[] ip = host.split(":");  
  •       // get socket: default is to use non-blocking connect  
  •       sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);  
  •       if (timeout >= 0) this.sock.setSoTimeout(timeout);  
  •       // testing only  
  •       sock.setTcpNoDelay(noDelay);
  •       // wrap streams  
  •       in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));  
  •       out = new BufferedOutputStream(sock.getOutputStream());  
  •       this.host = host;  
  •   }
  

  getSocket方法



Java代码  

  • protected static Socket getSocket(String host, int port, int timeout) throws IOException {  
  •          SocketChannel sock = SocketChannel.open();
  •          sock.socket().connect(new InetSocketAddress(host, port), timeout);  
  •          return sock.socket();  
  • }
  可以看到,socket连接是用nio方式创建的。
  
  
  
  新建MemcachedClient
MemcachedClient mcc = new MemcachedClient();新建了一个memcached客户端,看看构造函数,没作什么,只是设置参数而已。



Java代码  

  • /**
  •   * Creates a new instance of MemCachedClient.
  •   */  
  • public MemcachedClient() {  
  •      init();
  • }


  • private void init() {  
  •      this.sanitizeKeys       = true;  
  •      this.primitiveAsString  = false;  
  •      this.compressEnable     = true;  
  •      this.compressThreshold  = COMPRESS_THRESH;  
  •      this.defaultEncoding    = "UTF-8";  
  •      this.poolName           = ( this.poolName == null ) ? "default" : this.poolName;  

  •      // get a pool instance to work with for the life of this instance  
  •      this.pool               = SockIOPool.getInstance( poolName );  
  • }
  
  
  
  
  
  set方法如何工作
  到此memcached客户端初始化工作完成。再回到测试类TestMemcached,看看for循环里的
  boolean success = mcc.set( ""  + i, "Hello!" );
String result = (String)mcc.get( "" + i );
初始化后,就可以set,get了。看看set是怎么工作的。
  



Java代码  

  • /**
  •      * Stores data on the server; only the key and the value are specified.
  •      *
  •      * @param key key to store data under
  •      * @param value value to store
  •      * @return true, if the data was successfully stored
  •      */  
  •     public boolean set( String key, Object value ) {  
  •         return set( "set", key, value, null, null, primitiveAsString );  
  •     }


  •     //这个set方法比较长   
  •    private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {  
  •         if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) {  
  •             log.error( "key is null or cmd is null/empty for set()" );  
  •             return false;  
  •         }

  •         try {  
  •             key = sanitizeKey( key );
  •         }
  •         catch ( UnsupportedEncodingException e ) {  
  •             // if we have an errorHandler, use its hook  
  •             if ( errorHandler != null )  
  •                 errorHandler.handleErrorOnSet( this, e, key );  
  •             log.error( "failed to sanitize your key!", e );  
  •             return false;  
  •         }

  •         if ( value == null ) {  
  •             log.error( "trying to store a null value to cache" );  
  •             return false;  
  •         }

  •         // get SockIO obj  
  •         SockIOPool.SockIO sock = pool.getSock( key, hashCode );

  •         if ( sock == null ) {  
  •             if ( errorHandler != null )  
  •                 errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );  
  •             return false;  
  •         }

  •         if ( expiry == null )  
  •             expiry = new Date(0);  

  •         // store flags  
  •         int flags = 0;  

  •         // byte array to hold data  
  •         byte[] val;  

  •     //这些类型自己序列化,否则由java序列化处理  
  •    if ( NativeHandler.isHandled( value ) ) {           
  •             if ( asString ) {  
  •                 //如果是字符串,则直接getBytes   
  •                 try {  
  •                     if ( log.isInfoEnabled() )  
  •                         log.info( "++++ storing data as a string for key: " + key + " for class: " + value.getClass().getName() );  
  •                     val = value.toString().getBytes( defaultEncoding );
  •                 }
  •                 catch ( UnsupportedEncodingException ue ) {  
  •                     // if we have an errorHandler, use its hook  
  •                     if ( errorHandler != null )  
  •                         errorHandler.handleErrorOnSet( this, ue, key );  
  •                     log.error( "invalid encoding type used: " + defaultEncoding, ue );  
  •                     sock.close();
  •                     sock = null;  
  •                     return false;  
  •                 }
  •             }
  •             else {  
  •                 try {  
  •                     if ( log.isInfoEnabled() )  
  •                         log.info( "Storing with native handler..." );  
  •                     flags |= NativeHandler.getMarkerFlag( value );
  •                     val    = NativeHandler.encode( value );
  •                 }
  •                 catch ( Exception e ) {  
  •                     // if we have an errorHandler, use its hook  
  •                     if ( errorHandler != null )  
  •                         errorHandler.handleErrorOnSet( this, e, key );  
  •                     log.error( "Failed to native handle obj", e );  

  •                     sock.close();
  •                     sock = null;  
  •                     return false;  
  •                 }
  •             }
  •         }
  •         else {  
  •             // 否则用java的序列化  
  •         try {  
  •                 if ( log.isInfoEnabled() )  
  •                     log.info( "++++ serializing for key: " + key + " for class: " + value.getClass().getName() );  
  •                 ByteArrayOutputStream bos = new ByteArrayOutputStream();  
  •                 (new ObjectOutputStream( bos )).writeObject( value );  
  •                 val = bos.toByteArray();
  •                 flags |= F_SERIALIZED;
  •             }
  •             catch ( IOException e ) {  
  •                 // if we have an errorHandler, use its hook  
  •                 if ( errorHandler != null )  
  •                     errorHandler.handleErrorOnSet( this, e, key );  

  •                 // if we fail to serialize, then  
  •                 // we bail  
  •                 log.error( "failed to serialize obj", e );  
  •                 log.error( value.toString() );

  •                 // return socket to pool and bail  
  •                 sock.close();
  •                 sock = null;  
  •                 return false;  
  •             }
  •         }

  •         //压缩内容  
  •      if ( compressEnable && val.length > compressThreshold ) {  
  •             try {  
  •                 if ( log.isInfoEnabled() ) {  
  •                     log.info( "++++ trying to compress data" );  
  •                     log.info( "++++ size prior to compression: " + val.length );  
  •                 }
  •                 ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length );  
  •                 GZIPOutputStream gos = new GZIPOutputStream( bos );  
  •                 gos.write( val, 0, val.length );  
  •                 gos.finish();
  •                 gos.close();

  •                 // store it and set compression flag  
  •                 val = bos.toByteArray();
  •                 flags |= F_COMPRESSED;

  •                 if ( log.isInfoEnabled() )  
  •                     log.info( "++++ compression succeeded, size after: " + val.length );  
  •             }
  •             catch ( IOException e ) {  
  •                 // if we have an errorHandler, use its hook  
  •                 if ( errorHandler != null )  
  •                     errorHandler.handleErrorOnSet( this, e, key );  
  •                 log.error( "IOException while compressing stream: " + e.getMessage() );  
  •                 log.error( "storing data uncompressed" );  
  •             }
  •         }

  •         // now write the data to the cache server  
  •         try {  
  •              //按照memcached协议组装命令  
  •         String cmd = String.format( "%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length );  
  •             sock.write( cmd.getBytes() );
  •             sock.write( val );
  •             sock.write( "\r\n".getBytes() );  
  •             sock.flush();

  •             // get result code  
  •             String line = sock.readLine();
  •             if ( log.isInfoEnabled() )  
  •                 log.info( "++++ memcache cmd (result code): " + cmd + " (" + line + ")" );  

  •             if ( STORED.equals( line ) ) {  
  •                 if ( log.isInfoEnabled() )  
  •                     log.info("++++ data successfully stored for key: " + key );  
  •                 sock.close();
  •                 sock = null;  
  •                 return true;  
  •             }
  •             else if ( NOTSTORED.equals( line ) ) {  
  •                 if ( log.isInfoEnabled() )  
  •                     log.info( "++++ data not stored in cache for key: " + key );  
  •             }
  •             else {  
  •                 log.error( "++++ error storing data in cache for key: " + key + " -- length: " + val.length );  
  •                 log.error( "++++ server response: " + line );  
  •             }
  •         }
  •         catch ( IOException e ) {  

  •             // if we have an errorHandler, use its hook  
  •             if ( errorHandler != null )  
  •                 errorHandler.handleErrorOnSet( this, e, key );  

  •             // exception thrown  
  •             log.error( "++++ exception thrown while writing bytes to server on set" );  
  •             log.error( e.getMessage(), e );

  •             try {  
  •                 sock.trueClose();
  •             }
  •             catch ( IOException ioe ) {  
  •                 log.error( "++++ failed to close socket : " + sock.toString() );  
  •             }

  •             sock = null;  
  •         }

  •         //用完了,就要回收哦,sock.close()不是真正的关闭,只是放入到可用连接集合里。   
  •        if ( sock != null ) {  
  •             sock.close();
  •             sock = null;  
  •         }
  •         return false;  
  •     }
  通过set方法向服务器设置key和value,涉及到以下几个点
数据的压缩和序列化 (如果是get方法,则和set方法基本是相反的)
为key分配服务器 对于一些常用类型,采用自定义的序列化,具体要看NativeHander.java,这个类比较简单,有兴趣可以自己看看
  



Java代码  

  • public static boolean isHandled( Object value ) {  
  •        return (  
  •            value instanceof Byte            ||  
  •            value instanceof Boolean         ||  
  •            value instanceof Integer         ||  
  •            value instanceof Long            ||  
  •            value instanceof Character       ||  
  •            value instanceof String          ||  
  •            value instanceof StringBuffer    ||  
  •            value instanceof Float           ||  
  •            value instanceof Short           ||  
  •            value instanceof Double          ||  
  •            value instanceof Date            ||  
  •            value instanceof StringBuilder   ||  
  •            value instanceof byte[]  
  •            )
  •        ? true  
  •        : false;  
  •    }
  其他类型则用java的默认序列化
  
  
  
  为key选择服务器
SockIOPool.SockIO sock = pool.getSock( key, hashCode );就是为key选择服务器



Java代码  

  • public SockIO getSock(String key, Integer hashCode) {  
  •         if (log.isDebugEnabled()) log.debug("cache socket pick " + key + " " + hashCode);  
  •         if (!this.initialized) {  
  •             log.error("attempting to get SockIO from uninitialized pool!");  
  •             return null;  
  •         }

  •         // if no servers return null  
  •         if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)  
  •             || (buckets != null && buckets.size() == 0)) return null;  

  •         // if only one server, return it  
  •         if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 1)  
  •             || (buckets != null && buckets.size() == 1)) {  
  •             SockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets.get(consistentBuckets.firstKey())) : getConnection(buckets.get(0));  
  •             if (sock != null && sock.isConnected()) {  
  •                 if (aliveCheck) {//健康状态检查  

  •                     if (!sock.isAlive()) {  
  •                         sock.close();
  •                         try {  
  •                             sock.trueClose();//有问题,真的关闭socket  

  •                         } catch (IOException ioe) {  
  •                             log.error("failed to close dead socket");  
  •                         }
  •                         sock = null;  
  •                     }
  •                 }
  •             } else {//连接不正常,放入不可用连接集合里  
  •           if (sock != null) {  
  •                     deadPool.put(sock, ZERO);
  •                     sock = null;  
  •                 }
  •             }

  •             return sock;  
  •         }

  •         Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));  
  •         // get initial bucket  
  •         long bucket = getBucket(key, hashCode);  
  •         String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);  

  •         while (!tryServers.isEmpty()) {  
  •             // try to get socket from bucket  
  •             SockIO sock = getConnection(server);
  •             if (log.isDebugEnabled()) log.debug("cache choose " + server + " for " + key);  
  •             if (sock != null && sock.isConnected()) {  
  •                 if (aliveCheck) {  
  •                     if (sock.isAlive()) {  
  •                         return sock;  
  •                     } else {  
  •                         sock.close();
  •                         try {  
  •                             sock.trueClose();
  •                         } catch (IOException ioe) {  
  •                             log.error("failed to close dead socket");  
  •                         }
  •                         sock = null;  
  •                     }
  •                 } else {  
  •                     return sock;  
  •                 }
  •             } else {  
  •                 if (sock != null) {  
  •                     deadPool.put(sock, ZERO);
  •                     sock = null;  
  •                 }
  •             }

  •             // if we do not want to failover, then bail here  
  •             if (!failover) return null;  

  •             // log that we tried  
  •             tryServers.remove(server);

  •             if (tryServers.isEmpty()) break;   
  •            //注意哦,下面是failover机制  
  •         int rehashTries = 0;  
  •             while (!tryServers.contains(server)) {  
  •                 String newKey = String.format("%s%s", rehashTries, key);  
  •                 if (log.isDebugEnabled()) log.debug("rehashing with: " + newKey);  

  •                 bucket = getBucket(newKey, null);  
  •                 server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);  
  •                 rehashTries++;
  •             }
  •         }
  •         return null;  
  •     }
  

  
  下面这个方法是真正的从服务器获取连接
  
  
   



Java代码  

  • public SockIO getConnection(String host) {  
  •         if (!this.initialized) {  
  •             log.error("attempting to get SockIO from uninitialized pool!");  
  •             return null;  
  •         }

  •         if (host == null) return null;  

  •         synchronized (this) {  
  •             // if we have items in the pool  
  •             // then we can return it  
  •             if (availPool != null && !availPool.isEmpty()) {  
  •                 // take first connected socket  
  •                 Map<SockIO, Long> aSockets = availPool.get(host);
  •                 if (aSockets != null && !aSockets.isEmpty()) {  
  •                     for (Iterator<SockIO> i = aSockets.keySet().iterator(); i.hasNext();) {  
  •                         SockIO socket = i.next();
  •                         if (socket.isConnected()) {  
  •                             if (log.isDebugEnabled()) log.debug("++++ moving socket for host (" + host  
  •                                                                 + ") to busy pool ... socket: " + socket);  
  •                             // remove from avail pool  
  •                             i.remove();
  •                             // add to busy pool  
  •                             addSocketToPool(busyPool, host, socket);
  •                             // return socket  
  •                             return socket;  
  •                         } else {  
  •                             // add to deadpool for later reaping  
  •                             deadPool.put(socket, ZERO);
  •                             // remove from avail pool  
  •                             i.remove();
  •                         }
  •                     }
  •                 }
  •             }
  •         }

  •         // create one socket -- let the maint thread take care of creating more  
  •         SockIO socket = createSocket(host);
  •         if (socket != null) {  
  •             synchronized (this) {  
  •                 addSocketToPool(busyPool, host, socket);
  •             }
  •         }
  •         return socket;  
  •     }


  
  
  
  failover和failback
  这两者都是发生在获取可用连接这个环节。
  failover,如果为key选择的服务器不可用,则对key重新哈希选择下一个服务器,详见getSock方法的末尾。
  failback,用一个hashmap存储连接失败的服务器和对应的失效持续时间,每次获取连接时,都探测是否到了重试时间。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-108293-1-1.html 上篇帖子: memcached安装及测试 下篇帖子: mac + apache2 +memcached +yii
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表