JDK源码阅读(7):ConcurrentHashMap类阅读笔记

ConcurrentHashMap

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    ...
}

1. 一些重要参数

1.1 MAXIMUM_CAPACITY参数

/**
 * The largest possible table capacity.  This value must be
 * exactly 1<<30 to stay within Java array allocation and indexing
 * bounds for power of two table sizes, and is further required
 * because the top two bits of 32bit hash fields are used for
 * control purposes.
 */
private static final int MAXIMUM_CAPACITY = 1 << 30;

MAXIMUM_CAPACITY参数表示map的最大容量,默认为1 << 30。

1.2 DEFAULT_CAPACITY参数

/**
 * The default initial table capacity.  Must be a power of 2
 * (i.e., at least 1) and at most MAXIMUM_CAPACITY.
 */
private static final int DEFAULT_CAPACITY = 16;

DEFAULT_CAPACITY参数表示map的默认容量,为16。

1.3 MAX_ARRAY_SIZE参数

/**
 * The largest possible (non-power of two) array size.
 * Needed by toArray and related methods.
 */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

MAX_ARRAY_SIZE参数表示map的数组最大长度,在toArray()及其相关方法中可能用到。大小为Integer.MAX_VALUE - 8

1.4 DEFAULT_CONCURRENCY_LEVEL参数

/**
 * The default concurrency level for this table. Unused but
 * defined for compatibility with previous versions of this class.
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

DEFAULT_CONCURRENCY_LEVEL参数表示默认并发级别。在笔者当前使用版本JDK13中已经被弃用,但为了和先前版本兼容,保留这个参数。

1.5 LOAD_FACTOR参数

/**
 * The load factor for this table. Overrides of this value in
 * constructors affect only the initial table capacity.  The
 * actual floating point value isn't normally used -- it is
 * simpler to use expressions such as {@code n - (n >>> 2)} for
 * the associated resizing threshold.
 */
private static final float LOAD_FACTOR = 0.75f;

LOAD_FACTOR参数表示加载因子,默认和HashMap一样,是0.75。

1.6 TREEIFY_THRESHOLD参数

/**
 * The bin count threshold for using a tree rather than list for a
 * bin.  Bins are converted to trees when adding an element to a
 * bin with at least this many nodes. The value must be greater
 * than 2, and should be at least 8 to mesh with assumptions in
 * tree removal about conversion back to plain bins upon
 * shrinkage.
 */
static final int TREEIFY_THRESHOLD = 8;

TREEIFY_THRESHOLD参数表示数组中链表转换为红黑树的阈值,他被用于与一个链表的长度进行比较。

1.7 UNTREEIFY_THRESHOLD参数

/**
 * The bin count threshold for untreeifying a (split) bin during a
 * resize operation. Should be less than TREEIFY_THRESHOLD, and at
 * most 6 to mesh with shrinkage detection under removal.
 */
static final int UNTREEIFY_THRESHOLD = 6;

UNTREEIFY_THRESHOLD参数表示数组中红黑树转变成链表的阈值,他被用于与一个红黑树的大小进行比较。

1.8 MIN_TREEIFY_CAPACITY参数

/**
 * The smallest table capacity for which bins may be treeified.
 * (Otherwise the table is resized if too many nodes in a bin.)
 * The value should be at least 4 * TREEIFY_THRESHOLD to avoid
 * conflicts between resizing and treeification thresholds.
 */
static final int MIN_TREEIFY_CAPACITY = 64;

MIN_TREEIFY_CAPACITY参数表示将链表树化的哈希表最小容量。只有当整个ConcurrentHashMap的容量大于这个值时,具体的链表才可能发生树化。如果没有大于这个值,将进行扩容而非树化。(扩容也会减少单个链表中的元素数量)。

1.9 MIN_TRANSFER_STRIDE参数

/**
 * Minimum number of rebinnings per transfer step. Ranges are
 * subdivided to allow multiple resizer threads.  This value
 * serves as a lower bound to avoid resizers encountering
 * excessive memory contention.  The value should be at least
 * DEFAULT_CAPACITY.STR
 */
private static final int MIN_TRANSFER_STRIDE = 16;

在扩容操作中,transfer这个步骤允许多线程并发进行,MIN_TRANSFER_STRIDE参数表示进行一次transfer操作中一个工作线程的最小任务量。即最少要处理的连续哈希桶的数目,默认为16,即最少要对连续的16个哈希桶进行transfer操作,详见下文transfer()方法的解析。

1.10 RESIZE_STAMP_BITS参数(未理解)

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
 */
private static final int RESIZE_STAMP_BITS = 16;

RESIZE_STAMP_BITS参数用于生成在每次扩容中都唯一的生成戳。

1.11 MAX_RESIZERS参数(未理解)

/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

这个参数定义了resize时工作线程的最大数量,但这个计算方法我不明白。MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

1.12 RESIZE_STAMP_SHIFT参数(未理解)

/**
 * The bit shift for recording size stamp in sizeCtl.
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

这个参数定义了sizeCtl中记录大小标记的位移位,但这个计算方法我不明白。MAX_RESIZERS = 32 - RESIZE_STAMP_BITS;

1.13 特殊节点的hash状态参数

/*
 * Encodings for Node hash fields. See above for explanation.
 */
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations

正常情况下hash的值都应该是正数,如果是负数说明当前是一个不正常的、特殊的节点。

  • hash值为-1时,代表当前节点是一个Forwarding Node
    • ForwardingNode是一种临时节点,在扩容进行中才会出现,并且它不存储实际的数据。
    • 如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode.
    • 读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。
  • hash值为-2时,代表当前节点是一个TreeBin
    • TreeBinConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。
    • 因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。
  • hash值为-3时,代表当前节点是一个保留节点,即占位符。
    • 一般情况下不会出现。

1.14 HASH_BITS参数

static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

HASH_BITSHashTable中也见到过,通过和它的位运算,可以将负数的哈希值转变为正数。

1.15 NCPU参数

/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

NCPU参数可以获取当前JVM能使用的处理器内核数。

2. 一些重要属性

值得关注的是,ConcurrentHashMap中的关键属性基本都是volatile变量。

2.1 table属性

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;

table属性用于存节点,是桶的集合。

2.2 nextTable属性

/**
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;

nextTable属性表示下一个要使用的数组,辅助resize操作,也仅在resize时非空。

2.3 baseCount属性

/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

baseCount属性是在没有争用现象时的基本计数器值,也在初始化表的竞争中使用。

2.4 sizeCtl属性

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;

sizeCtl属性在表初始化和resize操作控制中发挥作用。

  • sizeCtl为负数,说明表正在进行初始化或resize操作。
    • 表初始化时为 -1。
    • resize时为-(1+扩容线程数)
  • sizeCtl为正数。
    • 表为null时为初始表大小或 0。
    • 表不为null时为需进行resize的下一个计数值。

2.5 transferIndex属性

/**
 * The next table index (plus one) to split while resizing.
 */
private transient volatile int transferIndex;

resize中要拆分的下一个表索引。

2.6 cellsBusy属性

/**
 * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
 */
private transient volatile int cellsBusy;

resize过程和/或创建CounterCells过程中使用的自旋锁。

2.7 counterCells数组

/**
 * Table of counter cells. When non-null, size is a power of 2.
 */
private transient volatile CounterCell[] counterCells;

显然,这是CounterCell的数组,即计数单元的数组。

3. 内部类

3.1 Node内部类

Node内部类是ConcurrentHashMap类中普通节点的抽象。

/**
 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val) {
        this.hash = hash;
        this.key = key;
        this.val = val;
    }

    Node(int hash, K key, V val, Node<K,V> next) {
        this(hash, key, val);
        this.next = next;
    }

    public final K getKey()     { return key; }
    public final V getValue()   { return val; }
    public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
    public final String toString() {
        return Helpers.mapEntryToString(key, val);
    }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

意义

Node内部类作为ConcurrentHashMap节点的实现。

hashCode()的实现

注意hashCode()的实现方式:Objects.hashCode(key) ^ Objects.hashCode(value);

find()

这里Node内部类的find()方法其实不会在一般的业务方法如get()中被调用,因为在那些地方会直接遍历。这个方法会在ForwardingNode类的find()方法中被调用。

4. 工具方法

4.1 spread方法

/**
 * Spreads (XORs) higher bits of hash to lower and also forces top
 * bit to 0. Because the table uses power-of-two masking, sets of
 * hashes that vary only in bits above the current mask will
 * always collide. (Among known examples are sets of Float keys
 * holding consecutive whole numbers in small tables.)  So we
 * apply a transform that spreads the impact of higher bits
 * downward. There is a tradeoff between speed, utility, and
 * quality of bit-spreading. Because many common sets of hashes
 * are already reasonably distributed (so don't benefit from
 * spreading), and because we use trees to handle large sets of
 * collisions in bins, we just XOR some shifted bits in the
 * cheapest possible way to reduce systematic lossage, as well as
 * to incorporate impact of the highest bits that would otherwise
 * never be used in index calculations because of table bounds.
 */
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

通过取高位再进行掩码计算(保证哈希值为正),来减少哈希冲突。

这个方法就是所谓的扰动方法

4.2 tableSizeFor方法

/**
 * Returns a power of two table size for the given desired capacity.
 * See Hackers Delight, sec 3.2
 */
