JUC analysis of abstract queue synchronizer (AQS-AbstractQueuedSynchronizer)

Abstract Queue Synchronizer (AQS-AbstractQueuedSynchronizer)

The name itself says a lot:

  • Abstract: it is an abstract class whose behavior is completed by subclasses
  • Queue: its data structure is a queue, which holds the threads waiting for a resource
  • Synchronizer: synchronization functionality can be built on top of it

We will go through each of these aspects in turn, but first it helps to know the following characteristics.

AbstractQueuedSynchronizer features

1. AQS can implement both exclusive locks and shared locks.

2. An exclusive lock is a pessimistic lock. It ensures that only one thread at a time passes the blocking point, so only one thread can hold the lock.

3. A shared lock is the more permissive mode. Multiple threads can pass the blocking point and hold the lock at the same time. A read-write lock, for example, allows a resource to be accessed by multiple read operations or by a single write operation, but never by both kinds at once.

4. AQS uses an int member variable, state, to represent the synchronization state. What the value means is up to the subclass: in a lock such as ReentrantLock, state > 0 means the lock is held and state = 0 means it is free. Three methods operate on it: getState(), setState(int newState), and compareAndSetState(int expect, int update). Because state is volatile and can be updated through CAS, operations on it are thread-safe.

5. AQS is built on a variant of the CLH queue. A CLH (Craig, Landin, and Hagersten) lock is a scalable, high-performance, fair spin lock based on a linked list. It prevents starvation and provides first-come, first-served fairness. Each waiting thread spins only on a local variable: it keeps polling the state of its predecessor and stops spinning once it sees that the predecessor has released the lock.
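The spinning behavior described in item 5 can be illustrated with a minimal sketch of a classic CLH spin lock. This is the textbook algorithm, not the AQS source (AQS parks waiting threads instead of spinning); the names ClhSpinLock, QNode, and ClhDemo are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal CLH spin lock sketch (textbook form, not the AQS implementation).
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while its owner holds or waits for the lock
    }

    // The tail starts as a dummy node whose lock is already released.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // enqueue at the tail, FIFO order
        myPred.set(pred);
        while (pred.locked) {              // poll the predecessor's state
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // the successor sees this and stops spinning
        myNode.set(myPred.get());          // reuse the predecessor's node next time
    }
}

class ClhDemo {
    // Runs several threads that increment a plain int under the lock.
    static int run(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

If the lock provides mutual exclusion, ClhDemo.run(4, 1000) should count every increment without losing any.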

Abstract

Looking at the source code, we can see that AQS is an abstract class that inherits from AbstractOwnableSynchronizer:

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable

Internally, AQS uses a volatile variable, state, as the resource identifier. It also defines several protected methods for reading and changing state, together with protected methods for acquiring and releasing the resource that subclasses override to implement their own logic.

The class provides several protected methods:

// Creates a queue synchronizer instance; the initial state is 0
protected AbstractQueuedSynchronizer() {}

// Returns the current value of the synchronization state
protected final int getState() {
    return state;
}

// Sets the value of the synchronization state
protected final void setState(int newState) {
    state = newState;
}

// Exclusive mode. Tries to acquire the resource;
// returns true on success, false on failure.
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// Exclusive mode. Tries to release the resource;
// returns true on success, false on failure.
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// Shared mode. Tries to acquire the resource. A negative number means failure;
// 0 means success with no resources remaining;
// a positive number means success with resources remaining.
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// Shared mode. Tries to release the resource; returns true if waiting
// successor nodes may be woken up after the release, otherwise false.
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

Although the try* methods are protected, AQS does not implement them itself; they simply throw UnsupportedOperationException. AQS implements the rest of the main logic. In other words, AQS is an abstract framework for building locks and synchronizers. With it, widely used synchronizers can be constructed simply and efficiently: ReentrantLock, Semaphore, and ReentrantReadWriteLock are built on AQS, and older JDK versions of classes such as SynchronousQueue and FutureTask relied on it as well.

We can also use AQS to construct a custom synchronizer very easily, as long as the subclass implements several of its protected methods.
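As a minimal sketch of such a custom synchronizer, the following non-reentrant mutex overrides only tryAcquire and tryRelease and leaves queuing and blocking to AQS. The class name SimpleMutex is hypothetical, and 0 = free, 1 = held is simply a convention chosen for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A non-reentrant mutex built on AQS (illustrative sketch).
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held (convention chosen for this sketch)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }            // blocks in the AQS queue if held
    boolean tryLock()  { return sync.tryAcquire(1); }  // single attempt, never blocks
    void unlock()      { sync.release(1); }            // wakes the next waiter, if any
    boolean isLocked() { return sync.isLocked(); }
}
```

Because tryAcquire does not check the owner, a second tryLock() from the same thread fails; this is the difference between this sketch and a reentrant lock.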

Queue

The AQS class itself maintains the queue of waiting threads (enqueuing threads that fail to acquire the resource, waking them up, and so on). Internally it uses a first-in-first-out (FIFO) doubly linked queue (the CLH queue) and two pointers, head and tail, to mark the head and tail of the queue. The data structure is shown in the figure:

The queue does not store threads directly; it stores Node objects, each of which holds a thread.

Let's take a look at the structure of Node:

static final class Node {
    // Marker indicating that a node (its thread) is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node (its thread) is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // waitStatus value: the node (its thread) has been cancelled
    static final int CANCELLED = 1;
    // waitStatus value: the successor node (its thread) needs to be woken up
    static final int SIGNAL = -1;
    // waitStatus value: the node (its thread) is waiting on a condition
    static final int CONDITION = -2;
    // waitStatus value: resources are available, and the new head node
    // needs to keep waking up subsequent nodes.
    // (In shared mode, multiple threads release resources concurrently; after
    // head wakes up its successor, the extra resources must be left for the
    // nodes that follow; when a new head node is set, its successors continue
    // to be woken up.)
    static final int PROPAGATE = -3;

    // Wait status; one of -3, -2, -1, 0, 1
    volatile int waitStatus;
    volatile Node prev;     // predecessor node
    volatile Node next;     // successor node
    volatile Thread thread; // thread corresponding to this node
    Node nextWaiter;        // next node waiting on the condition

    // Whether the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // Other methods are omitted; see the source code
}

// The private addWaiter method in AQS
private Node addWaiter(Node mode) {
    // This Node constructor is used here
    Node node = new Node(Thread.currentThread(), mode);
    // Rest of the code omitted
}

Node lets AQS implement two queues: the CLH queue (the thread synchronization queue, doubly linked) through prev and next, and the queue of threads waiting on a Condition (singly linked) through nextWaiter. Condition is mainly used with the ReentrantLock class.
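From the caller's side, the condition queue is reached through ReentrantLock.newCondition(). The sketch below uses only the public ReentrantLock and Condition APIs; the class name ConditionDemo and the value 42 are made up for illustration. A thread that calls await sits in the condition queue until it is signalled, then has to reacquire the lock before returning.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// One-slot handoff between a producer and a consumer (illustrative sketch).
class ConditionDemo {
    static int run() {
        ReentrantLock lock = new ReentrantLock();
        Condition notEmpty = lock.newCondition();
        int[] slot = new int[1];
        boolean[] filled = new boolean[1];
        int[] received = new int[1];

        Thread consumer = new Thread(() -> {
            lock.lock();
            try {
                // Waits on the condition until the producer fills the slot.
                while (!filled[0]) {
                    notEmpty.awaitUninterruptibly();
                }
                received[0] = slot[0];
            } finally {
                lock.unlock();
            }
        });
        consumer.start();

        lock.lock();
        try {
            slot[0] = 42;
            filled[0] = true;
            notEmpty.signal(); // wakes a waiter on this condition, if there is one
        } finally {
            lock.unlock();
        }

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }
}
```

The while loop around the wait matters: the consumer rechecks the flag after every wakeup, so the example works whether the producer or the consumer gets the lock first.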

Synchronization

There are two synchronization modes:

  • Exclusive mode (Exclusive): the resource is exclusive and can be acquired by only one thread at a time. Example: ReentrantLock.
  • Shared mode (Shared): the resource can be acquired by multiple threads at the same time, and the number of resources can be specified by a parameter. Examples: Semaphore and CountDownLatch.

Some synchronization classes implement both modes at once, such as ReentrantReadWriteLock.
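Shared mode can be sketched with a small permit counter in the spirit of Semaphore. This is a simplified illustration under the assumption that state holds the number of free permits; the class name SimplePermits is hypothetical and this is not the Semaphore source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A counting-permit synchronizer in shared mode (illustrative sketch).
class SimplePermits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // state = number of free permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative result: not enough permits, report failure.
                // Otherwise commit the new count with CAS and report what is left.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // waiting threads may now be woken up
                }
            }
        }

        int available() {
            return getState();
        }
    }

    private final Sync sync;

    SimplePermits(int permits) {
        sync = new Sync(permits);
    }

    void acquire()       { sync.acquireShared(1); }                  // blocks if none left
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }   // never blocks
    void release()       { sync.releaseShared(1); }
    int available()      { return sync.available(); }
}
```

With two permits, two threads can hold the resource at once and a third attempt fails until one permit is released.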

Acquiring resources

The entry point for acquiring resources is the acquire(int arg) method. arg is the number of resources to acquire; in exclusive mode it is usually 1, as in ReentrantLock.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire first calls tryAcquire(arg) to try to acquire the resource. As mentioned earlier, this method is implemented in the subclass. If the acquisition fails, the thread is inserted into the waiting queue through addWaiter(Node.EXCLUSIVE); the argument indicates that the Node being inserted is in exclusive mode. The implementation of this method:

private Node addWaiter(Node mode) {
    // Create the Node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try to insert the Node at the tail of the queue
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // Try a single CAS and return if it succeeds
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The queue is empty or the CAS above failed: insert with a CAS spin loop
    enq(node);
    return node;
}

// In AQS, multiple threads compete for resources at the same time,
// so multiple threads may be inserting nodes at the same time.
// The thread safety of the insertion is guaranteed by the CAS spin loop.

// Insert into the waiting queue with a CAS spin loop
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

As an example of a subclass's tryAcquire, consider a reentrant exclusive lock such as ReentrantLock. If the state is 0, it tries to CAS the state from 0 to 1; if that succeeds, the lock has been acquired and it returns true. (This 0-to-1 attempt may already have been made once before tryAcquire is called; repeating it inside only raises the chance of success, and its cost is negligible next to the overall cost of locking.) If the state is not 0, it checks whether the current thread is the owner of the exclusive lock. If it is, it tries to increase the state by acquires (that is, by 1); if the new value is out of range an error is thrown, otherwise the state is set and true is returned. This is what makes the lock reentrant: much like biasing, the owner does not have to compete for the lock again. If the state is not 0 and the current thread is not the owner, it returns false.
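The tryAcquire logic described above can be sketched as follows. This is a simplified illustration written against the public AQS API, not a copy of the ReentrantLock source; the class name ReentrantSync and the helper methods tryLock, holds, and tryLockFromOtherThread are hypothetical and exist only for the demonstration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Reentrant exclusive acquisition (illustrative sketch).
class ReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: try to take it with a single CAS.
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Already the owner: just raise the hold count, no CAS needed.
            int next = c + acquires;
            if (next < 0) { // int overflow
                throw new Error("Maximum lock count exceeded");
            }
            setState(next);
            return true;
        }
        // Held by another thread.
        return false;
    }

    boolean tryLock() {
        return tryAcquire(1);
    }

    int holds() {
        return getState();
    }

    // Demo helper: make one acquisition attempt from a different thread.
    boolean tryLockFromOtherThread() {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> result[0] = tryAcquire(1));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

Calling tryLock() twice from the same thread succeeds both times and leaves the state at 2, while an attempt from another thread returns false.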

The addWaiter method has now placed a Node at the tail of the waiting queue. Nodes in the waiting queue acquire the resource one at a time, starting from the head. For the implementation, see the acquireQueued method:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin
        for (;;) {
            final Node p = node.predecessor();
            // If node's predecessor p is head, node is the second node
            // and may try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // After acquiring the resource, point head at node, so head
                // refers to the node that currently holds the resource, or null.
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If it is safe to rest, enter the waiting state until unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The parkAndCheckInterrupt method uses LockSupport.park(this) internally, so here is a brief introduction to park.

The LockSupport class, introduced in Java 5, provides basic thread blocking primitives. It delegates to functions in the Unsafe class, two of which matter here:

  • park(boolean isAbsolute, long time): blocks the current thread
  • unpark(Thread jthread): unblocks the given thread
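A minimal sketch of park and unpark through the public LockSupport API (the class name ParkDemo is hypothetical). park may return without a matching unpark, so the waiter rechecks a flag in a loop; an unpark that arrives before the park simply makes the next park return immediately.

```java
import java.util.concurrent.locks.LockSupport;

// Blocking and waking a thread with LockSupport (illustrative sketch).
class ParkDemo {
    static volatile boolean ready;

    static boolean run() {
        ready = false;
        boolean[] resumed = new boolean[1];

        Thread waiter = new Thread(() -> {
            // Recheck the condition after every return from park().
            while (!ready) {
                LockSupport.park();
            }
            resumed[0] = true;
        });
        waiter.start();

        ready = true;               // publish the condition first
        LockSupport.unpark(waiter); // then wake the waiter (or pre-grant its permit)

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed[0];
    }
}
```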

So after a node enters the waiting queue, it calls park to put its thread into the blocked state. Only the thread of the head node is active.

The process of acquiring resources by the acquire method:

Besides acquire, there are three other methods for acquiring resources:

  • acquireInterruptibly: acquires the resource in exclusive mode, interruptibly
  • acquireShared: acquires the resource in shared mode
  • acquireSharedInterruptibly: acquires the resource in shared mode, interruptibly

Interruptible means that an InterruptedException may be thrown if the thread is interrupted while acquiring.
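The interruptible variant can be seen in a small sketch. The Sync class here is a hypothetical minimal exclusive synchronizer written for this example; acquire and acquireInterruptibly are the real public AQS methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A waiting thread that is interrupted gives up with InterruptedException
// (illustrative sketch).
class InterruptibleDemo {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    static boolean run() {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread holds the resource

        boolean[] interrupted = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                sync.acquireInterruptibly(1); // cannot succeed while we hold it
                sync.release(1);
            } catch (InterruptedException e) {
                interrupted[0] = true;        // woken by interrupt, not by release
            }
        });
        waiter.start();
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sync.release(1);
        return interrupted[0];
    }
}
```

With plain acquire, the waiter would instead stay queued until the resource is released and only have its interrupt status set afterwards.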

Releasing resources

Releasing resources is much simpler than acquiring them. The implementation in AQS is short.

Source code:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The tryRelease method can be seen as an operation that sets the lock state: it subtracts the argument passed in (1) from state. If the resulting state is 0, the owner of the exclusive lock is set to null so that other threads get a chance to run.

For an exclusive lock, state increases by 1 on each lock (you can of course change this value yourself) and decreases by 1 on each unlock. With reentrancy, the same lock may be stacked up to 2, 3, 4, and so on. Only when the number of unlock() calls matches the number of lock() calls is the owner thread cleared, and only then does tryRelease return true. Keep this in mind when writing code: if you call lock() in a loop body or deliberately call lock() more than once, and then call unlock() only once, the lock may never be released, which can cause a deadlock.
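The lock/unlock counting can be checked with a short sketch that uses the public ReentrantLock API. The class name HoldCountDemo and its helper are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Unbalanced lock()/unlock() calls leave the lock held (illustrative sketch).
class HoldCountDemo {
    // Demo helper: can a different thread take the lock right now?
    static boolean otherThreadCanLock(ReentrantLock lock) {
        boolean[] got = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                got[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return got[0];
    }

    // Returns { blockedAfterOneUnlock, freeAfterSecondUnlock }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();   // hold count is now 2
        lock.unlock(); // hold count 1: this thread still owns the lock
        boolean blocked = !otherThreadCanLock(lock);
        lock.unlock(); // hold count 0: the lock is really released
        boolean free = otherThreadCanLock(lock);
        return new boolean[] { blocked, free };
    }
}
```

Pairing every lock() with an unlock() in a finally block is the usual way to keep the counts balanced.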

private void unparkSuccessor(Node node) {
    // If the status is negative, try to set it to 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the successor of the head node, head.next
    Node s = node.next;
    // If the successor is null or its status is greater than 0.
    // From the earlier definitions, the only status greater than 0
    // is CANCELLED, that is, the node has been cancelled.
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Scan backwards from tail for the usable node closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If a successor was found, wake up its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}

A call to unparkSuccessor(Node) means the lock has really been released. The method is passed the head node. The first thing it does is get the node after the head; if that node is not null, it calls LockSupport.unpark() to resume the corresponding suspended thread.
