libqi-api  2.1.4.13
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
future.hxx
Go to the documentation of this file.
1 #pragma once
2 /*
3 ** Copyright (C) 2012 Aldebaran Robotics
4 ** See COPYING for the license
5 */
6 
7 #ifndef _QI_DETAILS_FUTURE_HXX_
8 #define _QI_DETAILS_FUTURE_HXX_
9 
10 #include <vector>
11 #include <utility> // pair
12 #include <boost/bind.hpp>
13 #include <qi/eventloop.hpp>
14 
15 #include <qi/log.hpp>
16 
17 namespace qi {
18 
19  namespace detail {
20 
// Non-template part of the state shared by futures and promises.
// FutureBaseTyped<T> (below) derives from this class and adds the typed value
// and the callback list.
21  class FutureBasePrivate;
22  class QI_API FutureBase {
23  public:
24  FutureBase();
25  ~FutureBase();
26 
// Returns the state observed after waiting. FutureBaseTyped::value() treats a
// FutureState_Running result as a timeout.
// NOTE(review): msecs is presumably a timeout in milliseconds - confirm.
27  FutureState wait(int msecs) const;
28  FutureState state() const;
29  bool isRunning() const;
30  bool isFinished() const;
31  bool isCanceled() const;
32  bool hasError(int msecs) const;
33  bool hasValue(int msecs) const;
34  const std::string &error(int msecs) const;
35  void reportStart();
36  void reset();
37 
38  protected:
// State transitions; FutureBaseTyped calls these from its setters while
// holding mutex().
39  void reportValue();
40  void reportError(const std::string &message);
41  void reportCanceled();
// Recursive mutex that FutureBaseTyped locks around state updates and
// callback registration.
42  boost::recursive_mutex& mutex();
// Called by FutureBaseTyped::callCbNotify() once the callbacks have been
// dispatched.
43  void notifyFinish();
44 
45  public:
// Pointer to the private implementation; FutureBasePrivate is only
// forward-declared here.
// NOTE(review): ownership and copy semantics of _p are not visible in this
// file - confirm before copying a FutureBase.
46  FutureBasePrivate *_p;
47  };
48 
49 
50  //common state shared between a Promise and multiple Futures
51  template <typename T>
52  class FutureBaseTyped : public FutureBase {
53  public:
54  typedef typename FutureType<T>::type ValueType;
55  FutureBaseTyped()
56  : _value()
57  , _async(FutureCallbackType_Async)
58  {
59  }
60 
61  bool isCancelable() const
62  {
63  return _onCancel;
64  }
65 
66  void cancel(qi::Future<T>& future)
67  {
68  if (isFinished())
69  return;
70  if (!_onCancel)
72  _onCancel(Promise<T>(future));
73  }
74 
75  void setOnCancel(boost::function<void (Promise<T>)> onCancel)
76  {
77  _onCancel = onCancel;
78  }
79 
80 
81  void callCbNotify(qi::Future<T>& future)
82  {
83  for(unsigned i = 0; i<_onResult.size(); ++i)
84  {
85  try {
86  if (_async == FutureCallbackType_Async)
87  getEventLoop()->post(boost::bind(_onResult[i], future));
88  else
89  _onResult[i](future);
90  } catch(const qi::PointerLockException&) { // do nothing
91  } catch(const std::exception& e) {
92  qiLogError("qi.future") << "Exception caught in future callback "
93  << e.what();
94  } catch (...) {
95  qiLogError("qi.future")
96  << "Unknown exception caught in future callback";
97  }
98  }
99  notifyFinish();
100  }
101 
102  void setValue(qi::Future<T>& future, const ValueType &value)
103  {
104  // report-ready + onResult() must be Atomic to avoid
105  // missing callbacks/double calls in case connect() is invoked at
106  // the same time
107  boost::recursive_mutex::scoped_lock lock(mutex());
108  if (!isRunning())
110 
111  _value = value;
112  reportValue();
113  callCbNotify(future);
114  }
115 
116  /*
117  * inplace api for promise
118  */
119  void set(qi::Future<T>& future)
120  {
121  // report-ready + onResult() must be Atomic to avoid
122  // missing callbacks/double calls in case connect() is invoked at
123  // the same time
124  boost::recursive_mutex::scoped_lock lock(mutex());
125  if (!isRunning())
127 
128  reportValue();
129  callCbNotify(future);
130  }
131 
132  void setError(qi::Future<T>& future, const std::string &message)
133  {
134  boost::recursive_mutex::scoped_lock lock(mutex());
135  if (!isRunning())
137 
138  reportError(message);
139  callCbNotify(future);
140  }
141 
142  void setCanceled(qi::Future<T>& future) {
143  boost::recursive_mutex::scoped_lock lock(mutex());
144  if (!isRunning())
146 
147  reportCanceled();
148  callCbNotify(future);
149  }
150 
151 
152  void connect(qi::Future<T> future,
153  const boost::function<void (qi::Future<T>)> &s,
154  FutureCallbackType type)
155  {
156  bool ready;
157  {
158  boost::recursive_mutex::scoped_lock lock(mutex());
159  _onResult.push_back(s);
160  ready = isFinished();
161  }
162  //result already ready, notify the callback
163  if (ready) {
164  if (type == FutureCallbackType_Async)
165  getEventLoop()->post(boost::bind(s, future));
166  else
167  {
168  try {
169  s(future);
170  } catch(const ::qi::PointerLockException&)
171  {/*do nothing*/}
172  }
173  }
174  }
175 
176  const ValueType &value(int msecs) const {
177  FutureState state = wait(msecs);
178  if (state == FutureState_Running)
179  throw FutureException(FutureException::ExceptionState_FutureTimeout);
180  if (state == FutureState_Canceled)
181  throw FutureException(FutureException::ExceptionState_FutureCanceled);
182  if (state == FutureState_FinishedWithError)
183  throw FutureUserException(error(FutureTimeout_None));
184  return _value;
185  }
186 
187  private:
188  friend class Promise<T>;
189  typedef std::vector<boost::function<void (qi::Future<T>)> > Callbacks;
190  Callbacks _onResult;
191  ValueType _value;
192  boost::function<void (Promise<T>)> _onCancel;
193  FutureCallbackType _async;
194  };
195 
196  template <typename T>
198  qi::Future<T>& fut,
199  qi::Atomic<int>* count) {
200  if (!prom.future().isFinished() && !fut.hasError())
201  {
202  // An other future can trigger at the same time.
203  // Don't bother to lock, just catch the FutureAlreadySet exception
204  try
205  {
206  prom.setValue(fut);
207  }
208  catch(const FutureException&)
209  {}
210  }
211  if (! --*count)
212  {
213  // I'm the last
214  if (!prom.future().isFinished())
215  {
216  // same 'race' as above. between two setError, not between a value and
217  // an error.
218  try
219  {
220  prom.setValue(makeFutureError<T>("No future returned successfully."));
221  }
222  catch(const FutureException&)
223  {}
224  }
225  delete count;
226  }
227  }
228  } // namespace detail
229 
230  template <typename T>
231  qi::Future<T> makeFutureError(const std::string &error, FutureCallbackType async) {
232  qi::Promise<T> prom(async);
233  prom.setError(error);
234  return prom.future();
235  }
236 
237  template <typename T>
238  void waitForAll(std::vector<Future<T> >& vect) {
239  typename std::vector< Future<T> >::iterator it;
240  qi::FutureBarrier<T> barrier;
241 
242  for (it = vect.begin(); it != vect.end(); ++it) {
243  barrier.addFuture(*it);
244  }
245  barrier.future().wait();
246  }
247 
248  template <typename T>
250  typename std::vector< Future<T> >::iterator it;
252  qi::Atomic<int>* count = new qi::Atomic<int>();
253  count->swap((int)vect.size());
254  for (it = vect.begin(); it != vect.end(); ++it) {
255  it->connect(boost::bind<void>(&detail::waitForFirstHelper<T>, prom, *it, count));
256  }
257  return prom.future();
258  }
259 
260  namespace detail
261  {
262  template<typename FT, typename PT, typename CONV>
263  void futureAdapter(Future<FT> f, Promise<PT> p, CONV converter)
264  {
265  if (f.hasError())
266  p.setError(f.error());
267  else if (f.isCanceled())
268  p.setCanceled();
269  else
270  {
271  try {
272  converter(f.value(), p.value());
273  }
274  catch (const std::exception& e)
275  {
276  p.setError(std::string("futureAdapter conversion error: ") + e.what());
277  return;
278  }
279  p.trigger();
280  }
281  }
282 
283  template<typename FT>
284  void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf)
285  {
286  if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock())
287  Future<FT>(f).cancel();
288  }
289  }
290 
291  template <>
292  struct FutureValueConverter<void, void>
293  {
294  void operator()(void* in, void* out)
295  {
296  }
297  };
298 
299  template <typename T>
300  struct FutureValueConverter<T, void>
301  {
302  void operator()(const T& in, void* out)
303  {
304  }
305  };
306 
307  template <typename T>
308  struct FutureValueConverter<void, T>
309  {
310  void operator()(void* in, const T& out)
311  {
312  }
313  };
314 
315  template<typename FT, typename PT>
316  void adaptFuture(const Future<FT>& f, Promise<PT>& p)
317  {
318  if (f.isCancelable())
319  p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
320  boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
321  const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p,
323  }
324 
325  template<typename FT, typename PT, typename CONV>
326  void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter)
327  {
328  if (f.isCancelable())
329  p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
330  boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
331  const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter));
332  }
333 }
334 
335 #endif // _QI_DETAILS_FUTURE_HXX_
This class helps to wait on multiple futures at the same point.
Definition: future.hpp:521
void futureAdapter(Future< FT > f, Promise< PT > p, CONV converter)
Definition: future.hxx:263
const ValueType & value(int msecs=FutureTimeout_Infinite) const
Return the value associated to a Future.
Definition: future.hpp:178
void trigger()
Definition: future.hpp:501
void operator()(void *in, void *out)
Definition: future.hxx:294
Future< std::vector< Future< T > > > future()
Gets the future result for the barrier.
Definition: future.hpp:544
The future has been canceled.
Definition: future.hpp:65
qi::Future< T > makeFutureError(const std::string &error, FutureCallbackType async)
Definition: future.hxx:231
FutureState
Definition: future.hpp:61
void post(const boost::function< void()> &callback, uint64_t usDelay=0)
Similar to async() but without cancelation or notification.
void setCanceled()
Definition: future.hpp:483
bool hasError(int msecs=FutureTimeout_Infinite) const
hasError
Definition: future.hpp:218
bool isCancelable() const
Definition: future.hpp:257
ValueType & value()
Definition: future.hpp:498
Future< T > future() const
Get a future linked to this promise. Can be called multiple times.
Definition: future.hpp:493
qi::FutureSync< qi::Future< T > > waitForFirst(std::vector< Future< T > > &vect)
Helper function to wait for the first valid future.
Definition: future.hxx:249
Operation pending.
Definition: future.hpp:64
void operator()(void *in, const T &out)
Definition: future.hxx:310
void futureCancelAdapter(boost::weak_ptr< FutureBaseTyped< FT > > wf)
Definition: future.hxx:284
void cancel()
Definition: future.hpp:250
Future is not tied to a promise.
Definition: future.hpp:63
EventLoop * getEventLoop()
Return the global eventloop, created on demand on first call.
Specialize this struct to provide conversion between future values.
Definition: future.hpp:583
void waitForFirstHelper(qi::Promise< qi::Future< T > > &prom, qi::Future< T > &fut, qi::Atomic< int > *count)
Definition: future.hxx:197
void adaptFuture(const Future< FT > &f, Promise< PT > &p)
Definition: future.hxx:316
FutureCallbackType
Definition: future.hpp:69
#define qiLogError(...)
Log in error mode.
Definition: log.hpp:70
const std::string & error(int msecs=FutureTimeout_Infinite) const
error
Definition: future.hpp:236
void setError(const std::string &msg)
Definition: future.hpp:476
T swap(T value)
bool addFuture(qi::Future< T > fut)
Adds the future to the barrier.
Definition: future.hpp:532
boost::function< RF > bind(const AF &fun,...)
bool isCanceled() const
isCanceled
Definition: future.hpp:209
boost::shared_ptr< detail::FutureBaseTyped< T > > _p
Definition: future.hpp:310
void waitForAll(std::vector< Future< T > > &vect)
Helper function to wait on a vector of futures.
Definition: future.hxx:238
Convenient log macro.
#define QI_API
Definition: api.hpp:24
void operator()(const T &in, void *out)
Definition: future.hxx:302
void setup(boost::function< void(qi::Promise< T >)> cancelCallback, FutureCallbackType async=FutureCallbackType_Async)
Definition: future.hpp:503