Java multithreading advanced JUC

JUC refers to the java.util.concurrent package, which provides tools for multi-threaded development.

ReentrantLock: a replacement for synchronized

Only one thread can be inside the critical section at a time, regardless of whether it is reading or writing.

ReentrantLock can be used in place of the synchronized keyword. How to use it:

  1. Create a ReentrantLock variable, for example reentrantLock
  2. Call reentrantLock.lock() before the code segment that needs to be synchronized and reentrantLock.unlock() after it, ideally in a finally block so that the lock is released even if the code throws
package kehao.thread.juc.lock;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*
 * Used instead of synchronized
 */
public class ReentrantLockTest {
    private int count = 0;
    private final Lock lock = new ReentrantLock();

    public void funWithoutSync() {
        for (int i = 0; i < 10000; i++) {
            count = count + 1;
            count = count - 1;
        }
    }

    public void funSynchronized() {
        synchronized (this) {
            for (int i = 0; i < 10000; i++) {
                count = count + 1;
                count = count - 1;
            }
        }
    }

    public void funReentrantLock() {
        lock.lock();
        try {
            for (int i = 0; i < 10000; i++) {
                count = count + 1;
                count = count - 1;
            }
        } finally {
            lock.unlock(); // always release the lock, even if the loop throws
        }
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();

        // No synchronization
        reentrantLockTest.setCount(0);
        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> reentrantLockTest.funWithoutSync());
            threads.add(thread);
            thread.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("without synchronized --->" + reentrantLockTest.getCount());

        // Synchronize with the synchronized keyword
        reentrantLockTest.setCount(0);
        threads.clear();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> reentrantLockTest.funSynchronized());
            threads.add(thread);
            thread.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("synchronized --->" + reentrantLockTest.getCount());

        // Synchronize with ReentrantLock
        reentrantLockTest.setCount(0);
        threads.clear();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> reentrantLockTest.funReentrantLock());
            threads.add(thread);
            thread.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("ReentrantLock --->" + reentrantLockTest.getCount());
    }
}

Output (the unsynchronized value varies from run to run):

without synchronized --->1
synchronized --->0
ReentrantLock --->0
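The "reentrant" part of the name means that a thread which already holds the lock can acquire it again without blocking, just as nested synchronized blocks on the same object do. The following is a minimal sketch of that behaviour; the class name ReentrancyDemo and its methods are made up for illustration, while getHoldCount() and isLocked() are methods of ReentrantLock itself.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, then calls inner(), which acquires it a second time.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Returns how many times the current thread holds the lock at this point.
    int inner() {
        lock.lock(); // does not block: this thread already owns the lock
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    // True once every lock() has been matched by an unlock().
    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrancyDemo demo = new ReentrancyDemo();
        System.out.println("hold count inside inner(): " + demo.outer());
        System.out.println("lock free afterwards: " + demo.isFree());
    }
}
```

Each lock() call must be paired with its own unlock(); the lock only becomes available to other threads when the hold count drops back to zero.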

Condition: used to replace wait() and notify()

With ReentrantLock there is no object locked by synchronized, so wait() and notify() cannot be called. Use a Condition object instead.

Condition also has an advantage over the built-in wait() and notify() methods. It is implemented in Java as a wait queue attached to the lock, rather than through the object monitor built into the JVM, and one lock can have several Condition objects. This can perform better because only the threads that actually need to run are woken.

Each Condition object is equivalent to a wait queue, so you can wake up the threads in one specific queue and avoid notifyAll(). notifyAll() wakes up every thread waiting on the lock object, including threads you do not need. Most of the awakened threads will fail to obtain the lock or will find that their condition still does not hold, so each of them must re-check the condition in a loop. A thread may therefore sleep and wake up many times before it can proceed, which is inefficient.

When coding, different kinds of threads can wait on different Condition objects, which is easier to manage.

Condition.await() is equivalent to joining the wait queue. It must be called while holding the lock; it releases the lock while waiting and re-acquires it before returning.

Condition.signal() is equivalent to removing the first element from the wait queue, that is, waking up one waiting thread.

package kehao.thread.juc.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Patient implements Runnable {
    // After getting sick, wake up a doctor to treat the patient
    public void ill() {
        Hospital.lock.lock();
        try {
            Hospital.patientNumber++;
            System.out.println("get cold --->" + Hospital.patientNumber);
            Hospital.treatCondition.signal();
        } finally {
            Hospital.lock.unlock();
        }
    }

    @Override
    public void run() {
        ill();
    }
}

class Doctor implements Runnable {
    // Treat a patient if there is one; otherwise wait until a patient wakes the doctor up
    public void treat() {
        Hospital.lock.lock();
        try {
            // Re-check in a loop: another doctor may have taken the patient
            // before this thread re-acquired the lock
            while (Hospital.patientNumber <= 0) {
                Hospital.treatCondition.await();
            }
            Hospital.patientNumber--;
            System.out.println("drink more hot water --->" + Hospital.patientNumber);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            Hospital.lock.unlock();
        }
    }

    @Override
    public void run() {
        treat();
    }
}

class Hospital {
    public static int patientNumber = 0;
    public static final Lock lock = new ReentrantLock();
    public static final Condition illCondition = lock.newCondition();   // wait queue for patients (not used in this example)
    public static final Condition treatCondition = lock.newCondition(); // wait queue for doctors
}

public class ConditionTest {
    public static void main(String[] args) {
        Patient patient = new Patient();
        Doctor doctor = new Doctor();
        for (int i = 0; i < 10; i++) {
            new Thread(doctor).start();
        }
        for (int i = 0; i < 10; i++) {
            new Thread(patient).start();
        }
    }
}

Output (one possible run):

get cold --->1
drink more hot water --->0
get cold --->1
get cold --->2
get cold --->3
drink more hot water --->2
drink more hot water --->1
get cold --->2
get cold --->3
drink more hot water --->2
get cold --->3
drink more hot water --->2
get cold --->3
get cold --->4
drink more hot water --->3
drink more hot water --->2
get cold --->3
drink more hot water --->2
drink more hot water --->1
drink more hot water --->0
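The idea of putting different kinds of threads into different wait queues is commonly illustrated with a bounded buffer that has one Condition for producers and one for consumers. The sketch below is an illustration under assumptions, not code from the original example: the class BoundedBuffer, its fields, and the helper transfer() are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) { // re-check after every wake-up
                notFull.await();
            }
            items.add(value);
            notEmpty.signal(); // wake one consumer, never a producer
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.remove();
            notFull.signal(); // wake one producer, never a consumer
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Moves the numbers 1..n through a buffer of capacity 2 and returns their sum.
    static long transfer(int n) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(100)); // sum of 1..100
    }
}
```

With wait() and notifyAll() on a single monitor, a put would wake producers and consumers alike; with two conditions, each signal() reaches only the kind of thread that can make progress.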

ReadWriteLock

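Multiple threads can read at the same time, but a read and a write, or two writes, cannot run in parallel. This improves efficiency when reads are much more frequent than writes.

How to use:

  1. Create a ReadWriteLock
  2. Get the read lock from it with readLock()
  3. Get the write lock from it with writeLock()
  4. Acquire the write lock around code that writes
  5. Acquire the read lock around code that reads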
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();   // read lock
    private final Lock writeLock = readWriteLock.writeLock(); // write lock
    private int count = 0;

    public void add() {
        writeLock.lock(); // acquire the write lock
        try {
            count = count + 1;
        } finally {
            writeLock.unlock(); // release the write lock
        }
    }

    public int getCount() {
        readLock.lock(); // acquire the read lock
        try {
            return count;
        } finally {
            readLock.unlock(); // release the read lock
        }
    }
}

So the question is: if reads are allowed to run concurrently, why do they need a lock at all?

The read lock keeps reads and writes from running in parallel. If readers did not take the read lock, a writer would see that no read lock is held, acquire the write lock, and modify the data immediately, so a read and a write would be inside the critical section at the same time.
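The two rules (readers may overlap, a writer excludes readers) can be observed directly. The following is a small sketch under assumptions: the class ReadWriteLockDemo and its method names are invented for illustration, and a CyclicBarrier is used only as a way to prove that two readers were inside the read lock at the same moment.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockDemo {
    // True if two threads were inside the read lock at the same moment.
    static boolean readersCanOverlap() {
        ReadWriteLock rw = new ReentrantReadWriteLock();
        CyclicBarrier bothInside = new CyclicBarrier(2);
        Callable<Boolean> reader = () -> {
            rw.readLock().lock();
            try {
                // The barrier opens only if the other reader also holds the read lock.
                bothInside.await(5, TimeUnit.SECONDS);
                return true;
            } catch (Exception e) {
                return false; // timed out: the readers did not overlap
            } finally {
                rw.readLock().unlock();
            }
        };
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> first = es.submit(reader);
            Future<Boolean> second = es.submit(reader);
            return first.get() && second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    // True if another thread can take the read lock while this thread holds the write lock.
    static boolean readerCanEnterDuringWrite() {
        ReadWriteLock rw = new ReentrantReadWriteLock();
        ExecutorService es = Executors.newSingleThreadExecutor();
        rw.writeLock().lock();
        try {
            // tryLock() fails fast instead of blocking, so the attempt can be observed.
            Callable<Boolean> tryRead = () -> rw.readLock().tryLock();
            return es.submit(tryRead).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            rw.writeLock().unlock();
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("readers overlap: " + readersCanOverlap());
        System.out.println("reader enters during write: " + readerCanEnterDuringWrite());
    }
}
```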

StampedLock

Compared with ReadWriteLock, the improvement of StampedLock is that a writer is allowed to acquire the write lock while a read is in progress. The data we read may therefore be inconsistent, so a little extra code is needed to check whether a write happened during the read. This kind of read lock is an optimistic lock: it assumes that most reads will not overlap with a write.

// Excerpt from the code of Liao Xuefeng's Java course
import java.util.concurrent.locks.StampedLock;

public class Point {
    private final StampedLock stampedLock = new StampedLock();
    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // acquire the write lock
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // release the write lock
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // obtain an optimistic read stamp
        // Note that the following two lines are not an atomic operation.
        // Assume (x, y) = (100, 200).
        double currentX = x; // x = 100 has been read here, but a writer may now change (x, y) to (300, 400)
        double currentY = y; // y is read here: with no write in between, the result is the correct (100, 200);
                             // with a write in between, the result is the inconsistent (100, 400)
        if (!stampedLock.validate(stamp)) { // check whether a write lock was taken after the optimistic read began
            stamp = stampedLock.readLock(); // fall back to a pessimistic read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // release the pessimistic read lock
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

JDK thread pool

Without a thread pool, you create a Thread object for each task and then start it.

With a thread pool, you do not manage Thread objects yourself. Instead you hand a Runnable (or Callable) object to an ExecutorService, which automatically picks a thread to run the task. The usage is as follows:

  1. Use the Executors factory class to create an ExecutorService object
  2. Call submit() on the ExecutorService object to submit Runnable or Callable tasks

ExecutorService is only an interface. Several commonly used implementations provided by the Java standard library (created through the Executors factory methods) are:

Implementation         Description
FixedThreadPool        Thread pool with a fixed number of threads
CachedThreadPool       Thread pool whose number of threads is adjusted dynamically according to the tasks
SingleThreadExecutor   Thread pool that executes tasks on a single thread only

Note that the thread pool should be shut down when the program ends; otherwise its non-daemon worker threads can keep the JVM alive and the program will not exit by itself.

There are several methods related to shutting down a thread pool:

Method               Description
shutdown()           Stops accepting new tasks and lets the tasks already submitted finish; the call itself returns immediately
shutdownNow()        Tries to stop the running tasks immediately by interrupting them, and returns the tasks that never started
awaitTermination()   Waits up to the specified time for the thread pool to finish shutting down; it does not start a shutdown itself
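The three methods are usually combined: request an orderly shutdown, wait for a bounded time, and force the shutdown only if the wait runs out. The sketch below shows one way to do this; the class ShutdownDemo, the method runAndShutdown(), and the 5 second timeout are assumptions chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Submits `tasks` small jobs, shuts the pool down, and returns how many jobs ran.
    static int runAndShutdown(int tasks) {
        ExecutorService es = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            es.submit(() -> {
                completed.incrementAndGet();
            });
        }
        es.shutdown(); // no new tasks are accepted; queued tasks still run
        try {
            if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
                // The pool did not finish in time: interrupt workers and drain the queue.
                List<Runnable> neverStarted = es.shutdownNow();
                System.out.println("tasks that never started: " + neverStarted.size());
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndShutdown(10));
    }
}
```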

FixedThreadPool

ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
    es.submit(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}
es.shutdown(); // shut the pool down so the program can exit

Output (the order varies between runs):

pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

CachedThreadPool

ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    es.submit(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}
es.shutdown(); // shut the pool down so the program can exit

Output (the order varies between runs):

pool-1-thread-1
pool-1-thread-5
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-9
pool-1-thread-5

If you want to limit the size of the thread pool to a range, create a ThreadPoolExecutor directly:

int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

ScheduledThreadPool

Used for tasks that run on a schedule or repeatedly. For example, an animation that shows 24 frames per second needs the task that switches frames to run every 1/24 second.

How to use:

  1. Create a ScheduledExecutorService object

  2. Submit a task

    There are three methods for submitting tasks

    • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

      Runs the task once after the given delay

      Runnable command: the task to run

      long delay: how long to wait before running the task

      TimeUnit unit: time unit

    • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

      Runs the task repeatedly at a fixed rate

      Runnable command: the task to run

      long initialDelay: delay before the first run

      long period: time between the starts of two successive runs

      TimeUnit unit: time unit

    • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

      Runs the task repeatedly with a fixed delay between runs

      Runnable command: the task to run

      long initialDelay: delay before the first run

      long delay: time between the end of one run and the start of the next

      TimeUnit unit: time unit

The difference between FixedRate and FixedDelay:

FixedRate means that the task is triggered at a fixed interval measured from start to start, no matter how long each run takes. If a run takes longer than the period, the next run starts late rather than running concurrently.

FixedDelay means that after one run finishes, the pool waits for the fixed interval before starting the next run.
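As a rough sketch of the steps above, the following creates a ScheduledExecutorService, schedules a repeating task, and cancels it after a fixed number of runs. The class ScheduleDemo, the method repeat(), and the use of a CountDownLatch to count runs are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Runs a task every `periodMillis` until it has run `runs` times.
    // Returns true if all runs happened within 10 seconds.
    static boolean repeat(int runs, long periodMillis) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> handle = ses.scheduleAtFixedRate(
                    remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            boolean finished = remaining.await(10, TimeUnit.SECONDS);
            handle.cancel(false); // stop scheduling further runs
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ses.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran 3 times: " + repeat(3, 100));
    }
}
```

Replacing scheduleAtFixedRate with scheduleWithFixedDelay (same argument list) switches the pool from start-to-start timing to end-to-start timing.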

Tasks with return values

Future

The Future<V> interface represents a result that may become available in the future. The methods it defines are:

  • get(): get the result (may block until it is ready)
  • get(long timeout, TimeUnit unit): get the result, but wait at most the specified time
  • cancel(boolean mayInterruptIfRunning): cancel the current task
  • isDone(): check whether the task has completed

If ExecutorService.submit() is passed a Callable object, it returns a Future object, and calling get() on it returns the corresponding result. get() blocks until the result is ready, so if you do not want to block, the only option is to poll isDone().

ExecutorService es = Executors.newFixedThreadPool(5);
Future<String> futureString = es.submit(() -> {
    Thread.sleep(3000);
    return "hello";
});
System.out.println(futureString.get()); // blocks until the task has finished
es.shutdown();
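The non-blocking and time-limited methods of Future can be tried out with a task that is held back on purpose. In this sketch the class FutureDemo, the method observe(), and the latch named release are hypothetical; the latch only exists to keep the task unfinished until the caller has tried isDone() and the timed get().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    // Returns a short log of what the caller observed while waiting for the task.
    static String observe() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1); // keeps the task unfinished until counted down
        try {
            Future<String> future = es.submit(() -> {
                release.await();
                return "hello";
            });
            StringBuilder log = new StringBuilder();
            log.append("done=").append(future.isDone()); // non-blocking check
            try {
                future.get(50, TimeUnit.MILLISECONDS); // wait at most 50 ms
            } catch (TimeoutException e) {
                log.append(" timeout");
            }
            release.countDown(); // let the task finish
            log.append(" result=").append(future.get()); // blocks until the result is ready
            return log.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // done=false timeout result=hello
    }
}
```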

CompletableFuture

CompletableFuture solves the problem that a Future can only be consumed by blocking or by polling.

With a plain Future, the thread that wants the result must either keep checking whether the result is ready or wait until it is returned, which is inefficient.

CompletableFuture's solution is to register a callback function. When the task finishes, the callback is invoked automatically, and the main thread does not have to deal with it.

// Create the asynchronous task
CompletableFuture<String> scf = CompletableFuture.supplyAsync(() -> {
    return "hello";
});
// Callback for a successful result
scf.thenAccept((result) -> {
    System.out.println(result);
});
// Callback for the exceptional case
scf.exceptionally((e) -> {
    e.printStackTrace();
    return null;
});

Output:

hello

CompletableFuture also supports running several steps serially, one after another

Use the thenApplyAsync function and pass in a callback function. The parameter of the callback is the result returned by the previous task.

CompletableFuture<String> scf = CompletableFuture.supplyAsync(() -> {
    return "hello";
});
scf = scf.thenApplyAsync((str) -> {
    return str + " CompletableFuture";
});
// Callback for a successful result
scf.thenAccept((result) -> {
    System.out.println(result);
});
// Callback for the exceptional case
scf.exceptionally((e) -> {
    e.printStackTrace();
    return null;
});

Output:

hello CompletableFuture

However, if this line

scf = scf.thenApplyAsync((str) -> {
    return str + " CompletableFuture";
});

does not assign its result back to scf, the output is still hello.

That is:

CompletableFuture<String> scf = CompletableFuture.supplyAsync(() -> {
    return "hello";
});
scf.thenApplyAsync((str) -> {
    return str + " CompletableFuture";
});
// Callback for a successful result
scf.thenAccept((result) -> {
    System.out.println(result);
});
// Callback for the exceptional case
scf.exceptionally((e) -> {
    e.printStackTrace();
    return null;
});

Output:

hello

This is because

scf.thenApplyAsync((str) -> {
    return str + " CompletableFuture";
});

returns a new CompletableFuture object instead of modifying scf. thenAccept was registered on the original object, so it receives the original result, and no callback is attached to the new object that holds the transformed result.
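Keeping both objects in separate variables makes the point visible. This is a minimal sketch; the class ChainDemo and the variable names base and mapped are invented for illustration, and join() is used only to read the two values.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Returns the value of the original future and of the future created by thenApplyAsync.
    static String[] results() {
        CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> "hello");
        // thenApplyAsync does not change `base`; it returns a second future.
        CompletableFuture<String> mapped = base.thenApplyAsync(str -> str + " CompletableFuture");
        // join() waits for completion and returns the value.
        return new String[] { base.join(), mapped.join() };
    }

    public static void main(String[] args) {
        String[] r = results();
        System.out.println("base:   " + r[0]);
        System.out.println("mapped: " + r[1]);
    }
}
```

A callback registered on base sees "hello"; only a callback registered on mapped sees the transformed value.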

CompletableFuture also supports running multiple tasks in parallel

Use the anyOf function to combine multiple CompletableFuture objects into one. This achieves "of any number of CompletableFutures, only one needs to succeed": the combined future completes as soon as any one of them completes.

// Create the asynchronous tasks
CompletableFuture<String> scfHello = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});
CompletableFuture<String> scfHi = CompletableFuture.supplyAsync(() -> {
    return "hi";
});
CompletableFuture<Object> scf = CompletableFuture.anyOf(scfHello, scfHi);
scf = scf.thenApplyAsync((str) -> {
    return str + " CompletableFuture";
});
// Callback for a successful result
scf.thenAccept((result) -> {
    System.out.println(result);
});
// Callback for the exceptional case
scf.exceptionally((e) -> {
    e.printStackTrace();
    return null;
});

Output:

hi CompletableFuture

You can also use allOf() to achieve "all CompletableFutures must succeed": the combined future completes only after every one of them has completed.
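Unlike anyOf, the future returned by allOf carries no value (its type is CompletableFuture<Void>), so the individual results have to be read from the original futures once it completes. The following sketch shows one way to do that; the class AllOfDemo and the method combine() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class AllOfDemo {
    // Waits for both tasks and joins their results with a space.
    static String combine() {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> hi = CompletableFuture.supplyAsync(() -> "hi");
        // allOf only signals that everything is done; the values are read
        // from the original futures, which are complete by then.
        CompletableFuture<String> both = CompletableFuture.allOf(hello, hi)
                .thenApply(ignored -> hello.join() + " " + hi.join());
        return both.join();
    }

    public static void main(String[] args) {
        System.out.println(combine());
    }
}
```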

Fork/Join thread pool

Its function is to split a large task into multiple small tasks that execute in parallel.

Merge sort is a similar idea: it splits an array into two smaller arrays, sorts each of them, and then merges the results.

The difference is that when I implemented that algorithm while learning data structures, I did not run each small part on its own thread.

Here, the big task is broken into small tasks, and the small tasks are processed by multiple threads to improve efficiency.

How to define a task

  1. Define a class that extends RecursiveTask<T>, where T is the result type
  2. Override protected T compute(), which is used to break the big problem into small problems
import java.util.concurrent.RecursiveTask;

class Sum extends RecursiveTask<Long> {
    private long[] array;
    private int start;
    private int end;

    public Sum(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    // Used to break the big problem into small problems
    @Override
    protected Long compute() {
        // When the problem is small enough, solve it directly
        if (end - start < 10) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Split the big problem into two small problems
            int middle = (start + end) / 2;
            Sum left = new Sum(this.array, start, middle);
            Sum right = new Sum(this.array, middle, end);
            invokeAll(left, right);
            Long leftSum = left.join();
            Long rightSum = right.join();
            return leftSum + rightSum;
        }
    }
}

Usage:

import java.util.concurrent.ForkJoinPool;

public class ForkJoinTest {
    public static void main(String[] args) {
        long[] array = new long[2000];
        for (int i = 0; i < 2000; i++) {
            array[i] = 2;
        }
        // Create the task object
        Sum sumTask = new Sum(array, 0, array.length);
        // Execute the task and get the result
        Long result = ForkJoinPool.commonPool().invoke(sumTask);
        System.out.println(result);
    }
}
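The merge sort comparison made earlier can itself be written as a Fork/Join task. The sketch below is one possible version, not code from the original article: the class ParallelMergeSort, the cut-off THRESHOLD of 16, and the helper sort() are assumptions. It extends RecursiveAction, the sibling of RecursiveTask for tasks that return no value, because the array is sorted in place.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ParallelMergeSort extends RecursiveAction {
    private static final int THRESHOLD = 16; // assumed cut-off; smaller ranges are sorted directly
    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    ParallelMergeSort(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            Arrays.sort(data, start, end);
            return;
        }
        int middle = (start + end) >>> 1;
        // Sort both halves as subtasks, possibly on different threads.
        invokeAll(new ParallelMergeSort(data, start, middle),
                  new ParallelMergeSort(data, middle, end));
        merge(middle);
    }

    // Merges the sorted halves [start, middle) and [middle, end) in place.
    private void merge(int middle) {
        int[] left = Arrays.copyOfRange(data, start, middle);
        int i = 0;
        int j = middle;
        int k = start;
        while (i < left.length && j < end) {
            data[k++] = (left[i] <= data[j]) ? left[i++] : data[j++];
        }
        while (i < left.length) {
            data[k++] = left[i++];
        }
        // Any remaining right-half elements are already in their final position.
    }

    // Returns a sorted copy of the input.
    static int[] sort(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new ParallelMergeSort(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sort(new int[] { 5, 3, 9, 1, 7 })));
    }
}
```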

ThreadLocal

ThreadLocal is conceptually equivalent to a Map<Thread, Object>.

Its role is to bind an object to the current thread.

// A JavaBean
class User {
    private String name;
    private String phone;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public User(String name, String phone) {
        this.name = name;
        this.phone = phone;
    }

    public User() {
    }

    @Override
    public String toString() {
        return "User{" + "name='" + name + '\'' + ", phone='" + phone + '\'' + '}';
    }
}

// main and the new thread access the same ThreadLocal object
public class ThreadLocalTest {
    static ThreadLocal<User> userThreadLocal = new ThreadLocal<>(); // initialize userThreadLocal

    public static void main(String[] args) {
        new Thread(() -> {
            User xiaoming = new User(" ", "123456");
            userThreadLocal.set(xiaoming); // bind the object to this thread
            speak();
            userThreadLocal.remove();
        }).start();
        System.out.println(Thread.currentThread().getName() + "--->" + userThreadLocal.get()); // main never set a value
    }

    private static void speak() {
        System.out.println(Thread.currentThread().getName() + "--->" + userThreadLocal.get()); // reads this thread's own value
    }
}

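Output:

main--->null
Thread-0--->User{name=' ', phone='123456'}

The main thread prints null because it never stored a value in the ThreadLocal; each thread sees only the object bound to itself.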
ThreadLocal.remove() deletes the value that was bound to the current thread. This matters when a thread pool is used: threads are reused, so if the object is not removed, a later task running on the same thread may read it.
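The effect of thread reuse can be reproduced with a pool that has exactly one worker thread, so that two tasks are guaranteed to share it. This is a sketch under assumptions: the class ThreadLocalReuseDemo, the field CURRENT_USER, and the value "user-1" are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalReuseDemo {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread and returns what the second task sees.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService es = Executors.newSingleThreadExecutor(); // one worker, so both tasks share a thread
        try {
            es.submit(() -> {
                CURRENT_USER.set("user-1");
                try {
                    // ... work that reads CURRENT_USER would go here ...
                } finally {
                    if (cleanUp) {
                        CURRENT_USER.remove(); // unbind the value before the thread is reused
                    }
                }
            }).get();
            Callable<String> read = CURRENT_USER::get;
            return es.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + secondTaskSees(false));
        System.out.println("with remove():    " + secondTaskSees(true));
    }
}
```

Calling remove() in a finally block ensures the value is cleared even when the task throws.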