java并发容器ConcurrentHashMap源码分析

一、ConcurrentHashMap 与 HashTable

HashTable 是 HashMap 的线程安全版本,使用的是 HashTable 的对象锁,同一时刻只能有一个线程 新增元素,获取元素。锁等待多,并发度低。而 ConcurrentHashMap 采用的是锁分段机制,就是用多把锁,让每把锁管理一部分数据。怎么实现的呢?引入了段(Segment)数据结构。html

咱们不妨来回忆一下HashMap、HashTable 的数据结构java

结合下文对ConcurrentHashMap的分析,能够得知ConcurrentHashMap的数据结构以下,其实就是能够简单的认为,ConcurrentHashMap就是 HashMap[] 数组,就是一个数组,数组元素是一个一个的 HashMap。node

为了更好的理解ConcurrentHashMap,最好首先阅读以下几篇文章:数组

  • HashMap源码分析 http://blog.csdn.net/prestigeding/article/details/52861420
  • 使用Unsafe根据内存地址与偏移量访问数组元素方法:http://blog.csdn.net/prestigeding/article/details/52980801
  • ReentrantLock锁分析:http://blog.csdn.net/prestigeding/article/details/53084883

二、ConcurrentHashMap 详解

/**
 * A {@link java.util.Map} providing additional atomic
 * <tt>putIfAbsent</tt>, <tt>remove</tt>, and <tt>replace</tt> methods.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentMap} as a key or value
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that object from
 * the {@code ConcurrentMap} in another thread.
 *
 * <p>This interface is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <K> the type of keys maintained by this map
 * @param <V> the type of mapped values
 */
public interface ConcurrentMap<K, V> extends Map<K, V> {
    /**
     * If the specified key is not already associated
     * with a value, associate it with the given value.
     * This is equivalent to
     * <pre>
     *   if (!map.containsKey(key))
     *       return map.put(key, value);
     *   else
     *       return map.get(key);</pre>
     * except that the action is performed atomically.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with the specified key, or
     *         <tt>null</tt> if there was no mapping for the key.
     *         (A <tt>null</tt> return can also indicate that the map
     *         previously associated <tt>null</tt> with the key,
     *         if the implementation supports null values.)
     * @throws UnsupportedOperationException if the <tt>put</tt> operation
     *         is not supported by this map
     * @throws ClassCastException if the class of the specified key or value
     *         prevents it from being stored in this map
     * @throws NullPointerException if the specified key or value is null,
     *         and this map does not permit null keys or values
     * @throws IllegalArgumentException if some property of the specified key
     *         or value prevents it from being stored in this map
     *
     */
    V putIfAbsent(K key, V value);

    /**
     * Removes the entry for a key only if currently mapped to a given value.
     * This is equivalent to
     * <pre>
     *   if (map.containsKey(key) && map.get(key).equals(value)) {
     *       map.remove(key);
     *       return true;
     *   } else return false;</pre>
     * except that the action is performed atomically.
     *
     * @param key key with which the specified value is associated
     * @param value value expected to be associated with the specified key
     * @return <tt>true</tt> if the value was removed
     * @throws UnsupportedOperationException if the <tt>remove</tt> operation
     *         is not supported by this map
     * @throws ClassCastException if the key or value is of an inappropriate
     *         type for this map
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified key or value is null,
     *         and this map does not permit null keys or values
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     */
    boolean remove(Object key, Object value);

    /**
     * Replaces the entry for a key only if currently mapped to a given value.
     * This is equivalent to
     * <pre>
     *   if (map.containsKey(key) && map.get(key).equals(oldValue)) {
     *       map.put(key, newValue);
     *       return true;
     *   } else return false;</pre>
     * except that the action is performed atomically.
     *
     * @param key key with which the specified value is associated
     * @param oldValue value expected to be associated with the specified key
     * @param newValue value to be associated with the specified key
     * @return <tt>true</tt> if the value was replaced
     * @throws UnsupportedOperationException if the <tt>put</tt> operation
     *         is not supported by this map
     * @throws ClassCastException if the class of a specified key or value
     *         prevents it from being stored in this map
     * @throws NullPointerException if a specified key or value is null,
     *         and this map does not permit null keys or values
     * @throws IllegalArgumentException if some property of a specified key
     *         or value prevents it from being stored in this map
     */
    boolean replace(K key, V oldValue, V newValue);

