原文地址:Java 8 Stream Tutorial

作者: winterbe



本文以示例代码为主深入浅出地讲解 Java 8 Stream。我在初次阅读 Stream API 时对这个名字感到困惑,因为这听起来与 Java I/O 中的 InputStreamOutputStream 很相似。但实际上 Java 8 Stream 是一个完全不一样的东西。Stream 就是 Monad,在将函数式编程引入 Java 的过程中扮演了重要角色:

在函数式编程中,monad 是一种代表按一系列步骤进行计算的结构。一个带有 monad 结构的类型定义了它对链式操作,或该类型的嵌套函数的含义。

本文会教你如何使用 Java 8 Stream 以及各种流式操作。你会学到操作的处理顺序以及这种流式操作的顺序是如何影响到运行时的性能。更为强力的流式操作 reducecollectflatMap 也会进行详细讲解。文章的结尾部分会深入一探并行 stream。

如果你还未熟悉 Java 8 的 lambda 表达式、函数式接口和方法引用,在阅读本文之前可以先行阅读我的 Java 8 基本教程

Stream 如何工作

Stream 代表了一系列的元素并且支持不同类型的操作在这些元素上进行计算:

1
2
3
4
5
6
7
8
9
10
11
12
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);

// C1
// C2

流式操作有中间型和终点型两种。中间型操作返回一个 stream 对象,因此我们可以在无需使用分号的前提下将多个中间型操作串联起来。终点型操作要么没有返回值,要么返回一个非 stream 结果。在上面的示例中,filtermapsorted 均为中间型操作,而 forEach 则为终点型操作。如果需要查看全部流式操作,可查阅 Javadoc 。以上示例中的一连串流式操作也被称为操作流水线(operation pipeline)。

多数流式操作都需要接收 lambda 表达式作为参数,这是一种准确定义操作行为的函数式接口。大部分这类操作必须同时为 非干涉(non-interfering)和无状态(stateless)。这是什么意思?

一个 非干涉 的函数不会对 stream 的数据源进行修改。在以上的示例中,没有任何 lambda 表达式以添加或移除集合中的元素的方式来修改 myList

一个 无状态 的函数的行为是明确的。在以上的示例中,没有任何 lambda 表达式是依赖于外部环境中可能会在运行时改变的变量或状态。

不同类型的 stream

Stream 可以创建自多种数据源,其中以集合框架为主。各种 List 与 Set 均支持新方法 stream()parallelStream() 以创建串行或并行的 stream 对象。并行流(parallel stream)允许以多个线程进行操作,这部分会在稍后的章节中讲解。让我们先关注串行流(sequential stream):

1
2
3
4
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1

在一个对象列表中调用方法 stream() 会返回一个常规的对象 stream。但我们无需为了使用 stream 而特意先创建集合对象,正如以下示例:

1
2
3
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1

使用 Stream.of() 即可从一堆对象引用中创建 stream 对象。

除了常规的对象 stream 之外,Java 8 也提供了提供了特殊的 stream 以操作基础数据类型的 intlongdouble。你可能已经猜到了,它们分别是 IntStreamLongStreamDoubleStream

使用 IntStream.range() 可以替代常规的 for 循环:

1
2
3
4
5
6
IntStream.range(1, 4)
.forEach(System.out::println);

// 1
// 2
// 3

所有基础类型的 stream 的工作方式与常规的对象 stream 大致相同,区别在于:基础类型 stream 使用特定的 lambda 表达式,如 IntFunction 而不是 Function,或者使用 IntPredicate 而不是 Predicate。另一个区别是基础类型支持额外的终点型数据聚合操作 sum()average()

1
2
3
4
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0

有时候,将常规的对象 stream 转换为基础类型 stream 是很有用的,反之亦然。为此,对象 stream 支持特殊的映射操作 mapToInt()mapToLong() 以及 mapToDouble():

1
2
3
4
5
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3

基础类型 stream 可以通过 mapToObj() 转换为对象 stream:

1
2
3
4
5
6
7
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);

// a1
// a2
// a3

以下是一个组合示例:double stream 首先映射为 int stream,然后映射成类型为 string 的对象 stream。

1
2
3
4
5
6
7
8
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);

// a1
// a2
// a3

流操作处理的顺序

现在我们已经学习了如何创建和使用不同类型的 stream,是时候深入了解 stream 操作在后台是如何进行。

中间型操作的一个重要特性是懒惰性。看看这段没有终点型操作的示例代码:

1
2
3
4
5
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});

当这段代码被执行时,console 中没有输出内容,这是因为中间型操作的执行只会在有终点型操作的前提下进行。

让我们加入一个终点型操作 forEach 来扩展以上示例:

1
2
3
4
5
6
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));

执行这段代码,console 就会按我们所想地输出内容:

1
2
3
4
5
6
7
8
9
10
filter:  d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c

结果的顺序可能会让人吃惊。一个天真的想法是,流式操作会按照一个接一个的顺序以水平的方式作用于 stream 中的所有元素。但实际上的执行顺序是 stream 中的每个元素在操作链中垂直地移动。第一个字符串 “d2” 通过了 filter 后紧接着执行 forEach,然后才轮到字符串 “a2” 被处理。

这个行为可以减少对每个元素执行的实际操作次数,正如我们在以下代码所看到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2

anyMatch 操作中,一旦输入的元素经过 predicate 判断为 true 时会立刻将结果返回。这个 predicate 对于第二个传入的元素 “A2” 会返回 true。由于流式链的垂直执行特性,在这个示例中 map 操作仅需执行两次。由此可见,map 操作无需对 stream 中的所有元素进行映射,从而使执行的次数尽可能地减少。

译者注:此处的水平执行顺序和垂直执行顺序之间的区别换句话说就是,前者是所有元素均在同一个操作处理结束后再集体进行下一个操作,而后者指的是 stream 中的各个元素逐一按操作链的顺序执行。

处理的顺序为何如此重要

下一个示例由两个中间型操作 mapfilter 以及终点型操作 forEach 组成。让我们再探究一下这些操作是如何被执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C

你可能已经猜到了,mapfilter 均根据集合中的元素数量被执行了五次,而 forEach 仅被执行了一次。

通过改变操作的执行顺序,将 filter 移至操作链的开头,我们可以极大地减少实际的操作次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c

现在 map 只被执行了一次,因此该操作流水线在应对大量输入元素时的性能将会大幅提高。当使用复杂的方法链时要记住这一点。

让我们通过添加一个额外的操作 sorted 来扩展以上示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

排序是一种特殊的中间型操作。它是所谓的带状态的操作,原因是为了对集合中的元素进行排序,在排序过程中你必须维持它的状态。

执行这个示例会得到以下 console 输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sort:    a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2

首先,排序操作会作用于整个输入集合。换句话说 sorted 是水平式地执行。因此在这个示例中对各个输入元素的多次组合比较导致 sorted 被执行了 8 次。

(译者注:由于需要维持状态,即每次处理的结果均依赖上一次的处理结果,因此必须等整个操作完成后才能进行下一步操作,而不是“垂直”地、逐个元素地执行。)

让我们再次通过调整处理顺序来优化代码的性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2

在这个例子中 sorted 甚至没有被执行,这是因为 filter 已经将输入集合的元素减至只有一个元素。因此该操作流水线的性能在面对海量输入元素时有极大的提升。

可重用的 stream

在 Java 8 中 stream 对象不能被重用。一旦你调用了任何终点型操作,stream 就会被关闭:

1
2
3
4
5
6
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception

在同一 stream 中调用 anyMatch 之后再调用 noneMatch 会导致如下异常:

1
2
3
4
5
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

为了克服这个限制,我们必须为每个想要执行的终点型操作创建一个新的流式操作链。例如,我们可以创建一个已设置所有中间型操作的 stream 对象供应者(stream supplier),通过它再创建新的 stream 对象:

