AQS source code exploration _09 Semaphore source code analysis


1. Introduction

  • A Semaphore maintains a set of permits. Each call to acquire() consumes one permit, and each call to release() returns one.
  • Semaphore is usually used to limit how many threads can access a shared resource at the same time, which is often called throttling (rate limiting).
  • [Figure: flow chart for acquiring a permit from a Semaphore]
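The permit bookkeeping described above can be observed directly through availablePermits(). The following is a minimal sketch; the class name PermitCounting and the method demo() are made up for this illustration, while Semaphore, acquire(), release() and availablePermits() are the standard java.util.concurrent API.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitCounting {
    /** Returns the permit count at the start, after acquire(), and after release(). */
    static int[] demo() {
        Semaphore semaphore = new Semaphore(3); // 3 permits, nonfair by default
        int start = semaphore.availablePermits();
        try {
            semaphore.acquire();                // consumes one permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int afterAcquire = semaphore.availablePermits();
        semaphore.release();                    // returns the permit
        int afterRelease = semaphore.availablePermits();
        return new int[] {start, afterAcquire, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [3, 2, 3]
    }
}
```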

2. Introductory case

Case 1: Pool.java

    import java.util.concurrent.Semaphore;

    /**
     * date: 2021/5/10
     * @author csp
     */
    public class Pool {
        /** The maximum number of threads that can access resources at the same time */
        private static final int MAX_AVAILABLE = 100;

        /** Semaphore holding the permits that can be obtained (fair mode) */
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

        /**
         * Shared resources. Imagine that the items array stores Connection objects,
         * so that this class simulates a connection pool.
         */
        protected Object[] items = new Object[MAX_AVAILABLE];

        /**
         * Occupancy of the shared resources, one-to-one with the items array:
         * if items[0] is held by an external thread then used[0] == true, otherwise used[0] == false.
         */
        protected boolean[] used = new boolean[MAX_AVAILABLE];

        /**
         * Get a free object.
         * If there is no free object in the pool, wait until one becomes free.
         */
        public Object getItem() throws InterruptedException {
            // Each call to acquire() consumes one permit
            available.acquire();
            return getNextAvailableItem();
        }

        /** Return the object to the pool */
        public void putItem(Object x) {
            if (markAsUnused(x))
                available.release();
        }

        /**
         * Get a free object from the pool: returns the object on success and null on failure.
         * On success the corresponding used[i] is set to true.
         */
        private synchronized Object getNextAvailableItem() {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (!used[i]) {
                    used[i] = true;
                    return items[i];
                }
            }
            return null;
        }

        /**
         * Return the object to the pool; returns true if the return succeeds.
         * The return fails (false) when:
         * 1. the pool holds no reference to the object, or
         * 2. the pool holds the object but it is already marked as free.
         */
        private synchronized boolean markAsUnused(Object item) {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (item == items[i]) {
                    if (used[i]) {
                        used[i] = false;
                        return true;
                    } else
                        return false;
                }
            }
            return false;
        }
    }

Case 2: SemaphoreTest02.java

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    /**
     * date: 2020/5/10
     * @author csp
     */
    public class SemaphoreTest02 {
        public static void main(String[] args) throws InterruptedException {
            // Declare the semaphore with 2 initial permits
            // Fair mode: fair is true
            final Semaphore semaphore = new Semaphore(2, true);

            Thread tA = new Thread(() -> {
                try {
                    // Each call to acquire() consumes one permit
                    semaphore.acquire();
                    try {
                        System.out.println("Thread A successfully obtained the pass");
                        TimeUnit.SECONDS.sleep(10);
                    } finally {
                        // Each call to release() returns one permit.
                        // Release only after a successful acquire().
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            tA.start();
            // Give thread A time to acquire its permit
            TimeUnit.MILLISECONDS.sleep(200);

            Thread tB = new Thread(() -> {
                try {
                    // acquire(2) consumes 2 permits
                    semaphore.acquire(2);
                    try {
                        System.out.println("Thread B successfully obtained the pass");
                    } finally {
                        // release(2) returns 2 permits
                        semaphore.release(2);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            tB.start();
            // Give thread B time to enter the wait queue
            TimeUnit.MILLISECONDS.sleep(200);

            Thread tC = new Thread(() -> {
                try {
                    // Each call to acquire() consumes one permit
                    semaphore.acquire();
                    try {
                        System.out.println("Thread C successfully obtained the pass");
                    } finally {
                        // Each call to release() returns one permit
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            tC.start();
        }
    }

Output:

    Thread A successfully obtained the pass
    Thread B successfully obtained the pass
    Thread C successfully obtained the pass

3. Source code analysis

Inner class Sync

  • The methods implemented in Sync tell us the following:

    • The number of permits is passed in through the constructor;
    • The permits are stored in the AQS state variable state;
    • Acquiring permits decreases state by the number requested (by 1 for a plain acquire());
    • When state is 0, no more permits can be obtained;
    • Releasing permits increases state by the number released (by 1 for a plain release());
    • The number of permits can be changed dynamically (see reducePermits() and drainPermits());
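The bookkeeping in the list above can be mimicked outside AQS with an AtomicInteger standing in for state. This is only a sketch of the counting logic: the class PermitCounter and its methods are hypothetical, it has no wait queue and never blocks, and its release() omits the overflow check that the real implementation performs.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for Sync: an AtomicInteger plays the role of the AQS state. */
class PermitCounter {
    private final AtomicInteger state;

    PermitCounter(int permits) {
        state = new AtomicInteger(permits); // permits are passed in at construction
    }

    /** Returns the remaining permits, or a negative value if the request cannot be met. */
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Fail fast when there are not enough permits; otherwise CAS and retry on contention
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    /** Returns permits to the counter (no overflow check in this sketch). */
    void release(int releases) {
        state.addAndGet(releases);
    }

    int permits() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryAcquire(1)); // prints 1
        System.out.println(counter.tryAcquire(2)); // prints -1 (only 1 permit left)
        counter.release(1);
        System.out.println(counter.permits());     // prints 2
    }
}
```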
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        // Constructor: takes the number of permits and stores it in state
        Sync(int permits) {
            setState(permits);
        }

        // Get the number of permits
        final int getPermits() {
            return getState();
        }

        // Nonfair attempt to acquire permits
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // Read how many permits are available
                int available = getState();
                // Subtract the permits requested this time to see how many would remain
                int remaining = available - acquires;
                // If fewer than 0 would remain, return at once (failure).
                // Otherwise try to update state atomically and return the remaining permits;
                // if the CAS fails, loop and retry.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // Release permits
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // Read the current number of permits
                int current = getState();
                // Add the permits being released
                int next = current + releases;
                // Detect overflow
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // If the atomic update of state succeeds, the release succeeded: return true
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // Reduce the number of permits
        final void reducePermits(int reductions) {
            for (;;) {
                // Read the current number of permits
                int current = getState();
                // Subtract the permits to be removed
                int next = current - reductions;
                // Detect underflow
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                // Atomically update state and return on success
                if (compareAndSetState(current, next))
                    return;
            }
        }

        // Drain (take) all permits
        final int drainPermits() {
            for (;;) {
                // Read the current number of permits
                int current = getState();
                // If it is 0, return directly;
                // otherwise atomically set state to 0 and return the old value
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
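To see the dynamic adjustment in action, the sketch below calls the public drainPermits() and the protected reducePermits(int) of java.util.concurrent.Semaphore. Because reducePermits is protected, a small subclass is needed; AdjustableSemaphore, shrink() and DynamicPermits are names invented for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

/** Hypothetical subclass that exposes the protected reducePermits(int). */
class AdjustableSemaphore extends Semaphore {
    AdjustableSemaphore(int permits) {
        super(permits);
    }

    void shrink(int reduction) {
        reducePermits(reduction); // does not block, unlike acquire
    }
}

class DynamicPermits {
    /** Returns {permits drained, permits after drain, permits after release(3) and shrink(1)}. */
    static int[] demo() {
        AdjustableSemaphore semaphore = new AdjustableSemaphore(5);
        int drained = semaphore.drainPermits();        // takes all 5 permits
        int afterDrain = semaphore.availablePermits(); // 0
        semaphore.release(3);                          // state: 0 -> 3
        semaphore.shrink(1);                           // state: 3 -> 2
        return new int[] {drained, afterDrain, semaphore.availablePermits()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [5, 0, 2]
    }
}
```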

Inner class NonfairSync

In nonfair mode, tryAcquireShared() directly calls the parent class method nonfairTryAcquireShared() to try to obtain permits.

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        // Constructor: calls the constructor of the parent class
        NonfairSync(int permits) {
            super(permits);
        }

        // Try to obtain permits by calling nonfairTryAcquireShared() of the parent class
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

Inner class FairSync

In fair mode, tryAcquireShared() first checks whether another thread is already waiting in the queue ahead of the caller. If so, the attempt fails and the caller enters the queue; otherwise it tries to update the value of state atomically.

**Note:** For ease of reading, several AQS methods are pasted into this inner class below; the comment above each of them marks it as an AQS method.

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /**
         * Overrides the hook method declared in AQS.
         * Attempts to obtain permits: returns a value >= 0 on success
         * and a value < 0 if the acquisition fails.
         */
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // If another thread is already waiting in the AQS queue ahead of the caller,
                // return -1: the thread calling acquire must enter the queue and wait.
                if (hasQueuedPredecessors())
                    return -1;
                // Execution reaches this point in two situations:
                // 1. No other thread was waiting in the AQS queue when acquire was called.
                // 2. The current node is the head.next node of the queue.

                // state holds the number of available permits
                int available = getState();
                // remaining: permits left after the current thread takes its share
                int remaining = available - acquires;
                // Condition 1: remaining < 0, the thread fails to obtain the permits.
                // Condition 2 (given remaining >= 0): if the CAS on state succeeds, the thread
                // has obtained the permits; if the CAS fails, spin and retry.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * This method is located in AQS.
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            // True if the thread calling acquire is already interrupted: throw immediately.
            if (Thread.interrupted())
                throw new InterruptedException();
            // tryAcquireShared(arg) >= 0: the permits were obtained and the caller does not block.
            // tryAcquireShared(arg) < 0: the attempt failed, so the thread must queue and wait.
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

        /**
         * This method is located in AQS.
         */
        private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
            // Wrap the thread that called Semaphore.acquire in a node and add it to the AQS queue.
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    // Predecessor of the current thread's node
                    final Node p = node.predecessor();
                    // True if the current thread's node is head.next
                    if (p == head) {
                        // Only head.next is entitled to try for the shared lock
                        int r = tryAcquireShared(arg);
                        // For Semaphore, r is the number of permits remaining
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // shouldParkAfterFailedAcquire finds a valid predecessor for the current node and
                    // sets that predecessor's status to SIGNAL (-1); it eventually returns true.
                    // parkAndCheckInterrupt then suspends the thread of the current node.
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

        /**
         * This method is located in AQS.
         * Sets the current node as the head node and propagates the wake-up backward
         * (waiters are woken one after another).
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            // Set the current node as the new head node
            setHead(node);
            // For CountDownLatch, propagate is always 1 here.
            // For Semaphore, propagate is the number of permits remaining (>= 0).
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                // Successor of the current node
                Node s = node.next;
                // Condition 1: s == null holds when the current node is already the tail;
                // doReleaseShared() handles that case.
                // Condition 2 (given s != null): the successor must be in shared mode,
                // e.g. latch.await() -> addWaiter(Node.SHARED).
                if (s == null || s.isShared())
                    // In most cases doReleaseShared() is executed
                    doReleaseShared();
            }
        }

        /**
         * This method is located in AQS.
         */
        public final boolean releaseShared(int arg) {
            // True if the current thread released the resource successfully.
            // After a successful release, wake up threads that failed to obtain the resource.
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

        /**
         * This method is located in AQS.
         * Wakes up threads that failed to obtain the resource.
         *
         * CountDownLatch: which paths call doReleaseShared?
         * 1. latch.countDown() -> AQS state == 0 -> doReleaseShared() wakes the thread of head.next.
         * 2. An awakened thread returns from parkAndCheckInterrupt() in doAcquireSharedInterruptibly
         *    -> setHeadAndPropagate() -> doReleaseShared().
         *
         * Semaphore: which paths call doReleaseShared?
         * 1. semaphore.release() -> releaseShared() -> tryReleaseShared() succeeds -> doReleaseShared().
         * 2. An awakened thread obtains its permits in doAcquireSharedInterruptibly
         *    -> setHeadAndPropagate() -> doReleaseShared().
         */
        private void doReleaseShared() {
            for (;;) {
                // Current head node of the AQS queue
                Node h = head;
                // Condition 1: h != null means the queue has been initialized.
                //   h == null happens when, after the latch is created and before any thread has
                //   called await(), a thread calls latch.countDown() and triggers the wake-up logic.
                // Condition 2: h != tail means the queue holds nodes other than head.
                //   h == tail (head and tail are the same node) happens when:
                //   1. In a normal wake-up chain the shared lock was acquired by each thread in turn,
                //      and the thread executing here owns the tail node.
                //   2. The first thread to call await() runs concurrently with the countDown() that
                //      triggers the wake-up. The queue is empty when that first await() thread arrives,
                //      so it creates an empty head node and then spins once more to enqueue itself.
                //      Before the enqueue completes, the queue holds only the empty head.
                //      Meanwhile another thread calls countDown(), changes state from 1 to 0,
                //      and runs the wake-up logic.
                //      Note: once the await() thread has finished enqueueing, it returns to
                //      doAcquireSharedInterruptibly and spins. It finds that its predecessor is head,
                //      sets itself as head, and returns from await().
                if (h != null && h != tail) {
                    // Inside this if, head is guaranteed to have a successor
                    int ws = h.waitStatus;
                    // head is SIGNAL: the successor has not been woken yet
                    if (ws == Node.SIGNAL) {
                        // Change the status of head to 0 before waking the successor.
                        // Why CAS? Several threads may run doReleaseShared at the same time
                        // and try to wake head.next, so the CAS can fail. Example:
                        // t3 finds that h == head (below) is false, so it keeps spinning and takes
                        // part in waking the next head.next. t3's CAS waitStatus(h, SIGNAL, 0) succeeds.
                        // t4 entered the ws == Node.SIGNAL branch before t3's update, so t4's
                        // CAS waitStatus(h, SIGNAL, 0) fails because t3 already changed the value.
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // Wake up the successor node
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // h == head holds when:
                // 1. The successor that was just woken has not yet set itself as head in
                //    setHeadAndPropagate. The current thread leaves the loop here. The wake-up chain
                //    is not broken, because the awakened thread will call doReleaseShared itself.
                // 2. h == null: after the latch is created and before any thread has called await(),
                //    a thread calls latch.countDown() and triggers the wake-up logic.
                // 3. h == tail: head and tail point to the same node.
                // h == head does not hold when:
                //    The awakened node was quick and has already set itself as the new head.
                //    The current thread (owner of the old head, its predecessor) does not leave
                //    the loop; it continues and wakes the successor of the new head.
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    }

Construction method

A Semaphore must be created with a number of permits. It is nonfair by default; the second constructor lets you request fair mode.

    // Constructor: the number of permits must be passed in; nonfair mode is used by default
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    // Constructor: takes the number of permits and whether to use fair mode
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
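The effect of the two constructors can be checked with the public isFair() method of Semaphore. A minimal sketch follows; the class name FairnessFlag and the method demo() are invented for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class FairnessFlag {
    /** Returns {isFair() of the one-argument constructor, isFair() with fair == true}. */
    static boolean[] demo() {
        Semaphore byDefault = new Semaphore(3);    // backed by NonfairSync
        Semaphore fair = new Semaphore(3, true);   // backed by FairSync
        return new boolean[] {byDefault.isFair(), fair.isFair()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [false, true]
    }
}
```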

acquire() method

Acquires one permit. The default variant is interruptible. If the attempt to obtain a permit fails, the thread is queued in the AQS queue.

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Acquire one permit without responding to interruption;
    // if the attempt fails, the thread enters the AQS queue.
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
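The interruptible behaviour can be demonstrated by interrupting a thread that is parked in acquire(). The sketch below is an illustration under simple assumptions: the class name InterruptibleAcquire and the method demo() are made up, and the main thread uses Semaphore.hasQueuedThreads() to wait until the waiter has actually entered the queue before interrupting it.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleAcquire {
    /** Returns true if the waiting thread left acquire() with an InterruptedException. */
    static boolean demo() {
        Semaphore semaphore = new Semaphore(0); // no permits, so acquire() has to wait
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
                semaphore.release(); // not reached in this demo
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        waiter.start();

        // Spin until the waiter is queued inside AQS, then interrupt it
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

With acquireUninterruptibly() in place of acquire(), the waiter would stay queued after the interrupt and only continue once a permit is released.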

acquire(int permits) method

Acquires several permits at once; this variant is interruptible.

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    // Acquire several permits at once without responding to interruption.
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
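A multi-permit request is all or nothing: state is only changed when the whole request fits. The sketch below shows this with acquire(int), tryAcquire(int) and release(int) from the standard API; the class name MultiPermit and the method demo() are invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class MultiPermit {
    /** Returns {permits after acquire(3), 1 if tryAcquire(3) succeeded else 0, permits after the failed try, permits after release(3)}. */
    static int[] demo() {
        Semaphore semaphore = new Semaphore(5);
        try {
            semaphore.acquire(3);                      // state: 5 -> 2
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int afterAcquire = semaphore.availablePermits();
        boolean second = semaphore.tryAcquire(3);      // only 2 left: fails, nothing is taken
        int afterFailedTry = semaphore.availablePermits();
        semaphore.release(3);                          // state: 2 -> 5
        return new int[] {afterAcquire, second ? 1 : 0, afterFailedTry,
                          semaphore.availablePermits()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [2, 0, 2, 5]
    }
}
```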

tryAcquire() method

Tries to acquire one permit using Sync's nonfair acquisition method, even if the Semaphore was created in fair mode. It tries only once and returns immediately whether or not a permit was obtained; the thread never enters the queue.

    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    // Try to acquire one permit; if the first attempt fails, wait up to the given timeout.
    // Returns false if no permit is obtained within that time, otherwise true.
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
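The difference between the two tryAcquire variants is easy to see on a Semaphore with a single permit. This is a small sketch; TryAcquireDemo and demo() are names chosen for the example, and the 50 ms timeout is an arbitrary value.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireDemo {
    /** Returns the results of three attempts against a single-permit Semaphore. */
    static boolean[] demo() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();  // takes the only permit
        boolean second = semaphore.tryAcquire(); // fails at once, no queueing
        boolean timed;
        try {
            // Waits up to 50 ms for a permit that nobody releases, then gives up
            timed = semaphore.tryAcquire(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        semaphore.release();
        return new boolean[] {first, second, timed};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [true, false, false]
    }
}
```

Because the untimed tryAcquire() always takes the nonfair path, it can barge ahead of queued threads even on a fair Semaphore. The Semaphore Javadoc suggests tryAcquire(0, TimeUnit.SECONDS) as a near equivalent that honors the fairness setting.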

release() method

Releases one permit. The value of state increases by 1, and the next thread waiting for a permit is woken up.

    public void release() {
        sync.releaseShared(1);
    }
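The wake-up performed by release() can be observed with one waiting thread. In the sketch below, ReleaseWakesWaiter and demo() are hypothetical names; the main thread uses Semaphore.hasQueuedThreads() to make sure the waiter is queued before it releases the permit.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ReleaseWakesWaiter {
    /** Returns true if the queued thread obtained the released permit. */
    static boolean demo() {
        Semaphore semaphore = new Semaphore(0); // no permits at the start
        AtomicBoolean acquired = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly(); // queues, because state is 0
            acquired.set(true);
        });
        waiter.start();

        // Spin until the waiter is queued inside AQS
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }
        semaphore.release(); // state: 0 -> 1, then the queued waiter is woken

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The waiter consumed the permit again, so state is back to 0
        return acquired.get() && semaphore.availablePermits() == 0;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

Note that the main thread releases a permit it never acquired. Semaphore does not track ownership, so any thread may call release().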

release(int permits) method

Releases several permits at once; the value of state increases by the number of permits released.

    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

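4. Summary

  • Semaphore is usually used to limit how many threads can access a shared resource at the same time.
  • Semaphore is implemented on top of the shared mode of AQS.
  • The number of permits is passed in when the Semaphore is created and is stored in the AQS state variable.
  • Acquiring a permit decreases state by 1.
  • Releasing a permit increases state by 1.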