JUC 并发包

JUC 并发包

线程安全

  • 无状态对象(例如 Servlet)一定是线程安全的。
  • 对于可能被多个线程访问的可变状态的变量,用锁来保护它。
  • 不可变对象(所有 field 都是 finalthis 指针没有逸出)一定是线程安全的。

Java 内存模型 (JMM)

在并发编程中,需要处理两个关键问题:线程之间如何通信及线程之间如何同步。在命令式编程中,线程之间的通信机制有两种:共享内存和消息传递。

Java的并发采用的是共享内存模型,Java线程之间的通信总是隐式进行,整个通信过程对程序员完全透明。

  • 线程之间共享程序的公共状态,通过写读内存中的公共状态进行隐式通信。
  • 在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。

Java 线程之间的通信 由 Java 内存模型(本文简称为 JMM )控制,JMM 决定一个线程对共享变量的写入何时对另一个线程可见

本地内存是 JMM 的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。Java内存模型的抽象结构示意图如下:

JMM 通过控制主内存与每个线程的本地内存之间的交互,来为 Java 程序员提供内存可见性保证。

由于 JVM 运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,线程与主内存中的变量操作必须通过工作内存间接完成,主要过程是将变量从主内存拷贝的每个线程各自的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,如果存在两个线程同时对一个主内存中的实例对象的变量进行操作就有可能诱发线程安全问题。为了解决类似上述的问题,JVM定义了一组规则,通过这组规则来决定一个线程对共享变量的写入何时对另一个线程可见,这组规则也称为Java内存模型(即JMM),JMM是围绕着程序执行的原子性、有序性、可见性展开的,下面我们看看这三个特性。

可见性

可见性是怎么导致的: 是因为每个 CPU 都有自己的缓存导致的。可见性指的是当一个线程修改了某个共享变量的值,其他线程是否能够马上得知这个修改的值。

  • 禁止编译器/处理器重排序:JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。处理器重排序,是通过在生成指令序列时,插入内存屏障来保证可见性。
  • synchronized:当线程获取锁时会从主内存中获取共享变量的最新值,释放锁的时候会将共享变量同步到主内存中。从而,synchronized具有可见性。
  • volatile:通过在指令中添加 lock 指令,以实现内存可见性

原子性

原子性指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其他线程影响。 基本数据类型

JMM 定义了 8 种操作都是原子的,不可再分的。

  • lock(锁定):作用于主内存中的变量,它把一个变量标识为一个线程独占的状态;
  • unlock(解锁):作用于主内存中的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便后面的load动作使用;
  • load(载入):作用于工作内存中的变量,它把read操作从主内存中得到的变量值放入工作内存中的变量副本
  • use(使用):作用于工作内存中的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作;
  • assign(赋值):作用于工作内存中的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作;
  • store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送给主内存中以便随后的write操作使用;
  • write(操作):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。

上面一共有八条原子操作,其中六条可以满足基本数据类型的访问读写具备原子性,还剩下lockunlock两条原子操作。

  • synchronized:如果我们需要更大范围的原子性操作就可以使用lock和unlock原子操作。尽管jvm没有把lock和unlock开放给我们使用,但jvm以更高层次的指令monitorenter和monitorexit指令开放给我们使用,反应到java代码中就是synchronized关键字,也就是说synchronized满足原子性。
  • volatile:volatile 能保证对这个单个变量的写入、读取的原子性,但是不保证复合操作的原子性。

有序性

synchronized:synchronized 语义就要求线程在访问读写共享变量时只能“串行”执行,因此synchronized具有有序性 volatile:为了性能优化,编译器和处理器会进行指令重排序;也就是说java程序天然的有序性可以总结为:如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的。

volatile 包含禁止指令重排序的语义,其具有有序性。

happens-before 原则

倘若在程序开发中,仅靠sychronizedvolatile关键字来保证原子性、可见性以及有序性,那么编写并发程序可能会显得十分麻烦,幸运的是,在Java内存模型中,还提供了happens-before 原则来辅助保证程序执行的原子性、可见性以及有序性的问题。

  • 解锁必然发生在后续的加锁之前
  • 程序顺序原则,即在一个线程内必须保证语义串行性,也就是说按照代码顺序执行
  • volatile 变量的写,先发生于读
  • 线程启动规则: 线程的 start() 方法先于它的每一个动作,即如果线程A在执行线程B的start方法之前修改了共享变量的值,那么当线程B执行start方法时,线程A对共享变量的修改对线程B可见
  • 线程终止规则: 线程的所有操作先于线程的终结,Thread.join()方法的作用是等待当前执行的线程终止
  • 传递性: A先于B ,B先于C 那么A必然先于C
  • 线程中断规则: 对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 Thread.interrupted() 方法检测线程是否中断
  • 对象终结规则: 对象的构造函数执行,结束先于finalize()方法

禁止指令重排序

Java 内存模型是通过内存屏障 memory barrier 来禁止重排序的。对于即时编译器来说,内存屏障将限制它所能做的重排序优化。对于处理器来说,内存屏障会导致缓存的刷新操作。

对于即时编译器来说,它会针对前面提到的每一个 happens-before 关系,向正在编译的目标方法中插入相应的读读、读写、写读以及写写内存屏障。

AQS (AbstractQueuedSynchronizer)

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

AQS 维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。