    /**
     * Replaces the entry for a key only if currently mapped to some value.
     * This is equivalent to
     * <pre>
     *   if (map.containsKey(key)) {
     *       return map.put(key, value);
     *   } else return null;</pre>
     * except that the action is performed atomically.
     *
     * @param key key with which the specified value is associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with the specified key, or
     *         <tt>null</tt> if there was no mapping for the key.
     *         (A <tt>null</tt> return can also indicate that the map
     *         previously associated <tt>null</tt> with the key,
     *         if the implementation supports null values.)
     * @throws UnsupportedOperationException if the <tt>put</tt> operation
     *         is not supported by this map
     * @throws ClassCastException if the class of the specified key or value
     *         prevents it from being stored in this map
     * @throws NullPointerException if the specified key or value is null,
     *         and this map does not permit null keys or values
     * @throws IllegalArgumentException if some property of the specified key
     *         or value prevents it from being stored in this map
     */
    V replace(K key, V value);
}

2.1 ConcurrentHashMap构造方法与数据结构分析

/**
     * Creates a new, empty map with the specified initial
     * capacity, load factor and concurrency level.
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @param loadFactor  the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation performs internal sizing
     * to try to accommodate this many threads.
     * @throws IllegalArgumentException if the initial capacity is
     * negative or the load factor or concurrencyLevel are
     * nonpositive.
     */
    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {                               //@1
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;                                                                                          //@2 start
        int ssize = 1;   
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;                                                              //@2 end
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY; // 每一个 segment 内部容里
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

代码@1 concurrencyLevel,含义,并发级别,并发度,在键不冲突的状况下,最多容许多少个线程同时访问数据不须要阻塞(理想状况下),咱们应该知道,ConcurrentHashMap 的基本实现原理就是引入Segment 数据结构,将锁的粒度细化到Segment, 也就是说,若是多个线程,同时操做多个 key,若是这些 key,分布在不一样的 Segment, 那这些线程的操做互不影响,固然不须要加锁,提升性能。因此 concurrencyLevel,就是要求告诉 ConcurrentHashMap, 我须要这么过个线程同时访问你而不产生锁冲突。安全

代码@2,ssize,该变量的值等于ConcurrentHashMap 中 segment 的长度,也就是 Segment[]  的长度。该值取决于concurrencyLevel, 其实就是小于concurrencyLevel 的最大的2的幂,好比concurrencyLevel= 16,那 ssize=16,若是 concurrencyLevel = 12, ssize=8,由于ssize的长度为2的幂。微信

变量shift的值,看出来了没,其实就是 ssize 2 ^ shift,其实就是表示ssize须要的二进制位。数据结构

segmentMask、segmentShift ,这两个属性在该表达式中使用:(h >>> segmentShift) & segmentMask),很明显,就是用来算Segment[]数组中的下标来的。意图segmentShift = 32 - sshift,也就是利用hash的高位与表明(ssize-1)来定位下标。// 若是默认,初始容量16,那么ssize=16, sshift=4 定位端 hash 无符号向右移多少28位,(总共32位),那就是使本来32-29位参与运算(高位)并发

变量cap,就是每一个Segment中HashEntity[]的长度,大于【初始容量/segment长度】的最小2的幂。app

分析到这里,ConcurrentHashMap就构建成功了,咱们先重点关注一下 Segment 的数据结构。less

Segment 段的内部数据结构以下:

  • 类的声明:static final class Segment<K,V> extends ReentrantLock implements Serializable
  • 数据结构:
transient volatile HashEntry<K,V>[] table;    // 内部键值对

transient int count;  // 元素数量

transient int modCount;     // 结构发生变化的次数

transient int threshold;    // 扩容时的阔值

