zi663227 发表于 2017-4-15 11:34:13

memcache数据缓存系统

memcache数据缓存系统

一 目的:

在服务器集群环境下,设置一个共享数据的另一机制。
减少对数据库的直接操作,减轻数据库负担。
提高系统的响应能力。
二 系统整体结构:

   


三 系统设计说明

1.Memcache服务器集群:
   Memcache服务器是逻辑上的划分,在分析业务数据的基础上,将数据安排存放在不同的Memcache服务器上,有必要的话可以将不同硬件服务器上的多个Memcache服务器再做成一个数据互相备份的组,避免数据的单点丢失的风险。
Memcache服务器安装在不同的硬件服务器上,比如可以在一台硬件服务器上安装3个Memcache服务器,在另外一台硬件服务器上安装4个Memcache服务器。

2.缓存数据的配置信息:
   在数据库中建一张表来说明Memcache服务器集群中缓存数据的存放逻辑,提供给memCache管理程序使用,以便实现从缓存到数据库,数据库到缓存双向的数据存取。
表结构设计如下:
cache_data_propertits_1(读数据库写缓存)
列列名类型说明
mc_serverMC服务器CHAR(10)决定数据存放在哪个MC服务器,和MC的客户端配置文件要求一致。
Object_ key缓存对象的主键Char(30)存放在MC对象的主键。
Object_key _suffix缓存对象的主键后缀Char(20)存放在MC对象的主键后缀(给使用者作为参考,该值一般对应系统中对象的具体id,比如customer_no,emp_no等)。
Proc_name过程名Char(80)Memcache管理程序定期执行生成缓存数据。
Proc_control_type过程控制类型Char(10)“定时”,“间隔”。
Proc_control_parm过程控制参数Char(50)若“定时”:1,17等,若“间隔”:
6,12等。
Proc_flag是否立即执行Char(2)是/否
Status状态Char(4)有效/无效
Note说明Char(100)说明用途
Proc_status是否在运行状态Char(2)如果这个任务正在运行就标志出来。
Last_proc_time最近执行完成时间Datetime这个任务最近执行成功时间。




cache_data_propertits_2(读缓存写数据库)
列列名类型说明
mc_serverMC服务器CHAR(10)决定数据存放在哪个MC服务器,和MC的客户端配置文件要求一致。
Object_list存放缓存对象的主键的列表Char(30)例如 ep_log_list(存放日志数据,这样可以将日志文件先写到缓存,到空闲的时候再复制到数据库)
Object_ key缓存对象的主键Char(30)例如:ep_log
Object_key _suffix缓存对象的主键后缀Char(20)例如:唯一码。
Proc_name过程名Char(80)例如:proc_ep_log_mc ? ??
Proc_parm过程参数Char(200)例如: start_time, end_time, proc_time,
sql
Proc_control_type过程控制类型Char(10)“定时”,“间隔”。
Proc_control_parm过程控制参数Char(50)若“定时”:1,17等,若“间隔”:
6,12等。
Proc_flag是否立即执行Char(2)是/否
Status状态Char(4)有效/无效
Note说明Char(100)说明用途
Proc_status是否在运行状态Char(2)如果这个任务正在运行就标志出来。
Last_proc_time最近执行完成时间Datetime这个任务最近执行成功时间。


         

3.memCache管理程序
   memCache管理程序最为独立的运行的应用程序,它主要负责读取配置表中的配置信息,根据配置文件的设置,交换数据库和缓存中的数据。
memCache管理程序可以利用目前比较完善的memCache客户端程序,以便加强功能。

4.业务系统
业务系统开发人员参考配置文件,决定如何从缓存取得数据,并采用合适的策略。(比如:是利用本地缓存,还是直接利用memCache缓存)
开发代理访问缓存的程序。
   

5.存放在缓冲中的数据格式
采用 ArrayList <Map m>的形式,目的是为了通用。
但是这个结构额外的内存开销会比较大,而且序列化对象也会增加系统的响应负担。



