基于EHcache實現高并發數據緩存池

jopen 11年前發布 | 91K 次閱讀 Ehcache 緩存組件

在高并發的場景里面經常會使用到localcache內容,但是一直沒有一個很好的內存管理工具。在開發的時候發現了ehcache,這么一個開源的工具。唯一的缺點就是無法對于多塊數據單元進行一個有效的管理,并且在數據過期的時候無法提供有效的更新機制,所以這里寫了一個數據緩存池來滿足這個需求。

下面是設計組織結構:

153022_tuje_917881.jpg

這里主要是在數據實體內部封裝了數據更新器,這樣在數據過期的時候可以調用更新器的方法。

1. Ehcache數據緩沖的具體代碼:(主要是get方法內部進行數據更新,使用對象鎖的方式來進行數據過期的并發控制,缺點是可能在非常高的并發里面會出現數據阻塞的現象,但是因為這里大部分都是內存的運算操作,所以相對來說阻塞的效果還好)

package com.tmall.lafite.core.manager.localcache;

import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.concurrent.LockType;
import net.sf.ehcache.concurrent.ReadWriteLockSync;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.ResultCode;

/**
 * cache數據實體
 * @author wangxiao
 *
 */
public class LafiteCache {

    private CacheManager cacheManager = null;
    private Cache cacheImpl = null;

    ReadWriteLockSync rwLock = new ReadWriteLockSync();

    private int capability = 30;
    private long expireTime = 30;

    public static final int DEFAULT_CAPABILITY = 30;
    public static final int DEFAULT_EXPIRETIME = 30;

    private String cacheName = "Tair Local Cache";

    public LafiteCache(String id, int capability, long expireTimeMS) {
        this.cacheName = id;
        this.capability = capability;
        this.expireTime = expireTimeMS;
    }

    public void setExpireTime(long expireTimeMS) {
        this.expireTime = expireTimeMS;
    }

    public void setCapacity(int cap) {
        cacheImpl.getCacheConfiguration().setMaxEntriesLocalHeap(cap);
    }

    public long getExpireTime() {
        return expireTime;
    }

    @SuppressWarnings("deprecation")
    public void initialize() {
        CacheConfiguration cacheConfiguration = new CacheConfiguration();
        cacheConfiguration.setDiskPersistent(false);

        cacheConfiguration.name(cacheName)
                          .maxEntriesLocalHeap(capability)
                          .diskPersistent(false)
                          .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
                         // .timeToLiveSeconds(expireTime);
        //cacheConfiguration.transactionalMode("LOCAL");
        //Configuration config = new Configuration().name(cacheName).cache(cacheConfiguration);

        cacheManager = CacheManager.create();

        cacheImpl =  new Cache(cacheConfiguration);
        cacheManager.addCache(cacheImpl);
        //cache = MemoryStore.create(cacheImpl, new UnboundedPool());

    }

    public int size() {
        return (int) cacheImpl.getSize();
    }

    public void destroy() {
        cacheImpl.dispose();
        //cacheManager.removeCache(cacheName);
        cacheManager.shutdown();
    }

    public void clear() {
        rwLock.lock(LockType.WRITE);
        try {
            cacheImpl.removeAll();
        } finally {
            rwLock.unlock(LockType.WRITE);
        }
    }

    public void del(Object key) {
        cacheImpl.remove(key);
        return;
    }

    public void put(Object key, Object value) { 
        cacheImpl.put(new Element(key, value));
        return ;
    }

    public LafiteResult get(Object key) {
        LafiteResult lafiteResult = new LafiteResult(); 

        Element element = cacheImpl.get(key);
        if (element == null) {
            lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);
            return lafiteResult;
        }
        long now = System.currentTimeMillis();