final float loadFactor;     // 扩容因子,主要影响threshold,影响何时扩容

对上述结构,是否似曾相识,对了,就是它,HashMap;每一个 Segment 其实就是一个 HashMap; 还有一个很关键点:Segment继承自 ReentrantLock, 也就是 Segment 自己就是一把锁。


2.2  public V put(K key, V value) 源码分析

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)                                      
            throw new NullPointerException();   // @1
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;   //@2
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck in ensureSegment
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment  @3
            s = ensureSegment(j);                                      //@4
        return s.put(key, hash, value, false);                   //@5
    }

代码@1,代表 ConcurrentHashMap不支持value为空的键值对。

代码@2,计算该key对应的Segment的位置(数组下标),并发包中获取数组元素的方式,采用的是UNSAFE直接操做内存的方式,而不是典型的  Segment[] a = new Segment[16],  第j个元素的值为  a[j]。若是须要详细了解UNSAFE操做数组元素的原理,请查看  另外一篇博客(AtomicIntegerArray 源码分析)

好比一个Integer[]中,每一个int是32位,占4个字节,那数组中第3个位置的开始字节是多少呢?=(3-1) << 2,也就是说SHIFT的值为元素中长度的幂。怎么获取每一个元素在数组中长度(字节为单位)= UNSAFE.arrayIndexScale,

而 UNSAFE.arrayBaseOffset,返回的是,第一个数据元素相对于对象起始地址的便宜量,该部分的详解,请参考个人技术博客【http://blog.csdn.net/prestigeding/article/details/52980801

代码@3,就是获取j下标的segment对象。至关于   if(   (s == segments[j])== null  )

代码@4,咱们将目光移到 ensureSegment方法中:

/**
     * Returns the segment for the given index, creating it and
     * recording in segment table (via CAS) if not already present.
     *
     * @param k the index
     * @return the segment
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

该方法,主要是确保segment槽的k位置的Segment不为空,若是为空,初始化。

代码@5,代码@4初始化k位置的segment后,将键值对加入到segment,接下重点看一下Segment的put方法:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);   // @1
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);      // @2
                for (HashEntry<K,V> e = first;;) {                         // @3
                    if (e != null) {                                                     // @4           
                        K k;
                        if ((k = e.key) == key ||                                          
                            (e.hash == hash && key.equals(k))) {      //@5
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {                                                                   //@6                                                        
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

该方法,实现思路其实和HashMap同样,就是要在Segment的HashEntity[] table的指定位置加入新的Node,若是在位置k的位置不为空,此时,说明该位置发生了hash冲突,这是须要先遍历整个链,看是否有相等的key,若是key相等,则替换该值,若是没有,则将新加入的节点的next指针指向 table[k],而后将node加入到k位置。可是,因为ConcurrentHashMap是支持多个线程同时访问的,对于单个Segment的操做,须要加锁。

代码@1,首先尝试获取锁,若是成功获取锁,则继续添加元素,若是获取锁失败,后面重点分析。

代码@2,获取该key所对应的table[]中的下标。根据该元素是否为空,有两种操做,若是为空,说明没有发生冲突,也就是走代码@6分支,就是将新建立的节点的节点放入table[k]处,固然,此时须要判断是否须要进行rehash操做(ConcurrentHashMap的是否须要rehash,就是判断阔值)。

代码@4,就是循环判断table[k]的链条中,是否有key与待操做key相等,若是相等,直接替换就好。因为@3开始,其实就是整个put方法,会在锁保护中。

上述过程,应该很好理解,因此,如今重点关注两个方法,一是scanAndLockForPut,二是rehash(比较好奇,是否与HashMap相同,应该是同样的吧,呵呵)。

/**
         * Scans for a node containing given key while trying to
         * acquire lock, creating and returning one if not found. Upon
         * return, guarantees that lock is held. UNlike in most
         * methods, calls to method equals are not screened: Since
         * traversal speed doesn't matter, we might as well help warm
         * up the associated code and accesses as well.
         *
         * @return a new node if key not found, else null
         */
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

