自動加載緩存框架

jopen 11年前發布 | 28K 次閱讀 緩存 緩存組件

自動加載緩存框架


代碼
現在使用的緩存技術很多,比如RedisMemcacheEhCache等,甚至還有使用ConcurrentHashMapHashTable 來實現緩存。但在緩存的使用上,每個人都有自己的實現方式,大部分是直接與業務代碼綁定,隨著業務的變化,要更換緩存方案時,非常麻煩。接下來我們就使用AOP + Annotation 來解決這個問題,同時使用自動加載機制 來實現數據“常駐內存”。

Spring AOP這幾年非常熱門,使用也越來越多,但個人建議AOP只用于處理一些輔助的功能(比如:接下來我們要說的緩存),而不能把業務邏輯使用AOP中實現,尤其是在需要“事務”的環境中。

如下圖所示:
Alt 緩存框架

AOP攔截到請求后:

  1. 根據請求參數生成Key,后面我們會對生成Key的規則,進一步說明;
  2. 如果是AutoLoad的,則請求相關參數,封裝到AutoLoadTO中,并放到AutoLoadHandler中。
  3. 根據Key去緩存服務器中取數據,如果取到數據,則返回數據,如果沒有取到數據,則執行DAO中的方法,獲取數據,同時將數據放到緩存中。如 果是AutoLoad的,則把最后加載時間,更新到AutoLoadTO中,最后返回數據;如是AutoLoad的請求,每次請求時,都會更新 AutoLoadTO中的 最后請求時間。
  4. 為了減少并發,增加等待機制:如果多個用戶同時取一個數據,那么先讓第一個用戶去DAO取數據,其它用戶則等待其返回后,去緩存中獲取,嘗試一定次數后,如果還沒獲取到,再去DAO中取數據。

AutoLoadHandler(自動加載處理器)主要做的事情:當緩存即將過期時,去執行DAO的方法,獲取數據,并將數據放到緩存中。為了防止 自動加載隊列過大,設置了容量限制;同時會將超過一定時間沒有用戶請求的也會從自動加載隊列中移除,把服務器資源釋放出來,給真正需要的請求。

使用自加載的目的:

  1. 避免在請求高峰時,因為緩存失效,而造成數據庫壓力無法承受;
  2. 把一些耗時業務得以實現。
  3. 把一些使用非常頻繁的數據,使用自動加載,因為這樣的數據緩存失效時,最容易造成服務器的壓力過大。

分布式自動加載

如果將應用部署在多臺服務器上,理論上可以認為自動加載隊列是由這幾臺服務器共同完成自動加載任務。比如應用部署在A,B兩臺服務器上,A服務器自 動加載了數據D,(因為兩臺服務器的自動加載隊列是獨立的,所以加載的順序也是一樣的),接著有用戶從B服務器請求數據D,這時會把數據D的最后加載時間 更新給B服務器,這樣B服務器就不會重復加載數據D。

使用方法

1. 實現com.jarvis.cache.CacheGeterSeter

下面舉個使用Redis做緩存服務器的例子:

package com.jarvis.example.cache;
import ... ...
/**
 * 緩存切面,用于攔截數據并調用Redis進行緩存操作
 */
@Aspect
public class CachePointCut implements CacheGeterSeter<Serializable> {

    private static final Logger logger=Logger.getLogger(CachePointCut.class);

    private AutoLoadHandler<Serializable> autoLoadHandler;

    private static List<RedisTemplate<String, Serializable>> redisTemplateList;

    public CachePointCut() {
        autoLoadHandler=new AutoLoadHandler<Serializable>(10, this, 20000);
    }

    @Pointcut(value="execution(public !void com.jarvis.example.dao..*.*(..)) && @annotation(cahce)", argNames="cahce")
    public void daoCachePointcut(Cache cahce) {
        logger.info("----------------------init daoCachePointcut()--------------------");
    }

    @Around(value="daoCachePointcut(cahce)", argNames="pjp, cahce")
    public Object controllerPointCut(ProceedingJoinPoint pjp, Cache cahce) throws Exception {
        return CacheUtil.proceed(pjp, cahce, autoLoadHandler, this);
    }