        long pastTime = now - element.getLastUpdateTime();
        if (pastTime >= expireTime) {
            // double check
            synchronized (element) {
                pastTime = now - element.getLastUpdateTime();
                if (pastTime >= expireTime) {
                    // expired, update entry
                    element.updateUpdateStatistics();
                    lafiteResult.setError(ResultCode.Error.Cache.DATA_OVERDUE);
                }
            }
        }
        // element object value never null;
        lafiteResult.setDefaultModel(element.getObjectValue());
        return lafiteResult;
    }

    @SuppressWarnings("unchecked")
    public List<Object> getKeys() {
        List<Object> keys = cacheImpl.getKeys();
        return keys;
    }
}

2. 數據邏輯單元定義(這里封裝了數據容器單元,把原來的數據池方法傳入,在數據更新的時候使用namespace來獲取內存實體,進而來獲取數據。)

(注:這里目前還沒有想好是否把數據的初始化放在容器當中,這里暫時不放入。只是在容器里面進行初始化方法的調用,真正的數據設置方式由邏輯單元獲取數據池進行自身的put)

package com.tmall.lafite.core.manager.localcache.util;

import org.springframework.beans.factory.annotation.Autowired;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.manager.localcache.LafiteContainer;

/**
 * 緩存邏輯單元
 * @author wangxiao
 *
 */
public abstract class LogicCenter {
    @Autowired
    private LafiteContainer lafiteContainer;

    /**
     * 初始化方法
     * @return
     */
    public abstract Object initialize();

    /**
     * 回調函數
     * @param lafiteCache
     * @param key
     * @return
     */
    public abstract Object callBack(String namespace, Object key, LafiteResult lafiteResult);

    public LafiteContainer getLafiteContainer() {
        return lafiteContainer;
    }
}
3. 數據池(數據池,使用init來循環調用內存實體里的邏輯單元進行數據的初始化)
package com.tmall.lafite.core.manager.localcache;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.ResultCode;
import com.tmall.lafite.core.manager.localcache.entity.CacheEntity;
import com.tmall.lafite.core.manager.localcache.util.LogicCenter;

/**
 * cache容器
 * @author wangxiao
 *
 */
public class LafiteContainer {

    protected final Logger logger = LoggerFactory.getLogger(LafiteContainer.class);

    private Map<String, CacheEntity> cacheMap = new ConcurrentHashMap<String, CacheEntity>();

    /**
     * 注冊緩存對象
     * @param namespace
     * @param key
     * @param lafiteCache 緩存對象
     * @param logicCenter 邏輯對象 (包含:初始化方法和callback方法)
     * @return
     */
//  public String register(String namespace, String key, LafiteCache lafiteCache, LogicCenter logicCenter) {
//      if(namespace != null && StringUtils.isEmpty(key) && lafiteCache != null) {
//          if(cacheMap.containsKey(namespace)) {
//              return ResultCode.Error.Cache.NAMESPACE_REPETITION;
//          }
//          
//          CacheEntity cacheEntity = new CacheEntity(lafiteCache, logicCenter);
//          try {
//              cacheEntity.initialize();
//          } catch (Exception e) {
//              logger.error("LafiteContainer.register ", e);
//          }
//          cacheMap.put(namespace, cacheEntity);
//          return null;
//      } 
//      return ResultCode.Error.Cache.COMMON_PARAM_LOST;
//  }

    /**
     * 獲取cache內的數據
     * @param namespace
     * @param key
     * @return
     */
    public LafiteResult get(String namespace, Object key) {
        LafiteResult lafiteResult = new LafiteResult();
        CacheEntity cacheEntity = cacheMap.get(namespace);//獲取緩存實體
        if(cacheEntity == null) {
            lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
        } else {
            LafiteCache lafiteCache = cacheEntity.getLafiteCache();
            LafiteResult result = lafiteCache.get(key);
            if(ResultCode.Error.Cache.NO_DATA.equals(result.getError()) 
                    || ResultCode.Error.Cache.DATA_OVERDUE.equals(result.getError())) {
                cacheEntity.getLogicCenter().callBack(namespace, key, result);//數據過期觸發callback事件
            }
            lafiteResult = result;
        }

        return lafiteResult;
    }

