LevelDB:Cache源碼精讀——緩存
來自: https://yq.aliyun.com/articles/2746
一、原理分析:
這里講的Cache緩存是指內存緩存,既然是內存緩存,因為內存有限,所以緩存肯定有一個容量大小capacity。
1、模擬實例化一個緩存時,LevelDB的Cache對象結構。
1.1、LevelDB可以創建一個容量大小capacity 的Cache,
1.2、Cache子類ShardedLRUCache將容量大小capacity的緩存分成了很多個小緩存LRUCache。
1.3、小緩存LRUCache里實現了一個雙向鏈表lru_和一個二級指針數組table_用來緩存數據。雙向鏈表lru_用來保證當緩存容量飽滿時,清除比較舊的緩存數據;二級指針數組table_用來保證緩存數據的快速查找。
2、模擬緩存一個數據時,LevelDB的Cache工作流程。
2.1、調用Cache的insert方法插入緩存數據data,子類ShardedLRUCache將緩存數據data進行hash操作,得到的hash值定位得到屬于哪個小緩存LRUCache,LRUCache將緩存數據封裝成LRUHandle數據對象,進行存儲。
2.2、先將緩存數據添加到雙向鏈表lru_中,由于lru_.pre存儲比較新的數據,lru_.next存儲比較舊的數據,所以將緩存數據添加在lru_.pre上。
2.3、再存儲到二級指針數組table_里,存儲之前,先查找數據是否存在。查找根據緩存數據的hash值,定位緩存數據屬于哪個一級指針,然后遍歷這一級指針上存放的二級指針鏈表,查找緩存數據。
2.4、最后如果緩存數據的總大小大于緩存LRUCache的容量大小,則循環從雙向鏈表lru_的next取緩存數據,將其從雙向鏈表lru_和二級指針數組table_中移除,直到緩存數據的總大小小于緩存LRUCache的容量大小。
二、代碼實現:
1、創建一個容量大小capacity 的Cache
/*****************************************************************************
     類:Cache
*****************************************************************************/
    Cache* NewLRUCache(size_t capacity)
    {
        return new ShardedLRUCache(capacity);
    } 2、將容量大小capacity的緩存分成了很多個小緩存LRUCache;將緩存數據data進行hash操作,得到的hash值定位得到屬于哪個小緩存LRUCache。
/*****************************************************************************
         類:ShardedLRUCache
*****************************************************************************/
        static const int kNumShardBits = 4;
        static const int kNumShards = 1 << kNumShardBits; // 2^4==16
        class ShardedLRUCache : public Cache
        {
        private:
            LRUCache shard_[kNumShards];
            port::Mutex id_mutex_;
            uint64_t last_id_;
            static inline uint32_t HashSlice(const Slice& s)
            {
                return Hash(s.data(), s.size(), 0);
            }
            // 得到shard_數組的下標
            static uint32_t Shard(uint32_t hash)
            {
                /*
                 hash是4個字節,32位,向右移動28位,則剩下高4位有效位,
                 即最小的是0000等于0,最大的是1111等于15 
                 則得到的數字在[0,15]范圍內。
                 */
                return hash >> (32 - kNumShardBits);
            }
        public:
            explicit ShardedLRUCache(size_t capacity) : last_id_(0)
            {
                /*
                 將容量平均分成kNumShards份,如果有剩余,將剩余的補全。為什么要補全呢?
                 例如設置容量大小為10,則最多就能放下大小為10的數據,現在將容量分成3份,
                 如果不補全,余量被丟棄,每份容量則為3,總容量為9,需要放大小為10的數據則放不下了。
                 如果補全,剩余量1加上2,每份就多得1個容量,也就每份容量為4,總容量為12,能保證數據都放下。
                 */
                /*
                 //補全塊,
                 如果capacity除以kNumShards有余數,那么余數加上(kNumShards - 1),
                 除以kNumShards,就能多得到一塊。
                 如果如果capacity除以kNumShards無余數,那么0加上(kNumShards - 1),
                 除以kNumShards,還是0
                 */
                const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
                for (int s = 0; s < kNumShards; s++)
                {
                    shard_[s].SetCapacity(per_shard);
                }
            }
            virtual ~ShardedLRUCache() { }
            // charge 數據大小
            virtual Handle* Insert(const Slice& key, void* value, size_t charge,
                                   void (*deleter)(const Slice& key, void* value))
            {
                const uint32_t hash = HashSlice(key);
                return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
            }
            virtual Handle* Lookup(const Slice& key)
            {
                const uint32_t hash = HashSlice(key);
                return shard_[Shard(hash)].Lookup(key, hash);
            }
            virtual void Release(Handle* handle)
            {
                LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
                shard_[Shard(h->hash)].Release(handle);
            }
            virtual void Erase(const Slice& key)
            {
                const uint32_t hash = HashSlice(key);
                shard_[Shard(hash)].Erase(key, hash);
            }
            virtual void* Value(Handle* handle)
            {
                return reinterpret_cast<LRUHandle*>(handle)->value;
            }
            virtual uint64_t NewId()
            {
                MutexLock l(&id_mutex_);
                return ++(last_id_);
            }
        };
    } 3、小緩存LRUCache里實現了一個雙向鏈表lru_和一個二級指針數組table_用來緩存數據。雙向鏈表lru_用來保證當緩存容量飽滿時,清除比較舊的緩存數據;二級指針數組table_用來保證緩存數據的快速查找;lru_.pre存儲比較新的數據,lru_.next存儲比較舊的數據;緩存數據的總大小大于緩存LRUCache的容量大小,則循環從雙向鏈表lru_的next取緩存數據,將其從雙向鏈表lru_和二級指針數組table_中移除,直到緩存數據的總大小小于緩存LRUCache的容量大小。
