libqi-api
2.0.6.8
|
00001 #pragma once 00002 /* 00003 ** Copyright (C) 2012 Aldebaran Robotics 00004 ** See COPYING for the license 00005 */ 00006 00007 #ifndef _QI_DETAILS_FUTURE_HXX_ 00008 #define _QI_DETAILS_FUTURE_HXX_ 00009 00010 #include <vector> 00011 #include <utility> // pair 00012 #include <boost/bind.hpp> 00013 #include <qi/eventloop.hpp> 00014 00015 #include <qi/log.hpp> 00016 00017 namespace qi { 00018 00019 namespace detail { 00020 00021 class FutureBasePrivate; 00022 class QI_API FutureBase { 00023 public: 00024 FutureBase(); 00025 ~FutureBase(); 00026 00027 FutureState wait(int msecs) const; 00028 FutureState state() const; 00029 bool isRunning() const; 00030 bool isFinished() const; 00031 bool isCanceled() const; 00032 bool hasError(int msecs) const; 00033 bool hasValue(int msecs) const; 00034 const std::string &error(int msecs) const; 00035 void reportStart(); 00036 void reset(); 00037 00038 protected: 00039 void reportValue(); 00040 void reportError(const std::string &message); 00041 void reportCanceled(); 00042 boost::recursive_mutex& mutex(); 00043 void notifyFinish(); 00044 00045 public: 00046 FutureBasePrivate *_p; 00047 }; 00048 00049 00050 //common state shared between a Promise and multiple Futures 00051 template <typename T> 00052 class FutureBaseTyped : public FutureBase { 00053 public: 00054 typedef typename FutureType<T>::type ValueType; 00055 FutureBaseTyped() 00056 : _value() 00057 , _async(FutureCallbackType_Async) 00058 { 00059 } 00060 00061 bool isCancelable() const 00062 { 00063 return _onCancel; 00064 } 00065 00066 void cancel(qi::Future<T>& future) 00067 { 00068 if (isFinished()) 00069 return; 00070 if (!_onCancel) 00071 throw FutureException(FutureException::ExceptionState_FutureNotCancelable); 00072 _onCancel(Promise<T>(future)); 00073 } 00074 00075 void setOnCancel(boost::function<void (Promise<T>)> onCancel) 00076 { 00077 _onCancel = onCancel; 00078 } 00079 00080 00081 void callCbNotify(qi::Future<T>& future) 00082 { 00083 for(unsigned i = 0; i<_onResult.size(); ++i) 00084 { 00085 try { 00086 if (_async == FutureCallbackType_Async) 00087 getEventLoop()->post(boost::bind(_onResult[i], future)); 00088 else 00089 _onResult[i](future); 00090 } catch(const qi::PointerLockException&) { // do nothing 00091 } catch(const std::exception& e) { 00092 qiLogError("qi.future") << "Exception caught in future callback " 00093 << e.what(); 00094 } catch (...) { 00095 qiLogError("qi.future") 00096 << "Unknown exception caught in future callback"; 00097 } 00098 } 00099 notifyFinish(); 00100 } 00101 00102 void setValue(qi::Future<T>& future, const ValueType &value) 00103 { 00104 // report-ready + onResult() must be Atomic to avoid 00105 // missing callbacks/double calls in case connect() is invoked at 00106 // the same time 00107 boost::recursive_mutex::scoped_lock lock(mutex()); 00108 if (!isRunning()) 00109 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00110 00111 _value = value; 00112 reportValue(); 00113 callCbNotify(future); 00114 } 00115 00116 /* 00117 * inplace api for promise 00118 */ 00119 void set(qi::Future<T>& future) 00120 { 00121 // report-ready + onResult() must be Atomic to avoid 00122 // missing callbacks/double calls in case connect() is invoked at 00123 // the same time 00124 boost::recursive_mutex::scoped_lock lock(mutex()); 00125 if (!isRunning()) 00126 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00127 00128 reportValue(); 00129 callCbNotify(future); 00130 } 00131 00132 void setError(qi::Future<T>& future, const std::string &message) 00133 { 00134 boost::recursive_mutex::scoped_lock lock(mutex()); 00135 if (!isRunning()) 00136 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00137 00138 reportError(message); 00139 callCbNotify(future); 00140 } 00141 00142 void setCanceled(qi::Future<T>& future) { 00143 boost::recursive_mutex::scoped_lock lock(mutex()); 00144 if (!isRunning()) 00145 throw FutureException(FutureException::ExceptionState_PromiseAlreadySet); 00146 00147 reportCanceled(); 00148 callCbNotify(future); 00149 } 00150 00151 00152 void connect(qi::Future<T> future, const boost::function<void (qi::Future<T>)> &s) 00153 { 00154 bool ready; 00155 { 00156 boost::recursive_mutex::scoped_lock lock(mutex()); 00157 _onResult.push_back(s); 00158 ready = isFinished(); 00159 } 00160 //result already ready, notify the callback 00161 if (ready) { 00162 if (_async == FutureCallbackType_Async) 00163 getEventLoop()->post(boost::bind(s, future)); 00164 else 00165 { 00166 try { 00167 s(future); 00168 } catch(const ::qi::PointerLockException&) 00169 {/*do nothing*/} 00170 } 00171 } 00172 } 00173 00174 const ValueType &value(int msecs) const { 00175 FutureState state = wait(msecs); 00176 if (state == FutureState_Running) 00177 throw FutureException(FutureException::ExceptionState_FutureTimeout); 00178 if (state == FutureState_Canceled) 00179 throw FutureException(FutureException::ExceptionState_FutureCanceled); 00180 if (state == FutureState_FinishedWithError) 00181 throw FutureUserException(error(FutureTimeout_None)); 00182 return _value; 00183 } 00184 00185 private: 00186 friend class Promise<T>; 00187 typedef std::vector<boost::function<void (qi::Future<T>)> > Callbacks; 00188 Callbacks _onResult; 00189 ValueType _value; 00190 boost::function<void (Promise<T>)> _onCancel; 00191 FutureCallbackType _async; 00192 }; 00193 00194 template <typename T> 00195 void waitForFirstHelper(qi::Promise< qi::Future<T> >& prom, 00196 qi::Future<T>& fut, 00197 qi::Atomic<int>* count) { 00198 if (!prom.future().isFinished() && !fut.hasError()) 00199 { 00200 // An other future can trigger at the same time. 00201 // Don't bother to lock, just catch the FutureAlreadySet exception 00202 try 00203 { 00204 prom.setValue(fut); 00205 } 00206 catch(const FutureException&) 00207 {} 00208 } 00209 if (! --*count) 00210 { 00211 // I'm the last 00212 if (!prom.future().isFinished()) 00213 { 00214 // same 'race' as above. between two setError, not between a value and 00215 // an error. 00216 try 00217 { 00218 prom.setValue(makeFutureError<T>("No future returned successfully.")); 00219 } 00220 catch(const FutureException&) 00221 {} 00222 } 00223 delete count; 00224 } 00225 } 00226 } // namespace detail 00227 00228 template <typename T> 00229 qi::Future<T> makeFutureError(const std::string &error, FutureCallbackType async) { 00230 qi::Promise<T> prom(async); 00231 prom.setError(error); 00232 return prom.future(); 00233 } 00234 00235 template <typename T> 00236 void waitForAll(std::vector<Future<T> >& vect) { 00237 typename std::vector< Future<T> >::iterator it; 00238 qi::FutureBarrier<T> barrier; 00239 00240 for (it = vect.begin(); it != vect.end(); ++it) { 00241 barrier.addFuture(*it); 00242 } 00243 barrier.future().wait(); 00244 } 00245 00246 template <typename T> 00247 qi::FutureSync< qi::Future<T> > waitForFirst(std::vector< Future<T> >& vect) { 00248 typename std::vector< Future<T> >::iterator it; 00249 qi::Promise< qi::Future<T> > prom; 00250 qi::Atomic<int>* count = new qi::Atomic<int>(); 00251 count->swap((int)vect.size()); 00252 for (it = vect.begin(); it != vect.end(); ++it) { 00253 it->connect(boost::bind<void>(&detail::waitForFirstHelper<T>, prom, *it, count)); 00254 } 00255 return prom.future(); 00256 } 00257 00258 namespace detail 00259 { 00260 template<typename FT, typename PT, typename CONV> 00261 void futureAdapter(Future<FT> f, Promise<PT> p, CONV converter) 00262 { 00263 if (f.hasError()) 00264 p.setError(f.error()); 00265 else if (f.isCanceled()) 00266 p.setCanceled(); 00267 else 00268 { 00269 try { 00270 converter(f.value(), p.value()); 00271 } 00272 catch (const std::exception& e) 00273 { 00274 p.setError(std::string("futureAdapter conversion error: ") + e.what()); 00275 return; 00276 } 00277 p.trigger(); 00278 } 00279 } 00280 00281 template<typename FT> 00282 void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf) 00283 { 00284 if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock()) 00285 Future<FT>(f).cancel(); 00286 } 00287 } 00288 00289 template <> 00290 struct FutureValueConverter<void, void> 00291 { 00292 void operator()(void* in, void* out) 00293 { 00294 } 00295 }; 00296 00297 template <typename T> 00298 struct FutureValueConverter<T, void> 00299 { 00300 void operator()(const T& in, void* out) 00301 { 00302 } 00303 }; 00304 00305 template <typename T> 00306 struct FutureValueConverter<void, T> 00307 { 00308 void operator()(void* in, const T& out) 00309 { 00310 } 00311 }; 00312 00313 template<typename FT, typename PT> 00314 void adaptFuture(const Future<FT>& f, Promise<PT>& p) 00315 { 00316 if (f.isCancelable()) 00317 p.setup(boost::bind(&detail::futureCancelAdapter<FT>, 00318 boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p))); 00319 const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p, 00320 FutureValueConverter<FT, PT>())); 00321 } 00322 00323 template<typename FT, typename PT, typename CONV> 00324 void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter) 00325 { 00326 if (f.isCancelable()) 00327 p.setup(boost::bind(&detail::futureCancelAdapter<FT>, 00328 boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p))); 00329 const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter)); 00330 } 00331 } 00332 00333 #endif // _QI_DETAILS_FUTURE_HXX_