JDK 7 ConcurrentHashMap

举报
yd_249383650 发表于 2023/07/30 16:57:30 2023/07/30
【摘要】 ​ 目录概述构造器分析 put 流程get 流程size 计算流程概述JDK1.7中的ConcurrentHashMap间接地实现了Map,并将每一个元素称为分段锁segment,每个segment都是一个HashEntry<K,V>数组,称为table,table的每个元素都是一个HashEntry的单向队列。「HashTable是给整个容器加锁,ConcurrentHashMap是给每个...

 

目录

概述

构造器分析

 put 流程

get 流程

size 计算流程



概述

JDK1.7中的ConcurrentHashMap间接地实现了Map,并将每一个元素称为分段锁segment,每个segment都是一个HashEntry<K,V>数组,称为table,table的每个元素都是一个HashEntry的单向队列。

「HashTable是给整个容器加锁,ConcurrentHashMap是给每个segment加锁,」当一个线程修改segment 0时,其他线程也可以修改其它segment,即 只要不同的线程同一时刻访问的是不同的segment,就不会发生写冲突,比HashMap性能更好。

它维护了一个 segment 数组,每个 segment 对应一把锁

  • 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
  • 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

构造器分析

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // ssize 必须是 2^n, 即 2, 4, 8, 16 ... 表示了 segments 数组的大小

        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // segmentShift 默认是 32 - 4 = 28

        this.segmentShift = 32 - sshift;
        // segmentMask 默认是 15 即 0000 0000 0000 1111

        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 创建 segments and segments[0]

        Segment<K,V> s0 =

                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

        this.segments = ss;
    }

构造完成,如下图所示

编辑


可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位

编辑

结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment

编辑

 put 流程

    public V put(K key, V value) {
        Segment<K, V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        // 计算出 segment 下标

        int j = (hash >>> segmentShift) & segmentMask;

        // 获得 segment 对象, 判断是否为 null, 是则创建该 segment

        if ((s = (Segment<K, V>) UNSAFE.getObject
                (segments, (j << SSHIFT) + SBASE)) == null) {
            // 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,

            // 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性

            s = ensureSegment(j);
        }
        // 进入 segment 的put 流程

        return s.put(key, hash, value, false);
    }

segment 继承了可重入锁(ReentrantLock),它的 put 方法为

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 尝试加锁
        HashEntry<K, V> node = tryLock() ? null :
                // 如果不成功, 进入 scanAndLockForPut 流程
                // 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
                // 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
                scanAndLockForPut(key, hash, value);
        // 执行到这里 segment 已经被成功加锁, 可以安全执行
        V oldValue;
        try {
            HashEntry<K, V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K, V> first = entryAt(tab, index);
            for (HashEntry<K, V> e = first; ; ) {
                if (e != null) {
                    // 更新
                    K k;
                    if ((k = e.key) == key ||

                            (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                } else {
                    // 新增
                    // 1) 之前等待锁时, node 已经被创建, next 指向链表头
                    if (node != null)
                        node.setNext(first);
                    else
                        // 2) 创建新 node
                        node = new HashEntry<K, V>(hash, key, value, first);
                    int c = count + 1;
                    // 3) 扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        // 将 node 作为链表头
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1;
        threshold = (int)(newCapacity * loadFactor);
        HashEntry<K,V>[] newTable =

                (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask;
                if (next == null) // Single node on list

                    newTable[idx] = e;
                else { // Reuse consecutive sequence at same slot

                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    // 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用 

                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;
                    // 剩余节点需要新建

                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // 扩容完成, 才加入新的节点

        int nodeIndex = node.hash & sizeMask; // add the new node

        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;

        // 替换为新的 HashEntry table

        table = newTable;
    }

get 流程

get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容

 public V get(Object key) {
        Segment<K, V> s; // manually integrate access methods to reduce overhead

        HashEntry<K, V>[] tab;
        int h = hash(key);
        // u 为 segment 对象在数组中的偏移量

        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // s 即为 segment

        if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null &&

                (tab = s.table) != null) {
            for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile

                    (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

size 计算流程

  • 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
  • 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回 
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits

        long sum; // sum of modCounts

        long last = 0L; // previous sum

        int retries = -1; // first iteration isn't retry

        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    // 超过重试次数, 需要创建所有 segment 并加锁

                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation

                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。