ConcurrentHashmap 解析

jopen 9年前發布 | 21K 次閱讀 Java開發 ConcurrentHashMap

ConcurrentHashmap(JDK1.7) 

總體描述:

concurrentHashmap是為了高并發而實現,內部采用分離鎖的設計,有效地避開了熱點訪問。而對于每個分段,ConcurrentHashmap采用final和內存可見修飾符volatile關鍵字(內存立即可見:Java 的內存模型可以保證:某個寫線程對 value 域的寫入馬上可以被后續的某個讀線程“看”到。注:并不能保證對volatile變量狀態有依賴的其他操作的原子性

借用某博客對concurrentHashmap對結構圖:

ConcurrentHashmap 解析

ConcurrentHashmap 解析

不難看出,concurrenthashmap采用了二次hash的方式,第一次hash將key映射到對應的segment,而第二次hash則是映射到segment的不同桶中。

為什么要用二次hash,主要原因是為了構造分離鎖,使得對于map的修改不會鎖住整個容器,提高并發能力。當然,沒有一種東西是絕對完美的,二次hash帶來的問題是整個hash的過程比hashmap單次hash要長,所以,如果不是并發情形,不要使用concurrentHashmap。

代碼實現:

 該數據結構中,最核心的部分是兩個內部類,HashEntry和Segment

    concurrentHashmap維護一個segment數組,將元素分成若干段(第一次hash)

/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;

segments的每一個segment維護一個鏈表數組

代碼:

再來看看構造方法

public ConcurrentHashMap(int initialCapacity,
    float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
    ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
    cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
    new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

代碼28行,一旦指定了concurrencyLevel(segments數組大小)便不能改變,這樣,一旦threshold超標,rehash真不會影響segments數組,這樣,在大并發的情況下,只會影響某一個segment的rehash而其他segment不會受到影響

(put方法都要上鎖)

HashEntry

    與hashmap類似,concurrentHashmap也采用了鏈表作為每個hash桶中的元素,不過concurrentHashmap又有些不同

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
     
    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
    this.hash = hash;
    this.key = key;
    this.value = value;
    this.next = next;
    }
     
    /**
    * Sets next field with volatile write semantics. (See above
    * about use of putOrderedObject.)
    */
    final void setNext(HashEntry<K,V> n) {
    UNSAFE.putOrderedObject(this, nextOffset, n);
    }
     
    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class k = HashEntry.class;
    nextOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("next"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }
}

HashEntry的key,hash采用final,可以避免并發修改問題,HashEntry鏈的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并發性能災難,新版(jdk1.7)的concurrentHashmap大量使用java Unsafe類提供的原子操作,直接調用底層操作系統,提高性能(這塊我也不是特別清楚

get方法(1.6 vs 1.7)

1.6

V get(Object key, int hash) { 
    if (count != 0) { // read-volatile 
    HashEntry<K,V> e = getFirst(hash); 
    while (e != null) { 
    if (e.hash == hash && key.equals(e.key)) { 
    V v = e.value; 
    if (v != null) 
    return v; 
    return readValueUnderLock(e); // recheck 
    } 
    e = e.next; 
    } 
    } 
    return null; 
}

1.6的jdk采用了樂觀鎖的方式處理了get方法,在get的時候put方法正在new對象,而此時value并未賦值,這時判斷為空則加鎖訪問

1.7

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
    (tab = s.table) != null) {
    for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    e != null; e = e.next) {
    K k;
    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    return e.value;
    }
    }
    return null;
}

1.7并沒有判斷value=null的情況,不知為何

跟同事溝通過,無論是1.6還是1.7的實現,實際上都是一種樂觀的方式,而樂觀的方式帶來的是性能上的提升,但同時也帶來數據的弱一致性,如果你的業務是強一致性的業務,可能就要考慮另外的解決辦法(用Collections包裝或者像jdk6中一樣二次加鎖獲取)

http://ifeve.com/concurrenthashmap-weakly-consistent/

這篇文章可以很好地解釋弱一致性問題

put方法

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

對于put,concurrentHashmap采用自旋鎖的方式,不同于1.6的直接獲取鎖

注:個人理解,這里采用自旋鎖可能作者是覺得在分段鎖的狀態下,并發的可能本來就比較小,并且鎖占用時間又并不是特別長,因此自旋鎖可以減小線程喚醒和切換的開銷

關于hash

private int hash(Object k) {
        int h = hashSeed;
        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }
        h ^= k.hashCode();
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

concurrentHashMap采用本身hashcode的同時,采用Wang/Jenkins算法對每位都做了處理,使得發生hash沖突的可能性大大減小(否則效率會很差)


而對于concurrentHashMap,segments的大小在初始時確定,此后不變,而元素所在segments桶序列由hash的高位決定

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

segmentShift為(32-segments大小的二進制長度)


總結

    concurrentHashmap主要是為并發設計,與Collections的包裝不同,他不是采用全同步的方式,而是采用非鎖get方式,通過數據的弱一致性帶來性能上的大幅提升,同時采用分段鎖的策略,提高并發能力

參考:

http://www.jb51.net/article/49699.htm

http://my.oschina.net/chihz/blog/58035

http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

http://www.ibm.com/developerworks/cn/java/j-jtp06197.html    

來自:http://my.oschina.net/zhenglingfei/blog/400515

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!