原文地址:Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
作者: winterbe
欢迎来到由我编写的 Java 8 并发编程系列的第三部分。本篇的内容为并发 API 中的两个重要部分:原子变量和 Concurrent Map 相关类。得益于 Java 8 的 lambda 表达式和函数式编程,这两者在使用的便捷性上有较大提升。我会通过简单易懂的示例来解释这些新特性,享受这段学习之旅吧!
- 第一部分:Thread 与 Executors
- 第二部分:同步与锁
- 第三部分:原子变量与 ConcurrentMap
为了简化篇幅,文章中的示例代码使用了由我定义的两个辅助方法: sleep(seconds)
和 stop(executor)
, 具体代码可在 github 查看。
AtomicInteger
用于执行原子性操作的类位于 java.concurrent.atomic
包下。原子性的操作指的是在不使用 synchronized
关键字和锁的情况下,代码仍能在多个线程并行下正确地运行。
在 Java 内部,原子类都重度使用了 compare-and-swap (CAS) 技术,这是由现代 CPU 提供支持的原子性指令,通常比使用锁进行同步的方式要快得多。因此诸如单个变量被并发地修改的简单场景,我推荐使用原子类而不是锁。
我们以原子类中的 AtomicInteger
为例:
1 | AtomicInteger atomicInt = new AtomicInteger(0); |
使用 AtomicInteger
代替 Integer
后,我们可以在不使用同步访问的方式就能线程安全地对整数进行累加。incrementAndGet()
是原子操作,因此可以被多个线程同时调用。
AtomicInteger
支持多种原子操作。updateAndGet()
接收一个 lambda 表达式,用于对该整数进行任意的算术运算:
(译者注:updateAndGet()
接收一个应用于当前值的一元操作数,示例中的 lambda 表达式为:对当前值 n, 计算 n + 2 后作为结果返回并更新当前值,等价于 x = x + 2
)
1 | AtomicInteger atomicInt = new AtomicInteger(0); |
accumulateAndGet()
接收的 lambda 表达式类型为 IntBinaryOperator
. 以下示例将 0 到 1000 的结果以并发的方式进行累加:
1 | AtomicInteger atomicInt = new AtomicInteger(0); |
译者注:该例子等价于
1 | int prev = 0; |
其他较常用的原子类为 AtomicBoolean
, AtomicLong, AtomicReference
LongAdder
LongAdder
作为 AtomicLong
的另一个选择,可用于对一个数字进行累加。
1 | ExecutorService executor = Executors.newFixedThreadPool(2); |
LongAdder
和其他与数值相关的原子类一样提供了 add()
和 increment()
方法,而且都是线程安全的。与将计算汇总到单个结果的行为不同,LongAdder
通过内部维护的一系列变量来减少高并发下的线程之间的竞争。实际的结果可通过调用 sum()
或 sumThenReset()
获取。
在变量会被多个线程频繁地修改的场景下,该类比相应的其他原子类更适合使用。这种场景通常是收集统计信息,如在 web server 上统计请求数量。LongAdder
的缺点是更高的内存占用,原因是需要维护内部的一系列变量。
LongAccumulator
LongAccumulator
是 LongAdder
的更泛用版本。与后者只用来执行加法操作不同,LongAccumulator
的构造需要一个类型为 LongBinaryOperator
的 lambda 表达式:
1 | LongBinaryOperator op = (x, y) -> 2 * x + y; |
我们以函数 2 * x + y
和初始值 1 作为参数创建了一个 LongAccumulator
对象。每次调用 accumulate(i)
时,accumulator 的当前值和遍历索引 i 会作为参数传递给 lambda 表达式进行计算。
LongAccumulator
与 LongAdder
一样,通过内部变量减少线程之间的竞争。
ConcurrentMap
ConcurrentMap
接口扩展了 Map
接口并定义了最有用的并发集合框架的类型之一。Java 8 通过向该接口增加新方法来引入函数式编程。接下来的几个示例基于以下 map 对象进行演示:
1 | ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); |
forEach()
方法接收一个类型为 BiConsumer
的 lambda 表达式,以 map 对象的 key 和 value 作为参数。该方法可代替普通的 for-each 循环,在当前线程对 ConcurrentMap 对象的词条进行迭代。
1 | map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value)); |
putIfAbsent()
方法仅当该 key 不存在时才会将 value 放入 map 中。ConcurrentHashMap
对该方法的实现与 put()
一样都是线程安全的,因此不同的线程调用该方法并不需要进行同步:
1 | String value = map.putIfAbsent("c3", "p1"); |
getOrDefault()
方法的作用与 put()
相似,并在 key 不存在时返回默认值:
1 | String value = map.getOrDefault("hi", "there"); |
replaceAll()
接收一个类型为 BiFunction
的 lambda 表达式。函数 BiFunction
接收两个参数并返回一个结果。在这个例子中,该函数以 map 单个词条的 key 和 value 为参数,计算并返回一个新的 value。
1 | map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); |
compute()
方法接收一个具体的 key 以及用于计算新 value 值的函数作为参数,因此我们可以只对 map 中的一个词条进行转换,而不是将函数应用于所有词条。
1 | map.compute("foo", (key, value) -> value + value); |
compute()
方法有两个变体:computeIfAbsent()
和 computeIfPresent()
. 作为这两个方法的参数之一的函数,只会在对应的 key 不存在 (computeIfAbsent) 或 key 存在 (computeIfPresent) 时被调用。
最后,merge()
方法可用于将 map 中特定词条的 value 与新的 value 进行整合。merge()
方法接收三个参数:一个特定的 key, 一个待整合的新 value, 以及用于将当前 value 与新 value 进行整合的函数:
1 | map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal); |
ConcurrentHashMap
ConcurrentHashMap
作为 ConcurrentMap
接口最重要的实现类,不仅包括以上的方法,还通过其他新方法来增强其对于并发操作的处理能力。
Java 8 的并行流 (parallel stream) 使用的是一个特殊的 ForkJoinPool
对象,可通过 ForkJoinPool.commonPool()
对其进行访问。该线程池的并行数默认情况下由宿主的 CPU 核心数决定,例如我的机器上有 4 个核心,因此对应的并行数为 3:
1 | System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3 |
该值可通过一下 JVM 参数指定:
1 | -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 |
我们使用之前的 map 对象来进行后续的演示,但对象的声明改为 ConcurrentHashMap
实现类而不是之前的 ConcurrentMap
接口,以此来使用属于该类的方法:
1 | ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); |
Java 8 引入了三种并发操作:forEach
, search
和 reduce
. 每种方法都有四种形式,可以接收一个以 key, value, entry 和 key-value 对为参数的函数。
这些方法都有一个共同的首参数 parallelismThreshold
, 该参数定义了方法以并发方式执行的阈值。举例来说,当阈值设定为 500 而 map 的词条数量只有 499 而没有超出阈值,此时方法就会以单线程顺序的方式执行。在后续示例中我们将阈值设为 1 确保方法以并行的方式执行并观察其运行效果。
ForEach
forEach()
方法能够对 map 的词条进行迭代并调用以当前 key-value 对为参数的 BiConsumer
函数。为了观察 forEach()
方法的执行过程,我们将相关的线程名打印到控制台。注意此处 ForkJoinPool
线程池的大小为 3.
1 | map.forEach(1, (key, value) -> |
Search
search()
方法接收一个以当前 key-value 对为参数的 BiFunction
函数,并返回匹配的搜索结果或在不匹配时返回 null. 一旦返回了一个非空对象,则后续的搜索将会被中止。必须注意的是 ConcurrentHashMap
为无序集合,因此搜索的结果不能依赖于 map 词条的顺序。如果 map 中有多个词条满足搜索结果,那么 search
返回的结果是不固定的。
1 | String result = map.search(1, (key, value) -> { |
另一个在 map 中搜索单个结果的方法:
1 | String result = map.searchValues(1, value -> { |
Reduce
reduce()
方法在之前讲解 Java 8 Stream 的文章中已经介绍过。该方法接收两个类型为 BiFunction
的函数,第一个函数将当前 key-value 对转换为相应类型的对象,第二个函数则将各个词条转换后的对象最终合并成单个对象,并忽略掉其中为 null 的值。
1 | String result = map.reduce(1, |
希望你喜欢这部分的内容。所有示例代码均已托管到 github, 欢迎你将仓库的代码 fork 到本地自行体验。
如果你支持我的工作,欢迎分享这篇文章给你的朋友。你也可在 Twitter 上关注我发表的关于 Java 和编程相关的内容。
- 第一部分:Thread 与 Executors
- 第二部分:同步与锁
- 第三部分:原子变量与 ConcurrentMap