libqi-api  2.0.6.8
/home/opennao/work/master/sdk/libqi/qi/details/future.hxx
Go to the documentation of this file.
00001 #pragma once
00002 /*
00003 **  Copyright (C) 2012 Aldebaran Robotics
00004 **  See COPYING for the license
00005 */
00006 
00007 #ifndef _QI_DETAILS_FUTURE_HXX_
00008 #define _QI_DETAILS_FUTURE_HXX_
00009 
00010 #include <vector>
00011 #include <utility> // pair
00012 #include <boost/bind.hpp>
00013 #include <qi/eventloop.hpp>
00014 
00015 #include <qi/log.hpp>
00016 
00017 namespace qi {
00018 
00019   namespace detail {
00020 
00021     class FutureBasePrivate;
00022     class QI_API FutureBase {
00023     public:
00024       FutureBase();
00025       ~FutureBase();
00026 
00027       FutureState wait(int msecs) const;
00028       FutureState state() const;
00029       bool isRunning() const;
00030       bool isFinished() const;
00031       bool isCanceled() const;
00032       bool hasError(int msecs) const;
00033       bool hasValue(int msecs) const;
00034       const std::string &error(int msecs) const;
00035       void reportStart();
00036       void reset();
00037 
00038     protected:
00039       void reportValue();
00040       void reportError(const std::string &message);
00041       void reportCanceled();
00042       boost::recursive_mutex& mutex();
00043       void notifyFinish();
00044 
00045     public:
00046       FutureBasePrivate *_p;
00047     };
00048 
00049 
00050     //common state shared between a Promise and multiple Futures
00051     template <typename T>
00052     class FutureBaseTyped : public FutureBase {
00053     public:
00054       typedef typename FutureType<T>::type ValueType;
00055       FutureBaseTyped()
00056         : _value()
00057         , _async(FutureCallbackType_Async)
00058       {
00059       }
00060 
00061       bool isCancelable() const
00062       {
00063         return _onCancel;
00064       }
00065 
00066       void cancel(qi::Future<T>& future)
00067       {
00068         if (isFinished())
00069           return;
00070         if (!_onCancel)
00071           throw FutureException(FutureException::ExceptionState_FutureNotCancelable);
00072         _onCancel(Promise<T>(future));
00073       }
00074 
00075       void setOnCancel(boost::function<void (Promise<T>)> onCancel)
00076       {
00077         _onCancel = onCancel;
00078       }
00079 
00080 
00081       void callCbNotify(qi::Future<T>& future)
00082       {
00083         for(unsigned i = 0; i<_onResult.size(); ++i)
00084         {
00085           try {
00086             if (_async == FutureCallbackType_Async)
00087               getEventLoop()->post(boost::bind(_onResult[i], future));
00088             else
00089               _onResult[i](future);
00090           } catch(const qi::PointerLockException&) { // do nothing
00091           } catch(const std::exception& e) {
00092             qiLogError("qi.future") << "Exception caught in future callback "
00093                                     << e.what();
00094           } catch (...) {
00095             qiLogError("qi.future")
00096                 << "Unknown exception caught in future callback";
00097           }
00098         }
00099         notifyFinish();
00100       }
00101 
00102       void setValue(qi::Future<T>& future, const ValueType &value)
00103       {
00104         // report-ready + onResult() must be Atomic to avoid
00105         // missing callbacks/double calls in case connect() is invoked at
00106         // the same time
00107         boost::recursive_mutex::scoped_lock lock(mutex());
00108         if (!isRunning())
00109           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00110 
00111         _value = value;
00112         reportValue();
00113         callCbNotify(future);
00114       }
00115 
00116       /*
00117        * inplace api for promise
00118        */
00119       void set(qi::Future<T>& future)
00120       {
00121         // report-ready + onResult() must be Atomic to avoid
00122         // missing callbacks/double calls in case connect() is invoked at
00123         // the same time
00124         boost::recursive_mutex::scoped_lock lock(mutex());
00125         if (!isRunning())
00126           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00127 
00128         reportValue();
00129         callCbNotify(future);
00130       }
00131 
00132       void setError(qi::Future<T>& future, const std::string &message)
00133       {
00134         boost::recursive_mutex::scoped_lock lock(mutex());
00135         if (!isRunning())
00136           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00137 
00138         reportError(message);
00139         callCbNotify(future);
00140       }
00141 
00142       void setCanceled(qi::Future<T>& future) {
00143         boost::recursive_mutex::scoped_lock lock(mutex());
00144         if (!isRunning())
00145           throw FutureException(FutureException::ExceptionState_PromiseAlreadySet);
00146 
00147         reportCanceled();
00148         callCbNotify(future);
00149       }
00150 
00151 
00152       void connect(qi::Future<T> future, const boost::function<void (qi::Future<T>)> &s)
00153       {
00154         bool ready;
00155         {
00156           boost::recursive_mutex::scoped_lock lock(mutex());
00157           _onResult.push_back(s);
00158           ready = isFinished();
00159         }
00160         //result already ready, notify the callback
00161         if (ready) {
00162           if (_async == FutureCallbackType_Async)
00163             getEventLoop()->post(boost::bind(s, future));
00164           else
00165           {
00166             try {
00167               s(future);
00168             } catch(const ::qi::PointerLockException&)
00169             {/*do nothing*/}
00170           }
00171         }
00172       }
00173 
00174       const ValueType &value(int msecs) const {
00175         FutureState state = wait(msecs);
00176         if (state == FutureState_Running)
00177           throw FutureException(FutureException::ExceptionState_FutureTimeout);
00178         if (state == FutureState_Canceled)
00179           throw FutureException(FutureException::ExceptionState_FutureCanceled);
00180         if (state == FutureState_FinishedWithError)
00181           throw FutureUserException(error(FutureTimeout_None));
00182         return _value;
00183       }
00184 
00185     private:
00186       friend class Promise<T>;
00187       typedef std::vector<boost::function<void (qi::Future<T>)> > Callbacks;
00188       Callbacks                _onResult;
00189       ValueType                _value;
00190       boost::function<void (Promise<T>)> _onCancel;
00191       FutureCallbackType       _async;
00192     };
00193 
00194     template <typename T>
00195     void waitForFirstHelper(qi::Promise< qi::Future<T> >& prom,
00196                             qi::Future<T>& fut,
00197                             qi::Atomic<int>* count) {
00198       if (!prom.future().isFinished() && !fut.hasError())
00199       {
00200         // An other future can trigger at the same time.
00201         // Don't bother to lock, just catch the FutureAlreadySet exception
00202         try
00203         {
00204           prom.setValue(fut);
00205         }
00206         catch(const FutureException&)
00207         {}
00208       }
00209       if (! --*count)
00210       {
00211         // I'm the last
00212         if (!prom.future().isFinished())
00213         {
00214           // same 'race' as above. between two setError, not between a value and
00215           // an error.
00216           try
00217           {
00218             prom.setValue(makeFutureError<T>("No future returned successfully."));
00219           }
00220           catch(const FutureException&)
00221           {}
00222         }
00223         delete count;
00224       }
00225     }
00226   } // namespace detail
00227 
00228   template <typename T>
00229   qi::Future<T> makeFutureError(const std::string &error, FutureCallbackType async) {
00230     qi::Promise<T> prom(async);
00231     prom.setError(error);
00232     return prom.future();
00233   }
00234 
00235   template <typename T>
00236   void waitForAll(std::vector<Future<T> >& vect) {
00237     typename std::vector< Future<T> >::iterator it;
00238     qi::FutureBarrier<T> barrier;
00239 
00240     for (it = vect.begin(); it != vect.end(); ++it) {
00241       barrier.addFuture(*it);
00242     }
00243     barrier.future().wait();
00244   }
00245 
00246   template <typename T>
00247   qi::FutureSync< qi::Future<T> > waitForFirst(std::vector< Future<T> >& vect) {
00248     typename std::vector< Future<T> >::iterator it;
00249     qi::Promise< qi::Future<T> > prom;
00250     qi::Atomic<int>* count = new qi::Atomic<int>();
00251     count->swap((int)vect.size());
00252     for (it = vect.begin(); it != vect.end(); ++it) {
00253       it->connect(boost::bind<void>(&detail::waitForFirstHelper<T>, prom, *it, count));
00254     }
00255     return prom.future();
00256   }
00257 
00258   namespace detail
00259   {
00260     template<typename FT, typename PT, typename CONV>
00261     void futureAdapter(Future<FT> f, Promise<PT> p, CONV converter)
00262     {
00263       if (f.hasError())
00264         p.setError(f.error());
00265       else if (f.isCanceled())
00266         p.setCanceled();
00267       else
00268       {
00269         try {
00270           converter(f.value(), p.value());
00271         }
00272         catch (const std::exception& e)
00273         {
00274           p.setError(std::string("futureAdapter conversion error: ") + e.what());
00275           return;
00276         }
00277         p.trigger();
00278       }
00279     }
00280 
00281     template<typename FT>
00282     void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf)
00283     {
00284       if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock())
00285         Future<FT>(f).cancel();
00286     }
00287   }
00288 
00289   template <>
00290   struct FutureValueConverter<void, void>
00291   {
00292     void operator()(void* in, void* out)
00293     {
00294     }
00295   };
00296 
00297   template <typename T>
00298   struct FutureValueConverter<T, void>
00299   {
00300     void operator()(const T& in, void* out)
00301     {
00302     }
00303   };
00304 
00305   template <typename T>
00306   struct FutureValueConverter<void, T>
00307   {
00308     void operator()(void* in, const T& out)
00309     {
00310     }
00311   };
00312 
00313   template<typename FT, typename PT>
00314   void adaptFuture(const Future<FT>& f, Promise<PT>& p)
00315   {
00316     if (f.isCancelable())
00317       p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
00318             boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
00319     const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p,
00320       FutureValueConverter<FT, PT>()));
00321   }
00322 
00323   template<typename FT, typename PT, typename CONV>
00324   void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter)
00325   {
00326     if (f.isCancelable())
00327       p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
00328             boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
00329     const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter));
00330   }
00331 }
00332 
00333 #endif  // _QI_DETAILS_FUTURE_HXX_
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines