Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Class Members | File Members

TimeoutProvider.h

Go to the documentation of this file.
00001 /*
00002   Copyright (C) 2006, 2005, 2004 Erik Eliasson, Johan Bilien, Werner Dittmann
00003 
00004   This library is free software; you can redistribute it and/or
00005   modify it under the terms of the GNU Lesser General Public
00006   License as published by the Free Software Foundation; either
00007   version 2.1 of the License, or (at your option) any later version.
00008 
00009   This library is distributed in the hope that it will be useful,
00010   but WITHOUT ANY WARRANTY; without even the implied warranty of
00011   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012   Lesser General Public License for more details.
00013 
00014   You should have received a copy of the GNU Lesser General Public
00015   License along with this library; if not, write to the Free Software
00016   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 */
00018 
00019 
00020 #ifndef TIMEOUTPROVIDER_H
00021 #define TIMEOUTPROVIDER_H
00022 
00035 #include <list>
00036 #include <sys/time.h>
00037 
00038 #include <cc++/config.h>
00039 #include <cc++/thread.h>
00040 
00051 template <class TOCommand, class TOSubscriber>
00052 class TPRequest
00053 {
00054 
00055 public:
00056 
00057     TPRequest( TOSubscriber tsi, int timeoutMs, const TOCommand &command):
00058         subscriber(tsi)
00059     {
00060         struct timeval tv;
00061         gettimeofday(&tv, NULL );
00062 
00063         when_ms = ((uint64)tv.tv_sec) * (uint64)1000 + ((uint64)tv.tv_usec) / (uint64)1000;
00064         when_ms += timeoutMs;
00065         this->command = command;
00066     }
00067 
00071     bool happensBefore(uint64 t)
00072     {
00073         if (when_ms < t) {
00074             return true;
00075         }
00076         if (when_ms > t) {
00077             return false;
00078         }
00079         return false; // if equal it does not "happens_before"
00080 
00081     }
00082 
00083     bool happensBefore(const TPRequest *req){
00084         return happensBefore(req->when_ms);
00085     }
00086 
00091     int getMsToTimeout ()
00092     {
00093         struct timeval tv;
00094         gettimeofday(&tv, NULL );
00095 
00096         uint64 now = ((uint64)tv.tv_sec) * (uint64)1000 + ((uint64)tv.tv_usec) / (uint64)1000;
00097 
00098         if (happensBefore(now)) {
00099             return 0;
00100         }
00101         else {
00102             return (int)(when_ms - now);
00103         }
00104     }
00105 
00106     TOCommand getCommand()
00107     {
00108         return command;
00109     }
00110 
00111     TOSubscriber getSubscriber()
00112     {
00113         return subscriber;
00114     }
00115 
00122     bool operator==(const TPRequest<TOCommand, TOSubscriber> &req)
00123     {
00124         if (req.subscriber == subscriber &&
00125                req.command == command &&
00126                req.when_ms == when_ms) {
00127             return true;
00128         }
00129         return false;
00130     }
00131 
00132 private:
00133     TOSubscriber subscriber;
00134     uint64 when_ms;             // Time since Epoch in ms when the timeout
00135                                 // will happen
00136 
00137     TOCommand command;          // Command that will be delivered to the
00138                                 // receiver (subscriber) of the timeout.
00139 };
00140 
00147 template<class TOCommand, class TOSubscriber>
00148         class TimeoutProvider : public ost::Thread, ost::Event {
00149 
00150 public:
00151 
00155     TimeoutProvider(): requests(), synchLock(), stop(false)  { }
00156 
00160     ~TimeoutProvider() {
00161         terminate();
00162     }
00163 
00167     void stopThread(){
00168         stop = true;
00169         signal();               // signal event to waiting thread
00170     }
00171 
00181     void requestTimeout(int32_t time_ms, TOSubscriber subscriber, const TOCommand &command)
00182     {
00183         TPRequest<TOCommand, TOSubscriber>* request =
00184                 new TPRequest<TOCommand, TOSubscriber>(subscriber, time_ms, command);
00185 
00186         synchLock.enter();
00187 
00188         if (requests.size()==0) {
00189             requests.push_front(request);
00190             signal();
00191             synchLock.leave();
00192             return;
00193         }
00194         if (request->happensBefore(requests.front())) {
00195             requests.push_front(request);
00196             signal();
00197             synchLock.leave();
00198             return;
00199         }
00200         if (requests.back()->happensBefore(request)){
00201             requests.push_back(request);
00202             signal();
00203             synchLock.leave();
00204             return;
00205         }
00206 
00207         typename std::list<TPRequest<TOCommand, TOSubscriber>* >::iterator i;
00208         for(i = requests.begin(); i != requests.end(); i++ ) {
00209             if( request->happensBefore(*i)) {
00210                 requests.insert(i, request);
00211                 break;
00212             }
00213         }
00214         signal();
00215         synchLock.leave();
00216     }
00217 
00223     void cancelRequest(TOSubscriber subscriber, const TOCommand &command)
00224     {
00225         synchLock.enter();
00226         typename std::list<TPRequest<TOCommand, TOSubscriber>* >::iterator i;
00227         for(i = requests.begin(); i != requests.end(); ) {
00228             if( (*i)->getCommand() == command &&
00229                 (*i)->getSubscriber() == subscriber) {
00230                 i = requests.erase(i);
00231                 continue;
00232             }
00233             i++;
00234         }
00235         synchLock.leave();
00236     }
00237 
00238 protected:
00239 
00240     void run()
00241     {
00242         do {
00243             synchLock.enter();
00244             int32_t time = 3600000;
00245             int32_t size = 0;
00246             if ((size = requests.size()) > 0) {
00247                 time = requests.front()->getMsToTimeout();
00248             }
00249             if (time == 0 && size > 0) {
00250                 if (stop){      // This must be checked so that we will
00251                                 // stop even if we have timeouts to deliver.
00252                     return;
00253                 }
00254                 TPRequest<TOCommand, TOSubscriber>* req = requests.front();
00255                 TOSubscriber subs = req->getSubscriber();
00256                 TOCommand command = req->getCommand();
00257 
00258                 requests.pop_front();
00259 
00260                 synchLock.leave(); // call the command with free Mutex
00261                 subs->handleTimeout(command);
00262                 continue;
00263             }
00264             synchLock.leave();
00265             if (stop) {         // If we were told to stop while delivering
00266                                 // a timeout we will exit here
00267                 return;
00268             }
00269             reset();            // ready to receive triggers again
00270             wait(time);
00271             if (stop) {         // If we are told to exit while waiting we
00272                                 // will exit
00273                 return;
00274             }
00275         } while(true);
00276     }
00277 
00278 private:
00279 
00280     // The timeouts are ordered in the order of which they
00281     // will expire. Nearest in future is first in list.
00282     std::list<TPRequest<TOCommand, TOSubscriber> *> requests;
00283 
00284     ost::Mutex synchLock;       // Protects the internal data structures
00285 
00286     bool stop;          // Flag to tell the worker thread
00287                         // to terminate. Set to true and
00288                         // wake the worker thread to
00289                         // terminate it.
00290 };
00291 
00292 #endif
00293 

© sourcejam.com 2005-2008