00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef TIMEOUTPROVIDER_H
00021 #define TIMEOUTPROVIDER_H
00022
#include <list>
#include <stdint.h>
#include <sys/time.h>

#include <cc++/config.h>
#include <cc++/thread.h>
00040
/**
 * One pending timeout: which subscriber to notify, with which command,
 * and at what point in time.
 *
 * The expiry is stored as an absolute wall-clock time in milliseconds,
 * derived from gettimeofday() when the request is constructed.
 *
 * @tparam TOCommand    Command value stored with the request. Must be
 *                      copy-constructible and comparable with ==.
 * @tparam TOSubscriber Subscriber handle stored with the request. Must be
 *                      copy-constructible and comparable with ==.
 */
template <class TOCommand, class TOSubscriber>
class TPRequest
{

public:

    /**
     * Create a request that expires timeoutMs milliseconds from now.
     *
     * @param tsi       Subscriber associated with this request.
     * @param timeoutMs Delay until expiry, in milliseconds, relative to the
     *                  current wall-clock time.
     * @param command   Command associated with this request (copied).
     */
    TPRequest(TOSubscriber tsi, int timeoutMs, const TOCommand &command):
        subscriber(tsi),
        when_ms(nowMs() + timeoutMs),
        // Copy-construct instead of default-construct + assign, so that
        // TOCommand needs neither a default constructor nor operator=.
        command(command)
    {
    }

    /**
     * @param t Absolute time in milliseconds.
     * @return true if this request expires strictly before t.
     */
    bool happensBefore(uint64_t t) const
    {
        return when_ms < t;
    }

    /**
     * @param req Request to compare against; must not be NULL.
     * @return true if this request expires strictly before req.
     */
    bool happensBefore(const TPRequest *req) const
    {
        return happensBefore(req->when_ms);
    }

    /**
     * @return Milliseconds left until this request expires, or 0 if the
     *         expiry time has already been reached.
     */
    int getMsToTimeout() const
    {
        const uint64_t now = nowMs();

        if (happensBefore(now)) {
            return 0;
        }
        return static_cast<int>(when_ms - now);
    }

    /** @return A copy of the stored command. */
    TOCommand getCommand() const
    {
        return command;
    }

    /** @return A copy of the stored subscriber handle. */
    TOSubscriber getSubscriber() const
    {
        return subscriber;
    }

    /**
     * Two requests are equal when subscriber, command and expiry time all
     * compare equal.
     */
    bool operator==(const TPRequest<TOCommand, TOSubscriber> &req) const
    {
        return req.subscriber == subscriber &&
               req.command == command &&
               req.when_ms == when_ms;
    }

private:

    /** Current wall-clock time from gettimeofday(), in milliseconds. */
    static uint64_t nowMs()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);

        return static_cast<uint64_t>(tv.tv_sec) * static_cast<uint64_t>(1000) +
               static_cast<uint64_t>(tv.tv_usec) / static_cast<uint64_t>(1000);
    }

    TOSubscriber subscriber;   ///< Who gets notified.
    uint64_t when_ms;          ///< Absolute expiry time in milliseconds.
    TOCommand command;         ///< What the subscriber is notified with.

};
00140
00147 template<class TOCommand, class TOSubscriber>
00148 class TimeoutProvider : public ost::Thread, ost::Event {
00149
00150 public:
00151
00155 TimeoutProvider(): requests(), synchLock(), stop(false) { }
00156
00160 ~TimeoutProvider() {
00161 terminate();
00162 }
00163
00167 void stopThread(){
00168 stop = true;
00169 signal();
00170 }
00171
00181 void requestTimeout(int32_t time_ms, TOSubscriber subscriber, const TOCommand &command)
00182 {
00183 TPRequest<TOCommand, TOSubscriber>* request =
00184 new TPRequest<TOCommand, TOSubscriber>(subscriber, time_ms, command);
00185
00186 synchLock.enter();
00187
00188 if (requests.size()==0) {
00189 requests.push_front(request);
00190 signal();
00191 synchLock.leave();
00192 return;
00193 }
00194 if (request->happensBefore(requests.front())) {
00195 requests.push_front(request);
00196 signal();
00197 synchLock.leave();
00198 return;
00199 }
00200 if (requests.back()->happensBefore(request)){
00201 requests.push_back(request);
00202 signal();
00203 synchLock.leave();
00204 return;
00205 }
00206
00207 typename std::list<TPRequest<TOCommand, TOSubscriber>* >::iterator i;
00208 for(i = requests.begin(); i != requests.end(); i++ ) {
00209 if( request->happensBefore(*i)) {
00210 requests.insert(i, request);
00211 break;
00212 }
00213 }
00214 signal();
00215 synchLock.leave();
00216 }
00217
00223 void cancelRequest(TOSubscriber subscriber, const TOCommand &command)
00224 {
00225 synchLock.enter();
00226 typename std::list<TPRequest<TOCommand, TOSubscriber>* >::iterator i;
00227 for(i = requests.begin(); i != requests.end(); ) {
00228 if( (*i)->getCommand() == command &&
00229 (*i)->getSubscriber() == subscriber) {
00230 i = requests.erase(i);
00231 continue;
00232 }
00233 i++;
00234 }
00235 synchLock.leave();
00236 }
00237
00238 protected:
00239
00240 void run()
00241 {
00242 do {
00243 synchLock.enter();
00244 int32_t time = 3600000;
00245 int32_t size = 0;
00246 if ((size = requests.size()) > 0) {
00247 time = requests.front()->getMsToTimeout();
00248 }
00249 if (time == 0 && size > 0) {
00250 if (stop){
00251
00252 return;
00253 }
00254 TPRequest<TOCommand, TOSubscriber>* req = requests.front();
00255 TOSubscriber subs = req->getSubscriber();
00256 TOCommand command = req->getCommand();
00257
00258 requests.pop_front();
00259
00260 synchLock.leave();
00261 subs->handleTimeout(command);
00262 continue;
00263 }
00264 synchLock.leave();
00265 if (stop) {
00266
00267 return;
00268 }
00269 reset();
00270 wait(time);
00271 if (stop) {
00272
00273 return;
00274 }
00275 } while(true);
00276 }
00277
00278 private:
00279
00280
00281
00282 std::list<TPRequest<TOCommand, TOSubscriber> *> requests;
00283
00284 ost::Mutex synchLock;
00285
00286 bool stop;
00287
00288
00289
00290 };
00291
00292 #endif
00293