pasync.h (12798B)
1 /* 2 * 3 * C++ Portable Types Library (PTypes) 4 * Version 2.1.1 Released 27-Jun-2007 5 * 6 * Copyright (C) 2001-2007 Hovik Melikyan 7 * 8 * http://www.melikyan.com/ptypes/ 9 * 10 */ 11 12 #ifndef __PASYNC_H__ 13 #define __PASYNC_H__ 14 15 #ifdef WIN32 16 # define _WINSOCKAPI_ // prevent inclusion of winsock.h, since we need winsock2.h 17 # include <windows.h> 18 #else 19 # include <pthread.h> 20 # ifndef __bsdi__ 21 # include <semaphore.h> 22 # endif 23 #endif 24 25 #ifndef __PPORT_H__ 26 #include "pport.h" 27 #endif 28 29 #ifndef __PTYPES_H__ 30 #include "ptypes.h" 31 #endif 32 33 34 namespace ptypes { 35 36 // 37 // Summary of implementation: 38 // 39 // atomic increment/decrement/exchange 40 // MSVC/BCC/i386: internal, asm 41 // GCC/i386: internal, asm 42 // GCC/PowerPC: internal, asm 43 // GCC/SPARC: internal, asm 44 // Other: internal, mutex hash table 45 // 46 // mutex 47 // Win32: Critical section 48 // Other: POSIX mutex 49 // 50 // trigger 51 // Win32: Event 52 // Other: internal, POSIX condvar/mutex 53 // 54 // rwlock: 55 // Win32: internal, Event/mutex 56 // MacOS: internal, POSIX condvar/mutex 57 // Other: POSIX rwlock 58 // 59 // semaphore: 60 // Win32: = timedsem 61 // MacOS: = timedsem 62 // Other: POSIX semaphore 63 // 64 // timedsem (with timed waiting): 65 // Win32: Semaphore 66 // Other: internal, POSIX mutex/condvar 67 // 68 69 70 #ifdef _MSC_VER 71 #pragma pack(push, 4) 72 #endif 73 74 75 #ifdef WIN32 76 typedef int pthread_id_t; 77 typedef HANDLE pthread_t; 78 #else 79 typedef pthread_t pthread_id_t; 80 #endif 81 82 83 ptpublic void ptdecl psleep(uint milliseconds); 84 ptpublic bool ptdecl pthrequal(pthread_id_t id); // note: this is NOT the thread handle, use thread::get_id() 85 ptpublic pthread_id_t ptdecl pthrself(); // ... same 86 87 88 // -------------------------------------------------------------------- // 89 // --- mutex ---------------------------------------------------------- // 90 // -------------------------------------------------------------------- // 91 92 93 #ifdef WIN32 94 95 struct ptpublic mutex: public noncopyable 96 { 97 protected: 98 CRITICAL_SECTION critsec; 99 public: 100 mutex() { InitializeCriticalSection(&critsec); } 101 ~mutex() { DeleteCriticalSection(&critsec); } 102 void enter() { EnterCriticalSection(&critsec); } 103 void leave() { LeaveCriticalSection(&critsec); } 104 void lock() { enter(); } 105 void unlock() { leave(); } 106 }; 107 108 109 #else 110 111 112 struct ptpublic mutex: public noncopyable 113 { 114 protected: 115 pthread_mutex_t mtx; 116 public: 117 mutex() { pthread_mutex_init(&mtx, 0); } 118 ~mutex() { pthread_mutex_destroy(&mtx); } 119 void enter() { pthread_mutex_lock(&mtx); } 120 void leave() { pthread_mutex_unlock(&mtx); } 121 void lock() { enter(); } 122 void unlock() { leave(); } 123 }; 124 125 #endif 126 127 128 // 129 // scopelock 130 // 131 132 class scopelock: public noncopyable 133 { 134 protected: 135 mutex* mtx; 136 public: 137 scopelock(mutex& imtx): mtx(&imtx) { mtx->lock(); } 138 ~scopelock() { mtx->unlock(); } 139 }; 140 141 142 // 143 // mutex table for hashed memory locking (undocumented) 144 // 145 146 #define _MUTEX_HASH_SIZE 29 // a prime number for hashing 147 148 #ifdef WIN32 149 # define pmemlock mutex 150 # define pmementer(m) (m)->lock() 151 # define pmemleave(m) (m)->unlock() 152 #else 153 # define _MTX_INIT PTHREAD_MUTEX_INITIALIZER 154 # define pmemlock pthread_mutex_t 155 # define pmementer pthread_mutex_lock 156 # define pmemleave pthread_mutex_unlock 157 #endif 158 159 160 ptpublic extern pmemlock _mtxtable[_MUTEX_HASH_SIZE]; 161 162 #define pgetmemlock(addr) (_mtxtable + pintptr(addr) % _MUTEX_HASH_SIZE) 163 164 165 // -------------------------------------------------------------------- // 166 // --- trigger -------------------------------------------------------- // 167 // -------------------------------------------------------------------- // 168 169 170 #ifdef WIN32 171 172 class ptpublic trigger: public noncopyable 173 { 174 protected: 175 HANDLE handle; // Event object 176 public: 177 trigger(bool autoreset, bool state); 178 ~trigger() { CloseHandle(handle); } 179 void wait() { WaitForSingleObject(handle, INFINITE); } 180 void post() { SetEvent(handle); } 181 void signal() { post(); } 182 void reset() { ResetEvent(handle); } 183 }; 184 185 186 #else 187 188 189 class ptpublic trigger: public noncopyable 190 { 191 protected: 192 pthread_mutex_t mtx; 193 pthread_cond_t cond; 194 int state; 195 bool autoreset; 196 public: 197 trigger(bool autoreset, bool state); 198 ~trigger(); 199 void wait(); 200 void post(); 201 void signal() { post(); } 202 void reset(); 203 }; 204 205 #endif 206 207 208 // -------------------------------------------------------------------- // 209 // --- rwlock --------------------------------------------------------- // 210 // -------------------------------------------------------------------- // 211 212 213 #if defined(WIN32) || defined(__DARWIN__) || defined(__bsdi__) 214 # define __PTYPES_RWLOCK__ 215 #elif defined(linux) 216 // on Linux rwlocks are included only with -D_GNU_SOURCE. 217 // programs that don't use rwlocks, do not need to define 218 // _GNU_SOURCE either. 219 # if defined(_GNU_SOURCE) || defined(__USE_UNIX98) 220 # define __POSIX_RWLOCK__ 221 # endif 222 #else 223 # define __POSIX_RWLOCK__ 224 #endif 225 226 227 #ifdef __PTYPES_RWLOCK__ 228 229 struct ptpublic rwlock: protected mutex 230 { 231 protected: 232 #ifdef WIN32 233 HANDLE reading; // Event object 234 HANDLE finished; // Event object 235 int readcnt; 236 int writecnt; 237 #else 238 pthread_mutex_t mtx; 239 pthread_cond_t readcond; 240 pthread_cond_t writecond; 241 int locks; 242 int writers; 243 int readers; 244 #endif 245 public: 246 rwlock(); 247 ~rwlock(); 248 void rdlock(); 249 void wrlock(); 250 void unlock(); 251 void lock() { wrlock(); } 252 }; 253 254 255 #elif defined(__POSIX_RWLOCK__) 256 257 258 struct ptpublic rwlock: public noncopyable 259 { 260 protected: 261 pthread_rwlock_t rw; 262 public: 263 rwlock(); 264 ~rwlock() { pthread_rwlock_destroy(&rw); } 265 void rdlock() { pthread_rwlock_rdlock(&rw); } 266 void wrlock() { pthread_rwlock_wrlock(&rw); } 267 void unlock() { pthread_rwlock_unlock(&rw); } 268 void lock() { wrlock(); } 269 }; 270 271 #endif 272 273 274 #if defined(__PTYPES_RWLOCK__) || defined(__POSIX_RWLOCK__) 275 276 // 277 // scoperead & scopewrite 278 // 279 280 class scoperead: public noncopyable 281 { 282 protected: 283 rwlock* rw; 284 public: 285 scoperead(rwlock& irw): rw(&irw) { rw->rdlock(); } 286 ~scoperead() { rw->unlock(); } 287 }; 288 289 290 class scopewrite: public noncopyable 291 { 292 protected: 293 rwlock* rw; 294 public: 295 scopewrite(rwlock& irw): rw(&irw) { rw->wrlock(); } 296 ~scopewrite() { rw->unlock(); } 297 }; 298 299 300 #endif 301 302 303 // -------------------------------------------------------------------- // 304 // --- semaphore ------------------------------------------------------ // 305 // -------------------------------------------------------------------- // 306 307 308 #if defined(WIN32) || defined(__DARWIN__) || defined(__bsdi__) 309 # define __SEM_TO_TIMEDSEM__ 310 #endif 311 312 313 #ifdef __SEM_TO_TIMEDSEM__ 314 315 // map ordinary semaphore to timed semaphore 316 317 class timedsem; 318 typedef timedsem semaphore; 319 320 321 #else 322 323 324 class ptpublic semaphore: public unknown 325 { 326 protected: 327 sem_t handle; 328 public: 329 semaphore(int initvalue); 330 virtual ~semaphore(); 331 332 void wait(); 333 void post(); 334 void signal() { post(); } 335 336 void decrement() { return wait(); } 337 void increment() { post(); } 338 }; 339 340 #endif 341 342 343 class ptpublic timedsem: public unknown 344 { 345 protected: 346 #ifdef WIN32 347 HANDLE handle; 348 #else 349 int count; 350 pthread_mutex_t mtx; 351 pthread_cond_t cond; 352 #endif 353 public: 354 timedsem(int initvalue); 355 virtual ~timedsem(); 356 bool wait(int msecs = -1); 357 void post(); 358 void signal() { post(); } 359 360 bool decrement(int msec = -1) { return wait(msec); } 361 void increment() { post(); } 362 }; 363 364 365 // -------------------------------------------------------------------- // 366 // --- thread --------------------------------------------------------- // 367 // -------------------------------------------------------------------- // 368 369 370 class ptpublic thread: public unknown 371 { 372 protected: 373 #ifdef WIN32 374 unsigned id; 375 #endif 376 pthread_t handle; 377 int autofree; 378 int running; 379 int signaled; 380 int finished; 381 int freed; 382 int reserved; // for priorities 383 timedsem relaxsem; 384 385 virtual void execute() = 0; 386 virtual void cleanup(); 387 388 bool relax(int msecs) { return relaxsem.wait(msecs); } 389 390 friend void _threadepilog(thread* thr); 391 392 #ifdef WIN32 393 friend unsigned __stdcall _threadproc(void* arg); 394 #else 395 friend void* _threadproc(void* arg); 396 #endif 397 398 public: 399 thread(bool iautofree); 400 virtual ~thread(); 401 402 #ifdef WIN32 403 pthread_id_t get_id() { return int(id); } 404 #else 405 pthread_id_t get_id() { return handle; } 406 #endif 407 408 bool get_running() { return running != 0; } 409 bool get_finished() { return finished != 0; } 410 bool get_signaled() { return signaled != 0; } 411 412 void start(); 413 void signal(); 414 void waitfor(); 415 }; 416 417 418 419 // -------------------------------------------------------------------- // 420 // --- jobqueue & msgqueue -------------------------------------------- // 421 // -------------------------------------------------------------------- // 422 423 424 const int MSG_USER = 0; 425 const int MSG_QUIT = -1; 426 427 const int DEF_QUEUE_LIMIT = 5000; 428 429 class ptpublic message: public unknown 430 { 431 protected: 432 message* next; // next in the message chain, used internally 433 semaphore* sync; // used internally by msgqueue::send(), when called from a different thread 434 friend class jobqueue; // my friends, job queue and message queue... 435 friend class msgqueue; 436 public: 437 int id; 438 pintptr param; 439 pintptr result; 440 message(int iid, pintptr iparam = 0); 441 virtual ~message(); 442 }; 443 444 445 class ptpublic jobqueue: public noncopyable 446 { 447 private: 448 int limit; // queue limit 449 message* head; // queue head 450 message* tail; // queue tail 451 int qcount; // number of items in the queue 452 timedsem sem; // queue semaphore 453 timedsem ovrsem; // overflow semaphore 454 mutex qlock; // critical sections in enqueue and dequeue 455 456 protected: 457 bool enqueue(message* msg, int timeout = -1); 458 bool push(message* msg, int timeout = -1); 459 message* dequeue(bool safe = true, int timeout = -1); 460 void purgequeue(); 461 462 public: 463 jobqueue(int ilimit = DEF_QUEUE_LIMIT); 464 virtual ~jobqueue(); 465 466 int get_count() const { return qcount; } 467 int get_limit() const { return limit; } 468 469 bool post(message* msg, int timeout = -1); 470 bool post(int id, pintptr param = 0, int timeout = -1); 471 bool posturgent(message* msg, int timeout = -1); 472 bool posturgent(int id, pintptr param = 0, int timeout = -1); 473 message* getmessage(int timeout = -1); 474 475 #ifdef PTYPES19_COMPAT 476 int msgsavail() const { return get_count(); } 477 #endif 478 }; 479 480 481 template <class T> class tjobqueue: protected jobqueue 482 { 483 public: 484 tjobqueue(int ilimit = DEF_QUEUE_LIMIT); 485 486 int get_count() const { return jobqueue::get_count(); } 487 int get_limit() const { return jobqueue::get_limit(); } 488 bool post(T* msg, int timeout = -1) { return jobqueue::post(msg, timeout); } 489 bool posturgent(T* msg, int timeout = -1) { return jobqueue::posturgent(msg, timeout); } 490 T* getmessage(int timeout = -1) { return (T*)jobqueue::getmessage(timeout); } 491 }; 492 493 494 class ptpublic msgqueue: protected jobqueue 495 { 496 private: 497 mutex thrlock; // lock for the queue processing 498 pthread_id_t owner; // thread ID of the queue processing thread 499 500 pintptr finishmsg(message* msg); 501 void handlemsg(message* msg); 502 void takeownership(); 503 504 protected: 505 bool quit; 506 507 void defhandler(message& msg); 508 virtual void msghandler(message& msg) = 0; 509 510 public: 511 msgqueue(int ilimit = DEF_QUEUE_LIMIT); 512 virtual ~msgqueue(); 513 514 // functions calling from the owner thread: 515 void processone(); // process one message, may hang if no msgs in the queue 516 void processmsgs(); // process all available messages and return 517 void run(); // process messages until MSG_QUIT 518 519 // functions calling from any thread: 520 int get_count() const { return jobqueue::get_count(); } 521 int get_limit() const { return jobqueue::get_limit(); } 522 bool post(message* msg, int timeout = -1) { return jobqueue::post(msg, timeout); } 523 bool post(int id, pintptr param = 0, int timeout = -1) { return jobqueue::post(id, param, timeout); } 524 bool posturgent(message* msg, int timeout = -1) { return jobqueue::posturgent(msg, timeout); } 525 bool posturgent(int id, pintptr param = 0, int timeout = -1) { return jobqueue::posturgent(id, param, timeout); } 526 pintptr send(message* msg); 527 pintptr send(int id, pintptr param = 0); 528 529 #ifdef PTYPES19_COMPAT 530 int msgsavail() const { return get_count(); } 531 #endif 532 }; 533 534 535 #ifdef _MSC_VER 536 #pragma pack(pop) 537 #endif 538 539 540 } 541 542 #endif // __PASYNC_H__