劳伦的奇妙冒险


客亦知夫水与月乎?

哀吾生之须臾,
羡长江之无穷。

Java 8 并发编程 (1/3):Thread 与 Executors

原文地址:Java 8 Concurrency Tutorial: Threads and Executors

作者: winterbe



欢迎来到由我编写的关于 Java 8 并发编程系列的第一部分。这个系列的文章将通过简单易懂的示例代码展示如何使用并发编程 API。在接下来的 15 分钟里,你将会学习到如何使用 thread, task 以及 executor service 以并行的方式运行你的代码。

Concurrency API 在 Java 5 引入,并在后续版本中逐渐得到增强。这篇文章中大部分概念与旧版 Java 是相通的,但是我的示例代码会重度使用 Java 8 的 lambda 表达式以及其他新特性。所以如果你对 lambda 表达式还不熟悉,我建议你先行阅读我的 Java 8 指南.

Thread 与 Runnable

现代操作系统都是通过进程线程来支持并发操作。进程就是相互之间独立运行的程序。例如当你启动一个 Java 程序,操作系统就会启动一个进程,并与其他程序同时运行。在每个进程中,我们可以使用多个线程去并发地运行代码,以此来高效使用全部 CPU 核心。

从 JDK 1.0 起,Java 就已经支持 线程 Thread. 在启动新的线程之前,你必须指定这个线程要执行什么代码,这部分代码通常被称为任务 task. 具体来说就是实现函数式接口 Runnable 并提供无参方法 run 的实现:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

由于 Runnable 是函数式接口,因此我们可以使用 Java 8 lambda 表达式在控制台打印当前线程的名称。示例代码中,在开始一个新的线程之前,我们先在主线程直接调用 runnable 的 run 方法。

程序运行的结果可能是这样:

Hello main
Hello Thread-0
Done!

也可能是这样:

Hello main
Done!
Hello Thread-0

由于这段代码以并发的方式运行,因此我们无法预测 runnable 的调用是在打印 ‘done’ 之前还是之后。由于并发编程执行顺序的不确定性,使得这在大型应用中成为了一项复杂的任务。

在后续的内容中我们通过设置线程的休眠时间来模拟需要长时间运行的任务。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

当你运行上述代码时,你能感受到在第一次和第二次 print 操作之间有一秒的延迟。在需要使用时间单位的场合,TimeUnit 是一个非常有用的枚举类。当然你也可以选择使用 Thread.sleep(1000) 来达到相同的效果。

直接使用 Thread 类非常枯燥且容易出错,因此早在 2004 年发布的 Java 5 中已经推出了 Concurrency API. 该 API 位于 java.util.concurrent 包并包含了许多用于并发编程的类。伴随着各个新版 Java 的发布,并发 API 也一直被增强。当前的 Java 8 也推出了新的类和方法。

现在就来深入地了解一下并发 API 中最重要的部分之一:executor service.

Executors

在并发 API 中,我们通过一个更高层级的 ExecutorService 来代替对线程的直接操作。Executor 可以异步执行任务并且以池的形式来管理线程,因此我们无需手动创建线程。线程池会在后台复用线程并执行相应任务。因此我们可以在整个程序的生命周期内,在单个线程池中执行任意多的并发任务。

第一个线程示例改为 executor 的形式:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1

Executors 类提供了便捷的工厂方法来创建不同类型的 executor service. 在这个示例中,我们使用的线程池中线程的数量为 1。

程序运行的结果与之前的示例差不多,但是当你执行这段代码的时候,你会注意到一个非常重要的区别:Java 进程并没有停止运行!这是因为 executor 必须被显式地停止运行,否则他们将会持续的监听和等待新的任务。

ExecutorService 类提供了两个方法用于停止运行:shutdown() 会等待当前正在执行的任务完成,而 shutdownNow() 则会中断所有正在执行的任务并立即关闭 executor.

我比较喜欢用以下方式关闭 executor:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor 在关闭前会先等待一段时间,好让当前执行的任务完成。达到指定的最大时间 5 秒后仍有任务未完成,则立即中断所有任务并关闭 executor.

Callable 与 Future

除了 Runnable 之外,executor 还支持另一类任务,称为 Callable. 同为函数式接口,CallableRunnable 的区别在于前者具有返回值,而后者没有返回值。

这个 lambda 表达式定义了一个 callable 对象,在休眠一秒后返回一个 integer:

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callable 与 Runnable 一样可以提交到 executor 执行。但是 executor 的 submit() 方法不会等待任务完成才返回,那么 Callable 的返回值该如何处理?答案是 executor 会返回一个特殊的对象:Future 对象。通过该对象我们可以获取在一定时间后才能得到的实际结果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

上述例子中,我们将 callable 提交到 executor 后立即通过 isDone() 检查 future 对象是否已经完成执行。很肯定的说在本例中,此时任务还没有完成,因为 callable 会在休眠一秒后才返回 integer.

调用get() 方法会阻塞当前线程直到 callable 任务完成返回实际的结果 123. 此时的 future 才算完成任务,并且我们可以在控制台看到如下结果:

future done? false
future done? true
result: 123

Future 与相应的 executor service 是紧密关联的,因此当 executor 被强行关闭时,还未完成的 future 对象将会抛出异常:

executor.shutdownNow();
future.get();

你可能已经注意到了,本例中创建 executor 的方式与之前不同。我们使用 newFixedThreadPool(1) 的方式来创建 executor, 背后是一个大小为 1 的线程池。这实际上等价于 newSingleThreadExecutor(), 但我们可以在之后通过改变参数来指定线程池的大小。

