原文地址:Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap

作者: winterbe



欢迎来到由我编写的 Java 8 并发编程系列的第三部分。本篇的内容为并发 API 中的两个重要部分:原子变量和 Concurrent Map 相关类。得益于 Java 8 的 lambda 表达式和函数式编程,这两者在使用的便捷性上有较大提升。我会通过简单易懂的示例来解释这些新特性,享受这段学习之旅吧!

为了简化篇幅,文章中的示例代码使用了由我定义的两个辅助方法: sleep(seconds)stop(executor), 具体代码可在 github 查看。

AtomicInteger

用于执行原子性操作的类位于 java.concurrent.atomic 包下。原子性的操作指的是在不使用 synchronized 关键字和锁的情况下,代码仍能在多个线程并行下正确地运行。

在 Java 内部,原子类都重度使用了 compare-and-swap (CAS) 技术,这是由现代 CPU 提供支持的原子性指令,通常比使用锁进行同步的方式要快得多。因此诸如单个变量被并发地修改的简单场景,我推荐使用原子类而不是锁。

我们以原子类中的 AtomicInteger 为例:

1
2
3
4
5
6
7
8
9
10
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
.forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get()); // => 1000

使用 AtomicInteger 代替 Integer 后,我们可以在不使用同步访问的方式就能线程安全地对整数进行累加。incrementAndGet() 是原子操作,因此可以被多个线程同时调用。

AtomicInteger 支持多种原子操作。updateAndGet() 接收一个 lambda 表达式,用于对该整数进行任意的算术运算:

(译者注:updateAndGet() 接收一个应用于当前值的一元操作数,示例中的 lambda 表达式为:对当前值 n, 计算 n + 2 后作为结果返回并更新当前值,等价于 x = x + 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});

stop(executor);

System.out.println(atomicInt.get()); // => 2000

accumulateAndGet() 接收的 lambda 表达式类型为 IntBinaryOperator. 以下示例将 0 到 1000 的结果以并发的方式进行累加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.accumulateAndGet(i, (n, m) -> n + m);
executor.submit(task);
});

stop(executor);

System.out.println(atomicInt.get()); // => 499500

译者注:该例子等价于

1
2
3
4
5
int prev = 0;
for (int i = 0; i < 1000; i++) {
prev = prev + i;
}
System.out.println(prev); // 499500

其他较常用的原子类为 AtomicBoolean, AtomicLong, AtomicReference

LongAdder

LongAdder 作为 AtomicLong 的另一个选择,可用于对一个数字进行累加。

1
2
3
4
5
6
7
8
ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
.forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset()); // => 1000

LongAdder 和其他与数值相关的原子类一样提供了 add()increment() 方法,而且都是线程安全的。与将计算汇总到单个结果的行为不同,LongAdder 通过内部维护的一系列变量来减少高并发下的线程之间的竞争。实际的结果可通过调用 sum()sumThenReset() 获取。

在变量会被多个线程频繁地修改的场景下,该类比相应的其他原子类更适合使用。这种场景通常是收集统计信息,如在 web server 上统计请求数量。LongAdder 的缺点是更高的内存占用,原因是需要维护内部的一系列变量。

LongAccumulator

LongAccumulatorLongAdder 的更泛用版本。与后者只用来执行加法操作不同,LongAccumulator 的构造需要一个类型为 LongBinaryOperator 的 lambda 表达式:

1
2
3
4
5
6
7
8
9
10
11
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
.forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset()); // => 2539

我们以函数 2 * x + y 和初始值 1 作为参数创建了一个 LongAccumulator 对象。每次调用 accumulate(i) 时,accumulator 的当前值和遍历索引 i 会作为参数传递给 lambda 表达式进行计算。

LongAccumulatorLongAdder 一样,通过内部变量减少线程之间的竞争。

ConcurrentMap

ConcurrentMap 接口扩展了 Map 接口并定义了最有用的并发集合框架的类型之一。Java 8 通过向该接口增加新方法来引入函数式编程。接下来的几个示例基于以下 map 对象进行演示:

1
2
3
4
5
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

forEach() 方法接收一个类型为 BiConsumer 的 lambda 表达式,以 map 对象的 key 和 value 作为参数。该方法可代替普通的 for-each 循环,在当前线程对 ConcurrentMap 对象的词条进行迭代。