private static final int tableSizeFor(int c) {
    int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

tableSizeFor方法用于计算参数c对应的resize阈值。往往出现为下面的语句。

4.3 comparableClassFor方法

/**
 * Returns x's Class if it is of the form "class C implements
 * Comparable<C>", else null.
 */
static Class<?> comparableClassFor(Object x) {
    if (x instanceof Comparable) {
        Class<?> c; Type[] ts, as; ParameterizedType p;
        // 如果是String 直接返回
        if ((c = x.getClass()) == String.class) 
            return c;
        if ((ts = c.getGenericInterfaces()) != null) {
            for (Type t : ts) {
                if ((t instanceof ParameterizedType) &&
                    ((p = (ParameterizedType)t).getRawType() ==
                     Comparable.class) &&
                    (as = p.getActualTypeArguments()) != null &&
                    as.length == 1 && as[0] == c) // type arg is c
                    return c;
            }
        }
    }
    return null;
}

如果参数x是一个Comparable接口的实现类,那么返回它的类型。

4.4 compareComparables方法

/**
 * Returns k.compareTo(x) if x matches kc (k's screened comparable
 * class), else 0.
 */
@SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable
static int compareComparables(Class<?> kc, Object k, Object x) {
    return (x == null || x.getClass() != kc ? 0 :
            ((Comparable)k).compareTo(x));
}

如果对象x匹配k的可比类kc,那么返回k.compareTo(x),否则返回0。

4.5 列表元素访问方法

4.5.1 tabAt方法
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

tabAt()方法可以获得在 i 位置上的 Node 节点。

4.5.2 casTabAt方法
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

casTabAt()方法可以以CAS的方式更新 i 位置上的 Node 节点

4.5.3 setTabAt方法
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

setTabAt方法可以设置在 i 位置上的 Node 节点。

注:像Unsafe.getReferenceAcquire()方法和Unsafe.putReferenceRelease()方法这样的方法实际上是Unsafevolatile方法的发行版本。如后者是putReferenceVolatile()的发行版本。

4.6 initTable方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // 如果sizeCtl属性小于0,说明正在初始化或resize,自旋
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {// 如果SIZECTL仍是sc,则置为-1.表示进入初始化
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 获取初始大小(sc为正时即为初始大小)
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 创建一个node数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 为table属性赋值
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                // 最后记得更新sizeCtl
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

initTable()方法可以初始化一个空表。

4.7 hashCode方法

public int hashCode() {
    int h = 0;
    Node<K,V>[] t;
    if ((t = table) != null) {
        Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
        for (Node<K,V> p; (p = it.advance()) != null; )
            h += p.key.hashCode() ^ p.val.hashCode();
    }
    return h;
}

hashCode()方法就是遍历每个键值对,令他们的键和值的哈希码相异或,再全部叠加起来。

4.8 addCount方法

addCount()方法会在ConcurrentHashMap元素数量变化时被调用,两个参数中第一个为数量变化值,第二个为控制是否需要扩容检查的参数。

private final void addCount(long x, int check) {
    // 创建计数单元CounterCell
    CounterCell[] cs; long b, s;
    /**
    	1.如果counterCells为null:
    	那么说明之前没有发生过并发冲突,随后会执行U.compareAndSetLong(...,b+x),直接更新了计数值baseCount。而这个本地方法执行成功会返回true,取反后为false。那么整个这个if判断两个条件都为false,不执行if块内内容。
    	2.如果couterCells不为null:
    	那么说明之前发生过并发冲突,需要下面的if块处理。这里if的第一个条件为ture,第二个条件的更新方法就不会执行。
    */
    if ((cs = counterCells) != null 
        || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 进入if块,说明曾发生过并发冲突,那么要把值加到CounterCell中
        CounterCell c; long v; int m;
        boolean uncontended = true;
		
        if (cs == null // cs在并发中又变成了null
            || (m = cs.length - 1) < 0 // cs长度小于1
            || (c = cs[ThreadLocalRandom.getProbe() & m]) == null // 对应的CouterCell为null
            || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {// 尝试更新找到的计数单元c的值
            // 如果更新失败。一般就是上面的最后一个条件中的方法返回了false,取反后为true
          	// 说明CounterCells数组中出现了并发冲突,可能涉及该数组的扩容,调用fullAddCount方法
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)// 如果无需检查,直接返回
            return;
        // 计数,存到s中,下面用于做检查
        s = sumCount();
    }
    // 检查是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) // 元素数量大于扩容阈值:需要扩容
               && (tab = table) != null // 表不为空
               && (n = tab.length) < MAXIMUM_CAPACITY) {// 表长度未达上限
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            // 如果正在执行resize
            if (sc < 0) {
                // 一些放弃帮助扩容的条件
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                // 将sc+1,表示一个新线程加入帮助扩容
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 当前没在执行resize,尝试成为第一个进入扩容的线程。将sc设置为rs+2
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            // 重新计算元素数量
            s = sumCount();
        }
    }
}

详细的逻辑看代码注释,这里单独讲几个点。

  • 第一个if的判断条件的使用很精彩,根据CounterCells数组是否为null来检查是该直接把值加到baseCount上还是加到对应的CounterCell中。
  • 注意在CounterCells数组中找到槽位置的方式:c = cs[ThreadLocalRandom.getProbe() & m]) == null
  • check参数小于等于1时,不用做检查就退出。大于1时,要在addCount的主逻辑结束之后在检查需不需要做扩容。在put方法调用addCount时,传入的check参数其实是put过程中遍历到的节点数量,这样逻辑就连通了:假如原先就只有一个节点或为空,就不需要考虑是否需要再检查扩容;否则就要在addCoumt中检查。

4.9 helpTransfer方法

helpTransfer方法可以在节点正在resize时协助数据迁移并返回新数组。在putremove等业务方法中都有调用这个方法。

/**
 * Helps transfer if a resize is in progress.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 进入方法主逻辑需要同时满足三个条件
    if (tab != null// 表不空
            && (f instanceof ForwardingNode)// f 是一个Forwarding Node
            && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) // nextTable不空
    {
        // 计算出本次resize时的标记“戳”
        int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
        
        while (nextTab == nextTable // nextTab不变
               && table == tab // table不变
               && (sc = sizeCtl) < 0) // sizeCtl保持小于0(正在resize)
        {
            if (sc == rs + MAX_RESIZERS // 工作线程数量已满
                || sc == rs + 1 // 在addCount方法中,假如有第一个扩容线程,sc=rs+2。假如变成rs+1,说明扩容结束。
                || transferIndex <= 0) // 如果transferIndex小于等于0,实际说明扩容工作已完成,已进入下标调整。
                break;
            // 令sc++,进入扩容
            if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        // 返回新表
        return nextTab;
    }
    // 返回原表
    return table;
}

4.10 transfer方法

transfer将方法的功能是将每个 bin 中的节点移动和/或复制到新表。 在addCount()helpTransfer()中都有调用,是扩容工作的核心实现类。

下面例子中若出现具体数字,以传入tab的length为16为准。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  	// 定义n为表长。
    int n = tab.length, stride;
    /**
      stride表示一次transfer中一个工作线程的任务量,即要处理的连续哈希桶的数目。
      初始化stride:假如可用CPU核数大于1,初始化为(n >>> 3) / NCPU,否则初始化为n。
      假如初始化后的stride小于MIN_TRANSFER_STRIDE,把他置为这个最小值。
    */
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    
    if (nextTab == null) { // nextTab未初始化,就要先初始化这个数组
        try {
            @SuppressWarnings("unchecked")// 创建nextTab数组,长度为原数组长度*2
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            // 创建新数组失败,sizeCtl就设为int的最大值
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 这个数组赋值给nextTable
        nextTable = nextTab;
        // 更新转移下标
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 创建ForwardingNode fwd,传入nextTab作为参数
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
    boolean advance = true;
    // 标记是否已经扩容完成
    boolean finishing = false; // to ensure sweep before committing nextTab
    /**
      又是一个for循环自旋,处理每个槽位中的链表元素
    */
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        /** 
          这个while循环通过CAS不断尝试为当前线程分配任务,直到分配成功或任务队列已经被全部分配完毕。
          如果线程已经被分配过bucket区域,那么会通过--i指向下一个待处理桶然后退出循环。
    	*/
        while (advance) {
            int nextIndex, nextBound;
            // --i表示进入下一个待处理的bucket。自减后大于等于bound,表明当前线程已经分配过bucket,advance=false
            if (--i >= bound || finishing)
                advance = false;
            // 所有bucket已经被分配完毕,为nextIndex赋值。
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS修改TRANSFERINDEX,为线程分配任务。
            // 处理的节点区间为(nextBound,nextINdex)
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        
        // 处理过程
        // CASE1:旧数组已遍历完成,当前线程已经处理完所有负责的bucket
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 扩容已完成
            if (finishing) {
                // 删除这个成员变量nextTable
                nextTable = null;
                // 更新数组
                table = nextTab;
                // 更新扩容阈值
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 使用 CAS 操作对 sizeCtl 的低16位进行减 1,代表做完了属于自己的任务
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 如果上面的if没有执行,即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                // 说明当前已经没有扩容线程了,扩容结束了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // CASE2:节点i处为空,那么放入刚刚初始化的ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // CASE3:该位置现在的哈希值就是MOVED,是一个ForwardingNode,说明已经被其他线程处理过,那么要求继续推进
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        // CASE4:执行转移
        else {
            // 锁住头节点
            synchronized (f) {
                // 再次检查
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 槽中头节点是一个链表头节点
                    if (fh >= 0) {
                        // 先计算好当前的fh * n
                        int runBit = fh & n;
                        // 存储遍历最终位置的lastRun
                        Node<K,V> lastRun = f;
                        // 遍历链表
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            // 如果遍历过程中出现hash&n出现了变化,需要更新runBit和lastRun
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        
                        //如果lastRun引用的是低位链表,令ln为lastRun
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        // 如果lastRUn引用的是高位链表,令hn为lastRun
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        
                        // 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中
                        // 循环跳出条件:当前循环结点!=lastRun
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 低位链表的位置不变
                        setTabAt(nextTab, i, ln);
                        // 高位链表的位置是:原位置 + n
                        setTabAt(nextTab, i + n, hn);
                        // 标记当前桶已迁移
                        setTabAt(tab, i, fwd);
                        // advance为true,返回上面进行--i操作
                        advance = true;
                    }
                    // 槽中头节点是一个树节点
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 槽中头节点是一个保留占位节点
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
        }
    }
}

transfer()方法是ConcurrentHashMap执行扩容操作的核心方法,他的扩容转移操作其实类似于HashMap,都是将原链表分裂为两个链表。

但也有很多实现细节上的不同,详见源码注释。

4.11 resizeStamp方法

/**
 * Returns the stamp bits for resizing a table of size n.
 * Must be negative when shifted left by RESIZE_STAMP_SHIFT.
 */
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

resizeStamp(int n)方法可以在扩容一个大小为n的table时,计算stamp bits

5. 业务方法

5.1 构造方法

// 默认构造方法
public ConcurrentHashMap() {
}

// 仅提供初始容量的构造方法
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, LOAD_FACTOR, 1);
}

// 提供map的构造方法
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

// 提供默认容量和加载因子的构造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

// 提供默认容量、加载因子和并发更新线程数的构造方法。
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 如果初始容量比并发更新线程数还要小,那为它赋新值
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // cap赋值为最大容量或扩容阈值
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

5.2 size方法

// 计数单元数组
private transient volatile CounterCell[] counterCells;

public int size() {
    // 调用sumCount()
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    // 获得计数单元数组
    CounterCell[] cs = counterCells;
    long sum = baseCount;
    if (cs != null) {
        // 所有计数单元中的值加起来
        for (CounterCell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}

// 非常简单的一个计数单元,仅有一个volatile型的计数器value
@jdk.internal.vm.annotation.Contended // 这个注解可以保证当前类的对象独享缓存行
static final class CounterCell {
    // 只提供了构造器,没有提供get/set方法,也就是value的值在初始化时确定,后续不再更改
    volatile long value;
    CounterCell(long x) { value = x; }
}

size()方法的实现是首先获取baseCount,这是无争用时获取的计数器值。再将计数单元数组中的计数值累加在上面。他有以下几个保证线程安全的举措:

  • counterCells数组和CounterCell类中的value变量设为volatile
  • 没有为CounterCell类中的value变量设置get/set方法。

那么CounterCells数组是怎么被创建和初始化的呢,baseCount是怎么增加的呢。后续结合到那些改变了size的业务方法,如put()等方法的源码,我们再来做解释。

5.3 isEmpty方法

public boolean isEmpty() {
    return sumCount() <= 0L; // ignore transient negative values
}

sumCount()方法见5.2

5.4 get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算哈希值
    int h = spread(key.hashCode());
    if ((tab = table) != null // 表不为空
        	&& (n = tab.length) > 0 // 表长度不为0
			&& (e = tabAt(tab, (n - 1) & h)) != null) {// 指定位置不为null
        // 首位置即为要找的key
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        
        else if (eh < 0)// 当前链表头哈希值小于0,说明是一个特殊节点
            // 调用特殊节点e的find方法
            return (p = e.find(h, key)) != null ? p.val : null;
        // 一个正常节点,正常链表,正常遍历
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

注意,首先我们计算出所要查找的key在哈希表中的散列位置。然后根据找到的节点的哈希值的不同,来做不同的处理。

  • 如果哈希值时所要找到的值,直接返回。
  • 如果哈希值小于0,代表当前节点是一个特殊节点。可参考1.13 特殊节点的hash状态参数。这样的话就会去调用特殊节点的find()方法,如ForwardingNode类和TreeNode类的find()方法。
  • 如果哈希值大于等于0,遍历当前链表查找。

5.5 containsKey方法

public boolean containsKey(Object key) {
    return get(key) != null;
}

5.6 containsValue方法

public boolean containsValue(Object value) {
    if (value == null)
        throw new NullPointerException();
    Node<K,V>[] t;
    if ((t = table) != null) {
        Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
        for (Node<K,V> p; (p = it.advance()) != null; ) {
            V v;
            if ((v = p.val) == value || (v != null && value.equals(v)))
                return true;
        }
    }
    return false;
}

Traverser类封装了containsValue方法的遍历逻辑,代码较为复杂,这里暂且按下不表。

5.7 put方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 判空
    if (key == null || value == null) throw new NullPointerException();
    // 计算哈希值
    int hash = spread(key.hashCode());
    // 当前桶的计数器
    int binCount = 0;
    // 自旋插入节点,直到成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // CASE1:表为空则先调用初始化方法
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // CASE2:如果散列位置节点为空,向空位置插入时是无锁的
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 尝试把要put的键值对直接put到这里
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;// 退出
        }
        // CASE3:如果散列位置节点哈希值为-1,即为一个Forwarding Node,调用helperTransfer()
        else if ((fh = f.hash) == MOVED)
            // 协助转移数据并获取新数组
            tab = helpTransfer(tab, f);
        // CASE4:如果onlyIfAbsent为true,且头节点即为所需节点,直接返回它
        else if (onlyIfAbsent
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        // CASE5:找到了指定位置,并且不为空(出现了哈希冲突)。
        else {
            V oldVal = null;
            synchronized (f) {// 锁住当前节点(链表头)
                if (tabAt(tab, i) == f) {// 再判断一下f是不是头节点,防止它被别的线程已经修改了
                    // if-不是一个特殊节点
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {// 注意遍历过程中令计数器自增
                            K ek;
                            // 在遍历的过程中找到了想要插入的值,看情况返回
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 如果到了尾部,就插入由当前key-value构建的新节点
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // elseIf-是一个树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // else-如果是一个保留节点
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            // 插入完成后,检查一下需不需要把当前链表树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 计数器加一
    addCount(1L, binCount);
    // 返回null
    return null;
}

具体逻辑见注释,解释得很详细。

putVal方法会用一个for循环保持自旋,不断尝试插入要求插入的键值对。循环内有以下几个CASE,体现为if-else块中的五个分支。

  • 表为空。调用初始化方法。
  • 散列位置为空。无锁地直接put。
  • 散列位置是一个ForwardingNode,调用helpTransfer
  • 散列位置头即为当前键,且onlyIfAbsenttrue,直接返回。
  • 散列位置不为空,说明出现了哈希冲突。

注意在遍历过程中对binCount的更新,最终用addCount()令对象加一,并用binCount作为check参数。

5.8 remove方法

public V remove(Object key) {
    return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    // 自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // CASE1:可以直接退出的情况:数组为空或散列结果位置为null。
        if (tab == null 
            || (n = tab.length) == 0 
            || (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        // CASE2:节点正在移动,协助移动
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // CASE3:出现哈希冲突,要在链表中查找
        else {
            V oldVal = null;
            boolean validated = false;
            // 锁住头节点
            synchronized (f) {// 内部具体逻辑不再赘述,类似于上面的put方法
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        // e 表示当前循环处理结点,pred 表示当前循环节点的上一个节点
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            // 找到
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (validated) {
                if (oldVal != null) {
                    // 如果是一次删除,元素个数减一
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

关键还是在于锁住链表头来实现线程安全。逻辑直接看源码即可。

推荐文章

Concrete5、Imageflow和Fancybox JQuery

Concrete5、Imageflow和Fancybox JQuery

推荐文章

如何在服务器上自动阅读和回复电子邮件

如何在服务器上自动阅读和回复电子邮件

推荐文章

Disks无法使用wordpress的“最近评论”小部件

Disks无法使用wordpress的“最近评论”小部件

推荐文章

有没有办法用JCLDebug捕获所有异常(甚至那些已处理的异常)?

有没有办法用JCLDebug捕获所有异常(甚至那些已处理的异常)?

推荐文章

C#下拉列表更改事件

C#下拉列表更改事件

推荐文章

如何处理ASP.NET MVC 3上的连接超时?

如何处理ASP.NET MVC 3上的连接超时?

推荐文章

用openNi/NITE从Kinect生成jpeg图像

用openNi/NITE从Kinect生成jpeg图像

推荐文章

替换SQL XML中空节点的值

替换SQL XML中空节点的值

推荐文章

在高DPI下打印NPAPI插件

在高DPI下打印NPAPI插件

推荐文章

动态源无法加载到以编程方式创建的控件中

动态源无法加载到以编程方式创建的控件中

推荐文章

如何测试HTML表单和ASP.NET MVC控制器操作之间的参数名不匹配?

如何测试HTML表单和ASP.NET MVC控制器操作之间的参数名不匹配?

推荐文章

使用C异步套接字读取面向行的数据

使用C异步套接字读取面向行的数据

推荐文章

如何将Kohana ORM验证和MySQL函数结合起来?

如何将Kohana ORM验证和MySQL函数结合起来?

推荐文章

MATLAB中的信号量与锁

MATLAB中的信号量与锁

推荐文章

windows azure开发存储blob服务未启动

windows azure开发存储blob服务未启动

推荐文章

Java stacktrace分析,媒体中的头错误?

Java stacktrace分析,媒体中的头错误?