ReentrantLock source code analysis of Java concurrency

Condition

In the previous chapter we got a rough idea of how to use Condition. Now let's look at how Condition is implemented in juc (java.util.concurrent). Condition in juc is an interface: it only declares the methods, and the actual implementation is provided by the classes that implement it.

    public interface Condition {
        void await() throws InterruptedException;
        void awaitUninterruptibly();
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        boolean awaitUntil(Date deadline) throws InterruptedException;
        void signal();
        void signalAll();
    }

The three methods await(), signal(), and signalAll() need no introduction, and many readers can guess what the remaining methods do from their names. Here is a brief description of each:

awaitUninterruptibly(): releases the lock and suspends the current thread, ignoring interrupts while it waits. The thread resumes only when it is signalled, and then goes on to execute the code that follows the wait.

awaitNanos(long nanosTimeout): takes a timeout in nanoseconds. A return value > 0 is an estimate of the time still remaining, which means the thread was woken within the allotted time and can execute the code that follows. A value less than or equal to 0 means the timeout elapsed without a signal, so the code that depends on the condition must not be executed.

await(long time, TimeUnit unit) and awaitUntil(Date deadline): these are similar to each other. Both bound the wait. If the thread is signalled before the time limit they return true and the code that depends on the condition can be executed; otherwise they return false.

Next, let's look at the concrete implementations of the Condition interface in the juc package. There are two inner classes that implement it. The first is our old friend AbstractQueuedSynchronizer.ConditionObject, and the second is AbstractQueuedLongSynchronizer.ConditionObject. The two implementations are very similar, in places identical, so here the author only covers AbstractQueuedSynchronizer.ConditionObject.
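To make the return value of awaitNanos(long nanosTimeout) concrete, here is a minimal sketch of a timed wait. The class TimedAwaitSketch and its methods markReady() and awaitReady() are names made up for this illustration; only ReentrantLock, Condition, and TimeUnit come from the JDK. The loop feeds the value returned by awaitNanos back in as the next timeout, so the total wait stays bounded even if the thread wakes up early.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class, used only for illustration.
public class TimedAwaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    /** Sets the flag and signals one waiting thread. */
    public void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if the flag was set within the timeout, false otherwise. */
    public boolean awaitReady(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // the timeout elapsed without the flag being set
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = ready.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedAwaitSketch never = new TimedAwaitSketch();
        System.out.println(never.awaitReady(50, TimeUnit.MILLISECONDS)); // nobody signals

        TimedAwaitSketch soon = new TimedAwaitSketch();
        new Thread(soon::markReady).start();
        System.out.println(soon.awaitReady(5, TimeUnit.SECONDS));
    }
}
```

The first call is expected to time out because nothing ever sets the flag; the second should normally succeed as soon as the helper thread runs.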

Let's first look at the await() method to see how it releases the lock when it blocks the thread, and how the thread reacquires the lock after it is woken. When a thread enters await(), it first creates a Node wrapping the thread's Thread object at <1>, and then releases the lock held by the thread at <2>. If several threads call await() on the same condition object, each thread gets its own Node, and these nodes form a queue. After releasing the lock, the thread blocks at <3> until it is signalled or interrupted, at which point it wakes up at <3> and leaves the loop. At <4> we meet our old friend acquireQueued(final Node node, int arg). If you have a good memory, you will recall that this method blocks the thread until it acquires the lock. In this way, await() does work similar to Object.wait().

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter(); //<1>
                int savedState = fullyRelease(node); //<2>
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this); //<3>
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //<4>
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
        }
        //...
    }

Of course, await() is far from as simple as described above. That was only the general outline, and there are many details worth studying in depth. Let's walk through await() line by line.

First, the ConditionObject class maintains two fields, firstWaiter and lastWaiter, which form the waiting queue of a condition object. When a thread calls addConditionWaiter(), the method first checks at <1> whether the current thread holds the lock exclusively. If not, it throws an IllegalMonitorStateException; otherwise it continues and reads the tail node of the condition queue. If the tail node is not null and its wait status is not Node.CONDITION, the tail is no longer a valid condition node, so unlinkCancelledWaiters() is called at <2> to remove such nodes from the queue, and the tail is read again. Next, a Node is created for the calling thread with wait status CONDITION (-2), marking it as a condition node. If the tail is null, the queue is empty, so the code at <3> makes the new node the head (firstWaiter). Otherwise the new node becomes the nextWaiter of the old tail. In both cases the new node then becomes the tail (lastWaiter). Since this code can only be executed by the thread holding the lock, it is thread-safe.

Next, let's see how unlinkCancelledWaiters() removes nodes that are no longer waiting from the queue. In the condition queue, any node whose status is not CONDITION is a non-condition node. Every node enters the condition queue as a condition node, but for various reasons it may later turn into a non-condition node, and then it must be removed from the queue. The reasons are explained later; for now it is enough to know that this can happen. This is similar to the AQS synchronization queue, with one difference: there, a node counts as cancelled and must be removed only when its wait status is > 0.

To remove non-condition nodes, the method traverses the queue from the head node. The variable next always holds the successor of the current node, so the loop can move on to it in the next round. If the current node is a condition node, the code at <5> assigns it to trail, so trail always points to the most recently visited condition node. If the current node is not a condition node (the branch at <4>), its nextWaiter is cleared to help GC, and the method checks whether trail is null. A null trail means no valid node has been seen from the head up to the current node, so next becomes the new head (firstWaiter); otherwise trail.nextWaiter, the link out of the last valid condition node, is pointed at next. If next is null, the current node was the tail, so lastWaiter is set to trail. The loop ends when there is no next node. If no node from head to tail is a condition node, trail stays null, the branch at <4> is taken every time, and both firstWaiter and lastWaiter end up null: the condition queue is empty.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        private transient volatile Node head;
        private transient volatile Node tail;
        //...
        static final class Node {
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
            static final int CANCELLED = 1; //The node has been cancelled
            static final int SIGNAL = -1; //The successor of this node is waiting to be woken
            static final int CONDITION = -2; //The node is waiting on a condition
            volatile int waitStatus; //Wait status
            volatile Thread thread; //The thread this node belongs to
            /**
             * Generally links to the next condition node, or holds the special value
             * Node.SHARED to indicate that this is a shared node.
             */
            Node nextWaiter;
            //...
            Node(int waitStatus) {
                WAITSTATUS.set(this, waitStatus);
                THREAD.set(this, Thread.currentThread());
            }
            //...
        }

        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            private transient Node firstWaiter;
            private transient Node lastWaiter;
            //...
            private Node addConditionWaiter() {
                if (!isHeldExclusively()) //<1>
                    throw new IllegalMonitorStateException();
                Node t = lastWaiter;
                //If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters(); //<2>
                    t = lastWaiter;
                }
                Node node = new Node(Node.CONDITION);
                if (t == null)
                    firstWaiter = node; //<3>
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
            //...
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) { //<4>
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    } else
                        trail = t; //<5>
                    t = next;
                }
            }
        }
        //...
    }
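The unlinking logic is easier to follow on a stripped-down model. The sketch below is not JDK code: ConditionQueueSketch, its Node class, and the add() and names() helpers are invented for illustration, and add() appends nodes with an arbitrary status purely so that the demo can build a mixed queue. Only the body of unlinkCancelledWaiters() mirrors the traversal shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of a condition queue (hypothetical, for illustration only).
public class ConditionQueueSketch {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        final int waitStatus;
        Node nextWaiter;

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    /** Test helper: appends a node with the given status to the tail. */
    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    /** Same traversal as above: trail is the last node known to be valid. */
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;          // detach the stale node
                if (trail == null)
                    firstWaiter = next;       // nothing valid seen yet: move the head
                else
                    trail.nextWaiter = next;  // bridge over the stale node
                if (next == null)
                    lastWaiter = trail;       // the stale node was the tail
            } else {
                trail = t;
            }
            t = next;
        }
    }

    /** Names of the nodes currently linked, head to tail. */
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter)
            out.add(n.name);
        return out;
    }

    static List<String> demo() {
        ConditionQueueSketch q = new ConditionQueueSketch();
        q.add("A", CANCELLED);
        q.add("B", CONDITION);
        q.add("C", CANCELLED);
        q.add("D", CONDITION);
        q.add("E", CANCELLED);
        q.unlinkCancelledWaiters();
        return q.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [B, D]
    }
}
```

The queue A, B, C, D, E exercises all three cases: a stale head, a stale node in the middle, and a stale tail.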

After wrapping the thread in a Node and adding it to the condition queue, await() calls fullyRelease(Node node) to release the lock. Here getState() is called first to read the lock's hold count, and then release(int arg) is called with that value to bring the count down to zero in one step. If release(int arg) returns false, an IllegalMonitorStateException is thrown, and the catch block marks the node as CANCELLED before rethrowing. If the lock is released successfully, the original hold count savedState is returned.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        final int fullyRelease(Node node) {
            try {
                int savedState = getState();
                if (release(savedState))
                    return savedState;
                throw new IllegalMonitorStateException();
            } catch (Throwable t) {
                node.waitStatus = Node.CANCELLED;
                throw t;
            }
        }
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
            //...
        }
        //...
    }
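The effect of saving and restoring the hold count can be observed from the outside. The following sketch is an illustration under a few assumptions: FullyReleaseSketch and run() are made-up names, and awaitUninterruptibly() is used instead of await() only to avoid handling InterruptedException (it releases the lock through the same fullyRelease call). The main thread acquires the lock twice, and the helper thread can only take the lock because the wait gives up both holds at once.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, used only for illustration.
public class FullyReleaseSketch {
    /** Returns {hold count before waiting, hold count after waiting}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] result = new int[2];

        lock.lock();
        lock.lock(); // re-enter: the hold count is now 2
        try {
            result[0] = lock.getHoldCount();

            Thread helper = new Thread(() -> {
                lock.lock(); // can only succeed once the waiter has released both holds
                try {
                    signalled[0] = true;
                    cond.signal();
                } finally {
                    lock.unlock();
                }
            });
            helper.start();

            while (!signalled[0]) {
                cond.awaitUninterruptibly(); // releases the lock fully, then reacquires it
            }
            result[1] = lock.getHoldCount(); // restored from savedState
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints 2 2
    }
}
```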

Next, await() checks whether the current node is on the synchronization queue, that is, the queue formed by the head and tail fields of AQS. At this point it is certainly not: at <1> in the following code, the node's wait status is CONDITION and the node has no predecessor, so isOnSyncQueue(Node node) returns false, !isOnSyncQueue(node) is true, and the thread enters the loop at <2> and blocks.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null) //<1>
                return false;
            if (node.next != null) //If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it. It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { //<2>
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
            //...
        }
        //...
    }

Now that the thread is blocked, we need a way to wake it up so that it can continue. There are two ways to wake a thread blocked in await(): calling signal(), or interrupting the thread. Let's start with signal().

signal() first checks whether the calling thread currently holds the lock. If a thread that does not own the lock calls this method, an IllegalMonitorStateException is thrown. If the thread owns the lock exclusively, execution continues: the head node (firstWaiter) of the condition queue is read, and if it is not null it is passed to doSignal(Node first).

Looking at doSignal(Node first), we can see that transferForSignal(Node node) is always called on the first node passed in, and that its return value decides whether the loop in doSignal(Node first) continues. So let's look at transferForSignal(Node node) first.

If the node passed to transferForSignal(Node node) is a condition node, that is, its wait status is CONDITION, the CAS at <1> succeeds and the condition node becomes a non-condition node. In other words, it should leave the condition queue. The code at <2> then enqueues the node on the synchronization queue (the AQS queue), and enq returns the node's predecessor at <3>. At <4>, the method checks whether the predecessor has been cancelled (ws > 0), and otherwise tries to change the predecessor's wait status to SIGNAL with a CAS. If the predecessor is cancelled or the CAS fails, the node's thread is unparked directly. Finally, the method returns true.

In this case transferForSignal(first) returns true, so (!transferForSignal(first) && (first = firstWaiter) != null) is false and the loop exits. Also note that each iteration of doSignal(Node first) assigns the successor (first.nextWaiter) of the old head (first) to the head of the condition queue (firstWaiter), so the old head is removed whether transferForSignal(Node node) returns true or false. If the node passed in is no longer a condition node and other nodes remain in the queue, the loop condition holds and the loop continues until it meets the first condition node or the queue runs out of nodes. If the new head (firstWaiter) turns out to be null during an iteration, no nodes are left and the tail (lastWaiter) is set to null as well. Each iteration also clears the nextWaiter reference of the old head (first) to help garbage collection.

To summarize, doSignal(Node first) starts from the head of the condition queue and checks whether each node is a condition node. If it is, its wait status is changed, it is put on the AQS synchronization queue, and it is removed from the condition queue. If the head is not a condition node, non-condition nodes are removed during the traversal until a condition node is found or the condition queue is empty.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            //If the wait status of the incoming node is not CONDITION, the node is
            //no longer a condition node, so return false
            if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) //<1>
                return false;
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            //If the incoming node is a condition node, its status has been changed from
            //CONDITION to 0 above, turning it into a non-condition node; it now enters
            //the AQS synchronization queue
            Node p = enq(node); //<2>
            int ws = p.waitStatus;
            if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) //<4>
                LockSupport.unpark(node.thread);
            return true;
        }
        //...
        private Node enq(Node node) {
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return oldTail; //<3>
                    }
                } else {
                    initializeSyncQueue();
                }
            }
        }
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
            //...
            private void doSignal(Node first) {
                do {
                    if ((firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) && (first = firstWaiter) != null);
            }
            //...
        }
        //...
    }
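That signal() transfers exactly one waiter out of the condition queue can be checked with the public ReentrantLock.getWaitQueueLength(Condition) method. The sketch below is only an illustration: SignalOneSketch, run(), and join() are invented names. It parks two threads on one condition, calls signal() once while holding the lock, and reads the length of the condition queue before and after. Note that the Javadoc describes getWaitQueueLength as an estimate; here no waiter times out or is interrupted, so the count is expected to be stable while the lock is held.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, used only for illustration.
public class SignalOneSketch {
    /** Returns {waiters before signal(), waiters after signal()}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] done = {false}; // guarded by lock

        Runnable waiter = () -> {
            lock.lock();
            try {
                while (!done[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        };
        Thread a = new Thread(waiter);
        Thread b = new Thread(waiter);
        a.start();
        b.start();

        int[] result = new int[2];
        boolean measured = false;
        while (!measured) {
            lock.lock();
            try {
                // Wait until both threads have enqueued themselves on the condition.
                if (lock.getWaitQueueLength(cond) == 2) {
                    result[0] = lock.getWaitQueueLength(cond);
                    cond.signal(); // moves only the head of the condition queue
                    result[1] = lock.getWaitQueueLength(cond);
                    done[0] = true;
                    cond.signalAll(); // release the remaining waiter so the demo can finish
                    measured = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        join(a);
        join(b);
        return result;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " -> " + r[1]); // expected: 2 -> 1
    }
}
```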

After signal() has moved the condition node from the condition queue to the AQS synchronization queue, the signalling thread eventually releases the lock and unparks the thread of the node that follows the head of the synchronization queue. The thread that was blocked in await() then wakes from LockSupport.park(this) inside the loop at <1> and executes checkInterruptWhileWaiting(node). If the result is not 0, it breaks out of the loop. If the result is 0, the loop condition is evaluated again: if isOnSyncQueue(node) returns true the loop ends, otherwise it continues.

So let's look at checkInterruptWhileWaiting(node), the first method executed after the blocked thread wakes up. It is simple: it checks whether the thread was interrupted and returns 0 if not. Our thread was not woken by an interrupt, so the result is 0 and the branch at <2> is not taken. Next, isOnSyncQueue(node) is evaluated. First, the node's wait status is no longer CONDITION. Second, once the node is on the synchronization queue it must have a predecessor, so the branch at <3> is not taken. If the node had a successor, it would certainly be on the synchronization queue; but let's assume no other thread requested the lock after our node was enqueued, so it has no successor and the branch at <4> is not taken either. That leaves findNodeFromTail(node), which searches backwards from the tail at <5>. The node is certain to be found, because it is the tail itself. So isOnSyncQueue(node) returns true, which means the node is already on the synchronization queue; the condition of while (!isOnSyncQueue(node)) is false and the loop ends.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { //<1>
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //<2>
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
            //...
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
        }
        //...
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null) //<3>
                return false;
            if (node.next != null) //If has successor, it must be on queue <4>
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it. It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }
        //...
        private boolean findNodeFromTail(Node node) {
            //We check for node first, since it's likely to be at or near tail.
            //tail is known to be non-null, so we could re-order to "save"
            //one null check, but we leave it this way to help the VM.
            for (Node p = tail;;) { //<5>
                if (p == node)
                    return true;
                if (p == null)
                    return false;
                p = p.prev;
            }
        }
        //...
    }

Having seen how signal() wakes a blocked thread, let's look at what happens when the blocked thread is interrupted. After the interrupt, the thread wakes from LockSupport.park(this) and runs checkInterruptWhileWaiting(node). This time Thread.interrupted() returns true, so the result depends on transferAfterCancelledWait(node): it is either THROW_IE (-1) or REINTERRUPT (1).

In transferAfterCancelledWait(node), if the CAS at <1> succeeds in changing the node's wait status from CONDITION to 0, the node is put on the synchronization queue and the method returns true, so (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) yields THROW_IE. This means the wait was cancelled by the interrupt while the node was still a condition node. If the CAS at <1> fails, the node's status is no longer CONDITION, which means a signal() from another thread has already changed it. That signal() may have run before the interrupt, or the thread may have been interrupted first and then lost the race to a signal(); the code cannot tell which. In that case execution skips the branch at <1> and spins at <2>, yielding until the signalling thread has finished putting the node on the synchronization queue, and then returns false, so the result is REINTERRUPT. Whether the result is THROW_IE or REINTERRUPT, the branch at <3> is taken and the loop ends.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            private static final int REINTERRUPT = 1;
            private static final int THROW_IE = -1;
            //...
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //<3>
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
            //...
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
        }
        //...
        final boolean transferAfterCancelledWait(Node node) {
            if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { //<1>
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq(). Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             */
            while (!isOnSyncQueue(node)) //<2>
                Thread.yield();
            return false;
        }
        //...
    }

We have seen how a blocked thread leaves the loop after being signalled or interrupted. Now let's see what each kind of thread does afterwards. First, the thread's node has moved from the condition queue to the synchronization queue, so acquireQueued(node, savedState) is called at <1> to try to acquire the lock. If it fails, the thread blocks until it is woken to compete again, and this repeats until it gets the lock. acquireQueued(node, savedState) returns whether the thread was interrupted while acquiring. A thread woken by signal() has no interrupt pending, so after acquiring the lock the call returns false, the assignment at <1> is skipped, and interruptMode stays 0. Then, at <2>, if the node's nextWaiter is not null, the condition queue is cleaned of non-condition nodes. Since interruptMode is 0, the branch at <3> is not taken.

If the thread left the loop because of an interrupt, interruptMode is already THROW_IE or REINTERRUPT when it reaches <1>. Note that Thread.interrupted() in checkInterruptWhileWaiting(node) has cleared the interrupt flag, so acquireQueued(node, savedState) returns true only if the thread is interrupted again while queuing for the lock; when that happens and interruptMode is not THROW_IE, interruptMode is set to REINTERRUPT. If interruptMode is REINTERRUPT, the thread cannot treat the interrupt as having cancelled the wait, because a signal() had already claimed the node. The branch at <3> then calls reportInterruptAfterWait(int interruptMode), which sets the thread's interrupt flag back to true. If interruptMode is THROW_IE, the CAS that changed the node from a condition node to a non-condition node succeeded after the interrupt, which means no other thread had modified the node's status in the meantime, so reportInterruptAfterWait(int interruptMode) at <3> throws an InterruptedException.

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        //...
        public class ConditionObject implements Condition, java.io.Serializable {
            //...
            private static final int REINTERRUPT = 1;
            private static final int THROW_IE = -1;
            //...
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //<1>
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) //clean up if cancelled <2>
                    unlinkCancelledWaiters();
                if (interruptMode != 0) //<3>
                    reportInterruptAfterWait(interruptMode);
            }
            //...
            private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
            //...
        }
        //...
    }
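The THROW_IE path can be observed from user code. In the sketch below (InterruptedAwaitSketch, run(), and join() are invented names for this illustration), a thread waits on a condition that is never signalled, and the main thread interrupts it once ReentrantLock.hasWaiters(Condition) reports that it is queued. Since no signal() ever competes with the interrupt, await() is expected to throw InterruptedException, and by then the thread should hold the lock again because acquireQueued runs before the exception is reported.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, used only for illustration.
public class InterruptedAwaitSketch {
    /** Returns {await() threw InterruptedException, lock was held in the catch block}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (true) {
                    cond.await(); // never signalled; only an interrupt ends the wait
                }
            } catch (InterruptedException e) {
                result[0] = true;
                result[1] = lock.isHeldByCurrentThread(); // lock reacquired before the throw
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter has enqueued itself on the condition queue.
        boolean queued = false;
        while (!queued) {
            lock.lock();
            try {
                queued = lock.hasWaiters(cond);
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }

        waiter.interrupt();
        join(waiter);
        return result;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // expected: true true
    }
}
```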

Next, the author introduces awaitNanos(long nanosTimeout). At <1>, the deadline is computed as the current System.nanoTime() plus the nanosTimeout passed in. At <2>, initialNanos saves the initial waiting time, because nanosTimeout is reduced inside the loop at <3> and the initial value is needed later. Then addConditionWaiter() puts the current thread into the condition queue as a Node, fullyRelease(node) releases the lock held by the current thread, and the loop at <3> begins. In the loop, the remaining waiting time nanosTimeout is recomputed at <4> on every pass and keeps decreasing. If it is less than or equal to 0, transferAfterCancelledWait(node) moves the node to the synchronization queue and the loop exits. If the remaining time is greater than 1000 ns (SPIN_FOR_TIMEOUT_THRESHOLD), the thread parks with a timeout; for shorter waits it simply spins. If the thread is interrupted while parked, or wakes because the timed park expired, checkInterruptWhileWaiting(node) is executed next. That method was covered above, along with how interrupts and wake-ups leave the loop, so it is not repeated here. After leaving the loop, the thread calls acquireQueued(node, savedState) to reacquire the lock. Once it holds the lock, interruptMode decides whether to throw an exception, re-mark the thread's interrupt status, or do nothing.

If no exception is thrown (the thread was signalled, timed out, or had its interrupt status re-marked), the code at <5> runs. The remaining time (remaining) is computed from the deadline and the current time. Normally remaining is no greater than the initial waiting time (initialNanos), and in that case it is returned; a value less than or equal to 0 tells the caller that the wait timed out. Only if an overflow occurred can remaining exceed initialNanos, and then Long.MIN_VALUE is returned.

    public class ConditionObject implements Condition, java.io.Serializable {
        //...
        //In the JDK this constant is declared in AbstractQueuedSynchronizer
        static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
        //...
        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //We don't check for nanosTimeout <= 0L here, to allow
            //awaitNanos(0) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout; //<1>
            long initialNanos = nanosTimeout; //<2>
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { //<3>
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime(); //<4>
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            long remaining = deadline - System.nanoTime(); //avoid overflow <5>
            return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
        }
        //...
    }
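The arithmetic at <1> and <5> can be tried in isolation. The sketch below is a standalone model, not JDK code: RemainingTimeSketch and remaining() are invented names, and the clock readings are passed in as plain parameters instead of calling System.nanoTime(). It shows the normal case, the timed-out case, a wrapped clock (where the subtraction still gives the right answer), and one input that, as far as the author of this sketch can tell, triggers the Long.MIN_VALUE guard: an extremely negative timeout whose subtraction wraps around.

```java
// Hypothetical standalone model of the deadline arithmetic, for illustration only.
public class RemainingTimeSketch {
    /**
     * startNanos and nowNanos stand in for two System.nanoTime() readings;
     * nanosTimeout is the timeout the caller asked for.
     */
    static long remaining(long startNanos, long nowNanos, long nanosTimeout) {
        final long deadline = startNanos + nanosTimeout; // may wrap around
        long remaining = deadline - nowNanos;            // may wrap around as well
        // A result larger than the requested timeout can only come from wraparound.
        return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // 400 ns of a 1000 ns timeout have elapsed.
        System.out.println(remaining(0L, 400L, 1000L)); // prints 600

        // 1500 ns have elapsed: the wait timed out.
        System.out.println(remaining(0L, 1500L, 1000L)); // prints -500

        // The clock itself wraps between the two readings; the difference is still right.
        long start = Long.MAX_VALUE - 10;
        System.out.println(remaining(start, start + 400L, 1000L)); // prints 600

        // An extremely negative timeout: the subtraction wraps, so the guard kicks in.
        System.out.println(remaining(0L, 100L, Long.MIN_VALUE + 10) == Long.MIN_VALUE); // prints true
    }
}
```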

Finally, the author introduces signalAll(), which closes this chapter of the ReentrantLock source code analysis. The Condition implementation in AQS has not been covered in full: awaitUninterruptibly(), await(long time, TimeUnit unit), and awaitUntil(Date deadline) remain. These methods are similar to await() and awaitNanos(long nanosTimeout), and I believe every reader can work through them independently.

Now for signalAll(). With the background above, this method is easy to understand. The difference between signalAll() and signal() is that signal() stops as soon as one call to transferForSignal(first) has turned a condition node into a non-condition node and moved it to the synchronization queue, whereas signalAll() calls transferForSignal(first) on every node in the queue, moving them all to the synchronization queue until the condition queue is empty.

    public class ConditionObject implements Condition, java.io.Serializable {
        //...
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        //...
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
        //...
    }
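As a quick check of the behaviour described for signalAll(), the sketch below parks several threads on one condition and wakes them with a single call. SignalAllSketch, run(), and join() are names invented for this illustration. It relies on ReentrantLock.getWaitQueueLength(Condition), which the Javadoc describes as an estimate; no waiter times out or is interrupted here, so the count is expected to be exact while the lock is held.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, used only for illustration.
public class SignalAllSketch {
    /** Returns {waiters before signalAll(), waiters after signalAll(), threads that finished}. */
    static int[] run(int threadCount) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock
        AtomicInteger finished = new AtomicInteger();

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0]) {
                        cond.awaitUninterruptibly();
                    }
                    finished.incrementAndGet();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        int[] result = new int[3];
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                // Wait until every thread has enqueued itself on the condition.
                if (lock.getWaitQueueLength(cond) == threadCount) {
                    result[0] = lock.getWaitQueueLength(cond);
                    go[0] = true;
                    cond.signalAll(); // transfers every node to the synchronization queue
                    result[1] = lock.getWaitQueueLength(cond);
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        for (Thread t : threads) {
            join(t);
        }
        result[2] = finished.get();
        return result;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = run(3);
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // expected: 3 0 3
    }
}
```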

At this point we can say that we understand both the use of ReentrantLock and how it is implemented. You may have had the feeling that this chapter is less about ReentrantLock than about AQS: the internal classes of ReentrantLock merely implement a few methods required by AQS and add their own characteristics.

In fact, the author's real purpose is to lead everyone to an understanding of AQS. In my opinion, AQS is the core of the juc package, bar none. But AQS is complex and abstract and needs to be viewed from several angles, so the author used ReentrantLock as the entry point for learning ReentrantLock and AQS together.

ReentrantLock comes to an end here. In the following chapters, the author will work with you to look at AQS from more perspectives. See you in the next article.

Finally, I wish you all early success in your studies, get a satisfactory offer, get a quick promotion and raise your salary, and reach the pinnacle of life.
