pstreams.h (21408B)
1 /* 2 * 3 * C++ Portable Types Library (PTypes) 4 * Version 2.1.1 Released 27-Jun-2007 5 * 6 * Copyright (C) 2001-2007 Hovik Melikyan 7 * 8 * http://www.melikyan.com/ptypes/ 9 * 10 */ 11 12 #ifndef __PSTREAMS_H__ 13 #define __PSTREAMS_H__ 14 15 #ifndef __PPORT_H__ 16 #include "pport.h" 17 #endif 18 19 #ifndef __PTYPES_H__ 20 #include "ptypes.h" 21 #endif 22 23 #ifndef PTYPES_ST 24 # ifndef __PASYNC_H__ 25 # include "pasync.h" // for logfile.lock 26 # endif 27 #endif 28 29 #include <stdarg.h> 30 #include <errno.h> 31 32 33 #ifdef WIN32 34 # define _WINSOCKAPI_ // prevent inclusion of winsock.h, because we need winsock2.h 35 # include "windows.h" // for OVERLAPPED 36 #endif 37 38 39 namespace ptypes { 40 41 42 #ifdef _MSC_VER 43 #pragma pack(push, 4) 44 #endif 45 46 47 // -------------------------------------------------------------------- // 48 // --- abstract stream i/o classes ----------------------------------- // 49 // -------------------------------------------------------------------- // 50 51 52 // 53 // stream exception class 54 // 55 56 class iobase; 57 58 class ptpublic estream: public exception 59 { 60 protected: 61 int code; 62 iobase* errstm; 63 public: 64 estream(iobase* ierrstm, int icode, const char* imsg); 65 estream(iobase* ierrstm, int icode, const string& imsg); 66 virtual ~estream(); 67 int get_code() { return code; } 68 iobase* get_errstm() { return errstm; } 69 }; 70 71 72 typedef void (ptdecl *iostatusevent)(iobase* sender, int code); 73 74 ptpublic int ptdecl unixerrno(); 75 ptpublic const char* ptdecl unixerrmsg(int code); 76 77 78 // status codes: compatible with WinInet API 79 // additional status codes are defined in pinet.h for ipsocket 80 81 const int IO_CREATED = 1; 82 const int IO_OPENING = 5; 83 const int IO_OPENED = 35; 84 const int IO_READING = 37; 85 const int IO_WRITING = 38; 86 const int IO_EOF = 45; 87 const int IO_CLOSING = 250; 88 const int IO_CLOSED = 253; 89 90 91 // 92 // iobase 93 // 94 95 enum ioseekmode 96 { 97 IO_BEGIN, 98 IO_CURRENT, 99 IO_END 100 }; 101 102 103 const int invhandle = -1; 104 105 106 class ptpublic iobase: public component 107 { 108 friend class fdxoutstm; 109 110 protected: 111 bool active; // active status, changed by open() and close() 112 bool cancelled; // the stream was cancelled by cancel() 113 bool eof; // end of file reached, only for input streams 114 int handle; // used in many derivative classes 115 large abspos; // physical stream position 116 int bufsize; // buffer size, can be changed only when not active 117 char* bufdata; // internal: allocated buffer data 118 int bufpos; // internal: current position 119 int bufend; // internal: current data size in the buffer 120 int stmerrno; // UNIX-compatible error numbers, see comments in piobase.cxx 121 string deferrormsg; // internal: default error message when an exception is thrown, 122 int status; // stream status code, see IO_xxx constants above 123 iostatusevent onstatus; // user-defined status change handler 124 125 virtual void bufalloc(); 126 virtual void buffree(); 127 void bufclear() { bufpos = 0; bufend = 0; } 128 129 void errstminactive(); 130 void errbufrequired(); 131 void requireactive() { if (!active) errstminactive(); } 132 void requirebuf() { requireactive(); if (bufdata == 0) errbufrequired(); } 133 int convertoffset(large); 134 135 virtual void doopen() = 0; 136 virtual void doclose(); 137 virtual large doseek(large newpos, ioseekmode mode); 138 139 virtual void chstat(int newstat); 140 virtual int uerrno(); 141 virtual const char* uerrmsg(int code); 142 143 public: 144 iobase(int ibufsize = -1); 145 virtual ~iobase(); 146 147 void open(); 148 void close(); 149 void cancel(); 150 void reopen() { open(); } 151 large seekx(large newpos, ioseekmode mode = IO_BEGIN); 152 int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); } 153 void error(int code, const char* defmsg); 154 virtual void flush(); 155 156 virtual string get_errormsg(); 157 virtual string get_errstmname(); 158 virtual string get_streamname() = 0; 159 160 bool get_active() { return active; } 161 void set_active(bool newval); 162 bool get_cancelled() { return cancelled; } 163 void set_cancelled(bool newval) { cancelled = newval; } 164 int get_handle() { return handle; } 165 int get_bufsize() { return bufsize; } 166 void set_bufsize(int newval); 167 int get_stmerrno() { return stmerrno; } 168 int get_status() { return status; } 169 iostatusevent get_onstatus() { return onstatus; } 170 void set_onstatus(iostatusevent newval) { onstatus = newval; } 171 }; 172 typedef iobase* piobase; 173 174 175 ptpublic extern int defbufsize; 176 ptpublic extern int stmbalance; 177 178 179 // 180 // instm - abstract input stream 181 // 182 183 const char eofchar = 0; 184 185 class ptpublic instm: public iobase 186 { 187 protected: 188 virtual int dorawread(char* buf, int count); 189 int rawread(char* buf, int count); 190 virtual void bufvalidate(); 191 void skipeol(); 192 193 public: 194 instm(int ibufsize = -1); 195 virtual ~instm(); 196 virtual int classid(); 197 198 bool get_eof(); 199 void set_eof(bool ieof) { eof = ieof; } 200 bool get_eol(); 201 int get_dataavail(); 202 char preview(); 203 char get(); 204 void putback(); 205 string token(const cset& chars); 206 string token(const cset& chars, int limit); 207 int token(const cset& chars, char* buf, int size); 208 string line(); 209 string line(int limit); 210 int line(char* buf, int size, bool eateol = true); 211 int read(void* buf, int count); 212 int skip(int count); 213 int skiptoken(const cset& chars); 214 void skipline(bool eateol = true); 215 large tellx(); 216 int tell() { return convertoffset(tellx()); } 217 large seekx(large newpos, ioseekmode mode = IO_BEGIN); 218 int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); } 219 }; 220 typedef instm* pinstm; 221 222 223 // 224 // outstm - abstract output stream 225 // 226 227 class ptpublic outstm: public iobase 228 { 229 protected: 230 bool flusheol; 231 232 virtual int dorawwrite(const char* buf, int count); 233 int rawwrite(const char* buf, int count); 234 virtual void bufvalidate(); 235 void bufadvance(int delta) 236 { bufpos += delta; if (bufend < bufpos) bufend = bufpos; } 237 bool canwrite(); 238 239 public: 240 outstm(bool iflusheol = false, int ibufsize = -1); 241 virtual ~outstm(); 242 virtual int classid(); 243 244 bool get_flusheol() { return flusheol; } 245 void set_flusheol(bool newval) { flusheol = newval; } 246 247 virtual void flush(); 248 bool get_eof() { return eof; } 249 void put(char c); 250 void put(const char* str); 251 void put(const string& str); 252 void vputf(const char* fmt, va_list); 253 void putf(const char* fmt, ...); 254 void putline(const char* str); 255 void putline(const string& str); 256 void puteol(); 257 int write(const void* buf, int count); 258 large tellx() { return abspos + bufpos; } 259 int tell() { return convertoffset(tellx()); } 260 large seekx(large newpos, ioseekmode mode = IO_BEGIN); 261 int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); } 262 }; 263 typedef outstm* poutstm; 264 265 266 // %t and %T formats 267 ptpublic extern const char* const shorttimefmt; // "%d-%b-%Y %X" 268 ptpublic extern const char* const longtimefmt; // "%a %b %d %X %Y" 269 270 271 // 272 // internal class used in fdxstm 273 // 274 275 class ptpublic fdxstm; 276 277 278 class ptpublic fdxoutstm: public outstm 279 { 280 friend class fdxstm; 281 282 protected: 283 fdxstm* in; 284 virtual void chstat(int newstat); 285 virtual int uerrno(); 286 virtual const char* uerrmsg(int code); 287 virtual void doopen(); 288 virtual void doclose(); 289 virtual int dorawwrite(const char* buf, int count); 290 291 public: 292 fdxoutstm(int ibufsize, fdxstm* iin); 293 virtual ~fdxoutstm(); 294 virtual string get_streamname(); 295 }; 296 typedef fdxstm* pfdxstm; 297 298 299 // 300 // fdxstm: abstract full-duplex stream (for sockets and pipes) 301 // 302 303 class ptpublic fdxstm: public instm 304 { 305 friend class fdxoutstm; 306 307 protected: 308 fdxoutstm out; 309 310 virtual int dorawwrite(const char* buf, int count); 311 312 public: 313 314 fdxstm(int ibufsize = -1); 315 virtual ~fdxstm(); 316 virtual int classid(); 317 318 void set_bufsize(int newval); // sets both input and output buffer sizes 319 320 void open(); // rewritten to pass the call to the output stream too 321 void close(); 322 void cancel(); 323 virtual void flush(); 324 large tellx(bool); // true for input and false for output 325 int tell(bool forin) { return convertoffset(tellx(forin)); } 326 327 // output interface: pretend this class is derived both 328 // from instm and outstm. actually we can't use multiple 329 // inheritance here, since this is a full-duplex stream, 330 // hence everything must be duplicated for input and output 331 void putf(const char* fmt, ...); 332 void put(char c) { out.put(c); } 333 void put(const char* str) { out.put(str); } 334 void put(const string& str) { out.put(str); } 335 void putline(const char* str) { out.putline(str); } 336 void putline(const string& str) { out.putline(str); } 337 void puteol() { out.puteol(); } 338 int write(const void* buf, int count) { return out.write(buf, count); } 339 bool get_flusheol() { return out.get_flusheol(); } 340 void set_flusheol(bool newval) { out.set_flusheol(newval); } 341 342 operator outstm&() { return out; } 343 }; 344 345 346 // 347 // abstract input filter class 348 // 349 350 class ptpublic infilter: public instm 351 { 352 protected: 353 instm* stm; 354 char* savebuf; 355 int savecount; 356 string postponed; 357 358 void copytobuf(string& s); 359 void copytobuf(pconst& buf, int& count); 360 bool copytobuf(char c); 361 362 virtual void freenotify(component* sender); 363 virtual void doopen(); 364 virtual void doclose(); 365 virtual int dorawread(char* buf, int count); 366 virtual void dofilter() = 0; 367 368 bool bufavail() { return savecount > 0; } 369 void post(const char* buf, int count); 370 void post(const char* s); 371 void post(char c); 372 virtual void post(string s); 373 374 public: 375 infilter(instm* istm, int ibufsize = -1); 376 virtual ~infilter(); 377 378 virtual string get_errstmname(); 379 380 instm* get_stm() { return stm; } 381 void set_stm(instm* stm); 382 }; 383 384 385 // 386 // abstract output filter class 387 // 388 389 class ptpublic outfilter: public outstm 390 { 391 protected: 392 outstm* stm; 393 virtual void freenotify(component* sender); 394 virtual void doopen(); 395 virtual void doclose(); 396 397 public: 398 outfilter(outstm* istm, int ibufsize = -1); 399 virtual ~outfilter(); 400 virtual string get_errstmname(); 401 outstm* get_stm() { return stm; } 402 void set_stm(outstm* stm); 403 }; 404 405 406 // 407 // inmemory - memory stream 408 // 409 410 class ptpublic inmemory: public instm 411 { 412 protected: 413 string mem; 414 virtual void bufalloc(); 415 virtual void buffree(); 416 virtual void bufvalidate(); 417 virtual void doopen(); 418 virtual void doclose(); 419 virtual large doseek(large newpos, ioseekmode mode); 420 virtual int dorawread(char* buf, int count); 421 422 public: 423 inmemory(const string& imem); 424 virtual ~inmemory(); 425 virtual int classid(); 426 virtual string get_streamname(); 427 large seekx(large newpos, ioseekmode mode = IO_BEGIN); 428 int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); } 429 string get_strdata() { return mem; } 430 void set_strdata(const string& data); 431 }; 432 433 434 // 435 // outmemory - memory stream 436 // 437 438 class ptpublic outmemory: public outstm 439 { 440 protected: 441 string mem; 442 int limit; 443 444 virtual void doopen(); 445 virtual void doclose(); 446 virtual large doseek(large newpos, ioseekmode mode); 447 virtual int dorawwrite(const char* buf, int count); 448 449 public: 450 outmemory(int limit = -1); 451 virtual ~outmemory(); 452 virtual int classid(); 453 virtual string get_streamname(); 454 large tellx() { return abspos; } 455 int tell() { return (int)abspos; } 456 string get_strdata(); 457 }; 458 459 460 // -------------------------------------------------------------------- // 461 // --- file input/output --------------------------------------------- // 462 // -------------------------------------------------------------------- // 463 464 465 // 466 // infile - file input 467 // 468 469 class outfile; 470 471 class ptpublic infile: public instm 472 { 473 protected: 474 string filename; 475 int syshandle; // if not -1, assigned to handle in open() instead of opening a file by a name 476 int peerhandle; // pipe peer handle, needed for closing the peer after fork() on unix 477 478 virtual void doopen(); 479 virtual void doclose(); 480 481 public: 482 infile(); 483 infile(const char* ifn); 484 infile(const string& ifn); 485 virtual ~infile(); 486 virtual int classid(); 487 488 void pipe(outfile&); 489 virtual string get_streamname(); 490 int get_syshandle() { return syshandle; } 491 void set_syshandle(int ihandle) { close(); syshandle = ihandle; } 492 int get_peerhandle() { return peerhandle; } 493 string get_filename() { return filename; } 494 void set_filename(const string& ifn) { close(); filename = ifn; } 495 void set_filename(const char* ifn) { close(); filename = ifn; } 496 }; 497 498 499 // 500 // outfile - file output 501 // 502 503 class ptpublic outfile: public outstm 504 { 505 protected: 506 friend class infile; // infile::pipe() needs access to peerhandle 507 508 string filename; 509 int syshandle; // if not -1, assigned to handle in open() instead of opening a file by a name 510 int peerhandle; // pipe peer handle, needed for closing the peer after fork() on unix 511 int umode; // unix file mode (unix only), default = 644 512 bool append; // append (create new if needed), default = false 513 514 virtual void doopen(); 515 virtual void doclose(); 516 517 public: 518 outfile(); 519 outfile(const char* ifn, bool iappend = false); 520 outfile(const string& ifn, bool iappend = false); 521 virtual ~outfile(); 522 virtual int classid(); 523 524 virtual void flush(); 525 virtual string get_streamname(); 526 527 int get_syshandle() { return syshandle; } 528 void set_syshandle(int ihandle) { close(); syshandle = ihandle; } 529 int get_peerhandle() { return peerhandle; } 530 string get_filename() { return filename; } 531 void set_filename(const string& ifn) { close(); filename = ifn; } 532 void set_filename(const char* ifn) { close(); filename = ifn; } 533 bool get_append() { return append; } 534 void set_append(bool iappend) { close(); append = iappend; } 535 int get_umode() { return umode; } 536 void set_umode(int iumode) { close(); umode = iumode; } 537 }; 538 539 540 // 541 // logfile - file output with thread-safe putf() 542 // 543 544 class ptpublic logfile: public outfile 545 { 546 protected: 547 #ifndef PTYPES_ST 548 mutex lock; 549 #endif 550 public: 551 logfile(); 552 logfile(const char* ifn, bool iappend = true); 553 logfile(const string& ifn, bool iappend = true); 554 virtual ~logfile(); 555 virtual int classid(); 556 557 void vputf(const char* fmt, va_list); 558 void putf(const char* fmt, ...); 559 }; 560 561 562 // 563 // intee - UNIX tee-style utility class 564 // 565 566 class ptpublic intee: public infilter { 567 protected: 568 outfile file; 569 virtual void doopen(); 570 virtual void doclose(); 571 virtual void dofilter(); 572 public: 573 intee(instm* istm, const char* ifn, bool iappend = false); 574 intee(instm* istm, const string& ifn, bool iappend = false); 575 virtual ~intee(); 576 577 outfile* get_file() { return &file; } 578 virtual string get_streamname(); 579 }; 580 581 582 // -------------------------------------------------------------------- // 583 // --- named pipes --------------------------------------------------- // 584 // -------------------------------------------------------------------- // 585 586 587 // on Unix this directory can be overridden by providing the 588 // full path, e.g. '/var/run/mypipe'. the path is ignored on 589 // Windows and is always replaced with '\\<server>\pipe\' 590 591 #ifndef WIN32 592 # define DEF_NAMED_PIPES_DIR "/tmp/" 593 #endif 594 595 596 #ifdef WIN32 597 598 const int DEF_PIPE_TIMEOUT = 20000; // in milliseconds, for reading and writing 599 const int DEF_PIPE_OPEN_TIMEOUT = 1000; // for connecting to the remote pipe: 600 const int DEF_PIPE_OPEN_RETRY = 5; // will double the timeout value for each retry, 601 // i.e. 1 second, then 2, then 4 etc. 602 const int DEF_PIPE_SYSTEM_BUF_SIZE = 4096; 603 604 #endif 605 606 607 class ptpublic namedpipe: public fdxstm 608 { 609 friend class npserver; 610 611 protected: 612 string pipename; 613 int svhandle; 614 615 #ifdef WIN32 616 // we use overlapped IO in order to have timed waiting in serve() 617 // and also to implement timeout error on the client side 618 OVERLAPPED ovr; 619 virtual int dorawread(char* buf, int count); 620 virtual int dorawwrite(const char* buf, int count); 621 static string ptdecl realpipename(const string& pipename, const string& svrname = nullstring); 622 void initovr(); 623 #else 624 static string realpipename(const string& pipename); 625 static bool setupsockaddr(const string& pipename, void* sa); 626 void initovr() {} 627 #endif 628 629 virtual void doopen(); 630 virtual void doclose(); 631 virtual large doseek(large, ioseekmode); 632 633 public: 634 namedpipe(); 635 namedpipe(const string& ipipename); 636 #ifdef WIN32 637 namedpipe(const string& ipipename, const string& servername); 638 #endif 639 virtual ~namedpipe(); 640 virtual int classid(); 641 642 virtual void flush(); 643 virtual string get_streamname(); 644 645 string get_pipename() { return pipename; } 646 void set_pipename(const string&); 647 void set_pipename(const char*); 648 }; 649 650 651 class ptpublic npserver: public unknown 652 { 653 string pipename; 654 int handle; 655 bool active; 656 657 void error(int code, const char* defmsg); 658 void open(); 659 void close(); 660 #ifdef WIN32 661 void openinst(); 662 void closeinst(); 663 #endif 664 665 public: 666 npserver(const string& ipipename); 667 ~npserver(); 668 669 bool serve(namedpipe& client, int timeout = -1); 670 }; 671 672 673 // -------------------------------------------------------------------- // 674 // --- utility streams ----------------------------------------------- // 675 // -------------------------------------------------------------------- // 676 677 // 678 // MD5 -- message digest algorithm 679 // Derived from L. Peter Deutsch's work, please see src/pmd5.cxx 680 // 681 682 683 const int md5_digsize = 16; 684 typedef uchar md5_digest[md5_digsize]; 685 686 // from md5.h 687 688 typedef unsigned char md5_byte_t; /* 8-bit byte */ 689 typedef unsigned int md5_word_t; /* 32-bit word */ 690 691 692 typedef struct md5_state_s 693 { 694 md5_word_t count[2]; /* message length in bits, lsw first */ 695 md5_word_t abcd[4]; /* digest buffer */ 696 md5_byte_t buf[64]; /* accumulate block */ 697 } md5_state_t; 698 699 700 class ptpublic outmd5: public outfilter 701 { 702 protected: 703 md5_state_s ctx; 704 md5_digest digest; 705 706 virtual void doopen(); 707 virtual void doclose(); 708 virtual int dorawwrite(const char* buf, int count); 709 710 public: 711 outmd5(outstm* istm = nil); 712 virtual ~outmd5(); 713 714 virtual string get_streamname(); 715 716 const unsigned char* get_bindigest() { close(); return digest; } 717 string get_digest(); 718 }; 719 720 721 // 722 // null output stream 723 // 724 725 726 class ptpublic outnull: public outstm 727 { 728 protected: 729 virtual int dorawwrite(const char*, int); 730 virtual void doopen(); 731 virtual void doclose(); 732 public: 733 outnull(); 734 virtual ~outnull(); 735 virtual string get_streamname(); 736 }; 737 738 739 // -------------------------------------------------------------------- // 740 // --- unit ---------------------------------------------------------- // 741 // -------------------------------------------------------------------- // 742 743 744 #ifdef _MSC_VER 745 // disable "type name first seen using 'struct' now seen using 'class'" warning 746 # pragma warning (disable: 4099) 747 // disable "class '...' needs to have dll-interface to be used by clients of class 748 // '...'" warning, since the compiler may sometimes give this warning incorrectly. 749 # pragma warning (disable: 4251) 750 #endif 751 752 class unit_thread; 753 754 class ptpublic unit: public component 755 { 756 protected: 757 friend class unit_thread; 758 759 unit* pipe_next; // next unit in the pipe chain, assigned by connect() 760 unit_thread* main_thread; // async execution thread, started by run() if necessary 761 int running; // running status, to protect from recursive calls to run() and waitfor() 762 763 void do_main(); 764 765 public: 766 compref<instm> uin; 767 compref<outstm> uout; 768 769 unit(); 770 virtual ~unit(); 771 virtual int classid(); 772 773 // things that may be overridden in descendant classes 774 virtual void main(); // main code, called from run() 775 virtual void cleanup(); // main code cleanup, called from run() 776 777 // service methods 778 void connect(unit* next); 779 void run(bool async = false); 780 void waitfor(); 781 }; 782 typedef unit* punit; 783 784 785 typedef unit CUnit; // send me a $10 check if you use this alias (not obligatory though, 786 // because the library is free, after all) 787 788 789 // 790 // standard input, output and error devices 791 // 792 793 ptpublic extern infile pin; 794 ptpublic extern logfile pout; 795 ptpublic extern logfile perr; 796 ptpublic extern outnull pnull; 797 798 799 #ifdef _MSC_VER 800 #pragma pack(pop) 801 #endif 802 803 804 } 805 806 #endif // __PSTREAMS_H__ 807