在没有成功获取锁的状况下,先不急于阻塞,而是乐观的估计获取锁的线程操做的key与当前操做的key不要紧,那我该干吗就干吗,自旋尝试获取锁(尝试MAX_SCAN_RETRIES,若是未成功获取锁)尝试超过最大尝试次数,为了性能考虑,该线程阻塞,参加代码@2。

@3,每隔一次,检查一下 Segment HashEntity[] table 处k的位置的元素是否发生变化,若是发生变化,则重试次数设置为-1,继续尝试获取锁。该方法若是在阻塞在lock()方法,时,一旦获取锁,则进入到final V put(K key, int hash, V value, boolean onlyIfAbsent) 方法中,进行常规的put方法(与HashMap操做相似。)

接下来重点看一下代码@7,若是当前segment中容量大于阔值,并小于容许的最大长度时,须要进行rehash,下面分析一下rehash源码:

/**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

在理解了HashMap的rehash方法后,再来看该方法,应该能很好的理解,故不作重复讲解了。

2.2.2 public V putIfAbsent(K key, V value) 

该方法的语义是,若是存在key,则直接返回key关联的value,若是key不存在,则加入该键值对,并返回null;该步骤是原子操做。

public V putIfAbsent(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject
             (segments, (j << SSHIFT) + SBASE)) == null)
            s = ensureSegment(j);
        return s.put(key, hash, value, true);
    }

该方法与put方法的实现基本相同,惟一不一样的是,对已经存在key时,put方法是直接覆盖旧值,而putIfAbsent是,返回旧值。

2.2.3 public void putAll(Map m)

public void putAll(Map<? extends K, ? extends V> m) {
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            put(e.getKey(), e.getValue());
    }

直接将传人的Map类型的参数,遍历,调用put方法。

看过了put函数,咱们将目标转向到get方法中,瞧一瞧get相关方法的实现:

2.2.4 public V get(Object key)源码分析

/**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

从上文中能够看到,get方法并无加锁,只是根据key的hash,而后算出Segment槽的位置,不是直接根据下标去获取Segment,也不是直接根据下标去Segment 的 HashEntity[] tab中去获取元素,而是使用了 UNSAFE.getObjectVolatile方法,直接操做内存,并使用volatile方式获取,最大程度保证可见性。有人或许有疑问,为何get方法不加读锁,阻止其余写入请求呢?其实这样作意义并不大,ConcurrentHashMap的是一个容器,数据存储,提供基本的 put,get操做,对单一一个get请求加锁,没什么意义,由于get方法并不会改变ConcurrentHashMap的内部结构,在当前线程获取到key中的值,而后其余线程删除了该key,这在业务场景上自己就是正常不过的操做。因此get方法并不须要加锁。

2.3 浏览源码,发现不管是replace方法,仍是remove方法等操做内部等都和HashMap类似,由于Segment就是一个带锁的HashMap。因此,接下来,咱们能够这样思考,put,replace,remove这些方法比HashMap效率高,由于提供了并发度,那这些获取全局的属性的方法呢,好比keys,size等这些方法,性能又是如何呢?咱们将目光转向size,keys等遍历方法。

2.3.1 public int size方法

/**
     * Returns the number of key-value mappings in this map.  If the
     * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
     * <tt>Integer.MAX_VALUE</tt>.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

该方法的核心实现原理:从上文的解读,我想你们应该已经了解每个Segment就是一个HashMap,HashMap中有两个变量,modCount,表示数据结构发生变化次数,好比put一个未在HashMap中包含的key,好比remove,好比clear方法,每调用一次上述方法,modCount就加1,也就是影响size属性的操做,都会将modeCount加一;另外一个变量size,记录HashMap中键值对的个数。那ConcurrentHashMap的size方法,若是结构没有发生改变,只需将各个Segment的size相加,就能够获得ConcurrentHashMap的size,然并卯,在相加的过程其余线程若是有改变Segment内部的结构的话,致使size不许确,该方法的实现办法,是先乐观的尝试计算相加的过程最多三次,最少两次,若是先后两次的modCount同样,就说明在计算size的过程当中,其余线程并无改变ConcurrentHashMap的结构没有变化,则能够直接将size返回,结束该方法的调用,若是有变化,则须要依次对全部Segment申请加锁操做,只有获取所有锁后,而后对每一个segment的size相加,而后是否锁,并返回size值。

代码@1,若是重试次数达到 (RETRIES_BEFORE_LOCK +1 ,默认为2)次数后,说明须要加锁才能计算。

代码@2,对Segment相加计算size

代码@3,就是实现,判断连续两次计算出的modCount相等,说明该size值正确,不然,继续尝试,获取去请求锁。

2.3.2 public boolean isEmpty() 方法源码解读

/**
     * Returns <tt>true</tt> if this map contains no key-value mappings.
     *
     * @return <tt>true</tt> if this map contains no key-value mappings
     */
    public boolean isEmpty() {
        /*
         * Sum per-segment modCounts to avoid mis-reporting when
         * elements are concurrently added and removed in one segment
         * while checking another, in which case the table was never
         * actually empty at any point. (The sum ensures accuracy up
         * through at least 1<<31 per-segment modifications before
         * recheck.)  Methods size() and containsValue() use similar
         * constructions for stability checks.
         */
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum += seg.modCount;
            }
        }
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            if (sum != 0L)
                return false;
        }
        return true;
    }