1
2
3
4
5
6
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok

每次调用 get() 都会构造新的 stream 对象,因此我们能安全地调用各种终点型操作。

高级操作

Stream 支持大量不同的操作。我们已经学习了最重要的操作如 filtermap。至于其他比较简单的操作,我把它们留给你自行探索(参考 Stream Javadoc)。现在我们来深入了解更为复杂的流式操作:collectflatMapreduce

这个章节的大部分示例代码都会基于以下 persons 列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Person {
String name;
int age;

Person(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return name;
}
}

List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));

Collect

collect 是一个非常有用的终点型操作,它能将 stream 中的元素转换为另一种类型的结果,例如一个 ListSetMap 的对象。collect 接收一个 collector 对象,该对象由四个不同的操作组成:supplier(提供者)、accumulator(累加器)、combiner(组合器)以及一个 finisher(完成者)。这听起来非常复杂,但好消息是 Java 8 通过 Collectors 类提供了不同的内建 collector 对象。因此在大多数情况下你都无需自行实现自己的 collector。

让我们从一个非常普通的情景开始:

1
2
3
4
5
6
7
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());

System.out.println(filtered); // [Peter, Pamela]

如你所见,将 stream 中的元素构建成一个列表是很简单的操作。如果需要的是一个 set 对象而不是 list 对象,仅需改为使用 Collectors.toSet()

下一个示例,我们把所有人按年龄分组:

1
2
3
4
5
6
7
8
9
10
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));

personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors 的功能非常齐全。你也能基于 steam 的元素创建它们的聚合信息,例如计算所有人的平均年龄:

1
2
3
4
5
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge); // 19.0

如果你对其他常用的统计数据感兴趣,汇总(summarizing)collector 会返回一个特殊的内建汇总统计对象。因此我们很简单就可以得到 minmax 和所有人的年龄的算术平均数 average 以及 sumcount

1
2
3
4
5
6
7
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下一个示例中,我们把所有人的名字信息连接成一个字符串:

1
2
3
4
5
6
7
8
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

连接(join) collector 接收一个分隔符以及一个可选的前缀和后缀。

为了将 stream 中的元素转化为一个 map 对象,我们必须定义 key 和 value 该如何进行匹配。记住被匹配的 key 必须唯一,否则会抛出 IllegalStateException。你可以传递一个用于合并 value 的 function 作为额外的参数从而避免抛出异常:

1
2
3
4
5
6
7
8
9
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

我们已经学习了一些强力的内建 collector,是时候尝试去创建我们自己的特殊 collector 了。我们想要将 stream 中所有人的名字信息转换成一个字符串,该字符串由大写的英文名以及用于分割的管道符号 | 组成。为了达成目标,我们通过 Collector.of() 创建一个新的 collector。同时我们还需要传入 collector 的四个组成部分:supplier(供应者)、accumulator(累加器)、combiner(组合器)以及 finisher(完成者)。

1
2
3
4
5
6
7
8
9
10
11
12
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher

String names = persons
.stream()
.collect(personNameCollector);

System.out.println(names); // MAX | PETER | PAMELA | DAVID

由于 Java 中的 String 是不可变对象,我们需要一个辅助类如 StringJoiner 以便 collector 构造我们需要的字符串。supplier 用于初始化创建一个带有相应分隔符的 StringJoiner 对象。 accumulator 用于将每个人的大写名字加入到 StringJoiner 中。 combiner 知道何如将 StringJoiner 对象合二为一。最后一步中 finisher 从 StringJoiner 中构建我们所需的字符串。

译者注:

  1. 所谓的 accumulator、combiner 是参数名,指的是该函数/参数的作用,而不是指具体类型如某个叫 AccumulatorCombiner 的函数式接口。两者的类型既可以相同,也可以不同。
  2. combiner 用于并行流中将多个线程的处理结果合并,故而在串行流不会调用该操作,下面 Reduce 一节的结尾和并行流一节会有详细分析。

