Dart asynchronous support

Dart asynchronous support

Dart is a single-threaded model, so there is no so-called main thread/child thread. Dart is also a model of Event-Looper event loop and Event-Queue event echelon. All events are through the **EventLooper event loop program** in turn carried out

Dart's Event Loop

  • Get Event from EventQueue
  • Processing Event
  • Until EventQueue is empty

Event Queue

Dart's Event Queue

And these events include user input, click, Timer, file IO and other Event Types

Single thread model

Once a Dart function starts to execute, it will execute to the end of the function, that is, the Dart function will not be interrupted by other Dart code

There is no concept of thread in Dart, only isolate. Each isolate is isolated and does not share memory. A Dart program starts in the main function of the Main isolate. After the Main function ends, the Main isolate threads start one by one ( one by one) starts to process each Event Main Isolate in the Event Queue

Event Queue and Microtask Queue

  • **Main Isolate** in Dart has only one Event Looper
  • But there are two Event Queues
    • Event Queue
    • Microtask Queue

The significance of the existence of Microtask Queue

Hope to use this Queue to handle things later, but things that need to be processed before the next message arrives

Microtask Queue && Event Queue execution flowchart

When the Event Looper is processing the Event in the Microtask Queue, the Event in the Event Queue stops processing. At this time, the App cannot draw any graphics, cannot process any mouse clicks, cannot process file IO, etc. Event-Looper selects the task execution The sequence is:

  • Prioritize all the events in the Microtask Queue to be executed
  • The Event in the Event Queue will not be executed until the Microtask Queue is empty

Microtask Queue && Event Queue

Dart can only know the sequence of Event processing, but it does not know the specific time point of an Event execution, because its processing model is a single-threaded loop, not based on clock scheduling (that is, its execution is only processed according to the Event. , It starts to loop the next Event, and unlike Thread scheduling in Java, there is no concept of time scheduling), that is, we specify another Delay Time Task, and hope that it will start executing after the expected time. It is possible Will not be executed at that time, you need to see whether the previous Event has been Dequeue

Asynchronous task scheduling

When there is code that can be executed in subsequent tasks, there are two ways, through the dart:async API in Lib:

  • Using the Future class, tasks can be added to the end of the Event Queue
  • Use the scheduleMicrotask (schedule micro task) function to add the task to the end of the Microtask Queue

When using EventQueue, you need to think clearly and try to avoid the microtask queue being too large, otherwise it will block the processing of other events. Use Event Queue

Future

Future is an implementation of asynchronous class, with JS in Promise somewhat similar

Future constructor

Commonly used Future constructor:

Future <T> Future (FutureOr <T> Function () Computation) copying the code
main( List < String > args) { var futureInstance = Future< String >(() => "12345" ); futureInstance.then((res) { print (res); }).catchError((err) { print (err); }); } Copy code

Future.value

main( List < String > args) { var futureInstance = Future.value( 1234 ); futureInstance.then((val)=> print (val)); } //Output result 1234Copy code

Future.error

main( List < String > args) { var futureInstance = Future.error( "This is an error" ); futureInstance.catchError((err) { print (err); }); } //The output result is an error copy code

Future.then

Generally, when there is a divide and conquer task, when a large task needs to be divided into many small tasks to be executed step by step, the Future.then function needs to be used to disassemble the task.