    /**
     * 獲取指定命名空間的全部數據
     *  這里采用的是逐條遍歷的方式
     * @param namespace
     * @return
     */
    public LafiteResult getAll(String namespace) {
        LafiteResult lafiteResult = new LafiteResult();
        List<Object> objects = new ArrayList<Object>();

        CacheEntity cacheEntity = cacheMap.get(namespace);
        if(cacheEntity == null) {
            lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
        } else {
            LafiteCache lafiteCache = cacheEntity.getLafiteCache();
            List<Object> keys = lafiteCache.getKeys();
            if(keys.isEmpty()) {
                lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);
            } else {
                for(Object key : keys) {
                    LafiteResult lr = get(namespace, key);
                    objects.add(lr.getDefaultModel());
                }
            }
        }
        lafiteResult.setDefaultModel(objects);
        return lafiteResult;
    }

    /**
     * 設置數據對象內容
     * @param namespace
     * @param key
     * @param value
     * @return
     */
    public LafiteResult put(String namespace, Object key, Object value) {
        LafiteResult lafiteResult = new LafiteResult();
        CacheEntity cacheEntity = cacheMap.get(namespace);
        if(cacheEntity == null) {
            lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
        } else {
            LafiteCache lafiteCache = cacheEntity.getLafiteCache();
            lafiteCache.put(key, value);
        }
        return lafiteResult;
    }

    public void setCacheMap(Map<String, CacheEntity> cacheMap) {
        this.cacheMap = cacheMap;
    }

    public Map<String, CacheEntity> getCacheMap() {
        return cacheMap;
    }

    public void initialize() {

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(String key : cacheMap.keySet()) {
                    try {
                        CacheEntity cacheEntity = cacheMap.get(key);
                        if(cacheEntity != null) {
                            LogicCenter logicCenter = cacheEntity.getLogicCenter();
                            if(logicCenter != null) {
                                logicCenter.initialize();
                            }
                        }
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

    }
}
4. spring初始化方式
<!-- 權限角色緩存
 <bean id="permitRoleCache" class="com.tmall.lafite.core.manager.localcache.LafiteCache" init-method="initialize">
 <constructor-arg value="_permit_role_cache_"/>
 <constructor-arg value="100"/>
 <constructor-arg value="2000"/>
 </bean>
 <bean id="permitRoleLogicCenter" class="com.tmall.lafite.core.manager.permit.cache.PermitRoleLogicCenter"/>
 <bean id="permitRoleCacheEntity" class="com.tmall.lafite.core.manager.localcache.entity.CacheEntity">
 <property name="logicCenter" ref="permitRoleLogicCenter" />
 <property name="lafiteCache" ref="permitRoleCache" />
 </bean>
 -->
  
 <!-- 緩存容器 -->
 <bean id="lafiteContainer" class="com.tmall.lafite.core.manager.localcache.LafiteContainer" init-method="initialize">
 <!-- 
 <property name="cacheMap">
      <map>
          <entry key="PermitCommon" value-ref="permitCommonCacheEntity" />
          <entry key="PermitAlgorithm" value-ref="permitAlgorithmCacheEntity"/>
          <entry key="PermitRole" value-ref="permitRoleCacheEntity"/>
      </map>
    </property> 
    -->
 </bean>

5. 使用示例

@Autowired
    private LafiteContainer lafiteContainer;

    private String namespace = LafiteNameSpace.PermitCommon;

    @SuppressWarnings("unchecked")
    @Transactional
    public List<PermitCommonDO> getPermitDOCache() {
        LafiteResult lafiteResult = lafiteContainer.getAll(namespace);
        if(lafiteResult.getDefaultModel() != null) {
            return (List<PermitCommonDO>) lafiteResult.getDefaultModel();
        }
        return null;
    }
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!