FlatMap

通过 map 操作,我们已经学习了如何将 stream 中的对象转换成另外一种类型。但如果我们想要把一个对象转换为多个别的对象或者根本不转换?这就需要 flatMap 来救场了。

(译者注: map 在英语中有映射的意思,所以流式操作 map 的作用是将当前 stream 的每个元素逐一映射为另一种类型的对象,如本文开头的示例代码中将 “a1” 等字符串逐一映射为相应的大写的字符串对象,映射前后是一一对应的关系)

FlatMap 将 stream 中的每个元素转换为其他对象的 stream。因此每个对象将会被转换为新的 stream 中的零个、一个或多个元素。最终这些转换后的 stream 的内容随后会被汇入至 flatMap 操作返回的 stream 对象中。

在了解 flatMap 的实际运用之前,我们需要一个合适的类型继承关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Foo {
String name;
List<Bar> bars = new ArrayList<>();

Foo(String name) {
this.name = name;
}
}

class Bar {
String name;

Bar(String name) {
this.name = name;
}
}

接着,我们运用 stream 的相关知识去初始化一些对象:

1
2
3
4
5
6
7
8
9
10
11
12
List<Foo> foos = new ArrayList<>();

// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

现在我们拥有了一个带有三个 foo 对象的列表,每个 foo 对象又包含了三个 bar 对象。

FlatMap 接收一个函数(function),该函数必须返回一个包含若干对象的 stream。因此为了解析出每个 foo 对象中的所有 bar 对象,我们传入了相应的函数(function):

1
2
3
4
5
6
7
8
9
10
11
12
13
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

如你所见,我们成功地将一个带有三个 foo 对象的 stream 转换为一个带有九个 bar 对象的 stream。

最终,上述的示例代码可以简化为一个含多步 stream 操作的流水线:

1
2
3
4
5
6
7
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));

FlatMap 也可在 Java 8 的 Optional 中使用。Optional 的 flatMap 操作返回一个其他类型的 optional 对象,因此可用于避免让人恶心的判空操作。

假如有如下的一个具有高度层级关系的结构:

1
2
3
4
5
6
7
8
9
10
11
class Outer {
Nested nested;
}

class Nested {
Inner inner;
}

class Inner {
String foo;
}

为了解析一个 outer 实例的内部字符串 foo,你必须加入多个判空操作以避免可能发生的 NullPointerExceptions

1
2
3
4
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}

同样的行为可通过 optional 的 flatMap 操作完成:

1
2
3
4
5
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);

每次对 flatMap 的调用都会返回一个 optional 对象,它对目标对象进行了包装。若目标对象不存在则包装内容为 null。

Reduce

Reduce 操作将 stream 中的所有元素组合成单一结果。Java 8 支持三种不同的 reduce 操作。第一种是将 stream 中的多个元素缩减到只有一个。让我们来看看如何通过该操作得到人群中年龄最大的人:

1
2
3
4
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela

reduce 方法接收一个累加器函数 BinaryOperator 。它实际上就是一个带有两个相同类型操作数的 BiFunction,在本例中这个类型就是 Person。BiFunction 跟 Function 一样,只不过是带了两个参数。示例中的函数比较两个人的年龄并返回年龄较大者,最终得出年龄最大的人。

(译者注:函数 Function<T, R> 为代表了参数 T 和返回值 R,BiFunction<T, U, R> 则为 Function 的双参数版本,BinaryOperator<T> 则是在 BiFunction 的基础上将两个参数 T&U 和返回值 R 的类型全部统一为 T)

第二种 reduce 方法接收一个标识值和一个 BinaryOperator 累加器。该方法可用于构建一个带有 stream 中所有人名字和年龄信息的 Person 对象:

1
2
3
4
5
6
7
8
9
10
11
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三种 reduce 方法接收三个参数:一个标识值、一个类型为 BiFunction 的累加器以及一个类型为 BinaryOperator 的组合器。由于标识值的类型并没有限制为 Person 类型,我们可以使用该 reduce 操作得到所有人年龄的总和。