    public static RedisTemplate<String, Serializable> getRedisTemplate(String key) {
        if(null == redisTemplateList || redisTemplateList.isEmpty()) {
            return null;
        }
        int hash=Math.abs(key.hashCode());
        Integer clientKey=hash % redisTemplateList.size();
        RedisTemplate<String, Serializable> redisTemplate=redisTemplateList.get(clientKey);
        return redisTemplate;
    }

    @Override
    public void setCache(final String cacheKey, final CacheWrapper<Serializable> result, final int expire) {
        try {
            final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
            redisTemplate.execute(new RedisCallback<Object>() {

                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                    JdkSerializationRedisSerializer serializer=(JdkSerializationRedisSerializer)redisTemplate.getValueSerializer();
                    byte[] val=serializer.serialize(result);
                    connection.set(key, val);
                    connection.expire(key, expire);
                    return null;
                }
            });
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    @Override
    public CacheWrapper<Serializable> get(final String cacheKey) {
        CacheWrapper<Serializable> res=null;
        try {
            final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
            res=redisTemplate.execute(new RedisCallback<CacheWrapper<Serializable>>() {

                @Override
                public CacheWrapper<Serializable> doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                    byte[] value=connection.get(key);
                    if(null != value && value.length > 0) {
                        JdkSerializationRedisSerializer serializer=
                            (JdkSerializationRedisSerializer)redisTemplate.getValueSerializer();
                        @SuppressWarnings("unchecked")
                        CacheWrapper<Serializable> res=(CacheWrapper<Serializable>)serializer.deserialize(value);
                        return res;
                    }
                    return null;
                }
            });
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
        return res;
    }

    /**
     * 刪除緩存
     * @param cs Class
     * @param method
     * @param arguments
     * @param subKeySpEL
     * @param deleteByPrefixKey 是否批量刪除
     */
    public static void delete(@SuppressWarnings("rawtypes") Class cs, String method, Object[] arguments, String subKeySpEL,
        boolean deleteByPrefixKey) {
        try {
            if(deleteByPrefixKey) {
                final String cacheKey=CacheUtil.getCacheKeyPrefix(cs.getName(), method, arguments, subKeySpEL) + "*";
                for(final RedisTemplate<String, Serializable> redisTemplate : redisTemplateList){
                    redisTemplate.execute(new RedisCallback<Object>() {
                        @Override
                        public Object doInRedis(RedisConnection connection) throws DataAccessException {
                            byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                            Set<byte[]> keys=connection.keys(key);
                            if(null != keys && keys.size() > 0) {
                                byte[][] keys2=new byte[keys.size()][];
                                keys.toArray(keys2);
                                connection.del(keys2);
                            }
                            return null;
                        }
                    });
                }

            } else {
                final String cacheKey=CacheUtil.getCahcaheKey(cs.getName(), method, arguments, subKeySpEL);
                final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
                redisTemplate.execute(new RedisCallback<Object>() {

                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);

                        connection.del(key);

                        return null;
                    }
                });
            }
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    public AutoLoadHandler<Serializable> getAutoLoadHandler() {
        return autoLoadHandler;
    }

    public void destroy() {
        autoLoadHandler.shutdown();
        autoLoadHandler=null;
    }

    public List<RedisTemplate<String, Serializable>> getRedisTemplateList() {
        return redisTemplateList;
    }

    public void setRedisTemplateList(List<RedisTemplate<String, Serializable>> redisTemplateList) {
        CachePointCut.redisTemplateList=redisTemplateList;
    }

}

從上面的代碼可以看出,對緩存的操作,還是由業務系統自己來實現的,我們只是對AOP攔截到的ProceedingJoinPoint,進行做一些處理。

java代碼實現后,接下來要在spring中進行相關的配置:

<aop:aspectj-autoproxy proxy-target-class="true"/>
<bean id="cachePointCut" class="com.jarvis.example.cache.CachePointCut" destroy-method="destroy">
    <property name="redisTemplateList">
        <list>
            <ref bean="redisTemplate1"/>
            <ref bean="redisTemplate2"/>
        </list>
    </property>
</bean>

2. 將需要使用緩存的方法前增加@Cache注解

package com.jarvis.example.dao;
import ... ...
public class UserDAO {
    @Cache(expire=600, autoload=true, requestTimeout=72000)
    public List<UserTO> getUserList(... ...) {
        ... ...
    }
}

緩存Key的生成

生成緩存Key的生成方法:CacheUtil.getCahcaheKey(String className, String method, Object[] arguments, String subKeySpEL)