/*****************************************************************************
         類:LRUCache
*****************************************************************************/
        // A single shard of sharded cache.
        class LRUCache
        {
        public:
            LRUCache();
            ~LRUCache();
            // Separate from constructor so caller can easily make an array of LRUCache
            void SetCapacity(size_t capacity) { capacity_ = capacity; }
            // Like Cache methods, but with an extra "hash" parameter.
            Cache::Handle* Insert(const Slice& key, uint32_t hash, void* value,
                                  size_t charge, void (*deleter)(const Slice& key, void* value));
            Cache::Handle* Lookup(const Slice& key, uint32_t hash);
            void Release(Cache::Handle* handle);
            void Erase(const Slice& key, uint32_t hash);
        private:
            void LRU_Remove(LRUHandle* e);
            void LRU_Append(LRUHandle* e);
            void Unref(LRUHandle* e);
            // Initialized before use.
            // 緩存的總容量
            size_t capacity_;
            // mutex_ protects the following state.
            port::Mutex mutex_;
            // 緩存數據的總大小
            size_t usage_;
            // Dummy head of LRU list.
            // lru.prev is newest entry, lru.next is oldest entry.
            // 雙向循環鏈表,有大小限制,保證數據的新舊,當緩存不夠時,保證先清除舊的數據
            LRUHandle lru_;
            /* 
             二級指針數組,鏈表沒有大小限制,動態擴展大小,保證數據快速查找,
             hash定位一級指針,得到存放在一級指針上的二級指針鏈表,遍歷查找數據
             */
            HandleTable table_;
        };
        LRUCache::LRUCache(): usage_(0)
        {
            // Make empty circular linked list
            lru_.next = &lru_;
            lru_.prev = &lru_;
        }
        LRUCache::~LRUCache()
        {
            for (LRUHandle* e = lru_.next; e != &lru_; )
            {
                LRUHandle* next = e->next;
                assert(e->refs == 1);  // Error if caller has an unreleased handle
                Unref(e);
                e = next;
            }
        }
        void LRUCache::Unref(LRUHandle* e)
        {
            assert(e->refs > 0);
            e->refs--;
            if (e->refs <= 0) // 引用計數小于等于0 釋放
            {
                usage_ -= e->charge;
                (*e->deleter)(e->key(), e->value);
                free(e);
            }
        }
        void LRUCache::LRU_Remove(LRUHandle* e)
        {
            e->next->prev = e->prev;
            e->prev->next = e->next;
        }
        void LRUCache::LRU_Append(LRUHandle* e)
        {
            // Make "e" newest entry by inserting just before lru_
            // 新數據插到lru_的前面
            e->next = &lru_;
            e->prev = lru_.prev;
            e->prev->next = e;
            e->next->prev = e;
        }
        Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash)
        {
            MutexLock l(&mutex_);
            LRUHandle* e = table_.Lookup(key, hash);
            if (e != NULL)
            {
                e->refs++;
                /*
                 為什么要先刪除,再加入。
                 由于當緩存不夠時,會清除lru_的next處的數據,保證清除比較舊的數據。
                 */
                LRU_Remove(e);
                LRU_Append(e);
            }
            return reinterpret_cast<Cache::Handle*>(e);
        }
        void LRUCache::Release(Cache::Handle* handle)
        {
            MutexLock l(&mutex_);
            Unref(reinterpret_cast<LRUHandle*>(handle));
        }
        Cache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value, size_t charge, 
                                        void (*deleter)(const Slice& key, void* value))
        {
            MutexLock l(&mutex_);
            // 減去記錄key的首地址大小(一個字節),加上key實際大小
            LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle)-1 + key.size()));
            e->value = value;
            e->deleter = deleter;
            e->charge = charge;
            e->key_length = key.size();
            e->hash = hash;
            e->refs = 2;  // One from LRUCache, one for the returned handle
            // 記錄key的首地址
            memcpy(e->key_data, key.data(), key.size());
            LRU_Append(e);
            // 緩存數據的大小
            usage_ += charge;
            LRUHandle* old = table_.Insert(e);
            if (old != NULL)
            {
                LRU_Remove(old);
                Unref(old);
            }
            // 緩存不夠,清除比較舊的數據
            while (usage_ > capacity_ && lru_.next != &lru_)
            {
                LRUHandle* old = lru_.next;
                LRU_Remove(old);
                table_.Remove(old->key(), old->hash);
                Unref(old);
            }
            return reinterpret_cast<Cache::Handle*>(e);
        }
        void LRUCache::Erase(const Slice& key, uint32_t hash)
        {
            MutexLock l(&mutex_);
            LRUHandle* e = table_.Remove(key, hash);
            if (e != NULL)
            {
                LRU_Remove(e);
                Unref(e);
            }
        } 4、根據緩存數據的hash值,定位緩存數據屬于哪個一級指針,然后遍歷這一級指針上存放的二級指針鏈表,查找緩存數據。
