00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00036 #include "OW_config.h"
00037 #include "OW_Thread.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Format.hpp"
00040 #include "OW_ThreadBarrier.hpp"
00041 #include "OW_NonRecursiveMutexLock.hpp"
00042 #include "OW_ExceptionIds.hpp"
00043
00044 #include <cstring>
00045 #include <cstdio>
00046 #include <cerrno>
00047 #include <iostream>
00048 #include <csignal>
00049 #include <cassert>
00050
00051 #ifdef OW_HAVE_OPENSSL
00052 #include <openssl/err.h>
00053 #endif
00054
00055
00056 namespace OW_NAMESPACE
00057 {
00058
00060 OW_DEFINE_EXCEPTION_WITH_ID(Thread);
00061 OW_DEFINE_EXCEPTION_WITH_ID(CancellationDenied);
00063
00064 struct ThreadParam
00065 {
00066 ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b)
00067 : thread(t)
00068 , cb(c)
00069 , thread_barrier(b)
00070 {}
00071 Thread* thread;
00072 ThreadDoneCallbackRef cb;
00073 ThreadBarrier thread_barrier;
00074 };
00075 static Thread_t zeroThread();
00076 static Thread_t NULLTHREAD = zeroThread();
00078 static inline bool
00079 sameId(const Thread_t& t1, const Thread_t& t2)
00080 {
00081 return ThreadImpl::sameThreads(t1, t2);
00082 }
00084
00085 Thread::Thread()
00086 : m_id(NULLTHREAD)
00087 , m_isRunning(false)
00088 , m_isStarting(false)
00089 , m_joined(false)
00090 , m_cancelRequested(false)
00091 , m_cancelled(false)
00092 {
00093 }
00095
00096 Thread::~Thread()
00097 {
00098 try
00099 {
00100 if (!m_joined)
00101 {
00102 join();
00103 }
00104 assert(m_isRunning == false);
00105 if (!sameId(m_id, NULLTHREAD))
00106 {
00107 ThreadImpl::destroyThread(m_id);
00108 }
00109 }
00110 catch (...)
00111 {
00112
00113 }
00114 }
00116
00117 void
00118 Thread::start(const ThreadDoneCallbackRef& cb)
00119 {
00120 if (isRunning())
00121 {
00122 OW_THROW(ThreadException,
00123 "Thread::start - thread is already running");
00124 }
00125 if (!sameId(m_id, NULLTHREAD))
00126 {
00127 OW_THROW(ThreadException,
00128 "Thread::start - cannot start previously run thread");
00129 }
00130 m_isStarting = true;
00131 UInt32 flgs = OW_THREAD_FLG_JOINABLE;
00132 ThreadBarrier thread_barrier(2);
00133
00134 ThreadParam* p = new ThreadParam(this, cb, thread_barrier);
00135 if (ThreadImpl::createThread(m_id, threadRunner, p, flgs) != 0)
00136 {
00137 OW_THROW(ThreadException, "ThreadImpl::createThread failed");
00138 }
00139 m_isStarting = false;
00140 thread_barrier.wait();
00141 }
00143
00144 Int32
00145 Thread::join()
00146 {
00147 OW_ASSERT(!sameId(m_id, NULLTHREAD));
00148 Int32 rval;
00149 if (ThreadImpl::joinThread(m_id, rval) != 0)
00150 {
00151 OW_THROW(ThreadException,
00152 Format("Thread::join - ThreadImpl::joinThread: %1(%2)",
00153 errno, strerror(errno)).c_str());
00154 }
00155
00156 m_isRunning = false;
00157 m_joined = true;
00158 return rval;
00159 }
00161
00162
00163 Int32
00164 Thread::threadRunner(void* paramPtr)
00165 {
00166 Thread_t theThreadID;
00167 Int32 rval = -1;
00168 try
00169 {
00170
00171 OW_ASSERT(paramPtr != NULL);
00172 ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr);
00173 Thread* pTheThread = pParam->thread;
00174 ThreadImpl::saveThreadInTLS(pTheThread);
00175 theThreadID = pTheThread->m_id;
00176 ThreadDoneCallbackRef cb = pParam->cb;
00177 ThreadBarrier thread_barrier = pParam->thread_barrier;
00178 delete pParam;
00179 pTheThread->m_isRunning = true;
00180 thread_barrier.wait();
00181
00182 try
00183 {
00184 rval = pTheThread->run();
00185 }
00186
00187 catch (ThreadCancelledException&)
00188 {
00189 }
00190 catch (Exception& ex)
00191 {
00192 #ifdef OW_DEBUG
00193 std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00194 std::cerr << ex << std::endl;
00195 #endif
00196 pTheThread->doneRunning(cb);
00197
00198
00199 throw;
00200 }
00201 catch (...)
00202 {
00203 #ifdef OW_DEBUG
00204 std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00205 #endif
00206 pTheThread->doneRunning(cb);
00207
00208
00209 throw;
00210 }
00211
00212 pTheThread->doneRunning(cb);
00213
00214 }
00215 catch (Exception& ex)
00216 {
00217 #ifdef OW_DEBUG
00218 std::cerr << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00219 std::cerr << ex << std::endl;
00220 #endif
00221
00222 ThreadImpl::exitThread(theThreadID, rval);
00223 }
00224 catch (...)
00225 {
00226 #ifdef OW_DEBUG
00227 std::cerr << "!!! Unknown Exception caught in Thread class" << std::endl;
00228 #endif
00229
00230 ThreadImpl::exitThread(theThreadID, rval);
00231 }
00232
00233 ThreadImpl::exitThread(theThreadID, rval);
00234 return rval;
00235 }
00236
00238 void
00239 Thread::doneRunning(const ThreadDoneCallbackRef& cb)
00240 {
00241 NonRecursiveMutexLock l(m_cancelLock);
00242 m_isRunning = m_isStarting = false;
00243 m_cancelled = true;
00244 m_cancelCond.notifyAll();
00245 if (cb)
00246 {
00247 cb->notifyThreadDone(this);
00248 }
00249
00250 #ifdef OW_HAVE_OPENSSL
00251
00252 ERR_remove_state(0);
00253 #endif
00254 }
00255
00257 static Thread_t
00258 zeroThread()
00259 {
00260 Thread_t zthr;
00261 ::memset(&zthr, 0, sizeof(zthr));
00262 return zthr;
00263 }
00265 void
00266 Thread::cooperativeCancel()
00267 {
00268 if (!isRunning())
00269 {
00270 return;
00271 }
00272
00273
00274 doCooperativeCancel();
00275 NonRecursiveMutexLock l(m_cancelLock);
00276 m_cancelRequested = true;
00277
00278 #if !defined(OW_WIN32)
00279
00280
00281
00282
00283 try
00284 {
00285 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00286 }
00287 catch (ThreadException&)
00288 {
00289 }
00290 #endif
00291 }
00293 bool
00294 Thread::definitiveCancel(UInt32 waitForCooperativeSecs)
00295 {
00296 if (!isRunning())
00297 {
00298 return true;
00299 }
00300
00301
00302 doCooperativeCancel();
00303 NonRecursiveMutexLock l(m_cancelLock);
00304 m_cancelRequested = true;
00305
00306 #if !defined(OW_WIN32)
00307
00308
00309
00310
00311 try
00312 {
00313 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00314 }
00315 catch (ThreadException&)
00316 {
00317 }
00318 #endif
00319
00320 while (!m_cancelled && isRunning())
00321 {
00322 if (!m_cancelCond.timedWait(l, waitForCooperativeSecs, 0))
00323 {
00324
00325 doDefinitiveCancel();
00326
00327 if (!m_cancelled && isRunning())
00328 {
00329 this->cancel();
00330 }
00331 return false;
00332 }
00333 }
00334 return true;
00335 }
00337 void
00338 Thread::cancel()
00339 {
00340
00341
00342 try
00343 {
00344 ThreadImpl::cancel(m_id);
00345 }
00346 catch (ThreadException&)
00347 {
00348 }
00349 m_cancelled = true;
00350 }
00352 void
00353 Thread::testCancel()
00354 {
00355 ThreadImpl::testCancel();
00356 }
00358 void
00359 Thread::doCooperativeCancel()
00360 {
00361 }
00363 void
00364 Thread::doDefinitiveCancel()
00365 {
00366 }
00367
00368 }
00369