劳伦的奇妙冒险


客亦知夫水与月乎?

哀吾生之须臾,
羡长江之无穷。

Java 8 并发编程 (2/3):同步与锁

原文地址:Java 8 Concurrency Tutorial: Synchronization and Locks

作者: winterbe



欢迎来到由我编写的关于 Java 8 并发编程系列的第二部分。这个系列的文章将通过简单易懂的示例代码展示如何使用并发编程 API。在接下来的 15 分钟里,你将会学习到如何使用 synchronized 关键字、不同类型的锁(Lock)以及信号量(semaphore)来同步访问共享的可变对象。

这篇文章中大部分概念与旧版 Java 是相通的,但是我的示例代码会重度使用 Java 8 的 lambda 表达式以及其他新特性。所以如果你对 lambda 表达式还不熟悉,我建议你先行阅读我的 Java 8 指南.

为了简化篇幅,文章中的示例代码使用了由我定义的两个辅助方法: sleep(seconds)stop(executor), 具体代码可在 github 查看。

Synchronized 关键字

上一篇简介中,我们学习了通过 executor service 来并行地执行代码。当我们在编写多线程程序时,一定要特别小心多线程共享的变量的并发访问。下面我们以多线程同步使整数累加的情况为例。

定义 count 变量,并通过 increment() 方法来使其加一:

int count = 0;

void increment() {
    count = count + 1;
}

多个线程同时调用该方法将会出现问题:

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

stop(executor);

System.out.println(count);  // 9965

上述代码并没有得到预期的结果 10000,而是每次运行的结果都不同。这是因为我们通过多线程访问共享变量时没有进行同步,导致了竞态条件(race condition)的出现。

为变量加一的操作实际上分为三步:1.读取当前的数值,2.该数值加一,3.将该值写入到变量中。当两个线程同时执行这些步骤,有可能两个线程都在执行第一步并读取了相同的数值,导致部分的写入操作的丢失,使得最终结果低于预期值。由于在并发环境下没有使用同步访问,上述示例中对 count 变量的增加操作中有 35 次结果丢失了。

幸运的是,Java 从早期起就支持通过 synchronized 关键字进行线程同步操作。我们可以通过 synchronized 关键字来解决上面的竞态条件问题:

synchronized void incrementSync() {
    count = count + 1;
}

在并发情况下调用 incrementSync() 终于等到我们期望的结果 10000,不再出现竞态条件,而且每次执行的结果都是稳定的:

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::incrementSync));

stop(executor);

System.out.println(count);  // 10000

关键字 synchronized 也能用于代码块中:

void incrementSync() {
    synchronized (this) {
        count = count + 1;
    }
}

Java 内部对同步管理使用了所谓的监视器(monitor, 即管程),也被称为监视器锁(monitor lock)或内在锁(intrinsic lock)。每个对象都有与其绑定的监视器,例如,一个对象有多个同步方法,则这些同步方法共享了与该对象相关的监视器。

所有隐式的监视器都有可重入的特性,这意味着锁与当前线程绑定。一个线程可以安全地重复申请同一个锁而不会出现死锁。(例如,一个对象的同步方法调用了它另一个同步方法不会出现死锁,因为它们相关的监视器与当前线程绑定,且具有可重入特性)

Lock 锁

除了 synchronized 关键字用到的隐式锁,并发 API 也提供了不同种类的显式锁。这些锁都实现了 Lock 接口,并提供了更细颗粒度的控制,因而比隐式的监视器更具有表达力。

接下来的部分将会展示标准 JDK 中各种锁的实现。

ReentrantLock 可重入锁

ReentrantLock 为互斥锁,与通过 synchronized 关键字来访问的隐式监视器有相同的基本行为,但也有后者不具备的扩展能力。ReentrantLock 的类名也提示了它与隐式的监视器一样有可重入的特性。

ReentrantLock 来改写之前的例子:

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

代码中通过 lock() 来获取锁,通过 unlock() 来释放锁。必须注意使用 try/finally 代码块来预防业务代码抛出异常。这个 increment() 方法与之前使用 synchronized 声明的版本均为线程安全。当一个线程已经获得该锁,那么随后其他线程对 lock() 的调用都会进入阻塞,直到该锁被释放。同一时间内,锁只能被单个线程所持有。

锁提供了各种方法对代码进行细颗粒度的控制:

ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();

executor.submit(() -> {
    lock.lock();
    try {
        sleep(1);
    } finally {
        lock.unlock();
    }
});

executor.submit(() -> {
    System.out.println("Locked: " + lock.isLocked());
    System.out.println("Held by me: " + lock.isHeldByCurrentThread());
    boolean locked = lock.tryLock();
    System.out.println("Lock acquired: " + locked);
});