Timeout

任何对 future.get() 的调用都会阻塞当前线程并等待直到相应的 callable 任务完成。在最坏的情况下,callable 会一直运行下去,结果就是导致应用不再响应。为了避免这种情况,我们可以通过传入 timeout 参数:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);

运行上述代码会抛出异常:TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你应该能猜到抛出这个异常的原因:我们指定等待 1 秒后超时,但 callable 的执行需要 2 秒才会返回实际结果。

InvokeAll

Executor 的 invokeAll() 支持一次性提交多个 callable 任务,接收一个 callable 的集合并返回相应 future 的集合。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在本例中,我们使用 Java 8 的函数式流来处理 invokeAll 返回的 future 集合:首先通过 map 获取返回值,然后将该值在控制台打印。如果你不熟悉流的操作,可以阅读我的 Java 8 Stream Tutorial.

(译者注:invokeAll 的行为与 submit 不同,后者在提交任务后就会返回,此时的 future 对象 isDone 状态不确定,一般为 false, 但 invokeAll 则会进入阻塞,并等待所有任务都完成或抛出异常才会返回。此外 invokeAll 也有 timeout 设置,行为与 future.get 相似。)

InvokeAny

另一个批量提交 callable 的方法是 invokeAny()。与 invokeAll() 稍有区别的是 invokeAny 会阻塞直到接收到第一个完成的 callable 的结果。

为了测试 invokeAny 的行为,我们使用以下辅助方法去模拟不同耗时的 callable。这个辅助方法返回被设置成休眠相应时间后才返回结果的任务:

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

我们使用这个辅助方法创建耗时 1-3 秒不等的几个 callable 对象,使用 invokeAny 提交到 executor 并得到最快完成的 callable 的结果。当然在这个例子中是 task2 的结果:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1", 2),
    callable("task2", 1),
    callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上述例子使用 newWorkStealingPool() 创建了另一种 executor. 该工厂方法是 Java 8 的新特性,返回的 executor 类型为 ForkJoinPool. 与其他类型的 executor 稍有不同,ForkJoinPool 并不是通过指定的线程数来创建线程池,而是通过给定的并行数来创建,该值默认为宿主的核心数。

ForkJoinPools 是 Java 7 的新特性,我会在该系列的后续介绍中详细地进行介绍。现在让我们来关注这篇文章的最后部分:scheduled executors.

Scheduled Executors

我们已经掌握如何向 executor 提交和运行单次任务。为了间隔性地执行一些通用的任务,我们需要使用定时线程池 scheduled thread pool.

ScheduledExecutorService 可以指定任务按固定的时间间隔来执行,或者等待指定时间后只执行一次。

以下示例定义了一个初始化后延迟 3 秒再执行的定时任务:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

定时执行的任务会返回一个特殊的 future 类型:ScheduledFuture. 该 future 提供了 getDelay() 方法来获取定时执行的剩余时间。当到达指定时间时,该任务会以并发的方式执行。

Scheduled executor 提供了两个方法:scheduleAtFixedRate()scheduleWithFixedDelay() 来以固定的时间间隔来执行任务。第一种方法允许任务以固定的频率来执行,例如以下的示例任务会按每秒一次的频率运行。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外这个方法接收一个初始延迟时间,在该时间过去后才会第一次执行该任务。

必须注意的是 scheduleAtFixedRate() 并没有考虑到任务的实际耗时,因此假如你设定了每秒执行一次,然而每次执行耗时 2 秒,那么线程池的资源将会在极短时间内被耗尽。

在这种情况下,你该考虑使用第二种定时任务方法: scheduleWithFixedDelay(). 该方法的工作原理与第一种方法非常相似,区别在于指定的时间间隔为任务的结束时间到下一次开始时间的间隔。例子如下:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

这段代码为定时任务指定了 1 秒延迟,即一次任务结束到下次任务开始之间的间隔时间,初始化后延迟 0 秒,而任务的执行时间为 2 秒。因此我们可以观察到任务以 3 秒的间隔执行。由此可见,如果你无法预测定时任务的耗时,那么 scheduleWithFixedDelay() 将会十分有用。

以上就是并发指南的第一部分的全部内容,我建议你自行练习上述的示例代码。你可以在 github 上找到这篇文章的所有示例代码,关注我的项目并给我一颗星吧。

希望你喜欢这篇文章,如果你有其他问题,可以通过 Twitter 与我沟通。

最近的文章

Java 8 并发编程 (2/3):同步与锁

原文地址:Java 8 Concurrency Tutorial: Synchronization and Locks作者: winterbe Synchronized 关键字 Lock 锁 ReentrantLock 可重入锁 ReadWriteLock 读写锁 StampedLock 邮戳锁 Semaphores 信号量欢迎来到由我编写的关于 Java 8 并发编程系列的第二部分。这个系列的文章将通过简单易懂的示例代码展示如何使用...…

java concurrency继续阅读
更早的文章

Java 8 API 示例 - String, Number, Math 与 Files

原文地址:Java 8 API by Example: Strings, Numbers, Math and Files作者: winterbe 1. 字符串切片 2. 处理数字 3. 数学运算 4. 使用 Files 列出文件 查找文件 读写文件 关于 Java 8 变化的文章中,绝大部分内容都聚焦于 Lambda 表达式、函数式接口与 Stream. 但除此之外,在 JDK 8 中还有很多类被增强,加入了许多有用的特性和方法。这篇...…

java继续阅读