1
2
3
4
5
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum); // 76

如你所见该操作的结果为 76,但在后台里究竟发生了什么?让我们在示例代码中加入一些 debug 输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

如你所见,accumulator 干了所有工作。它在首次执行时对标识值 0 和第一个人 Max 进行运算。在接下来的三步,sum 持续地对其他人的年龄进行累加,最终得到总年龄为 76

等等?combiner 从未被调用?对同一个 stream 以并行的方式执行的话将会揭示这个秘密:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

以并行的方式执行该 stream 会导致完全不同的执行行为。现在 combiner 确实被调用了。由于 accumulator 是以并行的方式调用,因此需要组合器去将多个累加的结果组合起来。

让我们在下一章节中更进一步了解并行 stream。

并行流

Stream 能够以并行的方式执行,以此提高大量输入元素情况的运行时性能。并行流使用一个通用的 ForkJoinPool,它由 ForkJoinPool.commonPool() 方法提供。该线程池的大小最多为五个线程——依据物理 CPU 的核心数量而定:

1
2
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3

在我的机器上,通用线程池在初始化时默认并行数为 3。这个值可通过以下 JVM 参数来进行调整:

1
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合类支持 parallelStream() 方法来创建一个含有多个元素的并行流。相应地,你也可以在一个给定的 stream 中调用中间型操作 parallel() 将串行流转换为一个并行的副本。

为了简述并行流的行为,下面的示例会在标准输出中打印当前线程的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));

通过研究 debug 输出,我们能更好地理解线程实际上是如何执行流式操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filter:  b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

如你所见,并行流使用了通用 ForkJoinPool 中所有可用的线程去执行流式操作。输出的内容可能在运行时有所不同,这是因为具体哪个线程执行什么任务是没有明确指定的。

让我们通过一个额外的流式操作 sort 来扩展这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));

这个结果起初看起来会有点奇怪:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

看起来 sort 只在主线程中串行地执行。但实际上,并行流中执行的 sort 操作在后台里使用了 Java 8 中的新方法 Arrays.parallelSort()。正如 Javadoc 中所说:这个方法会按串行或并行的方式运行,这取决于数组的长度。

如果指定数组的长度小于最小粒度,那么它会以适当的方法 Arrays.sort 来排序。

回到上一个章节 reduce 的例子。我们已经发现 combiner 仅在并行时被调用,在串行时不会被调用。让我们来看看有哪些线程参与其中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));

persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});

控制台的输出显示不管是 accumulator 还是 combiner 都是在所有可用的线程上并行。

1
2
3
4
5
6
7
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]

总的来说,并行流在处理大量输入元素时带来不错的性能提升。但需要记住的是一些并行流的操作如 reducecollect 需要额外的运算(结果的组合运算),这在串行操作中是不需要的。

进一步的,我们学习到所有的并行流操作都共享同一个 JVM 范围内的通用 ForkJoinPool。因此你大概会想要避免进行引发堵塞的缓慢的的流式操作,因为这会潜在地拖慢你的应用中重度依赖并行流的功能。

That‘s it

关于 Java 8 Stream 的编程指导到这里就结束了。如果你感兴趣想要学习更多关于 Java 8 Stream 的知识,我推荐你学习 Stream Javadoc 的文档。如果你想要学习底层的运行机制,你可能需要阅读 Martin Fowlers 关于 Collection Pipeline 的文章。

如果你也对 JavaScript 感兴趣,你可以看一看 Stream.js —— 一个 Java 8 Stream API 的 JavaScript 实现,或者我的文章 Java 8 Nashorn Tutorial

希望这篇教程对你有所帮助、你也享受阅读的过程。这篇教程中的所有示例都寄放在 Github 上。你可以 fork 下该仓库 或通过 Twitter 给我反馈。

Happy coding!