  • className 類名稱
  • method 方法名稱
  • arguments 參數
  • subKeySpEL SpringEL表達式

生成的Key格式為:{類名稱}.{方法名稱}{.SpringEL表達式運算結果}:{參數值的Hash字符串}

SpringEL表達式的作用

根據業務的需要,將緩存Key進行分組。舉個例子,商品的評論列表:

package com.jarvis.example.dao;
import ... ...
public class GoodsCommentDAO{
    @Cache(expire=600, subKeySpEL="#args[0]", autoload=true, requestTimeout=18000)
    public List<CommentTO> getCommentListByGoodsId(Long goodsId, int pageNo, int pageSize) {
        ... ...
    }
}

如果商品Id為:100,那么生成緩存Key格式為:com.jarvis.example.dao.GoodsCommentDAO.getCommentListByGoodsId.100:xxxx
在Redis中,能精確刪除商品Id為100的評論列表,執行命令即可:
del com.jarvis.example.dao.GoodsCommentDAO.getCommentListByGoodsId.100:*

SpringEL表達式使用起來確實非常方便,如果需要,@Cache中的expire,requestTimeout以及autoload參數都可以用SpringEL表達式來動態設置。但使用起來就變得復雜,所以我們沒有這樣做。

數據實時性

上面商品評論的例子中,如果用戶發表了評論,要立即顯示該如何來處理?

比較簡單的方法就是,在發表評論成功后,立即把緩存中的數據也清除,這樣就可以了。

package com.jarvis.example.dao;
import ... ...
public class GoodsCommentDAO{
    @Cache(expire=600, subKeySpEL="#args[0]", autoload=true, requestTimeout=18000)
    public List<CommentTO> getCommentListByGoodsId(Long goodsId, int pageNo, int pageSize) {
        ... ...
    }
    public void addComment(Long goodsId, String comment) {
        ... ...// 省略添加評論代碼
        deleteCache(goodsId);
    }
    private void deleteCache(Long goodsId) {
        Object arguments[]=new Object[]{goodsId};
        CachePointCut.delete(this.getClass(), "getCommentListByGoodsId", arguments, "#args[0]", true);
    }
}

@Cache

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Cache {

    /**
     * 緩存的過期時間,單位:秒
     */
    int expire();

    /**
     * 是否啟用自動加載緩存
     * @return
     */
    boolean autoload() default false;

    /**
     * 當autoload為true時,緩存數據在 requestTimeout 秒之內沒有使用了,就不進行自動加載數據,如果requestTimeout為0時,會一直自動加載
     * @return
     */
    long requestTimeout() default 36000L;

    /**
     * 使用SpEL,將緩存key,根據業務需要進行二次分組
     * @return
     */
    String subKeySpEL() default "";
}

注意事項

1. 當@Cache中 autoload 設置為 ture 時,對應方法的參數必須都是Serializable的。

AutoLoadHandler中需要緩存通過深度復制后的參數。

2. 參數中只設置必要的屬性值,在DAO中用不到的屬性值盡量不要設置,這樣能避免生成不同的緩存Key,降低緩存的使用率。

例如:

public CollectionTO<AccountTO> getAccountByCriteria(AccountCriteriaTO criteria)  {
        List<AccountTO> list=null;
        PaginationTO paging=criteria.getPaging();
        if(null != paging && paging.getPageNo() > 0 && paging.getPageSize() > 0) {// 如果需要分頁查詢,先查詢總數
            criteria.setPaging(null);// 減少緩存KEY的變化,在查詢記錄總數據時,不用設置分頁相關的屬性值
            Integer recordCnt=accountDAO.getAccountCntByCriteria(criteria);
            if(recordCnt > 0) {
                criteria.setPaging(paging);
                paging.setRecordCnt(recordCnt);
                list=accountDAO.getAccountByCriteria(criteria);
            }
            return new CollectionTO<AccountTO>(list, recordCnt, criteria.getPaging().getPageSize());
        } else {
            list=accountDAO.getAccountByCriteria(criteria);
            return new CollectionTO<AccountTO>(list, null != list ? list.size() : 0, 0);
        }
    }

3. 注意AOP失效的情況;

例如:

TempDAO {

        public Object a() {
            return b().get(0);
        }

        @Cache(expire=600)
        public List<Object> b(){
            return ... ...;
        }
    }

通過 new TempDAO().a() 調用b方法時,AOP失效,也無法進行緩存相關操作。

4. 自動加載緩存時,不能在緩存方法內疊加查詢參數值;

例如:

@Cache(expire=600, autoload=true)
    public List<AccountTO> getDistinctAccountByPlayerGet(AccountCriteriaTO criteria) {
        List<AccountTO> list;
        int count=criteria.getPaging().getThreshold() ;
        // 查預設查詢數量的10倍
        criteria.getPaging().setThreshold(count * 10);
        … …
    }

因為自動加載時,AutoLoadHandler 緩存了查詢參數,執行自動加載時,每次執行時 threshold 都會乘以10,這樣threshold的值就會越來越大。

5. 當方法返回值類型改變了怎么辦?

在代碼重構時,可能會出現改方法返回值類型的情況,而參數不變的情況,那上線部署時,可能會從緩存中取到舊數據類型的數據,可以通過以下方法處理:

  • 上線后,快速清理緩存中的數據;
  • 在CacheGeterSeter的實現類中統一加個version;
  • 在@Cache中加version(未實現)。

6. 對于一些比較耗時的方法盡量使用自動加載。

7. 對于查詢條件變化比較劇烈的,不要使用自動加載機制。

比如,根據用戶輸入的關鍵字進行搜索數據的方法,不建議使用自動加載。

使用規范

  1. 將調接口或數據庫中取數據,封裝在DAO層,不能什么地方都有調接口的方法。
  2. 自動加載緩存時,不能在緩存方法內疊加(或減)查詢條件值,但允許設置值。
  3. DAO層內部,沒使用@Cache的方法,不能調用加了@Cache的方法,避免AOP失效。
  4. 因緩存Key是方法參數轉為字符串獲得的,為了避免生成的Key不同,盡量只設置必要的參數及屬性,也便于反向定位
  5. 對于比較大的系統,要進行模塊化設計,這樣可以將自動加載,均分到各個模塊中。

為什么要使用自動加載機制?

首頁我們想一下系統的瓶頸在哪里?

  1. 在高并發的情況下數據庫性能極差,即使查詢語句的性能很高;如果沒有自動加載機制的話,在當緩存過期時,訪問洪峰到來時,很容易就使數據壓力大增。

  2. 往緩存寫數據與從緩存讀數據相比,效率也差很多,因為寫緩存時需要分配內存等操作。使用自動加載,可以減少同時往緩存寫數據的情況,同時也能提升緩存服務器的吞吐量。

如何減少DAO層并發

  1. 使用緩存;
  2. 使用自動加載機制;“寫”數據往往比讀數據性能要差,使用自動加載也能減少寫并發。
  3. 從DAO層加載數據時,增加等待機制(拿來主義):如果有多個請求同時請求同一個數據,會先讓其中一個請求去取數據,其它的請求則等待它的數據。

可擴展性及維護性

  1. 通過AOP實現緩存與業務邏輯的解耦;如果要實時顯示數據,還是會有點耦合。
  2. 非常方便更換緩存服務器或緩存實現(比如:從Memcache換成Redis);
  3. 非常方便增減緩存服務器(如:增加Redis的節點數);
  4. 非常方便增加或去除緩存,方便測試期間排查問題;
 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!