void main() { new Future(() => futureTask) //Asynchronous task function. then((m) => { print ( "futueTask execute result: $m " )}) //The child after the task is executed task .then ((m) { Print (m.length); return 32 ;}) //where m is the result returned after the task execution .then((m) => printLength(m)) .whenComplete(() => whenTaskCompelete); //callback function when all tasks are completed futureTask(); } int futureTask() { return 21 ; } void printLength( int length) { print ( "Text Length: $length " ); } void whenTaskCompelete() { print ( "Task Complete" ); } Copy code
[Running] dart "/Users/kim/test/main.dart" futueTask execute result:Closure: () => int from Function'futureTask ' : static . 1 Text Length: 32 [Done] exited with code = 0 in 0.209 secondsCopy code

Future.delayed

When the task needs to be executed later, you can use new Future.delay to delay the execution of the task. As mentioned above, only when the Event Queue of the Main isolate is in the idle state, the execution will be delayed by 1s, otherwise the waiting time will be longer than 1s is much longer

Future.delayed ( const the Duration (seconds The: . 1 ), () => FutureTask); duplicated code
Import 'DART: the async' ; main( List < String > args) { // Method 1 Future delay( int ms ) { var com = Completer(); Timer( Duration (milliseconds: ms), () { com.complete( "Method 1" ); }); return com.future; } delay( 1000 ).then((res) { print (res); }); // Method 2 Future.delayed( Duration (milliseconds: 2000 ), () { print ( "Method 2" ); }); } Copy code

When you need to do animation, do not use Future, but need to use animateFrame , note:

  • Then in the Future does not create a new Event and throw it into the Event Queue, but just an ordinary Function Call. After the FutureTask is executed, the execution starts immediately.
  • When the Future has been executed in the then function, a task will be created , the task will be added to it, and the task will execute the function passed in through then
  • Future just created an Event and inserted the Event to the end of the Event Queue
  • When using the Future.value constructor, it will be the same as the second one, creating a Task and throwing it into the microtask Queue to execute the function passed in by then
  • After the Future.sync constructor executes the function it passed in, it will also immediately create a Task and throw it into the microtask Queue for execution

Future.add and Future.wait

main( List < String > args) { Future future1() async { print ( "This is an asynchronous function 1" );} Future future2() async { print ( "This is an asynchronous function 2" );} var futureList = <Future>[]; futureList.add(future1()); futureList.add(future2()); Future.wait(futureList); } Copy code

Future.sync

Tasks that run synchronously: Synchronous operation means that the function passed in when the Future is constructed is running synchronously, and the currently passed functions can be executed together. Unlike then, the function that comes in the then callback is scheduled to be executed asynchronously in the micro task queue. of

Future. sync ((){ }); Copy code

Future.microtask

Inside the future, the scheduleMicrotask function is called to add the current task to the microtask-queue to achieve the function of'jumping the queue'

main( List < String > args) { Future.microtask(() { //Prioritized business logic }); } Copy code

Use scheduleMicrotask

In the top call relationship, use this function

async .scheduleMicrotask(() => microtask()); void microtask(){ //doing something } Copy code

async/await

The Dart library contains many functions that return Future or Stream objects. These functions return immediately after setting up a time-consuming task (such as an I/O group), without waiting for the task to complete. Use the async and await keywords to implement asynchronous programming. Allows you to implement asynchronous operations like writing synchronous code.

Deal with Future

The result of Future execution can be obtained in the following two ways:

  • Use async and await
  • Use Future API, described in detail, with reference library Overview

Code that uses the async and await keywords is asynchronous. Although it seems like I want to synchronize the code. For example, the following code uses await to wait for the execution result of an asynchronous function.

await lookUpVersion(); Copy code

To use await, the code must be in an asynchronous function (a function marked with async):

Future checkVersion() async { var version = await lookUpVersion(); //Do something with version } Copy code

Tip: Although an asynchronous function may perform time-consuming operations, it does not wait for these operations. In contrast, asynchronous functions are only executed when they encounter the first await expression ( see details ). In other words, it returns a Future object and resumes execution only after the await expression completes.

Use try, catch, and finally to handle errors caused by await in the code.

try { version = await lookUpVersion(); } catch (e) { //React to inability to look up the version } Copy code

You can use await multiple times in an asynchronous function. For example, the result of the function is waited three times in the following code:

var EntryPoint = the await findEntrypoint (); var The exitCode = the await runExecutable (EntryPoint, args); the await flushThenExit (The exitCode); duplicated code

In an await expression, the value of the expression is usually a Future object; if not, the value of the expression will be automatically packaged into a Future object

  • The Future object indicates the promise of returning an object (similar to a promise)
  • The result of await expression execution is the returned object.
  • The await expression will block the execution of the code until the required object returns.

If using await causes a compile-time error, confirm whether await is in an asynchronous function. For example, to use await in the main() function of the application, the body of the main()) function must be marked as async:

Future main() async { checkVersion(); print ( 'In main: version is ${await lookUpVersion()} ' ); } Copy code

Declare asynchronous functions

The function whose body is marked by the async identifier is an asynchronous function. Add the async keyword to the function to return a Future. For example, consider the following synchronization function, which returns a String:

String lookUpVersion() => '1.0.0' ; Copy code

For example, the future implementation will be very time-consuming, change it to an asynchronous function, and the return value is Future.

Future< String > lookUpVersion() async => '1.0.0' ; Copy code

Note that the function body does not need to use the Future API. If necessary, Dart creates Future objects. If the function does not return a valid value, you need to set its return type to Future<void>

Processing Stream

When you need to get the data value from the Stream, you can use the following two ways:

  • Use async and an asynchronous loop (await for).
  • Use Stream API, for more details, refer to in the library tour

Tip: Before using await for, make sure the code is clear, and you really want to wait for the results of all streams. For example, you generally should not use await for UI event listeners, because the UI framework sends an endless stream of events.

The following is the use of asynchronous for loop:

await for (varOrType identifier in expression) { //Executes each time the stream emits a value. //Executes each time the stream emits a value. } Copy code

The value returned by the above expression must be of type Stream. The execution process is as follows:

  1. Wait until the stream emits a value.
  2. Execute the body of the for loop and set the variable to the value that should be emitted
  3. Repeat 1 and 2 until the stream is closed.

Use the break return statement to stop receiving the data of the stream, so that it breaks out of the for loop and unregisters from the stream.

  • If you encounter a compile-time error when implementing an asynchronous for loop, please check to make sure that await for is in an asynchronous function.
  • For example, to use an asynchronous for loop in the main() function of an application, the body of the main() function must be marked as async
Future main() async { //... await for ( var request in requestServer) { handleRequest(request); } //... } Copy code

For more information on asynchronous programming, please refer to the dart:async section. Also refer to the articles Dart Language Asynchrony Support: Phase 1 and Dart Language Asynchrony Support: Phase 2 , and Dart language specification

Stream

Stream is very characteristic but not easy to understand. Instead of treating it as a stream literally, I prefer to think of it as a factory or a machine.

Let's take a look at the characteristics of this machine:

  • It has an entrance where you can put things/commands (anything), this machine doesn t know when the entrance will put things in
  • The middle machine can produce or process, which should take some time
  • It has an export, and there should be products coming from there. We don t know when the products will come out of the export.

During the whole process, time is an uncertain factor. We can put things into the entrance of this machine at any time, and the machine will process it after putting it in, but we don't know how long it will be processed. So the exit needs to be specially dispatched to stare at, waiting for the machine to flow out. The whole process is viewed from an asynchronous perspective

We convert the machine model into a Stream

  • This big machine is StreamController, which is one of the ways to create a stream
  • StreamController has an entrance, called sink, it can use the add method to put things in, and after putting it in, you don t care anymore.
  • StreamController has an exit called stream. After the machine is processed, the product will be thrown out from the exit, but we don't know when it will come out, so we need to use the listen method to monitor this exit all the time. And when multiple items are put in, it will not disrupt the order, but first in, first out**

**

Stream operation is also a tool provided in dart to handle asynchronous operations. Unlike Future, it can receive the results of multiple asynchronous operations (regardless of success or failure). We can understand it as: when performing asynchronous tasks, you can pass multiple times Trigger success or failure events to deliver result data or error exceptions . For example, we often see scenarios in the development process: multiple file reads and writes, network downloads may initiate multiple, etc., then we will analyze the common use of Stream based on a case The operation:

void main(){ Stream.fromFutures([loginFuture,getUserInfoFuture,saveUserInfoFuture]) .listen((data){ //Callback of the results returned by each task },onError:((err){ //Callback for failed execution of each task }), onDone: ((){ //Monitor the callback when the execution of each task is completed }) ) .onDone((){ //Callback for all tasks completed }); } Future loginFuture = Future< String >((){ //The login operation login( 'admin' , '123456' ); }); String login( String userName, String pwd){ //Login operation } bool getUserInfo( int id){ //Get user information } Future< String > getUserInfoFuture =Future((){ getUserInfo( 1 ); }); Future saveUserInfoFuture = Future((){ saveUserInfo( "admin" ); }); void saveUserInfo( String userInfo){ } Copy code

It can be seen that in the listen method, we can perform fine callback processing for each task, and even after all tasks are executed, we have callbacks such as cancel pause resume onError onDone, etc., which can respectively fine-tune the different stages of the entire group task execution process. Processing, in the above we used the fromFutures method to use the Stream. In addition, during the use of the Stream, we often use the fromFuture method to process a single Futrue/fromIterable method to process the data in the collection. Of course, in addition to this conventional Stream operation, dart also provides two specialized operations/stream creation classes, which can implement complex operations of stream operations ** **

Method to obtain Stream

  • Through the constructor
  • Use StreamController
  • IO Stream

Stream has three construction methods

  • Stream.fromFuture creates a new single-subscription stream from the Future, when the future is completed, a data or error will be triggered, and then use the Down event to close the stream
  • Stream.fromFutures creates a single subscription stream from a set of Futures. Each future has its own data or error event. When the entire Future is completed, the stream will be closed. If Futures is empty, the stream will be closed immediately.
  • Stream.fromIterable creates a single subscription stream that gets its data from a collection
Stream.fromIntreable ([ . 1 , 2 , . 3 ]); Copy Code

**

StreamController

If you want to create a new stream, it's very simple! Use StreamController, it provides you with very rich functions, you can send data on streamController, handle errors, and get results!

//Any type of stream StreamController controller = StreamController(); controller.sink.add( 123 ); controller.sink.add( "xyz" ); controller.sink.add(()=> print ( "output" )); //Create a stream that handles int type StreamController< int > numController = StreamController(); numController.sink.add( 123 ); Copy code

Generics define what type of data we can push to the stream. It can be any type!

Example

Import 'DART: the async' ; void main( List < String > args) { StreamController controller = StreamController(); controller.sink.add( 123 ); controller.sink.add(() => print ( "Add a function" )); //Monitor the exit of this stream, and print this data when there is data flowing out StreamSubscription subscription = controller.stream.listen((data) { if (data is Function ) { data(); } else { print ( " $data " ); } }); } /* Output 123 Add a function */ Copy code

You need to pass a method to the listen function of stream. The input parameter (data) of this method is the result produced by our StreamController after processing. We monitor the export and obtain the result (data). Lambda expressions can be used here, or any other function.

Generate stream through async*

If we have a series of events to deal with, we may need to convert it into a stream. At this time, you can use async-yield* to generate a Stream

Stream< int > countStream( int to) async * { for ( int i = 1 ; i <= to; i++) { yield i; } } Copy code

When the loop exits, the Stream is done. We can combine the aforementioned await for to have a deeper experience

Import 'DART: the async' ; void main() async { var stream = countStream( 10 ); var sum = await sumStream(stream); print (sum); } Future< int > generateData( int data) async => data; Stream< int > countStream( int to) async * { for ( int i = 1 ; i <= to; i++) { yield await generateData(i); } } Future< int > sumStream(Stream< int > stream) async { var sum = 0 ; await for ( var value in stream) { sum += value; } return sum; } Copy code

Click here to run the above sample code directly, click here to view Promise and yeild in JavaScript

Methods of monitoring Stream

The most common way to monitor a stream is to listen. When an event is emitted, the stream will notify the listener. The Listen method provides these types of trigger events:

  • onData (required) Triggered when data is received
  • onError Triggered when Error is received
  • Triggered when onDone ends
  • unsubscribeOnError Whether to unsubscribe when encountering the first Error, the default is false

Use await for to process Stream

In addition to listening, we can also use await for to handle

Future< int > sumStream(Stream< int > stream) async { var sum = 0 ; await for ( var value in stream) { sum += value; } return sum; } Copy code

This code will receive a Stream and count the sum of all events, and then return the result. await for can handle each event when it arrives. We know that the timing of receiving events for a Stream is uncertain, so when should we exit the await for loop? The answer is, when the Stream is completed or closed

Convert and clean existing streams

If you already have a stream (Transforming an existing stream), you can transform it into a new stream. very simple! Stream provides: map() where() expand() take() method, which can easily convert an existing stream into a new stream

**

where

If you want to filter out some unwanted events. For example, in a guessing game, the user can input numbers, and when the input is correct, we will make a certain response. And we must filter out all wrong answers, this time we can use where to filter out unwanted numbers

(stream objects) .where ((event) {... }) copy the code

The where function receives an event, and whenever something flows from this stream to the where function, this is the event. We may not need this event at all, but it must be passed in as a parameter, and then the data that meets the requirements can be filtered and returned**

take

If you want to control how many things this stream can transmit at most. For example, to enter a password, we may want the user to enter a maximum of four times, then we can use take to limit

.take( 4 ); Copy code

The take function receives an int, which represents the maximum number of events that can pass through the take function. When the number of transmissions reaches this number, the stream will be closed and can no longer be transmitted

transform

If you need more control over the transformation, then please use the transform() method. He needs to be used with StreamTransformer. Let s take a look at the following number guessing game first, and then I ll explain it to you

Import 'DART: the async' ; main( List < String > args) { StreamController< int > controller = StreamController< int >(); final transformer = StreamTransformer< int , String >.fromHandlers(handleData: (value, sink) { if (value == 100 ) { sink.add( "You guessed it" ); } else { sink.addError( 'I haven't guessed it yet, try again' ); } }); controller.stream .transform(transformer) .listen((data) => print (data), onError: (err) => print (err)); controller.sink.add( 23 ); //controller.sink.add(100); } //output: not guessed, try again it copy the code

StreamTransformer<S,T> is the inspector of our stream, he is responsible for receiving the information passed by the stream, and then processing it to return a new stream.

  • S represents the input type of the previous stream. Here we input a number, so it is an int.
  • T represents the input type of the stream after conversion. What we add here is a string of strings, so it is String.
  • handleData receives a value and creates a new stream and exposes the sink, where we can transform the stream.
  • We can also addError to tell that there is a problem later

Then we monitor the stream after the transform. When the transformed event flows out, we print the event. This event is the data we just added into the sink. onError can catch the err we add.

Types of Stream

"Single-subscription" streams

A single subscription stream allows only one listener during the entire life cycle of the stream . It will not generate events until there are listeners, and it will stop sending events when you cancel listening, even if you are still in Sink.add more events. Even after the first subscription is cancelled, it is not allowed to listen twice on a single subscription stream. Single subscription streams are usually used to stream larger continuous data blocks, such as file I/O.

Import 'DART: the async' ; main( List < String > args) { StreamController controller = StreamController(); controller.stream.listen((data) => print (data)); controller.stream.listen((data) => print (data)); controller.sink.add( 123 ); } //Output: Unhandled exception: Bad state: Stream has already been listened to. Single subscription stream cannot have multiple listeners # 0 _StreamController._subscribe (dart: async/stream_controller.dart: 670 : 7 ) # 1 _ControllerStream._createSubscription (dart: async/stream_controller.dart: 820 : 19 ) # 2 _StreamImpl.listen (dart: async/stream_impl.dart: 474 : 9 ) # 3 main (file: ///Users/Ken/Desktop/test/nam.dart:6:21) # 4 _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart: 305 : 32 ) # . 5 _RawReceivePortImpl._handleMessage (DART: Patch-the isolate/isolate_patch.dart: 174 : 12 is ) Copy Code

"Broadcast" streams Multi-subscription streams

The broadcast stream allows any number of listeners, and events can be generated regardless of whether there are listeners. So listeners who come in halfway will not receive the previous message. If multiple listeners want to listen to a single subscription stream, please use asBroadcastStream to create a broadcast stream on top of a non-broadcast stream. If the listener is added to the broadcast stream when an event is triggered , The listener will not receive the event currently being triggered. If you cancel the listening, the listener will immediately stop receiving the event. General streams are single-subscription streams. Broadcast streams inherited from Stream must override isBroadcast to return true

Import 'DART: the async' ; main( List <String > args) { StreamController controller = StreamController(); //Convert a single subscription stream into a broadcast stream Stream stream = controller.stream.asBroadcastStream(); stream.listen((data) => print (data)); stream.listen((data) => print (data)); controller.sink.add( 123 ); } //Output: 123 123Copy code