/**
 * The synchronization state.
 */
private volatile int state;

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 定义两种资源共享方式

  • Exclusive(独占,只有一个线程能执行,如 ReentrantLock
  • Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

Node

Node 结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量 waitStatus 则表示当前 Node 结点的等待状态。

static final class Node {
    
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
        * waitStatus value to indicate the next acquireShared should
        * unconditionally propagate
        */
    static final int PROPAGATE = -3;

}

独占模式获取资源

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire 尝试获取资源,如果成功直接返回,这是非公平锁,因为 CLH 队列中可能还有别的线程正在等待。addWaiter 将该线程使用 CAS 的方式放入到队列的尾部,标记为独占模式。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued 使得线程在等待队列中一直查看,自己的前节点是否是 head,如果是 head 了,那么就可以尝试获取资源了。如果没有到 head,那么就去调用 LockSupport.park(this) 进行休息,进入休息后的线程,只有通过 unpark 或者 interrupt 才可以唤醒这个线程。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire 函数内部会帮助这个线程去检查状态,比如万一前节点已经放弃了,那么需要看前节点的再前节点的状态才可以:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

最后的 selfInterrupt 是为了补偿中断,线程在获取中断前,不响应中断,如果发现线程被中断了,即 acquireQueued 返回了 true,那么则需要主动补偿中断。

独占模式释放资源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor 函数的作用就是找到等待队列中最前边的 waitStatus <=0 的线程,来 unpark 唤醒它。

共享模式获取资源

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

doAcquireShared 方法用于将 SHARED 状态的 Node,添加到队列的尾部去 park,直到其它线程 unpark 或者 interrupt 唤醒自己的时候才返回。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当这个线程获取指定个数的资源成功后,如果有剩余资源 (propagate > 0),继续唤醒下一个后继的 SHARED 线程:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

共享模式释放资源

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared 的内容:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可响应中断的获取资源

上述获取资源的时候,线程在等待队列中都是忽略中断的。AQS 也提供了可以响应中断的获取资源的方法。acquireInterruptibly()/acquireSharedInterruptibly(),下面以 acquireSharedInterruptibly 方法为例,来说明两者不同。

acquireShared 方法的对比:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

doAcquireShared 的对比:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

主要不同的地方在于,可响应中断的获取资源,在发现当前线程被中断之后,会直接抛出 InterruptedException 异常。

Semaphore

使用 AQS 同步状态来保存信号量的当前计数。tryRelease 会增加计数,acquireShared 会减少计数。

构造器

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

用法示例

使用信号量控制对于 item 池的访问:

class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

公平信号量

static final class FairSync extends Sync {

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

}

公平锁的实现,重点在于如果 hasQueuedPredecessors() 那么直接返回 -1,即获取失败。即如果队列中有正在等待的线程,则直接将当前线程加入到等待队列中 park

非公平信号量

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

非公平信号量获取资源的时候,则直接尝试 CAS 获取,不管队列里面有没有。

锁 VS 信号量

锁是服务于共享资源的;而 semaphore 是服务于多个线程间的执行的逻辑顺序的。

CountDownLatch

构造器

public CountDownLatch(int count) {
    this.sync = new Sync(count);
}

用法示例一

发出开始信号之后,所有的 Worker 才能开始干活:

class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

用法示例二

将问题分解为 N 部分,然后主线程等待每一部分都完成,再汇总工作。如果线程需要经常 count down 的话,那么可以使用 CyclicBarrier

class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ...

     for (int i = 0; i < N; ++i) // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // wait for all to finish
   }
 }

 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   WorkerRunnable(CountDownLatch doneSignal, int i) {
     this.doneSignal = doneSignal;
     this.i = i;
   }
   public void run() {
     try {
       doWork(i);
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

同步器的实现

只有非公平版本:

private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CyclicBarrier

构造器

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

用法示例

class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;

   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await();
         } catch (InterruptedException ex) {
           return;
         } catch (BrokenBarrierException ex) {
           return;
         }
       }
     }
   }

    // 构造器
   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     Runnable barrierAction =
       new Runnable() { public void run() { mergeRows(...); }};
     barrier = new CyclicBarrier(N, barrierAction);

     List<Thread> threads = new ArrayList<Thread>(N);
     for (int i = 0; i < N; i++) {
       Thread thread = new Thread(new Worker(i));
       threads.add(thread);
       thread.start();
     }

     // wait until done
     for (Thread thread : threads)
       thread.join();
   }
 }

Condition

作用

条件变量,就是用来替代 waitnotifynotifyAll 等操作的。锁保证了操作的原子性。

生产者消费者

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
                ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
                --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

ReentrantLock

获取锁

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

释放锁

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

可重入性

首先在 AQS 中有一个放置当前独占线程的变量:

private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

这样的话,那么可以直接根据 Thread.currentThread() == getExclusiveOwnerThread() 来判断是否可以重入:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 判断是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 不是当前线程,则不可获取资源
    return false;
}

当有线程持有该锁时,值就会在原来的基础上 +1,同一个线程多次获得锁时,就会多次 +1,这里就是可重入的概念。

ReentrantReadWriteLock

构造器

内部持有两把锁

public ReentrantReadWriteLock() {
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

AtomicInteger

递增实现

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

unsafegetAndAddInt 内部:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

public native int getIntVolatile(Object var1, long var2);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

参考