该方法的核心实现原理:就是遍历有全部的segment,一旦发现有存在size不等于0的segment,则返回false;若是发现全部的segment的size为0,则再次遍历,若是两次遍历时 modCount同样,则返回true,不然返回false。

你们再看看以下方法:

2.3.3  public boolean containsKey(Object key) 

/**
     * Tests if the specified object is a key in this table.
     *
     * @param  key   possible key
     * @return <tt>true</tt> if and only if the specified object
     *         is a key in this table, as determined by the
     *         <tt>equals</tt> method; <tt>false</tt> otherwise.
     * @throws NullPointerException if the specified key is null
     */
    @SuppressWarnings("unchecked")
    public boolean containsKey(Object key) {
        Segment<K,V> s; // same as get() except no need for volatile value read
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return true;
            }
        }
        return false;
    }

2.3.4 public boolean containsValue(Object value) 

/**
     * Returns <tt>true</tt> if this map maps one or more keys to the
     * specified value. Note: This method requires a full internal
     * traversal of the hash table, and so is much slower than
     * method <tt>containsKey</tt>.
     *
     * @param value value whose presence in this map is to be tested
     * @return <tt>true</tt> if this map maps one or more keys to the
     *         specified value
     * @throws NullPointerException if the specified value is null
     */
    public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }

2.3.5  public Set<Map.Entry<K,V>> entrySet(); 遍历元素方法。

public Set<Map.Entry<K,V>> entrySet() {
        Set<Map.Entry<K,V>> es = entrySet;
        return (es != null) ? es : (entrySet = new EntrySet());
    }
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
        public Iterator<Map.Entry<K,V>> iterator() {
            return new EntryIterator();
        }
        public boolean contains(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            V v = ConcurrentHashMap.this.get(e.getKey());
            return v != null && v.equals(e.getValue());
        }
        public boolean remove(Object o) {
            if (!(o instanceof Map.Entry))
                return false;
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            return ConcurrentHashMap.this.remove(e.getKey(), e.getValue());
        }
        public int size() {
            return ConcurrentHashMap.this.size();
        }
        public boolean isEmpty() {
            return ConcurrentHashMap.this.isEmpty();
        }
        public void clear() {
            ConcurrentHashMap.this.clear();
        }
    }
final class EntryIterator
        extends HashIterator
        implements Iterator<Entry<K,V>>
    {
        public Map.Entry<K,V> next() {
            HashEntry<K,V> e = super.nextEntry();
            return new WriteThroughEntry(e.key, e.value);
        }
    }
abstract class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {       // @1
            for (;;) {
                if (nextTableIndex >= 0) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= 0) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1;
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {     // @2
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }

        public final boolean hasNext() { return nextEntry != null; }
        public final boolean hasMoreElements() { return nextEntry != null; }

        public final void remove() {
            if (lastReturned == null)
                throw new IllegalStateException();
            ConcurrentHashMap.this.remove(lastReturned.key);
            lastReturned = null;
        }
    }
final class WriteThroughEntry
        extends AbstractMap.SimpleEntry<K,V>
    {
        WriteThroughEntry(K k, V v) {
            super(k,v);
        }

        /**
         * Set our entry's value and write through to the map. The
         * value to return is somewhat arbitrary here. Since a
         * WriteThroughEntry does not necessarily track asynchronous
         * changes, the most recent "previous" value could be
         * different from what we return (or could even have been
         * removed in which case the put will re-establish). We do not
         * and cannot guarantee more.
         */
        public V setValue(V value) {
            if (value == null) throw new NullPointerException();
            V v = super.setValue(value);
            ConcurrentHashMap.this.put(getKey(), value);
            return v;
        }
    }

上述代码主要关注两点:第一是遍历元素的方法;第二是调用该迭代器的size,isEmpty等方法。

代码@1,advance,该方法主要是从segments[]数组中的最后一个元素开始,找出segment中HashEntity[] table数组中最后一个元素开始遍历,找到一个不为空nextEntity,这里返回的nextEntity,就是 table[]数组中的元素,不包括链表中的HashEntity,链表中的HashEntity遍历在代码@2 nextEntry 方法中,实现真的挺优雅的。

第二个特色是,调用迭代器的size、contains,isEmpty等方法,都是对ConcurrentHashMap对应方法的调用。

 

最后,经过上述的分析,我想对ConcurrentHashMap作一个简单的总结:

  1. 结合上述源码分析,咱们能够清楚的认为,一个Segment就是一个与HashMap相同的结构,固然每一个Segment就是一把锁,该类的核心思想,就是经过对key的第一次hash,定位的不是之前的HashEntity,而是一个Segemnt,而后对该key限定在该Segment中执行,,这样能够同时容许多个线程向ConcurrentHashMap同时添加元素(固然,要分散到不一样的Segment类,故提供了并发度。)
  2. ConcurrentHashMap是一个并发容器,所谓的并发容器并非说在使用过程当中必定不须要加锁,并发容器能提       供的保证是多个线程同时访问该容器,同时调用会改变内部结构的方法时,好比put方法时,不会破坏内部结构       以致于不能提供服务或提供错误服务(数据视图)。怎么理解并非必定不要加锁这句话,我举一个例子,好比咱们用ConcurrentHashMap来存储数据,完成以下操做一、第一步,设置一个key 为"status":1,而后通过复杂的逻辑处理,由存入一个金额 key为amount,值为10; 伪代码以下:
    ConcurrentHashMap a = new ConcurrentHashMap();
    
          逻辑代码
    
          void eval(参数 ...) {
    
                a.put("status",1);
    
                //通过计算
    
                a.put("amount": 10);
    
               //付款
    
              if(  a.get("status")==1  ) {
    
               //执行付款操做
    
              } 
    
          }  

    从上面的代码,若是多个线程执行eval方法,确定会有问题。因此,仍是须要加锁,说白了,ConcurrentHashMap只对单个方法负责,好比对 put 方法负责,只是对调用一次put方法,保证该操做,不会受到其余线程的影响。

  3. ConcurrentHash采起以下方法从Segment[] segments,HashEntity[]  table,数组中获取元素,其准确实施性依次加强。

  • segment[下标],table[下标]
  • 使用UNSAFE根据便宜量直接操做内存方式,使用(voliate方式)
  • 重试必定次数后,加锁。

欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:

一、源码分析RocketMQ专栏(40篇+)
二、源码分析Sentinel专栏(12篇+)
三、源码分析Dubbo专栏(28篇+)
四、源码分析Mybatis专栏
五、源码分析Netty专栏(18篇+)
六、源码分析JUC专栏
七、源码分析Elasticjob专栏
八、Elasticsearch专栏
九、源码分析Mycat专栏