1
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

putIfAbsent() 方法仅当该 key 不存在时才会将 value 放入 map 中。ConcurrentHashMap 对该方法的实现与 put() 一样都是线程安全的,因此不同的线程调用该方法并不需要进行同步:

1
2
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0

getOrDefault() 方法的作用与 put() 相似,并在 key 不存在时返回默认值:

1
2
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there

replaceAll() 接收一个类型为 BiFunction 的 lambda 表达式。函数 BiFunction 接收两个参数并返回一个结果。在这个例子中,该函数以 map 单个词条的 key 和 value 为参数,计算并返回一个新的 value。

1
2
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3

compute() 方法接收一个具体的 key 以及用于计算新 value 值的函数作为参数,因此我们可以只对 map 中的一个词条进行转换,而不是将函数应用于所有词条。

1
2
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar

compute() 方法有两个变体:computeIfAbsent()computeIfPresent(). 作为这两个方法的参数之一的函数,只会在对应的 key 不存在 (computeIfAbsent) 或 key 存在 (computeIfPresent) 时被调用。

最后,merge() 方法可用于将 map 中特定词条的 value 与新的 value 进行整合。merge() 方法接收三个参数:一个特定的 key, 一个待整合的新 value, 以及用于将当前 value 与新 value 进行整合的函数:

1
2
map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo")); // boo was foo

ConcurrentHashMap

ConcurrentHashMap 作为 ConcurrentMap 接口最重要的实现类,不仅包括以上的方法,还通过其他新方法来增强其对于并发操作的处理能力。

Java 8 的并行流 (parallel stream) 使用的是一个特殊的 ForkJoinPool 对象,可通过 ForkJoinPool.commonPool() 对其进行访问。该线程池的并行数默认情况下由宿主的 CPU 核心数决定,例如我的机器上有 4 个核心,因此对应的并行数为 3:

1
System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

该值可通过一下 JVM 参数指定:

1
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

我们使用之前的 map 对象来进行后续的演示,但对象的声明改为 ConcurrentHashMap 实现类而不是之前的 ConcurrentMap 接口,以此来使用属于该类的方法:

1
2
3
4
5
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Java 8 引入了三种并发操作:forEach, searchreduce. 每种方法都有四种形式,可以接收一个以 key, value, entry 和 key-value 对为参数的函数。

这些方法都有一个共同的首参数 parallelismThreshold, 该参数定义了方法以并发方式执行的阈值。举例来说,当阈值设定为 500 而 map 的词条数量只有 499 而没有超出阈值,此时方法就会以单线程顺序的方式执行。在后续示例中我们将阈值设为 1 确保方法以并行的方式执行并观察其运行效果。

ForEach

forEach() 方法能够对 map 的词条进行迭代并调用以当前 key-value 对为参数的 BiConsumer 函数。为了观察 forEach() 方法的执行过程,我们将相关的线程名打印到控制台。注意此处 ForkJoinPool 线程池的大小为 3.

1
2
3
4
5
6
7
8
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s\n",
key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main

Search

search() 方法接收一个以当前 key-value 对为参数的 BiFunction 函数,并返回匹配的搜索结果或在不匹配时返回 null. 一旦返回了一个非空对象,则后续的搜索将会被中止。必须注意的是 ConcurrentHashMap 为无序集合,因此搜索的结果不能依赖于 map 词条的顺序。如果 map 中有多个词条满足搜索结果,那么 search 返回的结果是不固定的。

1
2
3
4
5
6
7
8
9
10
11
12
13
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

另一个在 map 中搜索单个结果的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String result = map.searchValues(1, value -> {
System.out.println(Thread.currentThread().getName());
if (value.length() > 3) {
return value;
}
return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo

Reduce

reduce() 方法在之前讲解 Java 8 Stream 的文章中已经介绍过。该方法接收两个类型为 BiFunction 的函数,第一个函数将当前 key-value 对转换为相应类型的对象,第二个函数则将各个词条转换后的对象最终合并成单个对象,并忽略掉其中为 null 的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar

希望你喜欢这部分的内容。所有示例代码均已托管到 github, 欢迎你将仓库的代码 fork 到本地自行体验。

如果你支持我的工作,欢迎分享这篇文章给你的朋友。你也可在 Twitter 上关注我发表的关于 Java 和编程相关的内容。