stop(executor);

在第一个任务持有锁 1 秒的过程中,第二个任务能够获取到该锁当前的状态:

Locked: true
Held by me: false
Lock acquired: false

方法 tryLock() 作为 lock() 的替代,会在不阻塞当前线程的情况尝试获取锁,因此必须先通过返回的布尔值来判断是否获取到锁,之后再访问共享变量。

ReadWriteLock 读写锁

读写锁实现了 ReadWriteLock 接口,通过一对关联的读取锁和写入锁分别对读和写操作进行控制。读写锁背后的理念是,只要没有其他线程对变量执行写入操作,那么多个线程并发地读取该变量的行为是安全的。因此只要写入锁没有被持有,那么读取锁就可以被同时被不同的线程持有。在读取操作远多于写入操作的场景下,这种方式能够提升代码的性能和吞吐量。

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.writeLock().unlock();
    }
});

上面的例子首先在任务线程中获取一个写入锁,休眠 1 秒后再将数据写入数据到 map 对象。在这个写入任务完成之前,还有另外两个任务被提交,尝试从 map 中读取词条并休眠一秒:

Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("foo"));
        sleep(1);
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

当你执行这段代码的时候,你会发现两个读取任务都必须等待直到 1 秒后写入任务完成才能开始执行。当写入锁被释放后,两个读取任务以并行的方式执行,将结果同时输出到控制台。这两个任务线程均持有读取锁并有休眠动作,但是在执行过程中任何一个线程都不需要等待另一个线程执行结束,这是因为只要写入锁没有被其他线程持有,读取锁就可以并发地被多个线程持有。

StampedLock 邮戳锁

StampedLock 邮戳锁是 Java 8 新增的锁类型,与读写锁一样同样具备读取锁和写入锁的特性。它与读写锁的区别在于,StampedLock 返回了一个类型为 long 的邮戳,可用于释放锁或检查锁的当前状态是否有效。除此之外,邮戳锁支持乐观锁模式

使用 StampedLock 改写 ReadWriteLock 的例子:

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        sleep(1);
        map.put("foo", "bar");
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("foo"));
        sleep(1);
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

stop(executor);

通过 readLock()writeLock() 分别获取读取锁和写入锁都会返回一个邮戳,可用在 finally 中释放锁资源。必须注意的是,邮戳锁并不具备可重入的特性。每次获取锁都会返回一个新的邮戳,如果没有锁可用,线程就会进入阻塞,哪怕该线程已经持有写入锁。因此你必须特别注意,以免发生死锁。

(译者注:锁的可重入特性指的是对于单个线程而言,锁能否多次与其进行绑定。邮戳锁是复合型的锁,内部的两种锁的获取均要等待对方释放,因此当同一个线程获取一种类型的锁后,在内部再获取另一种类型的锁,即可发生死锁。因此说邮戳锁不具备可重入特性)

上面的代码与之前的 ReadWriteLock 读写锁的示例一样,两个读取变量的任务都需要等待写入锁的释放后才能进行。一旦写入锁被释放,两个读取任务将同时执行并将内容打印到控制台,分别持有读取锁的两个任务之间不会进入阻塞。

下面展示如何使用乐观锁:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(1);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        sleep(2);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        sleep(2);
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

stop(executor);

乐观读取锁(optimistic read lock)通过 tryOptimisticRead() 方法获取。该方法同样返回一个邮戳,而且不管锁是否可用,当前线程都不会进入阻塞。如果写入锁已经被持有,那么返回的邮戳为 0。通过 lock.validate(stamp) 可验证已获取的非 0 邮戳是否仍有效(即另一种类型的锁还未被持有)。

以下是控制台输出的内容:

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false

乐观锁在刚获取的时候是有效的。与普通读取锁不同,乐观锁在其他线程获取写入锁时不会进行阻止。在例子中,将第一个线程设为休眠 1 秒后,第二个线程立即就能获取到写入锁,而无需等待乐观读锁的释放。当第二个线程获取到写锁,乐观读锁的邮戳就不再有效了,哪怕写锁已经释放。

由此可见,使用乐观锁需要在每次读取共享变量后进行检查,以确保读取操作是有效的。

StampedLocktryConvertToWriteLock() 方法可以将读锁转换为写锁,当需要写入数据时可以避免先释放锁再加锁的操作:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                System.out.println("Could not convert to write lock");
                stamp = lock.writeLock();
            }
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
});

stop(executor);

该任务在获取读取锁后将变量 count 的值输出到控制台。但是如果 count 为零,我们会将其修改为 23. 为了不改变其他线程的访问,我们通过 tryConvertToWriteLock() 方法将这个读取锁转换为写入锁。该方法不会阻塞线程,但是返回的结果有可能为 0, 意味着当前没有写入锁可用。

