pmsgq.cxx (4386B)
1 /* 2 * 3 * C++ Portable Types Library (PTypes) 4 * Version 2.1.1 Released 27-Jun-2007 5 * 6 * Copyright (C) 2001-2007 Hovik Melikyan 7 * 8 * http://www.melikyan.com/ptypes/ 9 * 10 */ 11 12 #include "pasync.h" 13 14 15 namespace ptypes { 16 17 18 static void msgerror() 19 { 20 fatal(CRIT_FIRST + 42, "Invalid message object"); 21 } 22 23 24 message::message(int iid, pintptr iparam) 25 : next(nil), sync(nil), id(iid), param(iparam), result(0) 26 { 27 } 28 29 30 message::~message() 31 { 32 } 33 34 35 jobqueue::jobqueue(int ilimit) 36 : limit(ilimit), head(nil), tail(nil), qcount(0), sem(0), ovrsem(ilimit), qlock() 37 { 38 } 39 40 41 jobqueue::~jobqueue() 42 { 43 purgequeue(); 44 } 45 46 47 bool jobqueue::enqueue(message* msg, int timeout) 48 { 49 if (msg == nil) 50 msgerror(); 51 52 if (!ovrsem.wait(timeout)) 53 return false; 54 qlock.enter(); 55 msg->next = nil; 56 if (head != nil) 57 head->next = msg; 58 head = msg; 59 if (tail == nil) 60 tail = msg; 61 qcount++; 62 qlock.leave(); 63 sem.post(); 64 return true; 65 } 66 67 68 bool jobqueue::push(message* msg, int timeout) 69 { 70 if (msg == nil) 71 msgerror(); 72 73 if (!ovrsem.wait(timeout)) 74 return false; 75 qlock.enter(); 76 msg->next = tail; 77 tail = msg; 78 if (head == nil) 79 head = msg; 80 qcount++; 81 qlock.leave(); 82 sem.post(); 83 return true; 84 } 85 86 87 message* jobqueue::dequeue(bool safe, int timeout) 88 { 89 if (!sem.wait(timeout)) 90 return nil; 91 if (safe) 92 qlock.enter(); 93 message* msg = tail; 94 tail = msg->next; 95 qcount--; 96 if (tail == nil) 97 head = nil; 98 if (safe) 99 qlock.leave(); 100 ovrsem.post(); 101 return msg; 102 } 103 104 105 void jobqueue::purgequeue() 106 { 107 qlock.enter(); 108 while (get_count() > 0) 109 delete dequeue(false); 110 qlock.leave(); 111 } 112 113 114 bool jobqueue::post(message* msg, int timeout) 115 { 116 return enqueue(msg, timeout); 117 } 118 119 120 bool jobqueue::post(int id, pintptr param, int timeout) 121 { 122 return post(new message(id, param), timeout); 123 } 124 125 126 bool jobqueue::posturgent(message* msg, int timeout) 127 { 128 return push(msg, timeout); 129 } 130 131 132 bool jobqueue::posturgent(int id, pintptr param, int timeout) 133 { 134 return posturgent(new message(id, param), timeout); 135 } 136 137 138 message* jobqueue::getmessage(int timeout) 139 { 140 return dequeue(true, timeout); 141 } 142 143 144 msgqueue::msgqueue(int ilimit) 145 : jobqueue(ilimit), thrlock(), owner(0), quit(false) 146 { 147 } 148 149 150 msgqueue::~msgqueue() 151 { 152 } 153 154 155 void msgqueue::takeownership() 156 { 157 if (owner != pthrself()) 158 { 159 thrlock.enter(); // lock forever 160 // if (owner != 0) 161 // fatal(CRIT_FIRST + 45, "Ownership of the message queue already taken"); 162 owner = pthrself(); 163 } 164 } 165 166 167 pintptr msgqueue::finishmsg(message* msg) 168 { 169 if (msg != nil) 170 { 171 pintptr result = msg->result; 172 173 // if the message was sent by send(), 174 // just signale the semaphore 175 if (msg->sync != nil) 176 msg->sync->post(); 177 178 // otherwise finish it 179 else 180 delete msg; 181 182 return result; 183 } 184 else 185 return 0; 186 } 187 188 189 pintptr msgqueue::send(message* msg) 190 { 191 if (msg == nil) 192 msgerror(); 193 194 try 195 { 196 // if we are in the main thread, 197 // immediately handle the msg 198 if (pthrequal(owner)) 199 handlemsg(msg); 200 201 // if this is called from a concurrent thread, 202 // sync through a semaphore 203 else 204 { 205 if (msg->sync != nil) 206 msgerror(); 207 semaphore sync(0); 208 msg->sync = &sync; 209 push(msg); 210 msg->sync->wait(); 211 msg->sync = 0; 212 } 213 } 214 catch (...) 215 { 216 finishmsg(msg); 217 throw; 218 } 219 220 return finishmsg(msg); 221 } 222 223 224 pintptr msgqueue::send(int id, pintptr param) 225 { 226 return send(new message(id, param)); 227 } 228 229 230 void msgqueue::processone() 231 { 232 takeownership(); 233 message* msg = dequeue(); 234 try 235 { 236 handlemsg(msg); 237 } 238 catch(...) 239 { 240 finishmsg(msg); 241 throw; 242 } 243 finishmsg(msg); 244 } 245 246 247 void msgqueue::processmsgs() 248 { 249 while (!quit && get_count() > 0) 250 processone(); 251 } 252 253 254 void msgqueue::run() 255 { 256 quit = false; 257 do 258 { 259 processone(); 260 } 261 while (!quit); 262 } 263 264 265 void msgqueue::handlemsg(message* msg) 266 { 267 msghandler(*msg); 268 } 269 270 271 void msgqueue::defhandler(message& msg) 272 { 273 switch(msg.id) 274 { 275 case MSG_QUIT: 276 quit = true; 277 break; 278 } 279 } 280 281 282 }