/*****************************************************************************
         類:HandleTable
*****************************************************************************/
        // We provide our own simple hash table since it removes a whole bunch
        // of porting hacks and is also faster than some of the built-in hash
        // table implementations in some of the compiler/runtime combinations
        // we have tested.  E.g., readrandom speeds up by ~5% over the g++
        // 4.4.3's builtin hashtable.
        class HandleTable
        {
        public:
            HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
            ~HandleTable() { delete[] list_; }
            LRUHandle* Lookup(const Slice& key, uint32_t hash)
            {
                return *FindPointer(key, hash);
            }
            LRUHandle* Insert(LRUHandle* h)
            {
                LRUHandle** ptr = FindPointer(h->key(), h->hash);
                LRUHandle* old = *ptr;
                h->next_hash = (old == NULL ? NULL : old->next_hash);
                *ptr = h;
                // 找到的節點的值為NULL,說明h是新節點
                if (old == NULL)
                {
                    // 元素個數
                    ++elems_;
                    // 元素個數加1大于一級指針個數。如果每個節點h定位一級指針不存在哈希沖突,則每個一級指針存放一個節點
                    if (elems_ > length_)
                    {
                        // Since each cache entry is fairly large, we aim for a small
                        // average linked list length (<= 1).
                        Resize();
                    }
                }
                return old;
            }
            LRUHandle* Remove(const Slice& key, uint32_t hash)
            {
                LRUHandle** ptr = FindPointer(key, hash);
                LRUHandle* result = *ptr;
                if (result != NULL)
                {
                    *ptr = result->next_hash;
                    --elems_;
                }
                return result;
            }
        private:
            // The table consists of an array of buckets where each bucket is
            // a linked list of cache entries that hash into the bucket.
            uint32_t length_;
            uint32_t elems_;
            LRUHandle** list_;
            // Return a pointer to slot that points to a cache entry that
            // matches key/hash.  If there is no such cache entry, return a
            // pointer to the trailing slot in the corresponding linked list.
            LRUHandle** FindPointer(const Slice& key, uint32_t hash)
            {
                /* 
                 hash & (length_ - 1)的運算結果是0到length-1;
                 */
                LRUHandle** ptr = &list_[hash & (length_ - 1)];
                // 二級指針鏈表*ptr不為空,遍歷二級指針鏈表找到hash相同且key也相同的節點
                while (*ptr != NULL && ((*ptr)->hash != hash || key != (*ptr)->key()))
                {
                    ptr = &(*ptr)->next_hash;
                }
                // 返回匹配節點的地址
                return ptr;
            }
            void Resize()
            {
                uint32_t new_length = 4;
                while (new_length < elems_)
                {
                    new_length *= 2;
                }
                // 下面的new方法,只表明給一級指針分配了內存塊
                LRUHandle** new_list = new LRUHandle*[new_length];
                /*
                 避免一級指針分配的內存塊,存有野指針,所以需要使用memset對內存塊進行清零處理。
                 memset:作用是在一段內存塊中存儲某個給定的值,
                        它對較大的結構體或數組進行清零操作的一種最快方法。
                        存儲0,就是置空。
                 new_list和&new_list[i]是一級指針,
                 *new_list和new_list[i]是二級指針,
                 **new_list是二級指針存儲的值。
                 下面memset代碼的意思是:
                 即將一級指針內存塊中存儲0,就是new_list[i] = 0或new_list[i] = NULL;
                 也就是將二級指針*new_list置空。
                 */
                memset(new_list, 0, sizeof(new_list[0]) * new_length);
                uint32_t count = 0;
                for (uint32_t i = 0; i < length_; i++) // 遍歷一級指針
                {
                    /*
                     由于每個h通過表達式hash&(new_length - 1)得到屬于一級指針的位置,
                     所以表達式計算結果相同(注:hash不相同,計算結果也可能相同)的h,會定位到相同的一級指針,
                     并組成一個二級鏈表存放在一級指針上。
                     一級指針上存放的二級指針鏈表,通過h的next_hash鏈接起來
                     */
                    LRUHandle* h = list_[i];
                    while (h != NULL)
                    {
                        /*
                         功能:下面遍歷的邏輯是重新定位h屬于的一級指針。并在新的一級指針上組成新的二級鏈表。
                         */
                        LRUHandle* next = h->next_hash;
                        uint32_t hash = h->hash;
                        // 定位新的一級指針 *ptr就是new_list[hash & (new_length - 1)]
                        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
                        // 如果是第一次運行,則*ptr為NULL,其他則是取到上個循環h的地址
                        h->next_hash = *ptr;
                        // new_list[hash & (new_length - 1)] = h;
                        *ptr = h;
                        // 二級鏈表下一個節點
                        h = next;
                        count++;
                    }
                }
                assert(elems_ == count);
                delete[] list_;
                list_ = new_list;
                length_ = new_length;
            }
        }; 5、緩存數據封裝成LRUHandle數據對象,進行存儲;雙向鏈表也是LRUHandle數據對象,使用了next和prev字段;二級指針數組中的二級指針鏈表也是LRUHandle數據對象,使用了next_hash字段。
        struct LRUHandle
        {
            void* value;
            void (*deleter)(const Slice&, void* value);
            LRUHandle* next_hash;  // 二級指針數組的二級指針鏈表
            LRUHandle* next;       // 雙向鏈表 指向比較舊的數據
            LRUHandle* prev;       // 雙向鏈表 指向比較新的數據
            size_t charge;      // TODO(opt): Only allow uint32_t?
            size_t key_length;
            uint32_t refs;
            uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
            char key_data[1];   // Beginning of key key的首地址
            Slice key() const
            {
                // For cheaper lookups, we allow a temporary Handle object
                // to store a pointer to a key in "value".
                if (next == this)
                {
                    return *(reinterpret_cast<Slice*>(value));
                } else
                {
                    return Slice(key_data, key_length);
                }
            }
        };