Java 并发 - 锁
Java 世界中都有哪些锁?锁的分类?如何减少锁的竞争等问题。
线程安全的三种实现方式
互斥同步
互斥同步 (Blocking Synchronization),属于悲观并发策略:
非阻塞同步
非阻塞同步 (Non-Blocking Synchronization),属于乐观并发策略:
无同步
无同步 - 线程本地存储 (Thread Local Storage):
False Sharing
- (1)CPU具有多级缓存,越接近CPU的缓存越小也越快;
- (2)CPU缓存中的数据是以缓存行为单位处理的;When the CPU caches are reading data from lower level caches or main RAM (e.g. L1 from L2, L2 from L3 and L3 from main RAM), they don’t just read a single byte at a time. That would be inefficient. Instead they read a cache line. A cache line typically consists of 64 bytes. Thus, the caches read 64 bytes at a time from lower level caches or main RAM.
- (3)CPU缓存行能带来免费加载数据的好处,所以处理数组性能非常高;
- (4)CPU缓存行也带来了弊端,多线程处理不相干的变量时会相互影响,也就是伪共享;
- (5)避免伪共享的主要思路就是让不相干的变量不要出现在同一个缓存行中;
- (6)一是每两个变量之间加七个
long
类型; - (7)二是创建自己的
long
类型,而不是用原生的; - (8)三是使用 java8 提供的注解;
缓存行失效
当 CPU 更新某个变量的值,缓存行标记为脏。这个缓存行需要与其他 CPU 的这个缓存行进行同步,其他 CPU 的缓存行也需要更新。在缓存行被刷新之前,CPU 不允许访问缓存行。
伪共享代码示例
public class Counter {
public volatile long count1 = 0;
public volatile long count2 = 0;
}
public class FalseSharingExample {
public static void main(String[] args) {
Counter counter1 = new Counter();
Counter counter2 = counter1;
long iterations = 1_000_000_000;
Thread thread1 = new Thread(() -> {
long startTime = System.currentTimeMillis();
for(long i=0; i<iterations; i++) {
counter1.count1++;
}
long endTime = System.currentTimeMillis();
System.out.println("total time: " + (endTime - startTime));
});
Thread thread2 = new Thread(() -> {
long startTime = System.currentTimeMillis();
for(long i=0; i<iterations; i++) {
counter2.count2++;
}
long endTime = System.currentTimeMillis();
System.out.println("total time: " + (endTime - startTime));
});
thread1.start();
thread2.start();
}
}
修复伪代码共享一
public class FalseSharingExample {
public static void main(String[] args) {
Counter counter1 = new Counter();
Counter counter2 = new Counter();
long iterations = 1_000_000_000;
Thread thread1 = new Thread(() -> {
long startTime = System.currentTimeMillis();
for(long i=0; i<iterations; i++) {
counter1.count1++;
}
long endTime = System.currentTimeMillis();
System.out.println("total time: " + (endTime - startTime));
});
Thread thread2 = new Thread(() -> {
long startTime = System.currentTimeMillis();
for(long i=0; i<iterations; i++) {
counter2.count2++;
}
long endTime = System.currentTimeMillis();
System.out.println("total time: " + (endTime - startTime));
});
thread1.start();
thread2.start();
}
}
修复伪代码共享二
public class Counter1 {
@jdk.internal.vm.annotation.Contended
public volatile long count1 = 0;
public volatile long count2 = 0;
}
@Contended
可以在类里面使用空的字节 pad fields ,所以这个字段肯定不会和其他字段位于同一个缓存行。这个也可以修饰在类上,这样所有字段都不在一个缓存行:
@jdk.internal.vm.annotation.Contended
public class Counter1 {
public volatile long count1 = 0;
public volatile long count2 = 0;
}
默认情况下,@Contended
添加 128 字节的 pad。通过使用 JVM 参数 -XX:ContendedPaddingWidth=64
可以自定义 pad 的字节。
Java 主内存与工作内存交互
从主内存读取变量到工作内存:
将工作内存的变量写入到主内存:
内置锁 Synchronized
Java 提供了一种内置锁 (Intrinsic Lock)机制来支持原子性: 同步代码块 (Synchronized Block)。每个 Java 对象都可以用做一个实现同步的锁,这些锁被称为内置锁 (Instrinsic Lock) 或监视器锁 (Monitor Lock)。Java 的内置锁相当于一种互斥体(或互斥锁),这意味着最多只有一个线程能持有这种锁。但是,加锁的含义不仅仅局限于互斥行为,还包括内存可见性,为了确保所有线程都能看到共享变量的最新值,所有执行读操作或者写操作的线程都必须在同一个锁上同步。
Java synchronized
语句 是基于 monitorenter/monitorexit 机制来实现的。当你写下面这段代码的时候:
static void Sort(int [] array) {
// synchronize this operation so that some other thread can't
// manipulate the array while we are sorting it. This assumes that other
// threads also synchronize their accesses to the array.
synchronized(array) {
// now sort elements in array
}
}
实际上 JVM 可能会生成下面的代码:
.method static Sort([I)V
aload_0
monitorenter ; lock object in local variable 0
; now sort elements in array
aload_0
monitorexit ; finished with object in local variable 0
return
.end method
monitorenter
在对象的引用上获取了一个 exclusive lock (独占锁)
内置锁 synchronized
是可重入的,某个线程试图获取一个已经由它自己持有的锁,那么这个请求就会成功。
死锁
在数据库系统的设计中考虑了监测死锁以及从死锁中恢复。在执行一个事务 (Transaction) 时可能需要获取多个锁,并一直持有这些锁直到事务提交。当数据库服务器监测到一组事务发生了死锁时 (通过在表示等待关系的有向图中搜索循环),将 选择一个牺牲者并放弃这个事务。作为牺牲者的事务会释放它所持有的资源,从而让其它事务继续进行。应用程序可以重新执行被强行中止的事务,而这个事务现在也可以成功完成。
死锁的四大必要条件 (必须全部满足):
- 互斥
- 持有并等待资源
- 不可抢占
- 循环等待
如果所有的线程以固定的顺序来获得锁,那么在程序中就不会出现锁顺序死锁问题:
// 不要这么做
public class LeftRightDeadlock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}
public void rightLeft() {
synchronized (right) {
synchronized (left) {
doSomethingElse();
}
}
}
}
有时候,你并不能清除地知道是否在锁顺序上有足够的控制权来避免死锁的发生:
// 动态的锁顺序
// Warning: deadlock-prone!
public void transferMoney(Account fromAccount,
Account toAccount,
DollarAmount amount)
throws InsufficientFundsException {
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}
在这里锁的顺序取决于参数顺序,而这些参数顺序又取决于外部输入,考虑下面代码就有可能发生死锁:
A: transferMoney(myAccount, yourAccount, 10);
B: transferMoney(yourAccount, myAccount, 20);
使用 System.identityHashCode
来定义锁的顺序:
private static final Object tieLock = new Object();
public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
// 如果 Account 中包含一个唯一的、不可变的,并且具备可比性的键值,例如
// 账号,那么制定锁的顺序就更加容易了。
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);
if (fromHash < toHash) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAcct) {
synchronized (fromAcct) {
new Helper().transfer();
}
}
} else {
// 在极少数情况下,两个对象可能拥有相同的散列值,
// 此时可以通过某种任意的方法来决定锁的顺序,
// 而这有可能重新引入死锁。为了避免这种情况,可以使用
// “加时赛”锁
synchronized (tieLock) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
}
}
}
某些获取多个锁的操作并不像 LeftRightDeadLock
或 transferMoney
中那么明显,这两个锁并不一定必须在同一个方法中被获取:
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
// 先获取 Taxi 锁
// 再获取 Dispatcher 锁
public synchronized void setLocation(Point location) {
this.location = location;
if (location.equals(destination))
dispatcher.notifyAvailable(this);
}
}
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
// 先获取 Dispatcher 锁
// 再获取每一个 Taxi 锁
public synchronized Image getImage() {
Image image = new Image();
for (Taxi t : taxis)
image.drawMarker(t.getLocation());
return image;
}
}
通过将上述代码修改为开放调用 (调用某个方法时不需要使用锁),从而消除发生死锁的风险:
@ThreadSafe
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
...
public synchronized Point getLocation() {
return location;
}
public synchronized void setLocation(Point location) {
boolean reachedDestination;
synchronized (this) {
this.location = location;
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
}
@ThreadSafe
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
...
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public Image getImage() {
Set<Taxi> copy;
synchronized (this) {
copy = new HashSet<Taxi>(taxis);
}
Image image = new Image();
for (Taxi t : copy)
image.drawMarker(t.getLocation());
return image;
}
}
在程序中应该尽量使用开放调用。与那些在持有锁时调用外部方法的程序相比,更易于对依赖于开放调用的程序进行死锁分析。通过使用定时锁能够有效地应对死锁问题,通过 Thread Dump 能够帮助你识别死锁的发生。
减少锁的竞争
有三种方式可以降低锁的竞争程度:
- 减少锁的持有时间
- 降低锁的请求频率
- 使用带有协调机制的独占锁,这些机制允许更高的并发性
重入锁 ReentrantLock
ReentrantLock
的 tryLock
方法为你提供了轮询锁与定时锁的锁获取模式,与无条件的锁获取模式相比,它具有更完善的错误恢复机制。方法 lockInterruptibly
方法能够在获得锁的同时保持对中断的响应。ReentrantLock
的构造函数中提供了两种公平性选择: 创建一个非公平的锁 (默认) 或者一个公平的锁。在公平的锁上,线程将按照它们发出请求的顺序来获得锁,但在非公平的锁上,则允许“插队”。在大多数情况下,非公平锁的性能要高于公平锁的性能。
public class ReentrantLock implements Lock, java.io.Serializable {
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public void unlock() {
sync.release(1);
}
}
读写锁 ReadWriteLock
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}
记录锁 Record Locking
Record Locking 更好的叫法应该被称为: byte-range locking,目的是为了防止两个进程同时修改一个文件的某块区域。函数原型如下:
#include <fcntl.h>
// 出错返回 -1
int fcntl(int fd, int cmd, ... /* struct flock *flockptr */ );
其中 flock
结构体定义如下:
struct flock {
short l_type; /* F_RDLCK, F_WRLCK, or F_UNLCK */
short l_whence; /*SEEK_SET, SEEK_CUR, or SEEK_END */
off_t l_start; /*offset in bytes, relative to l_whence */
off_t l_len; /*length, in bytes; 0 means lock to EOF */
pid_t l_pid; /*returned with F_GETLK */
};
F_RDLCK
: 共享读锁F_WRLCK
: 排斥写锁F_UNLCK
: 取消某个区域的锁
锁优化
自旋锁 SpinLock
互斥同步对性能最大的影响就是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给系统的并发性能带来了很大的压力。同时,虚拟机的开发团队也注意到在许多应用上,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。为了能让线程稍微等一会,我们只需让线程执行一个忙循环 (自旋),这项技术就是所谓的自旋锁。
现在我们假设硬件上有一种能够保证原子性的 TestAndSet
指令实现函数:
int TestAndSet(int *x){
register int temp = *x;
*x = 1;
return temp;
}
TestAndSet
是一种常用的用于支持并发的原子操作指令。另外一种经常使用的指令是原子 Exchange
操作:
void Exchange(int *a, int *b) {
int temp = *a;
*a = *b;
*b = temp;
}
这些所有的原子性操作中最重要的是 CompareAndSwap (CAS)
操作,它经常被用于 lock-free and wait-free algorithms 算法中。
boolean CAS(int *a, int old, int new) {
int temp = *a;
if (temp == old) {
*a = new;
return true;
} else
return false;
}
使用 CAS
来实现 temp++
:
int temp = x;
while (!CAS(&x, temp, temp+1)) {
temp = x;
}
使用 CAS
来实现更链表头插法:
while (1) {
Node *q = *head;
p->next = q;
if (CAS(head, q, p))
break;
}
一般而言,SpinLock
是一种抽象的数据类型,其通常提供三种操作:
InitLock
Lock
UnLock
Lock(mutex);
Si;
UnLock(mutex);
实现 SpinLock
的伪代码如下:
typedef int SpinLock;
void InitLock(SpinLock *L) {
*L = 0;
}
void Lock(SpinLock *L) {
while (TestAndSet(L))
;
}
void UnLock(SpinLock *L) {
*L = 0;
}
一种使用 Exchange
操作的可能实现:
typedef int SpinLock;
void InitLock(SpinLock *s) {
*s = 0;
}
void Lock (SpinLock *s) {
int L = 1;
do {
Exchange(&L, s);
} while (L == 1);
}
void UnLock (SpinLock *s) {
*s = 0;
}
另外一种使用 CompareAndSwap
指令的实现:
typedef int SpinLock;
void InitLock(SpinLock *s) {
*s = 0;
}
void Lock (SpinLock *s) {
do {
} until (CompareAndSwap(s, 0, 1));
}
void UnLock (SpinLock *s) {
*s = 0;
}
自旋锁最大的问题就是可能会占用比较高的 memory bus 带宽,另外它也不保证公平性,即无法保证先后进入临界区的两个进程 P 和 Q 按照 FIFO 顺序来服务。
锁消除 Lock Elimination
虚拟机 JIT 在运行时,对一些代码要求同步,但是被检测到不可能存在共享数据竞争的锁进行消除。主要判定依据来自于逃逸分析的数据支持。
锁粗化 Lock Coarsening
轻量级锁 Lightweight Locking
偏向锁 Biased Locking
偏向锁的"偏",是偏心的"偏",它的意思就是这个锁会偏向于第一个获得它的线程,如果在接下来的执行过程中,该锁没有被其他的线程获取,则持有偏向锁的线程将永远不需要再进行同步。当有另外一个线程去尝试获取这个锁时,偏向模式就宣告结束。
JDK 1.6 默认开启 -XX:+UseBiasedLocking
,使用 -XX:-UseBiasedLocking
来关闭。
偏向锁转为轻量级锁的流程图:
锁升级 Lock Escalation
所谓的锁升级(lock escalation),是数据库的一种作用机制,该机制普遍见于各大数据库产品。 为了节约内存的开销,其会将为数众多并占用大量资源的细粒度的锁转化为数量较少的且占用相对较少资源的粗粒度的锁,多数情况下主要指将为数众多的行锁升级为一个表锁。当然,DB2 支持很多粒度的锁,如**表空间(table space),表(table),行(row)以及索引(index)**等。MySQL 的 InnoDB 存储引擎支持事务,默认是行锁。得益于这些特性,数据库支持高并发。
锁升级与两种事情有关:
- 事务的隔离级别
- 索引
常用的索引有三类:主键、唯一索引、普通索引。主键 不由分说,自带最高效的索引属性;唯一索引 指的是该属性值重复率为0,一般可作为业务主键,例如学号;普通索引 与前者不同的是,属性值的重复率大于0,不能作为唯一指定条件,例如学生姓名。当“值重复率”低时,甚至接近主键或者唯一索引的效果,“普通索引”依然是行锁;当“值重复率”高时,MySQL 不会把这个“普通索引”当做索引,即造成了一个没有索引的 SQL,此时引发表锁。索引不是越多越好,索引存在一个和这个表相关的文件里,占用硬盘空间,宁缺勿滥,每个表都有主键(id),操作能使用主键尽量使用主键。同 JVM 自动优化 java 代码一样,MySQL 也具有自动优化 SQL 的功能。低效的索引将被忽略,这也就倒逼开发者使用正确且高效的索引。