四 具体实现说明
   根据业务实际需要,目前只实现了后台管理端读数据库写缓存,然后业务程序读缓存,当缓存中没有业务数据数据的时候,写入缓存。

1 缓冲服务器
    在多台服务器上安装memcached服务器,服务器的安装步骤,参与我的另一篇文章《建立集群架构的步骤》。
2 管理端程序:
用到的jar 包:
a.alisoft-xplatform-asf-cache-2.5.jar
b.commons-logging.jar
c.j2ee.jar
d.jconn3.jar
e.log4j-1.2.11.jar
f.proxool-0.9.1.jar
g.proxool-cglib.jar

类McTranslateServer.java

public class McTranslateServer {
public static String LOG_PATH = "";         //日志存放目录
    private static SimpleDateFormat dtformat;
   

public static void main(String[] args) throws Exception{
McTranslateServer mcts = new McTranslateServer();
mcts.go();
}
public void log(String s)throws Exception{
dtformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(dtformat.format(new Date()) + " " + s);
}
    public void loadConfig() throws Exception{
      InputStream is = getClass().getResourceAsStream("mcts.properties");
      Properties pp = new Properties();
      pp.load(is);
      LOG_PATH = pp.getProperty("log_path","");
   }

public void go() throws Exception{
log("config file loading......");
loadConfig();
McTranslateTimerTask t = new McTranslateTimerTask(0);
t.start(1,1200000);
McTranslateTimerTask t2 = new McTranslateTimerTask(1);
t2.start(300000,1200000);

log("DataTranslateServer start......");
}
}
类McTranslateTimerTask.java