译者注:这段代码作为示例不太合适,容易使人产生困惑。

tryConvertToWriteLock(stamp) 的 Java 文档给出了该方法行为的说明:1.当 stamp 属于写入锁,则直接返回该 stamp; 2.如果 stamp 属于读取锁,在写入锁空闲时释放该读取锁,并返回写入锁的邮戳;3.如果 stamp 是乐观读取,则只有在此刻写入锁空闲时返回可写入的邮戳;其他情况均返回 0.

换言之当,当其他线程也持有读取锁且仍未释放时,当前线程无法将持有的读取锁转换为写入锁。

示例中仅有一个任务获取了读取锁,因此无法观察到 tryConvertToWriteLock 返回 0 的情况。

为了使其返回 0,只需要再向 executor 提交一次该任务即可。但是直接使用 lock.writeLock() 将会导致死锁(反复执行即可复现):

Runnable task = () -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0) {
                System.out.println("Could not conver to write lock.");
                stamp = lock.writeLock();
            }
            count = 23;
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
};
executor.submit(task); // 多个任务同时执行
executor.submit(task);

executor.shutdown();
executor.awaitTermination(3, TimeUnit.SECONDS);

死锁产生的原因:两个任务同时获得读取锁 → 同时判断 count == 0 为 true → 同时调用 lock.tryConvertToWriteLock(stamp); → 同时进入阻塞在 lock.writeLock(),因为此时两个线程均未释放读取锁,因此只能干耗着。改进办法很简单:在申请写入锁之前将读取锁释放掉:

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        if (count == 0) {
            long stamp2 = lock.tryConvertToWriteLock(stamp);
            if (stamp2 == 0) {
                System.out.println("Could not conver to write lock.");
                lock.unlock(stamp); // 先将持有的读取锁释放掉
                stamp2 = lock.writeLock();
            }
            count = 23;
            lock.unlock(stamp2);
        }
        System.out.println(count);
    } finally {
        lock.unlock(stamp);
    }
};
executor.submit(readTask);
executor.submit(readTask);

executor.shutdown();
executor.awaitTermination(3, TimeUnit.SECONDS);

在一个线程中,如果锁不支持可重入特性,那么同时申请读取锁和写入锁是高危行为,必须先释放前者,再申请后者。

Semaphores 信号量

作为锁的补充,并发 API 中也支持计数信号量。与锁提供的单一访问变量或资源的方式不同,信号量可以同时维持多个访问权限。当你的应用中有需要控制并发访问量的场景,信号量将会十分有用。

以下是通过信号量访问一个通过 sleep(5) 来模拟的长时间运行任务的例子:

ExecutorService executor = Executors.newFixedThreadPool(10);

Semaphore semaphore = new Semaphore(5);

Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            sleep(5);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
}

IntStream.range(0, 10)
    .forEach(i -> executor.submit(longRunningTask));

stop(executor);

executor 可以同时运行 10 个任务,而我们将信号量设为 5,因此并发访问数被限制为 5. 注意使用 try/finally 语句来释放信号量资源,以防有异常被抛出。

以下是输出结果:

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore

对于通过 sleep(5) 来模拟的长时间运行任务,信号量限制了对该任务的最大访问量为 5. 后续对 tryAcquire() 的调用超出设置的 1 秒,因此在控制台输出无法获取信号量资源的内容。

以上就是我关于并发教程的第二部分。我还会持续发表更多内容,请务必保持关注。与以往一样,你可以在 github 上找到这篇文章的全部示例代码,关注我的 repo 并动手试试吧。

希望你喜欢这篇文章,有任何问题都可以在留言区反馈,或在 Twitter 上关注我发布的更多开发相关的内容。

最近的文章

Java 8 并发编程 (3/3):原子变量与 ConcurrentMap

原文地址:Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap作者: winterbe AtomicInteger LongAdder LongAccumulator ConcurrentMap ConcurrentHashMap ForEach Search Reduce 欢迎来到由我编写的 Java 8 并发编程系列的第三部分。本篇的内容为并发 API 中...…

java concurrency继续阅读
更早的文章

Java 8 并发编程 (1/3):Thread 与 Executors

原文地址:Java 8 Concurrency Tutorial: Threads and Executors作者: winterbe Thread 与 Runnable Executors Callable 与 Future Timeout InvokeAll InvokeAny Scheduled Executors欢迎来到由我编写的关于 Java 8 并发编程系列的第一部分。这个系列的文章将通过简单易懂的示例代码展示如...…

java concurrency继续阅读