C++ asynchronous call tools: how future/promise is implemented

blog.csdn.net/jiange_zh/a...

Preface
In asynchronous programming, callbacks pile up quickly: the code becomes scattered and hard to maintain. The future/promise facilities in boost and C++11 offer a good way to keep such code clean and maintainable.

I have used future/promise several times at work, but I still did not really understand it, so I decided to study how it works in order to use it with more confidence.

I looked through a lot of material and found that many languages have this mechanism, but there is very little about promises in C++. What exists is mostly usage tutorials; I found nothing on the underlying principles.

"In front of the source code, there are no secrets."

So I decided to learn from the source code!

The source code discussed in this article is based on boost's implementation. For copyright reasons the complete source is not posted; readers who want to study it can refer to the boost implementation.

For an introduction to future/promise, see a blog post I wrote earlier:

blog.csdn.net/jiange_zh/a...

1. Components that Future/Promise-based asynchronous programming depends on
bind and Callback, similar to bind and function in the boost library;
shared_ptr, scoped_ptr, tuple, exception, modeled on the corresponding implementations in the boost library;
Future and Promise, which borrow the design of future in the boost library;
when_all, implemented with Future, Promise, and tuple, for synchronizing parallel asynchronous calls.
2. Usage of Function and Bind
See a blog post I wrote before:

blog.csdn.net/jiange_zh/a...

The bind mentioned below is similar to boost's bind, and Callback is similar to boost's function; it is generally used together with bind.

3. shared_ptr, scoped_ptr, tuple
shared_ptr: shared ownership through reference counting.

scoped_ptr: exclusive ownership that cannot be transferred.

tuple: We often need a function to return multiple values. We can wrap the values in a class or struct and return that, but doing so adds code to the program; an anonymous struct or class would be a better fit. boost::tuple offers something close to an anonymous struct for returning multiple values from a function: it keeps the code readable without adding to its size. In fact, std::pair is the two-element special case of boost::tuple. A boost::tuple can hold more elements, and tuples can be nested to hold an arbitrary number.
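As a rough illustration of returning several values without declaring a struct, the sketch below uses std::tuple from C++11, which follows the boost::tuple design. The functions divide and remainderOf are made up for this example and are not part of the library discussed here.

```cpp
#include <string>
#include <tuple>

// Hypothetical helper: returns a quotient, a remainder, and a status message
// without declaring a dedicated struct.
std::tuple<int, int, std::string> divide(int a, int b)
{
    if (b == 0)
    {
        return std::make_tuple(0, 0, std::string("division by zero"));
    }
    return std::make_tuple(a / b, a % b, std::string("ok"));
}

// Unpacks the three results into separate variables with std::tie.
int remainderOf(int a, int b)
{
    int quotient = 0;
    int remainder = 0;
    std::string status;
    std::tie(quotient, remainder, status) = divide(a, b);
    return remainder;
}
```

Individual elements can also be read with std::get<N>, for example std::get<0>(divide(7, 2)).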

4. EnableSharedFromThis
When using the boost library, you often see classes like the following:

class A : public enable_shared_from_this<A>

Under what circumstances should class A inherit from enable_shared_from_this?

Use case: when class A is managed by shared_ptr and a member function of A needs to pass the current object to another function, it has to pass a shared_ptr that points to itself.

We make class A inherit from enable_shared_from_this and obtain a shared_ptr pointing to itself through the member function shared_from_this().

This raises two questions:

1. When passing the current object to another function, why pass a shared_ptr? Can't we just pass the this pointer?

If a raw pointer is passed, nobody knows what the callee will do with it. If the callee deletes the object while a shared_ptr still points to it, the shared_ptr is left dangling and the object will be deleted a second time.

2. Can we construct the shared_ptr directly from this, as in shared_ptr<A>(this)?

That creates two shared_ptr instances that do not share a reference count but point to the same object, so the object ends up being destroyed twice.
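The difference can be sketched with the C++11 standard-library equivalents (std::shared_ptr and std::enable_shared_from_this). The class Session and the function ownersAfterSharing are hypothetical names chosen for this example.

```cpp
#include <memory>

// Hypothetical class used only for illustration.
class Session : public std::enable_shared_from_this<Session>
{
public:
    // Safe: the returned pointer shares ownership with the existing owner.
    // Writing std::shared_ptr<Session>(this) here instead would create a
    // second, unrelated reference count and lead to a double delete.
    std::shared_ptr<Session> self()
    {
        return shared_from_this();
    }
};

// Returns the use count seen after obtaining a second owner through self().
long ownersAfterSharing()
{
    std::shared_ptr<Session> first = std::make_shared<Session>();
    std::shared_ptr<Session> second = first->self();
    // Both pointers share one reference count, so the object is destroyed once.
    return first.use_count();
}
```

Note that shared_from_this() may only be called on an object that is already owned by a shared_ptr, as first is here.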

A very typical example in the official boost documentation:

www.boost.org/doc/libs/1_...

Part of the code:

class tcp_connection
  : public boost::enable_shared_from_this<tcp_connection>
{
public:
  typedef boost::shared_ptr<tcp_connection> pointer;

  static pointer create(boost::asio::io_service& io_service)
  {
    return pointer(new tcp_connection(io_service));
  }

  tcp::socket& socket()
  {
    return socket_;
  }

  void start()
  {
    message_ = make_daytime_string();

    boost::asio::async_write(socket_, boost::asio::buffer(message_),
        boost::bind(&tcp_connection::handle_write, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  }

private:
  tcp_connection(boost::asio::io_service& io_service)
    : socket_(io_service)
  {
  }

  void handle_write(const boost::system::error_code& /*error*/,
      size_t /*bytes_transferred*/)
  {
  }

  tcp::socket socket_;
  std::string message_;
};

class tcp_connection inherits from enable_shared_from_this. In its member function start(), the boost::bind call obtains a shared_ptr pointing to the object itself through shared_from_this(), so the pending write handler keeps the connection object alive.

5. Future and Promise
5.1 Introduction
A Promise object can store a value of type T, which can later be read by a future object (possibly on another thread). This is the synchronization mechanism that promises provide.

When a promise is constructed, it is associated with a shared state. The shared state can store either a value of type T or an exception derived from std::exception. get_future returns the future object associated with the promise; after this call, the two objects share the same shared state.

The Promise object is the asynchronous provider: it sets the value of the shared state at some point in time.

The Future object returns the value of the shared state; if necessary, it blocks the caller until the shared state is marked ready and then returns the value.
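The description above matches the C++11 standard-library types, so a minimal sketch can use std::promise and std::future directly. The function name fetchGreeting and the string it passes are invented for this example.

```cpp
#include <future>
#include <string>
#include <thread>

// Runs a producer thread that fulfils the promise, while the calling thread
// blocks in get() until the shared state becomes ready.
std::string fetchGreeting()
{
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();

    std::thread producer([&promise]() {
        promise.set_value(std::string("hello from the producer"));
    });

    std::string value = future.get();  // blocks until set_value has run
    producer.join();
    return value;
}
```

The promise is the write end and the future is the read end of the same shared state; get() can be called only once on a std::future.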

5.2 Overview of diagrams


5.3 Learn from the use
The following is an example of using future/promise (currently only discussing the case of serial calls):

// Service external interface, serial call
taf::Int32
AServantImp::queryResultSerial(const std::string& sIn, std::string& sOut, taf::JceCurrentPtr current)
{
    // Mark the response as asynchronous
    current->setResponse(false);

    // Send an asynchronous request to service B. The return type is
    // promise::Future<std::string>, meaning that service B will deliver
    // a std::string at some point in the future.
    promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);

    // f calls its member function then() to register a handler for the
    // std::string result that will arrive in the future.
    // handleBRspAndSendCReq reads that result and returns sendCReq(),
    // i.e. a second future f2, on which then() is called again (chaining).
    f.then(promise::bind(&handleBRspAndSendCReq, _pPrxC, current))
     .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}

promise::Future<std::string> 
sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
    // Define a variable of type promise::Promise<std::string>. The promise
    // undertakes to store a std::string later. Pass it to the
    // BServantCallback object, start the asynchronous call, and finally
    // return promise.getFuture(): the promised std::string can be obtained
    // through the promise::Future<std::string> that getFuture() returns.
    promise::Promise<std::string> promise;

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

    prx->async_queryResult(cb, sIn);

    return promise.getFuture();  // return a future to f
}

//
promise::Future<std::string> 
sendCReq(CServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
//The meaning of this is similar to sendBReq
//...
}

//
promise::Future<std::string>
handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future)
{
    std::string sResult("");
    std::string sException("");
    try
    {
        // By the time this code runs, the data the promise owes to the
        // future has arrived. There are two cases:
        // 1. Normal data: the result of the request to service B came back,
        //    so future.get() returns it.
        // 2. Abnormal data: the result did not come back (for example, the
        //    asynchronous call timed out), so future.get() throws an
        //    exception, which is why it is wrapped in try-catch.
        sResult = future.get();
        return sendCReq(prx, sResult, current);
    }
    catch (exception& e)
    {
        TLOGDEBUG("Exception:" << e.what() << endl);
        sException = e.what();

        promise::Promise<std::string> promise;
        promise.setValue(sException);
        return promise.getFuture();
    }
}

//
int
handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)
{
    int ret = 0;
    std::string sResult("");

    try
    {
        // Handled in the same way as in handleBRspAndSendCReq
        sResult = future.get();
    }
    catch (exception& e)
    {
        ret = -1;
        sResult = e.what();
        TLOGDEBUG("Exception:" << e.what() << endl);
    }

    // Send the response packet
    AServant::async_response_queryResultSerial(current, ret, sResult);
    return 0;
}
Let's walk through what happens, step by step.

5.4 Define and initialize a Future type variable
promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current); 
sendBReq returns a Future through promise.getFuture(), and uses it to initialize f.

Two questions:
1. Where does the return value of promise.getFuture() come from?
2. How is f initialized?

1. Where does the return value of promise.getFuture() come from?

promise::Promise<std::string> promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture();  // return a future to f
Promise has one data member:

SharedPtr<detail::FutureObjectInterface<T>> m_future;

The default constructor initializes this member:

Promise()
    : m_future(SharedPtr<detail::FutureObject<T> >(new detail::FutureObject<T>()))
{}

It uses FutureObject as the concrete implementation of FutureObjectInterface.

Promise's getFuture() method constructs a temporary Future<T> from m_future and returns it (that constructor is private, so Promise has to be a friend of Future). As a result, the m_future inside the temporary returned by promise.getFuture() and the m_future inside the promise point to the same FutureObject.

Future<T> getFuture() const
{
    return Future<T>(m_future);
}

2. How is f initialized?

Future<T> inherits from FutureBase<T>, from which it gets these data members:

typedef SharedPtr<detail::FutureObjectInterface<T>> FuturePtr;
FuturePtr m_future;

Our goal is to initialize f's m_future from the m_future of the anonymous temporary returned by promise.getFuture(), so that the promise, the temporary, and f all point to the same object. Once the temporary is destroyed, only the promise and f are left pointing to it. With this sharing in place, we can write the value through the promise (BServantCallback calls setValue) and read it through f (via f.get()).
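The sharing described here can be sketched in a few lines. This is a single-threaded simplification using std::shared_ptr, not the library's code: SharedState, MiniPromise, MiniFuture, and roundTrip are hypothetical names, and there is no locking, blocking, or exception support.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for FutureObject: the state both sides share.
template <typename T>
struct SharedState
{
    bool ready = false;
    T value{};
};

template <typename T>
class MiniPromise;

// Read side: only holds a pointer to the shared state.
template <typename T>
class MiniFuture
{
public:
    bool isDone() const { return m_state->ready; }
    const T& get() const { return m_state->value; }

private:
    friend class MiniPromise<T>;  // only the promise may create a bound future
    explicit MiniFuture(std::shared_ptr<SharedState<T>> state)
        : m_state(std::move(state)) {}

    std::shared_ptr<SharedState<T>> m_state;
};

// Write side: creates the state and hands out futures that point at it.
template <typename T>
class MiniPromise
{
public:
    MiniPromise() : m_state(std::make_shared<SharedState<T>>()) {}

    void setValue(const T& v)
    {
        m_state->value = v;
        m_state->ready = true;
    }

    MiniFuture<T> getFuture() const { return MiniFuture<T>(m_state); }

private:
    std::shared_ptr<SharedState<T>> m_state;
};

// The future obtained from getFuture() and the promise hold the same
// SharedState, so a value written through one is visible through the other.
std::string roundTrip()
{
    MiniPromise<std::string> promise;
    MiniFuture<std::string> f = promise.getFuture();
    promise.setValue("reply from B");
    return f.get();
}
```

The private constructor plus the friend declaration mirrors the arrangement described above, where only Promise can create a Future bound to its m_future.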

3. Public constructors of Future<T>:

Future() {}

explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
    : detail::FutureBase<T>(SharedPtr<detail::PromptFutureObject<T>>
        (new detail::PromptFutureObject<T>(t)))
{}

Future(const ExceptionPtr& e)
    : detail::FutureBase<T>(e)
{}

For the second constructor, since T is std::string, detail::FutureTraits<T>::rvalue_source_type is actually const std::string&.

None of these constructors matches the initialization of f, and neither Future nor its base class declares a copy constructor, so the compiler-generated copy constructor copies the members one by one. It therefore copies the m_future member of the future returned by sendBReq. Because that member is a shared_ptr, the m_future of the promise and the m_future of f end up pointing to the same object, which is what we wanted.
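A small sketch of that memberwise copy, using std::shared_ptr in place of the library's SharedPtr. Handle and ownersAfterCopy are hypothetical names for this example.

```cpp
#include <memory>

// Hypothetical handle with no user-declared copy constructor.
struct Handle
{
    std::shared_ptr<int> m_future;
};

// Copies the handle with the compiler-generated copy constructor and reports
// how many owners the pointed-to object has afterwards.
long ownersAfterCopy()
{
    Handle original{std::make_shared<int>(42)};
    Handle copy = original;  // memberwise copy: shared_ptr's copy constructor runs
    return copy.m_future.use_count();
}
```

Both handles now own the same int, which is the behaviour the Future copy relies on.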

5.5 Binding a callback and chaining calls
After the promised value is set, a callback function is needed to process it, so we bind one through then. To support chained calls, then should itself return a future. That future generally carries the callback's return value, which the callback obtains from promise.getFuture().

Member functions that Future<T> inherits from FutureBase<T>:

get();
isDone();
hasValue();
hasException();
operator unspecified_bool_type() const;  // Returns true if this future has been initialized.

All of these simply forward to the corresponding functions of m_future, so the details of the future are encapsulated in FutureObject and PromptFutureObject. I won't go into them here.

Future< T> also implements a function then:

/**
 * Register a callback which will be called once the future is satisfied. If an
 * exception is thrown the callback will not be registered and then will not be called.
 *
 * Throws std::bad_alloc if memory is unavailable.
 */
template <typename R>
Future<typename detail::resolved_type<R>::type>
then(const Callback<R(const Future&)>& callback) const
{
    typedef typename detail::resolved_type<R>::type value_type;

    if (!this->m_future)
    {
        throwException(FutureUninitializedException(__FILE__, __LINE__));
    }

    Promise<value_type> promise;

    this->m_future->registerCallback(
        bind(&detail::SequentialCallback<R, T>::template run<R>,
             owned(new detail::SequentialCallback<R, T>(callback, promise))));

    return promise.getFuture();
}
This function takes a Callback object as its parameter. The Callback wraps a function whose return type is R and whose parameter is a Future.

For example, the following call:

f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current));

The signature of handleBRspAndSendCReq is as follows:

promise::Future<std::string> handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future<std::string>& future);

bind fixes the first two parameters of handleBRspAndSendCReq, prx and current, and leaves the third parameter, future, to be supplied when the callback is invoked.
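The same partial application can be sketched with std::bind and std::function from the standard library. One difference to keep in mind: std::bind needs an explicit placeholder for the argument that is left open, whereas the promise::bind call above does not show one. The functions handle and makeCallback and their arguments are made up for this example.

```cpp
#include <functional>
#include <string>

// Hypothetical three-argument handler, mirroring the shape of
// handleBRspAndSendCReq(prx, current, future).
std::string handle(const std::string& prx, int current, const std::string& future)
{
    return prx + ":" + std::to_string(current) + ":" + future;
}

// Binds the first two arguments now; the third is supplied at call time.
std::function<std::string(const std::string&)> makeCallback()
{
    using std::placeholders::_1;
    return std::bind(&handle, std::string("proxyC"), 7, _1);
}
```

Calling makeCallback()("ready") invokes handle("proxyC", 7, "ready").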

5.5.1 The return type of then
Having looked at the parameter, let's look at the return type of then:

Future<typename detail::resolved_type<R>::type>

In the example above, the return type of handleBRspAndSendCReq is promise::Future<std::string>, and this is the type that the template parameter R of then is instantiated with.

Why doesn't the return type use R directly, instead of going through resolved_type?

Let's first look at the definition of resolved_type:

template <typename T>
struct resolved_type
{
    typedef T type;
};

template <typename T>
struct resolved_type<Future<T>>
{
    typedef T type;
};

resolved_type<T>::type is T, and resolved_type<Future<T>>::type is also T.
Whether the argument is a plain T or a Future<T>, the type member that resolved_type yields is always T.

Now look at the other then call:

f.then(promise::bind(&handleCRspAndReturnClient, current));

The signature of handleCRspAndReturnClient is as follows:

int handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future<std::string>& future)

This time R is instantiated with int.

To make chained calls possible, then must always return a Future. So we cannot return R directly: when R is a plain type we need Future<R>, and when R is itself a Future<T> we use resolved_type to extract T and return Future<T> rather than Future<Future<T>>.
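The effect of resolved_type can be checked at compile time. In the sketch below, Future is an empty placeholder template standing in for the real class, and the static_assert checks are added for illustration.

```cpp
#include <type_traits>

// Placeholder Future template; only the type matters here.
template <typename T>
class Future {};

template <typename T>
struct resolved_type
{
    typedef T type;
};

template <typename T>
struct resolved_type<Future<T> >
{
    typedef T type;
};

// A callback returning int yields Future<int> from then().
static_assert(std::is_same<Future<resolved_type<int>::type>, Future<int> >::value,
              "plain R is wrapped as Future<R>");

// A callback returning Future<int> also yields Future<int>,
// not Future<Future<int> >.
static_assert(std::is_same<Future<resolved_type<Future<int> >::type>, Future<int> >::value,
              "Future<T> is unwrapped to T before wrapping");
```

Both assertions hold, so the return type of then is a single-level Future in either case.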

5.5.2 The function body of then
First, be clear about what then has to do:
1. Register a callback function to handle the future once it is ready;
2. Carry back the return value of the callback and return a future to the user for chained calls.

Operations in the function body:
1. Ensure that m_future has been initialized.
2. Define a Promise variable:

Promise<value_type> promise;

3. Call this->m_future->registerCallback() to register the callback that was passed in. bind and SequentialCallback are used to wrap it:

this->m_future->registerCallback(
    bind(&detail::SequentialCallback<R, T>::template run<R>,
         owned(new detail::SequentialCallback<R, T>(callback, promise))));

4. Return the future of the promise:

return promise.getFuture();

The details are not discussed here. It is enough to understand that this promise yields a Future<value_type>, and that value_type is determined by the return type of the callback: handleBRspAndSendCReq returns a Future<std::string>, so its value_type is std::string; handleCRspAndReturnClient returns an int, so its value_type is int.

The role of Promise and SequentialCallback is to carry back the return value of the callback and finally hand it to the user for chained calls.

2016/9/15 update:

Carrying back the return value of the callback

Here is a brief look at how then carries back the return value of the callback and produces the future it returns:

// code snippet in then()
Promise<value_type> promise;

this->m_future->registerCallback(bind(
    &detail::SequentialCallback<R, T>::template run<R>,
    owned(new detail::SequentialCallback<R, T>(callback, promise))));

return promise.getFuture();

Here we introduce an extra promise layer, which promises to carry back the return value of callback(). (If the return value is a Future<T>, a Future<T> is carried back; if it is not a future, for example an int, a Future<int> is carried back.)

As you can see, bind binds the member function run of SequentialCallback (the template keyword indicates that run is a member template).
The call is made on the SequentialCallback object given as the second argument, which wraps the callback and a promise.

When the upstream future becomes ready, the registered callback is invoked. What actually gets called is SequentialCallback's run function rather than the real callback. Inside run, the real callback is invoked, and its return value is stored in the promise held by the SequentialCallback object; the future that then returns is the one associated with this promise.

So, by going through one extra future/promise layer, then manages to return a future for the callback's return value.
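The indirection can be sketched as follows. This is a simplified, single-threaded model rather than the library's code: State, SketchPromise, SketchFuture, and chainDemo are hypothetical names, a lambda plays the role of SequentialCallback::run, callbacks receive the value instead of the future, and only callbacks that return a plain value are handled.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded shared state with a list of pending callbacks.
template <typename T>
struct State
{
    bool done = false;
    T value{};
    std::vector<std::function<void(const T&)>> pending;
};

template <typename T>
class SketchFuture
{
public:
    explicit SketchFuture(std::shared_ptr<State<T>> s) : m_state(std::move(s)) {}

    const T& get() const { return m_state->value; }

    // Registers callback and returns a future for the callback's return value.
    template <typename R>
    SketchFuture<R> then(std::function<R(const T&)> callback) const;

private:
    std::shared_ptr<State<T>> m_state;
};

template <typename T>
class SketchPromise
{
public:
    SketchPromise() : m_state(std::make_shared<State<T>>()) {}

    void setValue(const T& v)
    {
        m_state->value = v;
        m_state->done = true;
        // Take the pending list first, then run each callback once.
        std::vector<std::function<void(const T&)>> callbacks;
        callbacks.swap(m_state->pending);
        for (auto& cb : callbacks) cb(m_state->value);
    }

    SketchFuture<T> getFuture() const { return SketchFuture<T>(m_state); }

private:
    std::shared_ptr<State<T>> m_state;
};

template <typename T>
template <typename R>
SketchFuture<R> SketchFuture<T>::then(std::function<R(const T&)> callback) const
{
    SketchPromise<R> next;  // the extra promise layer
    // The wrapper is what gets registered: it calls the real callback and
    // stores the callback's return value in the extra promise.
    auto wrapper = [callback, next](const T& v) mutable { next.setValue(callback(v)); };
    if (m_state->done) wrapper(m_state->value);
    else m_state->pending.push_back(wrapper);
    return next.getFuture();
}

// Chains two callbacks; the second runs on the first callback's return value.
int chainDemo()
{
    SketchPromise<std::string> promise;
    SketchFuture<int> last =
        promise.getFuture()
            .then<std::size_t>([](const std::string& s) { return s.size(); })
            .then<int>([](const std::size_t& n) { return static_cast<int>(n) * 2; });
    promise.setValue("abcd");  // triggers both callbacks in order
    return last.get();
}
```

In chainDemo, setting the first promise runs the first wrapper, which fulfils the second promise, which in turn runs the second wrapper. The return type R is given explicitly here because it cannot be deduced from a lambda through std::function.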

run has several overloaded versions:

// For callback which returns void.
template <typename U>
typename enable_if<is_void<U> >::type run(const FuturePtr& future)
{
    try
    {
        m_callback(future);
        m_promise.set();
    }
    catch (...)
    {
        m_promise.setException(currentException());
    }
}

When the callback returns void, there is no value to carry back, so the promise calls the argument-less set().

// For callback which returns non-void non-future type
template <typename U>
typename enable_if_c<!is_void<U>::value && !is_future_type<U>::value>::type
run(const FuturePtr& future)
{
    try
    {
        m_promise.setValue(m_callback(future));
    }
    catch (...)
    {
        m_promise.setException(currentException());
    }
}

When the callback returns something that is neither void nor a future, run calls m_callback and stores its return value in the promise, so the value can be obtained through promise.getFuture().get().

// For callback which returns future type.
template <typename U>
typename enable_if<is_future_type<U> >::type run(const FuturePtr& future)
{
    try
    {
        m_callback(future).then(
            bind(&ForwardValue<value_type>::template run<value_type>,
                 owned(new ForwardValue<value_type>(m_promise))));
    }
    catch (...)
    {
        m_promise.setException(currentException());
    }
}

When the callback returns a future, run uses then to bind that future to ForwardValue's run function (the same technique again). ForwardValue's run reads the value from the future with get() and stores it in the promise defined in then, so the future that then returns carries the callback's result.

How the return type is told apart is, of course, the job of type traits, which I won't expand on here.
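For readers who have not met the technique, the sketch below shows the kind of trait-based overload selection involved, written with std::enable_if and std::is_void from the standard library. Future is an empty placeholder, is_future_type is written here under the assumption that it is a simple partial specialization, and describeRun is a made-up function that only reports which overload was chosen.

```cpp
#include <string>
#include <type_traits>

// Placeholder Future template; only the type matters here.
template <typename T>
class Future {};

// Trait that is true only for Future<T>.
template <typename T>
struct is_future_type : std::false_type {};

template <typename T>
struct is_future_type<Future<T> > : std::true_type {};

// Selected when the callback's return type U is void.
template <typename U>
typename std::enable_if<std::is_void<U>::value, std::string>::type
describeRun()
{
    return "void: call the callback, then mark the promise done";
}

// Selected when U is neither void nor a Future.
template <typename U>
typename std::enable_if<!std::is_void<U>::value && !is_future_type<U>::value,
                        std::string>::type
describeRun()
{
    return "value: store the callback's return value in the promise";
}

// Selected when U is a Future.
template <typename U>
typename std::enable_if<is_future_type<U>::value, std::string>::type
describeRun()
{
    return "future: forward the inner future's result to the promise";
}
```

For any given U exactly one condition is true, so the other two overloads drop out of overload resolution and the call is unambiguous.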

5.5.3 Summary of then function
Let's look at the following example once more to sum up what these lines of code do:

promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));

First, we define a Promise in sendBReq and hand it to BServantCallback.

When the response to the asynchronous call arrives, BServantCallback is called back and calls the Promise's setValue to store the value.

Once the value is stored, the callback handleBRspAndSendCReq bound by then is called to process it.

Because promise.getFuture() makes the m_future of f and the m_future of the promise point to the same object, the callback handleBRspAndSendCReq can read the value through the future it receives (future.get()).

Reading the value only completes the handleBRsp part. sendCReq, like sendBReq, defines a Promise. The future associated with it (obtained through promise.getFuture()) is used as the return value of handleBRspAndSendCReq, and the Promise and SequentialCallback inside then deliver this future to the user, who can then call then again to register the next handler.

Summary: then registers the handler callback on the future. When the future becomes readable, the handler is called to process it, and its return value is carried back so that calls can be chained.

5.6 Sorting out the relationship between Future and Promise
Both have an m_future member whose type is

SharedPtr<detail::FutureObjectInterface<T>>

Because Future<T> needs a specialization for void, the parts unrelated to the specialization are factored out into the base class FutureBase to avoid duplicating code.

As can be seen from the figure above, both Future and Promise hold an m_future, and they are tied together by sharing this object (set up by promise.getFuture()). Promise exposes the set (write) interface of m_future, and Future exposes its get (read) interface.

5.7 Implementation of FutureObjectInterface
The following figure shows the concrete implementations of FutureObjectInterface:

As you can see, FutureObjectInterface has two implementations: FutureObject and PromptFutureObject.

The FutureObjectInterface held by a Promise is a FutureObject.

For a Future there are two cases: PromptFutureObject is used when the Future is constructed directly from a value (for example, by calling makeFuture), and FutureObject is used otherwise (for example, for a future obtained from a Promise).

So, what is the difference between the two?

In the first scenario, the future is not obtained from a promise but is constructed directly from an immediate value:

explicit Future(typename detail::FutureTraits<T>::rvalue_source_type t)
    : detail::FutureBase<T>(SharedPtr<detail::PromptFutureObject<T>>
        (new detail::PromptFutureObject<T>(t)))
{}

For example, consider the following scenario:

Future<int> hsF = makeFuture<int>(-1);  // construct from an immediate value
if (openSwitch)
{
    // Make an asynchronous call in sendBReq and return a future through promise.getFuture()
    hsF = sendBReq();
}
//
In the handler, we can decide whether to process the result with a check like this:

int result = hsF.get();
if (result != -1)
{
    // handle
}

When a future is constructed from an immediate value, the value is already set and there is no need to wait for a promise to fill it in. The PromptFutureObject inside such a future is therefore read-only and needs no locking. Using FutureObject here would waste work on locking and unlocking, so PromptFutureObject is an optimization for this scenario.

When a Future and a Promise share the same m_future, they may live on different threads and may read and write at the same time. That is a race condition, so locking is required. FutureObject is the locked version.
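The contrast between the two implementations can be sketched like this. The classes below are hypothetical, hold a plain int, and only model the locking difference; they are not the library's FutureObject and PromptFutureObject.

```cpp
#include <mutex>

// Hypothetical read interface shared by both implementations.
struct IntStateInterface
{
    virtual ~IntStateInterface() {}
    virtual int get() const = 0;
};

// Value fixed at construction: read-only afterwards, so no lock is needed.
class PromptIntState : public IntStateInterface
{
public:
    explicit PromptIntState(int v) : m_value(v) {}
    int get() const override { return m_value; }

private:
    const int m_value;
};

// Value written later, possibly from another thread: every access locks.
class LockedIntState : public IntStateInterface
{
public:
    void set(int v)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_value = v;
    }

    int get() const override
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_value;
    }

private:
    mutable std::mutex m_mutex;
    int m_value = 0;
};
```

Code that only reads through IntStateInterface does not need to know which of the two it was given.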

Regarding FutureObject, there are several points to note:
1. As the introduction said, a Future can obtain the value of the shared state through get(); if necessary, it blocks the caller until the shared state is marked ready and then returns the value.

2. setValue can be called only once, that is, the value of the shared state can be set only once. A second attempt throws an exception.

3. registerCallback checks m_is_done to see whether the value has been set. If m_is_done is true, it calls callback(this->sharedFromThis()) directly; otherwise it adds the callback to the m_pending_callbacks list, to be called after setValue. Besides setting the value, setValue also calls doPendingCallbacks(), which calls the callbacks in m_pending_callbacks one by one.
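The three points above can be put together in one sketch. It is an assumption-laden simplification, not the library's FutureObject: IntFutureObject and callbackDemo are hypothetical names, the value type is fixed to int, callbacks receive the value rather than the shared future object, the exception type std::logic_error is chosen here for illustration, and running callbacks outside the lock is a choice made in this sketch.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical, simplified stand-in for FutureObject<int>.
class IntFutureObject
{
public:
    typedef std::function<void(int)> Callback;

    // Point 1: blocks until a value has been set, then returns it.
    int get()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this]() { return m_is_done; });
        return m_value;
    }

    // Point 2: may succeed only once; a second call throws.
    void setValue(int v)
    {
        std::vector<Callback> callbacks;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_is_done) throw std::logic_error("value already set");
            m_value = v;
            m_is_done = true;
            callbacks.swap(m_pending_callbacks);
        }
        m_cond.notify_all();
        // Run the pending callbacks one by one, outside the lock.
        for (const Callback& cb : callbacks) cb(v);
    }

    // Point 3: calls the callback at once if the value is ready,
    // otherwise queues it until setValue runs.
    void registerCallback(const Callback& cb)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (!m_is_done)
            {
                m_pending_callbacks.push_back(cb);
                return;
            }
        }
        cb(m_value);
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_is_done = false;
    int m_value = 0;
    std::vector<Callback> m_pending_callbacks;
};

// Registers one callback before and one after setValue, then adds get().
int callbackDemo()
{
    IntFutureObject state;
    int sum = 0;
    state.registerCallback([&sum](int v) { sum += v; });  // queued
    state.setValue(10);                                    // runs the queued callback
    state.registerCallback([&sum](int v) { sum += v; });  // runs immediately
    return sum + state.get();
}
```

In callbackDemo each callback runs exactly once, regardless of whether it was registered before or after the value was set.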

Finally, as for the implementation of when_all (parallel asynchronous calls), I will add it later when I have time~
--------------------- 
Author: Jiange_zh 
Source: CSDN 
Original text: blog.csdn.net/jiange_zh/a...
Copyright statement: This article is the original article of the blogger, please attach a link to the blog post if you reprint it!