public classMcTranslateTimerTask {
private DataAccess da = null;
private TransactionContext dbtxt = null;

    //for log
    private SimpleDateFormat dtFormater;   //日期时间格式化
    private PrintStream logger = null ;    //日志输出
    private String logDate = "";         //当前日志的记录日期,当前日期字符串
                                           //不等于此值则要重建logger和logDate
    private Base base = new Base();
    private Timer timer;
    private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
   
    private int step = 0;

    public McTranslateTimerTask(int i){
    timer = new Timer();   
dbtxt = new TransactionContext("wahaha",true);
da = new DataAccess();
da._TransactionContxt = dbtxt;
step = i;

   
    }
    public void start(int delay,int period){
    timer.scheduleAtFixedRate(ttask,delay,period);   
    }
    /************************************************************
   *
   *run 主方法,
   *
   ************************************************************/
    private TimerTask ttask = new TimerTask(){
    public void run(){
    //因为是两个db,conn单个控制free
   
            try{
            //取配置表数据
            ArrayList properties = null;
            String sql = "select * from cache_data_propertits_1 where status = '1' and proc_status='1' and convert(int,id) % 2 ="+step;
            ResultSet rs = null;
            try{
            rs = da.doQuery(sql);
            properties= da.getArrByRs4(rs);
            //log("info:...读取配置文件");
            if (properties.size()>0){
                  sql = " update cache_data_propertits_1 set proc_status='0' where status='1' and proc_status='1' and convert(int,id) % 2 ="+step;
                da.doUpdate(sql);
                  dbtxt.commit();
                //log("info:...锁定配置文件>proc_status='0'");
            }
                }catch(Exception e){
                  dbtxt.rollback();
                  log("error:...锁定配置文件出错 >" + e.toString());
            throw new Exception(e.toString());
            }finally{
            dbtxt.freeConnection();
            }
            
            
            Date now = new java.util.Date();
            for(int k=0;k<properties.size();k++){
            try{
                Map mm =(HashMap)properties.get(k);
            String id = String.valueOf(mm.get("id"));
            String mc_server = (String)mm.get("mc_server");
            String object_key = (String)mm.get("object_key");         
            String object_key_suffix = (String)mm.get("object_key_suffix");
            String proc_name = (String)mm.get("proc_name");
            String proc_parm = (String)mm.get("proc_parm");
            String proc_control_type = (String)mm.get("proc_control_type");
            String proc_control_parm = (String)mm.get("proc_control_parm");
            String proc_flag = (String)mm.get("proc_flag");
            String last_proc_time = (String)mm.get("last_proc_time");
            if (last_proc_time.equals(""))
            {last_proc_time="1900-1-1 00:00:00";}
            Date last_proc_time_d = df.parse(last_proc_time);
            boolean run_flag = false;
            if (proc_flag.equals("1")){
            run_flag = true;
            }else if (proc_control_type.equals("定时")){
            String[] proc_control_parms = proc_control_parm.split(",");
            for(int i=0;i<proc_control_parms.length;i++){
            proc_control_parms=df2.format(new java.util.Date())+" "+ proc_control_parms;
                if (last_proc_time_d.before(df.parse(proc_control_parms)) && now.after(df.parse(proc_control_parms))){
                run_flag = true;
                break;
                }
            }
            }else if (proc_control_type.equals("间隔")){
            if ((now.getTime() - last_proc_time_d.getTime())/(60*60*1000) >Integer.parseInt(proc_control_parm)){
            run_flag = true;
             }
            
            }
            if (run_flag){
            //log("info:...开始>id:"+ id+";object_key:"+object_key);
try {
databaseToMc(mc_server,object_key,object_key_suffix,proc_name,proc_parm);
log("info:...复制>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm+"\r\n");
} catch (Throwable e) {
log("error:...出错>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm +";错误信息:"+ e.toString());
throw new Exception(e.toString());
}finally{
            dbtxt.freeConnection();
            }
            }
                //更新配置表中proc_mark = '1' 和 syn_date = getdate()
                try{
                if (run_flag){
                  sql = " update cache_data_propertits_1set proc_status = '1' ,last_proc_time='"+df.format(new java.util.Date())+"',proc_flag='0' where id= "+id;                  
                }else{
                sql = " update cache_data_propertits_1set proc_status = '1' ,proc_flag='0' where id= "+id;                  
                }
                //sql = " update cache_data_propertits_1set proc_status = '1' ,last_proc_time='"+df.format(new java.util.Date())+"' where id= "+id;                     
               da.doUpdate(sql);
                           dbtxt.commit();         
                           //log("info:...恢复标记>id:"+ id+";object_key:"+object_key+";参数:"+proc_parm+"\r\n");
                }catch(Exception e){               
               dbtxt.rollback();               
               log("error:...恢复标记出错>id:"+ id+";object_key:"+object_key +";参数:"+proc_parm+";错误信息:"+ e.toString());
               throw new Exception();
                }finally{
                dbtxt.freeConnection();
                }
            }catch(Exception e){
            throw new Exception(e.toString());
            }//--一个进程结尾
            }
               
            }catch(Exception e){
            try{
                  //出错时恢复标记。
                  try{
                        String sql = " update cache_data_propertits_1 set proc_status='1' "+
                              " where status = '1' and proc_status='0' and convert(int,id) % 2 ="+step;
                        da.doUpdate(sql);
                        dbtxt.commit();
                        //log("info:...恢复标记此次接口执行出错,将未执行的过程标志全重置为1\r\n");
                  }catch(Exception ex){
                        dbtxt.rollback();
                        log("error: ...恢复标记出错>错误信息:"+ e.toString());
                  }finally{
            dbtxt.freeConnection();
            }
                  
            base.sendMail("","dataTrans@wahaha.com.cn","zhoucw@wahaha.com.cn;","Data Trans Error!",e.toString());
            }catch(Exception ee){};//最后统一扔出异常
            }
            finally{
      try {
dbtxt.freeConnection();
    } catch (Exception e) {
}
            }
    }
   
    };
    private void log(String s) {
      Map obj = new HashMap();
      obj.put("logger",logger);
      obj.put("dtf",dtFormater);
      logDate = base.getLogFile(McTranslateServer.LOG_PATH,obj,logDate,"dts");
      logger = (PrintStream)obj.get("logger");
      dtFormater = (SimpleDateFormat)obj.get("dtf");
      logger.println(dtFormater.format(new java.util.Date()) + " ---" + s);
    }




    public void databaseToMc(String mc_server,String object_key,String object_key_suffix,String proc_name,String proc_parm) throws Throwable{
    //从database收集数据
    proc_name = " {call "+proc_name+"}";
    CallableStatement cs = da.callProc(proc_name);
    if (!proc_parm.equals("")){
    String[] parms = proc_parm.split(",");
    for (int i=0;i<parms.length;i++){
    cs.setString(i+1, da.toDB(parms));
    }
    }
   
    ResultSet rs = cs.executeQuery();
   
    ResultSetMetaData rsmd = rs.getMetaData();
    int numberofcolumns = rsmd.getColumnCount();
    int i;
    String[] columntype = new String;
    columntype="";
      for(i=1; i<= numberofcolumns; i++){
      columntype=rsmd.getColumnTypeName(i).toLowerCase();
      }
      String[] columnname = new String;
      columnname="";      
      for(i=1; i<= numberofcolumns; i++){
      columnname=rsmd.getColumnLabel(i);
      
      }

    //将数据放入Mc
    IMemcachedCache cache0 = McClient.getMemcachedCache(mc_server);
   
    String objectKey ="";
    String temp_objectKey ="";
    ArrayList arr= new ArrayList();
    Thread.sleep(1000);
   while(rs.next()){   
          Map mm = new HashMap();
      for(i =1;i<= numberofcolumns; i++)
   
         {
            
               if ((columntype.equals("binary"))||(columntype.equals("varbinary")))
               mm.put(columnname,rs.getBinaryStream(i) == null ? "&nbsp;":rs.getBinaryStream(i));
               else if (columntype.equals("blob"))
               mm.put(columnname,rs.getBlob(i));
               else if ((columntype.equals("char"))||(columntype.equals("varchar"))||(columntype.equals("longvarchar"))||(columntype.equals("text")))
               {      
                   String s = rs.getString(i);
                   String r = "";
                   if( s != null && !( s.trim()).equals("") )
                        r = new String((s.trim()).getBytes("ISO8859_1"),"GBK" ) ;
                   mm.put(columnname,r);
               }
               else if ((columntype.equals("date")))
               {
               mm.put(columnname,rs.getDate(i) == null ? "":rs.getDate(i));                  
               }else if (columntype.equals("datetime")||columntype.equals("smalldatetime")){
               mm.put(columnname,rs.getString(i) == null ? "":rs.getString(i));
               }
               else if ((columntype.equals("decimal"))||(columntype.equals("numeric"))||(columntype.equals("real")))
               {
                   BigDecimal bd = rs.getBigDecimal(i);
                   if(bd==null){
                  bd=new BigDecimal(0.0);
                  mm.put(columnname,bd);
                   }
                   else{
                   bd = bd.setScale(2,BigDecimal.ROUND_HALF_UP);
                   if(bd.floatValue()==bd.intValue()){
                   bd = bd.setScale(0,BigDecimal.ROUND_HALF_UP);
                   }
                   mm.put(columnname,bd);
                   }
               }
               elseif((columntype.equals("float"))||(columntype.equals("money")))
               {                  
                  Float b = new Float(rs.getFloat(i));
                  mm.put(columnname,b.equals(new Float(0)) ? "":b);
               }
               elseif((columntype.equals("integer"))||(columntype.equals("smallint"))||(columntype.equals("int"))||(columntype.equals("tinyint")))
               {
                     
                     Integer c = new Integer(rs.getInt(i));
                     mm.put(columnname,c.equals(new Integer(0)) ? "":c);
               }
               elseif (columntype.equals("null"))
               mm.put(columnname,"");
               else   mm.put("error","无法识别的类型");            
               
         }
   
    objectKey =object_key + (String)mm.get(object_key_suffix);
    if (temp_objectKey.equals("")){
    temp_objectKey = objectKey;
    }
    if (!temp_objectKey.equals(objectKey)){
   
    cache0.put(temp_objectKey, arr,24*60*60);   
    for(int k = 0;k<arr.size();k++){
    Map mm2=(HashMap)arr.get(k) ;
    mm2 = null;
   
    }
    System.gc();
    arr.clear();
   
    }
    temp_objectKey = objectKey;
    arr.add(mm);
   
      
    }
   if (arr!=null){
   cache0.put(temp_objectKey, arr,24*60*60);
   for(int k = 0;k<arr.size();k++){
    Map mm2=(HashMap)arr.get(k) ;
    mm2 = null;
    }   
    arr=null;
    System.gc();
   }
   
   columntype =null;
   columnname =null;
   System.gc();
   
    }

}
类 McClient.java
public class McClient {
private ICacheManager<IMemcachedCache> manager;
private Map<String,IMemcachedCache>map_cache ;
privatestatic McClient mcInstance;
public static synchronized IMemcachedCache getMemcachedCache(String client_name) {
if ( mcInstance == null) {
mcInstance = new McClient();
}
if (mcInstance.map_cache.get(client_name) == null){
mcInstance.map_cache.put(client_name,mcInstance.manager.getCache(client_name));
}
return mcInstance.map_cache.get(client_name);
}
private McClient() {
manager = CacheUtil.getCacheManager(IMemcachedCache.class,
MemcachedCacheManager.class.getName());
manager.setConfigFile("memcached.xml");
manager.start();
map_cache = new HashMap<String,IMemcachedCache>();

}
public static void main(String[] args) {
IMemcachedCache cache0 = McClient.getMemcachedCache("mclient0");
//cache0.put("key3", "皇后2");
ArrayList arr = (ArrayList)cache0.get("CachePropertitsSaleMaterials");
for (int i=0;i<arr.size();i++){

Map mm = (Map)arr.get(i);
System.out.println(mm);
System.out.println(mm.get("mc_server"));
}

}
}

2 客户端程序:
a.代理方式的:
类DataAccess.java (代理的实现)
    /*
    * mc_key:mc中的主键
    * fuc_name:dao中的函数
    * fuc_arg:dao中函数参
    * mc代理函数
    */
    public ArrayList proxyMc(String object_key,String object_key_suffix,Object c ,String fuc_name,Object[] fuc_arg) throws Exception, Exception{
    //从mc中取主键所在的服务器地址
    ArrayList result = null;
    String mc_key = object_key + object_key_suffix;
      IMemcachedCache cache0 = McClient.getMemcachedCache("mclient0");
      ArrayList arr_properits = (ArrayList)cache0.get("CachePropertits"+object_key);
      if (arr_properits !=null){
         Map m= (Map)arr_properits.get(arr_properits.size()-1);
         String mc_server = (String)m.get("mc_server");
         cache0 = McClient.getMemcachedCache(mc_server);
         result = (ArrayList)cache0.get(mc_key);
      }
      //如果mc中没有数据,从数据库中取
      if (result == null){
      Class[] parameterTypes =null;
      if (fuc_arg!=null)
          parameterTypes= new Class;
      for(int i = 0;i<fuc_arg.length;i++){
      parameterTypes = String.class;
      }
      Method meth = c.getClass().getMethod(fuc_name, parameterTypes);
      result = (ArrayList)meth.invoke(c, fuc_arg);   
      cache0.put(mc_key, result);
      }
    return result;
    }
   
   类Customer.java (业务程序的调用方法)
      public ArrayList getGiftPolicies(TransactionContext dbCtxt)
      throws Exception{
      CustomerDAO scdao = new CustomerDAO(dbCtxt);
      String[] arg =new String;
arg = use_dept;
arg = userNo;
return scdao.proxyMc("SaleGiftPolicies",userNo, scdao, "getGiftPolicies", arg);
}











b.AOP方式的:
需要jar包:


Aop类:
package cn.com.wahaha.common;

import org.codehaus.aspectwerkz.joinpoint.JoinPoint;

public class AOPlog {
private int indent = -1;
public Object trace(JoinPoint jp,String username,String password) throws Throwable {
      indent++;
      System.out.println("***** apo开始");
      System.out.println(username+":"+password);;
      //System.out.println("--> " + jp.toString());
      Object result = jp.proceed(); // will call the next advice or target method, field access, constructor etc
      System.out.println("<--");
      indent--;
      return result;
    }

}
package cn.com.wahaha.common;

import java.util.ArrayList;
import java.util.Map;

import org.codehaus.aspectwerkz.joinpoint.JoinPoint;

import com.alisoft.xplatform.asf.cache.IMemcachedCache;

import cn.com.wahaha.session.McClient;

public class AOPMC {
private int indent = -1;
public Object cache01(JoinPoint jp,String object_key,String object_key_suffix) throws Throwable {
      indent++;
      System.out.println("***** apo mc开始");
    //从mc中取主键所在的服务器地址
    ArrayList result = null;
    String mc_key = object_key + object_key_suffix;
      IMemcachedCache cache0 = McClient.getMemcachedCache("mclient0");
      ArrayList arr_properits = (ArrayList)cache0.get("CachePropertits"+object_key);
      if (arr_properits !=null){
         Map m= (Map)arr_properits.get(arr_properits.size()-1);
         String mc_server = (String)m.get("mc_server");
         cache0 = McClient.getMemcachedCache(mc_server);
         result = (ArrayList)cache0.get(mc_key);
      }      
      if (result!= null){
      return result;
      }else{
      result = (ArrayList)jp.proceed();
      cache0.put(mc_key, result);
      }                           
      indent--;
      return result;
    }

}
Aop配置文件:
<!DOCTYPE aspectwerkz PUBLIC
    "-//AspectWerkz//DTD//EN"
    "http://aspectwerkz.codehaus.org/dtd/aspectwerkz.dtd">
<aspectwerkz>
    <system id="webapp">
      <package name="cn.com.wahaha.common">
      <aspect class="AOPlog">
    <pointcut name="valid(String username,String password)" expression="execution(* cn.com.wahaha.ep.dao.EPUserDAO.validateUser(..)) AND args(username,password)"/>   
            <advice name="trace(JoinPoint jp,String username,String password)" type="around" bind-to="valid(username,password)"/>      
      </aspect>
      <aspect class="AOPMC">
    <pointcut name="cache(String object_key,String object_key_suffix)" expression="execution(* cn.com.wahaha.ep.dao.CustomerDAO.*Mc(..)) AND args(object_key,object_key_suffix)"/>   
            <advice name="cache01(JoinPoint jp,String object_key,String object_key_suffix)" type="around" bind-to="cache( object_key,object_key_suffix)"/>      
      </aspect>
      </package>
    </system>
</aspectwerkz>

Aop离线编译方法:
aspectwerkz -offline WEB-INF/aop.xml WEB-INF/classes -cp WEB-INF/lib/*.jar
Aop在线编译方法:
更改 setclasspath.bat
定位: set _RUNJAVA="%JAVA_HOME%\bin\java"
替换为:@REM ** Begin of AspectWerkz configuration **
@REM ** Adapt ASPECTWERKZ_HOME **
set ASPECTWERKZ_HOME=D:\aw\aspectwerkz-2.0
set _RUNJAVA=%ASPECTWERKZ_HOME%\bin\aspectwerkz.bat
@REM ** we can add some more AspectWerkz option this way:
@REM jit.off=true due to an open issue in beta1 version - this is just an optimization option
set JAVA_OPTS=%JAVA_OPTS% -Daspectwerkz.jit.off=true
@REM ** End of AspectWerkz configuration **
页: [1]
查看完整版本: memcache数据缓存系统