// punit.cxx (2475B)
1 /* 2 * 3 * C++ Portable Types Library (PTypes) 4 * Version 2.1.1 Released 27-Jun-2007 5 * 6 * Copyright (C) 2001-2007 Hovik Melikyan 7 * 8 * http://www.melikyan.com/ptypes/ 9 * 10 */ 11 12 #include "ptypes.h" 13 #include "pasync.h" 14 #include "pstreams.h" 15 16 #ifdef WIN32 17 # include <windows.h> 18 #else 19 # include <unistd.h> 20 #endif 21 22 23 namespace ptypes { 24 25 26 // 27 // internal thread class for running units asynchronously 28 // 29 30 class unit_thread: public thread 31 { 32 protected: 33 unit* target; 34 virtual void execute(); 35 public: 36 unit_thread(unit* itarget); 37 virtual ~unit_thread(); 38 }; 39 40 41 unit_thread::unit_thread(unit* itarget) 42 : thread(false), target(itarget) 43 { 44 start(); 45 } 46 47 48 49 unit_thread::~unit_thread() 50 { 51 waitfor(); 52 } 53 54 55 void unit_thread::execute() 56 { 57 target->do_main(); 58 } 59 60 61 // 62 // unit class 63 // 64 65 unit::unit() 66 : component(), pipe_next(nil), main_thread(nil), 67 running(0), uin(&pin), uout(&pout) 68 { 69 } 70 71 72 unit::~unit() 73 { 74 delete tpexchange<unit_thread>(&main_thread, nil); 75 } 76 77 78 int unit::classid() 79 { 80 return CLASS_UNIT; 81 } 82 83 84 void unit::main() 85 { 86 } 87 88 89 void unit::cleanup() 90 { 91 } 92 93 94 void unit::do_main() 95 { 96 try 97 { 98 if (!uout->get_active()) 99 uout->open(); 100 if (!uin->get_active()) 101 uin->open(); 102 main(); 103 if (uout->get_active()) 104 uout->flush(); 105 } 106 catch(exception* e) 107 { 108 perr.putf("Error: %s\n", pconst(e->get_message())); 109 delete e; 110 } 111 112 try 113 { 114 cleanup(); 115 } 116 catch(exception* e) 117 { 118 perr.putf("Error: %s\n", pconst(e->get_message())); 119 delete e; 120 } 121 122 if (pipe_next != nil) 123 uout->close(); 124 } 125 126 127 void unit::connect(unit* next) 128 { 129 waitfor(); 130 pipe_next = next; 131 infile* in = new infile(); 132 outfile* out = new outfile(); 133 next->uin = in; 134 uout = out; 135 in->pipe(*out); 136 } 137 138 139 void 
unit::waitfor() 140 { 141 if (running == 0) 142 return; 143 delete tpexchange<unit_thread>(&main_thread, nil); 144 unit* next = tpexchange<unit>(&pipe_next, nil); 145 if (next != nil) 146 { 147 next->waitfor(); 148 next->uin = &pin; 149 } 150 uout = &pout; 151 running = 0; 152 } 153 154 155 void unit::run(bool async) 156 { 157 if (pexchange(&running, 1) != 0) 158 return; 159 160 if (main_thread != nil) 161 fatal(CRIT_FIRST + 60, "Unit already running"); 162 163 if (pipe_next != nil) 164 pipe_next->run(true); 165 166 if (async) 167 main_thread = new unit_thread(this); 168 else 169 { 170 do_main(); 171 waitfor(); 172 } 173 } 174 175 176 }