java多线程6:ReentrantLock
发布于 2022年 01月 29日 01:01
下面看下JUC包下的一大并发神器ReentrantLock,是一个可重入的互斥锁,具有比synchronized更为强大的功能。
ReentrantLock基本用法
先来看一下ReentrantLock的简单用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public class MyDomain1 { private Lock lock = new ReentrantLock(); public void method1() { System.out.println( "进入method1方法" ); try { lock.lock(); for ( int i = 0 ; i < 5 ; i++) { System.out.println(Thread.currentThread().getName() + " i=" + i); Thread.sleep( 1000 ); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class Mythread1_1 extends Thread { private MyDomain1 myDomain1; public Mythread1_1(MyDomain1 myDomain1) { this .myDomain1 = myDomain1; } @Override public void run() { myDomain1.method1(); } } |
开启三个线程同时执行测试方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | @Test public void test1() throws InterruptedException { MyDomain1 myDomain1 = new MyDomain1(); Mythread1_1 a = new Mythread1_1(myDomain1); Mythread1_1 c = new Mythread1_1(myDomain1); Mythread1_1 d = new Mythread1_1(myDomain1); a.start(); c.start(); d.start(); a.join(); c.join(); d.join(); } |
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | 进入method1方法 Thread- 0 i= 0 进入method1方法 进入method1方法 Thread- 0 i= 1 Thread- 0 i= 2 Thread- 0 i= 3 Thread- 0 i= 4 Thread- 1 i= 0 Thread- 1 i= 1 Thread- 1 i= 2 Thread- 1 i= 3 Thread- 1 i= 4 Thread- 2 i= 0 Thread- 2 i= 1 Thread- 2 i= 2 Thread- 2 i= 3 Thread- 2 i= 4 |
可以看到,代码流程进入到lock.lock()以后没有任何的交替打印,都是一个线程执行完后一个线程才开始执行,说明ReentrantLock具有加锁的功能。
看下ReentrantLock源码的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock( boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } |
可以看到ReentrantLock支持两种加锁模式:公平锁和非公平锁。它是如何实现的呢?继续往下看
我们测试用例中,默认使用的是非公平锁的加锁方法,看下 NonfairSync 的lock() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState( 0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire( 1 ); } protected final boolean tryAcquire( int acquires) { return nonfairTryAcquire(acquires); } } |
第12行的 compareAndSetState方法,当第一个线程执行次方法时,会将 state 设置为1,执行成功后,exclusiveOwnerThread=线程1。
此时线程1正常执行业务,当线程2走到lock方法时,此时线程12执行compareAndSetState方法将返回false,执行 acquire(1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
非公平锁实现的tryAcquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire( int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState( 0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) // overflow throw new Error( "Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } |
此时线程2得到的state应该是1,并且 current != getExclusiveOwnerThread(),所以线程2会继续执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
注意第8行到第13行,如果此时线程1已经释放了锁,那么线程2得到的state就是0了,它将走获取锁的逻辑,
第14行到第20行,这块就是ReentrantLock支持可重入的实现,也就是如果当前执行的线程是持有锁的线程,那么就可以获取锁,并将state+1。
如果线程1此时还没有释放锁,那么线程2将走到等待队列里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | * * @param node the node * @param arg the acquire argument * @return { @code true } if interrupted while waiting */ final boolean acquireQueued( final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } |
这个for循环对于线程2来说,首先再次尝试去获取锁,因为此时线程1可能已经释放锁了,如果依旧获取锁失败,则执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true ; if (ws > 0 ) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } |
这块代码,最好打个断点一步一步去执行,更容易看出每一步执行的逻辑以及值。
这个ws是节点predecessor的waitStatus,很明显是0,所以此时把pred的waitStatus设置为Noed.SIGNAL即-1并返回false。
既然返回了false,上面的if自然不成立,再走一次for循环,还是先尝试获取锁,不成功,继续走shouldParkAfterFailedAcquire,此时waitStatus为-1,小于0,走第三行的判断,返回true。
1 2 3 4 5 6 7 8 9 | /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park( this ); return Thread.interrupted(); } |
最后一步,线程2调用LockSupport的park方法。
接下来就到线程1执行完任务后,将执行unlock方法 释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public void unlock() { sync.release( 1 ); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release( int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } |
首先tryRelease(1) ,代码逻辑比较简单,就是将state设置0 (注意这是同一个锁只lock一次的情况下),并将 exclusiveOwnerThread设置为null
1 2 3 4 5 6 7 8 9 10 11 12 | protected final boolean tryRelease( int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread( null ); } setState(c); return free; } |
当锁释放完成后,继续执行release方法的 unparkSuccessor(h),
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } |
h的下一个Node,这个Node里面的线程就是线程2,由于这个Node不等于null,线程2最终被unpark了,线程2可以继续运行。
有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减少一个Node呢?
还记得线程2被park在 acquireQueued方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | final boolean acquireQueued( final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } |
被阻塞的线程2是被阻塞在第14行,注意这里并没有return语句,阻塞完成线程2继续进行for循环。线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,
然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,
这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是一样的原理。
至此线程2 lock方法执行完成,并成功获取到锁。
至此ReentrantLock的非公平锁的加锁与锁释放逻辑已经大致清楚了,那么公平锁的加锁过程又是如何呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire( int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState( 0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error( "Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } } |
1:tryAcquire(1),因为是第一个线程,所以当前status=0,尝试获取锁,hasQueuedPredecessors方法也是和非公平锁一个代码上的区别
1 2 3 4 5 6 7 8 9 10 | public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } |
公平锁获取锁之前首先判断当前队列是否存在(head==tail)[不存在],设置staus=1,获取锁成功。
如果等待队列中存在等待线程,则取出第一个等待的线程(head.next),并返回第一个等待的线程是否是当前线程,
只有当等到队列的第一个等待的线程是当前线程尝试获取锁的线程,才会获取锁成功。
假如此时线程t2,也来获取锁,调用tryAcquire(1)时,因为status!=0,返回fasle,调用addWaiter(Node.EXCLUSIVE),
此时会生成一个队列,队列的head为 new Node(), tail为t2的Node,调用acquireQueued(t2的Node),因为此时t2所在Node的prev为head,所以会尝试直接获取一次锁,
如果获取成功,将t2的Node设置为head,如果没有获取锁,shouldParkAfterFailedAcquire(),t2 Park()。
ReentrantLock持有的锁
定义一个对象,分别有两个测试方法,一个用ReentrantLock加锁,一个用synchronized加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | public class MyDomain1 { private Lock lock = new ReentrantLock(); public void method1() { System.out.println( "进入method1方法" ); try { lock.lock(); for ( int i = 0 ; i < 5 ; i++) { System.out.println(Thread.currentThread().getName() + " i=" + i); Thread.sleep( 1000 ); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } // 为了测试 lock 和 synchronized同步方法不是同一把锁 public synchronized void method2() { System.out.println( "进入method2方法" ); for ( int j = 0 ; j < 5 ; j++) { System.out.println(Thread.currentThread().getName() + " j=" + j); try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } |
定义两个线程类,分别调用一个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class Mythread1_1 extends Thread { private MyDomain1 myDomain1; public Mythread1_1(MyDomain1 myDomain1) { this .myDomain1 = myDomain1; } @Override public void run() { myDomain1.method1(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class Mythread1_2 extends Thread { private MyDomain1 myDomain1; public Mythread1_2(MyDomain1 myDomain1) { this .myDomain1 = myDomain1; } @Override public void run() { myDomain1.method2(); } } |
1 2 3 4 5 6 7 8 9 10 11 | @Test public void test1() throws InterruptedException { MyDomain1 myDomain1 = new MyDomain1(); Mythread1_1 a = new Mythread1_1(myDomain1); Mythread1_2 b = new Mythread1_2(myDomain1); a.start(); b.start(); a.join(); b.join(); } |
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 | 进入method1方法 Thread- 0 i= 0 进入method2方法 Thread- 1 j= 0 Thread- 0 i= 1 Thread- 1 j= 1 Thread- 1 j= 2 Thread- 0 i= 2 Thread- 0 i= 3 Thread- 1 j= 3 Thread- 0 i= 4 Thread- 1 j= 4 |
可以看到两个线路交替打印,说明 ReentrantLock 和 synchronized同步方法不是同一把锁
Condition
ReentrantLock实现等待/通知模型,这也是比synchronized更为强大的功能点之一。
1、一个ReentrantLock里面可以创建多个Condition实例,实现多路通知
2、notify()方法进行通知时,被通知的线程时Java虚拟机随机选择的,但是ReentrantLock结合Condition可以实现有选择性地通知
3、await()和signal()之前,必须要先lock()获得锁,使用完毕在finally中unlock()释放锁,这和wait()、notify()/notifyAll()使用前必须先获得对象锁是一样的
先看个示例
定义一个对象并new了两个condition,然后分别执行await方法,再定义一个signal方法,只唤醒其中一个condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | public class MyDomain2 { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); public void await() { System.out.println( "进入await方法" ); try { lock.lock(); System.out.println(Thread.currentThread().getName() + " conditionA await " + System.currentTimeMillis()); conditionA.await(); System.out.println(Thread.currentThread().getName() + " conditionA await out " + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void await2() { System.out.println( "进入await2方法" ); try { lock.lock(); System.out.println(Thread.currentThread().getName() + " conditionB await " + System.currentTimeMillis()); conditionB.await(); System.out.println(Thread.currentThread().getName() + " conditionB await out " + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void signal() { System.out.println( "进入signal方法" ); try { lock.lock(); System.out.println(Thread.currentThread().getName() + " conditionA signal " + System.currentTimeMillis()); conditionA.signal(); Thread.sleep( 3000 ); System.out.println(Thread.currentThread().getName() + " conditionA signal " + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } |
一个线程执行await方法,一个线程负责执行signal
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class Mythread2_1 extends Thread { private MyDomain2 myDomain2; public Mythread2_1(MyDomain2 myDomain2) { this .myDomain2 = myDomain2; } @Override public void run() { myDomain2.await(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class Mythread2_2 extends Thread { private MyDomain2 myDomain2; public Mythread2_2(MyDomain2 myDomain2) { this .myDomain2 = myDomain2; } @Override public void run() { myDomain2.signal(); } } |
测试方法
1 2 3 4 5 6 7 8 9 10 11 12 13 | @Test public void test2() throws InterruptedException { MyDomain2 myDomain2 = new MyDomain2(); Mythread2_1 a = new Mythread2_1(myDomain2); Mythread2_2 b = new Mythread2_2(myDomain2); a.start(); Thread.sleep( 5000 ); b.start(); a.join(); b.join(); } |
执行结果:
1 2 3 4 5 6 | 进入await方法 Thread- 0 conditionA await 1639549418811 进入signal方法 Thread- 1 conditionA signal 1639549423817 Thread- 1 conditionA signal 1639549426820 Thread- 0 conditionA await out 1639549426820 |
可以看到进入await方法后,线程1 park住了,5秒钟后,待signal执行完成后,线程1才开始继续执行。
同时condition还有signalAll方法,可以唤醒同一个condition所有在等待的线程。
看过 ReentrantLock源码的应该注意到 AbstractQueuedSynchronizer, 它也是JUC包实现的核心抽象同步器,
也是CountDownLatch、Semphore等并发类的核心组件,这个我们后续再继续研究。
参考文献
1:《Java并发编程的艺术》
2:《Java多线程编程核心技术》